Repository: incubator-gearpump Updated Branches: refs/heads/master 2877c81b6 -> 080bdca62
[GEARPUMP-320] Handle stashed messages after onStart Author: manuzhang <[email protected]> Closes #192 from manuzhang/fix_start. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/080bdca6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/080bdca6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/080bdca6 Branch: refs/heads/master Commit: 080bdca62d56d4d798f1cf6b4b12c75d8f93bcdd Parents: 2877c81 Author: manuzhang <[email protected]> Authored: Fri Jun 23 09:57:43 2017 +0800 Committer: manuzhang <[email protected]> Committed: Fri Jun 23 09:57:56 2017 +0800 ---------------------------------------------------------------------- .../org/apache/gearpump/streaming/task/TaskActor.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/080bdca6/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index 1b90146..1fb61bd 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -275,16 +275,16 @@ class TaskActor( subscriptions.foreach(_._2.start()) - stashQueue.asScala.foreach { item => - handleMessages(item.sender).apply(item.msg) - } - stashQueue.clear() - // Put this as the last step so that the subscription is already initialized. // Message sending in current Task before onStart will not be delivered to // target onStart(Instant.ofEpochMilli(startClock)) + stashQueue.asScala.foreach { item => + handleMessages(item.sender).apply(item.msg) + } + stashQueue.clear() + taskContextData.appMaster ! GetUpstreamMinClock(taskId) context.become(handleMessages(sender)) }
