Repository: samza Updated Branches: refs/heads/master 6ae7784a5 -> 9f7abf535
SAMZA-951 - Improve event loop timing metrics Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9f7abf53 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9f7abf53 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9f7abf53 Branch: refs/heads/master Commit: 9f7abf535822a1de4d6ac6ee73cb3b879800e4a3 Parents: 6ae7784 Author: Jacob Maes <[email protected]> Authored: Mon May 23 19:51:17 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Mon May 23 19:51:17 2016 -0700 ---------------------------------------------------------------------- .../org/apache/samza/container/RunLoop.scala | 10 +-- .../apache/samza/system/SystemConsumers.scala | 73 +++++++++++++------- .../samza/system/SystemConsumersMetrics.scala | 2 + .../apache/samza/container/TestRunLoop.scala | 2 +- .../samza/system/TestSystemConsumers.scala | 12 +++- 5 files changed, 67 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index 6916c5c..3f25eca 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -103,11 +103,13 @@ class RunLoop( trace("Attempting to choose a message to process.") metrics.processes.inc - activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => { - val envelope = updateTimer(metrics.chooseNs) { - consumerMultiplexer.choose - } + // Exclude choose time from activeNs. Although it includes deserialization time, + // it most closely captures idle time. + val envelope = updateTimer(metrics.chooseNs) { + consumerMultiplexer.choose + } + activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => { if (envelope != null) { val ssp = envelope.getSystemStreamPartition http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index 32fc771..2efe836 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -19,9 +19,12 @@ package org.apache.samza.system + +import java.util.concurrent.TimeUnit + import scala.collection.JavaConversions._ import org.apache.samza.serializers.SerdeManager -import org.apache.samza.util.Logging +import org.apache.samza.util.{Logging, TimerUtils} import org.apache.samza.system.chooser.MessageChooser import org.apache.samza.SamzaException import java.util.HashMap @@ -44,7 +47,7 @@ object SystemConsumers { * messages, poll the MessageChooser for the next message to process, and * return that message to the SamzaContainer. */ -class SystemConsumers( +class SystemConsumers ( /** * The class that determines the order to process incoming messages. @@ -59,12 +62,12 @@ class SystemConsumers( /** * The class that handles deserialization of incoming messages. */ - serdeManager: SerdeManager = new SerdeManager, + serdeManager: SerdeManager, /** * A helper class to hold all of SystemConsumers' metrics. */ - metrics: SystemConsumersMetrics = new SystemConsumersMetrics, + metrics: SystemConsumersMetrics, /** * If MessageChooser returns null when it's polled, SystemConsumers will @@ -73,14 +76,14 @@ class SystemConsumers( * thread will sit in a tight loop polling every SystemConsumer over and * over again if no new messages are available. */ - noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT, + noNewMessagesTimeout: Int, /** * This parameter is to define how to deal with deserialization failure. If * set to true, the task will skip the messages when deserialization fails. * If set to false, the task will throw SamzaException and fail the container. */ - dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR, + dropDeserializationError: Boolean, /** * <p>Defines an upper bound for how long the SystemConsumers will wait @@ -96,13 +99,29 @@ class SystemConsumers( * with no remaining unprocessed messages, the SystemConsumers will poll for * it within 50ms of its availability in the stream system.</p> */ - pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS, + pollIntervalMs: Int, /** * Clock can be used to inject a custom clock when mocking this class in * tests. The default implementation returns the current system clock time. */ - clock: () => Long = () => System.currentTimeMillis) extends Logging { + val clock: () => Long) extends Logging with TimerUtils { + + def this(chooser: MessageChooser, + consumers: Map[String, SystemConsumer], + serdeManager: SerdeManager = new SerdeManager, + metrics: SystemConsumersMetrics = new SystemConsumersMetrics, + noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT, + dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR, + pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS) = + this(chooser, + consumers, + serdeManager, + metrics, + noNewMessagesTimeout, + dropDeserializationError, + pollIntervalMs, + () => System.nanoTime()) /** * A buffer of incoming messages grouped by SystemStreamPartition. These @@ -128,7 +147,7 @@ class SystemConsumers( /** * The last time that systems were polled for new messages. */ - var lastPollMs = 0L + var lastPollNs = 0L /** * Total number of unprocessed messages in unprocessedMessagesBySSP. @@ -187,28 +206,32 @@ class SystemConsumers( def choose: IncomingMessageEnvelope = { val envelopeFromChooser = chooser.choose - if (envelopeFromChooser == null) { - trace("Chooser returned null.") + updateTimer(metrics.deserializationNs) { + if (envelopeFromChooser == null) { + trace("Chooser returned null.") - metrics.choseNull.inc + metrics.choseNull.inc - // Sleep for a while so we don't poll in a tight loop. - timeout = noNewMessagesTimeout - } else { - val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition + // Sleep for a while so we don't poll in a tight loop. + timeout = noNewMessagesTimeout + } else { + val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition - trace("Chooser returned an incoming message envelope: %s" format envelopeFromChooser) + trace("Chooser returned an incoming message envelope: %s" format envelopeFromChooser) - // Ok to give the chooser a new message from this stream. - timeout = 0 - metrics.choseObject.inc - metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition).inc + // Ok to give the chooser a new message from this stream. + timeout = 0 + metrics.choseObject.inc + metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition).inc - tryUpdate(systemStreamPartition) + tryUpdate(systemStreamPartition) + } } - if (envelopeFromChooser == null || lastPollMs < clock() - pollIntervalMs) { - refresh + updateTimer(metrics.pollNs) { + if (envelopeFromChooser == null || TimeUnit.NANOSECONDS.toMillis(clock() - lastPollNs) > pollIntervalMs) { + refresh + } } envelopeFromChooser @@ -280,7 +303,7 @@ class SystemConsumers( trace("Refreshing chooser with new messages.") // Update last poll time so we don't poll too frequently. - lastPollMs = clock() + lastPollNs = clock() // Poll every system for new messages. consumers.keys.map(poll(_)) http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala index e7f012f..43d381b 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala @@ -32,6 +32,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry val systemStreamPartitionFetchesPerPoll = scala.collection.mutable.Map[String, Counter]() val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]() val systemStreamMessagesChosen = scala.collection.mutable.Map[SystemStreamPartition, Counter]() + val pollNs = newTimer("poll-ns") + val deserializationNs = newTimer("deserialization-ns") def setNeededByChooser(getValue: () => Int) { newGauge("ssps-needed-by-chooser", getValue) http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala index ad37447..e280daa 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala @@ -217,7 +217,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat testMetrics.chooseNs.getSnapshot.getAverage should equal(1000000L) testMetrics.windowNs.getSnapshot.getAverage should equal(1000000L) - testMetrics.processNs.getSnapshot.getAverage should equal(3000000L) + testMetrics.processNs.getSnapshot.getAverage should equal(1000000L) testMetrics.commitNs.getSnapshot.getAverage should equal(1000000L) now = 0L http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala index fbaa8ee..09da62e 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala @@ -39,7 +39,11 @@ class TestSystemConsumers { val envelope = new IncomingMessageEnvelope(systemStreamPartition0, "1", "k", "v") val consumer = new CustomPollResponseSystemConsumer(envelope) var now = 0L - val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), clock = () => now) + val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), + new SerdeManager, new SystemConsumersMetrics, + SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT, + SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR, + SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now) consumers.register(systemStreamPartition0, "0") consumers.register(systemStreamPartition1, "1234") @@ -97,7 +101,11 @@ class TestSystemConsumers { val envelope = new IncomingMessageEnvelope(systemStreamPartition, "1", "k", "v") val consumer = new CustomPollResponseSystemConsumer(envelope) var now = 0 - val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), clock = () => now) + val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), + new SerdeManager, new SystemConsumersMetrics, + SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT, + SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR, + SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now) consumers.register(systemStreamPartition, "0") consumers.start
