Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 9baeae936 -> 4c8131b1b
HDFS-9612. DistCp worker threads are not terminated after jobs are done.
(Wei-Chiu Chuang via Yongjun Zhang)
(cherry picked from commit a9c69ebeb707801071db3cc22bfcd14f87be443a)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c8131b1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c8131b1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c8131b1
Branch: refs/heads/branch-2.8
Commit: 4c8131b1bf8ead0722b7ec005bceb3ea16ef4f6f
Parents: 9baeae9
Author: Yongjun Zhang <[email protected]>
Authored: Fri Jan 15 10:03:09 2016 -0800
Committer: Yongjun Zhang <[email protected]>
Committed: Fri Jan 15 10:12:15 2016 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/tools/util/ProducerConsumer.java | 55 +++++++++----
.../hadoop/tools/util/WorkRequestProcessor.java | 2 +
.../hadoop/tools/util/TestProducerConsumer.java | 84 ++++++++++++++++++++
4 files changed, 130 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c8131b1/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b9762d9..564b792 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1657,6 +1657,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9493. Test o.a.h.hdfs.server.namenode.TestMetaSave fails in trunk.
(Tony Wu via lei)
+ HDFS-9612. DistCp worker threads are not terminated after jobs are done.
+ (Wei-Chiu Chuang via Yongjun Zhang)
+
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c8131b1/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
----------------------------------------------------------------------
diff --git
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
index 16bf254..906e1ea 100644
---
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
+++
b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
@@ -70,7 +70,10 @@ public class ProducerConsumer<T, R> {
* completion of any pending work.
*/
public void shutdown() {
- executor.shutdown();
+ if (hasWork()) {
+ LOG.warn("Shutdown() is called but there are still unprocessed work!");
+ }
+ executor.shutdownNow();
}
/**
@@ -117,6 +120,8 @@ public class ProducerConsumer<T, R> {
/**
* Blocking take from ProducerConsumer output queue that can be interrupted.
*
+ * @throws InterruptedException if interrupted before an element becomes
+ * available.
* @return item returned by processor's processItem().
*/
public WorkReport<R> take() throws InterruptedException {
@@ -143,30 +148,52 @@ public class ProducerConsumer<T, R> {
}
}
+ /**
+ * Worker thread implementation.
+ *
+ */
private class Worker implements Runnable {
private WorkRequestProcessor<T, R> processor;
+ /**
+ * Constructor.
+ * @param processor is used to process an item from input queue.
+ */
public Worker(WorkRequestProcessor<T, R> processor) {
this.processor = processor;
}
+ /**
+ * The worker continuously gets an item from input queue, process it and
+ * then put the processed result into output queue. It waits to get an item
+ * from input queue if there's none.
+ */
public void run() {
while (true) {
+ WorkRequest<T> work;
+
try {
- WorkRequest<T> work = inputQueue.take();
- WorkReport<R> result = processor.processItem(work);
-
- boolean isDone = false;
- while (!isDone) {
- try {
- outputQueue.put(result);
- isDone = true;
- } catch (InterruptedException ie) {
- LOG.debug("Could not put report into outputQueue. Retrying...");
- }
+ work = inputQueue.take();
+ } catch (InterruptedException e) {
+ // It is assumed that if an interrupt occurs while taking a work
+ // out from input queue, the interrupt is likely triggered by
+ // ProducerConsumer.shutdown(). Therefore, exit the thread.
+ LOG.debug("Interrupted while waiting for requests from inputQueue.");
+ return;
+ }
+
+ boolean isDone = false;
+ while (!isDone) {
+ try {
+ // if the interrupt happens while the work is being processed,
+ // go back to process the same work again.
+ WorkReport<R> result = processor.processItem(work);
+ outputQueue.put(result);
+ isDone = true;
+ } catch (InterruptedException ie) {
+ LOG.debug("Worker thread was interrupted while processing an item,"
+ + " or putting into outputQueue. Retrying...");
}
- } catch (InterruptedException ie) {
- LOG.debug("Interrupted while waiting for request from inputQueue.");
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c8131b1/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
----------------------------------------------------------------------
diff --git
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
index 91f738e..6a4c797 100644
---
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
+++
b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
@@ -26,6 +26,8 @@ public interface WorkRequestProcessor<T, R> {
/**
* Work processor.
+ * The processor should be stateless: that is, it can be repeated after
+ * being interrupted.
*
* @param workRequest Input work item.
* @return Outputs WorkReport after processing workRequest item.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c8131b1/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git
a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
index de0fcfd..ea52f69 100644
---
a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
+++
b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.tools.util;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.util.ProducerConsumer;
import org.apache.hadoop.tools.util.WorkReport;
import org.apache.hadoop.tools.util.WorkRequest;
@@ -27,6 +28,7 @@ import org.junit.Test;
import java.lang.Exception;
import java.lang.Integer;
+import java.util.concurrent.TimeoutException;
public class TestProducerConsumer {
public class CopyProcessor implements WorkRequestProcessor<Integer, Integer>
{
@@ -64,6 +66,7 @@ public class TestProducerConsumer {
} catch (InterruptedException ie) {
Assert.assertTrue(false);
}
+ worker.shutdown();
}
@Test
@@ -89,6 +92,7 @@ public class TestProducerConsumer {
}
Assert.assertEquals(0, sum);
Assert.assertEquals(numRequests, numReports);
+ workers.shutdown();
}
@Test
@@ -105,5 +109,85 @@ public class TestProducerConsumer {
} catch (InterruptedException ie) {
Assert.assertTrue(false);
}
+ worker.shutdown();
+ }
+
+ @Test
+ public void testSimpleProducerConsumerShutdown() throws InterruptedException,
+ TimeoutException {
+ // create a producer-consumer thread pool with one thread.
+ ProducerConsumer<Integer, Integer> worker =
+ new ProducerConsumer<Integer, Integer>(1);
+ worker.addWorker(new CopyProcessor());
+ // interrupt worker threads
+ worker.shutdown();
+ // Regression test for HDFS-9612
+ // Periodically check, and make sure that worker threads are ultimately
+ // terminated after interrupts
+ GenericTestUtils.waitForThreadTermination("pool-.*-thread.*",100,10000);
+ }
+
+ @Test(timeout=10000)
+ public void testMultipleProducerConsumerShutdown()
+ throws InterruptedException, TimeoutException {
+ int numWorkers = 10;
+ // create a producer consumer thread pool with 10 threads.
+ final ProducerConsumer<Integer, Integer> worker =
+ new ProducerConsumer<Integer, Integer>(numWorkers);
+ for (int i=0; i< numWorkers; i++) {
+ worker.addWorker(new CopyProcessor());
+ }
+
+    // starts two threads: a source thread which puts in work, and a sink thread
+ // which takes a piece of work from ProducerConsumer
+ class SourceThread extends Thread {
+ public void run() {
+ while (true) {
+ try {
+ worker.put(new WorkRequest<Integer>(42));
+ Thread.sleep(1);
+ } catch (InterruptedException ie) {
+ return;
+ }
+ }
+ }
+ };
+ // The source thread put requests into producer-consumer.
+ SourceThread source = new SourceThread();
+ source.start();
+ class SinkThread extends Thread {
+ public void run() {
+ try {
+ while (true) {
+ WorkReport<Integer> report = worker.take();
+ Assert.assertEquals(42, report.getItem().intValue());
+ }
+ } catch (InterruptedException ie) {
+ return;
+ }
+ }
+ };
+    // The sink thread gets processed items from producer-consumer
+ SinkThread sink = new SinkThread();
+ sink.start();
+ // sleep 1 second and then shut down source.
+ // This makes sure producer consumer gets some work to do
+ Thread.sleep(1000);
+ // after 1 second, stop source thread to stop pushing items.
+ source.interrupt();
+ // wait until all work is consumed by sink
+ while (worker.hasWork()) {
+ Thread.sleep(1);
+ }
+ worker.shutdown();
+ // Regression test for HDFS-9612
+ // make sure worker threads are terminated after workers are asked to
+ // shutdown.
+ GenericTestUtils.waitForThreadTermination("pool-.*-thread.*",100,10000);
+
+ sink.interrupt();
+
+ source.join();
+ sink.join();
}
}