Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 2877c81b6 -> 080bdca62


[GEARPUMP-320] Handle stashed messages after onStart

Author: manuzhang <[email protected]>

Closes #192 from manuzhang/fix_start.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/080bdca6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/080bdca6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/080bdca6

Branch: refs/heads/master
Commit: 080bdca62d56d4d798f1cf6b4b12c75d8f93bcdd
Parents: 2877c81
Author: manuzhang <[email protected]>
Authored: Fri Jun 23 09:57:43 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Fri Jun 23 09:57:56 2017 +0800

----------------------------------------------------------------------
 .../org/apache/gearpump/streaming/task/TaskActor.scala    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/080bdca6/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 1b90146..1fb61bd 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -275,16 +275,16 @@ class TaskActor(
 
     subscriptions.foreach(_._2.start())
 
-    stashQueue.asScala.foreach { item =>
-      handleMessages(item.sender).apply(item.msg)
-    }
-    stashQueue.clear()
-
     // Put this as the last step so that the subscription is already 
initialized.
     // Message sending in current Task before onStart will not be delivered to
     // target
     onStart(Instant.ofEpochMilli(startClock))
 
+    stashQueue.asScala.foreach { item =>
+      handleMessages(item.sender).apply(item.msg)
+    }
+    stashQueue.clear()
+
     taskContextData.appMaster ! GetUpstreamMinClock(taskId)
     context.become(handleMessages(sender))
   }

Reply via email to