HADOOP-15008.  Fixed period unit calculation for Hadoop Metrics V2.  
(Contribute by Erik Krogen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1b68b8ff
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1b68b8ff
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1b68b8ff

Branch: refs/heads/YARN-5881
Commit: 1b68b8ff2c6d4704f748d47fc0b903636f3e98c7
Parents: 975a57a
Author: Eric Yang <ey...@apache.org>
Authored: Mon Nov 13 12:40:45 2017 -0500
Committer: Eric Yang <ey...@apache.org>
Committed: Mon Nov 13 12:42:43 2017 -0500

----------------------------------------------------------------------
 .../metrics2/impl/MetricsSinkAdapter.java       | 12 ++---
 .../hadoop/metrics2/impl/MetricsSystemImpl.java |  7 ++-
 .../metrics2/impl/TestMetricsSystemImpl.java    | 49 ++++++++++++++++++++
 3 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b68b8ff/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
index 1199ebd..f2e607b 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
@@ -51,7 +51,7 @@ class MetricsSinkAdapter implements 
SinkQueue.Consumer<MetricsBuffer> {
   private final Thread sinkThread;
   private volatile boolean stopping = false;
   private volatile boolean inError = false;
-  private final int period, firstRetryDelay, retryCount;
+  private final int periodMs, firstRetryDelay, retryCount;
   private final long oobPutTimeout;
   private final float retryBackoff;
   private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
@@ -62,7 +62,7 @@ class MetricsSinkAdapter implements 
SinkQueue.Consumer<MetricsBuffer> {
   MetricsSinkAdapter(String name, String description, MetricsSink sink,
                      String context, MetricsFilter sourceFilter,
                      MetricsFilter recordFilter, MetricsFilter metricFilter,
-                     int period, int queueCapacity, int retryDelay,
+                     int periodMs, int queueCapacity, int retryDelay,
                      float retryBackoff, int retryCount) {
     this.name = checkNotNull(name, "name");
     this.description = description;
@@ -71,7 +71,7 @@ class MetricsSinkAdapter implements 
SinkQueue.Consumer<MetricsBuffer> {
     this.sourceFilter = sourceFilter;
     this.recordFilter = recordFilter;
     this.metricFilter = metricFilter;
-    this.period = checkArg(period, period > 0, "period");
+    this.periodMs = checkArg(periodMs, periodMs > 0, "period");
     firstRetryDelay = checkArg(retryDelay, retryDelay > 0, "retry delay");
     this.retryBackoff = checkArg(retryBackoff, retryBackoff>1, "retry 
backoff");
     oobPutTimeout = (long)
@@ -93,9 +93,9 @@ class MetricsSinkAdapter implements 
SinkQueue.Consumer<MetricsBuffer> {
     sinkThread.setDaemon(true);
   }
 
-  boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
-    if (logicalTime % period == 0) {
-      LOG.debug("enqueue, logicalTime="+ logicalTime);
+  boolean putMetrics(MetricsBuffer buffer, long logicalTimeMs) {
+    if (logicalTimeMs % periodMs == 0) {
+      LOG.debug("enqueue, logicalTime="+ logicalTimeMs);
       if (queue.enqueue(buffer)) {
         refreshQueueSizeGauge();
         return true;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b68b8ff/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
index ee1672e..624edc9 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
@@ -519,7 +519,7 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
         conf.getFilter(SOURCE_FILTER_KEY),
         conf.getFilter(RECORD_FILTER_KEY),
         conf.getFilter(METRIC_FILTER_KEY),
-        conf.getInt(PERIOD_KEY, PERIOD_DEFAULT),
+        conf.getInt(PERIOD_KEY, PERIOD_DEFAULT) * 1000,
         conf.getInt(QUEUE_CAPACITY_KEY, QUEUE_CAPACITY_DEFAULT),
         conf.getInt(RETRY_DELAY_KEY, RETRY_DELAY_DEFAULT),
         conf.getFloat(RETRY_BACKOFF_KEY, RETRY_BACKOFF_DEFAULT),
@@ -618,6 +618,11 @@ public class MetricsSystemImpl extends MetricsSystem 
implements MetricsSource {
     return sources.get(name);
   }
 
+  @VisibleForTesting
+  public MetricsSinkAdapter getSinkAdapter(String name) {
+    return sinks.get(name);
+  }
+
   private InitMode initMode() {
     LOG.debug("from system property: "+ System.getProperty(MS_INIT_MODE_KEY));
     LOG.debug("from environment variable: "+ System.getenv(MS_INIT_MODE_KEY));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b68b8ff/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
index abd1b13..f3a2553 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
@@ -39,10 +39,12 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
 
 import org.apache.commons.configuration2.SubsetConfiguration;
 import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.test.GenericTestUtils;
 import static org.apache.hadoop.test.MoreAsserts.*;
 
 import org.apache.hadoop.metrics2.AbstractMetric;
@@ -78,8 +80,11 @@ public class TestMetricsSystemImpl {
 
   public static class TestSink implements MetricsSink {
 
+    private List<Iterable<AbstractMetric>> metricValues = new ArrayList<>();
+
     @Override public void putMetrics(MetricsRecord record) {
       LOG.debug(record.toString());
+      metricValues.add(record.metrics());
     }
 
     @Override public void flush() {}
@@ -87,6 +92,10 @@ public class TestMetricsSystemImpl {
     @Override public void init(SubsetConfiguration conf) {
       LOG.debug(MetricsConfig.toString(conf));
     }
+
+    List<Iterable<AbstractMetric>> getMetricValues() {
+      return metricValues;
+    }
   }
 
   @Test public void testInitFirstVerifyStopInvokedImmediately() throws 
Exception {
@@ -559,6 +568,46 @@ public class TestMetricsSystemImpl {
     ms.shutdown();
   }
 
+  @Test
+  public void testRegisterSinksMultiplePeriods() throws Exception {
+    new ConfigBuilder().add("test.sink.test1.period", 100000)
+        .add("test.sink.test1.class", TestSink.class.getName())
+        .add("test.sink.test2.period", 200000)
+        .add("test.sink.test2.class", TestSink.class.getName())
+        .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+    MetricsSystemImpl ms = new MetricsSystemImpl();
+    try {
+      ms.init("test");
+      TestSink sink1 = (TestSink) ms.getSinkAdapter("test1").sink();
+      TestSink sink2 = (TestSink) ms.getSinkAdapter("test2").sink();
+      assertEquals(0, sink1.getMetricValues().size());
+      assertEquals(0, sink2.getMetricValues().size());
+      ms.onTimerEvent();
+      // Give some time for the publish event to go through
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return sink1.getMetricValues().size() > 0;
+        }
+      }, 10, 10000);
+      assertEquals(1, sink1.getMetricValues().size());
+      assertEquals(0, sink2.getMetricValues().size());
+      ms.onTimerEvent();
+      // Give some time for the publish event to go through
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return sink1.getMetricValues().size() > 1 &&
+              sink2.getMetricValues().size() > 0;
+        }
+      }, 10, 10000);
+      assertEquals(2, sink1.getMetricValues().size());
+      assertEquals(1, sink2.getMetricValues().size());
+    } finally {
+      ms.shutdown();
+    }
+  }
+
   @Metrics(context="test")
   private static class TestSource {
     @Metric("C1 desc") MutableCounterLong c1;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to