Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 175b08e64 -> 0ac68b87f


[GEARPUMP-358] Decrease the frequency of watermark calculation

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [ ] Make sure tests pass via `sbt clean test`.
 - [ ] Make sure old documentation affected by the pull request has been 
updated and new documentation added for new functionality.

Author: huafengw <[email protected]>

Closes #232 from huafengw/regre.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/0ac68b87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/0ac68b87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/0ac68b87

Branch: refs/heads/master
Commit: 0ac68b87f45b31a082cac74e616c09c14d6c69b0
Parents: 175b08e
Author: huafengw <[email protected]>
Authored: Wed Oct 18 22:20:51 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Wed Oct 18 22:20:51 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/gearpump/streaming/task/TaskActor.scala  | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0ac68b87/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index fb2aaed..b43457e 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -87,6 +87,7 @@ class TaskActor(
   private var life = taskContextData.life
   private var subscriptions = List.empty[(Int, Subscription)]
   private[task] var sessionId = NONE_SESSION
+  private var minClockReported: Boolean = true
 
   // Reports to appMaster with my address
   express.registerLocalActor(TaskId.toLong(taskId), self)
@@ -181,7 +182,10 @@ class TaskActor(
 
     case watermark@Watermark(instant) =>
       assert(sender.eq(self), "Watermark should only be sent from Task to 
itself")
-      onUpstreamMinClock(instant)
+      if (minClockReported) {
+        onUpstreamMinClock(instant)
+        minClockReported = false
+      }
       receiveMessage(watermark.toMessage, sender)
 
     case UpstreamMinClock(upstreamClock) =>
@@ -352,6 +356,7 @@ class TaskActor(
     val update = UpdateClock(taskId, watermark.toEpochMilli)
     context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
       taskContextData.appMaster ! update
+      minClockReported = true
     }
   }
 

Reply via email to