Repository: incubator-samza Updated Branches: refs/heads/master 8acef467f -> 3964ce735
SAMZA-251: add metrics for choose/process/window/commit/send time Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/3964ce73 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/3964ce73 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/3964ce73 Branch: refs/heads/master Commit: 3964ce73530fac3d658b088e1f005054bc7214d1 Parents: 8acef46 Author: Yan Fang <[email protected]> Authored: Thu Aug 7 23:01:17 2014 -0700 Committer: Yan Fang <[email protected]> Committed: Thu Aug 7 23:01:17 2014 -0700 ---------------------------------------------------------------------- .../org/apache/samza/container/RunLoop.scala | 97 +++++++++++--------- .../samza/container/SamzaContainerMetrics.scala | 5 + .../org/apache/samza/util/TimerUtils.scala | 41 +++++++++ .../apache/samza/container/TestRunLoop.scala | 44 ++++++++- 4 files changed, 139 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3964ce73/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index 7fb4763..6851731 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -20,8 +20,9 @@ package org.apache.samza.container import grizzled.slf4j.Logging -import org.apache.samza.system.{SystemStreamPartition, SystemConsumers} +import org.apache.samza.system.{ SystemStreamPartition, SystemConsumers } import org.apache.samza.task.ReadableCoordinator +import org.apache.samza.util.TimerUtils /** * Each {@link SamzaContainer} uses a single-threaded execution model: activities for @@ 
-38,7 +39,7 @@ class RunLoop( val metrics: SamzaContainerMetrics, val windowMs: Long = -1, val commitMs: Long = 60000, - val clock: () => Long = { System.currentTimeMillis }) extends Runnable with Logging { + val clock: () => Long = { System.currentTimeMillis }) extends Runnable with TimerUtils with Logging { private var lastWindowMs = 0L private var lastCommitMs = 0L @@ -52,7 +53,7 @@ class RunLoop( // We could just pass in the SystemStreamPartitionMap during construction, but it's safer and cleaner to derive the information directly def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance) = taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap - taskInstances.values.map{ getSystemStreamPartitionToTaskInstance }.flatten.toMap + taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.toMap } /** @@ -76,22 +77,27 @@ class RunLoop( trace("Attempting to choose a message to process.") metrics.processes.inc - val envelope = consumerMultiplexer.choose + updateTimer(metrics.processMs) { - if (envelope != null) { - val ssp = envelope.getSystemStreamPartition + val envelope = updateTimer(metrics.chooseMs) { + consumerMultiplexer.choose + } + + if (envelope != null) { + val ssp = envelope.getSystemStreamPartition - trace("Processing incoming message envelope for SSP %s." format ssp) - metrics.envelopes.inc + trace("Processing incoming message envelope for SSP %s." 
format ssp) + metrics.envelopes.inc - val taskInstance = systemStreamPartitionToTaskInstance(ssp) + val taskInstance = systemStreamPartitionToTaskInstance(ssp) - val coordinator = new ReadableCoordinator(taskInstance.taskName) - taskInstance.process(envelope, coordinator) - checkCoordinator(coordinator) - } else { - trace("No incoming message envelope was available.") - metrics.nullEnvelopes.inc + val coordinator = new ReadableCoordinator(taskInstance.taskName) + taskInstance.process(envelope, coordinator) + checkCoordinator(coordinator) + } else { + trace("No incoming message envelope was available.") + metrics.nullEnvelopes.inc + } } } @@ -99,52 +105,56 @@ class RunLoop( * Invokes WindowableTask.window on all tasks if it's time to do so. */ private def window { - if (windowMs >= 0 && lastWindowMs + windowMs < clock()) { - trace("Windowing stream tasks.") - lastWindowMs = clock() - metrics.windows.inc - - taskInstances.foreach { case (taskName, task) => - val coordinator = new ReadableCoordinator(taskName) - task.window(coordinator) - checkCoordinator(coordinator) + updateTimer(metrics.windowMs) { + if (windowMs >= 0 && lastWindowMs + windowMs < clock()) { + trace("Windowing stream tasks.") + lastWindowMs = clock() + metrics.windows.inc + + taskInstances.foreach { + case (taskName, task) => + val coordinator = new ReadableCoordinator(taskName) + task.window(coordinator) + checkCoordinator(coordinator) + } } } } - /** * If task instances published any messages to output streams, this flushes * them to the underlying systems. */ private def send { - trace("Triggering send in task instances.") - metrics.sends.inc - taskInstances.values.foreach(_.send) + updateTimer(metrics.sendMs) { + trace("Triggering send in task instances.") + metrics.sends.inc + taskInstances.values.foreach(_.send) + } } - /** * Commits task state as a checkpoint, if necessary. 
*/ private def commit { - if (commitMs >= 0 && lastCommitMs + commitMs < clock()) { - trace("Committing task instances because the commit interval has elapsed.") - lastCommitMs = clock() - metrics.commits.inc - taskInstances.values.foreach(_.commit) - } else if (!taskCommitRequests.isEmpty) { - trace("Committing due to explicit commit request.") - metrics.commits.inc - taskCommitRequests.foreach(taskName => { - taskInstances(taskName).commit - }) - } + updateTimer(metrics.commitMs) { + if (commitMs >= 0 && lastCommitMs + commitMs < clock()) { + trace("Committing task instances because the commit interval has elapsed.") + lastCommitMs = clock() + metrics.commits.inc + taskInstances.values.foreach(_.commit) + } else if (!taskCommitRequests.isEmpty) { + trace("Committing due to explicit commit request.") + metrics.commits.inc + taskCommitRequests.foreach(taskName => { + taskInstances(taskName).commit + }) + } - taskCommitRequests = Set() + taskCommitRequests = Set() + } } - /** * A new TaskCoordinator object is passed to a task on every call to StreamTask.process * and WindowableTask.window. 
This method checks whether the task requested that we @@ -172,5 +182,4 @@ class RunLoop( shutdownNow = true } } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3964ce73/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala index bcb3fa3..44d5dff 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala @@ -33,4 +33,9 @@ class SamzaContainerMetrics( val sends = newCounter("send-calls") val envelopes = newCounter("process-envelopes") val nullEnvelopes = newCounter("process-null-envelopes") + val chooseMs = newTimer("choose-ms") + val windowMs = newTimer("window-ms") + val processMs = newTimer("process-ms") + val commitMs = newTimer("commit-ms") + val sendMs = newTimer("send-ms") } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3964ce73/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala new file mode 100644 index 0000000..6fedb62 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util + +import org.apache.samza.metrics.Timer + +/** + * A helper trait that facilitates updating a {@link org.apache.samza.metrics.Timer} metric + */ +trait TimerUtils { + val clock: () => Long + + /** + * A helper method to update the {@link org.apache.samza.metrics.Timer} metric. + * It accepts a {@link org.apache.samza.metrics.Timer} instance and a code block. + * It updates the Timer instance with the duration of running the code block. 
+ */ + def updateTimer[T](timer: Timer)(runCodeBlock: => T): T = { + val startingTime = clock() + val returnValue = runCodeBlock + timer.update(clock() - startingTime) + returnValue + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3964ce73/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala index d4ceffc..86b7f31 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala @@ -28,7 +28,7 @@ import org.scalatest.junit.AssertionsForJUnit import org.scalatest.matchers.ShouldMatchers import org.scalatest.mock.MockitoSugar import org.apache.samza.Partition -import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, SystemStreamPartition} +import org.apache.samza.system.{ IncomingMessageEnvelope, SystemConsumers, SystemStreamPartition } import org.apache.samza.task.ReadableCoordinator import org.apache.samza.task.TaskCoordinator.RequestScope @@ -70,7 +70,6 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche runLoop.metrics.nullEnvelopes.getCount should equal(0L) } - @Test def testNullMessageFromChooser { val consumers = mock[SystemConsumers] @@ -171,10 +170,47 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche def stubProcess(taskInstance: TaskInstance, process: (IncomingMessageEnvelope, ReadableCoordinator) => Unit) { when(taskInstance.process(anyObject, anyObject)).thenAnswer(new Answer[Unit]() { override def answer(invocation: InvocationOnMock) { - val envelope = invocation.getArguments()(0).asInstanceOf[IncomingMessageEnvelope] + val envelope = 
invocation.getArguments()(0).asInstanceOf[IncomingMessageEnvelope] val coordinator = invocation.getArguments()(1).asInstanceOf[ReadableCoordinator] process(envelope, coordinator) } }) } -} + + @Test + def testUpdateTimerCorrectly { + var now = 0L + val consumers = mock[SystemConsumers] + when(consumers.choose).thenReturn(envelope0) + val testMetrics = new SamzaContainerMetrics + val runLoop = new RunLoop( + taskInstances = getMockTaskInstances, + consumerMultiplexer = consumers, + metrics = testMetrics, + windowMs = 1L, + commitMs = 1L, + clock = () => { + now += 1L + // clock() is called 15 times totally in RunLoop + // stop the runLoop after one run + if (now == 15L) throw new StopRunLoop + now + }) + intercept[StopRunLoop] { runLoop.run } + + testMetrics.chooseMs.getSnapshot.getAverage should equal(1L) + testMetrics.windowMs.getSnapshot.getAverage should equal(3L) + testMetrics.processMs.getSnapshot.getAverage should equal(3L) + testMetrics.commitMs.getSnapshot.getAverage should equal(3L) + testMetrics.sendMs.getSnapshot.getAverage should equal(1L) + + now = 0L + intercept[StopRunLoop] { runLoop.run } + // after two loops + testMetrics.chooseMs.getSnapshot.getSize should equal(2) + testMetrics.windowMs.getSnapshot.getSize should equal(2) + testMetrics.processMs.getSnapshot.getSize should equal(2) + testMetrics.commitMs.getSnapshot.getSize should equal(2) + testMetrics.sendMs.getSnapshot.getSize should equal(2) + } +} \ No newline at end of file
