Repository: samza
Updated Branches:
  refs/heads/master 4e5907090 -> 7b0082b8c


SAMZA-1972: Make Operator Timer metrics calculation configurable

This patch introduces two changes:
1. Make the timer metrics in OperatorImpl optional and disabled by default. 
Adding TimerMetrics has a significant performance impact, particularly on 
jobs with a large number of operators, so they should be turned on for 
debugging only.
2. Register operator-level metrics on the container metrics registry. The 
task-level registry has too many metrics, which are usually ignored by users. 
Registering them at the container level reduces the total number of metrics 
published as well as the memory footprint.

Tested with hello-samza; it works as expected.

Author: xinyuiscool <[email protected]>

Reviewers: Jagadish V <[email protected]>

Closes #805 from xinyuiscool/SAMZA-1972


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7b0082b8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7b0082b8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7b0082b8

Branch: refs/heads/master
Commit: 7b0082b8c0b5de8e400b8deaae0604bf3cb293a3
Parents: 4e59070
Author: xinyuiscool <[email protected]>
Authored: Thu Nov 15 16:01:53 2018 -0800
Committer: xinyuiscool <[email protected]>
Committed: Thu Nov 15 16:01:53 2018 -0800

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html        | 17 +++++++++++++++++
 .../apache/samza/operators/impl/OperatorImpl.java  | 16 +++++++++++-----
 .../org/apache/samza/config/MetricsConfig.scala    | 11 +++++++++++
 .../java/org/apache/samza/context/MockContext.java |  6 +++++-
 .../apache/samza/operators/TestJoinOperator.java   |  1 +
 .../samza/operators/impl/TestOperatorImpl.java     |  5 +++--
 .../operators/impl/TestOperatorImplGraph.java      |  1 +
 .../samza/operators/impl/TestWindowOperator.java   |  1 +
 8 files changed, 50 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index c7e987c..ad9f69f 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -2239,6 +2239,23 @@
                         60 seconds.
                     </td>
                 </tr>
+                <tr>
+                    <td class="property" 
id="metrics-timer-enabled">metrics.timer.enabled</td>
+                    <td class="default">true</td>
+                    <td class="description">
+                        This setting enables the common timer metrics for your 
job, which include container metrics
+                        such as process-ns, window-ns, commit-ns, and 
block-ns, as well as key-value storage engine
+                        metrics such as get-ns, put-ns and range-ns.
+                    </td>
+                </tr>
+                <tr>
+                    <td class="property" 
id="metrics-timer-debug-enabled">metrics.timer.debug.enabled</td>
+                    <td class="default">false</td>
+                    <td class="description">
+                        This setting enables the additional timer metrics for 
debugging of your job. These metrics
+                        include operator metrics such as handle-message-ns and 
handle-timer-ns.
+                    </td>
+                </tr>
 
                 <tr>
                     <th colspan="3" class="section" 
id="hdfs-system-producer"><a href="../hdfs/producer.html">Writing to 
HDFS</a></th>

http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 675b211..728a171 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -22,6 +22,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.TaskContextImpl;
 import org.apache.samza.job.model.TaskModel;
@@ -105,14 +106,16 @@ public abstract class OperatorImpl<M, RM> {
     registeredOperators = new HashSet<>();
     prevOperators = new HashSet<>();
     inputStreams = new HashSet<>();
-    // TODO SAMZA-1935: the objects that are only accessible through 
TaskContextImpl should be moved somewhere else
-    TaskContextImpl taskContext = (TaskContextImpl) context.getTaskContext();
-    MetricsRegistry metricsRegistry = taskContext.getTaskMetricsRegistry();
+
+    final ContainerContext containerContext = context.getContainerContext();
+    final MetricsRegistry metricsRegistry = 
containerContext.getContainerMetricsRegistry();
     this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opId + 
"-messages");
     this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opId + 
"-handle-message-ns");
     this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opId + 
"-handle-timer-ns");
-    this.taskName = taskContext.getTaskModel().getTaskName();
 
+    // TODO SAMZA-1935: the objects that are only accessible through 
TaskContextImpl should be moved somewhere else
+    final TaskContextImpl taskContext = (TaskContextImpl) 
context.getTaskContext();
+    this.taskName = taskContext.getTaskModel().getTaskName();
     this.eosStates = (EndOfStreamStates) 
taskContext.fetchObject(EndOfStreamStates.class.getName());
     this.watermarkStates = (WatermarkStates) 
taskContext.fetchObject(WatermarkStates.class.getName());
     this.controlMessageSender = new 
