HADOOP-11932. MetricsSinkAdapter may hang  when being stopped. Contributed by 
Brahma Reddy Battula
(cherry picked from commit f59612edd74d1bef2b60870c24c1f67c56b2b3cb)

(cherry picked from commit 5950c1f6f8ac6f514f8d2e8bfbd1f71747b097de)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d8ddfea4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d8ddfea4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d8ddfea4

Branch: refs/heads/branch-2.6.1
Commit: d8ddfea491a7ccf5c074eb339f06015186c0d2b0
Parents: ca7fe71
Author: Jian He <[email protected]>
Authored: Wed Aug 5 16:12:45 2015 -0700
Committer: Vinod Kumar Vavilapalli <[email protected]>
Committed: Thu Sep 3 14:45:04 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../metrics2/impl/MetricsSinkAdapter.java       |  6 +-
 .../metrics2/impl/TestMetricsSystemImpl.java    | 60 ++++++++++++++++++++
 3 files changed, 66 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8ddfea4/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt 
b/hadoop-common-project/hadoop-common/CHANGES.txt
index c222b8e..57c7eae 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -69,6 +69,9 @@ Release 2.6.1 - UNRELEASED
     HADOOP-8151. Error handling in snappy decompressor throws invalid
     exceptions. (Matt Foley via harsh)
 
+    HADOOP-11932. MetricsSinkAdapter may hang  when being stopped.
+    (Brahma Reddy Battula via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8ddfea4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
index de39a13..da24b8e 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
@@ -198,15 +198,15 @@ class MetricsSinkAdapter implements 
SinkQueue.Consumer<MetricsBuffer> {
   void stop() {
     stopping = true;
     sinkThread.interrupt();
+    if (sink instanceof Closeable) {
+      IOUtils.cleanup(LOG, (Closeable)sink);
+    }
     try {
       sinkThread.join();
     }
     catch (InterruptedException e) {
       LOG.warn("Stop interrupted", e);
     }
-    if (sink instanceof Closeable) {
-      IOUtils.cleanup(LOG, (Closeable)sink);
-    }
   }
 
   String name() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8ddfea4/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
index d59e80b..b5ebb93 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.metrics2.impl;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -434,6 +436,64 @@ public class TestMetricsSystemImpl {
                new MetricGaugeInt(MsInfo.NumActiveSinks, 3)));
   }
 
+  /**
+   * Class to verify HADOOP-11932. Instead of reading from HTTP, going in loop
+   * until closed.
+   */
+  private static class TestClosableSink implements MetricsSink, Closeable {
+
+    boolean closed = false;
+    CountDownLatch collectingLatch;
+
+    public TestClosableSink(CountDownLatch collectingLatch) {
+      this.collectingLatch = collectingLatch;
+    }
+
+    @Override
+    public void init(SubsetConfiguration conf) {
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+    }
+
+    @Override
+    public void putMetrics(MetricsRecord record) {
+      while (!closed) {
+        collectingLatch.countDown();
+      }
+    }
+
+    @Override
+    public void flush() {
+    }
+  }
+
+  /**
+   * HADOOP-11932
+   */
+  @Test(timeout = 5000)
+  public void testHangOnSinkRead() throws Exception {
+    new ConfigBuilder().add("*.period", 8)
+        .add("test.sink.test.class", TestSink.class.getName())
+        .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+    MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+    ms.start();
+    try {
+      CountDownLatch collectingLatch = new CountDownLatch(1);
+      MetricsSink sink = new TestClosableSink(collectingLatch);
+      ms.registerSink("closeableSink",
+          "The sink will be used to test closeability", sink);
+      // trigger metric collection first time
+      ms.onTimerEvent();
+      // Make sure that sink is collecting metrics
+      assertTrue(collectingLatch.await(1, TimeUnit.SECONDS));
+    } finally {
+      ms.stop();
+    }
+  }
+
   @Metrics(context="test")
   private static class TestSource {
     @Metric("C1 desc") MutableCounterLong c1;

Reply via email to