HDFS-12523. Thread pools in ErasureCodingWorker do not shut down. (Huafeng Wang via Lei)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1267ff22 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1267ff22 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1267ff22 Branch: refs/heads/YARN-5734 Commit: 1267ff22ce9226b6dd52e3f33cbe3b3094fb0e35 Parents: 9d3e4cc Author: Lei Xu <[email protected]> Authored: Thu Sep 21 16:10:32 2017 -0700 Committer: Lei Xu <[email protected]> Committed: Tue Sep 26 10:08:16 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/server/datanode/DataNode.java | 6 +++++- .../server/datanode/erasurecode/ErasureCodingWorker.java | 11 +++++++++-- .../datanode/erasurecode/StripedReconstructor.java | 8 +++----- 3 files changed, 17 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1267ff22/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 042a627..6163d93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1115,7 +1115,7 @@ public class DataNode extends ReconfigurableBase /** * Shutdown disk balancer. 
*/ - private void shutdownDiskBalancer() { + private void shutdownDiskBalancer() { if (this.diskBalancer != null) { this.diskBalancer.shutdown(); this.diskBalancer = null; @@ -2077,6 +2077,10 @@ public class DataNode extends ReconfigurableBase ipcServer.stop(); } + if (ecWorker != null) { + ecWorker.shutDown(); + } + if(blockPoolManager != null) { try { this.blockPoolManager.shutDownAll(bposArray); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1267ff22/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index 07d213c..63498bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -27,6 +27,8 @@ import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import java.util.Collection; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -149,7 +151,12 @@ public final class ErasureCodingWorker { return conf; } - ThreadPoolExecutor getStripedReadPool() { - return stripedReadPool; + CompletionService<Void> createReadService() { + return new ExecutorCompletionService<>(stripedReadPool); + } + + public void shutDown() { + stripedReconstructionPool.shutdown(); + stripedReadPool.shutdown(); } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1267ff22/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java index 3202121..bbffcf5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -39,8 +39,6 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.BitSet; import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; /** @@ -110,7 +108,7 @@ abstract class StripedReconstructor { // position in striped internal block private long positionInBlock; private StripedReader stripedReader; - private ThreadPoolExecutor stripedReadPool; + private ErasureCodingWorker erasureCodingWorker; private final CachingStrategy cachingStrategy; private long maxTargetLength = 0L; private final BitSet liveBitSet; @@ -122,7 +120,7 @@ abstract class StripedReconstructor { StripedReconstructor(ErasureCodingWorker worker, StripedReconstructionInfo stripedReconInfo) { - this.stripedReadPool = worker.getStripedReadPool(); + this.erasureCodingWorker = worker; this.datanode = worker.getDatanode(); this.conf = worker.getConf(); this.ecPolicy = stripedReconInfo.getEcPolicy(); @@ -225,7 +223,7 @@ abstract class StripedReconstructor { } CompletionService<Void> createReadService() { - return new 
ExecutorCompletionService<>(stripedReadPool); + return erasureCodingWorker.createReadService(); } ExtendedBlock getBlockGroup() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
