Author: stack
Date: Tue Jan 26 05:50:41 2010
New Revision: 903089

URL: http://svn.apache.org/viewvc?rev=903089&view=rev
Log:
HDFS-630 In DFSOutputStream.nextBlockOutputStream(), the client can exclude specific datanodes when locating the next block
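For readers skimming the diff below: the change makes the client remember datanodes that failed during pipeline creation and pass them to the namenode, so the next block allocation avoids them. The following is a minimal, self-contained sketch of that exclude-on-retry idea, not HDFS code; every name in it (Allocator, pickTargets, tryConnect) is illustrative only:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class ExcludeOnRetrySketch {
      /** Stands in for the namenode's target selection; never returns excluded nodes. */
      interface Allocator {
        List<String> pickTargets(List<String> excluded);
      }

      static List<String> nextBlockTargets(Allocator allocator, int maxRetries)
          throws IOException {
        List<String> excluded = new ArrayList<String>();
        for (int attempt = 0; attempt < maxRetries; attempt++) {
          List<String> targets = allocator.pickTargets(excluded);
          String badNode = tryConnect(targets); // null when the pipeline comes up
          if (badNode == null) {
            return targets;
          }
          // Remember the failing node, as DFSOutputStream now does with
          // nodes[errorIndex], so the next allocation avoids it.
          excluded.add(badNode);
        }
        throw new IOException("no pipeline after " + maxRetries + " attempts");
      }

      /** Stub standing in for the real datanode connection attempt. */
      static String tryConnect(List<String> targets) {
        return null;
      }
    }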
Modified:
    hadoop/hdfs/branches/branch-0.21/CHANGES.txt
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

Modified: hadoop/hdfs/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/CHANGES.txt?rev=903089&r1=903088&r2=903089&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/hdfs/branches/branch-0.21/CHANGES.txt Tue Jan 26 05:50:41 2010
@@ -542,6 +542,10 @@
     HDFS-822. Appends to already-finalized blocks can rename across volumes.
     (hairong)
 
+    HDFS-630. In DFSOutputStream.nextBlockOutputStream(), the client can
+    exclude specific datanodes when locating the next block.
+    (Cosmin Lehene via Stack)
+
 Release 0.20.2 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=903089&r1=903088&r2=903089&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jan 26 05:50:41 2010
@@ -2582,6 +2582,7 @@
     private DataInputStream blockReplyStream;
     private ResponseProcessor response = null;
     private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+    private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
     volatile boolean hasError = false;
     volatile int errorIndex = -1;
     private BlockConstructionStage stage;  // block construction stage
@@ -3114,7 +3115,9 @@
         success = false;
 
         long startTime = System.currentTimeMillis();
-        lb = locateFollowingBlock(startTime);
+        DatanodeInfo[] w = excludedNodes.toArray(
+            new DatanodeInfo[excludedNodes.size()]);
+        lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
         block = lb.getBlock();
         block.setNumBytes(0);
         accessToken = lb.getAccessToken();
@@ -3130,12 +3133,16 @@
           namenode.abandonBlock(block, src, clientName);
           block = null;
 
+          LOG.debug("Excluding datanode " + nodes[errorIndex]);
+          excludedNodes.add(nodes[errorIndex]);
+
           // Connection failed. Let's wait a little bit and retry
           retry = true;
           try {
             if (System.currentTimeMillis() - startTime > 5000) {
               LOG.info("Waiting to find target node: " + nodes[0].getName());
             }
+            //TODO fix this timeout. Extract it to a constant, maybe make it available from conf
             Thread.sleep(6000);
           } catch (InterruptedException iex) {
           }
@@ -3233,14 +3240,15 @@
       }
     }
 
-    private LocatedBlock locateFollowingBlock(long start) throws IOException {
+    private LocatedBlock locateFollowingBlock(long start,
+        DatanodeInfo[] excludedNodes) throws IOException {
      int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
       long sleeptime = 400;
       while (true) {
         long localstart = System.currentTimeMillis();
         while (true) {
           try {
-            return namenode.addBlock(src, clientName, block);
+            return namenode.addBlock(src, clientName, block, excludedNodes);
           } catch (RemoteException e) {
             IOException ue =
               e.unwrapRemoteException(FileNotFoundException.class,
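The TODO above leaves the 6000 ms retry sleep hard-coded. Following the pattern already used in locateFollowingBlock for dfs.client.block.write.locateFollowingBlock.retries, a conf-backed version could look like the sketch below; the property name is hypothetical, not an actual key in this release:

    import org.apache.hadoop.conf.Configuration;

    class RetrySleepSketch {
      // Hypothetical key, shown only to illustrate the TODO; the 6000 ms
      // default matches the current hard-coded sleep.
      static final String RETRY_SLEEP_KEY = "dfs.client.block.write.retry.sleep.msec";
      static final long RETRY_SLEEP_DEFAULT = 6000L;

      static long retrySleepMs(Configuration conf) {
        return conf.getLong(RETRY_SLEEP_KEY, RETRY_SLEEP_DEFAULT);
      }
    }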
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=903089&r1=903088&r2=903089&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Jan 26 05:50:41 2010
@@ -44,9 +44,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 50: change LocatedBlocks to include last block information.
+   * 53: changed addBlock to include a list of excluded datanodes.
    */
-  public static final long versionID = 50L;
+  public static final long versionID = 53L;
 
   ///////////////////////////////////////
   // File contents
@@ -94,7 +94,7 @@
    * or explicitly as a result of lease expiration.
    * <p>
    * Blocks have a maximum size. Clients that intend to create
-   * multi-block files must also use {@link #addBlock(String, String, Block)}.
+   * multi-block files must also use {@link #addBlock(String, String, Block, DatanodeInfo[])}.
    *
    * @param src path of the file being created.
    * @param masked masked permission.
@@ -190,11 +190,16 @@
    * addBlock() also commits the previous block by reporting
    * to the name-node the actual generation stamp and the length
    * of the block that the client has transmitted to data-nodes.
-   *
+   *
+   * @param src the file being created
+   * @param clientName the name of the client that adds the block
+   * @param previous previous block
+   * @param excludedNodes a list of nodes that should not be
+   * allocated for the current block
    * @return LocatedBlock allocated block information.
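Note the protocol impact: versionID moves from 50 to 53, and every implementor and caller of addBlock must adopt the wider signature. The convention, visible in the client and test changes in this commit, is that null means "no exclusions". A short caller-side sketch (a fragment, with variable names as in DFSClient):

    // Hand the accumulated bad nodes to the namenode; 'namenode' is a
    // ClientProtocol proxy. Passing null preserves the old behavior.
    DatanodeInfo[] excluded = excludedNodes.toArray(
        new DatanodeInfo[excludedNodes.size()]);
    LocatedBlock lb = namenode.addBlock(src, clientName, previous,
        excluded.length > 0 ? excluded : null);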
   */
  public LocatedBlock addBlock(String src, String clientName,
-                               Block previous) throws IOException;
+      Block previous, DatanodeInfo[] excludedNodes) throws IOException;
 
   /**
    * The client is done writing data to the given filename, and would

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java?rev=903089&r1=903088&r2=903089&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java Tue Jan 26 05:50:41 2010
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.util.*;
@@ -60,6 +61,26 @@
    * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
    * to re-replicate a block with size <i>blocksize</i>
    * If not, return as many as we can.
+   *
+   * @param srcPath the file to which this chooseTargets is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param excludedNodes datanodes that should not be considered as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target
+   * and sorted as a pipeline.
+   */
+  abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+                                             int numOfReplicas,
+                                             DatanodeDescriptor writer,
+                                             List<DatanodeDescriptor> chosenNodes,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize);
+
+  /**
+   * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+   * If not, return as many as we can.
    * The base implementation extracts the pathname of the file from the
    * specified srcInode, but this could be a costly operation depending on the
    * file system implementation. Concrete implementations of this class should
@@ -167,4 +188,29 @@
                         new ArrayList<DatanodeDescriptor>(),
                         blocksize);
   }
+
+  /**
+   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+   * a block with size <i>blocksize</i>
+   * If not, return as many as we can.
+   *
+   * @param srcPath a string representation of the file for which chooseTarget is invoked
+   * @param numOfReplicas number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param blocksize size of the data to be written.
+   * @param excludedNodes datanodes that should not be considered as targets.
+   * @return array of DatanodeDescriptor instances chosen as targets
+   * and sorted as a pipeline.
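The excluded set travels as HashMap<Node, Node> rather than a Set<Node>, so one structure lets a policy both test membership and fetch the stored Node instance. A minimal sketch of how a placement loop might consult it; pickFirstAllowed is hypothetical, not part of the policy API:

    import java.util.HashMap;
    import org.apache.hadoop.net.Node;

    class ExclusionCheckSketch {
      static Node pickFirstAllowed(Node[] candidates,
          HashMap<Node, Node> excludedNodes) {
        for (Node candidate : candidates) {
          // A null map means no exclusions were requested.
          if (excludedNodes == null || !excludedNodes.containsKey(candidate)) {
            return candidate;
          }
        }
        return null; // every candidate was excluded
      }
    }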
+   */
+  DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    HashMap<Node, Node> excludedNodes,
+                                    long blocksize) {
+    return chooseTarget(srcPath, numOfReplicas, writer,
+                        new ArrayList<DatanodeDescriptor>(),
+                        excludedNodes,
+                        blocksize);
+  }
+
 }

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=903089&r1=903088&r2=903089&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Tue Jan 26 05:50:41 2010
@@ -68,6 +68,17 @@
   }
 
   /** {@inheritDoc} */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                           int numOfReplicas,
+                                           DatanodeDescriptor writer,
+                                           List<DatanodeDescriptor> chosenNodes,
+                                           HashMap<Node, Node> excludedNodes,
+                                           long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, excludedNodes, blocksize);
+  }
+
+
+  /** {@inheritDoc} */
   @Override
   public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
                                            int numOfReplicas,

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=903089&r1=903088&r2=903089&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jan 26 05:50:41 2010
@@ -41,6 +41,7 @@
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@@ -1162,9 +1163,10 @@
    * are replicated.  Will return an empty 2-elt array if we want the
    * client to "try again later".
    */
-  public LocatedBlock getAdditionalBlock(String src,
+  public LocatedBlock getAdditionalBlock(String src,
                                          String clientName,
-                                         Block previous
+                                         Block previous,
+                                         HashMap<Node, Node> excludedNodes
                                          ) throws IOException {
     long fileLength, blockSize;
     int replication;
@@ -1201,7 +1203,7 @@
 
     // choose targets for the new block to be allocated.
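Because the new chooseTarget overload is declared abstract on BlockPlacementPolicy, custom placement policies must now implement it; BlockPlacementPolicyDefault simply delegates to its existing excludedNodes-aware overload. An illustrative subclass, not part of this commit and assuming it lives in the same package as the policy classes:

    import java.util.HashMap;
    import java.util.List;
    import org.apache.hadoop.net.Node;

    // Illustrative only: log the exclusions, then defer to the default policy.
    class LoggingPlacementPolicy extends BlockPlacementPolicyDefault {
      public DatanodeDescriptor[] chooseTarget(String srcPath,
                                               int numOfReplicas,
                                               DatanodeDescriptor writer,
                                               List<DatanodeDescriptor> chosenNodes,
                                               HashMap<Node, Node> excludedNodes,
                                               long blocksize) {
        if (excludedNodes != null && !excludedNodes.isEmpty()) {
          System.out.println("chooseTarget avoiding " + excludedNodes.size()
              + " node(s) for " + srcPath);
        }
        return super.chooseTarget(srcPath, numOfReplicas, writer,
                                  chosenNodes, excludedNodes, blocksize);
      }
    }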
     DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
-        src, replication, clientNode, blockSize);
+        src, replication, clientNode, excludedNodes, blockSize);
     if (targets.length < blockManager.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=903089&r1=903088&r2=903089&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Jan 26 05:50:41 2010
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
@@ -72,6 +73,7 @@
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -617,14 +619,23 @@
     namesystem.setOwner(src, username, groupname);
   }
 
-  /**
-   */
-  public LocatedBlock addBlock(String src, String clientName,
-                               Block previous) throws IOException {
+  @Override
+  public LocatedBlock addBlock(String src,
+                               String clientName,
+                               Block previous,
+                               DatanodeInfo[] excludedNodes
+                               ) throws IOException {
     stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
                          +src+" for "+clientName);
+    HashMap<Node, Node> excludedNodesSet = null;
+    if (excludedNodes != null) {
+      excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
+      for (Node node:excludedNodes) {
+        excludedNodesSet.put(node, node);
+      }
+    }
     LocatedBlock locatedBlock =
-      namesystem.getAdditionalBlock(src, clientName, previous);
+      namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
     if (locatedBlock != null)
       myMetrics.numAddBlockOps.inc();
     return locatedBlock;

Modified: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=903089&r1=903088&r2=903089&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Tue Jan 26 05:50:41 2010
@@ -136,8 +136,17 @@
       return versionID;
     }
 
-    public LocatedBlock addBlock(String src, String clientName, Block previous)
-    throws IOException
+    public LocatedBlock addBlock(String src, String clientName,
+        Block previous) throws IOException {
+
+      return addBlock(src, clientName, previous, null);
+    }
+
+    public LocatedBlock addBlock(String src,
+                                 String clientName,
+                                 Block previous,
+                                 DatanodeInfo[] excludedNode
+                                 ) throws IOException
     {
       num_calls++;
       if (num_calls > num_calls_allowed) {
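The mock above forwards the legacy three-argument call with null exclusions so older test paths keep working. A hedged sketch of a check one could layer on a running cluster (setup elided; assertExcluded is illustrative, not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    class ExclusionAssertionSketch {
      // Illustrative only: allocate one block while excluding 'avoid' and
      // verify the returned pipeline honors the exclusion.
      static void assertExcluded(ClientProtocol namenode, String src,
          String clientName, DatanodeInfo avoid) throws IOException {
        Block previous = null; // first block of the file
        LocatedBlock lb = namenode.addBlock(src, clientName, previous,
            new DatanodeInfo[] { avoid });
        for (DatanodeInfo dn : lb.getLocations()) {
          if (dn.equals(avoid)) {
            throw new AssertionError("excluded node " + dn.getName()
                + " was allocated");
          }
        }
      }
    }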
Modified: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=903089&r1=903088&r2=903089&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Tue Jan 26 05:50:41 2010
@@ -399,7 +399,7 @@
 
       // add one block to the file
       LocatedBlock location = client.getNamenode().addBlock(file1.toString(),
-          client.clientName, null);
+          client.clientName, null, null);
       System.out.println("testFileCreationError2: "
           + "Added block " + location.getBlock());
 

Modified: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=903089&r1=903088&r2=903089&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Tue Jan 26 05:50:41 2010
@@ -911,7 +911,7 @@
     throws IOException {
       Block prevBlock = null;
       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
-        LocatedBlock loc = nameNode.addBlock(fileName, clientName, prevBlock);
+        LocatedBlock loc = nameNode.addBlock(fileName, clientName, prevBlock, null);
         prevBlock = loc.getBlock();
         for(DatanodeInfo dnInfo : loc.getLocations()) {
           int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());