Repository: incubator-gearpump Updated Branches: refs/heads/master b5f7fb6eb -> a996d397f
[GEARPUMP-274] Set lower bound of time to Long.MinValue milliseconds Author: manuzhang <[email protected]> Closes #151 from manuzhang/time. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/a996d397 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/a996d397 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/a996d397 Branch: refs/heads/master Commit: a996d397f7ed3cbcda934abafd67126607bb1eb7 Parents: b5f7fb6 Author: manuzhang <[email protected]> Authored: Tue Feb 21 09:45:42 2017 +0800 Committer: manuzhang <[email protected]> Committed: Tue Feb 21 09:45:49 2017 +0800 ---------------------------------------------------------------------- core/src/main/scala/org/apache/gearpump/package.scala | 7 ++++++- .../experiments/storm/util/StormOutputCollector.scala | 4 ++-- .../storm/util/StormOutputCollectorSpec.scala | 14 +++++++------- .../apache/gearpump/streaming/StreamApplication.scala | 6 ++---- .../gearpump/streaming/appmaster/ClockService.scala | 13 +++++++------ .../apache/gearpump/streaming/source/Watermark.scala | 6 +++++- .../apache/gearpump/streaming/task/Subscription.scala | 8 ++++---- .../apache/gearpump/streaming/task/TaskActor.scala | 10 +++++----- 8 files changed, 38 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a996d397/core/src/main/scala/org/apache/gearpump/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala index b1118d3..2f74ac4 100644 --- a/core/src/main/scala/org/apache/gearpump/package.scala +++ b/core/src/main/scala/org/apache/gearpump/package.scala @@ -20,5 +20,10 @@ package org.apache package object gearpump { type TimeStamp = Long - val LatestTime = -1 + + // maximum time won't overflow when converted to milli-seconds + val MAX_TIME_MILLIS: Long = Long.MaxValue + + // minimum time won't overflow when converted to milli-seconds + val MIN_TIME_MILLIS: Long = Long.MinValue } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a996d397/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala index 1b239b5..fd023a9 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala @@ -28,7 +28,7 @@ import backtype.storm.task.TopologyContext import backtype.storm.tuple.Fields import backtype.storm.utils.Utils import org.slf4j.Logger -import org.apache.gearpump._ +import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp} import org.apache.gearpump.experiments.storm.topology.GearpumpTuple import org.apache.gearpump.experiments.storm.util.StormUtil._ import org.apache.gearpump.streaming.ProcessorId @@ -56,7 +56,7 @@ object StormOutputCollector { streamGroupers, componentToProcessorId, values) } new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn, - taskContext, LatestTime) + taskContext, MIN_TIME_MILLIS) } /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a996d397/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala index e0e9e61..6b894da 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala @@ -28,17 +28,17 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import org.apache.gearpump._ +import org.apache.gearpump.{Message, MIN_TIME_MILLIS, TimeStamp} import org.apache.gearpump.experiments.storm.topology.GearpumpTuple import org.apache.gearpump.streaming.MockUtil class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - val stormTaskId = 0 - val streamIdGen = Gen.alphaStr - val valuesGen = Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]]) - val timestampGen = Gen.chooseNum[Long](0L, 1000L) + private val stormTaskId = 0 + private val streamIdGen = Gen.alphaStr + private val valuesGen = Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]]) + private val timestampGen = Gen.chooseNum[Long](0L, 1000L) property("StormOutputCollector emits tuple values into a stream") { forAll(timestampGen, streamIdGen, valuesGen) { @@ -53,7 +53,7 @@ class StormOutputCollectorSpec targetStormTaskIds)) val taskContext = MockUtil.mockTaskContext val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, - targets, getTargetPartitionsFn, taskContext, LatestTime) + targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS) when(targets.containsKey(streamId)).thenReturn(false) stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST @@ -86,7 +86,7 @@ class StormOutputCollectorSpec targetStormTaskIds)) val taskContext = MockUtil.mockTaskContext val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, - targets, getTargetPartitionsFn, taskContext, LatestTime) + targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS) when(targets.containsKey(streamId)).thenReturn(false) verify(taskContext, times(0)).output(anyObject[Message]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a996d397/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala index d4b3719..23350dd 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala @@ -20,10 +20,8 @@ package org.apache.gearpump.streaming import scala.language.implicitConversions import scala.reflect.ClassTag - import akka.actor.ActorSystem - -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, TimeStamp} import org.apache.gearpump.cluster._ import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject} import org.apache.gearpump.streaming.appmaster.AppMaster @@ -115,7 +113,7 @@ case class LifeTime(birth: TimeStamp, death: TimeStamp) { } object LifeTime { - val Immortal = LifeTime(0L, Long.MaxValue) + val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS) } /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a996d397/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala index 0a2999d..2a24b66 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala @@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit import akka.actor.{Actor, ActorRef, Cancellable, Stash} import com.google.common.primitives.Longs -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.{MIN_TIME_MILLIS, TimeStamp} import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks import org.apache.gearpump.streaming._ @@ -53,8 +53,8 @@ class ClockService( import context.dispatcher private val healthChecker = new HealthChecker(stallingThresholdSeconds = 60) - private var healthCheckScheduler: Cancellable = null - private var snapshotScheduler: Cancellable = null + private var healthCheckScheduler: Cancellable = _ + private var snapshotScheduler: Cancellable = _ override def receive: Receive = null @@ -88,7 +88,7 @@ class ClockService( // We use Array instead of List for Performance consideration private var processorClocks = Array.empty[ProcessorClock] - private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = null + private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = _ private var minCheckpointClock: Option[TimeStamp] = None @@ -337,7 +337,8 @@ object ClockService { case object HealthCheck class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallelism: Int, - private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) { + private var _min: TimeStamp = MIN_TIME_MILLIS, + private var _taskClocks: Array[TimeStamp] = null) { def copy(life: LifeTime): ProcessorClock = { new ProcessorClock(processorId, life, parallelism, _min, _taskClocks) @@ -370,7 +371,7 @@ object ClockService { class HealthChecker(stallingThresholdSeconds: Int) { private val LOG: Logger = LogUtil.getLogger(getClass) - private var minClock: ClockValue = null + private var minClock: ClockValue = _ private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000 // 60 seconds private var stallingTasks = Array.empty[TaskId] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a996d397/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala index 1f8d3a1..9c27bde 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala @@ -20,6 +20,7 @@ package org.apache.gearpump.streaming.source import java.time.Instant import org.apache.gearpump.Message +import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS} /** * message used by source task to report source watermark. @@ -31,5 +32,8 @@ case class Watermark(instant: Instant) { object Watermark { // maximum time won't overflow when converted to milli-seconds - val MAX = Instant.ofEpochMilli(Long.MaxValue) + val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS) + + // minimum time won't overflow when converted to milli-seconds + val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a996d397/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala index 3e68580..c6817f5 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala @@ -26,7 +26,7 @@ import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException import org.apache.gearpump.streaming.LifeTime import org.apache.gearpump.streaming.task.Subscription._ import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp} /** * Manges the output and message clock for single downstream processor @@ -58,8 +58,8 @@ class Subscription( private val pendingMessageCount: Array[Short] = new Array[Short](parallelism) private val candidateMinClockSince: Array[Short] = new Array[Short](parallelism) - private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue) - private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue) + private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(MAX_TIME_MILLIS) + private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(MAX_TIME_MILLIS) private var maxPendingCount: Short = 0 @@ -133,7 +133,7 @@ class Subscription( } } - private var lastFlushTime: Long = 0L + private var lastFlushTime: Long = MIN_TIME_MILLIS private val FLUSH_INTERVAL = 5 * 1000 // ms private def needFlush: Boolean = { System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL && http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a996d397/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index 14c2b59..318ebf8 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -33,7 +33,7 @@ import org.apache.gearpump.streaming.AppMasterToExecutor._ import org.apache.gearpump.streaming.ExecutorToAppMaster._ import org.apache.gearpump.streaming.ProcessorId import org.apache.gearpump.util.{LogUtil, TimeOutScheduler} -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp} /** * @@ -46,8 +46,8 @@ class TaskActor( val task: TaskWrapper, inputSerializerPool: SerializationFramework) extends Actor with ExpressTransport with TimeOutScheduler { - var upstreamMinClock: TimeStamp = 0L - private var _minClock: TimeStamp = 0L + private var upstreamMinClock: TimeStamp = MIN_TIME_MILLIS + private var _minClock: TimeStamp = MIN_TIME_MILLIS private var minClockReported: Boolean = true def serializerPool: SerializationFramework = inputSerializerPool @@ -246,7 +246,7 @@ class TaskActor( receiveMessage(watermark.toMessage, sender) - case upstream@UpstreamMinClock(upstreamClock) => + case UpstreamMinClock(upstreamClock) => updateUpstreamMinClock(upstreamClock) case ChangeTask(_, dagVersion, life, subscribers) => @@ -316,7 +316,7 @@ class TaskActor( task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock)) } - val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) => + val subMinClock = subscriptions.foldLeft(MAX_TIME_MILLIS) { (min, sub) => val subMin = sub._2.minClock // A subscription is holding back the _minClock; // we send AckRequest to its tasks to push _minClock forward
