HADOOP-9087. Queue size metric for metric sinks isn't actually maintained. Contributed by Akira AJISAKA
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f0f29926 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f0f29926 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f0f29926 Branch: refs/heads/YARN-2928 Commit: f0f299268625af275522f55d5bfc43118c31bdd8 Parents: 2fd02af Author: Jason Lowe <jl...@apache.org> Authored: Thu Feb 19 17:30:07 2015 +0000 Committer: Jason Lowe <jl...@apache.org> Committed: Thu Feb 19 17:30:07 2015 +0000 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 ++ .../metrics2/impl/MetricsSinkAdapter.java | 15 +++++- .../hadoop-common/src/site/markdown/Metrics.md | 2 +- .../metrics2/impl/TestMetricsSystemImpl.java | 50 ++++++++++++++++++++ 4 files changed, 67 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f29926/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index c01e3d6..8d3f9f5 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -973,6 +973,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11595. Add default implementation for AbstractFileSystem#truncate. (yliu) + HADOOP-9087. Queue size metric for metric sinks isn't actually maintained + (Akira AJISAKA via jlowe) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f29926/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java index 9add494..ed52317 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java @@ -95,7 +95,10 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> { boolean putMetrics(MetricsBuffer buffer, long logicalTime) { if (logicalTime % period == 0) { LOG.debug("enqueue, logicalTime="+ logicalTime); - if (queue.enqueue(buffer)) return true; + if (queue.enqueue(buffer)) { + refreshQueueSizeGauge(); + return true; + } dropped.incr(); return false; } @@ -105,7 +108,9 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> { public boolean putMetricsImmediate(MetricsBuffer buffer) { WaitableMetricsBuffer waitableBuffer = new WaitableMetricsBuffer(buffer); - if (!queue.enqueue(waitableBuffer)) { + if (queue.enqueue(waitableBuffer)) { + refreshQueueSizeGauge(); + } else { LOG.warn(name + " has a full queue and can't consume the given metrics."); dropped.incr(); return false; @@ -127,6 +132,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> { while (!stopping) { try { queue.consumeAll(this); + refreshQueueSizeGauge(); retryDelay = firstRetryDelay; n = retryCount; inError = false; @@ -151,12 +157,17 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> { "suppressing further error messages", e); } queue.clear(); + refreshQueueSizeGauge(); inError = true; // Don't keep complaining ad infinitum } } } } + private void refreshQueueSizeGauge() { + qsize.set(queue.size()); + } + @Override public void consume(MetricsBuffer buffer) { long ts = 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f29926/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index dbcf0d8..6953c3b 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -434,7 +434,7 @@ MetricsSystem shows the statistics for metrics snapshots and publishes. Each met | `Sink_`*instance*`NumOps` | Total number of sink operations for the *instance* | | `Sink_`*instance*`AvgTime` | Average time in milliseconds of sink operations for the *instance* | | `Sink_`*instance*`Dropped` | Total number of dropped sink operations for the *instance* | -| `Sink_`*instance*`Qsize` | Current queue length of sink operations  (BUT always set to 0 because nothing to increment this metrics, see [HADOOP-9941](https://issues.apache.org/jira/browse/HADOOP-9941)) | +| `Sink_`*instance*`Qsize` | Current queue length of sink operations | default context =============== http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0f29926/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java index 4c2ebc8..0f7b15f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java @@ -29,7 +29,9 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -434,6 +436,54 @@ public class TestMetricsSystemImpl { new MetricGaugeInt(MsInfo.NumActiveSinks, 3))); } + @Test + public void testQSize() throws Exception { + new ConfigBuilder().add("*.period", 8) + .add("test.sink.test.class", TestSink.class.getName()) + .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test")); + MetricsSystemImpl ms = new MetricsSystemImpl("Test"); + final CountDownLatch proceedSignal = new CountDownLatch(1); + final CountDownLatch reachedPutMetricSignal = new CountDownLatch(1); + ms.start(); + try { + MetricsSink slowSink = mock(MetricsSink.class); + MetricsSink dataSink = mock(MetricsSink.class); + ms.registerSink("slowSink", + "The sink that will wait on putMetric", slowSink); + ms.registerSink("dataSink", + "The sink I'll use to get info about slowSink", dataSink); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + reachedPutMetricSignal.countDown(); + proceedSignal.await(); + return null; + } + }).when(slowSink).putMetrics(any(MetricsRecord.class)); + + // trigger metric collection first time + ms.onTimerEvent(); + assertTrue(reachedPutMetricSignal.await(1, TimeUnit.SECONDS)); + // Now that the slow sink is still processing the first metric, + // its queue length should be 1 for the second collection. + ms.onTimerEvent(); + verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture()); + List<MetricsRecord> mr = r1.getAllValues(); + Number qSize = Iterables.find(mr.get(1).metrics(), + new Predicate<AbstractMetric>() { + @Override + public boolean apply(@Nullable AbstractMetric input) { + assert input != null; + return input.name().equals("Sink_slowSinkQsize"); + } + }).value(); + assertEquals(1, qSize); + } finally { + proceedSignal.countDown(); + ms.stop(); + } + } + @Metrics(context="test") private static class TestSource { @Metric("C1 desc") MutableCounterLong c1;