This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push: new e6374f0 HDFS-15264. Backport Datanode detection to branch-2.10. Contributed by Lisheng Sun. e6374f0 is described below commit e6374f031af6fb3a5467ccd12d6a4c8d7b0dae1e Author: Yiqun Lin <yq...@apache.org> AuthorDate: Sun May 17 11:59:10 2020 +0800 HDFS-15264. Backport Datanode detection to branch-2.10. Contributed by Lisheng Sun. --- .../java/org/apache/hadoop/hdfs/ClientContext.java | 49 ++ .../java/org/apache/hadoop/hdfs/DFSClient.java | 115 ++++ .../org/apache/hadoop/hdfs/DFSInputStream.java | 100 ++-- .../org/apache/hadoop/hdfs/DeadNodeDetector.java | 586 +++++++++++++++++++++ .../hadoop/hdfs/client/HdfsClientConfigKeys.java | 39 ++ .../hadoop/hdfs/client/impl/DfsClientConf.java | 14 + .../src/main/resources/hdfs-default.xml | 73 +++ .../apache/hadoop/hdfs/TestDeadNodeDetection.java | 406 ++++++++++++++ 8 files changed, 1349 insertions(+), 33 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index a31945c..2cb30f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -39,6 +39,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.ScriptBasedMapping; +import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.annotations.VisibleForTesting; @@ -117,6 +118,19 @@ public class ClientContext { private NodeBase clientNode; private boolean topologyResolutionEnabled; + private Daemon deadNodeDetectorThr = null; + + /** + * The switch to DeadNodeDetector. + */ + private boolean deadNodeDetectionEnabled = false; + + /** + * Detect the dead datanodes in advance, and share this information among all + * the DFSInputStreams in the same client. + */ + private DeadNodeDetector deadNodeDetector = null; + private ClientContext(String name, DfsClientConf conf, Configuration config) { final ShortCircuitConf scConf = conf.getShortCircuitConf(); @@ -133,6 +147,12 @@ public class ClientContext { this.byteArrayManager = ByteArrayManager.newInstance( conf.getWriteByteArrayManagerConf()); + this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled(); + if (deadNodeDetectionEnabled && deadNodeDetector == null) { + deadNodeDetector = new DeadNodeDetector(name, config); + deadNodeDetectorThr = new Daemon(deadNodeDetector); + deadNodeDetectorThr.start(); + } initTopologyResolution(config); } @@ -250,4 +270,33 @@ public class ClientContext { datanodeInfo.getNetworkLocation()); return NetworkTopology.getDistanceByPath(clientNode, node); } + + /** + * The switch to DeadNodeDetector. If true, DeadNodeDetector is available. + */ + public boolean isDeadNodeDetectionEnabled() { + return deadNodeDetectionEnabled; + } + + /** + * Obtain DeadNodeDetector of the current client. + */ + public DeadNodeDetector getDeadNodeDetector() { + return deadNodeDetector; + } + + /** + * Close dead node detector thread. 
+ */ + public void stopDeadNodeDetectorThread() { + if (deadNodeDetectorThr != null) { + deadNodeDetectorThr.interrupt(); + try { + deadNodeDetectorThr.join(); + } catch (InterruptedException e) { + LOG.warn("Encountered exception while waiting to join on dead " + + "node detector thread.", e); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 77ee893..ad4e499 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -48,6 +48,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; @@ -240,6 +242,20 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private final int smallBufferSize; private final long serverDefaultsValidityPeriod; + /** + * Disabled stop DeadNodeDetectorThread for the testing when MiniDFSCluster + * start. + */ + private static volatile boolean disabledStopDeadNodeDetectorThreadForTest = + false; + + @VisibleForTesting + public static void setDisabledStopDeadNodeDetectorThreadForTest( + boolean disabledStopDeadNodeDetectorThreadForTest) { + DFSClient.disabledStopDeadNodeDetectorThreadForTest = + disabledStopDeadNodeDetectorThreadForTest; + } + public DfsClientConf getConf() { return dfsClientConf; } @@ -624,6 +640,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, // lease renewal stops when all files are closed closeAllFilesBeingWritten(false); clientRunning = false; + // close dead node detector thread + if (!disabledStopDeadNodeDetectorThreadForTest) { + clientContext.stopDeadNodeDetectorThread(); + } + // close connections to the namenode closeConnectionToNamenode(); } @@ -3053,4 +3074,98 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, throws IOException { return namenode.getHAServiceState(); } + + /** + * If deadNodeDetectionEnabled is true, return the dead nodes that detected by + * all the DFSInputStreams in the same client. Otherwise return the dead nodes + * that detected by given DFSInputStream. + */ + public ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getDeadNodes( + DFSInputStream dfsInputStream) { + if (clientContext.isDeadNodeDetectionEnabled()) { + ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = + new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>(); + if (dfsInputStream != null) { + deadNodes.putAll(dfsInputStream.getLocalDeadNodes()); + } + + Set<DatanodeInfo> detectDeadNodes = + clientContext.getDeadNodeDetector().clearAndGetDetectedDeadNodes(); + for (DatanodeInfo detectDeadNode : detectDeadNodes) { + deadNodes.put(detectDeadNode, detectDeadNode); + } + return deadNodes; + } else { + return dfsInputStream.getLocalDeadNodes(); + } + } + + /** + * If deadNodeDetectionEnabled is true, judgement based on whether this + * datanode is included or not in DeadNodeDetector. Otherwise judgment based + * given DFSInputStream. 
+ */ + public boolean isDeadNode(DFSInputStream dfsInputStream, + DatanodeInfo datanodeInfo) { + if (isDeadNodeDetectionEnabled()) { + boolean isDeadNode = + clientContext.getDeadNodeDetector().isDeadNode(datanodeInfo); + if (dfsInputStream != null) { + isDeadNode = isDeadNode + || dfsInputStream.getLocalDeadNodes().contains(datanodeInfo); + } + return isDeadNode; + } else { + return dfsInputStream.getLocalDeadNodes().contains(datanodeInfo); + } + } + + /** + * Add given datanode in DeadNodeDetector. + */ + public void addNodeToDeadNodeDetector(DFSInputStream dfsInputStream, + DatanodeInfo datanodeInfo) { + if (!isDeadNodeDetectionEnabled()) { + LOG.debug("DeadNode detection is not enabled, skip to add node {}.", + datanodeInfo); + return; + } + clientContext.getDeadNodeDetector().addNodeToDetect(dfsInputStream, + datanodeInfo); + } + + /** + * Remove given datanode from DeadNodeDetector. + */ + public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream, + DatanodeInfo datanodeInfo) { + if (!isDeadNodeDetectionEnabled()) { + LOG.debug("DeadNode detection is not enabled, skip to remove node {}.", + datanodeInfo); + return; + } + clientContext.getDeadNodeDetector() + .removeNodeFromDeadNodeDetector(dfsInputStream, datanodeInfo); + } + + /** + * Remove datanodes that given block placed on from DeadNodeDetector. + */ + public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream, + LocatedBlocks locatedBlocks) { + if (!isDeadNodeDetectionEnabled() || locatedBlocks == null) { + LOG.debug("DeadNode detection is not enabled or given block {} " + + "is null, skip to remove node.", locatedBlocks); + return; + } + for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { + for (DatanodeInfo datanodeInfo : locatedBlock.getLocations()) { + removeNodeFromDeadNodeDetector(dfsInputStream, datanodeInfo); + } + } + } + + private boolean isDeadNodeDetectionEnabled() { + return clientContext.isDeadNodeDetectionEnabled(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 939c45c..f17f944 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -260,10 +260,26 @@ public class DFSInputStream extends FSInputStream private byte[] oneByteBuf; // used for 'int read()' - void addToDeadNodes(DatanodeInfo dnInfo) { + protected void addToLocalDeadNodes(DatanodeInfo dnInfo) { deadNodes.put(dnInfo, dnInfo); } + protected void removeFromLocalDeadNodes(DatanodeInfo dnInfo) { + deadNodes.remove(dnInfo); + } + + protected ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getLocalDeadNodes() { + return deadNodes; + } + + private void clearLocalDeadNodes() { + deadNodes.clear(); + } + + protected DFSClient getDFSClient() { + return dfsClient; + } + DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, LocatedBlocks locatedBlocks) throws IOException { this.dfsClient = dfsClient; @@ -688,7 +704,8 @@ public class DFSInputStream extends FSInputStream "add to deadNodes and continue. 
", targetAddr, targetBlock.getBlock(), ex); // Put chosen node into dead list, continue - addToDeadNodes(chosenNode); + addToLocalDeadNodes(chosenNode); + dfsClient.addNodeToDeadNodeDetector(this, chosenNode); } } } @@ -740,28 +757,40 @@ public class DFSInputStream extends FSInputStream */ @Override public synchronized void close() throws IOException { - if (!closed.compareAndSet(false, true)) { - DFSClient.LOG.debug("DFSInputStream has been closed already"); - return; - } - dfsClient.checkOpen(); - - if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) { - final StringBuilder builder = new StringBuilder(); - extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() { - private String prefix = ""; - @Override - public void accept(ByteBuffer k, Object v) { - builder.append(prefix).append(k); - prefix = ", "; - } - }); - DFSClient.LOG.warn("closing file " + src + ", but there are still " + - "unreleased ByteBuffers allocated by read(). " + - "Please release " + builder.toString() + "."); + try { + if (!closed.compareAndSet(false, true)) { + DFSClient.LOG.debug("DFSInputStream has been closed already"); + return; + } + dfsClient.checkOpen(); + + if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) { + final StringBuilder builder = new StringBuilder(); + extendedReadBuffers + .visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() { + private String prefix = ""; + + @Override + public void accept(ByteBuffer k, Object v) { + builder.append(prefix).append(k); + prefix = ", "; + } + }); + DFSClient.LOG.warn("closing file " + src + ", but there are still " + + "unreleased ByteBuffers allocated by read(). " + + "Please release " + builder.toString() + "."); + } + closeCurrentBlockReaders(); + super.close(); + } finally { + /** + * If dfsInputStream is closed and datanode is in + * DeadNodeDetector#dfsInputStreamNodes, we need remove the datanode from + * the DeadNodeDetector#dfsInputStreamNodes. Since user should not use + * this dfsInputStream anymore. 
+ */ + dfsClient.removeNodeFromDeadNodeDetector(this, locatedBlocks); } - closeCurrentBlockReaders(); - super.close(); } @Override @@ -914,7 +943,8 @@ public class DFSInputStream extends FSInputStream */ sourceFound = seekToBlockSource(pos); } else { - addToDeadNodes(currentNode); + addToLocalDeadNodes(currentNode); + dfsClient.addNodeToDeadNodeDetector(this, currentNode); sourceFound = seekToNewSource(pos); } if (!sourceFound) { @@ -964,7 +994,10 @@ public class DFSInputStream extends FSInputStream DFSClient.LOG.warn("DFS Read", e); } blockEnd = -1; - if (currentNode != null) { addToDeadNodes(currentNode); } + if (currentNode != null) { + addToLocalDeadNodes(currentNode); + dfsClient.addNodeToDeadNodeDetector(this, currentNode); + } if (--retries == 0) { throw e; } @@ -1067,7 +1100,7 @@ public class DFSInputStream extends FSInputStream private LocatedBlock refetchLocations(LocatedBlock block, Collection<DatanodeInfo> ignoredNodes) throws IOException { String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), - deadNodes, ignoredNodes); + dfsClient.getDeadNodes(this), ignoredNodes); String blockInfo = block.getBlock() + " file=" + src; if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) { String description = "Could not obtain block: " + blockInfo; @@ -1108,7 +1141,7 @@ public class DFSInputStream extends FSInputStream throw new InterruptedIOException( "Interrupted while choosing DataNode for read."); } - deadNodes.clear(); //2nd option is to remove only nodes[blockId] + clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId] openInfo(true); block = refreshLocatedBlock(block); failures++; @@ -1129,7 +1162,7 @@ public class DFSInputStream extends FSInputStream StorageType storageType = null; if (nodes != null) { for (int i = 0; i < nodes.length; i++) { - if (!deadNodes.containsKey(nodes[i]) + if (!dfsClient.getDeadNodes(this).containsKey(nodes[i]) && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) { chosenNode = nodes[i]; // Storage types are ordered to correspond with nodes, so use the same @@ -1272,7 +1305,7 @@ public class DFSInputStream extends FSInputStream // we want to remember what we have tried addIntoCorruptedBlockMap(block.getBlock(), datanode.info, corruptedBlockMap); - addToDeadNodes(datanode.info); + addToLocalDeadNodes(datanode.info); throw new IOException(msg); } catch (IOException e) { checkInterrupted(e); @@ -1294,7 +1327,8 @@ public class DFSInputStream extends FSInputStream String msg = "Failed to connect to " + datanode.addr + " for file " + src + " for block " + block.getBlock() + ":" + e; DFSClient.LOG.warn("Connection failure: " + msg, e); - addToDeadNodes(datanode.info); + addToLocalDeadNodes(datanode.info); + dfsClient.addNodeToDeadNodeDetector(this, datanode.info); throw new IOException(msg); } // Refresh the block for updated tokens in case of token failures or @@ -1708,14 +1742,14 @@ public class DFSInputStream extends FSInputStream if (currentNode == null) { return seekToBlockSource(targetPos); } - boolean markedDead = deadNodes.containsKey(currentNode); - addToDeadNodes(currentNode); + boolean markedDead = dfsClient.isDeadNode(this, currentNode); + addToLocalDeadNodes(currentNode); DatanodeInfo oldNode = currentNode; DatanodeInfo newNode = blockSeekTo(targetPos); if (!markedDead) { /* remove it from deadNodes. blockSeekTo could have cleared * deadNodes and added currentNode again. Thats ok. 
*/ - deadNodes.remove(oldNode); + removeFromLocalDeadNodes(oldNode); } if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) { currentNode = newNode; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java new file mode 100644 index 0000000..c15ba8a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java @@ -0,0 +1,586 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.util.Daemon; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT; +import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; + +/** + * Detect the dead nodes in advance, and share this information among all the + * DFSInputStreams in the same client. + */ +public class DeadNodeDetector implements Runnable { + public static final Logger LOG = + LoggerFactory.getLogger(DeadNodeDetector.class); + + /** + * Waiting time when DeadNodeDetector happens error. + */ + private static final long ERROR_SLEEP_MS = 5000; + + /** + * Waiting time when DeadNodeDetector's state is idle. + */ + private static final long IDLE_SLEEP_MS = 10000; + + /** + * Client context name. + */ + private String name; + + private Configuration conf; + + /** + * Dead nodes shared by all the DFSInputStreams of the client. + */ + private final Map<String, DatanodeInfo> deadNodes; + + /** + * Record suspect and dead nodes by one DFSInputStream. When node is not used + * by one DFSInputStream, remove it from suspectAndDeadNodes#DFSInputStream. + * If DFSInputStream does not include any node, remove DFSInputStream from + * suspectAndDeadNodes. + */ + private final Map<DFSInputStream, HashSet<DatanodeInfo>> + suspectAndDeadNodes; + + /** + * Datanodes that is being probed. + */ + private Map<String, DatanodeInfo> probeInProg = + new ConcurrentHashMap<String, DatanodeInfo>(); + + /** + * Interval time in milliseconds for probing dead node behavior. + */ + private long deadNodeDetectInterval = 0; + + /** + * Interval time in milliseconds for probing suspect node behavior. + */ + private long suspectNodeDetectInterval = 0; + + /** + * The max queue size of probing dead node. + */ + private int maxDeadNodesProbeQueueLen = 0; + + /** + * The max queue size of probing suspect node. + */ + private int maxSuspectNodesProbeQueueLen; + + /** + * Connection timeout for probing dead node in milliseconds. + */ + private long probeConnectionTimeoutMs; + + /** + * The dead node probe queue. + */ + private Queue<DatanodeInfo> deadNodesProbeQueue; + + /** + * The suspect node probe queue. + */ + private Queue<DatanodeInfo> suspectNodesProbeQueue; + + /** + * The thread pool of probing dead node. + */ + private ExecutorService probeDeadNodesThreadPool; + + /** + * The thread pool of probing suspect node. + */ + private ExecutorService probeSuspectNodesThreadPool; + + /** + * The scheduler thread of probing dead node. + */ + private Thread probeDeadNodesSchedulerThr; + + /** + * The scheduler thread of probing suspect node. + */ + private Thread probeSuspectNodesSchedulerThr; + + /** + * The thread pool of probing datanodes' rpc request. Sometimes the data node + * can hang and not respond to the client in a short time. 
And these node will + * filled with probe thread pool and block other normal node probing. + */ + private ExecutorService rpcThreadPool; + + private int socketTimeout; + + /** + * The type of probe. + */ + private enum ProbeType { + CHECK_DEAD, CHECK_SUSPECT + } + + /** + * The state of DeadNodeDetector. + */ + private enum State { + INIT, CHECK_DEAD, IDLE, ERROR + } + + /** + * Disabled start probe suspect/dead thread for the testing. + */ + private static volatile boolean disabledProbeThreadForTest = false; + + private State state; + + public DeadNodeDetector(String name, Configuration conf) { + this.conf = new Configuration(conf); + this.deadNodes = new ConcurrentHashMap<String, DatanodeInfo>(); + this.suspectAndDeadNodes = + new ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>(); + this.name = name; + + deadNodeDetectInterval = conf.getLong( + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT); + suspectNodeDetectInterval = conf.getLong( + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT); + socketTimeout = + conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT); + maxDeadNodesProbeQueueLen = + conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT); + maxSuspectNodesProbeQueueLen = + conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT); + probeConnectionTimeoutMs = conf.getLong( + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT); + + this.deadNodesProbeQueue = + new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen); + this.suspectNodesProbeQueue = + new ArrayBlockingQueue<DatanodeInfo>(maxSuspectNodesProbeQueueLen); + + int deadNodeDetectDeadThreads = + conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT); + int suspectNodeDetectDeadThreads = conf.getInt( + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT); + int rpcThreads = conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT); + probeDeadNodesThreadPool = Executors.newFixedThreadPool( + deadNodeDetectDeadThreads, new Daemon.DaemonFactory()); + probeSuspectNodesThreadPool = Executors.newFixedThreadPool( + suspectNodeDetectDeadThreads, new Daemon.DaemonFactory()); + rpcThreadPool = + Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory()); + + if (!disabledProbeThreadForTest) { + startProbeScheduler(); + } + + LOG.info("Start dead node detector for DFSClient {}.", this.name); + state = State.INIT; + } + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + clearAndGetDetectedDeadNodes(); + LOG.debug("Current detector state {}, the detected nodes: {}.", state, + deadNodes.values()); + switch (state) { + case INIT: + init(); + break; + case CHECK_DEAD: + checkDeadNodes(); + break; + case IDLE: + idle(); + break; + case ERROR: + try { + Thread.sleep(ERROR_SLEEP_MS); + } catch (InterruptedException e) { + LOG.debug("Got interrupted while DeadNodeDetector is error.", e); + Thread.currentThread().interrupt(); + } + return; + 
default: + break; + } + } + } + + @VisibleForTesting + static void setDisabledProbeThreadForTest( + boolean disabledProbeThreadForTest) { + DeadNodeDetector.disabledProbeThreadForTest = disabledProbeThreadForTest; + } + + /** + * Start probe dead node and suspect node thread. + */ + @VisibleForTesting + void startProbeScheduler() { + probeDeadNodesSchedulerThr = + new Thread(new ProbeScheduler(this, ProbeType.CHECK_DEAD)); + probeDeadNodesSchedulerThr.setDaemon(true); + probeDeadNodesSchedulerThr.start(); + + probeSuspectNodesSchedulerThr = + new Thread(new ProbeScheduler(this, ProbeType.CHECK_SUSPECT)); + probeSuspectNodesSchedulerThr.setDaemon(true); + probeSuspectNodesSchedulerThr.start(); + } + + /** + * Prode datanode by probe byte. + */ + private void scheduleProbe(ProbeType type) { + LOG.debug("Schedule probe datanode for probe type: {}.", type); + DatanodeInfo datanodeInfo = null; + if (type == ProbeType.CHECK_DEAD) { + while ((datanodeInfo = deadNodesProbeQueue.poll()) != null) { + if (probeInProg.containsKey(datanodeInfo.getDatanodeUuid())) { + LOG.debug("The datanode {} is already contained in probe queue, " + + "skip to add it.", datanodeInfo); + continue; + } + probeInProg.put(datanodeInfo.getDatanodeUuid(), datanodeInfo); + Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_DEAD); + probeDeadNodesThreadPool.execute(probe); + } + } else if (type == ProbeType.CHECK_SUSPECT) { + while ((datanodeInfo = suspectNodesProbeQueue.poll()) != null) { + if (probeInProg.containsKey(datanodeInfo.getDatanodeUuid())) { + continue; + } + probeInProg.put(datanodeInfo.getDatanodeUuid(), datanodeInfo); + Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_SUSPECT); + probeSuspectNodesThreadPool.execute(probe); + } + } + } + + /** + * Request the data node through rpc, and determine the data node status based + * on the returned result. + */ + class Probe implements Runnable { + private DeadNodeDetector deadNodeDetector; + private DatanodeInfo datanodeInfo; + private ProbeType type; + + Probe(DeadNodeDetector deadNodeDetector, DatanodeInfo datanodeInfo, + ProbeType type) { + this.deadNodeDetector = deadNodeDetector; + this.datanodeInfo = datanodeInfo; + this.type = type; + } + + public DatanodeInfo getDatanodeInfo() { + return datanodeInfo; + } + + public ProbeType getType() { + return type; + } + + @Override + public void run() { + LOG.debug("Check node: {}, type: {}.", datanodeInfo, type); + try { + final ClientDatanodeProtocol proxy = + DFSUtilClient.createClientDatanodeProtocolProxy(datanodeInfo, + deadNodeDetector.conf, socketTimeout, true); + + Future<DatanodeLocalInfo> future = + rpcThreadPool.submit(new Callable<DatanodeLocalInfo>() { + @Override + public DatanodeLocalInfo call() throws Exception { + return proxy.getDatanodeInfo(); + } + }); + + try { + future.get(probeConnectionTimeoutMs, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.error("Probe failed, datanode: {}, type: {}.", datanodeInfo, type, + e); + deadNodeDetector.probeCallBack(this, false); + return; + } finally { + future.cancel(true); + } + deadNodeDetector.probeCallBack(this, true); + return; + } catch (Exception e) { + LOG.error("Probe failed, datanode: {}, type: {}.", datanodeInfo, type, + e); + } + + deadNodeDetector.probeCallBack(this, false); + } + } + + /** + * Handle data node, according to probe result. When ProbeType is CHECK_DEAD, + * remove the datanode from DeadNodeDetector#deadNodes if probe success. 
+ */ + private void probeCallBack(Probe probe, boolean success) { + LOG.debug("Probe datanode: {} result: {}, type: {}", + probe.getDatanodeInfo(), success, probe.getType()); + probeInProg.remove(probe.getDatanodeInfo().getDatanodeUuid()); + if (success) { + if (probe.getType() == ProbeType.CHECK_DEAD) { + LOG.info("Remove the node out from dead node list: {}.", + probe.getDatanodeInfo()); + removeDeadNode(probe.getDatanodeInfo()); + } else if (probe.getType() == ProbeType.CHECK_SUSPECT) { + LOG.debug("Remove the node out from suspect node list: {}.", + probe.getDatanodeInfo()); + removeNodeFromDeadNodeDetector(probe.getDatanodeInfo()); + } + } else { + if (probe.getType() == ProbeType.CHECK_SUSPECT) { + LOG.info("Add the node to dead node list: {}.", + probe.getDatanodeInfo()); + addToDead(probe.getDatanodeInfo()); + } + } + } + + /** + * Check dead node periodically. + */ + private void checkDeadNodes() { + Set<DatanodeInfo> datanodeInfos = clearAndGetDetectedDeadNodes(); + for (DatanodeInfo datanodeInfo : datanodeInfos) { + LOG.debug("Add dead node to check: {}.", datanodeInfo); + if (!deadNodesProbeQueue.offer(datanodeInfo)) { + LOG.debug("Skip to add dead node {} to check " + + "since the probe queue is full.", datanodeInfo); + break; + } + } + state = State.IDLE; + } + + private void idle() { + try { + Thread.sleep(IDLE_SLEEP_MS); + } catch (InterruptedException e) { + LOG.debug("Got interrupted while DeadNodeDetector is idle.", e); + Thread.currentThread().interrupt(); + } + + state = State.CHECK_DEAD; + } + + private void init() { + state = State.CHECK_DEAD; + } + + private void addToDead(DatanodeInfo datanodeInfo) { + deadNodes.put(datanodeInfo.getDatanodeUuid(), datanodeInfo); + } + + public boolean isDeadNode(DatanodeInfo datanodeInfo) { + return deadNodes.containsKey(datanodeInfo.getDatanodeUuid()); + } + + private void removeFromDead(DatanodeInfo datanodeInfo) { + deadNodes.remove(datanodeInfo.getDatanodeUuid()); + } + + public Queue<DatanodeInfo> getDeadNodesProbeQueue() { + return deadNodesProbeQueue; + } + + public Queue<DatanodeInfo> getSuspectNodesProbeQueue() { + return suspectNodesProbeQueue; + } + + /** + * Add datanode to suspectNodes and suspectAndDeadNodes. + */ + public synchronized void addNodeToDetect(DFSInputStream dfsInputStream, + DatanodeInfo datanodeInfo) { + HashSet<DatanodeInfo> datanodeInfos = + suspectAndDeadNodes.get(dfsInputStream); + if (datanodeInfos == null) { + datanodeInfos = new HashSet<DatanodeInfo>(); + datanodeInfos.add(datanodeInfo); + suspectAndDeadNodes.put(dfsInputStream, datanodeInfos); + } else { + datanodeInfos.add(datanodeInfo); + } + + addSuspectNodeToDetect(datanodeInfo); + } + + /** + * Add datanode to suspectNodes. + */ + private boolean addSuspectNodeToDetect(DatanodeInfo datanodeInfo) { + return suspectNodesProbeQueue.offer(datanodeInfo); + } + + /** + * Remove dead node which is not used by any DFSInputStream from deadNodes. + * @return new dead node shared by all DFSInputStreams. 
+ */ + public synchronized Set<DatanodeInfo> clearAndGetDetectedDeadNodes() { + // remove the dead nodes who doesn't have any inputstream first + Set<DatanodeInfo> newDeadNodes = new HashSet<DatanodeInfo>(); + for (HashSet<DatanodeInfo> datanodeInfos : suspectAndDeadNodes.values()) { + newDeadNodes.addAll(datanodeInfos); + } + + for (DatanodeInfo datanodeInfo : deadNodes.values()) { + if (!newDeadNodes.contains(datanodeInfo)) { + deadNodes.remove(datanodeInfo.getDatanodeUuid()); + } + } + return new HashSet<>(deadNodes.values()); + } + + /** + * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and + * local deadNodes. + */ + public synchronized void removeNodeFromDeadNodeDetector( + DFSInputStream dfsInputStream, DatanodeInfo datanodeInfo) { + Set<DatanodeInfo> datanodeInfos = suspectAndDeadNodes.get(dfsInputStream); + if (datanodeInfos != null) { + datanodeInfos.remove(datanodeInfo); + dfsInputStream.removeFromLocalDeadNodes(datanodeInfo); + if (datanodeInfos.isEmpty()) { + suspectAndDeadNodes.remove(dfsInputStream); + } + } + } + + /** + * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and + * local deadNodes. + */ + private synchronized void removeNodeFromDeadNodeDetector( + DatanodeInfo datanodeInfo) { + for (Map.Entry<DFSInputStream, HashSet<DatanodeInfo>> entry : + suspectAndDeadNodes.entrySet()) { + Set<DatanodeInfo> datanodeInfos = entry.getValue(); + if (datanodeInfos.remove(datanodeInfo)) { + DFSInputStream dfsInputStream = entry.getKey(); + dfsInputStream.removeFromLocalDeadNodes(datanodeInfo); + if (datanodeInfos.isEmpty()) { + suspectAndDeadNodes.remove(dfsInputStream); + } + } + } + } + + /** + * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and + * deadNodes. + */ + private void removeDeadNode(DatanodeInfo datanodeInfo) { + removeNodeFromDeadNodeDetector(datanodeInfo); + removeFromDead(datanodeInfo); + } + + private static void probeSleep(long time) { + try { + Thread.sleep(time); + } catch (InterruptedException e) { + LOG.debug("Got interrupted while probe is scheduling.", e); + Thread.currentThread().interrupt(); + return; + } + } + + /** + * Schedule probe data node. 
+ */ + static class ProbeScheduler implements Runnable { + private DeadNodeDetector deadNodeDetector; + private ProbeType type; + + ProbeScheduler(DeadNodeDetector deadNodeDetector, ProbeType type) { + this.deadNodeDetector = deadNodeDetector; + this.type = type; + } + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + deadNodeDetector.scheduleProbe(type); + if (type == ProbeType.CHECK_SUSPECT) { + probeSleep(deadNodeDetector.suspectNodeDetectInterval); + } else { + probeSleep(deadNodeDetector.deadNodeDetectInterval); + } + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 8122693..09257b1 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -163,6 +163,45 @@ public interface HdfsClientConfigKeys { "dfs.datanode.hdfs-blocks-metadata.enabled"; boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false; + String DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY = + "dfs.client.deadnode.detection.enabled"; + boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false; + + String DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY = + "dfs.client.deadnode.detection.deadnode.queue.max"; + int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100; + + String DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY = + "dfs.client.deadnode.detection.suspectnode.queue.max"; + int DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT = 1000; + + String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY = + "dfs.client.deadnode.detection.probe.connection.timeout.ms"; + long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT = + 20000; + + String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY = + "dfs.client.deadnode.detection.probe.deadnode.threads"; + int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT = 10; + + String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY = + "dfs.client.deadnode.detection.probe.suspectnode.threads"; + int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT = 10; + + String DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY = + "dfs.client.deadnode.detection.rpc.threads"; + int DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT = 20; + + String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY = + "dfs.client.deadnode.detection.probe.deadnode.interval.ms"; + long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT = + 60 * 1000; // 60s + + String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY = + "dfs.client.deadnode.detection.probe.suspectnode.interval.ms"; + long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT = + 300; // 300ms + String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal"; String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index df2fd48..64121af 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -44,6 +44,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACH import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT; @@ -143,6 +145,8 @@ public class DfsClientConf { private final boolean dataTransferTcpNoDelay; private final long leaseHardLimitPeriod; + private final boolean deadNodeDetectionEnabled; + public DfsClientConf(Configuration conf) { // The hdfsTimeout is currently the same as the ipc timeout hdfsTimeout = Client.getRpcTimeout(conf); @@ -264,6 +268,9 @@ public class DfsClientConf { HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT); + deadNodeDetectionEnabled = + conf.getBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT); replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf); leaseHardLimitPeriod = @@ -591,6 +598,13 @@ public class DfsClientConf { } /** + * @return the deadNodeDetectionEnabled + */ + public boolean isDeadNodeDetectionEnabled() { + return deadNodeDetectionEnabled; + } + + /** * @return the replicaAccessorBuilderClasses */ public List<Class<? extends ReplicaAccessorBuilder>> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 06f48f8..262d5be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2796,6 +2796,79 @@ </description> </property> + <property> + <name>dfs.client.deadnode.detection.enabled</name> + <value>false</value> + <description> + Set to true to enable dead node detection in client side. Then all the DFSInputStreams of the same client can + share the dead node information. + </description> + </property> + + <property> + <name>dfs.client.deadnode.detection.deadnode.queue.max</name> + <value>100</value> + <description> + The max queue size of probing dead node. + </description> + </property> + + <property> + <name>dfs.client.deadnode.detection.suspectnode.queue.max</name> + <value>1000</value> + <description> + The max queue size of probing suspect node. + </description> + </property> + + <property> + <name>dfs.client.deadnode.detection.probe.deadnode.threads</name> + <value>10</value> + <description> + The maximum number of threads to use for probing dead node. 
+ </description> + </property> + + <property> + <name>dfs.client.deadnode.detection.probe.suspectnode.threads</name> + <value>10</value> + <description> + The maximum number of threads to use for probing suspect node. + </description> + </property> + + <property> + <name>dfs.client.deadnode.detection.rpc.threads</name> + <value>20</value> + <description> + The maximum number of threads to use for calling RPC call to recheck the liveness of dead node. + </description> + </property> + + <property> + <name>dfs.client.deadnode.detection.probe.deadnode.interval.ms</name> + <value>60000</value> + <description> + Interval time in milliseconds for probing dead node behavior. + </description> + </property> + + <property> + <name>dfs.client.deadnode.detection.probe.suspectnode.interval.ms</name> + <value>300</value> + <description> + Interval time in milliseconds for probing suspect node behavior. + </description> + </property> + + <property> + <name>dfs.client.deadnode.detection.probe.connection.timeout.ms</name> + <value>20000</value> + <description> + Connection timeout for probing dead node in milliseconds. + </description> + </property> + <property> <name>dfs.block.local-path-access.user</name> <value></value> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java new file mode 100644 index 0000000..b24c692 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import com.google.common.base.Supplier; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; +import static org.junit.Assert.assertEquals; + +/** + * Tests for dead node detection in DFSClient. + */ +public class TestDeadNodeDetection { + + private MiniDFSCluster cluster; + private Configuration conf; + + @Before + public void setUp() { + cluster = null; + conf = new HdfsConfiguration(); + conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true); + conf.setLong( + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY, + 1000); + conf.setLong( + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY, + 100); + conf.setLong( + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY, + 1000); + conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 0); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testDeadNodeDetectionInBackground() throws Exception { + conf.set(DFS_CLIENT_CONTEXT, "testDeadNodeDetectionInBackground"); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + + FileSystem fs = cluster.getFileSystem(); + Path filePath = new Path("/testDetectDeadNodeInBackground"); + + // 256 bytes data chunk for writes + byte[] bytes = new byte[256]; + for (int index = 0; index < bytes.length; index++) { + bytes[index] = '0'; + } + + // File with a 512 bytes block size + FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512); + + // Write a block to all 3 DNs (2x256bytes). 
+ out.write(bytes); + out.write(bytes); + out.hflush(); + out.close(); + + // Remove three DNs, + cluster.stopDataNode(0); + cluster.stopDataNode(0); + cluster.stopDataNode(0); + + FSDataInputStream in = fs.open(filePath); + DFSInputStream din = (DFSInputStream) in.getWrappedStream(); + DFSClient dfsClient = din.getDFSClient(); + try { + try { + in.read(); + } catch (BlockMissingException e) { + } + + DefaultCoordination defaultCoordination = new DefaultCoordination(); + defaultCoordination.startWaitForDeadNodeThread(dfsClient, 3); + defaultCoordination.sync(); + + assertEquals(3, dfsClient.getDeadNodes(din).size()); + assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + } finally { + in.close(); + fs.delete(filePath, true); + // check the dead node again here, the dead node is expected be removed + assertEquals(0, dfsClient.getDeadNodes(din).size()); + assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + } + } + + @Test + public void testDeadNodeDetectionInMultipleDFSInputStream() + throws IOException { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + + FileSystem fs = cluster.getFileSystem(); + Path filePath = new Path("/testDeadNodeMultipleDFSInputStream"); + createFile(fs, filePath); + + String datanodeUuid = cluster.getDataNodes().get(0).getDatanodeUuid(); + FSDataInputStream in1 = fs.open(filePath); + DFSInputStream din1 = (DFSInputStream) in1.getWrappedStream(); + DFSClient dfsClient1 = din1.getDFSClient(); + cluster.stopDataNode(0); + + FSDataInputStream in2 = fs.open(filePath); + DFSInputStream din2 = null; + DFSClient dfsClient2 = null; + try { + try { + in1.read(); + } catch (BlockMissingException e) { + } + + din2 = (DFSInputStream) in2.getWrappedStream(); + dfsClient2 = din2.getDFSClient(); + + DefaultCoordination defaultCoordination = new DefaultCoordination(); + defaultCoordination.startWaitForDeadNodeThread(dfsClient2, 1); + defaultCoordination.sync(); + assertEquals(dfsClient1.toString(), dfsClient2.toString()); + assertEquals(1, dfsClient1.getDeadNodes(din1).size()); + assertEquals(1, dfsClient2.getDeadNodes(din2).size()); + assertEquals(1, dfsClient1.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + assertEquals(1, dfsClient2.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + // check the dn uuid of dead node to see if its expected dead node + assertEquals(datanodeUuid, + ((DatanodeInfo) dfsClient1.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().toArray()[0]).getDatanodeUuid()); + assertEquals(datanodeUuid, + ((DatanodeInfo) dfsClient2.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().toArray()[0]).getDatanodeUuid()); + } finally { + in1.close(); + in2.close(); + deleteFile(fs, filePath); + // check the dead node again here, the dead node is expected be removed + assertEquals(0, dfsClient1.getDeadNodes(din1).size()); + assertEquals(0, dfsClient2.getDeadNodes(din2).size()); + assertEquals(0, dfsClient1.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + assertEquals(0, dfsClient2.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + } + } + + @Test + public void testDeadNodeDetectionDeadNodeRecovery() throws Exception { + // prevent interrupt deadNodeDetectorThr in cluster.waitActive() + 
DFSClient.setDisabledStopDeadNodeDetectorThreadForTest(true); + conf.set(DFS_CLIENT_CONTEXT, "testDeadNodeDetectionDeadNodeRecovery"); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + + DFSClient.setDisabledStopDeadNodeDetectorThreadForTest(false); + FileSystem fs = cluster.getFileSystem(); + Path filePath = new Path("/testDeadNodeDetectionDeadNodeRecovery"); + createFile(fs, filePath); + + // Remove three DNs, + MiniDFSCluster.DataNodeProperties one = cluster.stopDataNode(0); + cluster.stopDataNode(0); + cluster.stopDataNode(0); + + FSDataInputStream in = fs.open(filePath); + DFSInputStream din = (DFSInputStream) in.getWrappedStream(); + DFSClient dfsClient = din.getDFSClient(); + try { + try { + in.read(); + } catch (BlockMissingException e) { + } + DefaultCoordination defaultCoordination = new DefaultCoordination(); + defaultCoordination.startWaitForDeadNodeThread(dfsClient, 3); + defaultCoordination.sync(); + assertEquals(3, dfsClient.getDeadNodes(din).size()); + assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + + cluster.restartDataNode(one, true); + + defaultCoordination = new DefaultCoordination(); + defaultCoordination.startWaitForDeadNodeThread(dfsClient, 2); + defaultCoordination.sync(); + assertEquals(2, dfsClient.getDeadNodes(din).size()); + assertEquals(2, dfsClient.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + } finally { + in.close(); + deleteFile(fs, filePath); + assertEquals(0, dfsClient.getDeadNodes(din).size()); + assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + } + } + + @Test + public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception { + conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + + FileSystem fs = cluster.getFileSystem(); + Path filePath = new Path("/testDeadNodeDetectionMaxDeadNodesProbeQueue"); + createFile(fs, filePath); + + // Remove three DNs, + cluster.stopDataNode(0); + cluster.stopDataNode(0); + cluster.stopDataNode(0); + + FSDataInputStream in = fs.open(filePath); + DFSInputStream din = (DFSInputStream) in.getWrappedStream(); + DFSClient dfsClient = din.getDFSClient(); + try { + try { + in.read(); + } catch (BlockMissingException e) { + } + + Thread.sleep(1500); + Assert.assertTrue((dfsClient.getClientContext().getDeadNodeDetector() + .getDeadNodesProbeQueue().size() + + dfsClient.getDeadNodes(din).size()) <= 4); + } finally { + in.close(); + deleteFile(fs, filePath); + } + } + + @Test + public void testDeadNodeDetectionSuspectNode() throws Exception { + conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1); + DeadNodeDetector.setDisabledProbeThreadForTest(true); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + + FileSystem fs = cluster.getFileSystem(); + Path filePath = new Path("/testDeadNodeDetectionSuspectNode"); + createFile(fs, filePath); + + MiniDFSCluster.DataNodeProperties one = cluster.stopDataNode(0); + + FSDataInputStream in = fs.open(filePath); + DFSInputStream din = (DFSInputStream) in.getWrappedStream(); + DFSClient dfsClient = din.getDFSClient(); + DeadNodeDetector deadNodeDetector = + dfsClient.getClientContext().getDeadNodeDetector(); + try { + try { + in.read(); + } catch (BlockMissingException e) { + } + 
waitForSuspectNode(din.getDFSClient()); + cluster.restartDataNode(one, true); + Assert.assertEquals(1, + deadNodeDetector.getSuspectNodesProbeQueue().size()); + Assert.assertEquals(0, + deadNodeDetector.clearAndGetDetectedDeadNodes().size()); + deadNodeDetector.startProbeScheduler(); + Thread.sleep(1000); + Assert.assertEquals(0, + deadNodeDetector.getSuspectNodesProbeQueue().size()); + Assert.assertEquals(0, + deadNodeDetector.clearAndGetDetectedDeadNodes().size()); + } finally { + in.close(); + deleteFile(fs, filePath); + assertEquals(0, dfsClient.getDeadNodes(din).size()); + assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + // reset disabledProbeThreadForTest + DeadNodeDetector.setDisabledProbeThreadForTest(false); + } + } + + private void createFile(FileSystem fs, Path filePath) throws IOException { + FSDataOutputStream out = null; + try { + // 256 bytes data chunk for writes + byte[] bytes = new byte[256]; + for (int index = 0; index < bytes.length; index++) { + bytes[index] = '0'; + } + + // File with a 512 bytes block size + out = fs.create(filePath, true, 4096, (short) 3, 512); + + // Write a block to all 3 DNs (2x256bytes). + out.write(bytes); + out.write(bytes); + out.hflush(); + + } finally { + out.close(); + } + } + + private void deleteFile(FileSystem fs, Path filePath) throws IOException { + fs.delete(filePath, true); + } + + private void waitForSuspectNode(final DFSClient dfsClient) throws Exception { + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + try { + if (dfsClient.getClientContext().getDeadNodeDetector() + .getSuspectNodesProbeQueue().size() > 0) { + return true; + } + } catch (Exception e) { + // Ignore the exception + } + + return false; + } + }, 500, 5000); + } + + class DefaultCoordination { + private Queue<Object> queue = new LinkedBlockingQueue<Object>(1); + + public boolean addToQueue() { + return queue.offer(new Object()); + } + + public Object removeFromQueue() { + return queue.poll(); + } + + public void sync() { + while (removeFromQueue() == null) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + } + + private void startWaitForDeadNodeThread(final DFSClient dfsClient, + final int size) { + new Thread(new Runnable() { + @Override + public void run() { + DeadNodeDetector deadNodeDetector = + dfsClient.getClientContext().getDeadNodeDetector(); + while (deadNodeDetector.clearAndGetDetectedDeadNodes() + .size() != size) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + addToQueue(); + } + }).start(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
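
Editor's note: for readers trying out this backport, the sketch below shows one way a client could enable and tune the new dead node detection purely through configuration, using the property names added to hdfs-default.xml and HdfsClientConfigKeys in this commit. It is an illustrative sketch and not part of the commit itself; the tuning values are arbitrary examples, the file path is hypothetical, and the class name is made up for the example.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DeadNodeDetectionExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Turn on client-side dead node detection (disabled by default). All
        // DFSInputStreams created under the same ClientContext then share the
        // set of detected dead datanodes.
        conf.setBoolean("dfs.client.deadnode.detection.enabled", true);
        // Optional tuning; the values below are only examples:
        // how often suspect nodes are probed (default 300 ms),
        conf.setLong(
            "dfs.client.deadnode.detection.probe.suspectnode.interval.ms", 300);
        // how often known dead nodes are re-probed for recovery (default 60000 ms),
        conf.setLong(
            "dfs.client.deadnode.detection.probe.deadnode.interval.ms", 60000);
        // and the per-probe RPC connection timeout (default 20000 ms).
        conf.setLong(
            "dfs.client.deadnode.detection.probe.connection.timeout.ms", 20000);

        FileSystem fs = FileSystem.get(conf);
        // Hypothetical path, used only to drive some reads.
        try (FSDataInputStream in = fs.open(new Path("/tmp/example.txt"))) {
          byte[] buf = new byte[4096];
          while (in.read(buf) > 0) {
            // Reads that hit an unresponsive datanode put it on the shared
            // suspect queue; once the background detector confirms it dead,
            // other streams of this client skip that datanode instead of
            // timing out on it again.
          }
        }
      }
    }

The same switches can of course be set cluster-wide in hdfs-site.xml instead of programmatically; the property descriptions added to hdfs-default.xml above document the defaults.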