HDFS-12044. Mismatch between BlockManager.maxReplicationStreams and 
ErasureCodingWorker.stripedReconstructionPool pool size causes slow and bursty 
recovery. (Contributed by Lei (Eddy) Xu)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/77791e4c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/77791e4c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/77791e4c

Branch: refs/heads/YARN-5734
Commit: 77791e4c36ddc9305306c83806bf486d4d32575d
Parents: 9ea01fd
Author: Lei Xu <l...@cloudera.com>
Authored: Fri Jul 28 10:49:23 2017 -0700
Committer: Lei Xu <l...@cloudera.com>
Committed: Fri Jul 28 10:50:49 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   | 23 +++++-
 .../hadoop/hdfs/server/datanode/DataNode.java   | 21 ++++++
 .../erasurecode/ErasureCodingWorker.java        | 15 +++-
 .../erasurecode/StripedBlockReconstructor.java  |  3 +-
 .../datanode/erasurecode/StripedReader.java     | 20 ++++++
 .../erasurecode/StripedReconstructionInfo.java  | 15 ++++
 .../erasurecode/StripedReconstructor.java       |  8 ++-
 .../hadoop/hdfs/TestReconstructStripedFile.java | 74 ++++++++++++++++++--
 8 files changed, 169 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/77791e4c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 2e770cc..e7cd0d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -83,6 +83,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -811,10 +812,30 @@ public class DFSUtilClient {
   public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,
       int maxPoolSize, long keepAliveTimeSecs, String threadNamePrefix,
       boolean runRejectedExec) {
+    return getThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTimeSecs,
+        new SynchronousQueue<>(), threadNamePrefix, runRejectedExec);
+  }
+
+  /**
+   * Utility to create a {@link ThreadPoolExecutor}.
+   *
+   * @param corePoolSize - min threads in the pool, even if idle
+   * @param maxPoolSize - max threads in the pool
+   * @param keepAliveTimeSecs - max seconds beyond which excess idle threads
+   *        will be terminated
+   * @param queue - the queue to use for holding tasks before they are executed.
+   * @param threadNamePrefix - name prefix for the pool threads
+   * @param runRejectedExec - when true, rejected tasks from
+   *        ThreadPoolExecutor are run in the context of calling thread
+   * @return ThreadPoolExecutor
+   */
+  public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,
+      int maxPoolSize, long keepAliveTimeSecs, BlockingQueue<Runnable> queue,
+      String threadNamePrefix, boolean runRejectedExec) {
     Preconditions.checkArgument(corePoolSize > 0);
     ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
         maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS,
-        new SynchronousQueue<Runnable>(), new Daemon.DaemonFactory() {
+        queue, new Daemon.DaemonFactory() {
           private final AtomicInteger threadIndex = new AtomicInteger(0);
 
           @Override
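
For illustration, here is a minimal, hypothetical caller of the new
queue-accepting overload (the class name, pool sizes, and thread-name prefix
below are invented for this sketch, not taken from the patch). Note the
standard ThreadPoolExecutor semantics this change leans on: with an unbounded
LinkedBlockingQueue the pool never grows past corePoolSize, and excess tasks
wait in the queue instead of being rejected.

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import org.apache.hadoop.hdfs.DFSUtilClient;

    public class QueuedPoolExample {
      public static void main(String[] args) {
        ThreadPoolExecutor pool = DFSUtilClient.getThreadPoolExecutor(
            2,                            // corePoolSize
            8,                            // maxPoolSize; unused with an unbounded queue
            60,                           // keepAliveTimeSecs for idle threads
            new LinkedBlockingQueue<>(),  // tasks queue here rather than being rejected
            "Example-",                   // hypothetical thread-name prefix
            false);                       // runRejectedExec: no caller-runs fallback
        pool.allowCoreThreadTimeOut(true);
        pool.submit(() -> System.out.println("queued, then run"));
        pool.shutdown();
      }
    }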

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77791e4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 2730393..6069487 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2204,12 +2204,33 @@ public class DataNode extends ReconfigurableBase
   }
 
   /**
+   * Increments the xmitsInProgress count by the given value.
+   *
+   * @param delta the amount by which to increase xmitsInProgress.
+   * @see #incrementXmitsInProgress()
+   */
+  public void incrementXmitsInProcess(int delta) {
+    Preconditions.checkArgument(delta >= 0);
+    xmitsInProgress.getAndAdd(delta);
+  }
+
+  /**
    * Decrements the xmitsInProgress count
    */
   public void decrementXmitsInProgress() {
     xmitsInProgress.getAndDecrement();
   }
 
+  /**
+   * Decrements the xmitsInProgress count by the given value.
+   *
+   * @see #decrementXmitsInProgress()
+   */
+  public void decrementXmitsInProgress(int delta) {
+    Preconditions.checkArgument(delta >= 0);
+    xmitsInProgress.getAndAdd(-delta);
+  }
+
   private void reportBadBlock(final BPOfferService bpos,
       final ExtendedBlock block, final String msg) {
     FsVolumeSpi volume = getFSDataset().getVolume(block);
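
The delta variants follow the usual AtomicInteger pattern. A self-contained
sketch of the same accounting idea (the class below is invented for
illustration; it is not the DataNode code):

    import java.util.concurrent.atomic.AtomicInteger;

    public class XmitsAccounting {
      private final AtomicInteger xmitsInProgress = new AtomicInteger(0);

      // Charge a multi-connection task when it is queued, not when it runs.
      public void increment(int delta) {
        if (delta < 0) {
          throw new IllegalArgumentException("delta must be non-negative");
        }
        xmitsInProgress.getAndAdd(delta);
      }

      // Release the same charge when the task completes or fails to submit.
      public void decrement(int delta) {
        if (delta < 0) {
          throw new IllegalArgumentException("delta must be non-negative");
        }
        xmitsInProgress.getAndAdd(-delta);
      }

      public int get() {
        return xmitsInProgress.get();
      }
    }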

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77791e4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index e076dda..72c224f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -93,7 +94,8 @@ public final class ErasureCodingWorker {
     LOG.debug("Using striped block reconstruction; pool threads={}",
         numThreads);
     stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor(2,
-        numThreads, 60, "StripedBlockReconstruction-", false);
+        numThreads, 60, new LinkedBlockingQueue<>(),
+        "StripedBlockReconstruction-", false);
     stripedReconstructionPool.allowCoreThreadTimeOut(true);
   }
 
@@ -106,6 +108,7 @@ public final class ErasureCodingWorker {
   public void processErasureCodingTasks(
       Collection<BlockECReconstructionInfo> ecTasks) {
     for (BlockECReconstructionInfo reconInfo : ecTasks) {
+      int xmitsSubmitted = 0;
       try {
         StripedReconstructionInfo stripedReconInfo =
             new StripedReconstructionInfo(
@@ -113,15 +116,25 @@ public final class ErasureCodingWorker {
             reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
             reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(),
             reconInfo.getTargetStorageIDs());
+        // It may throw IllegalArgumentException from task#stripedReader
+        // constructor.
         final StripedBlockReconstructor task =
             new StripedBlockReconstructor(this, stripedReconInfo);
         if (task.hasValidTargets()) {
+          // See HDFS-12044. We increase xmitsInProgress even if the task is
+          // only enqueued, so that
+          //   1) NN will not send more tasks than the DN can execute, and
+          //   2) DN will not throw away reconstruction tasks, and instead keeps
+          //      an unbounded number of tasks in the executor's task queue.
+          xmitsSubmitted = task.getXmits();
+          getDatanode().incrementXmitsInProcess(xmitsSubmitted);
           stripedReconstructionPool.submit(task);
         } else {
           LOG.warn("No missing internal block. Skip reconstruction for 
task:{}",
               reconInfo);
         }
       } catch (Throwable e) {
+        getDatanode().decrementXmitsInProgress(xmitsSubmitted);
         LOG.warn("Failed to reconstruct striped block {}",
             reconInfo.getExtendedBlock().getLocalBlock(), e);
       }
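
To make the effect concrete, a back-of-envelope check of the throttling this
enables (the class below is invented for illustration; the numbers come from
the test configuration added later in this patch and from the xmits
calculation in StripedReader). On each heartbeat the NN hands a DN roughly
maxReplicationStreams - xmitsInProgress new transfers, so charging queued
tasks keeps the NN from sending more work than the DN can hold:

    public class XmitsBudgetExample {
      public static void main(String[] args) {
        int maxReplicationStreams = 20;  // dfs.namenode.replication.max-streams
        int xmitsPerEcTask = 6;          // max(minRequiredSources = 6, targets = 1)
        // At most ~3 EC tasks outstanding (queued or running) per DN.
        System.out.println(maxReplicationStreams / xmitsPerEcTask);
      }
    }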

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77791e4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index 1119bbb..bac013a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -48,7 +48,6 @@ class StripedBlockReconstructor extends StripedReconstructor
 
   @Override
   public void run() {
-    getDatanode().incrementXmitsInProgress();
     try {
       initDecoderIfNecessary();
 
@@ -66,7 +65,7 @@ class StripedBlockReconstructor extends StripedReconstructor
       LOG.warn("Failed to reconstruct striped block: {}", getBlockGroup(), e);
       getDatanode().getMetrics().incrECFailedReconstructionTasks();
     } finally {
-      getDatanode().decrementXmitsInProgress();
+      getDatanode().decrementXmitsInProgress(getXmits());
       final DataNodeMetrics metrics = getDatanode().getMetrics();
       metrics.incrECReconstructionTasks();
       metrics.incrECReconstructionBytesRead(getBytesRead());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77791e4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index f6f343a..96f9791 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -68,6 +68,8 @@ class StripedReader {
   private int[] successList;
 
   private final int minRequiredSources;
+  // The number of xmits used by the reconstruction task.
+  private final int xmits;
   // The buffers and indices for striped blocks whose length is 0
   private ByteBuffer[] zeroStripeBuffers;
   private short[] zeroStripeIndices;
@@ -107,6 +109,12 @@ class StripedReader {
       zeroStripeIndices = new short[zeroStripNum];
     }
 
+    // It is calculated as the maximum number of connections from either
+    // sources or targets.
+    xmits = Math.max(minRequiredSources,
+        stripedReconInfo.getTargets() != null ?
+        stripedReconInfo.getTargets().length : 0);
+
     this.liveIndices = stripedReconInfo.getLiveIndices();
     assert liveIndices != null;
     this.sources = stripedReconInfo.getSources();
@@ -472,4 +480,16 @@ class StripedReader {
   CachingStrategy getCachingStrategy() {
     return reconstructor.getCachingStrategy();
   }
+
+  /**
+   * Return the xmits of this EC reconstruction task.
+   * <p>
+   * DN uses it to coordinate with NN to adjust the speed of scheduling the
+   * EC reconstruction tasks to this DN.
+   *
+   * @return the xmits of this reconstruction task.
+   */
+  int getXmits() {
+    return xmits;
+  }
 }
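
A worked example of the formula above, assuming the default RS(6, 3) policy
(the numbers are illustrative): rebuilding one lost internal block reads from
minRequiredSources = 6 live blocks and writes to one target, so the task
weighs max(6, 1) = 6 xmits, whereas a plain replica transfer weighs 1. This
is why an EC reconstruction consumes several units of the NN's
maxReplicationStreams budget at once.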

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77791e4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
index a619c34..0a3e125 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java
@@ -103,5 +103,20 @@ public class StripedReconstructionInfo {
   String[] getTargetStorageIds() {
     return targetStorageIds;
   }
+
+  /**
+   * Return the weight of this EC reconstruction task.
+   *
+   * DN uses it to coordinate with NN to adjust the speed of scheduling the
+   * reconstruction tasks to this DN.
+   *
+   * @return the weight of this reconstruction task.
+   * @see HDFS-12044
+   */
+  int getWeight() {
+    // See HDFS-12044. The weight of an RS(n, k) reconstruction is the number
+    // of network connections it opens.
+    return sources.length + targets.length;
+  }
 }
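
For the same hypothetical RS(6, 3) single-block rebuild as above, getWeight()
returns sources.length + targets.length = 6 + 1 = 7 total connections, while
StripedReader#getXmits() takes the maximum of the two, max(6, 1) = 6; both
derive the task's cost from the network connections it opens.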
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77791e4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index b8433c7..3202121 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -133,7 +133,6 @@ abstract class StripedReconstructor {
     }
     blockGroup = stripedReconInfo.getBlockGroup();
     stripedReader = new StripedReader(this, datanode, conf, stripedReconInfo);
-
     cachingStrategy = CachingStrategy.newDefaultStrategy();
 
     positionInBlock = 0L;
@@ -233,6 +232,13 @@ abstract class StripedReconstructor {
     return blockGroup;
   }
 
+  /**
+   * Get the xmits that _will_ be used for this reconstruction task.
+   */
+  int getXmits() {
+    return stripedReader.getXmits();
+  }
+
   BitSet getLiveBitSet() {
     return liveBitSet;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77791e4c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index 29e4028..7cd34c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
@@ -81,6 +83,7 @@ public class TestReconstructStripedFile {
     Any
   }
 
+  private Configuration conf;
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
   // Map: DatanodeID -> datanode index in cluster
@@ -89,7 +92,7 @@ public class TestReconstructStripedFile {
 
   @Before
   public void setup() throws IOException {
-    final Configuration conf = new Configuration();
+    conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(
         DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
@@ -263,6 +266,14 @@ public class TestReconstructStripedFile {
     return stoppedDNs;
   }
 
+  private static void writeFile(DistributedFileSystem fs, String fileName,
+      int fileLen) throws Exception {
+    final byte[] data = new byte[fileLen];
+    Arrays.fill(data, (byte) 1);
+    DFSTestUtil.writeFile(fs, new Path(fileName), data);
+    StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
+  }
+
   /**
    * Test the file blocks reconstruction.
    * 1. Check the replica is reconstructed in the target datanode,
@@ -278,10 +289,7 @@ public class TestReconstructStripedFile {
 
     Path file = new Path(fileName);
 
-    final byte[] data = new byte[fileLen];
-    Arrays.fill(data, (byte) 1);
-    DFSTestUtil.writeFile(fs, file, data);
-    StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
+    writeFile(fs, fileName, fileLen);
 
     LocatedBlocks locatedBlocks =
         StripedFileTestUtil.getLocatedBlocks(file, fs);
@@ -424,4 +432,60 @@ public class TestReconstructStripedFile {
     ecTasks.add(invalidECInfo);
     dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
   }
+
+  // HDFS-12044
+  @Test(timeout = 60000)
+  public void testNNSendsErasureCodingTasks() throws Exception {
+    testNNSendsErasureCodingTasks(1);
+    testNNSendsErasureCodingTasks(2);
+  }
+
+  private void testNNSendsErasureCodingTasks(int deadDN) throws Exception {
+    cluster.shutdown();
+
+    final int numDataNodes = dnNum + 1;
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 10);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 20);
+    conf.setInt(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_KEY,
+        2);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDataNodes).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    ErasureCodingPolicy policy = StripedFileTestUtil.getDefaultECPolicy();
+    fs.getClient().setErasureCodingPolicy("/", policy.getName());
+
+    final int fileLen = cellSize * ecPolicy.getNumDataUnits() * 2;
+    for (int i = 0; i < 100; i++) {
+      writeFile(fs, "/ec-file-" + i, fileLen);
+    }
+
+    // Inject data loss by tearing down the desired number of DataNodes.
+    assertTrue(policy.getNumParityUnits() >= deadDN);
+    List<DataNode> dataNodes = new ArrayList<>(cluster.getDataNodes());
+    Collections.shuffle(dataNodes);
+    for (DataNode dn : dataNodes.subList(0, deadDN)) {
+      shutdownDataNode(dn);
+    }
+
+    final FSNamesystem ns = cluster.getNamesystem();
+    GenericTestUtils.waitFor(() -> ns.getPendingDeletionBlocks() == 0,
+        500, 30000);
+
+    // Make sure that all pending reconstruction tasks can be processed.
+    while (ns.getPendingReconstructionBlocks() > 0) {
+      long timeoutPending = ns.getNumTimedOutPendingReconstructions();
+      assertTrue(String.format("Found %d timeout pending reconstruction tasks",
+          timeoutPending), timeoutPending == 0);
+      Thread.sleep(1000);
+    }
+
+    // Verify that all DNs reach zero xmitsInProgress.
+    GenericTestUtils.waitFor(() ->
+        cluster.getDataNodes().stream().mapToInt(
+            DataNode::getXmitsInProgress).sum() == 0,
+        500, 30000
+    );
+  }
 }
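
To run just the new regression test, a standard Maven/surefire invocation
should work (assuming an already-built source tree; this command is not part
of the patch):

    mvn -pl hadoop-hdfs-project/hadoop-hdfs test \
        -Dtest=TestReconstructStripedFile#testNNSendsErasureCodingTasks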

