Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 175b08e64 -> 0ac68b87f
[GEARPUMP-358] Decrease the frequency of watermark calculation

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [ ] Make sure tests pass via `sbt clean test`.
 - [ ] Make sure old documentation affected by the pull request has been
   updated and new documentation added for new functionality.

Author: huafengw <[email protected]>

Closes #232 from huafengw/regre.

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/0ac68b87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/0ac68b87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/0ac68b87

Branch: refs/heads/master
Commit: 0ac68b87f45b31a082cac74e616c09c14d6c69b0
Parents: 175b08e
Author: huafengw <[email protected]>
Authored: Wed Oct 18 22:20:51 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Wed Oct 18 22:20:51 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/gearpump/streaming/task/TaskActor.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0ac68b87/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index fb2aaed..b43457e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -87,6 +87,7 @@ class TaskActor(
   private var life = taskContextData.life
   private var subscriptions = List.empty[(Int, Subscription)]
   private[task] var sessionId = NONE_SESSION
+  private var minClockReported: Boolean = true
 
   // Reports to appMaster with my address
   express.registerLocalActor(TaskId.toLong(taskId), self)
@@ -181,7 +182,10 @@ class TaskActor(
 
     case watermark@Watermark(instant) =>
       assert(sender.eq(self), "Watermark should only be sent from Task to itself")
-      onUpstreamMinClock(instant)
+      if (minClockReported) {
+        onUpstreamMinClock(instant)
+        minClockReported = false
+      }
       receiveMessage(watermark.toMessage, sender)
 
     case UpstreamMinClock(upstreamClock) =>
@@ -352,6 +356,7 @@ class TaskActor(
       val update = UpdateClock(taskId, watermark.toEpochMilli)
       context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
         taskContextData.appMaster ! update
+        minClockReported = true
       }
     }
