Repository: incubator-gearpump
Updated Branches:
  refs/heads/master d6cc5da7b -> fb0f1ef31
[GEARPUMP-271] Don't count self messages in receiveThroughput

Author: manuzhang <[email protected]>

Closes #147 from manuzhang/source.

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/fb0f1ef3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/fb0f1ef3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/fb0f1ef3

Branch: refs/heads/master
Commit: fb0f1ef312e39fde7b987be9a55d2e58115e53e5
Parents: d6cc5da
Author: manuzhang <[email protected]>
Authored: Tue Feb 14 06:10:39 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Feb 14 06:10:46 2017 +0800

----------------------------------------------------------------------
 .../apache/gearpump/streaming/task/TaskActor.scala | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb0f1ef3/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index c814fa5..14c2b59 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -147,11 +147,10 @@ class TaskActor(
     subscriptions.forall(_._2.allowSendingMoreMessages())
   }
 
-  private def doHandleMessage(): Unit = {
+  private def doHandleMessage(): Int = {
     var done = false
 
     var count = 0
-    val start = System.currentTimeMillis()
 
     while (allowSendingMoreMessages() && !done) {
       val msg = queue.poll()
@@ -171,10 +170,7 @@ class TaskActor(
       }
     }
 
-    receiveThroughput.mark(count)
-    if (count > 0) {
-      processTime.update((System.currentTimeMillis() - start) / count)
-    }
+    count
   }
 
   private def onStartClock(): Unit = {
@@ -296,7 +292,14 @@ class TaskActor(
     messageAfterCheck match {
       case Some(m) =>
         queue.add(m)
-        doHandleMessage()
+        val start = System.currentTimeMillis()
+        val count = doHandleMessage()
+        if (!self.eq(sender)) {
+          receiveThroughput.mark(count)
+        }
+        if (count > 0) {
+          processTime.update((System.currentTimeMillis() - start) / count)
+        }
       case None =>
         // TODO: Indicate the error and avoid the LOG flood
         // LOG.error(s"Task $taskId drop message $msg")
