[GEARPUMP-217] Merge master into sql branch Author: manuzhang <[email protected]> Author: huafengw <[email protected]> Author: darionyaphet <[email protected]>
Closes #218 from manuzhang/sync_master. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/1cf87bf7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/1cf87bf7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/1cf87bf7 Branch: refs/heads/sql Commit: 1cf87bf77cac2d10614a8c3c7b0758071d4cd7cc Parents: 54686e0 Author: manuzhang <[email protected]> Authored: Sat Aug 19 19:57:23 2017 +0800 Committer: manuzhang <[email protected]> Committed: Sat Aug 19 19:57:32 2017 +0800 ---------------------------------------------------------------------- .travis.yml | 2 +- .../scala/org/apache/gearpump/Message.scala | 6 +- .../main/scala/org/apache/gearpump/Time.scala | 34 ++++ .../gearpump/cluster/ClusterMessage.scala | 20 +-- .../cluster/appmaster/ApplicationMetaData.scala | 1 - .../appmaster/ApplicationRuntimeInfo.scala | 12 +- .../cluster/client/RuntimeEnvironment.scala | 4 +- .../cluster/embedded/EmbeddedCluster.scala | 2 - .../embedded/EmbeddedRuntimeEnvironment.scala | 48 ++++++ .../org/apache/gearpump/cluster/main/Info.scala | 3 +- .../org/apache/gearpump/cluster/main/Kill.scala | 3 +- .../gearpump/cluster/main/MainRunner.scala | 3 +- .../apache/gearpump/cluster/main/Replay.scala | 3 +- .../gearpump/cluster/master/AppManager.scala | 7 +- .../cluster/master/AppMasterLauncher.scala | 2 +- .../apache/gearpump/cluster/master/Master.scala | 1 - .../gearpump/cluster/scheduler/Scheduler.scala | 4 +- .../gearpump/jarstore/FileDirective.scala | 2 +- .../apache/gearpump/jarstore/FileServer.scala | 4 +- .../scala/org/apache/gearpump/package.scala | 29 ---- .../scala/org/apache/gearpump/util/Graph.scala | 20 +-- .../gearpump/util/HistoryMetricsService.scala | 4 +- .../org/apache/gearpump/util/LogUtil.scala | 2 - .../scala/org/apache/gearpump/util/Util.scala | 2 +- .../apache/gearpump/cluster/main/MainSpec.scala | 2 +- .../gearpump/util/RestartPolicySpec.scala | 2 - .../DistServiceAppMasterSpec.scala | 2 +- .../wordcount/dsl/WindowedWordCount.scala | 6 +- .../GearpumpMaterializerSession.scala | 5 +- .../gearpump/akkastream/example/Test8.scala | 2 +- .../gearpump/akkastream/example/Test9.scala | 2 +- .../akkastream/graph/GraphPartitioner.scala | 2 +- .../gearpump/akkastream/graph/SubGraph.scala | 1 - .../materializer/RemoteMaterializerImpl.scala | 4 +- .../experiments/storm/main/GearpumpNimbus.scala | 2 +- .../producer/StormSpoutOutputCollector.scala | 8 +- .../storm/topology/GearpumpStormComponent.scala | 5 +- .../storm/topology/GearpumpTuple.scala | 6 +- .../storm/util/StormOutputCollector.scala | 9 +- .../StormBoltOutputCollectorSpec.scala | 2 - .../StormSpoutOutputCollectorSpec.scala | 2 - .../topology/GearpumpStormComponentSpec.scala | 5 +- .../storm/topology/GearpumpTupleSpec.scala | 4 +- .../storm/util/StormOutputCollectorSpec.scala | 9 +- .../experiments/yarn/glue/NMClient.scala | 2 +- .../experiments/yarn/glue/RMClient.scala | 2 +- external/hadoopfs/README.md | 2 +- .../hadoop/HadoopCheckpointStore.scala | 6 +- .../hadoop/HadoopCheckpointStoreFactory.scala | 2 - .../lib/HadoopCheckpointStoreReader.scala | 8 +- .../lib/HadoopCheckpointStoreWriter.scala | 4 +- .../lib/rotation/FileSizeRotationSpec.scala | 4 +- .../gearpump/streaming/kafka/dsl/KafkaDSL.scala | 1 - .../kafka/lib/source/AbstractKafkaSource.scala | 5 +- .../streaming/kafka/lib/store/KafkaStore.scala | 10 +- .../streaming/kafka/KafkaStoreSpec.scala | 16 +- project/BuildExamples.scala | 17 +- project/BuildExperiments.scala | 2 +- project/BuildGearpump.scala | 56 +++--- project/Dependencies.scala | 11 +- .../gearpump/services/MasterService.scala | 2 +- .../gearpump/services/StaticService.scala | 1 - .../services/AppMasterServiceSpec.scala | 4 +- .../gearpump/services/MasterServiceSpec.scala | 4 +- .../gearpump/streaming/ClusterMessage.scala | 6 +- .../gearpump/streaming/StreamApplication.scala | 13 +- .../streaming/appmaster/AppMaster.scala | 6 +- .../streaming/appmaster/ClockService.scala | 54 +++--- .../streaming/appmaster/JarScheduler.scala | 6 +- .../appmaster/StreamAppMasterSummary.scala | 8 +- .../streaming/appmaster/TaskManager.scala | 6 +- .../dsl/api/functions/FilterFunction.scala | 2 - .../dsl/api/functions/FoldFunction.scala | 2 - .../dsl/api/functions/MapFunction.scala | 2 - .../api/functions/SerializableFunction.scala | 32 ++++ .../dsl/javaapi/functions/FlatMapFunction.scala | 2 +- .../apache/gearpump/streaming/dsl/package.scala | 48 ++++++ .../apache/gearpump/streaming/dsl/plan/OP.scala | 171 ++++++++++++------- .../gearpump/streaming/dsl/plan/Planner.scala | 20 ++- .../streaming/dsl/scalaapi/Stream.scala | 1 + .../streaming/dsl/scalaapi/StreamApp.scala | 25 ++- .../scalaapi/functions/FlatMapFunction.scala | 2 +- .../functions/SerializableFunction.scala | 32 ---- .../dsl/window/api/AccumulationMode.scala | 10 ++ .../streaming/dsl/window/api/Trigger.scala | 9 + .../dsl/window/api/WindowFunction.scala | 30 +++- .../streaming/dsl/window/api/Windows.scala | 18 +- .../dsl/window/impl/ReduceFnRunner.scala | 29 ---- .../streaming/dsl/window/impl/Window.scala | 12 +- .../dsl/window/impl/WindowRunner.scala | 25 ++- .../streaming/metrics/ProcessorAggregator.scala | 12 +- .../gearpump/streaming/sink/DataSinkTask.scala | 4 + .../gearpump/streaming/source/Watermark.scala | 11 +- .../streaming/state/api/MonoidState.scala | 6 +- .../streaming/state/api/PersistentState.scala | 8 +- .../streaming/state/api/PersistentTask.scala | 7 +- .../state/impl/CheckpointManager.scala | 14 +- .../state/impl/InMemoryCheckpointStore.scala | 12 +- .../streaming/state/impl/NonWindowState.scala | 6 +- .../gearpump/streaming/state/impl/Window.scala | 10 +- .../streaming/state/impl/WindowState.scala | 16 +- .../streaming/task/SerializedMessage.scala | 4 +- .../gearpump/streaming/task/Subscription.scala | 13 +- .../apache/gearpump/streaming/task/Task.scala | 5 +- .../gearpump/streaming/task/TaskActor.scala | 9 +- .../streaming/task/TaskControlMessage.scala | 14 +- .../gearpump/streaming/task/TaskWrapper.scala | 7 +- .../transaction/api/CheckpointStore.scala | 8 +- .../transaction/api/TimeStampFilter.scala | 7 +- .../gearpump/streaming/StreamingTestUtil.scala | 1 - .../streaming/appmaster/AppMasterSpec.scala | 4 +- .../streaming/appmaster/DagManagerSpec.scala | 2 - .../streaming/appmaster/TaskManagerSpec.scala | 4 +- .../streaming/dsl/task/GroupByTaskSpec.scala | 2 +- .../streaming/dsl/task/TransformTaskSpec.scala | 1 - .../state/impl/CheckpointManagerSpec.scala | 8 +- .../state/impl/NonWindowStateSpec.scala | 8 +- .../streaming/state/impl/WindowSpec.scala | 6 +- .../streaming/state/impl/WindowStateSpec.scala | 15 +- .../streaming/task/SubscriptionSpec.scala | 6 +- 120 files changed, 728 insertions(+), 530 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 95c1427..8148c32 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: - scala -sudo: false +sudo: required jdk: - oraclejdk8 addons: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/Message.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala b/core/src/main/scala/org/apache/gearpump/Message.scala index 4dc5c09..7051396 100644 --- a/core/src/main/scala/org/apache/gearpump/Message.scala +++ b/core/src/main/scala/org/apache/gearpump/Message.scala @@ -20,6 +20,8 @@ package org.apache.gearpump import java.time.Instant +import org.apache.gearpump.Time.MilliSeconds + trait Message { val value: Any @@ -35,7 +37,7 @@ trait Message { * * @param value Accept any type except Null, Nothing and Unit */ -case class DefaultMessage(value: Any, timeInMillis: TimeStamp) extends Message { +case class DefaultMessage(value: Any, timeInMillis: MilliSeconds) extends Message { /** * @param value Accept any type except Null, Nothing and Unit @@ -74,7 +76,7 @@ object Message { * @param value Accept any type except Null, Nothing and Unit * @param timestamp timestamp must be smaller than Long.MaxValue */ - def apply(value: Any, timestamp: TimeStamp): Message = { + def apply(value: Any, timestamp: MilliSeconds): Message = { DefaultMessage(value, timestamp) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/Time.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/Time.scala b/core/src/main/scala/org/apache/gearpump/Time.scala new file mode 100644 index 0000000..054becf --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/Time.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump + +/** + * Types and constants of time in gearpump + */ +object Time { + type MilliSeconds = Long + + // maximum valid time that won't overflow when being converted to milli-seconds + // Long.MaxValue is reserved for unreachable time + val MAX_TIME_MILLIS: Long = Long.MaxValue - 1 + + // minimum valid time won't overflow when being converted to milli-seconds + val MIN_TIME_MILLIS: Long = Long.MinValue + + val UNREACHABLE: Long = Long.MaxValue +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala index e8956ac..8a067b5 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala @@ -23,7 +23,7 @@ import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} import scala.util.Try import akka.actor.ActorRef import com.typesafe.config.Config -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.appmaster.WorkerInfo import org.apache.gearpump.cluster.master.MasterSummary import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} @@ -142,7 +142,7 @@ object MasterToClient { case class MasterConfig(config: Config) - case class HistoryMetricsItem(time: TimeStamp, value: MetricType) + case class HistoryMetricsItem(time: MilliSeconds, value: MetricType) /** * History metrics returned from master, worker, or app master. @@ -157,7 +157,7 @@ object MasterToClient { case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem]) /** Return the last error of this streaming application job */ - case class LastFailure(time: TimeStamp, error: String) + case class LastFailure(time: MilliSeconds, error: String) sealed trait ApplicationResult @@ -208,8 +208,8 @@ object AppMasterToMaster { def appName: String def actorPath: String def status: ApplicationStatus - def startTime: TimeStamp - def uptime: TimeStamp + def startTime: MilliSeconds + def uptime: MilliSeconds def user: String } @@ -220,8 +220,8 @@ object AppMasterToMaster { appName: String = null, actorPath: String = null, status: ApplicationStatus = ApplicationStatus.ACTIVE, - startTime: TimeStamp = 0L, - uptime: TimeStamp = 0L, + startTime: MilliSeconds = 0L, + uptime: MilliSeconds = 0L, user: String = null) extends AppMasterSummary @@ -244,7 +244,7 @@ object AppMasterToMaster { * Denotes the application state change of an app. */ case class ApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus, - timeStamp: TimeStamp, error: Throwable) + timeStamp: MilliSeconds, error: Throwable) } object MasterToAppMaster { @@ -263,8 +263,8 @@ object MasterToAppMaster { sealed trait StreamingType case class AppMasterData(status: ApplicationStatus, appId: Int = 0, appName: String = null, - appMasterPath: String = null, workerPath: String = null, submissionTime: TimeStamp = 0, - startTime: TimeStamp = 0, finishTime: TimeStamp = 0, user: String = null) + appMasterPath: String = null, workerPath: String = null, submissionTime: MilliSeconds = 0, + startTime: MilliSeconds = 0, finishTime: MilliSeconds = 0, user: String = null) case class AppMasterDataRequest(appId: Int, detail: Boolean = false) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala index b011a0d..bcaf1f0 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala @@ -19,7 +19,6 @@ package org.apache.gearpump.cluster.appmaster import org.apache.gearpump.cluster.{AppDescription, AppJar} -import akka.routing.MurmurHash._ /** * The meta data of an application, which stores the crucial infomation of how to launch http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala index d9b73e2..1054628 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.cluster.appmaster import akka.actor.ActorRef import com.typesafe.config.{Config, ConfigFactory} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.{ApplicationStatus, ApplicationTerminalStatus} /** Run time info of Application */ @@ -31,9 +31,9 @@ case class ApplicationRuntimeInfo( appMaster: ActorRef = ActorRef.noSender, worker: ActorRef = ActorRef.noSender, user: String = "", - submissionTime: TimeStamp = 0, - startTime: TimeStamp = 0, - finishTime: TimeStamp = 0, + submissionTime: MilliSeconds = 0, + startTime: MilliSeconds = 0, + finishTime: MilliSeconds = 0, config: Config = ConfigFactory.empty(), status: ApplicationStatus = ApplicationStatus.NONEXIST) { @@ -41,11 +41,11 @@ case class ApplicationRuntimeInfo( this.copy(appMaster = appMaster, worker = worker) } - def onAppMasterActivated(timeStamp: TimeStamp): ApplicationRuntimeInfo = { + def onAppMasterActivated(timeStamp: MilliSeconds): ApplicationRuntimeInfo = { this.copy(startTime = timeStamp, status = ApplicationStatus.ACTIVE) } - def onFinalStatus(timeStamp: TimeStamp, finalStatus: ApplicationTerminalStatus): + def onFinalStatus(timeStamp: MilliSeconds, finalStatus: ApplicationTerminalStatus): ApplicationRuntimeInfo = { this.copy(finishTime = timeStamp, status = finalStatus) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala index cf5842f..c78e06c 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.cluster.client import com.typesafe.config.Config import org.apache.gearpump.cluster.client.RuntimeEnvironment.RemoteClientContext -import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt +import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironment /** * The RuntimeEnvironment is the context decides where an application is submitted to. @@ -45,7 +45,7 @@ object RuntimeEnvironment { class RemoteClientContext(akkaConf: Config) extends ClientContext(akkaConf, null, null) def get() : RuntimeEnvironment = { - Option(envInstance).getOrElse(new EmbeddedRuntimeEnvironemnt) + Option(envInstance).getOrElse(new EmbeddedRuntimeEnvironment) } def newClientContext(akkaConf: Config): ClientContext = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala index 8abcd96..3fcd569 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala @@ -24,8 +24,6 @@ import scala.concurrent.duration.Duration import akka.actor.{ActorRef, ActorSystem, Props} import com.typesafe.config.{Config, ConfigValueFactory} import org.apache.gearpump.cluster.ClusterConfig -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.EmbeddedClientContext import org.apache.gearpump.cluster.master.{Master => MasterActor} import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironment.scala new file mode 100644 index 0000000..bf3b5a7 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironment.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.embedded + +import com.typesafe.config.Config +import org.apache.gearpump.cluster.client.{ClientContext, RuntimeEnvironment} +import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironment.EmbeddedClientContext + +/** + * The EmbeddedRuntimeEnvironment is initiated when user trying to launch their application + * from IDE. It will create an embedded cluster and user's application will run in a single + * local process. + */ +class EmbeddedRuntimeEnvironment extends RuntimeEnvironment { + override def newClientContext(akkaConf: Config): ClientContext = { + new EmbeddedClientContext(akkaConf) + } +} + +object EmbeddedRuntimeEnvironment { + class EmbeddedClientContext private(cluster: EmbeddedCluster) + extends ClientContext(cluster.config, cluster.system, cluster.master) { + + def this(akkaConf: Config) { + this(new EmbeddedCluster(akkaConf)) + } + + override def close(): Unit = { + super.close() + cluster.stop() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala index e2f8bad..fa2d429 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala @@ -19,8 +19,7 @@ package org.apache.gearpump.cluster.main import org.apache.gearpump.cluster.MasterToAppMaster.AppMastersData import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.util.{AkkaApp, LogUtil} -import org.slf4j.Logger +import org.apache.gearpump.util.AkkaApp /** Tool to query master info */ object Info extends AkkaApp with ArgumentsParser { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala index 4f07707..d5a3520 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala @@ -19,8 +19,7 @@ package org.apache.gearpump.cluster.main import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.util.{AkkaApp, LogUtil} -import org.slf4j.Logger +import org.apache.gearpump.util.AkkaApp /** Tool to kill an App */ object Kill extends AkkaApp with ArgumentsParser { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala index 42c2081..11b7239 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala @@ -18,8 +18,7 @@ package org.apache.gearpump.cluster.main -import org.apache.gearpump.util.{AkkaApp, LogUtil} -import org.slf4j.Logger +import org.apache.gearpump.util.AkkaApp /** Tool to run any main class by providing a jar */ object MainRunner extends AkkaApp with ArgumentsParser { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala index 03ec899..8c2d7ef 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala @@ -18,8 +18,7 @@ package org.apache.gearpump.cluster.main import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.util.{AkkaApp, LogUtil} -import org.slf4j.Logger +import org.apache.gearpump.util.AkkaApp // Internal tool to restart an application object Replay extends AkkaApp with ArgumentsParser { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala index e41a2c5..450d512 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala @@ -20,8 +20,8 @@ package org.apache.gearpump.cluster.master import akka.actor._ import akka.pattern.ask -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.gearpump._ +import com.typesafe.config.ConfigFactory +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _} import org.apache.gearpump.cluster.AppMasterToWorker._ import org.apache.gearpump.cluster.{ApplicationStatus, ApplicationTerminalStatus} @@ -38,7 +38,6 @@ import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _} import org.slf4j.Logger import scala.concurrent.Future -import scala.concurrent.duration._ import scala.util.{Failure, Success} /** @@ -228,7 +227,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch } private def onApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus, - timeStamp: TimeStamp, error: Throwable): Unit = { + timeStamp: MilliSeconds, error: Throwable): Unit = { applicationRegistry.get(appId) match { case Some(appRuntimeInfo) => if (appRuntimeInfo.status.canTransitTo(newStatus)) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala index 2d79558..d791a10 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala @@ -34,7 +34,7 @@ import org.apache.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownEx import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected -import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, ApplicationRuntimeInfo, WorkerInfo} +import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, WorkerInfo} import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} import org.apache.gearpump.cluster.{AppDescription, AppJar, _} import org.apache.gearpump.transport.HostPort http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala index 8da417e..68a12d1 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala @@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.jarstore.JarStoreServer -import scala.collection.JavaConverters._ import scala.collection.immutable import akka.actor._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala index ec9f1ba..1329127 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.cluster.scheduler import akka.actor.{Actor, ActorRef} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered} import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate import org.apache.gearpump.cluster.master.Master.WorkerTerminated @@ -71,7 +71,7 @@ abstract class Scheduler extends Actor { object Scheduler { case class PendingRequest( - appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp) + appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: MilliSeconds) case class ApplicationFinished(appId: Int) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala b/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala index 969da04..d45e102 100644 --- a/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala +++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala @@ -146,7 +146,7 @@ object FileDirective { if (p.filename.isDefined) { val targetPath = File.createTempFile(s"userfile_${p.name}_", s"${p.filename.getOrElse("")}") - val writtenFuture = p.entity.dataBytes.runWith(FileIO.toFile(targetPath)) + val writtenFuture = p.entity.dataBytes.runWith(FileIO.toPath(targetPath.toPath)) writtenFuture.map(written => if (written.count > 0) { Map(p.name -> Left(FileInfo(p.filename.get, targetPath, written.count))) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala index 4ce8f2d..8c1d19a 100644 --- a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala +++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala @@ -139,14 +139,14 @@ object FileServer { // Download file to local val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head) val downloaded = response.flatMap { response => - response.entity.dataBytes.runWith(FileIO.toFile(saveAs)) + response.entity.dataBytes.runWith(FileIO.toPath(saveAs.toPath)) } downloaded.map(written => Unit) } private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = { val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), - FileIO.fromFile(file, chunkSize = 100000)) + FileIO.fromPath(file.toPath, chunkSize = 100000)) val body = Source.single( Multipart.FormData.BodyPart( "uploadfile", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala deleted file mode 100644 index 6e20277..0000000 --- a/core/src/main/scala/org/apache/gearpump/package.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache - -package object gearpump { - type TimeStamp = Long - - // maximum time won't overflow when converted to milli-seconds - val MAX_TIME_MILLIS: Long = Long.MaxValue - 1 - - // minimum time won't overflow when converted to milli-seconds - val MIN_TIME_MILLIS: Long = Long.MinValue -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/util/Graph.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/Graph.scala b/core/src/main/scala/org/apache/gearpump/util/Graph.scala index 609b133..f110f5f 100644 --- a/core/src/main/scala/org/apache/gearpump/util/Graph.scala +++ b/core/src/main/scala/org/apache/gearpump/util/Graph.scala @@ -449,15 +449,15 @@ object Graph { new Path(path :+ Right(edge)) } - def ~>[Node >: N](node: Node): Path[Node, E] = { + def ~>[NodeT >: N](node: NodeT): Path[NodeT, E] = { new Path(path :+ Left(node)) } - def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = { + def to[NodeT >: N, EdgeT >: E](node: NodeT, edge: EdgeT): Path[NodeT, EdgeT] = { this ~ edge ~> node } - private[Graph] def updategraph[Node >: N, Edge >: E](graph: Graph[Node, Edge]): Unit = { + private[Graph] def updategraph[NodeT >: N, EdgeT >: E](graph: Graph[NodeT, EdgeT]): Unit = { val nodeEdgePair: Tuple2[Option[N], Option[E]] = (None, None) path.foldLeft(nodeEdgePair) { (pair, either) => val (lastNode, lastEdge) = pair @@ -465,7 +465,7 @@ object Graph { case Left(node) => graph.addVertex(node) if (lastNode.isDefined) { - graph.addEdge(lastNode.get, lastEdge.getOrElse(null.asInstanceOf[Edge]), node) + graph.addEdge(lastNode.get, lastEdge.getOrElse(null.asInstanceOf[EdgeT]), node) } (Some(node), None) case Right(edge) => @@ -481,29 +481,29 @@ object Graph { implicit class Node[N, E](self: N) extends Path[N, E](List(Left(self))) { - override def ~[Edge](edge: Edge): Path[N, Edge] = { + override def ~[EdgeT](edge: EdgeT): Path[N, EdgeT] = { new Path(List(Left(self), Right(edge))) } - override def ~>[Node >: N](node: Node): Path[Node, E] = { + override def ~>[NodeT >: N](node: NodeT): Path[NodeT, E] = { new NodeList(List(self, node)) } - override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = { + override def to[NodeT >: N, EdgeT >: E](node: NodeT, edge: EdgeT): Path[NodeT, EdgeT] = { this ~ edge ~> node } } class NodeList[N, E](nodes: List[N]) extends Path[N, E](nodes.map(Left(_))) { - override def ~[Edge](edge: Edge): Path[N, Edge] = { + override def ~[EdgeT](edge: EdgeT): Path[N, EdgeT] = { new Path(nodes.map(Left(_)) :+ Right(edge)) } - override def ~>[Node >: N](node: Node): Path[Node, E] = { + override def ~>[NodeT >: N](node: NodeT): Path[NodeT, E] = { new NodeList(nodes :+ node) } - override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = { + override def to[NodeT >: N, EdgeT >: E](node: NodeT, edge: EdgeT): Path[NodeT, EdgeT] = { this ~ edge ~> node } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala index ee59678..d45d761 100644 --- a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala +++ b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala @@ -25,7 +25,7 @@ import akka.actor.Actor import com.typesafe.config.Config import org.slf4j.Logger -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, ReadOption} import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem} import org.apache.gearpump.metrics.Metrics._ @@ -217,7 +217,7 @@ object HistoryMetricsService { add(inputMetrics, System.currentTimeMillis()) } - def add(inputMetrics: MetricType, now: TimeStamp): Unit = { + def add(inputMetrics: MetricType, now: MilliSeconds): Unit = { val metrics = HistoryMetricsItem(now, inputMetrics) latest = List(metrics) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala b/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala index 44cb87f..98850a8 100644 --- a/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala +++ b/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala @@ -19,9 +19,7 @@ package org.apache.gearpump.util import java.io.File -import java.net.InetAddress import java.util.Properties -import scala.util.Try import com.typesafe.config.Config import org.apache.log4j.PropertyConfigurator http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/util/Util.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/Util.scala b/core/src/main/scala/org/apache/gearpump/util/Util.scala index fe4b540..2763fd2 100644 --- a/core/src/main/scala/org/apache/gearpump/util/Util.scala +++ b/core/src/main/scala/org/apache/gearpump/util/Util.scala @@ -27,7 +27,7 @@ import scala.util.{Failure, Success, Try} import com.typesafe.config.{Config, ConfigFactory} import org.apache.gearpump.cluster.AppJar -import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer} +import org.apache.gearpump.jarstore.JarStoreClient import org.apache.gearpump.transport.HostPort object Util { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala index 29fcd26..554d9c3 100644 --- a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala @@ -35,7 +35,7 @@ import org.apache.gearpump.util.{Constants, LogUtil, Util} import org.scalatest._ import scala.concurrent.Future -import scala.util.{Success, Try} +import scala.util.Success class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala index 5d0c66d..2dcae2f 100644 --- a/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala +++ b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala @@ -20,8 +20,6 @@ package org.apache.gearpump.util import org.scalatest.{FlatSpec, Matchers} -import scala.concurrent.duration._ - class RestartPolicySpec extends FlatSpec with Matchers { "RestartPolicy" should "forbid too many restarts" in { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala index b78bfc2..741a883 100644 --- a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala +++ b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala @@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, Matchers, WordSpec} import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource} import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList} -import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, ApplicationRuntimeInfo} +import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, TestUtil, UserConfig} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala index 2aa1bb4..379c7b6 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala @@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp} import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindows} -import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.source.{DataSource, Watermark} import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.AkkaApp @@ -45,7 +45,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser { groupBy(_._1). sum.sink(new LoggerSink) - context.submit(app) + context.submit(app).waitUntilFinish() context.close() } @@ -79,7 +79,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser { override def getWatermark: Instant = { if (data.isEmpty) { - watermark = watermark.plusMillis(1) + watermark = Watermark.MAX } watermark } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala index 8a869d2..afe7d33 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala @@ -69,9 +69,10 @@ class GearpumpMaterializerSession(system: ActorSystem, topLevel: Module, enterScope(copied) materializedValues.put(copied, materializeModule(copied, currentAttributes)) exitScope(copied) - case composite => + case composite: CompositeModule => materializedValues.put(composite, materializeComposite(composite, currentAttributes)) - case EmptyModule => + case _ => + // ignore other modules } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala index ad2ac61..e87752d 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.akkastream.example import akka.NotUsed import akka.actor.ActorSystem -import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer} +import akka.stream.{ActorMaterializer, ActorMaterializerSettings} import akka.stream.scaladsl._ import org.apache.gearpump.akkastream.GearpumpMaterializer import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala index 66414e0..25c7071 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala @@ -26,7 +26,7 @@ import org.apache.gearpump.akkastream.GearpumpMaterializer import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} import org.apache.gearpump.util.AkkaApp -import scala.concurrent.{Await, Future} +import scala.concurrent.Await import scala.concurrent.duration._ /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala index f7919c0..d764331 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala @@ -26,7 +26,7 @@ import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy import org.apache.gearpump.akkastream.module._ import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.GraphStageModule -import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource} +import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SingleSource} import akka.stream.impl.{SinkModule, SourceModule} import org.apache.gearpump.util.Graph http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala index a74143e..494be45 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala @@ -18,7 +18,6 @@ package org.apache.gearpump.akkastream.graph -import akka.actor.ActorSystem import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge import akka.stream.impl.StreamLayout.Module import org.apache.gearpump.util.Graph http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala index e2cdbd4..a62b8e3 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala @@ -161,7 +161,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { ProcessorOp(processor.processor, parallelism, updatedConf, "source") case sinkBridge: SinkBridgeModule[_, _] => ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink") - case groupBy: GroupByModule[Any, Any] => + case groupBy: GroupByModule[_, _] => GroupByOp(groupBy.groupBy, parallelism, "groupBy", conf) case reduce: ReduceModule[_] => reduceOp(reduce.f, conf) @@ -238,7 +238,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { val foldConf = conf.withValue(FoldTask.ZERO, fold.zero.asInstanceOf[AnyRef]). withValue(FoldTask.AGGREGATOR, fold.f) ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold") - case groupBy: GroupBy[Any, Any] => + case groupBy: GroupBy[_, _] => GroupByOp(groupBy.keyFor, groupBy.maxSubstreams, "groupBy", conf) case groupedWithin: GroupedWithin[_] => val diConf = conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d). http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala index 987546c..4a438d7 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -37,7 +37,7 @@ import org.apache.storm.shade.org.yaml.snakeyaml.Yaml import org.slf4j.Logger import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import org.apache.gearpump.cluster.{ApplicationStatus, MasterToAppMaster, UserConfig} +import org.apache.gearpump.cluster.{ApplicationStatus, UserConfig} import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology import org.apache.gearpump.experiments.storm.util.TimeCacheMapWrapper.Callback import org.apache.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil, TimeCacheMapWrapper} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala index 5794b1d..9b9bea7 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala @@ -21,10 +21,10 @@ package org.apache.gearpump.experiments.storm.producer import java.util.{List => JList} import backtype.storm.spout.{ISpout, ISpoutOutputCollector} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.experiments.storm.util.StormOutputCollector -case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: TimeStamp) +case class PendingMessage(id: Object, messageTime: MilliSeconds, startTime: MilliSeconds) /** * this is used by Storm Spout to emit messages @@ -57,7 +57,7 @@ private[storm] class StormSpoutOutputCollector( setPendingOrAck(messageId, curTime, curTime) } - def ackPendingMessage(checkpointClock: TimeStamp): Unit = { + def ackPendingMessage(checkpointClock: MilliSeconds): Unit = { this.checkpointClock = checkpointClock nextPendingMessage.foreach { case PendingMessage(_, messageTime, _) => if (messageTime <= this.checkpointClock) { @@ -83,7 +83,7 @@ private[storm] class StormSpoutOutputCollector( nextPendingMessage = None } - private def setPendingOrAck(messageId: Object, startTime: TimeStamp, messageTime: TimeStamp) + private def setPendingOrAck(messageId: Object, startTime: MilliSeconds, messageTime: MilliSeconds) : Unit = { if (ackEnabled) { val newPendingMessage = PendingMessage(messageId, messageTime, startTime) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala index 248ca44..6aa5dc9 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala @@ -43,7 +43,8 @@ import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUt import org.apache.gearpump.streaming.DAG import org.apache.gearpump.streaming.task.{GetDAG, TaskContext, TaskId} import org.apache.gearpump.util.{Constants, LogUtil} -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds import org.slf4j.Logger import scala.collection.JavaConverters._ @@ -149,7 +150,7 @@ object GearpumpStormComponent { } } - def checkpoint(clock: TimeStamp): Unit = { + def checkpoint(clock: MilliSeconds): Unit = { collector.ackPendingMessage(clock) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala index eb61acb..9f2fa1f 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala @@ -23,7 +23,7 @@ import java.util.{List => JList} import backtype.storm.task.GeneralTopologyContext import backtype.storm.tuple.{Tuple, TupleImpl} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds /** * this carries Storm tuple values in the Gearpump world @@ -42,7 +42,7 @@ private[storm] class GearpumpTuple( * @param topologyContext topology context used for all tasks * @return a Tuple */ - def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): Tuple = { + def toTuple(topologyContext: GeneralTopologyContext, timestamp: MilliSeconds): Tuple = { TimedTuple(topologyContext, values, sourceTaskId, sourceStreamId, timestamp) } @@ -64,6 +64,6 @@ private[storm] class GearpumpTuple( } case class TimedTuple(topologyContext: GeneralTopologyContext, tuple: JList[AnyRef], - sourceTaskId: Integer, sourceStreamId: String, timestamp: TimeStamp) + sourceTaskId: Integer, sourceStreamId: String, timestamp: MilliSeconds) extends TupleImpl(topologyContext, tuple, sourceTaskId, sourceStreamId, null) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala index fd023a9..a95725e 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala @@ -28,7 +28,8 @@ import backtype.storm.task.TopologyContext import backtype.storm.tuple.Fields import backtype.storm.utils.Utils import org.slf4j.Logger -import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp} +import org.apache.gearpump.{Message, Time} +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.experiments.storm.topology.GearpumpTuple import org.apache.gearpump.experiments.storm.util.StormUtil._ import org.apache.gearpump.streaming.ProcessorId @@ -56,7 +57,7 @@ object StormOutputCollector { streamGroupers, componentToProcessorId, values) } new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn, - taskContext, MIN_TIME_MILLIS) + taskContext, Time.MIN_TIME_MILLIS) } /** @@ -164,7 +165,7 @@ class StormOutputCollector( targets: JMap[String, JMap[String, Grouping]], getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer]), val taskContext: TaskContext, - private var timestamp: TimeStamp) { + private var timestamp: MilliSeconds) { import org.apache.gearpump.experiments.storm.util.StormOutputCollector._ /** @@ -213,7 +214,7 @@ class StormOutputCollector( /** * set timestamp from each incoming Message if not attached. */ - def setTimestamp(timestamp: TimeStamp): Unit = { + def setTimestamp(timestamp: MilliSeconds): Unit = { this.timestamp = timestamp } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala index 430b1c0..2fe124d 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala @@ -18,8 +18,6 @@ package org.apache.gearpump.experiments.storm.processor -import java.util.{List => JList} - import backtype.storm.tuple.Tuple import backtype.storm.utils.Utils import org.apache.gearpump.experiments.storm.util.StormOutputCollector http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala index 49afe05..8faf7d2 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala @@ -18,8 +18,6 @@ package org.apache.gearpump.experiments.storm.producer -import java.util.{List => JList} - import backtype.storm.spout.ISpout import backtype.storm.utils.Utils import org.apache.gearpump.experiments.storm.util.StormOutputCollector http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala index 0891070..50204ca 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala @@ -24,12 +24,13 @@ import akka.actor.ActorRef import backtype.storm.spout.{ISpout, SpoutOutputCollector} import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext} import backtype.storm.tuple.Tuple +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} import org.apache.gearpump.experiments.storm.util.StormOutputCollector import org.apache.gearpump.streaming.task.{TaskContext, TaskId} import org.apache.gearpump.streaming.{DAG, MockUtil} -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.Message import org.mockito.Matchers.{anyObject, eq => mockitoEq} import org.mockito.Mockito._ import org.scalacheck.Gen @@ -75,7 +76,7 @@ class GearpumpStormComponentSpec property("GearpumpBolt lifecycle") { val timestampGen = Gen.chooseNum[Long](0L, 1000L) val freqGen = Gen.chooseNum[Int](1, 100) - forAll(timestampGen, freqGen) { (timestamp: TimeStamp, freq: Int) => + forAll(timestampGen, freqGen) { (timestamp: MilliSeconds, freq: Int) => val config = mock[JMap[AnyRef, AnyRef]] val bolt = mock[IBolt] val taskContext = MockUtil.mockTaskContext http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala index f12e54f..dacbdfd 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala @@ -21,7 +21,7 @@ import java.util.{List => JList} import backtype.storm.task.GeneralTopologyContext import backtype.storm.tuple.Fields -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -40,7 +40,7 @@ class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with } yield new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, null) forAll(tupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) { - (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: TimeStamp) => + (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: MilliSeconds) => val topologyContext = mock[GeneralTopologyContext] val fields = new Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*) when(topologyContext.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala index 05627c9..7fab2cc 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala @@ -27,7 +27,8 @@ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp} +import org.apache.gearpump.{Message, Time} +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.experiments.storm.topology.GearpumpTuple import org.apache.gearpump.streaming.MockUtil @@ -41,7 +42,7 @@ class StormOutputCollectorSpec property("StormOutputCollector emits tuple values into a stream") { forAll(timestampGen, streamIdGen, valuesGen) { - (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) => + (timestamp: MilliSeconds, streamId: String, values: JList[AnyRef]) => val targets = mock[JMap[String, JMap[String, Grouping]]] val taskToComponent = mock[JMap[Integer, String]] val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => @@ -52,7 +53,7 @@ class StormOutputCollectorSpec targetStormTaskIds)) val taskContext = MockUtil.mockTaskContext val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, - targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS) + targets, getTargetPartitionsFn, taskContext, Time.MIN_TIME_MILLIS) when(targets.containsKey(streamId)).thenReturn(false) stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST @@ -85,7 +86,7 @@ class StormOutputCollectorSpec targetStormTaskIds)) val taskContext = MockUtil.mockTaskContext val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, - targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS) + targets, getTargetPartitionsFn, taskContext, Time.MIN_TIME_MILLIS) when(targets.containsKey(streamId)).thenReturn(false) verify(taskContext, times(0)).output(anyObject[Message]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala index 59f3832..810b557 100644 --- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala @@ -25,7 +25,7 @@ import com.typesafe.config.Config import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted import org.apache.gearpump.experiments.yarn.glue.Records._ import org.apache.gearpump.util.LogUtil -import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource} +import org.apache.hadoop.yarn.api.records.{ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus} import org.apache.hadoop.yarn.client.api.async.NMClientAsync import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala index 629e233..0625b2d 100644 --- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala @@ -22,7 +22,7 @@ import akka.actor.ActorRef import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication} import org.apache.gearpump.experiments.yarn.glue.Records._ import org.apache.gearpump.util.LogUtil -import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, FinalApplicationStatus, NodeId => YarnNodeId, NodeReport, Priority, Resource => YarnResource} +import org.apache.hadoop.yarn.api.records.{Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, FinalApplicationStatus, NodeReport, Priority} import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync import org.slf4j.Logger http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/hadoopfs/README.md ---------------------------------------------------------------------- diff --git a/external/hadoopfs/README.md b/external/hadoopfs/README.md index 7a9aeef..b02c378 100644 --- a/external/hadoopfs/README.md +++ b/external/hadoopfs/README.md @@ -7,7 +7,7 @@ Gearpump components for interacting with HDFS file systems. 1. File Rotation interface ```scala trait Rotation extends Serializable { - def mark(timestamp: TimeStamp, offset: Long): Unit + def mark(timestamp: MilliSeconds, offset: Long): Unit def shouldRotate: Boolean def rotate: Unit } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala index e26a2ee..5f3ca74 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.slf4j.Logger -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.hadoop.lib.rotation.Rotation import org.apache.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, HadoopCheckpointStoreWriter} import org.apache.gearpump.streaming.transaction.api.CheckpointStore @@ -72,7 +72,7 @@ class HadoopCheckpointStore( * b. closes current writer and reset * c. rotation rotates */ - override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = { + override def persist(timestamp: MilliSeconds, checkpoint: Array[Byte]): Unit = { curTime = timestamp if (curWriter.isEmpty) { curStartTime = curTime @@ -110,7 +110,7 @@ class HadoopCheckpointStore( * 5. looks for the checkpoint in the found store * }}} */ - override def recover(timestamp: TimeStamp): Option[Array[Byte]] = { + override def recover(timestamp: MilliSeconds): Option[Array[Byte]] = { var checkpoint: Option[Array[Byte]] = None if (fs.exists(dir)) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala index acc2438..4068413 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala @@ -23,10 +23,8 @@ import java.io.{ObjectInputStream, ObjectOutputStream} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.hadoop.lib.HadoopUtil import org.apache.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation} -import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory} object HadoopCheckpointStoreFactory { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala index 082e963..cce4b5d 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala @@ -23,15 +23,15 @@ import java.io.EOFException import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds class HadoopCheckpointStoreReader( path: Path, hadoopConfig: Configuration) - extends Iterator[(TimeStamp, Array[Byte])] { + extends Iterator[(MilliSeconds, Array[Byte])] { private val stream = HadoopUtil.getInputStream(path, hadoopConfig) - private var nextTimeStamp: Option[TimeStamp] = None + private var nextTimeStamp: Option[MilliSeconds] = None private var nextData: Option[Array[Byte]] = None override def hasNext: Boolean = { @@ -56,7 +56,7 @@ class HadoopCheckpointStoreReader( } } - override def next(): (TimeStamp, Array[Byte]) = { + override def next(): (MilliSeconds, Array[Byte]) = { val timeAndData = for { time <- nextTimeStamp data <- nextData http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala index 11c12c4..ce7154a 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala @@ -21,12 +21,12 @@ package org.apache.gearpump.streaming.hadoop.lib import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) { private lazy val stream = HadoopUtil.getOutputStream(path, hadoopConfig) - def write(timestamp: TimeStamp, data: Array[Byte]): Long = { + def write(timestamp: MilliSeconds, data: Array[Byte]): Long = { stream.writeLong(timestamp) stream.writeInt(data.length) stream.write(data) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala index a469956..8d0170e 100644 --- a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala +++ b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.scalacheck.Gen import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers { @@ -31,7 +31,7 @@ class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers { val fileSizeGen = Gen.chooseNum[Long](1, Long.MaxValue) property("FileSize rotation rotates on file size") { - forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) => + forAll(timestampGen, fileSizeGen) { (timestamp: MilliSeconds, fileSize: Long) => val rotation = new FileSizeRotation(fileSize) rotation.shouldRotate shouldBe false rotation.mark(Instant.ofEpochMilli(timestamp), rotation.maxBytes / 2) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala index 996ae0b..391cd42 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala @@ -20,7 +20,6 @@ package org.apache.gearpump.streaming.kafka.dsl import java.util.Properties import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, StreamApp} import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource} import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala index d5a8729..6633bf4 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala @@ -36,7 +36,8 @@ import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.streaming.transaction.api._ import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds import org.slf4j.Logger object AbstractKafkaSource { @@ -147,7 +148,7 @@ abstract class AbstractKafkaSource( } } - private def maybeRecover(startTime: TimeStamp): Unit = { + private def maybeRecover(startTime: MilliSeconds): Unit = { checkpointStores.foreach { case (tp, store) => for { bytes <- store.recover(startTime) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala index e2450f4..dbbd0ea 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala @@ -22,7 +22,7 @@ import java.util.Properties import com.twitter.bijection.Injection import kafka.api.OffsetRequest -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.kafka.lib.source.consumer.KafkaConsumer import org.apache.gearpump.streaming.kafka.util.KafkaConfig import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory @@ -82,9 +82,9 @@ class KafkaStore private[kafka]( extends CheckpointStore { import org.apache.gearpump.streaming.kafka.lib.store.KafkaStore._ - private var maxTime: TimeStamp = 0L + private var maxTime: MilliSeconds = 0L - override def persist(time: TimeStamp, checkpoint: Array[Byte]): Unit = { + override def persist(time: MilliSeconds, checkpoint: Array[Byte]): Unit = { // make sure checkpointed timestamp is monotonically increasing // hence (1, 1), (3, 2), (2, 3) is checkpointed as (1, 1), (3, 2), (3, 3) if (time > maxTime) { @@ -98,14 +98,14 @@ class KafkaStore private[kafka]( LOG.debug("KafkaStore persisted state ({}, {})", key, value) } - override def recover(time: TimeStamp): Option[Array[Byte]] = { + override def recover(time: MilliSeconds): Option[Array[Byte]] = { var checkpoint: Option[Array[Byte]] = None optConsumer.foreach { consumer => while (consumer.hasNext && checkpoint.isEmpty) { val kafkaMsg = consumer.next() checkpoint = for { k <- kafkaMsg.key - t <- Injection.invert[TimeStamp, Array[Byte]](k).toOption + t <- Injection.invert[MilliSeconds, Array[Byte]](k).toOption c = kafkaMsg.msg if t >= time } yield c }
