This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new e6374f0  HDFS-15264. Backport Datanode detection to branch-2.10. Contributed by Lisheng Sun.
e6374f0 is described below

commit e6374f031af6fb3a5467ccd12d6a4c8d7b0dae1e
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Sun May 17 11:59:10 2020 +0800

    HDFS-15264. Backport Datanode detection to branch-2.10. Contributed by Lisheng Sun.
---
 .../java/org/apache/hadoop/hdfs/ClientContext.java |  49 ++
 .../java/org/apache/hadoop/hdfs/DFSClient.java     | 115 ++++
 .../org/apache/hadoop/hdfs/DFSInputStream.java     | 100 ++--
 .../org/apache/hadoop/hdfs/DeadNodeDetector.java   | 586 +++++++++++++++++++++
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |  39 ++
 .../hadoop/hdfs/client/impl/DfsClientConf.java     |  14 +
 .../src/main/resources/hdfs-default.xml            |  73 +++
 .../apache/hadoop/hdfs/TestDeadNodeDetection.java  | 406 ++++++++++++++
 8 files changed, 1349 insertions(+), 33 deletions(-)
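
For context: the feature added below is off by default and is switched on with the new dfs.client.deadnode.detection.enabled key. A minimal, hypothetical sketch of enabling it from client code follows (not part of this commit; the class name and the /tmp/example path are made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class EnableDeadNodeDetection {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // New key introduced by this change; defaults to false.
    conf.setBoolean("dfs.client.deadnode.detection.enabled", true);
    try (FileSystem fs = FileSystem.get(conf)) {
      // Every DFSInputStream opened by this client now shares one
      // DeadNodeDetector through the client's ClientContext.
      fs.open(new Path("/tmp/example")).close();
    }
  }
}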

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index a31945c..2cb30f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -117,6 +118,19 @@ public class ClientContext {
   private NodeBase clientNode;
   private boolean topologyResolutionEnabled;
 
+  private Daemon deadNodeDetectorThr = null;
+
+  /**
+   * The switch to DeadNodeDetector.
+   */
+  private boolean deadNodeDetectionEnabled = false;
+
+  /**
+   * Detect the dead datanodes in advance, and share this information among all
+   * the DFSInputStreams in the same client.
+   */
+  private DeadNodeDetector deadNodeDetector = null;
+
   private ClientContext(String name, DfsClientConf conf,
       Configuration config) {
     final ShortCircuitConf scConf = conf.getShortCircuitConf();
@@ -133,6 +147,12 @@ public class ClientContext {
 
     this.byteArrayManager = ByteArrayManager.newInstance(
         conf.getWriteByteArrayManagerConf());
+    this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
+    if (deadNodeDetectionEnabled && deadNodeDetector == null) {
+      deadNodeDetector = new DeadNodeDetector(name, config);
+      deadNodeDetectorThr = new Daemon(deadNodeDetector);
+      deadNodeDetectorThr.start();
+    }
     initTopologyResolution(config);
   }
 
@@ -250,4 +270,33 @@ public class ClientContext {
         datanodeInfo.getNetworkLocation());
     return NetworkTopology.getDistanceByPath(clientNode, node);
   }
+
+  /**
+   * The switch to DeadNodeDetector. If true, DeadNodeDetector is available.
+   */
+  public boolean isDeadNodeDetectionEnabled() {
+    return deadNodeDetectionEnabled;
+  }
+
+  /**
+   * Obtain DeadNodeDetector of the current client.
+   */
+  public DeadNodeDetector getDeadNodeDetector() {
+    return deadNodeDetector;
+  }
+
+  /**
+   * Close dead node detector thread.
+   */
+  public void stopDeadNodeDetectorThread() {
+    if (deadNodeDetectorThr != null) {
+      deadNodeDetectorThr.interrupt();
+      try {
+        deadNodeDetectorThr.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Encountered exception while waiting to join on dead " +
+            "node detector thread.", e);
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 77ee893..ad4e499 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -48,6 +48,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -240,6 +242,20 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private final int smallBufferSize;
   private final long serverDefaultsValidityPeriod;
 
+  /**
+   * Disables stopping the DeadNodeDetector thread for testing when
+   * MiniDFSCluster starts.
+   */
+  private static volatile boolean disabledStopDeadNodeDetectorThreadForTest =
+      false;
+
+  @VisibleForTesting
+  public static void setDisabledStopDeadNodeDetectorThreadForTest(
+      boolean disabledStopDeadNodeDetectorThreadForTest) {
+    DFSClient.disabledStopDeadNodeDetectorThreadForTest =
+        disabledStopDeadNodeDetectorThreadForTest;
+  }
+
   public DfsClientConf getConf() {
     return dfsClientConf;
   }
@@ -624,6 +640,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       // lease renewal stops when all files are closed
       closeAllFilesBeingWritten(false);
       clientRunning = false;
+      // close dead node detector thread
+      if (!disabledStopDeadNodeDetectorThreadForTest) {
+        clientContext.stopDeadNodeDetectorThread();
+      }
+
       // close connections to the namenode
       closeConnectionToNamenode();
     }
@@ -3053,4 +3074,98 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       throws IOException {
     return namenode.getHAServiceState();
   }
+
+  /**
+   * If deadNodeDetectionEnabled is true, return the dead nodes detected by all
+   * the DFSInputStreams in the same client. Otherwise return the dead nodes
+   * detected by the given DFSInputStream.
+   */
+  public ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getDeadNodes(
+      DFSInputStream dfsInputStream) {
+    if (clientContext.isDeadNodeDetectionEnabled()) {
+      ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
+          new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
+      if (dfsInputStream != null) {
+        deadNodes.putAll(dfsInputStream.getLocalDeadNodes());
+      }
+
+      Set<DatanodeInfo> detectDeadNodes =
+          clientContext.getDeadNodeDetector().clearAndGetDetectedDeadNodes();
+      for (DatanodeInfo detectDeadNode : detectDeadNodes) {
+        deadNodes.put(detectDeadNode, detectDeadNode);
+      }
+      return deadNodes;
+    } else {
+      return dfsInputStream.getLocalDeadNodes();
+    }
+  }
+
+  /**
+   * If deadNodeDetectionEnabled is true, the judgement is based on whether this
+   * datanode is included in the DeadNodeDetector. Otherwise the judgement is
+   * based on the given DFSInputStream.
+   */
+  public boolean isDeadNode(DFSInputStream dfsInputStream,
+      DatanodeInfo datanodeInfo) {
+    if (isDeadNodeDetectionEnabled()) {
+      boolean isDeadNode =
+          clientContext.getDeadNodeDetector().isDeadNode(datanodeInfo);
+      if (dfsInputStream != null) {
+        isDeadNode = isDeadNode
+            || dfsInputStream.getLocalDeadNodes().contains(datanodeInfo);
+      }
+      return isDeadNode;
+    } else {
+      return dfsInputStream.getLocalDeadNodes().contains(datanodeInfo);
+    }
+  }
+
+  /**
+   * Add the given datanode to the DeadNodeDetector.
+   */
+  public void addNodeToDeadNodeDetector(DFSInputStream dfsInputStream,
+      DatanodeInfo datanodeInfo) {
+    if (!isDeadNodeDetectionEnabled()) {
+      LOG.debug("DeadNode detection is not enabled, skip to add node {}.",
+          datanodeInfo);
+      return;
+    }
+    clientContext.getDeadNodeDetector().addNodeToDetect(dfsInputStream,
+        datanodeInfo);
+  }
+
+  /**
+   * Remove the given datanode from the DeadNodeDetector.
+   */
+  public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream,
+      DatanodeInfo datanodeInfo) {
+    if (!isDeadNodeDetectionEnabled()) {
+      LOG.debug("DeadNode detection is not enabled, skip to remove node {}.",
+          datanodeInfo);
+      return;
+    }
+    clientContext.getDeadNodeDetector()
+        .removeNodeFromDeadNodeDetector(dfsInputStream, datanodeInfo);
+  }
+
+  /**
+   * Remove from the DeadNodeDetector the datanodes that the given blocks are placed on.
+   */
+  public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream,
+      LocatedBlocks locatedBlocks) {
+    if (!isDeadNodeDetectionEnabled() || locatedBlocks == null) {
+      LOG.debug("DeadNode detection is not enabled or given block {} " +
+          "is null, skip to remove node.", locatedBlocks);
+      return;
+    }
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      for (DatanodeInfo datanodeInfo : locatedBlock.getLocations()) {
+        removeNodeFromDeadNodeDetector(dfsInputStream, datanodeInfo);
+      }
+    }
+  }
+
+  private boolean isDeadNodeDetectionEnabled() {
+    return clientContext.isDeadNodeDetectionEnabled();
+  }
 }
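
The accessors added to DFSClient above can be exercised with a small helper like the following hypothetical sketch (not part of this patch). It assumes the class lives in the org.apache.hadoop.hdfs package, as the new test below does, so that the protected DFSInputStream methods are visible:

package org.apache.hadoop.hdfs;

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public final class DeadNodeInspection {
  /**
   * Print the dead nodes visible to a stream: its local dead nodes plus,
   * when detection is enabled, the nodes shared via the DeadNodeDetector.
   */
  public static void printDeadNodes(FileSystem fs, Path path)
      throws IOException {
    try (FSDataInputStream in = fs.open(path)) {
      DFSInputStream din = (DFSInputStream) in.getWrappedStream();
      DFSClient client = din.getDFSClient();
      for (DatanodeInfo dn : client.getDeadNodes(din).keySet()) {
        System.out.println("dead: " + dn);
      }
    }
  }

  private DeadNodeInspection() {
  }
}
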
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 939c45c..f17f944 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -260,10 +260,26 @@ public class DFSInputStream extends FSInputStream
 
   private byte[] oneByteBuf; // used for 'int read()'
 
-  void addToDeadNodes(DatanodeInfo dnInfo) {
+  protected void addToLocalDeadNodes(DatanodeInfo dnInfo) {
     deadNodes.put(dnInfo, dnInfo);
   }
 
+  protected void removeFromLocalDeadNodes(DatanodeInfo dnInfo) {
+    deadNodes.remove(dnInfo);
+  }
+
+  protected ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getLocalDeadNodes() {
+    return deadNodes;
+  }
+
+  private void clearLocalDeadNodes() {
+    deadNodes.clear();
+  }
+
+  protected DFSClient getDFSClient() {
+    return dfsClient;
+  }
+
   DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
       LocatedBlocks locatedBlocks) throws IOException {
     this.dfsClient = dfsClient;
@@ -688,7 +704,8 @@ public class DFSInputStream extends FSInputStream
               "add to deadNodes and continue. ", targetAddr,
               targetBlock.getBlock(), ex);
           // Put chosen node into dead list, continue
-          addToDeadNodes(chosenNode);
+          addToLocalDeadNodes(chosenNode);
+          dfsClient.addNodeToDeadNodeDetector(this, chosenNode);
         }
       }
     }
@@ -740,28 +757,40 @@ public class DFSInputStream extends FSInputStream
    */
   @Override
   public synchronized void close() throws IOException {
-    if (!closed.compareAndSet(false, true)) {
-      DFSClient.LOG.debug("DFSInputStream has been closed already");
-      return;
-    }
-    dfsClient.checkOpen();
-
-    if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
-      final StringBuilder builder = new StringBuilder();
-      extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
-        private String prefix = "";
-        @Override
-        public void accept(ByteBuffer k, Object v) {
-          builder.append(prefix).append(k);
-          prefix = ", ";
-        }
-      });
-      DFSClient.LOG.warn("closing file " + src + ", but there are still " +
-          "unreleased ByteBuffers allocated by read().  " +
-          "Please release " + builder.toString() + ".");
+    try {
+      if (!closed.compareAndSet(false, true)) {
+        DFSClient.LOG.debug("DFSInputStream has been closed already");
+        return;
+      }
+      dfsClient.checkOpen();
+
+      if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
+        final StringBuilder builder = new StringBuilder();
+        extendedReadBuffers
+            .visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
+              private String prefix = "";
+
+              @Override
+              public void accept(ByteBuffer k, Object v) {
+                builder.append(prefix).append(k);
+                prefix = ", ";
+              }
+            });
+        DFSClient.LOG.warn("closing file " + src + ", but there are still "
+            + "unreleased ByteBuffers allocated by read().  "
+            + "Please release " + builder.toString() + ".");
+      }
+      closeCurrentBlockReaders();
+      super.close();
+    } finally {
+      /**
+       * If this DFSInputStream is closed and the datanode is in
+       * DeadNodeDetector#dfsInputStreamNodes, we need to remove the datanode
+       * from DeadNodeDetector#dfsInputStreamNodes, since the user should not
+       * use this DFSInputStream anymore.
+       */
+      dfsClient.removeNodeFromDeadNodeDetector(this, locatedBlocks);
     }
-    closeCurrentBlockReaders();
-    super.close();
   }
 
   @Override
@@ -914,7 +943,8 @@ public class DFSInputStream extends FSInputStream
          */
         sourceFound = seekToBlockSource(pos);
       } else {
-        addToDeadNodes(currentNode);
+        addToLocalDeadNodes(currentNode);
+        dfsClient.addNodeToDeadNodeDetector(this, currentNode);
         sourceFound = seekToNewSource(pos);
       }
       if (!sourceFound) {
@@ -964,7 +994,10 @@ public class DFSInputStream extends FSInputStream
             DFSClient.LOG.warn("DFS Read", e);
           }
           blockEnd = -1;
-          if (currentNode != null) { addToDeadNodes(currentNode); }
+          if (currentNode != null) {
+            addToLocalDeadNodes(currentNode);
+            dfsClient.addNodeToDeadNodeDetector(this, currentNode);
+          }
           if (--retries == 0) {
             throw e;
           }
@@ -1067,7 +1100,7 @@ public class DFSInputStream extends FSInputStream
   private LocatedBlock refetchLocations(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
     String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
-        deadNodes, ignoredNodes);
+            dfsClient.getDeadNodes(this), ignoredNodes);
     String blockInfo = block.getBlock() + " file=" + src;
     if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
       String description = "Could not obtain block: " + blockInfo;
@@ -1108,7 +1141,7 @@ public class DFSInputStream extends FSInputStream
       throw new InterruptedIOException(
           "Interrupted while choosing DataNode for read.");
     }
-    deadNodes.clear(); //2nd option is to remove only nodes[blockId]
+    clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
     openInfo(true);
     block = refreshLocatedBlock(block);
     failures++;
@@ -1129,7 +1162,7 @@ public class DFSInputStream extends FSInputStream
     StorageType storageType = null;
     if (nodes != null) {
       for (int i = 0; i < nodes.length; i++) {
-        if (!deadNodes.containsKey(nodes[i])
+        if (!dfsClient.getDeadNodes(this).containsKey(nodes[i])
             && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
           chosenNode = nodes[i];
           // Storage types are ordered to correspond with nodes, so use the same
@@ -1272,7 +1305,7 @@ public class DFSInputStream extends FSInputStream
         // we want to remember what we have tried
         addIntoCorruptedBlockMap(block.getBlock(), datanode.info,
             corruptedBlockMap);
-        addToDeadNodes(datanode.info);
+        addToLocalDeadNodes(datanode.info);
         throw new IOException(msg);
       } catch (IOException e) {
         checkInterrupted(e);
@@ -1294,7 +1327,8 @@ public class DFSInputStream extends FSInputStream
           String msg = "Failed to connect to " + datanode.addr + " for file "
               + src + " for block " + block.getBlock() + ":" + e;
           DFSClient.LOG.warn("Connection failure: " + msg, e);
-          addToDeadNodes(datanode.info);
+          addToLocalDeadNodes(datanode.info);
+          dfsClient.addNodeToDeadNodeDetector(this, datanode.info);
           throw new IOException(msg);
         }
         // Refresh the block for updated tokens in case of token failures or
@@ -1708,14 +1742,14 @@ public class DFSInputStream extends FSInputStream
     if (currentNode == null) {
       return seekToBlockSource(targetPos);
     }
-    boolean markedDead = deadNodes.containsKey(currentNode);
-    addToDeadNodes(currentNode);
+    boolean markedDead = dfsClient.isDeadNode(this, currentNode);
+    addToLocalDeadNodes(currentNode);
     DatanodeInfo oldNode = currentNode;
     DatanodeInfo newNode = blockSeekTo(targetPos);
     if (!markedDead) {
       /* remove it from deadNodes. blockSeekTo could have cleared
        * deadNodes and added currentNode again. Thats ok. */
-      deadNodes.remove(oldNode);
+      removeFromLocalDeadNodes(oldNode);
     }
     if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) {
       currentNode = newNode;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
new file mode 100644
index 0000000..c15ba8a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
@@ -0,0 +1,586 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+
+/**
+ * Detect the dead nodes in advance, and share this information among all the
+ * DFSInputStreams in the same client.
+ */
+public class DeadNodeDetector implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DeadNodeDetector.class);
+
+  /**
+   * Waiting time when the DeadNodeDetector encounters an error.
+   */
+  private static final long ERROR_SLEEP_MS = 5000;
+
+  /**
+   * Waiting time when DeadNodeDetector's state is idle.
+   */
+  private static final long IDLE_SLEEP_MS = 10000;
+
+  /**
+   * Client context name.
+   */
+  private String name;
+
+  private Configuration conf;
+
+  /**
+   * Dead nodes shared by all the DFSInputStreams of the client.
+   */
+  private final Map<String, DatanodeInfo> deadNodes;
+
+  /**
+   * Records the suspect and dead nodes of each DFSInputStream. When a node is
+   * no longer used by a DFSInputStream, it is removed from that stream's entry
+   * in suspectAndDeadNodes. If a DFSInputStream no longer contains any node,
+   * the DFSInputStream is removed from suspectAndDeadNodes.
+   */
+  private final Map<DFSInputStream, HashSet<DatanodeInfo>>
+          suspectAndDeadNodes;
+
+  /**
+   * Datanodes that are being probed.
+   */
+  private Map<String, DatanodeInfo> probeInProg =
+      new ConcurrentHashMap<String, DatanodeInfo>();
+
+  /**
+   * Interval time in milliseconds for probing dead node behavior.
+   */
+  private long deadNodeDetectInterval = 0;
+
+  /**
+   * Interval time in milliseconds for probing suspect node behavior.
+   */
+  private long suspectNodeDetectInterval = 0;
+
+  /**
+   * The max queue size of probing dead node.
+   */
+  private int maxDeadNodesProbeQueueLen = 0;
+
+  /**
+   * The max queue size of probing suspect node.
+   */
+  private int maxSuspectNodesProbeQueueLen;
+
+  /**
+   * Connection timeout for probing dead node in milliseconds.
+   */
+  private long probeConnectionTimeoutMs;
+
+  /**
+   * The dead node probe queue.
+   */
+  private Queue<DatanodeInfo> deadNodesProbeQueue;
+
+  /**
+   * The suspect node probe queue.
+   */
+  private Queue<DatanodeInfo> suspectNodesProbeQueue;
+
+  /**
+   * The thread pool of probing dead node.
+   */
+  private ExecutorService probeDeadNodesThreadPool;
+
+  /**
+   * The thread pool of probing suspect node.
+   */
+  private ExecutorService probeSuspectNodesThreadPool;
+
+  /**
+   * The scheduler thread of probing dead node.
+   */
+  private Thread probeDeadNodesSchedulerThr;
+
+  /**
+   * The scheduler thread of probing suspect node.
+   */
+  private Thread probeSuspectNodesSchedulerThr;
+
+  /**
+   * The thread pool for the datanode probe RPC requests. Sometimes a datanode
+   * can hang and not respond to the client in a short time, and such nodes can
+   * fill up the probe thread pool and block the probing of other normal nodes.
+   */
+  private ExecutorService rpcThreadPool;
+
+  private int socketTimeout;
+
+  /**
+   * The type of probe.
+   */
+  private enum ProbeType {
+    CHECK_DEAD, CHECK_SUSPECT
+  }
+
+  /**
+   * The state of DeadNodeDetector.
+   */
+  private enum State {
+    INIT, CHECK_DEAD, IDLE, ERROR
+  }
+
+  /**
+   * Disables starting the suspect/dead node probe threads, for testing.
+   */
+  private static volatile boolean disabledProbeThreadForTest = false;
+
+  private State state;
+
+  public DeadNodeDetector(String name, Configuration conf) {
+    this.conf = new Configuration(conf);
+    this.deadNodes = new ConcurrentHashMap<String, DatanodeInfo>();
+    this.suspectAndDeadNodes =
+        new ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>();
+    this.name = name;
+
+    deadNodeDetectInterval = conf.getLong(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT);
+    suspectNodeDetectInterval = conf.getLong(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT);
+    socketTimeout =
+        conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
+    maxDeadNodesProbeQueueLen =
+        conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY,
+            DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT);
+    maxSuspectNodesProbeQueueLen =
+        conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY,
+            DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT);
+    probeConnectionTimeoutMs = conf.getLong(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT);
+
+    this.deadNodesProbeQueue =
+        new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen);
+    this.suspectNodesProbeQueue =
+        new ArrayBlockingQueue<DatanodeInfo>(maxSuspectNodesProbeQueueLen);
+
+    int deadNodeDetectDeadThreads =
+        conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY,
+            DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT);
+    int suspectNodeDetectDeadThreads = conf.getInt(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT);
+    int rpcThreads = conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT);
+    probeDeadNodesThreadPool = Executors.newFixedThreadPool(
+        deadNodeDetectDeadThreads, new Daemon.DaemonFactory());
+    probeSuspectNodesThreadPool = Executors.newFixedThreadPool(
+        suspectNodeDetectDeadThreads, new Daemon.DaemonFactory());
+    rpcThreadPool =
+        Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory());
+
+    if (!disabledProbeThreadForTest) {
+      startProbeScheduler();
+    }
+
+    LOG.info("Start dead node detector for DFSClient {}.", this.name);
+    state = State.INIT;
+  }
+
+  @Override
+  public void run() {
+    while (!Thread.currentThread().isInterrupted()) {
+      clearAndGetDetectedDeadNodes();
+      LOG.debug("Current detector state {}, the detected nodes: {}.", state,
+          deadNodes.values());
+      switch (state) {
+      case INIT:
+        init();
+        break;
+      case CHECK_DEAD:
+        checkDeadNodes();
+        break;
+      case IDLE:
+        idle();
+        break;
+      case ERROR:
+        try {
+          Thread.sleep(ERROR_SLEEP_MS);
+        } catch (InterruptedException e) {
+          LOG.debug("Got interrupted while DeadNodeDetector is error.", e);
+          Thread.currentThread().interrupt();
+        }
+        return;
+      default:
+        break;
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static void setDisabledProbeThreadForTest(
+      boolean disabledProbeThreadForTest) {
+    DeadNodeDetector.disabledProbeThreadForTest = disabledProbeThreadForTest;
+  }
+
+  /**
+   * Start the dead node and suspect node probe threads.
+   */
+  @VisibleForTesting
+  void startProbeScheduler() {
+    probeDeadNodesSchedulerThr =
+            new Thread(new ProbeScheduler(this, ProbeType.CHECK_DEAD));
+    probeDeadNodesSchedulerThr.setDaemon(true);
+    probeDeadNodesSchedulerThr.start();
+
+    probeSuspectNodesSchedulerThr =
+            new Thread(new ProbeScheduler(this, ProbeType.CHECK_SUSPECT));
+    probeSuspectNodesSchedulerThr.setDaemon(true);
+    probeSuspectNodesSchedulerThr.start();
+  }
+
+  /**
+   * Probe datanodes according to the probe type.
+   */
+  private void scheduleProbe(ProbeType type) {
+    LOG.debug("Schedule probe datanode for probe type: {}.", type);
+    DatanodeInfo datanodeInfo = null;
+    if (type == ProbeType.CHECK_DEAD) {
+      while ((datanodeInfo = deadNodesProbeQueue.poll()) != null) {
+        if (probeInProg.containsKey(datanodeInfo.getDatanodeUuid())) {
+          LOG.debug("The datanode {} is already contained in probe queue, " +
+              "skip to add it.", datanodeInfo);
+          continue;
+        }
+        probeInProg.put(datanodeInfo.getDatanodeUuid(), datanodeInfo);
+        Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_DEAD);
+        probeDeadNodesThreadPool.execute(probe);
+      }
+    } else if (type == ProbeType.CHECK_SUSPECT) {
+      while ((datanodeInfo = suspectNodesProbeQueue.poll()) != null) {
+        if (probeInProg.containsKey(datanodeInfo.getDatanodeUuid())) {
+          continue;
+        }
+        probeInProg.put(datanodeInfo.getDatanodeUuid(), datanodeInfo);
+        Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_SUSPECT);
+        probeSuspectNodesThreadPool.execute(probe);
+      }
+    }
+  }
+
+  /**
+   * Request the datanode through RPC, and determine the datanode status based
+   * on the returned result.
+   */
+  class Probe implements Runnable {
+    private DeadNodeDetector deadNodeDetector;
+    private DatanodeInfo datanodeInfo;
+    private ProbeType type;
+
+    Probe(DeadNodeDetector deadNodeDetector, DatanodeInfo datanodeInfo,
+           ProbeType type) {
+      this.deadNodeDetector = deadNodeDetector;
+      this.datanodeInfo = datanodeInfo;
+      this.type = type;
+    }
+
+    public DatanodeInfo getDatanodeInfo() {
+      return datanodeInfo;
+    }
+
+    public ProbeType getType() {
+      return type;
+    }
+
+    @Override
+    public void run() {
+      LOG.debug("Check node: {}, type: {}.", datanodeInfo, type);
+      try {
+        final ClientDatanodeProtocol proxy =
+            DFSUtilClient.createClientDatanodeProtocolProxy(datanodeInfo,
+                deadNodeDetector.conf, socketTimeout, true);
+
+        Future<DatanodeLocalInfo> future =
+            rpcThreadPool.submit(new Callable<DatanodeLocalInfo>() {
+              @Override
+              public DatanodeLocalInfo call() throws Exception {
+                return proxy.getDatanodeInfo();
+              }
+            });
+
+        try {
+          future.get(probeConnectionTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          LOG.error("Probe failed, datanode: {}, type: {}.", datanodeInfo, type,
+              e);
+          deadNodeDetector.probeCallBack(this, false);
+          return;
+        } finally {
+          future.cancel(true);
+        }
+        deadNodeDetector.probeCallBack(this, true);
+        return;
+      } catch (Exception e) {
+        LOG.error("Probe failed, datanode: {}, type: {}.", datanodeInfo, type,
+            e);
+      }
+
+      deadNodeDetector.probeCallBack(this, false);
+    }
+  }
+
+  /**
+   * Handle the datanode according to the probe result. When ProbeType is
+   * CHECK_DEAD, remove the datanode from DeadNodeDetector#deadNodes if the probe succeeds.
+   */
+  private void probeCallBack(Probe probe, boolean success) {
+    LOG.debug("Probe datanode: {} result: {}, type: {}",
+        probe.getDatanodeInfo(), success, probe.getType());
+    probeInProg.remove(probe.getDatanodeInfo().getDatanodeUuid());
+    if (success) {
+      if (probe.getType() == ProbeType.CHECK_DEAD) {
+        LOG.info("Remove the node out from dead node list: {}.",
+            probe.getDatanodeInfo());
+        removeDeadNode(probe.getDatanodeInfo());
+      } else if (probe.getType() == ProbeType.CHECK_SUSPECT) {
+        LOG.debug("Remove the node out from suspect node list: {}.",
+            probe.getDatanodeInfo());
+        removeNodeFromDeadNodeDetector(probe.getDatanodeInfo());
+      }
+    } else {
+      if (probe.getType() == ProbeType.CHECK_SUSPECT) {
+        LOG.info("Add the node to dead node list: {}.",
+            probe.getDatanodeInfo());
+        addToDead(probe.getDatanodeInfo());
+      }
+    }
+  }
+
+  /**
+   * Check dead node periodically.
+   */
+  private void checkDeadNodes() {
+    Set<DatanodeInfo> datanodeInfos = clearAndGetDetectedDeadNodes();
+    for (DatanodeInfo datanodeInfo : datanodeInfos) {
+      LOG.debug("Add dead node to check: {}.", datanodeInfo);
+      if (!deadNodesProbeQueue.offer(datanodeInfo)) {
+        LOG.debug("Skip to add dead node {} to check " +
+                "since the probe queue is full.", datanodeInfo);
+        break;
+      }
+    }
+    state = State.IDLE;
+  }
+
+  private void idle() {
+    try {
+      Thread.sleep(IDLE_SLEEP_MS);
+    } catch (InterruptedException e) {
+      LOG.debug("Got interrupted while DeadNodeDetector is idle.", e);
+      Thread.currentThread().interrupt();
+    }
+
+    state = State.CHECK_DEAD;
+  }
+
+  private void init() {
+    state = State.CHECK_DEAD;
+  }
+
+  private void addToDead(DatanodeInfo datanodeInfo) {
+    deadNodes.put(datanodeInfo.getDatanodeUuid(), datanodeInfo);
+  }
+
+  public boolean isDeadNode(DatanodeInfo datanodeInfo) {
+    return deadNodes.containsKey(datanodeInfo.getDatanodeUuid());
+  }
+
+  private void removeFromDead(DatanodeInfo datanodeInfo) {
+    deadNodes.remove(datanodeInfo.getDatanodeUuid());
+  }
+
+  public Queue<DatanodeInfo> getDeadNodesProbeQueue() {
+    return deadNodesProbeQueue;
+  }
+
+  public Queue<DatanodeInfo> getSuspectNodesProbeQueue() {
+    return suspectNodesProbeQueue;
+  }
+
+  /**
+   * Add datanode to suspectNodes and suspectAndDeadNodes.
+   */
+  public synchronized void addNodeToDetect(DFSInputStream dfsInputStream,
+      DatanodeInfo datanodeInfo) {
+    HashSet<DatanodeInfo> datanodeInfos =
+        suspectAndDeadNodes.get(dfsInputStream);
+    if (datanodeInfos == null) {
+      datanodeInfos = new HashSet<DatanodeInfo>();
+      datanodeInfos.add(datanodeInfo);
+      suspectAndDeadNodes.put(dfsInputStream, datanodeInfos);
+    } else {
+      datanodeInfos.add(datanodeInfo);
+    }
+
+    addSuspectNodeToDetect(datanodeInfo);
+  }
+
+  /**
+   * Add datanode to suspectNodes.
+   */
+  private boolean addSuspectNodeToDetect(DatanodeInfo datanodeInfo) {
+    return suspectNodesProbeQueue.offer(datanodeInfo);
+  }
+
+  /**
+   * Remove dead nodes which are not used by any DFSInputStream from deadNodes.
+   * @return the dead nodes shared by all DFSInputStreams.
+   */
+  public synchronized Set<DatanodeInfo> clearAndGetDetectedDeadNodes() {
+    // first remove the dead nodes that are not referenced by any input stream
+    Set<DatanodeInfo> newDeadNodes = new HashSet<DatanodeInfo>();
+    for (HashSet<DatanodeInfo> datanodeInfos : suspectAndDeadNodes.values()) {
+      newDeadNodes.addAll(datanodeInfos);
+    }
+
+    for (DatanodeInfo datanodeInfo : deadNodes.values()) {
+      if (!newDeadNodes.contains(datanodeInfo)) {
+        deadNodes.remove(datanodeInfo.getDatanodeUuid());
+      }
+    }
+    return new HashSet<>(deadNodes.values());
+  }
+
+  /**
+   * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
+   *  local deadNodes.
+   */
+  public synchronized void removeNodeFromDeadNodeDetector(
+      DFSInputStream dfsInputStream, DatanodeInfo datanodeInfo) {
+    Set<DatanodeInfo> datanodeInfos = suspectAndDeadNodes.get(dfsInputStream);
+    if (datanodeInfos != null) {
+      datanodeInfos.remove(datanodeInfo);
+      dfsInputStream.removeFromLocalDeadNodes(datanodeInfo);
+      if (datanodeInfos.isEmpty()) {
+        suspectAndDeadNodes.remove(dfsInputStream);
+      }
+    }
+  }
+
+  /**
+   * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
+   *  local deadNodes.
+   */
+  private synchronized void removeNodeFromDeadNodeDetector(
+      DatanodeInfo datanodeInfo) {
+    for (Map.Entry<DFSInputStream, HashSet<DatanodeInfo>> entry :
+            suspectAndDeadNodes.entrySet()) {
+      Set<DatanodeInfo> datanodeInfos = entry.getValue();
+      if (datanodeInfos.remove(datanodeInfo)) {
+        DFSInputStream dfsInputStream = entry.getKey();
+        dfsInputStream.removeFromLocalDeadNodes(datanodeInfo);
+        if (datanodeInfos.isEmpty()) {
+          suspectAndDeadNodes.remove(dfsInputStream);
+        }
+      }
+    }
+  }
+
+  /**
+   * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
+   * deadNodes.
+   */
+  private void removeDeadNode(DatanodeInfo datanodeInfo) {
+    removeNodeFromDeadNodeDetector(datanodeInfo);
+    removeFromDead(datanodeInfo);
+  }
+
+  private static void probeSleep(long time) {
+    try {
+      Thread.sleep(time);
+    } catch (InterruptedException e) {
+      LOG.debug("Got interrupted while probe is scheduling.", e);
+      Thread.currentThread().interrupt();
+      return;
+    }
+  }
+
+  /**
+   * Schedule probe data node.
+   */
+  static class ProbeScheduler implements Runnable {
+    private DeadNodeDetector deadNodeDetector;
+    private ProbeType type;
+
+    ProbeScheduler(DeadNodeDetector deadNodeDetector, ProbeType type) {
+      this.deadNodeDetector = deadNodeDetector;
+      this.type = type;
+    }
+
+    @Override
+    public void run() {
+      while (!Thread.currentThread().isInterrupted()) {
+        deadNodeDetector.scheduleProbe(type);
+        if (type == ProbeType.CHECK_SUSPECT) {
+          probeSleep(deadNodeDetector.suspectNodeDetectInterval);
+        } else {
+          probeSleep(deadNodeDetector.deadNodeDetectInterval);
+        }
+      }
+    }
+  }
+}
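
For orientation, here is a hypothetical sketch (not part of the patch) of inspecting the detector's state through the public accessors defined above. Like the test further below, it is assumed to live in the org.apache.hadoop.hdfs package so that DFSClient#getClientContext is accessible:

package org.apache.hadoop.hdfs;

public final class DetectorStateDump {
  /**
   * Print the sizes of the suspect and dead probe queues and the number of
   * dead nodes still referenced by open streams.
   */
  public static void dump(DFSClient client) {
    DeadNodeDetector detector =
        client.getClientContext().getDeadNodeDetector();
    if (detector == null) {
      // dfs.client.deadnode.detection.enabled is false for this client.
      return;
    }
    System.out.println("suspect probe queue: "
        + detector.getSuspectNodesProbeQueue().size());
    System.out.println("dead probe queue: "
        + detector.getDeadNodesProbeQueue().size());
    System.out.println("detected dead nodes: "
        + detector.clearAndGetDetectedDeadNodes().size());
  }

  private DetectorStateDump() {
  }
}
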
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 8122693..09257b1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -163,6 +163,45 @@ public interface HdfsClientConfigKeys {
       "dfs.datanode.hdfs-blocks-metadata.enabled";
   boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
 
+  String DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY =
+          "dfs.client.deadnode.detection.enabled";
+  boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false;
+
+  String DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY =
+      "dfs.client.deadnode.detection.deadnode.queue.max";
+  int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100;
+
+  String DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY =
+      "dfs.client.deadnode.detection.suspectnode.queue.max";
+  int DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT = 1000;
+
+  String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY =
+      "dfs.client.deadnode.detection.probe.connection.timeout.ms";
+  long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT =
+      20000;
+
+  String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY =
+      "dfs.client.deadnode.detection.probe.deadnode.threads";
+  int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT = 10;
+
+  String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY =
+      "dfs.client.deadnode.detection.probe.suspectnode.threads";
+  int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT = 10;
+
+  String DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY =
+      "dfs.client.deadnode.detection.rpc.threads";
+  int DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT = 20;
+
+  String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY =
+      "dfs.client.deadnode.detection.probe.deadnode.interval.ms";
+  long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT =
+      60 * 1000; // 60s
+
+  String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY =
+      "dfs.client.deadnode.detection.probe.suspectnode.interval.ms";
+  long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT =
+      300; // 300ms
+
   String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
       "dfs.datanode.kerberos.principal";
   String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index df2fd48..64121af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -44,6 +44,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACH
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
@@ -143,6 +145,8 @@ public class DfsClientConf {
   private final boolean dataTransferTcpNoDelay;
   private final long leaseHardLimitPeriod;
 
+  private final boolean deadNodeDetectionEnabled;
+
   public DfsClientConf(Configuration conf) {
     // The hdfsTimeout is currently the same as the ipc timeout
     hdfsTimeout = Client.getRpcTimeout(conf);
@@ -264,6 +268,9 @@ public class DfsClientConf {
         HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
         HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
 
+    deadNodeDetectionEnabled =
+        conf.getBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY,
+            DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT);
     replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf);
 
     leaseHardLimitPeriod =
@@ -591,6 +598,13 @@ public class DfsClientConf {
   }
 
   /**
+   * @return the deadNodeDetectionEnabled
+   */
+  public boolean isDeadNodeDetectionEnabled() {
+    return deadNodeDetectionEnabled;
+  }
+
+  /**
    * @return the replicaAccessorBuilderClasses
    */
   public List<Class<? extends ReplicaAccessorBuilder>>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 06f48f8..262d5be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2796,6 +2796,79 @@
   </description>
 </property>
 
+  <property>
+    <name>dfs.client.deadnode.detection.enabled</name>
+    <value>false</value>
+    <description>
+      Set to true to enable dead node detection on the client side. Then all the DFSInputStreams of the same client can
+      share the dead node information.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.deadnode.detection.deadnode.queue.max</name>
+    <value>100</value>
+    <description>
+      The max queue size of probing dead node.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.deadnode.detection.suspectnode.queue.max</name>
+    <value>1000</value>
+    <description>
+      The max queue size of probing suspect node.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
+    <value>10</value>
+    <description>
+      The maximum number of threads to use for probing dead node.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.deadnode.detection.probe.suspectnode.threads</name>
+    <value>10</value>
+    <description>
+      The maximum number of threads to use for probing suspect node.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.deadnode.detection.rpc.threads</name>
+    <value>20</value>
+    <description>
+      The maximum number of threads to use for making RPC calls to recheck the liveness of dead nodes.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.deadnode.detection.probe.deadnode.interval.ms</name>
+    <value>60000</value>
+    <description>
+      Interval time in milliseconds for probing dead node behavior.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.deadnode.detection.probe.suspectnode.interval.ms</name>
+    <value>300</value>
+    <description>
+      Interval time in milliseconds for probing suspect node behavior.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.deadnode.detection.probe.connection.timeout.ms</name>
+    <value>20000</value>
+    <description>
+      Connection timeout for probing dead node in milliseconds.
+    </description>
+  </property>
+
 <property>
   <name>dfs.block.local-path-access.user</name>
   <value></value>
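
A brief, hypothetical sketch (not part of this patch) of overriding the tuning knobs above programmatically; the values are arbitrary examples, not recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public final class DeadNodeDetectionTuning {
  public static Configuration tunedConf() {
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean("dfs.client.deadnode.detection.enabled", true);
    // Probe the dead node list every 30 seconds instead of the default 60.
    conf.setLong(
        "dfs.client.deadnode.detection.probe.deadnode.interval.ms", 30000L);
    // Allow up to 200 dead nodes and 2000 suspect nodes to wait for a probe.
    conf.setInt("dfs.client.deadnode.detection.deadnode.queue.max", 200);
    conf.setInt("dfs.client.deadnode.detection.suspectnode.queue.max", 2000);
    // Fail a probe if the datanode RPC does not answer within 10 seconds.
    conf.setLong(
        "dfs.client.deadnode.detection.probe.connection.timeout.ms", 10000L);
    return conf;
  }

  private DeadNodeDetectionTuning() {
  }
}
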
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
new file mode 100644
index 0000000..b24c692
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for dead node detection in DFSClient.
+ */
+public class TestDeadNodeDetection {
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    cluster = null;
+    conf = new HdfsConfiguration();
+    conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
+    conf.setLong(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
+        1000);
+    conf.setLong(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY,
+        100);
+    conf.setLong(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
+        1000);
+    conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 0);
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testDeadNodeDetectionInBackground() throws Exception {
+    conf.set(DFS_CLIENT_CONTEXT, "testDeadNodeDetectionInBackground");
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testDetectDeadNodeInBackground");
+
+    // 256 bytes data chunk for writes
+    byte[] bytes = new byte[256];
+    for (int index = 0; index < bytes.length; index++) {
+      bytes[index] = '0';
+    }
+
+    // File with a 512 bytes block size
+    FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+
+    // Write a block to all 3 DNs (2x256bytes).
+    out.write(bytes);
+    out.write(bytes);
+    out.hflush();
+    out.close();
+
+    // Remove three DNs,
+    cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+
+    FSDataInputStream in = fs.open(filePath);
+    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+    DFSClient dfsClient = din.getDFSClient();
+    try {
+      try {
+        in.read();
+      } catch (BlockMissingException e) {
+      }
+
+      DefaultCoordination defaultCoordination = new DefaultCoordination();
+      defaultCoordination.startWaitForDeadNodeThread(dfsClient, 3);
+      defaultCoordination.sync();
+
+      assertEquals(3, dfsClient.getDeadNodes(din).size());
+      assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+    } finally {
+      in.close();
+      fs.delete(filePath, true);
+      // check the dead nodes again here; they are expected to be removed
+      assertEquals(0, dfsClient.getDeadNodes(din).size());
+      assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+    }
+  }
+
+  @Test
+  public void testDeadNodeDetectionInMultipleDFSInputStream()
+      throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testDeadNodeMultipleDFSInputStream");
+    createFile(fs, filePath);
+
+    String datanodeUuid = cluster.getDataNodes().get(0).getDatanodeUuid();
+    FSDataInputStream in1 = fs.open(filePath);
+    DFSInputStream din1 = (DFSInputStream) in1.getWrappedStream();
+    DFSClient dfsClient1 = din1.getDFSClient();
+    cluster.stopDataNode(0);
+
+    FSDataInputStream in2 = fs.open(filePath);
+    DFSInputStream din2 = null;
+    DFSClient dfsClient2 = null;
+    try {
+      try {
+        in1.read();
+      } catch (BlockMissingException e) {
+      }
+
+      din2 = (DFSInputStream) in2.getWrappedStream();
+      dfsClient2 = din2.getDFSClient();
+
+      DefaultCoordination defaultCoordination = new DefaultCoordination();
+      defaultCoordination.startWaitForDeadNodeThread(dfsClient2, 1);
+      defaultCoordination.sync();
+      assertEquals(dfsClient1.toString(), dfsClient2.toString());
+      assertEquals(1, dfsClient1.getDeadNodes(din1).size());
+      assertEquals(1, dfsClient2.getDeadNodes(din2).size());
+      assertEquals(1, dfsClient1.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+      assertEquals(1, dfsClient2.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+      // check the dn uuid of the dead node to see if it is the expected dead node
+      assertEquals(datanodeUuid,
+          ((DatanodeInfo) dfsClient1.getClientContext().getDeadNodeDetector()
+              .clearAndGetDetectedDeadNodes().toArray()[0]).getDatanodeUuid());
+      assertEquals(datanodeUuid,
+          ((DatanodeInfo) dfsClient2.getClientContext().getDeadNodeDetector()
+              .clearAndGetDetectedDeadNodes().toArray()[0]).getDatanodeUuid());
+    } finally {
+      in1.close();
+      in2.close();
+      deleteFile(fs, filePath);
+      // check the dead nodes again here; they are expected to be removed
+      assertEquals(0, dfsClient1.getDeadNodes(din1).size());
+      assertEquals(0, dfsClient2.getDeadNodes(din2).size());
+      assertEquals(0, dfsClient1.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+      assertEquals(0, dfsClient2.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+    }
+  }
+
+  @Test
+  public void testDeadNodeDetectionDeadNodeRecovery() throws Exception {
+    // prevent interrupting deadNodeDetectorThr in cluster.waitActive()
+    DFSClient.setDisabledStopDeadNodeDetectorThreadForTest(true);
+    conf.set(DFS_CLIENT_CONTEXT, "testDeadNodeDetectionDeadNodeRecovery");
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+
+    DFSClient.setDisabledStopDeadNodeDetectorThreadForTest(false);
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testDeadNodeDetectionDeadNodeRecovery");
+    createFile(fs, filePath);
+
+    // Remove three DNs,
+    MiniDFSCluster.DataNodeProperties one = cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+
+    FSDataInputStream in = fs.open(filePath);
+    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+    DFSClient dfsClient = din.getDFSClient();
+    try {
+      try {
+        in.read();
+      } catch (BlockMissingException e) {
+      }
+      DefaultCoordination defaultCoordination = new DefaultCoordination();
+      defaultCoordination.startWaitForDeadNodeThread(dfsClient, 3);
+      defaultCoordination.sync();
+      assertEquals(3, dfsClient.getDeadNodes(din).size());
+      assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+
+      cluster.restartDataNode(one, true);
+
+      defaultCoordination = new DefaultCoordination();
+      defaultCoordination.startWaitForDeadNodeThread(dfsClient, 2);
+      defaultCoordination.sync();
+      assertEquals(2, dfsClient.getDeadNodes(din).size());
+      assertEquals(2, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+    } finally {
+      in.close();
+      deleteFile(fs, filePath);
+      assertEquals(0, dfsClient.getDeadNodes(din).size());
+      assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+    }
+  }
+
+  @Test
+  public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception {
+    conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testDeadNodeDetectionMaxDeadNodesProbeQueue");
+    createFile(fs, filePath);
+
+    // Remove three DNs,
+    cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+
+    FSDataInputStream in = fs.open(filePath);
+    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+    DFSClient dfsClient = din.getDFSClient();
+    try {
+      try {
+        in.read();
+      } catch (BlockMissingException e) {
+      }
+
+      Thread.sleep(1500);
+      Assert.assertTrue((dfsClient.getClientContext().getDeadNodeDetector()
+          .getDeadNodesProbeQueue().size()
+          + dfsClient.getDeadNodes(din).size()) <= 4);
+    } finally {
+      in.close();
+      deleteFile(fs, filePath);
+    }
+  }
+
+  @Test
+  public void testDeadNodeDetectionSuspectNode() throws Exception {
+    conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1);
+    DeadNodeDetector.setDisabledProbeThreadForTest(true);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testDeadNodeDetectionSuspectNode");
+    createFile(fs, filePath);
+
+    MiniDFSCluster.DataNodeProperties one = cluster.stopDataNode(0);
+
+    FSDataInputStream in = fs.open(filePath);
+    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+    DFSClient dfsClient = din.getDFSClient();
+    DeadNodeDetector deadNodeDetector =
+        dfsClient.getClientContext().getDeadNodeDetector();
+    try {
+      try {
+        in.read();
+      } catch (BlockMissingException e) {
+      }
+      waitForSuspectNode(din.getDFSClient());
+      cluster.restartDataNode(one, true);
+      Assert.assertEquals(1,
+          deadNodeDetector.getSuspectNodesProbeQueue().size());
+      Assert.assertEquals(0,
+          deadNodeDetector.clearAndGetDetectedDeadNodes().size());
+      deadNodeDetector.startProbeScheduler();
+      Thread.sleep(1000);
+      Assert.assertEquals(0,
+          deadNodeDetector.getSuspectNodesProbeQueue().size());
+      Assert.assertEquals(0,
+          deadNodeDetector.clearAndGetDetectedDeadNodes().size());
+    } finally {
+      in.close();
+      deleteFile(fs, filePath);
+      assertEquals(0, dfsClient.getDeadNodes(din).size());
+      assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+      // reset disabledProbeThreadForTest
+      DeadNodeDetector.setDisabledProbeThreadForTest(false);
+    }
+  }
+
+  private void createFile(FileSystem fs, Path filePath) throws IOException {
+    FSDataOutputStream out = null;
+    try {
+      // 256 bytes data chunk for writes
+      byte[] bytes = new byte[256];
+      for (int index = 0; index < bytes.length; index++) {
+        bytes[index] = '0';
+      }
+
+      // File with a 512 bytes block size
+      out = fs.create(filePath, true, 4096, (short) 3, 512);
+
+      // Write a block to all 3 DNs (2x256bytes).
+      out.write(bytes);
+      out.write(bytes);
+      out.hflush();
+
+    } finally {
+      out.close();
+    }
+  }
+
+  private void deleteFile(FileSystem fs, Path filePath) throws IOException {
+    fs.delete(filePath, true);
+  }
+
+  private void waitForSuspectNode(final DFSClient dfsClient) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          if (dfsClient.getClientContext().getDeadNodeDetector()
+              .getSuspectNodesProbeQueue().size() > 0) {
+            return true;
+          }
+        } catch (Exception e) {
+          // Ignore the exception
+        }
+
+        return false;
+      }
+    }, 500, 5000);
+  }
+
+  class DefaultCoordination {
+    private Queue<Object> queue = new LinkedBlockingQueue<Object>(1);
+
+    public boolean addToQueue() {
+      return queue.offer(new Object());
+    }
+
+    public Object removeFromQueue() {
+      return queue.poll();
+    }
+
+    public void sync() {
+      while (removeFromQueue() == null) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+
+    private void startWaitForDeadNodeThread(final DFSClient dfsClient,
+        final int size) {
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          DeadNodeDetector deadNodeDetector =
+                  dfsClient.getClientContext().getDeadNodeDetector();
+          while (deadNodeDetector.clearAndGetDetectedDeadNodes()
+              .size() != size) {
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+          }
+          addToQueue();
+        }
+      }).start();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
