Fix #1318: MinClock is not updated fast enough for slow streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/269838e8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/269838e8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/269838e8 Branch: refs/heads/master Commit: 269838e8bb4560156344b3c0931a53fbbb43cc92 Parents: 099842a Author: manuzhang <[email protected]> Authored: Mon Dec 14 18:21:49 2015 +0800 Committer: manuzhang <[email protected]> Committed: Tue Apr 26 14:22:39 2016 +0800 ---------------------------------------------------------------------- .../gearpump/streaming/task/Subscription.scala | 49 +++++++++++++++----- .../io/gearpump/streaming/task/TaskActor.scala | 46 +++++++++++------- .../streaming/task/SubscriptionSpec.scala | 4 +- 3 files changed, 70 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/269838e8/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala index 6836687..5f321e4 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala @@ -107,18 +107,15 @@ class Subscription( this.minClockValue(partition) = Math.min(this.minClockValue(partition), msg.timestamp) this.candidateMinClock(partition) = Math.min(this.candidateMinClock(partition), msg.timestamp) - messageCount(partition) = (messageCount(partition) + 1).toShort - pendingMessageCount(partition) = (pendingMessageCount(partition) + 1).toShort - updateMaxPendingCount() + incrementMessageCount(partition, 1) if (messageCount(partition) % ackOnceEveryMessageCount == 0) 
{ - val ackRequest = AckRequest(taskId, messageCount(partition), sessionId) - transport.transport(ackRequest, targetTask) + sendAckRequest(partition) } - if (messageCount(partition) % maxPendingMessageCount == 0) { - val probeLatency = LatencyProbe(System.currentTimeMillis()) - transport.transport(probeLatency, targetTask) + if (messageCount(partition) / maxPendingMessageCount != + (messageCount(partition) + ackOnceEveryMessageCount) / maxPendingMessageCount) { + sendLatencyProbe(partition) } return 1 @@ -140,8 +137,7 @@ class Subscription( private def flush: Unit = { lastFlushTime = System.currentTimeMillis() allTasks.foreach { targetTaskId => - val ackRequest = AckRequest(taskId, messageCount(targetTaskId.index), sessionId) - transport.transport(ackRequest, targetTaskId) + sendAckRequest(targetTaskId.index) } } @@ -189,9 +185,40 @@ class Subscription( maxPendingCount < maxPendingMessageCount } - private def updateMaxPendingCount() : Unit = { + def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = { + minClockValue.indices.foreach { i => + if (minClockValue(i) == stallingTime && allowSendingMoreMessages) { + sendAckRequest(i) + sendLatencyProbe(i) + } + } + } + + private def sendAckRequest(partition: Int): Unit = { + // we increment more count for each AckRequest + // to throttle the number of unacked AckRequest + incrementMessageCount(partition, ackOnceEveryMessageCount) + val targetTask = TaskId(processorId, partition) + val ackRequest = AckRequest(taskId, messageCount(partition), sessionId) + transport.transport(ackRequest, targetTask) + } + + private def incrementMessageCount(partition: Int, count: Int): Unit = { + messageCount(partition) = (messageCount(partition) + count).toShort + pendingMessageCount(partition) = (pendingMessageCount(partition) + count).toShort + updateMaxPendingCount() + } + + private def updateMaxPendingCount(): Unit = { maxPendingCount = Shorts.max(pendingMessageCount: _*) } + + private def sendLatencyProbe(partition: Int): 
Unit = { + val probeLatency = LatencyProbe(System.currentTimeMillis()) + val targetTask = TaskId(processorId, partition) + transport.transport(probeLatency, targetTask) + } + } object Subscription { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/269838e8/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala index 9a3ceff..d54fce5 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala @@ -26,7 +26,7 @@ import io.gearpump.cluster.UserConfig import io.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap import io.gearpump.metrics.Metrics import io.gearpump.serializer.SerializationFramework -import io.gearpump.streaming.AppMasterToExecutor.{TaskRejected, _} +import io.gearpump.streaming.AppMasterToExecutor._ import io.gearpump.streaming.ExecutorToAppMaster._ import io.gearpump.streaming.{Constants, ProcessorId} import io.gearpump.util.{LogUtil, TimeOutScheduler} @@ -46,6 +46,7 @@ class TaskActor( inputSerializerPool: SerializationFramework) extends Actor with ExpressTransport with TimeOutScheduler{ var upstreamMinClock: TimeStamp = 0L + private var _minClock: TimeStamp = 0L def serializerPool: SerializationFramework = inputSerializerPool @@ -137,12 +138,6 @@ class TaskActor( context.become(waitForTaskRegistered) } - def minClockAtCurrentTask: TimeStamp = { - this.subscriptions.foldLeft(Long.MaxValue){ (clock, subscription) => - Math.min(clock, subscription._2.minClock) - } - } - private def allowSendingMoreMessages(): Boolean = { subscriptions.forall(_._2.allowSendingMoreMessages()) } @@ -227,7 +222,7 @@ class TaskActor( } case ackRequest: AckRequest => //enqueue to handle the ackRequest and send back ack 
later - val ackResponse = securityChecker.generateAckResponse(ackRequest, sender) + val ackResponse = securityChecker.generateAckResponse(ackRequest, sender, ackOnceEveryMessageCount) if (null != ackResponse) { queue.add(SendAck(ackResponse, ackRequest.taskId)) doHandleMessage() @@ -242,19 +237,31 @@ class TaskActor( receiveMessage(inputMessage, sender) case upstream@ UpstreamMinClock(upstreamClock) => this.upstreamMinClock = upstreamClock - val latestMinClock = minClock - val update = UpdateClock(taskId, latestMinClock) + + val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) => + val subMin = sub._2.minClock + // a subscription is holding back the _minClock; + // we send AckRequest to its tasks to push _minClock forward + if (subMin == _minClock) { + sub._2.sendAckRequestOnStallingTime(_minClock) + } + Math.min(min, subMin) + } + + _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock)) + + val update = UpdateClock(taskId, _minClock) context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) { appMaster ! update } // check whether current task is dead. - if (latestMinClock > life.death) { + if (_minClock > life.death) { // There will be no more message received... val unRegister = UnRegisterTask(taskId, executorId) executor ! 
unRegister - LOG.info(s"Sending $unRegister, current minclock: $latestMinClock, life: $life") + LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life") } case ChangeTask(_, dagVersion, life, subscribers) => @@ -285,10 +292,14 @@ class TaskActor( doHandleMessage() } - def minClock: TimeStamp = { - Math.max(life.birth, Math.min(upstreamMinClock, minClockAtCurrentTask)) - } + /** + * @return min clock of this task + */ + def minClock: TimeStamp = _minClock + /** + * @return min clock of upstream task + */ def getUpstreamMinClock: TimeStamp = upstreamMinClock private def receiveMessage(msg: Message, sender: ActorRef): Unit = { @@ -341,9 +352,12 @@ object TaskActor { } } - def generateAckResponse(ackRequest: AckRequest, sender: ActorRef): Ack = { + def generateAckResponse(ackRequest: AckRequest, sender: ActorRef, incrementCount: Int): Ack = { val sessionId = ackRequest.sessionId if (receivedMsgCount.containsKey(sessionId)) { + // we increment more count for each AckRequest + // to throttle the number of unacked AckRequest + receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + incrementCount).toShort) Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId) } else { LOG.error(s"get unknown AckRequest $ackRequest from ${sender.toString()}") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/269838e8/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala index 138456a..ecad47b 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala @@ -100,10 +100,10 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { // we 
expect to receive two ackRequest for two downstream tasks - val ackRequestForTask0 = AckRequest(taskId, 100, session) + val ackRequestForTask0 = AckRequest(taskId, 200, session) verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1,0)) - val ackRequestForTask1 = AckRequest(taskId, 100, session) + val ackRequestForTask1 = AckRequest(taskId, 200, session) verify(transport, times(1)).transport(ackRequestForTask1, TaskId(1, 1)) }
