Repository: incubator-gearpump Updated Branches: refs/heads/master 24e1a4546 -> 83e1eb636
[GEARPUMP-317] Fix Task minClock Author: manuzhang <[email protected]> Closes #187 from manuzhang/watermark. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/83e1eb63 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/83e1eb63 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/83e1eb63 Branch: refs/heads/master Commit: 83e1eb63643274d3914470cbb60b9908b2885e8e Parents: 24e1a45 Author: manuzhang <[email protected]> Authored: Mon Jun 19 11:46:32 2017 +0800 Committer: manuzhang <[email protected]> Committed: Mon Jun 19 11:46:43 2017 +0800 ---------------------------------------------------------------------- .../streaming/dsl/task/GroupByTask.scala | 18 +- .../streaming/dsl/task/TransformTask.scala | 7 +- .../dsl/window/impl/WindowRunner.scala | 46 ++-- .../streaming/source/DataSourceProcessor.scala | 20 +- .../streaming/source/DataSourceTask.scala | 9 +- .../gearpump/streaming/task/Subscription.scala | 59 ++-- .../apache/gearpump/streaming/task/Task.scala | 5 + .../gearpump/streaming/task/TaskActor.scala | 273 +++++++++---------- .../gearpump/streaming/task/TaskUtil.scala | 30 ++ .../gearpump/streaming/task/TaskWrapper.scala | 4 + .../streaming/dsl/task/TransformTaskSpec.scala | 6 +- .../window/impl/DefaultWindowRunnerSpec.scala | 2 +- .../streaming/source/DataSourceTaskSpec.scala | 5 +- .../streaming/task/SubscriptionSpec.scala | 53 ++-- 14 files changed, 298 insertions(+), 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala index 8301fb9..b3f3ad2 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala @@ -26,7 +26,8 @@ import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR} import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} -import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.streaming.source.Watermark +import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} /** * Processes messages in groups as defined by groupBy function. @@ -61,13 +62,14 @@ class GroupByTask[IN, GROUP, OUT]( } override def onWatermarkProgress(watermark: Instant): Unit = { - groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] { - override def accept(runner: WindowRunner[IN, OUT]): Unit = { - runner.trigger(watermark).foreach { - result => - taskContext.output(Message(result.value, result.timestamp)) + if (groups.isEmpty && watermark == Watermark.MAX) { + taskContext.updateWatermark(Watermark.MAX) + } else { + groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] { + override def accept(runner: WindowRunner[IN, OUT]): Unit = { + TaskUtil.trigger(watermark, runner, taskContext) } - } - }) + }) + } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index 6a455a5..5ad64fa 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -23,7 +23,7 @@ import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} -import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} class TransformTask[IN, OUT]( runner: WindowRunner[IN, OUT], @@ -41,9 +41,6 @@ class TransformTask[IN, OUT]( } override def onWatermarkProgress(watermark: Instant): Unit = { - runner.trigger(watermark).foreach { - result => - taskContext.output(Message(result.value, result.timestamp)) - } + TaskUtil.trigger(watermark, runner, taskContext) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index 2025618..17a9525 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -26,28 +26,34 @@ import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows} +import org.apache.gearpump.streaming.source.Watermark +import org.apache.gearpump.streaming.task.TaskUtil import scala.collection.mutable.ArrayBuffer case class TimestampedValue[T](value: T, timestamp: Instant) +case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]], + watermark: Instant) + trait WindowRunner[IN, OUT] extends java.io.Serializable { def process(timestampedValue: TimestampedValue[IN]): Unit - def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] + def trigger(time: Instant): TriggeredOutputs[OUT] } case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE], right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] { - def process(timestampedValue: TimestampedValue[IN]): Unit = { + override def process(timestampedValue: TimestampedValue[IN]): Unit = { left.process(timestampedValue) } - def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] = { - left.trigger(time).foreach(right.process) - right.trigger(time) + override def trigger(time: Instant): TriggeredOutputs[OUT] = { + val lOutputs = left.trigger(time) + lOutputs.outputs.foreach(right.process) + right.trigger(lOutputs.watermark) } } @@ -59,6 +65,7 @@ class DefaultWindowRunner[IN, OUT]( private val windowFn = windows.windowFn private val windowInputs = new TreeSortedMap[Window, FastList[TimestampedValue[IN]]] private var setup = false + private var watermark = Watermark.MIN override def process(timestampedValue: TimestampedValue[IN]): Unit = { val wins = windowFn(new Context[IN] { @@ -98,10 +105,11 @@ class DefaultWindowRunner[IN, OUT]( } } - override def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] = { + override def trigger(time: Instant): TriggeredOutputs[OUT] = { @annotation.tailrec def onTrigger( - outputs: ArrayBuffer[TimestampedValue[OUT]]): TraversableOnce[TimestampedValue[OUT]] = { + outputs: ArrayBuffer[TimestampedValue[OUT]], + wmk: Instant): TriggeredOutputs[OUT] = { if (windowInputs.notEmpty()) { val firstWin = windowInputs.firstKey if (!time.isBefore(firstWin.endTime)) { @@ -118,25 +126,31 @@ class DefaultWindowRunner[IN, OUT]( } }) fnRunner.finish().foreach { - out: OUT => outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1)) + out: OUT => + outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1)) } + val newWmk = TaskUtil.max(wmk, firstWin.endTime) if (windows.accumulationMode == Discarding) { fnRunner.teardown() - setup = false // discarding, setup need to be called for each window - onTrigger(outputs) - } else { - // accumulating, setup is only called for the first window - onTrigger(outputs) + setup = false } + onTrigger(outputs, newWmk) } else { - outputs + // minimum of end of last triggered window and start of first un-triggered window + TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime)) } } else { - outputs + if (time == Watermark.MAX) { + TriggeredOutputs(outputs, Watermark.MAX) + } else { + TriggeredOutputs(outputs, wmk) + } } } - onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]]) + val triggeredOutputs = onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]], watermark) + watermark = TaskUtil.max(watermark, triggeredOutputs.watermark) + TriggeredOutputs(triggeredOutputs.outputs, watermark) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala index d1cc5c8..dd4c0d3 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala @@ -20,6 +20,9 @@ package org.apache.gearpump.streaming.source import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner +import org.apache.gearpump.streaming.dsl.window.api.{WindowFunction, Windows} +import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, Window, WindowRunner} import org.apache.gearpump.streaming.{Constants, Processor} /** @@ -43,6 +46,21 @@ object DataSourceProcessor { taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem) : Processor[DataSourceTask[Any, Any]] = { Processor[DataSourceTask[Any, Any]](parallelism, description, - taskConf.withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource)) + taskConf + .withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource) + .withValue[WindowRunner[Any, Any]](Constants.GEARPUMP_STREAMING_OPERATOR, + new DefaultWindowRunner[Any, Any]( + Windows(PerElementWindowFunction, description = "perElementWindows"), + new DummyRunner[Any]))) + } + + + case object PerElementWindowFunction extends WindowFunction { + override def apply[T]( + context: WindowFunction.Context[T]): Array[Window] = { + Array(Window(context.timestamp, context.timestamp.plusMillis(1))) + } + + override def isNonMerging: Boolean = true } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index 74b0cc2..f93c496 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -24,7 +24,7 @@ import org.apache.gearpump._ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} -import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} /** * Default Task container for [[org.apache.gearpump.streaming.source.DataSource]] that @@ -56,7 +56,7 @@ class DataSourceTask[IN, OUT] private[source]( private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000) override def onStart(startTime: Instant): Unit = { - LOG.info(s"opening data source at $startTime") + LOG.info(s"opening data source at ${startTime.toEpochMilli}") source.open(context, startTime) self ! Watermark(source.getWatermark) @@ -73,10 +73,7 @@ class DataSourceTask[IN, OUT] private[source]( } override def onWatermarkProgress(watermark: Instant): Unit = { - windowRunner.trigger(watermark).foreach { - result => - context.output(Message(result.value, result.timestamp)) - } + TaskUtil.trigger(watermark, windowRunner, context) } override def onStop(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala index 44ec2c6..8a6f04f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala @@ -23,12 +23,13 @@ import com.google.common.primitives.Shorts import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner} import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException import org.apache.gearpump.streaming.LifeTime +import org.apache.gearpump.streaming.source.Watermark import org.apache.gearpump.streaming.task.Subscription._ import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message, TimeStamp} +import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp} /** - * Manges the output and message clock for single downstream processor + * Manages the output and message clock for single downstream processor * * @param subscriber downstream processor * @param maxPendingMessageCount trigger flow control. Should be bigger than @@ -39,8 +40,9 @@ class Subscription( appId: Int, executorId: Int, taskId: TaskId, - subscriber: Subscriber, sessionId: Int, - transport: ExpressTransport, + subscriber: Subscriber, + sessionId: Int, + publisher: TaskActor, maxPendingMessageCount: Int = MAX_PENDING_MESSAGE_COUNT, ackOnceEveryMessageCount: Int = ONE_ACKREQUEST_EVERY_MESSAGE_COUNT) { @@ -55,10 +57,12 @@ class Subscription( // Don't worry if this store negative number. We will wrap the Short private val messageCount: Array[Short] = new Array[Short](parallelism) private val pendingMessageCount: Array[Short] = new Array[Short](parallelism) - private val candidateMinClockSince: Array[Short] = new Array[Short](parallelism) + private val processingWatermarkSince: Array[Short] = new Array[Short](parallelism) - private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue) - private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue) + private val outputWatermark: Array[TimeStamp] = Array.fill(parallelism)( + Watermark.MIN.toEpochMilli) + private val processingWatermark: Array[TimeStamp] = Array.fill(parallelism)( + Watermark.MIN.toEpochMilli) private var maxPendingCount: Short = 0 @@ -84,7 +88,7 @@ class Subscription( def start(): Unit = { val ackRequest = InitialAckRequest(taskId, sessionId) - transport.transport(ackRequest, allTasks: _*) + publisher.transport(ackRequest, allTasks: _*) } def sendMessage(msg: Message): Int = { @@ -106,12 +110,9 @@ class Subscription( msg.timestamp.toEpochMilli)) { val targetTask = TaskId(processorId, partition) - transport.transport(msg, targetTask) + publisher.transport(msg, targetTask) - this.minClockValue(partition) = Math.min(this.minClockValue(partition), - msg.timestamp.toEpochMilli) - this.candidateMinClock(partition) = - Math.min(this.candidateMinClock(partition), msg.timestamp.toEpochMilli) + this.processingWatermark(partition) = publisher.getProcessingWatermark.toEpochMilli incrementMessageCount(partition, 1) @@ -165,15 +166,9 @@ class Subscription( if (ack.sessionId == sessionId) { if (ack.actualReceivedNum == ack.seq) { - if ((ack.seq - candidateMinClockSince(index)).toShort >= 0) { - if (ack.seq == messageCount(index)) { - // All messages have been acked. - minClockValue(index) = Long.MaxValue - } else { - minClockValue(index) = candidateMinClock(index) - } - candidateMinClock(index) = Long.MaxValue - candidateMinClockSince(index) = messageCount(index) + if ((ack.seq - processingWatermarkSince(index)).toShort >= 0) { + outputWatermark(index) = processingWatermark(index) + processingWatermarkSince(index) = messageCount(index) } pendingMessageCount(ack.taskId.index) = (messageCount(ack.taskId.index) - ack.seq).toShort @@ -186,20 +181,24 @@ class Subscription( } } - def minClock: TimeStamp = { - minClockValue.min + def watermark: TimeStamp = { + outputWatermark.min } def allowSendingMoreMessages(): Boolean = { maxPendingCount < maxPendingMessageCount } - def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = { - minClockValue.indices.foreach { i => - if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0 - && allowSendingMoreMessages) { + def onStallingTime(stallingTime: TimeStamp): Unit = { + outputWatermark.indices.foreach { i => + if (outputWatermark(i) == stallingTime && + pendingMessageCount(i) > 0 && + allowSendingMoreMessages) { sendAckRequest(i) sendLatencyProbe(i) + } else if (publisher.getProcessingWatermark == Watermark.MAX && + pendingMessageCount(i) == 0) { + outputWatermark(i) = Watermark.MAX.toEpochMilli } } } @@ -210,7 +209,7 @@ class Subscription( incrementMessageCount(partition, ackOnceEveryMessageCount) val targetTask = TaskId(processorId, partition) val ackRequest = AckRequest(taskId, messageCount(partition), sessionId) - transport.transport(ackRequest, targetTask) + publisher.transport(ackRequest, targetTask) } private def incrementMessageCount(partition: Int, count: Int): Unit = { @@ -226,7 +225,7 @@ class Subscription( private def sendLatencyProbe(partition: Int): Unit = { val probeLatency = LatencyProbe(System.currentTimeMillis()) val targetTask = TaskId(processorId, partition) - transport.transport(probeLatency, targetTask) + publisher.transport(probeLatency, targetTask) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala index 90a8bff..dc80511 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala @@ -116,6 +116,11 @@ trait TaskContext { def upstreamMinClock: TimeStamp /** + * Update TaskActor with the processing progress (watermark) + */ + def updateWatermark(watermark: Instant): Unit + + /** * Logger is environment dependant, it should be provided by * containing environment. */ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index 8ef45f3..1b90146 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -35,7 +35,7 @@ import org.apache.gearpump.streaming.ExecutorToAppMaster._ import org.apache.gearpump.streaming.ProcessorId import org.apache.gearpump.streaming.task.TaskActor._ import org.apache.gearpump.util.{LogUtil, TimeOutScheduler} -import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message, TimeStamp} +import org.apache.gearpump.{Message, TimeStamp} import scala.collection.JavaConverters._ import scala.concurrent.duration._ @@ -51,16 +51,14 @@ class TaskActor( val task: TaskWrapper, inputSerializerPool: SerializationFramework) extends Actor with ExpressTransport with TimeOutScheduler { - private var upstreamMinClock: TimeStamp = MIN_TIME_MILLIS - private var _minClock: TimeStamp = MIN_TIME_MILLIS - - def serializerPool: SerializationFramework = inputSerializerPool - - private val config = context.system.settings.config - + final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS) + // Clock report interval + final val CLOCK_REPORT_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS) + // Flush interval + final val FLUSH_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS) val LOG: Logger = LogUtil.getLogger(getClass, app = taskContextData.appId, executor = taskContextData.executorId, task = taskId) - + private val config = context.system.settings.config // Metrics private val metricName = s"app${taskContextData.appId}.processor${taskId.processorId}.task${taskId.index}" @@ -69,41 +67,48 @@ class TaskActor( private val processTime = Metrics(context.system).histogram(s"$metricName:processTime") private val sendThroughput = Metrics(context.system).meter(s"$metricName:sendThroughput") private val receiveThroughput = Metrics(context.system).meter(s"$metricName:receiveThroughput") - private val maxPendingMessageCount = config.getInt(GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT) private val ackOnceEveryMessageCount = config.getInt( GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT) - private val executor = context.parent - private var life = taskContextData.life + private val queue = new util.LinkedList[AnyRef]() + // SecurityChecker will be responsible of dropping messages from + // unknown sources + private val securityChecker = new SecurityChecker(taskId, self) + private val stashQueue = new util.LinkedList[MessageAndSender]() // Latency probe import context.dispatcher - final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS) - - // Clock report interval - final val CLOCK_REPORT_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS) - - // Flush interval - final val FLUSH_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS) - - private val queue = new util.LinkedList[AnyRef]() - + private var upstreamWatermark: Instant = Watermark.MIN + private var processingWatermark: Instant = Watermark.MIN + private var watermark: Instant = Watermark.MIN + private var life = taskContextData.life private var subscriptions = List.empty[(Int, Subscription)] - - // SecurityChecker will be responsible of dropping messages from - // unknown sources - private val securityChecker = new SecurityChecker(taskId, self) private[task] var sessionId = NONE_SESSION // Reports to appMaster with my address express.registerLocalActor(TaskId.toLong(taskId), self) - final def receive: Receive = null + final override def postStop(): Unit = { + onStop() + } task.setTaskActor(this) + def onStop(): Unit = task.onStop() + + final override def preStart(): Unit = { + val register = RegisterTask(taskId, taskContextData.executorId, local) + LOG.info(s"$register") + executor ! register + context.become(waitForTaskRegistered) + } + + def serializerPool: SerializationFramework = inputSerializerPool + + final def receive: Receive = null + def onStart(startTime: Instant): Unit = { task.onStart(startTime) } @@ -112,9 +117,6 @@ class TaskActor( def onUnManagedMessage(msg: Any): Unit = task.receiveUnManagedMessage.apply(msg) - def onStop(): Unit = task.onStop() - - /** * output to a downstream by specifying a arrayIndex * @param arrayIndex this is not same as ProcessorId @@ -133,84 +135,16 @@ class TaskActor( sendThroughput.mark(count) } - final override def postStop(): Unit = { - onStop() - } - - final override def preStart(): Unit = { - val register = RegisterTask(taskId, taskContextData.executorId, local) - LOG.info(s"$register") - executor ! register - context.become(waitForTaskRegistered) - } - - private def allowSendingMoreMessages(): Boolean = { - subscriptions.forall(_._2.allowSendingMoreMessages()) - } - - private def doHandleMessage(): Int = { - var done = false - - var count = 0 - - while (allowSendingMoreMessages() && !done) { - val msg = queue.poll() - if (msg != null) { - msg match { - case SendAck(ack, targetTask) => - transport(ack, targetTask) - case m: Message => - count += 1 - onNext(m) - case other => - // un-managed message - onUnManagedMessage(other) - } - } else { - done = true - } - } - - count - } - - private def onStartClock(): Unit = { - LOG.info(s"received start, clock: $upstreamMinClock, sessionId: $sessionId") - subscriptions = taskContextData.subscribers.map { subscriber => - (subscriber.processorId, - new Subscription(taskContextData.appId, taskContextData.executorId, taskId, subscriber, - sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount)) - }.sortBy(_._1) - - subscriptions.foreach(_._2.start()) - - stashQueue.asScala.foreach { item => - handleMessages(item.sender).apply(item.msg) - } - stashQueue.clear() - - // Put this as the last step so that the subscription is already initialized. - // Message sending in current Task before onStart will not be delivered to - // target - onStart(Instant.ofEpochMilli(_minClock)) - - taskContextData.appMaster ! GetUpstreamMinClock(taskId) - context.become(handleMessages(sender)) - } - def waitForTaskRegistered: Receive = { case TaskRegistered(_, id, startClock) => this.sessionId = id - this._minClock = startClock - context.become(waitForStartClock) + context.become(waitForStartTask(startClock)) } - private val stashQueue = new util.LinkedList[MessageAndSender]() - - def waitForStartClock: Receive = { + def waitForStartTask(startClock: TimeStamp): Receive = { case start@StartTask(tid) => assert(tid == this.taskId, s"$start sent to the wrong task ${this.taskId}") - onStartClock() + onStartTask(startClock) case other: AnyRef => stashQueue.add(MessageAndSender(other, sender())) } @@ -246,7 +180,7 @@ class TaskActor( case watermark@Watermark(instant) => assert(sender.eq(self), "Watermark should only be sent from Task to itself") - onUpstreamMinClock(instant.toEpochMilli) + onUpstreamMinClock(instant) receiveMessage(watermark.toMessage, sender) case UpstreamMinClock(upstreamClock) => @@ -256,7 +190,7 @@ class TaskActor( // 3. upstreamClock is None for source task since it's reported as watermark above // by external source // 4. this is designed to avoid flooding the ClockService - upstreamClock.foreach(onUpstreamMinClock) + upstreamClock.foreach(clock => onUpstreamMinClock(Instant.ofEpochMilli(clock))) reportMinClock() case ChangeTask(_, dagVersion, newLife, subscribers) => @@ -291,14 +225,69 @@ class TaskActor( } /** - * Returns min clock of this task - */ - def minClock: TimeStamp = _minClock - - /** * Returns min clock of upstream task */ - def getUpstreamMinClock: TimeStamp = upstreamMinClock + def getUpstreamMinClock: TimeStamp = upstreamWatermark.toEpochMilli + + def getProcessingWatermark: Instant = processingWatermark + + def updateWatermark(watermark: Instant): Unit = { + processingWatermark = TaskUtil.max(processingWatermark, watermark) + } + + private def allowSendingMoreMessages(): Boolean = { + subscriptions.forall(_._2.allowSendingMoreMessages()) + } + + private def doHandleMessage(): Int = { + var done = false + + var count = 0 + + while (allowSendingMoreMessages() && !done) { + val msg = queue.poll() + if (msg != null) { + msg match { + case SendAck(ack, targetTask) => + transport(ack, targetTask) + case m: Message => + count += 1 + onNext(m) + case other => + // un-managed message + onUnManagedMessage(other) + } + } else { + done = true + } + } + + count + } + + private def onStartTask(startClock: TimeStamp): Unit = { + LOG.info(s"received start, clock: $startClock, sessionId: $sessionId") + subscriptions = taskContextData.subscribers.map { subscriber => + (subscriber.processorId, + new Subscription(taskContextData.appId, taskContextData.executorId, taskId, subscriber, + sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount)) + }.sortBy(_._1) + + subscriptions.foreach(_._2.start()) + + stashQueue.asScala.foreach { item => + handleMessages(item.sender).apply(item.msg) + } + stashQueue.clear() + + // Put this as the last step so that the subscription is already initialized. + // Message sending in current Task before onStart will not be delivered to + // target + onStart(Instant.ofEpochMilli(startClock)) + + taskContextData.appMaster ! GetUpstreamMinClock(taskId) + context.become(handleMessages(sender)) + } private def receiveMessage(msg: Message, sender: ActorRef): Unit = { val messageAfterCheck = securityChecker.checkMessage(msg, sender) @@ -335,44 +324,52 @@ class TaskActor( * for other tasks, the clock comes from that reported to ClockService * by upstream tasks */ - private def onUpstreamMinClock(upstreamClock: TimeStamp): Unit = { - if (upstreamClock > this.upstreamMinClock) { - this.upstreamMinClock = upstreamClock - task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock)) + private def onUpstreamMinClock(upstreamClock: Instant): Unit = { + if (upstreamClock.isAfter(this.upstreamWatermark)) { + this.upstreamWatermark = upstreamClock + task.onWatermarkProgress(upstreamWatermark) } - val subMinClock = subscriptions.foldLeft(MAX_TIME_MILLIS) { (min, sub) => - val subMin = sub._2.minClock - // A subscription is holding back the _minClock; - // we send AckRequest to its tasks to push _minClock forward - if (subMin == _minClock) { - sub._2.sendAckRequestOnStallingTime(_minClock) - } - Math.min(min, subMin) - } + // For Task without subscriptions, this will be Watermark.MAX + val subWatermark = getSubscriptionWatermark(subscriptions, watermark) - _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock)) + watermark = TaskUtil.max(Instant.ofEpochMilli(life.birth), + TaskUtil.min(upstreamWatermark, + TaskUtil.min(processingWatermark, subWatermark))) // Checks whether current task is dead. - if (_minClock > life.death) { + if (watermark.toEpochMilli > life.death) { // There will be no more message received... val unRegister = UnRegisterTask(taskId, taskContextData.executorId) executor ! unRegister - LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life") + LOG.info(s"Sending $unRegister, current watermark: $watermark, life: $life") } } private def reportMinClock(): Unit = { - val update = UpdateClock(taskId, _minClock) + val update = UpdateClock(taskId, watermark.toEpochMilli) context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) { taskContextData.appMaster ! update } } + + private def getSubscriptionWatermark(subs: List[(Int, Subscription)], wmk: Instant): Instant = { + Instant.ofEpochMilli(subs.foldLeft(Watermark.MAX.toEpochMilli) { + case (min, (_, sub)) => + val subWmk = sub.watermark + if (subWmk == wmk.toEpochMilli) { + sub.onStallingTime(subWmk) + } + Math.min(min, subWmk) + }) + } } object TaskActor { + val NONE_SESSION: Int = -1 + // If the message comes from an unknown source, securityChecker will drop it class SecurityChecker(task_id: TaskId, self: ActorRef) { @@ -381,15 +378,6 @@ object TaskActor { // Uses mutable HashMap for performance optimization private val receivedMsgCount = new IntShortHashMap() - // Tricky performance optimization to save memory. - // We store the session Id in the uid of ActorPath - // ActorPath.hashCode is same as uid. - private def getSessionId(actor: ActorRef): Int = { - // TODO: As method uid is protected in [akka] package. We - // are using hashCode instead of uid. - actor.hashCode() - } - def handleInitialAckRequest(ackRequest: InitialAckRequest): Ack = { LOG.debug(s"Handle InitialAckRequest for session $ackRequest") val sessionId = ackRequest.sessionId @@ -431,13 +419,20 @@ object TaskActor { } } } + + // Tricky performance optimization to save memory. + // We store the session Id in the uid of ActorPath + // ActorPath.hashCode is same as uid. + private def getSessionId(actor: ActorRef): Int = { + // TODO: As method uid is protected in [akka] package. We + // are using hashCode instead of uid. + actor.hashCode() + } } case class SendAck(ack: Ack, targetTask: TaskId) - case object FLUSH - - val NONE_SESSION: Int = -1 - case class MessageAndSender(msg: AnyRef, sender: ActorRef) + + case object FLUSH } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala index 7459c64..bd889c4 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala @@ -18,6 +18,11 @@ package org.apache.gearpump.streaming.task +import java.time.Instant + +import org.apache.gearpump.Message +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} + object TaskUtil { /** @@ -30,4 +35,29 @@ object TaskUtil { val loader = Thread.currentThread().getContextClassLoader() loader.loadClass(className).asSubclass(classOf[Task]) } + + def trigger[IN, OUT](watermark: Instant, runner: WindowRunner[IN, OUT], + context: TaskContext): Unit = { + val triggeredOutputs = runner.trigger(watermark) + context.updateWatermark(triggeredOutputs.watermark) + triggeredOutputs.outputs.foreach { case TimestampedValue(v, t) => + context.output(Message(v, t)) + } + } + + /** + * @return t1 if t1 is not larger than t2 and t2 otherwise + */ + def min(t1: Instant, t2: Instant): Instant = { + if (t1.isAfter(t2)) t2 + else t1 + } + + /** + * @return t1 if t1 is not smaller than t2 and t2 otherwise + */ + def max(t1: Instant, t2: Instant): Instant = { + if (t2.isBefore(t1)) t1 + else t2 + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala index f5f099c..82cae96 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala @@ -111,6 +111,10 @@ class TaskWrapper( actor.getUpstreamMinClock } + override def updateWatermark(watermark: Instant): Unit = { + actor.updateWatermark(watermark) + } + def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable = { val dispatcher = actor.context.system.dispatcher actor.context.system.scheduler.schedule(initialDelay, interval)(f)(dispatcher) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala index 6b66f01..0bb4d6a 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -22,7 +22,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner} import org.mockito.{Matchers => MockitoMatchers} import org.mockito.Mockito.{verify, when} import org.scalacheck.Gen @@ -48,9 +48,11 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with task.onNext(message) verify(windowRunner).process(TimestampedValue(value, time)) - when(windowRunner.trigger(watermark)).thenReturn(Some(TimestampedValue(value, time))) + when(windowRunner.trigger(watermark)).thenReturn( + TriggeredOutputs(Some(TimestampedValue(value, time)), watermark)) task.onWatermarkProgress(watermark) verify(context).output(message) + verify(context).updateWatermark(watermark) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala index 98e9919..b23d0ee 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala @@ -50,7 +50,7 @@ class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks new FoldRunner[KV, Option[KV]](reduce, "reduce")) data.foreach(m => windowRunner.process(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp))) - windowRunner.trigger(Watermark.MAX).toList shouldBe + windowRunner.trigger(Watermark.MAX).outputs.toList shouldBe List( TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(4)), TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(18)), http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala index f7a3a63..d62739a 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner} import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -63,12 +63,13 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with when(dataSource.read()).thenReturn(msg) when(runner.trigger(Watermark.MAX)).thenReturn( - Some(TimestampedValue(str.asInstanceOf[Any], timestamp))) + TriggeredOutputs(Some(TimestampedValue(str.asInstanceOf[Any], timestamp)), Watermark.MAX)) sourceTask.onNext(Message("next")) sourceTask.onWatermarkProgress(Watermark.MAX) verify(taskContext).output(msg) + verify(taskContext).updateWatermark(Watermark.MAX) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala index 65cb17a..285bf44 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala @@ -24,7 +24,7 @@ import java.util.Random import org.mockito.Mockito._ import org.scalatest.mock.MockitoSugar import org.scalatest.{FlatSpec, Matchers} -import org.apache.gearpump.Message +import org.apache.gearpump.{MIN_TIME_MILLIS, Message} import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.source.Watermark @@ -47,40 +47,42 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { val subscriber = Subscriber(downstreamProcessorId, partitioner, downstreamProcessor.parallelism, downstreamProcessor.life) - private def prepare: (Subscription, ExpressTransport) = { - val transport = mock[ExpressTransport] - val subscription = new Subscription(appId, executorId, taskId, subscriber, session, transport) + private def prepare: (Subscription, TaskActor) = { + val sender = mock[TaskActor] + + val subscription = new Subscription(appId, executorId, taskId, subscriber, session, sender) subscription.start() val expectedAckRequest = InitialAckRequest(taskId, session) - verify(transport, times(1)).transport(expectedAckRequest, TaskId(1, 0), TaskId(1, 1)) + verify(sender, times(1)).transport(expectedAckRequest, TaskId(1, 0), TaskId(1, 1)) - (subscription, transport) + (subscription, sender) } it should "not send any more message when its life ends" in { - val (subscription, transport) = prepare + val (subscription, _) = prepare subscription.changeLife(LifeTime(0, 0)) val count = subscription.sendMessage(Message("some")) assert(count == 0) } it should "send message and handle ack correctly" in { - val (subscription, transport) = prepare + val (subscription, sender) = prepare val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70)) + when(sender.getProcessingWatermark).thenReturn(msg1.timestamp) subscription.sendMessage(msg1) - verify(transport, times(1)).transport(msg1, TaskId(1, 1)) - assert(subscription.minClock == 70) + verify(sender, times(1)).transport(msg1, TaskId(1, 1)) + assert(subscription.watermark == MIN_TIME_MILLIS) val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50)) + when(sender.getProcessingWatermark).thenReturn(msg2.timestamp) subscription.sendMessage(msg2) - verify(transport, times(1)).transport(msg2, TaskId(1, 0)) - // minClock has been set to smaller one - assert(subscription.minClock == 50) + verify(sender, times(1)).transport(msg2, TaskId(1, 0)) + assert(subscription.watermark == MIN_TIME_MILLIS) - val initialMinClock = subscription.minClock + val initialMinClock = subscription.watermark // Acks initial AckRequest(0) subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session)) @@ -88,46 +90,39 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { // Sends 100 messages 100 until 200 foreach { clock => + when(sender.getProcessingWatermark).thenReturn(Instant.ofEpochMilli(clock), + Instant.ofEpochMilli(clock)) subscription.sendMessage(Message("1", clock)) subscription.sendMessage(Message("2", clock)) } - // Ack not received, minClock no change - assert(subscription.minClock == initialMinClock) + assert(subscription.watermark == 50) subscription.receiveAck(Ack(TaskId(1, 1), 100, 100, session)) subscription.receiveAck(Ack(TaskId(1, 0), 100, 100, session)) // Ack received, minClock changed - assert(subscription.minClock > initialMinClock) + assert(subscription.watermark > initialMinClock) // Expects to receive two ackRequest for two downstream tasks val ackRequestForTask0 = AckRequest(taskId, 200, session) - verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1, 0)) + verify(sender, times(1)).transport(ackRequestForTask0, TaskId(1, 0)) val ackRequestForTask1 = AckRequest(taskId, 200, session) - verify(transport, times(1)).transport(ackRequestForTask1, TaskId(1, 1)) + verify(sender, times(1)).transport(ackRequestForTask1, TaskId(1, 1)) } it should "disallow more message sending if there is no ack back" in { - val (subscription, transport) = prepare + val (subscription, sender) = prepare // send 100 messages 0 until (Subscription.MAX_PENDING_MESSAGE_COUNT * 2 + 1) foreach { clock => + when(sender.getProcessingWatermark).thenReturn(Watermark.MAX) subscription.sendMessage(Message(randomMessage, clock)) } assert(!subscription.allowSendingMoreMessages()) } - it should "report minClock as Long.MaxValue when there is no pending message" in { - val (subscription, _) = prepare - val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70)) - subscription.sendMessage(msg1) - assert(subscription.minClock == 70) - subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session)) - assert(subscription.minClock == Watermark.MAX.toEpochMilli) - } - private def randomMessage: String = new Random().nextInt.toString }