ControlMessageSender(taskContext.getStreamMetadataCache());
@@ -495,7 +498,10 @@ public abstract class OperatorImpl<M, RM> {
   }
 
   private HighResolutionClock createHighResClock(Config config) {
-    if (new MetricsConfig(config).getMetricsTimerEnabled()) {
+    MetricsConfig metricsConfig = new MetricsConfig(config);
+    // The timer metrics calculation here is only enabled for debugging
+    if (metricsConfig.getMetricsTimerEnabled()
+        && metricsConfig.getMetricsTimerDebugEnabled()) {
       return System::nanoTime;
     } else {
       return () -> 0;

http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
index 95350cf..da29013 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
@@ -27,7 +27,10 @@ object MetricsConfig {
   // metrics config constants
   val METRICS_REPORTERS = "metrics.reporters"
   val METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class"
+  // This flag enables the common timer metrics, e.g. process-ns
   val METRICS_TIMER_ENABLED= "metrics.timer.enabled"
+  // This flag enables more timer metrics, e.g. handle-message-ns in an 
operator, for debugging purposes
+  val METRICS_TIMER_DEBUG_ENABLED= "metrics.timer.debug.enabled"
 
   // The following configs are applicable only to {@link 
MetricsSnapshotReporter}
   // added here only to maintain backwards compatibility of config
@@ -70,4 +73,12 @@ class MetricsConfig(config: Config) extends 
ScalaMapConfig(config) {
    * @return Boolean flag to enable the timer metrics
    */
   def getMetricsTimerEnabled: Boolean = 
getBoolean(MetricsConfig.METRICS_TIMER_ENABLED, true)
+
+  /**
+    * Returns the flag to enable the debug timer metrics. These metrics
+    * are turned off by default for better performance.
+    * @return Boolean
+    */
+  def getMetricsTimerDebugEnabled: Boolean = 
getBoolean(MetricsConfig.METRICS_TIMER_DEBUG_ENABLED, false)
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/test/java/org/apache/samza/context/MockContext.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/context/MockContext.java 
b/samza-core/src/test/java/org/apache/samza/context/MockContext.java
index 778d486..fdab692 100644
--- a/samza-core/src/test/java/org/apache/samza/context/MockContext.java
+++ b/samza-core/src/test/java/org/apache/samza/context/MockContext.java
@@ -22,6 +22,8 @@ package org.apache.samza.context;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 
+import java.util.Collections;
+
 import static org.mockito.Mockito.*;
 
 
@@ -36,7 +38,9 @@ public class MockContext implements Context {
   private final ApplicationTaskContext applicationTaskContext = 
mock(ApplicationTaskContext.class);
 
   public MockContext() {
-    this(new MapConfig());
+    this(new MapConfig(
+        Collections.singletonMap("metrics.timer.debug.enabled", "true")
+    ));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index b3b0a5d..90e59ae 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -311,6 +311,7 @@ public class TestJoinOperator {
             new SystemStreamPartition("insystem", "instream2", new 
Partition(0))));
     when(context.getTaskContext().getTaskModel()).thenReturn(taskModel);
     when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
+    
when(context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new
 MetricsRegistryMap());
     // need to return different stores for left and right side
     IntegerSerde integerSerde = new IntegerSerde();
     TimestampedValueSerde timestampedValueSerde = new 
TimestampedValueSerde(new KVSerde(integerSerde, integerSerde));

http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 0ff2e0d..d5d7f3d 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -53,6 +53,7 @@ public class TestOperatorImpl {
     this.context = new MockContext();
     
when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     
when(this.context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class));
+    
when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new
 MetricsRegistryMap());
   }
 
   @Test(expected = IllegalStateException.class)
@@ -100,7 +101,7 @@ public class TestOperatorImpl {
   @Test
   public void testOnMessageUpdatesMetrics() {
     ReadableMetricsRegistry mockMetricsRegistry = 
mock(ReadableMetricsRegistry.class);
-    
when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(mockMetricsRegistry);
+    
when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(mockMetricsRegistry);
     Counter mockCounter = mock(Counter.class);
     Timer mockTimer = mock(Timer.class);
     when(mockMetricsRegistry.newCounter(anyString(), 
anyString())).thenReturn(mockCounter);
@@ -156,7 +157,7 @@ public class TestOperatorImpl {
   @Test
   public void testOnTimerUpdatesMetrics() {
     ReadableMetricsRegistry mockMetricsRegistry = 
mock(ReadableMetricsRegistry.class);
-    
when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(mockMetricsRegistry);
+    
when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(mockMetricsRegistry);
     Counter mockMessageCounter = mock(Counter.class);
     Timer mockTimer = mock(Timer.class);
     when(mockMetricsRegistry.newCounter(anyString(), 
anyString())).thenReturn(mockMessageCounter);

http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 2da5858..5f1b7e6 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -97,6 +97,7 @@ public class TestOperatorImplGraph {
     when(taskModel.getTaskName()).thenReturn(new TaskName("task 0"));
     when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
     
when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
+    
when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new
 MetricsRegistryMap());
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/samza/blob/7b0082b8/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index eae1ef4..0b240f1 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -99,6 +99,7 @@ public class TestWindowOperator {
     when(taskModel.getTaskName()).thenReturn(new TaskName("task 1"));
     when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
     
when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
+    
when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new
 MetricsRegistryMap());
     when(this.context.getTaskContext().getStore("jobName-jobId-window-w1"))
         .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
   }

Reply via email to