Author: stack
Date: Tue Nov 24 05:56:24 2009
New Revision: 883596

URL: http://svn.apache.org/viewvc?rev=883596&view=rev
Log:
HDFS-630 In DFSOutputStream.nextBlockOutputStream(), the client can exclude specific datanodes when locating the next block
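In outline, the patch closes a write-availability gap: when the client fails to set up a pipeline to a newly allocated block (for example because one of the chosen datanodes is dead), it previously retried addBlock() and could be handed the same bad node again. With this change the client remembers the node that failed and passes it back to the namenode, which keeps it out of the next allocation. A minimal sketch of the resulting loop, simplified from the DFSClient changes below (createBlockOutputStream, errorIndex, and the surrounding fields stand in for the real DFSOutputStream internals):

    // Sketch of the allocate/retry loop after this patch (illustrative only;
    // names are simplified stand-ins for DFSOutputStream internals).
    private final List<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();

    private LocatedBlock allocateBlockAvoidingFailedNodes() throws IOException {
      while (true) {
        DatanodeInfo[] excluded = excludedNodes.isEmpty() ? null
            : excludedNodes.toArray(new DatanodeInfo[excludedNodes.size()]);
        LocatedBlock lb = namenode.addBlock(src, clientName, previous, excluded);
        if (createBlockOutputStream(lb.getLocations())) {
          return lb;                                   // pipeline established
        }
        // Setup failed: give the block back and remember the bad node.
        namenode.abandonBlock(lb.getBlock(), src, clientName);
        excludedNodes.add(lb.getLocations()[errorIndex]);
      }
    }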
Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Nov 24 05:56:24 2009
@@ -28,6 +28,10 @@
     HDFS-699. Add unit tests framework (Mockito) (cos, Eli Collins)
 
+    HDFS-630  In DFSOutputStream.nextBlockOutputStream(), the client can
+              exclude specific datanodes when locating the next block
+              (Cosmin Lehene via Stack)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Nov 24 05:56:24 2009
@@ -2585,6 +2585,7 @@
     private DataInputStream blockReplyStream;
     private ResponseProcessor response = null;
     private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+    private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
     volatile boolean hasError = false;
     volatile int errorIndex = -1;
     private BlockConstructionStage stage;  // block construction stage
@@ -3109,7 +3110,9 @@
         success = false;
 
         long startTime = System.currentTimeMillis();
-        lb = locateFollowingBlock(startTime);
+        DatanodeInfo[] w = excludedNodes.toArray(
+            new DatanodeInfo[excludedNodes.size()]);
+        lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
         block = lb.getBlock();
         block.setNumBytes(0);
         accessToken = lb.getAccessToken();
@@ -3125,12 +3128,16 @@
           namenode.abandonBlock(block, src, clientName);
           block = null;
 
+          LOG.info("Excluding datanode " + nodes[errorIndex]);
+          excludedNodes.add(nodes[errorIndex]);
+
           // Connection failed.  Let's wait a little bit and retry
           retry = true;
           try {
             if (System.currentTimeMillis() - startTime > 5000) {
               LOG.info("Waiting to find target node: " + nodes[0].getName());
             }
+            //TODO fix this timeout. Extract it to a constant, maybe make it available from conf
             Thread.sleep(6000);
           } catch (InterruptedException iex) {
           }
@@ -3228,14 +3235,15 @@
       }
     }
 
-    private LocatedBlock locateFollowingBlock(long start) throws IOException {
+    private LocatedBlock locateFollowingBlock(long start,
+        DatanodeInfo[] excludedNodes) throws IOException {
      int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
      long sleeptime = 400;
      while (true) {
        long localstart = System.currentTimeMillis();
        while (true) {
          try {
-            return namenode.addBlock(src, clientName, block);
+            return namenode.addBlock(src, clientName, block, excludedNodes);
          } catch (RemoteException e) {
            IOException ue =
              e.unwrapRemoteException(FileNotFoundException.class,
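Two details worth noting in the DFSClient hunks above. First, locateFollowingBlock() passes null rather than an empty array when nothing is excluded, so the common no-failure path sends exactly what it sent before this patch. Second, the TODO leaves the 6-second retry sleep hard-coded; if it were later pulled from configuration as the comment suggests, it might look like the following sketch. The key name here is invented for illustration and is not defined by this patch or by Hadoop:

    // Hypothetical follow-up to the TODO: make the retry sleep tunable.
    // "dfs.client.block.write.retry.sleep.msec" is an invented key, not a real one.
    long retrySleepMs = conf.getLong("dfs.client.block.write.retry.sleep.msec", 6000L);
    Thread.sleep(retrySleepMs);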
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Nov 24 05:56:24 2009
@@ -198,6 +198,9 @@
   public LocatedBlock addBlock(String src, String clientName,
                                Block previous) throws IOException;
 
+  public LocatedBlock addBlock(String src, String clientName,
+      Block previous, DatanodeInfo[] excludedNodes) throws IOException;
+
   /**
    * The client is done writing data to the given filename, and would
    * like to complete it.

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java Tue Nov 24 05:56:24 2009
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.util.*;
@@ -60,6 +61,26 @@
    * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
    * to re-replicate a block with size <i>blocksize</i>
    * If not, return as many as we can.
+   *
+   * @param srcPath the file for which this chooseTarget is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param excludedNodes datanodes that should not be considered as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target
+   * and sorted as a pipeline.
+   */
+  abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+                                             int numOfReplicas,
+                                             DatanodeDescriptor writer,
+                                             List<DatanodeDescriptor> chosenNodes,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize);
+
+  /**
+   * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+   * If not, return as many as we can.
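Note the container choice in the placement-policy API: the excluded nodes cross the wire as a DatanodeInfo[] but reach the policy as a HashMap<Node, Node> mapping each node to itself. Using the map as a set lets an implementation test and mark a candidate in a single put() call, since put() returns the previous mapping (null for a first insertion). A sketch of how a policy might use it (hypothetical helper, not the actual BlockPlacementPolicyDefault code):

    // Returns true if the candidate is usable; marks it so it is not picked twice.
    // put() returns null only on first insertion, so non-null means the node
    // was excluded by the client or already chosen earlier in this pass.
    private boolean tryCandidate(DatanodeDescriptor candidate,
                                 HashMap<Node, Node> excludedNodes,
                                 List<DatanodeDescriptor> results) {
      if (excludedNodes.put(candidate, candidate) != null) {
        return false;
      }
      results.add(candidate);
      return true;
    }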
   * The base implementation extracts the pathname of the file from the
   * specified srcInode, but this could be a costly operation depending on the
   * file system implementation. Concrete implementations of this class should
@@ -167,4 +188,29 @@
                         new ArrayList<DatanodeDescriptor>(),
                         blocksize);
   }
+
+  /**
+   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+   * a block with size <i>blocksize</i>
+   * If not, return as many as we can.
+   *
+   * @param srcPath a string representation of the file for which chooseTarget is invoked
+   * @param numOfReplicas number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param blocksize size of the data to be written.
+   * @param excludedNodes datanodes that should not be considered as targets.
+   * @return array of DatanodeDescriptor instances chosen as targets
+   * and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    HashMap<Node, Node> excludedNodes,
+                                    long blocksize) {
+    return chooseTarget(srcPath, numOfReplicas, writer,
+                        new ArrayList<DatanodeDescriptor>(),
+                        excludedNodes,
+                        blocksize);
+  }
+
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Tue Nov 24 05:56:24 2009
@@ -68,6 +68,17 @@
   }
 
   /** {@inheritDoc} */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    HashMap<Node, Node> excludedNodes,
+                                    long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, excludedNodes, blocksize);
+  }
+
+
+  /** {@inheritDoc} */
   @Override
   public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
                                     int numOfReplicas,

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Nov 24 05:56:24 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@@ -1317,10 +1318,18 @@
    * are replicated.  Will return an empty 2-elt array if we want the
    * client to "try again later".
   */
-  public LocatedBlock getAdditionalBlock(String src,
+  public LocatedBlock getAdditionalBlock(String src,
                                          String clientName,
                                          Block previous
                                          ) throws IOException {
+    return getAdditionalBlock(src, clientName, previous, null);
+  }
+
+  public LocatedBlock getAdditionalBlock(String src,
+                                         String clientName,
+                                         Block previous,
+                                         HashMap<Node, Node> excludedNodes
+                                         ) throws IOException {
     long fileLength, blockSize;
     int replication;
     DatanodeDescriptor clientNode = null;
@@ -1356,7 +1365,7 @@
 
     // choose targets for the new block to be allocated.
     DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
-        src, replication, clientNode, blockSize);
+        src, replication, clientNode, excludedNodes, blockSize);
     if (targets.length < blockManager.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Nov 24 05:56:24 2009
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
@@ -73,6 +74,7 @@
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -612,14 +614,30 @@
     namesystem.setOwner(src, username, groupname);
   }
 
-  /**
-   */
+
+  @Override
   public LocatedBlock addBlock(String src, String clientName,
                                Block previous) throws IOException {
+    return addBlock(src, clientName, previous, null);
+  }
+
+  @Override
+  public LocatedBlock addBlock(String src,
+                               String clientName,
+                               Block previous,
+                               DatanodeInfo[] excludedNodes
+                               ) throws IOException {
     stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
                          +src+" for "+clientName);
+    HashMap<Node, Node> excludedNodesSet = null;
+    if (excludedNodes != null) {
+      excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
+      for (Node node:excludedNodes) {
+        excludedNodesSet.put(node, node);
+      }
+    }
     LocatedBlock locatedBlock =
-      namesystem.getAdditionalBlock(src, clientName, previous);
+      namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
     if (locatedBlock != null)
       myMetrics.numAddBlockOps.inc();
     return locatedBlock;
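For completeness, a direct caller of the new ClientProtocol method would look like the snippet below; ordinary applications never do this themselves, since DFSClient maintains the exclusion list internally, as exercised by the new test that follows. The variables here (namenode, deadNode, previousBlock, clientName) are placeholders, not names from this patch:

    // Hypothetical direct RPC use; DFSClient does this under the covers.
    DatanodeInfo[] avoid = new DatanodeInfo[] { deadNode };
    LocatedBlock lb = namenode.addBlock("/user/foo/part-0000", clientName,
                                        previousBlock, avoid);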
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java?rev=883596&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java Tue Nov 24 05:56:24 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * These tests make sure that the DFSClient excludes a failed datanode
+ * when allocating new blocks, so a write can proceed despite the failure.
+ */
+public class TestDFSClientExcludedNodes extends TestCase {
+
+  public void testExcludedNodes() throws IOException
+  {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testExcludedNodes");
+
+    // kill a datanode
+    cluster.stopDataNode(AppendTestUtil.nextInt(3));
+    OutputStream out = fs.create(filePath, true, 4096);
+    out.write(20);
+
+    try {
+      out.close();
+    } catch (Exception e) {
+      fail("DataNode failure should not result in a block abort: \n" + e.getMessage());
+    }
+  }
+
+}
+
+
\ No newline at end of file

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Tue Nov 24 05:56:24 2009
@@ -137,8 +137,17 @@
       return versionID;
     }
 
-    public LocatedBlock addBlock(String src, String clientName, Block previous)
-    throws IOException
+    public LocatedBlock addBlock(String src, String clientName,
+                                 Block previous) throws IOException {
+
+      return addBlock(src, clientName, previous, null);
+    }
+
+    public LocatedBlock addBlock(String src,
+                                 String clientName,
+                                 Block previous,
+                                 DatanodeInfo[] excludedNodes
+                                 ) throws IOException
     {
       num_calls++;
       if (num_calls > num_calls_allowed) {