[GEARPUMP-338] Improve time related types and constants
Author: manuzhang <[email protected]>
Closes #208 from manuzhang/fix_time. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/f96aca99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/f96aca99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/f96aca99 Branch: refs/heads/master Commit: f96aca995d6acd0770529a39fb25a4c4ef18c31e Parents: b6f5ccd Author: manuzhang <[email protected]> Authored: Fri Aug 4 10:11:58 2017 +0800 Committer: manuzhang <[email protected]> Committed: Fri Aug 4 10:12:12 2017 +0800 ---------------------------------------------------------------------- .../scala/org/apache/gearpump/Message.scala | 6 ++- .../main/scala/org/apache/gearpump/Time.scala | 34 +++++++++++++ .../gearpump/cluster/ClusterMessage.scala | 20 ++++---- .../appmaster/ApplicationRuntimeInfo.scala | 12 ++--- .../gearpump/cluster/master/AppManager.scala | 4 +- .../gearpump/cluster/scheduler/Scheduler.scala | 4 +- .../scala/org/apache/gearpump/package.scala | 29 ----------- .../gearpump/util/HistoryMetricsService.scala | 4 +- .../gearpump/util/RestartPolicySpec.scala | 2 - .../producer/StormSpoutOutputCollector.scala | 8 +-- .../storm/topology/GearpumpStormComponent.scala | 5 +- .../storm/topology/GearpumpTuple.scala | 6 +-- .../storm/util/StormOutputCollector.scala | 9 ++-- .../storm/topology/GearpumpTupleSpec.scala | 4 +- external/hadoopfs/README.md | 2 +- .../hadoop/HadoopCheckpointStore.scala | 6 +-- .../lib/HadoopCheckpointStoreReader.scala | 8 +-- .../lib/HadoopCheckpointStoreWriter.scala | 4 +- .../lib/rotation/FileSizeRotationSpec.scala | 4 +- .../kafka/lib/source/AbstractKafkaSource.scala | 5 +- .../streaming/kafka/lib/store/KafkaStore.scala | 10 ++-- .../streaming/kafka/KafkaStoreSpec.scala | 16 +++--- .../gearpump/streaming/ClusterMessage.scala | 4 +- .../gearpump/streaming/StreamApplication.scala | 12 ++--- .../streaming/appmaster/AppMaster.scala | 6 +-- 
.../streaming/appmaster/ClockService.scala | 53 ++++++++++---------- .../streaming/appmaster/JarScheduler.scala | 6 +-- .../appmaster/StreamAppMasterSummary.scala | 8 +-- .../streaming/appmaster/TaskManager.scala | 6 +-- .../dsl/window/api/WindowFunction.scala | 9 ++-- .../streaming/dsl/window/impl/Window.scala | 4 +- .../streaming/metrics/ProcessorAggregator.scala | 12 ++--- .../gearpump/streaming/source/Watermark.scala | 11 ++-- .../streaming/state/api/MonoidState.scala | 6 +-- .../streaming/state/api/PersistentState.scala | 8 +-- .../streaming/state/api/PersistentTask.scala | 5 +- .../state/impl/CheckpointManager.scala | 14 +++--- .../state/impl/InMemoryCheckpointStore.scala | 10 ++-- .../streaming/state/impl/NonWindowState.scala | 6 +-- .../gearpump/streaming/state/impl/Window.scala | 10 ++-- .../streaming/state/impl/WindowState.scala | 16 +++--- .../streaming/task/SerializedMessage.scala | 4 +- .../gearpump/streaming/task/Subscription.scala | 13 ++--- .../apache/gearpump/streaming/task/Task.scala | 5 +- .../gearpump/streaming/task/TaskActor.scala | 9 ++-- .../streaming/task/TaskControlMessage.scala | 14 +++--- .../gearpump/streaming/task/TaskWrapper.scala | 7 +-- .../transaction/api/CheckpointStore.scala | 6 +-- .../transaction/api/TimeStampFilter.scala | 7 +-- .../streaming/appmaster/TaskManagerSpec.scala | 4 +- .../state/impl/CheckpointManagerSpec.scala | 8 +-- .../state/impl/NonWindowStateSpec.scala | 8 +-- .../streaming/state/impl/WindowSpec.scala | 6 +-- .../streaming/state/impl/WindowStateSpec.scala | 15 +++--- .../streaming/task/SubscriptionSpec.scala | 6 +-- 55 files changed, 270 insertions(+), 250 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/Message.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala 
b/core/src/main/scala/org/apache/gearpump/Message.scala index 4dc5c09..7051396 100644 --- a/core/src/main/scala/org/apache/gearpump/Message.scala +++ b/core/src/main/scala/org/apache/gearpump/Message.scala @@ -20,6 +20,8 @@ package org.apache.gearpump import java.time.Instant +import org.apache.gearpump.Time.MilliSeconds + trait Message { val value: Any @@ -35,7 +37,7 @@ trait Message { * * @param value Accept any type except Null, Nothing and Unit */ -case class DefaultMessage(value: Any, timeInMillis: TimeStamp) extends Message { +case class DefaultMessage(value: Any, timeInMillis: MilliSeconds) extends Message { /** * @param value Accept any type except Null, Nothing and Unit @@ -74,7 +76,7 @@ object Message { * @param value Accept any type except Null, Nothing and Unit * @param timestamp timestamp must be smaller than Long.MaxValue */ - def apply(value: Any, timestamp: TimeStamp): Message = { + def apply(value: Any, timestamp: MilliSeconds): Message = { DefaultMessage(value, timestamp) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/Time.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/Time.scala b/core/src/main/scala/org/apache/gearpump/Time.scala new file mode 100644 index 0000000..054becf --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/Time.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump + +/** + * Types and constants of time in gearpump + */ +object Time { + type MilliSeconds = Long + + // maximum valid time that won't overflow when being converted to milli-seconds + // Long.MaxValue is reserved for unreachable time + val MAX_TIME_MILLIS: Long = Long.MaxValue - 1 + + // minimum valid time won't overflow when being converted to milli-seconds + val MIN_TIME_MILLIS: Long = Long.MinValue + + val UNREACHABLE: Long = Long.MaxValue +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala index e8956ac..8a067b5 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala @@ -23,7 +23,7 @@ import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} import scala.util.Try import akka.actor.ActorRef import com.typesafe.config.Config -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.appmaster.WorkerInfo import org.apache.gearpump.cluster.master.MasterSummary import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} @@ -142,7 +142,7 @@ object MasterToClient { case class MasterConfig(config: Config) - case 
class HistoryMetricsItem(time: TimeStamp, value: MetricType) + case class HistoryMetricsItem(time: MilliSeconds, value: MetricType) /** * History metrics returned from master, worker, or app master. @@ -157,7 +157,7 @@ object MasterToClient { case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem]) /** Return the last error of this streaming application job */ - case class LastFailure(time: TimeStamp, error: String) + case class LastFailure(time: MilliSeconds, error: String) sealed trait ApplicationResult @@ -208,8 +208,8 @@ object AppMasterToMaster { def appName: String def actorPath: String def status: ApplicationStatus - def startTime: TimeStamp - def uptime: TimeStamp + def startTime: MilliSeconds + def uptime: MilliSeconds def user: String } @@ -220,8 +220,8 @@ object AppMasterToMaster { appName: String = null, actorPath: String = null, status: ApplicationStatus = ApplicationStatus.ACTIVE, - startTime: TimeStamp = 0L, - uptime: TimeStamp = 0L, + startTime: MilliSeconds = 0L, + uptime: MilliSeconds = 0L, user: String = null) extends AppMasterSummary @@ -244,7 +244,7 @@ object AppMasterToMaster { * Denotes the application state change of an app. 
*/ case class ApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus, - timeStamp: TimeStamp, error: Throwable) + timeStamp: MilliSeconds, error: Throwable) } object MasterToAppMaster { @@ -263,8 +263,8 @@ object MasterToAppMaster { sealed trait StreamingType case class AppMasterData(status: ApplicationStatus, appId: Int = 0, appName: String = null, - appMasterPath: String = null, workerPath: String = null, submissionTime: TimeStamp = 0, - startTime: TimeStamp = 0, finishTime: TimeStamp = 0, user: String = null) + appMasterPath: String = null, workerPath: String = null, submissionTime: MilliSeconds = 0, + startTime: MilliSeconds = 0, finishTime: MilliSeconds = 0, user: String = null) case class AppMasterDataRequest(appId: Int, detail: Boolean = false) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala index d9b73e2..1054628 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.cluster.appmaster import akka.actor.ActorRef import com.typesafe.config.{Config, ConfigFactory} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.{ApplicationStatus, ApplicationTerminalStatus} /** Run time info of Application */ @@ -31,9 +31,9 @@ case class ApplicationRuntimeInfo( appMaster: ActorRef = ActorRef.noSender, worker: ActorRef = ActorRef.noSender, user: String = "", - submissionTime: TimeStamp = 0, - startTime: TimeStamp = 0, - finishTime: TimeStamp = 0, + 
submissionTime: MilliSeconds = 0, + startTime: MilliSeconds = 0, + finishTime: MilliSeconds = 0, config: Config = ConfigFactory.empty(), status: ApplicationStatus = ApplicationStatus.NONEXIST) { @@ -41,11 +41,11 @@ case class ApplicationRuntimeInfo( this.copy(appMaster = appMaster, worker = worker) } - def onAppMasterActivated(timeStamp: TimeStamp): ApplicationRuntimeInfo = { + def onAppMasterActivated(timeStamp: MilliSeconds): ApplicationRuntimeInfo = { this.copy(startTime = timeStamp, status = ApplicationStatus.ACTIVE) } - def onFinalStatus(timeStamp: TimeStamp, finalStatus: ApplicationTerminalStatus): + def onFinalStatus(timeStamp: MilliSeconds, finalStatus: ApplicationTerminalStatus): ApplicationRuntimeInfo = { this.copy(finishTime = timeStamp, status = finalStatus) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala index b00cc17..450d512 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.cluster.master import akka.actor._ import akka.pattern.ask import com.typesafe.config.ConfigFactory -import org.apache.gearpump._ +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _} import org.apache.gearpump.cluster.AppMasterToWorker._ import org.apache.gearpump.cluster.{ApplicationStatus, ApplicationTerminalStatus} @@ -227,7 +227,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch } private def onApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus, - timeStamp: TimeStamp, 
error: Throwable): Unit = { + timeStamp: MilliSeconds, error: Throwable): Unit = { applicationRegistry.get(appId) match { case Some(appRuntimeInfo) => if (appRuntimeInfo.status.canTransitTo(newStatus)) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala index ec9f1ba..1329127 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.cluster.scheduler import akka.actor.{Actor, ActorRef} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered} import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate import org.apache.gearpump.cluster.master.Master.WorkerTerminated @@ -71,7 +71,7 @@ abstract class Scheduler extends Actor { object Scheduler { case class PendingRequest( - appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp) + appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: MilliSeconds) case class ApplicationFinished(appId: Int) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala deleted file mode 100644 index 6e20277..0000000 --- a/core/src/main/scala/org/apache/gearpump/package.scala +++ /dev/null @@ 
-1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache - -package object gearpump { - type TimeStamp = Long - - // maximum time won't overflow when converted to milli-seconds - val MAX_TIME_MILLIS: Long = Long.MaxValue - 1 - - // minimum time won't overflow when converted to milli-seconds - val MIN_TIME_MILLIS: Long = Long.MinValue -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala index ee59678..d45d761 100644 --- a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala +++ b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala @@ -25,7 +25,7 @@ import akka.actor.Actor import com.typesafe.config.Config import org.slf4j.Logger -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, ReadOption} import 
org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem} import org.apache.gearpump.metrics.Metrics._ @@ -217,7 +217,7 @@ object HistoryMetricsService { add(inputMetrics, System.currentTimeMillis()) } - def add(inputMetrics: MetricType, now: TimeStamp): Unit = { + def add(inputMetrics: MetricType, now: MilliSeconds): Unit = { val metrics = HistoryMetricsItem(now, inputMetrics) latest = List(metrics) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala index 5d0c66d..2dcae2f 100644 --- a/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala +++ b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala @@ -20,8 +20,6 @@ package org.apache.gearpump.util import org.scalatest.{FlatSpec, Matchers} -import scala.concurrent.duration._ - class RestartPolicySpec extends FlatSpec with Matchers { "RestartPolicy" should "forbid too many restarts" in { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala index 5794b1d..9b9bea7 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala @@ -21,10 +21,10 @@ 
package org.apache.gearpump.experiments.storm.producer import java.util.{List => JList} import backtype.storm.spout.{ISpout, ISpoutOutputCollector} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.experiments.storm.util.StormOutputCollector -case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: TimeStamp) +case class PendingMessage(id: Object, messageTime: MilliSeconds, startTime: MilliSeconds) /** * this is used by Storm Spout to emit messages @@ -57,7 +57,7 @@ private[storm] class StormSpoutOutputCollector( setPendingOrAck(messageId, curTime, curTime) } - def ackPendingMessage(checkpointClock: TimeStamp): Unit = { + def ackPendingMessage(checkpointClock: MilliSeconds): Unit = { this.checkpointClock = checkpointClock nextPendingMessage.foreach { case PendingMessage(_, messageTime, _) => if (messageTime <= this.checkpointClock) { @@ -83,7 +83,7 @@ private[storm] class StormSpoutOutputCollector( nextPendingMessage = None } - private def setPendingOrAck(messageId: Object, startTime: TimeStamp, messageTime: TimeStamp) + private def setPendingOrAck(messageId: Object, startTime: MilliSeconds, messageTime: MilliSeconds) : Unit = { if (ackEnabled) { val newPendingMessage = PendingMessage(messageId, messageTime, startTime) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala index 248ca44..6aa5dc9 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala +++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala @@ -43,7 +43,8 @@ import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUt import org.apache.gearpump.streaming.DAG import org.apache.gearpump.streaming.task.{GetDAG, TaskContext, TaskId} import org.apache.gearpump.util.{Constants, LogUtil} -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds import org.slf4j.Logger import scala.collection.JavaConverters._ @@ -149,7 +150,7 @@ object GearpumpStormComponent { } } - def checkpoint(clock: TimeStamp): Unit = { + def checkpoint(clock: MilliSeconds): Unit = { collector.ackPendingMessage(clock) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala index eb61acb..9f2fa1f 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala @@ -23,7 +23,7 @@ import java.util.{List => JList} import backtype.storm.task.GeneralTopologyContext import backtype.storm.tuple.{Tuple, TupleImpl} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds /** * this carries Storm tuple values in the Gearpump world @@ -42,7 +42,7 @@ private[storm] class GearpumpTuple( * @param topologyContext topology context used for all tasks * @return a Tuple */ - def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): Tuple = { + def 
toTuple(topologyContext: GeneralTopologyContext, timestamp: MilliSeconds): Tuple = { TimedTuple(topologyContext, values, sourceTaskId, sourceStreamId, timestamp) } @@ -64,6 +64,6 @@ private[storm] class GearpumpTuple( } case class TimedTuple(topologyContext: GeneralTopologyContext, tuple: JList[AnyRef], - sourceTaskId: Integer, sourceStreamId: String, timestamp: TimeStamp) + sourceTaskId: Integer, sourceStreamId: String, timestamp: MilliSeconds) extends TupleImpl(topologyContext, tuple, sourceTaskId, sourceStreamId, null) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala index fd023a9..a95725e 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala @@ -28,7 +28,8 @@ import backtype.storm.task.TopologyContext import backtype.storm.tuple.Fields import backtype.storm.utils.Utils import org.slf4j.Logger -import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp} +import org.apache.gearpump.{Message, Time} +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.experiments.storm.topology.GearpumpTuple import org.apache.gearpump.experiments.storm.util.StormUtil._ import org.apache.gearpump.streaming.ProcessorId @@ -56,7 +57,7 @@ object StormOutputCollector { streamGroupers, componentToProcessorId, values) } new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn, - taskContext, MIN_TIME_MILLIS) + taskContext, Time.MIN_TIME_MILLIS) } /** @@ 
-164,7 +165,7 @@ class StormOutputCollector( targets: JMap[String, JMap[String, Grouping]], getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer]), val taskContext: TaskContext, - private var timestamp: TimeStamp) { + private var timestamp: MilliSeconds) { import org.apache.gearpump.experiments.storm.util.StormOutputCollector._ /** @@ -213,7 +214,7 @@ class StormOutputCollector( /** * set timestamp from each incoming Message if not attached. */ - def setTimestamp(timestamp: TimeStamp): Unit = { + def setTimestamp(timestamp: MilliSeconds): Unit = { this.timestamp = timestamp } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala index f12e54f..dacbdfd 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala @@ -21,7 +21,7 @@ import java.util.{List => JList} import backtype.storm.task.GeneralTopologyContext import backtype.storm.tuple.Fields -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -40,7 +40,7 @@ class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with } yield new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, null) forAll(tupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) { - (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: TimeStamp) => + 
(gearpumpTuple: GearpumpTuple, componentId: String, timestamp: MilliSeconds) => val topologyContext = mock[GeneralTopologyContext] val fields = new Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*) when(topologyContext.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/README.md ---------------------------------------------------------------------- diff --git a/external/hadoopfs/README.md b/external/hadoopfs/README.md index 7a9aeef..b02c378 100644 --- a/external/hadoopfs/README.md +++ b/external/hadoopfs/README.md @@ -7,7 +7,7 @@ Gearpump components for interacting with HDFS file systems. 1. File Rotation interface ```scala trait Rotation extends Serializable { - def mark(timestamp: TimeStamp, offset: Long): Unit + def mark(timestamp: MilliSeconds, offset: Long): Unit def shouldRotate: Boolean def rotate: Unit } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala index e26a2ee..5f3ca74 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.slf4j.Logger -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.hadoop.lib.rotation.Rotation import 
org.apache.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, HadoopCheckpointStoreWriter} import org.apache.gearpump.streaming.transaction.api.CheckpointStore @@ -72,7 +72,7 @@ class HadoopCheckpointStore( * b. closes current writer and reset * c. rotation rotates */ - override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = { + override def persist(timestamp: MilliSeconds, checkpoint: Array[Byte]): Unit = { curTime = timestamp if (curWriter.isEmpty) { curStartTime = curTime @@ -110,7 +110,7 @@ class HadoopCheckpointStore( * 5. looks for the checkpoint in the found store * }}} */ - override def recover(timestamp: TimeStamp): Option[Array[Byte]] = { + override def recover(timestamp: MilliSeconds): Option[Array[Byte]] = { var checkpoint: Option[Array[Byte]] = None if (fs.exists(dir)) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala index 082e963..cce4b5d 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala @@ -23,15 +23,15 @@ import java.io.EOFException import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds class HadoopCheckpointStoreReader( path: Path, hadoopConfig: Configuration) - extends Iterator[(TimeStamp, Array[Byte])] { + extends Iterator[(MilliSeconds, Array[Byte])] { private val stream = HadoopUtil.getInputStream(path, 
hadoopConfig) - private var nextTimeStamp: Option[TimeStamp] = None + private var nextTimeStamp: Option[MilliSeconds] = None private var nextData: Option[Array[Byte]] = None override def hasNext: Boolean = { @@ -56,7 +56,7 @@ class HadoopCheckpointStoreReader( } } - override def next(): (TimeStamp, Array[Byte]) = { + override def next(): (MilliSeconds, Array[Byte]) = { val timeAndData = for { time <- nextTimeStamp data <- nextData http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala index 11c12c4..ce7154a 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala @@ -21,12 +21,12 @@ package org.apache.gearpump.streaming.hadoop.lib import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) { private lazy val stream = HadoopUtil.getOutputStream(path, hadoopConfig) - def write(timestamp: TimeStamp, data: Array[Byte]): Long = { + def write(timestamp: MilliSeconds, data: Array[Byte]): Long = { stream.writeLong(timestamp) stream.writeInt(data.length) stream.write(data) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala 
---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala index a469956..8d0170e 100644 --- a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala +++ b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.scalacheck.Gen import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers { @@ -31,7 +31,7 @@ class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers { val fileSizeGen = Gen.chooseNum[Long](1, Long.MaxValue) property("FileSize rotation rotates on file size") { - forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) => + forAll(timestampGen, fileSizeGen) { (timestamp: MilliSeconds, fileSize: Long) => val rotation = new FileSizeRotation(fileSize) rotation.shouldRotate shouldBe false rotation.mark(Instant.ofEpochMilli(timestamp), rotation.maxBytes / 2) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala index d5a8729..6633bf4 100644 --- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala @@ -36,7 +36,8 @@ import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.streaming.transaction.api._ import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds import org.slf4j.Logger object AbstractKafkaSource { @@ -147,7 +148,7 @@ abstract class AbstractKafkaSource( } } - private def maybeRecover(startTime: TimeStamp): Unit = { + private def maybeRecover(startTime: MilliSeconds): Unit = { checkpointStores.foreach { case (tp, store) => for { bytes <- store.recover(startTime) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala index e2450f4..dbbd0ea 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala @@ -22,7 +22,7 @@ import java.util.Properties import com.twitter.bijection.Injection import kafka.api.OffsetRequest -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.kafka.lib.source.consumer.KafkaConsumer import org.apache.gearpump.streaming.kafka.util.KafkaConfig import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory @@ 
-82,9 +82,9 @@ class KafkaStore private[kafka]( extends CheckpointStore { import org.apache.gearpump.streaming.kafka.lib.store.KafkaStore._ - private var maxTime: TimeStamp = 0L + private var maxTime: MilliSeconds = 0L - override def persist(time: TimeStamp, checkpoint: Array[Byte]): Unit = { + override def persist(time: MilliSeconds, checkpoint: Array[Byte]): Unit = { // make sure checkpointed timestamp is monotonically increasing // hence (1, 1), (3, 2), (2, 3) is checkpointed as (1, 1), (3, 2), (3, 3) if (time > maxTime) { @@ -98,14 +98,14 @@ class KafkaStore private[kafka]( LOG.debug("KafkaStore persisted state ({}, {})", key, value) } - override def recover(time: TimeStamp): Option[Array[Byte]] = { + override def recover(time: MilliSeconds): Option[Array[Byte]] = { var checkpoint: Option[Array[Byte]] = None optConsumer.foreach { consumer => while (consumer.hasNext && checkpoint.isEmpty) { val kafkaMsg = consumer.next() checkpoint = for { k <- kafkaMsg.key - t <- Injection.invert[TimeStamp, Array[Byte]](k).toOption + t <- Injection.invert[MilliSeconds, Array[Byte]](k).toOption c = kafkaMsg.msg if t >= time } yield c } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala index 67c64c4..da99d64 100644 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala @@ -22,7 +22,7 @@ import java.util.Properties import com.twitter.bijection.Injection import kafka.api.OffsetRequest import kafka.common.TopicAndPartition -import org.apache.gearpump.TimeStamp +import 
org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, KafkaConsumer} import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient @@ -92,7 +92,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc property("KafkaStore should read checkpoint from timestamp on recover") { forAll(Gen.alphaStr, timestampGen) { - (topic: String, recoverTime: TimeStamp) => + (topic: String, recoverTime: MilliSeconds) => val consumer = mock[KafkaConsumer] val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] val kafkaStore = new KafkaStore(topic, producer, Some(consumer)) @@ -104,7 +104,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc } forAll(Gen.alphaStr, timestampGen) { - (topic: String, recoverTime: TimeStamp) => + (topic: String, recoverTime: MilliSeconds) => val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] val kafkaStore = new KafkaStore(topic, producer, None) @@ -113,12 +113,12 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc } forAll(Gen.alphaStr, timestampGen, timestampGen) { - (topic: String, recoverTime: TimeStamp, checkpointTime: TimeStamp) => + (topic: String, recoverTime: MilliSeconds, checkpointTime: MilliSeconds) => val consumer = mock[KafkaConsumer] val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] val kafkaStore = new KafkaStore(topic, producer, Some(consumer)) - val key = Injection[TimeStamp, Array[Byte]](checkpointTime) + val key = Injection[MilliSeconds, Array[Byte]](checkpointTime) val msg = key val kafkaMsg = KafkaMessage(TopicAndPartition(topic, 0), 0, Some(key), msg) @@ -139,7 +139,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc property("KafkaStore persist should write checkpoint with monotonically increasing timestamp") { forAll(Gen.alphaStr, timestampGen, Gen.alphaStr) { - (topic: 
String, checkpointTime: TimeStamp, data: String) => + (topic: String, checkpointTime: MilliSeconds, data: String) => val consumer = mock[KafkaConsumer] val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] val kafkaStore = new KafkaStore(topic, producer, Some(consumer)) @@ -155,12 +155,12 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc } def verifyProducer(producer: Producer[Array[Byte], Array[Byte]], count: Int, - topic: String, partition: Int, time: TimeStamp, data: String): Unit = { + topic: String, partition: Int, time: MilliSeconds, data: String): Unit = { verify(producer, times(count)).send( MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](record => record.topic() == topic && record.partition() == partition - && Injection.invert[TimeStamp, Array[Byte]](record.key()).get == time + && Injection.invert[MilliSeconds, Array[Byte]](record.key()).get == time && Injection.invert[String, Array[Byte]](record.value()).get == data )) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala index d85d042..f0e4f84 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.streaming import akka.actor.ActorRef -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.appmaster.WorkerInfo import org.apache.gearpump.cluster.scheduler.Resource import org.apache.gearpump.streaming.appmaster.TaskRegistry.TaskLocations @@ -61,7 +61,7 @@ object AppMasterToExecutor { case class 
StartAllTasks(dagVersion: Int) case class StartDynamicDag(dagVersion: Int) - case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: TimeStamp) + case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: MilliSeconds) case class TaskRejected(taskId: TaskId) case object RestartClockService http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala index 125612b..f15e1b3 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala @@ -20,7 +20,8 @@ package org.apache.gearpump.streaming import scala.reflect.ClassTag import akka.actor.ActorSystem -import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, TimeStamp} +import org.apache.gearpump.Time +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster._ import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject} import org.apache.gearpump.streaming.appmaster.AppMaster @@ -101,8 +102,8 @@ object Processor { * When input message's timestamp is beyond current processor's lifetime, * then it will not be processed by this processor. 
*/ -case class LifeTime(birth: TimeStamp, death: TimeStamp) { - def contains(timestamp: TimeStamp): Boolean = { +case class LifeTime(birth: MilliSeconds, death: MilliSeconds) { + def contains(timestamp: MilliSeconds): Boolean = { timestamp >= birth && timestamp < death } @@ -112,8 +113,7 @@ case class LifeTime(birth: TimeStamp, death: TimeStamp) { } object LifeTime { - // MAX_TIME_MILLIS is Long.MaxValue - 1 - val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS + 1) + val Immortal = LifeTime(Time.MIN_TIME_MILLIS, Time.UNREACHABLE) } /** @@ -158,7 +158,7 @@ object StreamApplication { val graph = dag.mapVertex { processor => val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor) updatedProcessor - }.mapEdge { (node1, edge, node2) => + }.mapEdge { (_, edge, _) => PartitionerDescription(new PartitionerObject( Option(edge).getOrElse(StreamApplication.hashPartitioner))) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala index ba4b058..3c5c7da 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.appmaster import java.lang.management.ManagementFactory import akka.actor._ -import org.apache.gearpump._ +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.AppMasterToMaster.ApplicationStatusChanged import org.apache.gearpump.cluster.ClientToMaster._ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterActivated, AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge} 
@@ -67,7 +67,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli import akka.pattern.ask private implicit val dispatcher = context.dispatcher - private val startTime: TimeStamp = System.currentTimeMillis() + private val startTime: MilliSeconds = System.currentTimeMillis() private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) LOG.info(s"AppMaster[$appId] is launched by $username, app: $app xxxxxxxxxxxxxxxxx") @@ -322,7 +322,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli context.stop(self) } - private def getMinClock: Future[TimeStamp] = { + private def getMinClock: Future[MilliSeconds] = { clockService match { case Some(service) => (service ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala index b514d6f..90141d4 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala @@ -25,7 +25,8 @@ import java.util.concurrent.TimeUnit import akka.actor.{Actor, ActorRef, Cancellable, Stash} import com.google.common.primitives.Longs -import org.apache.gearpump.{MIN_TIME_MILLIS, TimeStamp} +import org.apache.gearpump.Time +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks import org.apache.gearpump.streaming._ @@ -60,8 +61,8 @@ class ClockService( LOG.info("Initializing Clock service, get snapshotted StartClock ....") 
store.get(START_CLOCK).map { clock => // check for null first since - // (null).asInstanceOf[TimeStamp] is zero - val startClock = if (clock != null) clock.asInstanceOf[TimeStamp] else MIN_TIME_MILLIS + // (null).asInstanceOf[MilliSeconds] is zero + val startClock = if (clock != null) clock.asInstanceOf[MilliSeconds] else Time.MIN_TIME_MILLIS minCheckpointClock = Some(startClock) @@ -88,32 +89,32 @@ class ClockService( // We use Array instead of List for Performance consideration private var processorClocks = Array.empty[ProcessorClock] - private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = _ + private var checkpointClocks: Map[TaskId, Vector[MilliSeconds]] = _ - private var minCheckpointClock: Option[TimeStamp] = None + private var minCheckpointClock: Option[MilliSeconds] = None private def checkpointEnabled(processor: ProcessorDescription): Boolean = { val taskConf = processor.taskConf taskConf != null && taskConf.getBoolean("state.checkpoint.enable").contains(true) } - private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = { + private def resetCheckpointClocks(dag: DAG, startClock: MilliSeconds): Unit = { this.checkpointClocks = dag.processors.filter(startClock < _._2.life.death) .filter { case (_, processor) => checkpointEnabled(processor) }.flatMap { case (id, processor) => - (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[TimeStamp]) + (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[MilliSeconds]) } if (this.checkpointClocks.isEmpty) { minCheckpointClock = None } } - private def initDag(startClock: TimeStamp): Unit = { + private def initDag(startClock: MilliSeconds): Unit = { recoverDag(this.dag, startClock) } - private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = { + private def recoverDag(dag: DAG, startClock: MilliSeconds): Unit = { this.clocks = dag.processors.filter(startClock < _._2.life.death). 
map { pair => val (processorId, processor) = pair @@ -130,7 +131,7 @@ class ClockService( resetCheckpointClocks(dag, startClock) } - private def dynamicDAG(dag: DAG, startClock: TimeStamp): Unit = { + private def dynamicDAG(dag: DAG, startClock: MilliSeconds): Unit = { val newClocks = dag.processors.filter(startClock < _._2.life.death). map { pair => val (processorId, processor) = pair @@ -207,7 +208,7 @@ class ClockService( } } - private def getUpStreamMinClock(processorId: ProcessorId): Option[TimeStamp] = { + private def getUpStreamMinClock(processorId: ProcessorId): Option[MilliSeconds] = { upstreamClocks.get(processorId).map(ProcessorClocks.minClock) } @@ -303,7 +304,7 @@ class ClockService( } } - private def minClock: TimeStamp = { + private def minClock: MilliSeconds = { ProcessorClocks.minClock(processorClocks) } @@ -313,7 +314,7 @@ class ClockService( healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis()) } - private def getStartClock: TimeStamp = { + private def getStartClock: MilliSeconds = { minCheckpointClock.getOrElse(minClock) } @@ -321,7 +322,7 @@ class ClockService( store.put(START_CLOCK, getStartClock) } - private def updateCheckpointClocks(task: TaskId, time: TimeStamp): Unit = { + private def updateCheckpointClocks(task: TaskId, time: MilliSeconds): Unit = { val clocks = checkpointClocks(task) :+ time checkpointClocks += task -> clocks @@ -340,17 +341,17 @@ object ClockService { case object HealthCheck class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallelism: Int, - private var _min: TimeStamp = MIN_TIME_MILLIS, - private var _taskClocks: Array[TimeStamp] = null) { + private var _min: MilliSeconds = Time.MIN_TIME_MILLIS, + private var _taskClocks: Array[MilliSeconds] = null) { def copy(life: LifeTime): ProcessorClock = { new ProcessorClock(processorId, life, parallelism, _min, _taskClocks) } - def min: TimeStamp = _min - def taskClocks: Array[TimeStamp] = _taskClocks + def min: MilliSeconds = 
_min + def taskClocks: Array[MilliSeconds] = _taskClocks - def init(startClock: TimeStamp): Unit = { + def init(startClock: MilliSeconds): Unit = { if (taskClocks == null) { this._min = startClock this._taskClocks = new Array(parallelism) @@ -358,7 +359,7 @@ object ClockService { } } - def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = { + def updateMinClock(taskIndex: Int, clock: MilliSeconds): Unit = { taskClocks(taskIndex) = clock _min = Longs.min(taskClocks: _*) } @@ -381,8 +382,8 @@ object ClockService { /** Check for stalling tasks */ def check( - currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock], - dag: DAG, now: TimeStamp): Unit = { + currentMinClock: MilliSeconds, processorClocks: Map[ProcessorId, ProcessorClock], + dag: DAG, now: MilliSeconds): Unit = { var isClockStalling = false if (null == minClock || currentMinClock > minClock.appClock) { minClock = ClockValue(systemClock = now, appClock = currentMinClock) @@ -423,7 +424,7 @@ object ClockService { } object HealthChecker { - case class ClockValue(systemClock: TimeStamp, appClock: TimeStamp) { + case class ClockValue(systemClock: MilliSeconds, appClock: MilliSeconds) { def prettyPrint: String = { "(system clock: " + new Date(systemClock).toString + ", app clock: " + appClock + ")" } @@ -433,7 +434,7 @@ object ClockService { object ProcessorClocks { // Get the Min clock of all processors - def minClock(clock: Array[ProcessorClock]): TimeStamp = { + def minClock(clock: Array[ProcessorClock]): MilliSeconds = { var i = 0 var min = if (clock.length == 0) 0L else clock(0).min while (i < clock.length) { @@ -445,7 +446,7 @@ object ClockService { } case class ChangeToNewDAG(dag: DAG) - case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, TimeStamp]) + case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, MilliSeconds]) - case class StoredStartClock(clock: TimeStamp) + case class StoredStartClock(clock: MilliSeconds) } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala index e023cdf..e31f863 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster import akka.actor._ import akka.pattern.ask import com.typesafe.config.Config -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.AppJar import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId @@ -47,7 +47,7 @@ class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRe private implicit val timeout = Constants.FUTURE_TIMEOUT /** Set the current DAG version active */ - def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = { + def setDag(dag: DAG, startClock: Future[MilliSeconds]): Unit = { actor ! TransitToNewDag startClock.map { start => actor ! 
NewDag(dag, start) @@ -82,7 +82,7 @@ object JarScheduler { case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest]) - case class NewDag(dag: DAG, startTime: TimeStamp) + case class NewDag(dag: DAG, startTime: MilliSeconds) case object TransitToNewDag http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala index 126ab92..1214cd0 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.appmaster -import org.apache.gearpump._ +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.AppMasterToMaster.AppMasterSummary import org.apache.gearpump.cluster.{ApplicationStatus, UserConfig} import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief @@ -32,10 +32,10 @@ case class StreamAppMasterSummary( appId: Int, appName: String = null, actorPath: String = null, - clock: TimeStamp = 0L, + clock: MilliSeconds = 0L, status: ApplicationStatus = ApplicationStatus.ACTIVE, - startTime: TimeStamp = 0L, - uptime: TimeStamp = 0L, + startTime: MilliSeconds = 0L, + uptime: MilliSeconds = 0L, user: String = null, homeDirectory: String = "", logFile: String = "", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala index 51c4de9..bae5c02 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster import akka.actor._ import akka.pattern.ask -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge import org.apache.gearpump.streaming.AppMasterToExecutor._ import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask} @@ -86,11 +86,11 @@ private[appmaster] class TaskManager( dagManager ! WatchChange(watcher = self) executorManager ! SetTaskManager(self) - private def getStartClock: Future[TimeStamp] = { + private def getStartClock: Future[MilliSeconds] = { (clockService ? 
GetStartClock).asInstanceOf[Future[StartClock]].map(_.clock) } - private var startClock: Future[TimeStamp] = getStartClock + private var startClock: Future[MilliSeconds] = getStartClock def receive: Receive = applicationReady(DagReadyState.empty) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala index a2f51c7..85ca969 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala @@ -19,7 +19,8 @@ package org.apache.gearpump.streaming.dsl.window.api import java.time.{Duration, Instant} -import org.apache.gearpump.{MIN_TIME_MILLIS, MAX_TIME_MILLIS, TimeStamp} +import org.apache.gearpump.Time +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.dsl.window.impl.Window import scala.collection.mutable.ArrayBuffer @@ -46,8 +47,8 @@ abstract class NonMergingWindowFunction extends WindowFunction { object GlobalWindowFunction { - val globalWindow = Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS), - Instant.ofEpochMilli(MAX_TIME_MILLIS))) + val globalWindow = Array(Window(Instant.ofEpochMilli(Time.MIN_TIME_MILLIS), + Instant.ofEpochMilli(Time.MAX_TIME_MILLIS))) } case class GlobalWindowFunction() extends NonMergingWindowFunction { @@ -80,7 +81,7 @@ case class SlidingWindowFunction(size: Duration, step: Duration) windows.toArray } - private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = { + private def lastStartFor(timestamp: MilliSeconds, windowStep: Long): MilliSeconds = { timestamp - (timestamp + windowStep) 
% windowStep } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala index 2425ff2..7536473 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala @@ -19,10 +19,10 @@ package org.apache.gearpump.streaming.dsl.window.impl import java.time.Instant -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds object Window { - def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Window = { + def ofEpochMilli(startTime: MilliSeconds, endTime: MilliSeconds): Window = { Window(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala index 8f8b7ab..058d36b 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala @@ -22,7 +22,7 @@ import java.util import com.google.common.collect.Iterators import com.typesafe.config.Config -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.ClientToMaster.ReadOption import 
org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem import org.apache.gearpump.metrics.Metrics.{Histogram, Meter} @@ -64,7 +64,7 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met } def aggregate( - readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: TimeStamp) + readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: MilliSeconds) : List[HistoryMetricsItem] = { val (start, end, interval) = getTimeRange(readOption, now) val timeSlotsCount = ((end - start - 1) / interval + 1).toInt @@ -103,8 +103,8 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met } // Returns (start, end, interval) - private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp) - : (TimeStamp, TimeStamp, TimeStamp) = { + private def getTimeRange(readOption: ReadOption.ReadOption, now: MilliSeconds) + : (MilliSeconds, MilliSeconds, MilliSeconds) = { readOption match { case ReadOption.ReadRecent => val end = now @@ -229,7 +229,7 @@ object ProcessorAggregator { var p99: Double = 0 var p999: Double = 0 - var startTime: TimeStamp = Long.MaxValue + var startTime: MilliSeconds = Long.MaxValue override def aggregate(item: HistoryMetricsItem): Unit = { val input = item.value.asInstanceOf[Histogram] @@ -263,7 +263,7 @@ object ProcessorAggregator { var m1: Double = 0 var rateUnit: String = null - var startTime: TimeStamp = Long.MaxValue + var startTime: MilliSeconds = Long.MaxValue override def aggregate(item: HistoryMetricsItem): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala index 14abff8..607af85 100644 --- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala @@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.source import java.time.Instant -import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message} +import org.apache.gearpump.{Message, Time} /** * message used by source task to report source watermark. @@ -28,9 +28,14 @@ case class Watermark(instant: Instant) { def toMessage: Message = Message("watermark", instant) } +/** + * All input data with event times less than watermark have been observed + */ object Watermark { - val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS + 1) + // all input data have been observed + val MAX: Instant = Instant.ofEpochMilli(Time.MAX_TIME_MILLIS + 1) - val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS) + // no input data have been observed + val MIN: Instant = Instant.ofEpochMilli(Time.MIN_TIME_MILLIS) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala index 0e2f83a..0118c07 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.state.api -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds /** * MonoidState uses Algebird Monoid to aggregate state @@ -37,11 +37,11 @@ abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] { override def get: Option[T] = Option(monoid.plus(left, right)) - override def 
setNextCheckpointTime(nextCheckpointTime: TimeStamp): Unit = { + override def setNextCheckpointTime(nextCheckpointTime: MilliSeconds): Unit = { checkpointTime = nextCheckpointTime } - protected def updateState(timestamp: TimeStamp, t: T): Unit = { + protected def updateState(timestamp: MilliSeconds, t: T): Unit = { if (timestamp < checkpointTime) { left = monoid.plus(left, t) } else { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala index 906d331..39b17c9 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.state.api -import org.apache.gearpump._ +import org.apache.gearpump.Time.MilliSeconds /** * PersistentState is part of the transaction API @@ -33,19 +33,19 @@ trait PersistentState[T] { * Recovers state to a previous checkpoint * usually invoked by the framework */ - def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit + def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit /** * Updates state on a new message * this is invoked by user */ - def update(timestamp: TimeStamp, t: T): Unit + def update(timestamp: MilliSeconds, t: T): Unit /** * Sets next checkpoint time * should be invoked by the framework */ - def setNextCheckpointTime(timeStamp: TimeStamp): Unit + def setNextCheckpointTime(timeStamp: MilliSeconds): Unit /** * Gets a binary snapshot of state 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala index df37ba1..3a3b0a7 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala @@ -20,12 +20,13 @@ package org.apache.gearpump.streaming.state.api import java.time.Instant +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig} import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock} import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{Message, TimeStamp} object PersistentTask { val LOG = LogUtil.getLogger(getClass) @@ -97,7 +98,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) checkpointManager.close() } - private def reportCheckpointClock(timestamp: TimeStamp): Unit = { + private def reportCheckpointClock(timestamp: MilliSeconds): Unit = { appMaster ! 
UpdateCheckpointClock(taskContext.taskId, timestamp) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala index 82b7952..7d9e92a 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.state.impl -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.transaction.api.CheckpointStore /** Manage physical checkpoints to persitent storage like HDFS */ @@ -28,11 +28,11 @@ class CheckpointManager(checkpointInterval: Long, private var maxMessageTime: Long = 0L private var checkpointTime: Option[Long] = None - def recover(timestamp: TimeStamp): Option[Array[Byte]] = { + def recover(timestamp: MilliSeconds): Option[Array[Byte]] = { checkpointStore.recover(timestamp) } - def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Option[TimeStamp] = { + def checkpoint(timestamp: MilliSeconds, checkpoint: Array[Byte]): Option[MilliSeconds] = { checkpointStore.persist(timestamp, checkpoint) checkpointTime = checkpointTime.collect { case time if maxMessageTime > time => time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval @@ -41,7 +41,7 @@ class CheckpointManager(checkpointInterval: Long, checkpointTime } - def update(messageTime: TimeStamp): Option[TimeStamp] = { + def update(messageTime: MilliSeconds): Option[MilliSeconds] = { maxMessageTime = Math.max(maxMessageTime, messageTime) if (checkpointTime.isEmpty) { 
checkpointTime = Some((1 + messageTime / checkpointInterval) * checkpointInterval) @@ -50,15 +50,15 @@ class CheckpointManager(checkpointInterval: Long, checkpointTime } - def shouldCheckpoint(upstreamMinClock: TimeStamp): Boolean = { + def shouldCheckpoint(upstreamMinClock: MilliSeconds): Boolean = { checkpointTime.exists(time => upstreamMinClock >= time) } - def getCheckpointTime: Option[TimeStamp] = checkpointTime + def getCheckpointTime: Option[MilliSeconds] = checkpointTime def close(): Unit = { checkpointStore.close() } - private[impl] def getMaxMessageTime: TimeStamp = maxMessageTime + private[impl] def getMaxMessageTime: MilliSeconds = maxMessageTime } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala index 8853d07..cecf127 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.state.impl -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory} /** @@ -26,18 +26,18 @@ import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, Checkpoin * should not be used in real cases */ class InMemoryCheckpointStore extends CheckpointStore { - private var checkpoints = Map.empty[TimeStamp, Array[Byte]] + private var checkpoints = Map.empty[MilliSeconds, Array[Byte]] - override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = 
{ + override def persist(timestamp: MilliSeconds, checkpoint: Array[Byte]): Unit = { checkpoints += timestamp -> checkpoint } - override def recover(timestamp: TimeStamp): Option[Array[Byte]] = { + override def recover(timestamp: MilliSeconds): Option[Array[Byte]] = { checkpoints.get(timestamp) } override def close(): Unit = { - checkpoints = Map.empty[TimeStamp, Array[Byte]] + checkpoints = Map.empty[MilliSeconds, Array[Byte]] } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala index b161713..1393f4a 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.state.impl import org.slf4j.Logger -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.state.api.{Monoid, MonoidState, Serializer} import org.apache.gearpump.streaming.state.impl.NonWindowState._ import org.apache.gearpump.util.LogUtil @@ -35,11 +35,11 @@ object NonWindowState { class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T]) extends MonoidState[T](monoid) { - override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = { + override def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit = { serializer.deserialize(bytes).foreach(left = _) } - override def update(timestamp: TimeStamp, t: T): Unit = { + override def update(timestamp: MilliSeconds, t: T): Unit = { updateState(timestamp, t) } 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala index c1f647e..0318d3d 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala @@ -17,7 +17,7 @@ */ package org.apache.gearpump.streaming.state.impl -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds /** * Used in window applications @@ -29,10 +29,10 @@ class Window(val windowSize: Long, val windowStep: Long) { this(windowConfig.windowSize, windowConfig.windowStep) } - private var clock: TimeStamp = 0L + private var clock: MilliSeconds = 0L private var startTime = 0L - def update(clock: TimeStamp): Unit = { + def update(clock: MilliSeconds): Unit = { this.clock = clock } @@ -40,7 +40,7 @@ class Window(val windowSize: Long, val windowStep: Long) { startTime += windowStep } - def slideTo(timestamp: TimeStamp): Unit = { + def slideTo(timestamp: MilliSeconds): Unit = { startTime = timestamp / windowStep * windowStep } @@ -48,7 +48,7 @@ class Window(val windowSize: Long, val windowStep: Long) { clock >= (startTime + windowSize) } - def range: (TimeStamp, TimeStamp) = { + def range: (MilliSeconds, MilliSeconds) = { startTime -> (startTime + windowSize) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala index 348f09e..a73b6db 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala @@ -22,7 +22,7 @@ import scala.collection.immutable.TreeMap import org.slf4j.Logger -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.state.api.{Group, MonoidState, Serializer} import org.apache.gearpump.streaming.state.impl.WindowState._ import org.apache.gearpump.streaming.task.TaskContext @@ -31,7 +31,7 @@ import org.apache.gearpump.util.LogUtil /** * an interval is a dynamic time range that is divided by window boundary and checkpoint time */ -case class Interval(startTime: TimeStamp, endTime: TimeStamp) extends Ordered[Interval] { +case class Interval(startTime: MilliSeconds, endTime: MilliSeconds) extends Ordered[Interval] { override def compare(that: Interval): Int = { if (startTime < that.startTime) -1 else if (startTime > that.startTime) 1 @@ -63,7 +63,7 @@ class WindowState[T](group: Group[T], private var lastCheckpointTime = 0L - override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = { + override def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit = { window.slideTo(timestamp) serializer.deserialize(bytes) .foreach { states => @@ -74,7 +74,7 @@ class WindowState[T](group: Group[T], } } - override def update(timestamp: TimeStamp, t: T): Unit = { + override def update(timestamp: MilliSeconds, t: T): Unit = { val (startTime, endTime) = window.range if (timestamp >= startTime && timestamp < endTime) { updateState(timestamp, t) @@ -127,7 +127,7 @@ class WindowState[T](group: Group[T], * upperBound2 = step * Nmin2 + size > t * }}} */ - private[impl] def getInterval(timestamp: TimeStamp, checkpointTime: TimeStamp): Interval = { + private[impl] def getInterval(timestamp: 
MilliSeconds, checkpointTime: MilliSeconds): Interval = { val windowSize = window.windowSize val windowStep = window.windowStep val lowerBound1 = timestamp / windowStep * windowStep @@ -147,8 +147,8 @@ class WindowState[T](group: Group[T], } } - private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp) - : Unit = { + private[impl] def updateIntervalStates(timestamp: MilliSeconds, t: T, + checkpointTime: MilliSeconds): Unit = { val interval = getInterval(timestamp, checkpointTime) intervalStates.get(interval) match { case Some(st) => @@ -158,7 +158,7 @@ class WindowState[T](group: Group[T], } } - private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp) + private[impl] def getIntervalStates(startTime: MilliSeconds, endTime: MilliSeconds) : TreeMap[Interval, T] = { intervalStates.dropWhile(_._1.endTime <= startTime).takeWhile(_._1.endTime <= endTime) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala index 675d5cc..c3e3b14 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala @@ -19,9 +19,9 @@ package org.apache.gearpump.streaming.task import java.io.{DataInput, DataOutput} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds -case class SerializedMessage(timeStamp: TimeStamp, bytes: Array[Byte]) +case class SerializedMessage(timeStamp: MilliSeconds, bytes: Array[Byte]) class SerializedMessageSerializer extends TaskMessageSerializer[SerializedMessage] { override def getLength(obj: 
SerializedMessage): Int = 12 + obj.bytes.length http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala index 8a6f04f..24f1763 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala @@ -20,13 +20,14 @@ package org.apache.gearpump.streaming.task import org.slf4j.Logger import com.google.common.primitives.Shorts +import org.apache.gearpump.{Message, Time} +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner} import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException import org.apache.gearpump.streaming.LifeTime import org.apache.gearpump.streaming.source.Watermark import org.apache.gearpump.streaming.task.Subscription._ import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp} /** * Manages the output and message clock for single downstream processor @@ -59,9 +60,9 @@ class Subscription( private val pendingMessageCount: Array[Short] = new Array[Short](parallelism) private val processingWatermarkSince: Array[Short] = new Array[Short](parallelism) - private val outputWatermark: Array[TimeStamp] = Array.fill(parallelism)( + private val outputWatermark: Array[MilliSeconds] = Array.fill(parallelism)( Watermark.MIN.toEpochMilli) - private val processingWatermark: Array[TimeStamp] = Array.fill(parallelism)( + private val processingWatermark: Array[MilliSeconds] = Array.fill(parallelism)( Watermark.MIN.toEpochMilli) private var maxPendingCount: Short = 0 @@ -135,7 
+136,7 @@ class Subscription( } } - private var lastFlushTime: Long = MIN_TIME_MILLIS + private var lastFlushTime: Long = Time.MIN_TIME_MILLIS private val FLUSH_INTERVAL = 5 * 1000 // ms private def needFlush: Boolean = { System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL && @@ -181,7 +182,7 @@ class Subscription( } } - def watermark: TimeStamp = { + def watermark: MilliSeconds = { outputWatermark.min } @@ -189,7 +190,7 @@ class Subscription( maxPendingCount < maxPendingMessageCount } - def onStallingTime(stallingTime: TimeStamp): Unit = { + def onStallingTime(stallingTime: MilliSeconds): Unit = { outputWatermark.indices.foreach { i => if (outputWatermark(i) == stallingTime && pendingMessageCount(i) > 0 && http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala index dc80511..b587cc7 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala @@ -23,10 +23,11 @@ import java.time.Instant import scala.concurrent.duration.FiniteDuration import akka.actor.Actor.Receive import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{Message, TimeStamp} /** * This provides context information for a task. @@ -113,7 +114,7 @@ trait TaskContext { * * @return the min clock */ - def upstreamMinClock: TimeStamp + def upstreamMinClock: MilliSeconds /** * Update TaskActor with the processing progress (watermark)
