Repository: samza Updated Branches: refs/heads/master 4e5907090 -> 7b0082b8c
SAMZA-1972: Make Operator Timer metrics calculation configurable This patch introduces two changes: 1. Make the timer metrics in OperatorImpl to be optional, and disabled by default. Adding TimerMetrics has quite a big performance impact which affects jobs with large number of operators, so it should be turned on for debugging only. 2. Register operator-level metrics on the container metrics registry. The task level registry has too many metrics which are usually ignored by the users. Having it in the container level will reduce the total amount of metrics published as well as the memory footprint. Tested by hello-samza and works as expected. Author: xinyuiscool <[email protected]> Reviewers: Jagadish V <[email protected]> Closes #805 from xinyuiscool/SAMZA-1972 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7b0082b8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7b0082b8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7b0082b8 Branch: refs/heads/master Commit: 7b0082b8c0b5de8e400b8deaae0604bf3cb293a3 Parents: 4e59070 Author: xinyuiscool <[email protected]> Authored: Thu Nov 15 16:01:53 2018 -0800 Committer: xinyuiscool <[email protected]> Committed: Thu Nov 15 16:01:53 2018 -0800 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 17 +++++++++++++++++ .../apache/samza/operators/impl/OperatorImpl.java | 16 +++++++++++----- .../org/apache/samza/config/MetricsConfig.scala | 11 +++++++++++ .../java/org/apache/samza/context/MockContext.java | 6 +++++- .../apache/samza/operators/TestJoinOperator.java | 1 + .../samza/operators/impl/TestOperatorImpl.java | 5 +++-- .../operators/impl/TestOperatorImplGraph.java | 1 + .../samza/operators/impl/TestWindowOperator.java | 1 + 8 files changed, 50 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index c7e987c..ad9f69f 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -2239,6 +2239,23 @@ 60 seconds. </td> </tr> + <tr> + <td class="property" id="metrics-timer-enabled">metrics.timer.enabled</td> + <td class="default">true</td> + <td class="description"> + This setting enables the common timer metrics for your job, which include container metrics + such as process-ns, window-ns, commit-ns, and block-ns, as well as key-value storage engine + metrics such as get-ns, put-ns and range-ns. + </td> + </tr> + <tr> + <td class="property" id="metrics-timer-debug-eanbled">metrics.timer.debug.enabled</td> + <td class="default">false</td> + <td class="description"> + This setting enables the additional timer metrics for debugging of your job. These metrics + include operator metrics such as handle-message-ns and handle-timer-ns. + </td> + </tr> <tr> <th colspan="3" class="section" id="hdfs-system-producer"><a href="../hdfs/producer.html">Writing to HDFS</a></th> http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 675b211..728a171 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -22,6 +22,7 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.MetricsConfig; import org.apache.samza.container.TaskName; +import org.apache.samza.context.ContainerContext; import org.apache.samza.context.Context; import org.apache.samza.context.TaskContextImpl; import org.apache.samza.job.model.TaskModel; @@ -105,14 +106,16 @@ public abstract class OperatorImpl<M, RM> { registeredOperators = new HashSet<>(); prevOperators = new HashSet<>(); inputStreams = new HashSet<>(); - // TODO SAMZA-1935: the objects that are only accessible through TaskContextImpl should be moved somewhere else - TaskContextImpl taskContext = (TaskContextImpl) context.getTaskContext(); - MetricsRegistry metricsRegistry = taskContext.getTaskMetricsRegistry(); + + final ContainerContext containerContext = context.getContainerContext(); + final MetricsRegistry metricsRegistry = containerContext.getContainerMetricsRegistry(); this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opId + "-messages"); this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-message-ns"); this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-timer-ns"); - this.taskName = taskContext.getTaskModel().getTaskName(); + // TODO SAMZA-1935: the objects that are only accessible through TaskContextImpl should be moved somewhere else + final TaskContextImpl taskContext = (TaskContextImpl) context.getTaskContext(); + this.taskName = taskContext.getTaskModel().getTaskName(); this.eosStates = (EndOfStreamStates) taskContext.fetchObject(EndOfStreamStates.class.getName()); this.watermarkStates = (WatermarkStates) taskContext.fetchObject(WatermarkStates.class.getName()); this.controlMessageSender = new ControlMessageSender(taskContext.getStreamMetadataCache()); @@ -495,7 +498,10 @@ public abstract class OperatorImpl<M, RM> { } private HighResolutionClock createHighResClock(Config config) { - if (new MetricsConfig(config).getMetricsTimerEnabled()) { + MetricsConfig metricsConfig = new MetricsConfig(config); + // The timer metrics calculation here is only enabled for debugging + if (metricsConfig.getMetricsTimerEnabled() + && metricsConfig.getMetricsTimerDebugEnabled()) { return System::nanoTime; } else { return () -> 0; http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala index 95350cf..da29013 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala @@ -27,7 +27,10 @@ object MetricsConfig { // metrics config constants val METRICS_REPORTERS = "metrics.reporters" val METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class" + // This flag enables the common timer metrics, e.g. process_ns val METRICS_TIMER_ENABLED= "metrics.timer.enabled" + // This flag enables more timer metrics, e.g. handle-message-ns in an operator, for debugging purpose + val METRICS_TIMER_DEBUG_ENABLED= "metrics.timer.debug.enabled" // The following configs are applicable only to {@link MetricsSnapshotReporter} // added here only to maintain backwards compatibility of config @@ -70,4 +73,12 @@ class MetricsConfig(config: Config) extends ScalaMapConfig(config) { * @return Boolean flag to enable the timer metrics */ def getMetricsTimerEnabled: Boolean = getBoolean(MetricsConfig.METRICS_TIMER_ENABLED, true) + + /** + * Returns the flag to enable the debug timer metrics. These metrics + * are turned off by default for better performance. + * @return Boolean + */ + def getMetricsTimerDebugEnabled: Boolean = getBoolean(MetricsConfig.METRICS_TIMER_DEBUG_ENABLED, false) + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/test/java/org/apache/samza/context/MockContext.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/context/MockContext.java b/samza-core/src/test/java/org/apache/samza/context/MockContext.java index 778d486..fdab692 100644 --- a/samza-core/src/test/java/org/apache/samza/context/MockContext.java +++ b/samza-core/src/test/java/org/apache/samza/context/MockContext.java @@ -22,6 +22,8 @@ package org.apache.samza.context; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; +import java.util.Collections; + import static org.mockito.Mockito.*; @@ -36,7 +38,9 @@ public class MockContext implements Context { private final ApplicationTaskContext applicationTaskContext = mock(ApplicationTaskContext.class); public MockContext() { - this(new MapConfig()); + this(new MapConfig( + Collections.singletonMap("metrics.timer.debug.enabled", "true") + )); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index b3b0a5d..90e59ae 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -311,6 +311,7 @@ public class TestJoinOperator { new SystemStreamPartition("insystem", "instream2", new Partition(0)))); when(context.getTaskContext().getTaskModel()).thenReturn(taskModel); when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + when(context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap()); // need to return different stores for left and right side IntegerSerde integerSerde = new IntegerSerde(); TimestampedValueSerde timestampedValueSerde = new TimestampedValueSerde(new KVSerde(integerSerde, integerSerde)); http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index 0ff2e0d..d5d7f3d 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -53,6 +53,7 @@ public class TestOperatorImpl { this.context = new MockContext(); when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap()); when(this.context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class)); + when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap()); } @Test(expected = IllegalStateException.class) @@ -100,7 +101,7 @@ public class TestOperatorImpl { @Test public void testOnMessageUpdatesMetrics() { ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class); - when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(mockMetricsRegistry); + when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(mockMetricsRegistry); Counter mockCounter = mock(Counter.class); Timer mockTimer = mock(Timer.class); when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockCounter); @@ -156,7 +157,7 @@ public class TestOperatorImpl { @Test public void testOnTimerUpdatesMetrics() { ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class); - when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(mockMetricsRegistry); + when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(mockMetricsRegistry); Counter mockMessageCounter = mock(Counter.class); Timer mockTimer = mock(Timer.class); when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockMessageCounter); http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 2da5858..5f1b7e6 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -97,6 +97,7 @@ public class TestOperatorImplGraph { when(taskModel.getTaskName()).thenReturn(new TaskName("task 0")); when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel); when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap()); } @After http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java index eae1ef4..0b240f1 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java @@ -99,6 +99,7 @@ public class TestWindowOperator { when(taskModel.getTaskName()).thenReturn(new TaskName("task 1")); when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel); when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap()); when(this.context.getTaskContext().getStore("jobName-jobId-window-w1")) .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde)); }
