Author: cutting
Date: Wed Feb 28 11:36:33 2007
New Revision: 512924

URL: http://svn.apache.org/viewvc?view=rev&rev=512924
Log:
HADOOP-972. Optimize HDFS's rack-aware block placement algorithm. Contributed by Hairong.

Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java lucene/hadoop/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=512924&r1=512923&r2=512924 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Feb 28 11:36:33 2007 @@ -156,6 +156,9 @@ 46. HADOOP-1044. Fix HDFS's TestDecommission to not spuriously fail. (Wendy Chien via cutting) +47. HADOOP-972. Optimize HDFS's rack-aware block placement algorithm. + (Hairong Kuang via cutting) + Release 0.11.2 - 2007-02-16 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=512924&r1=512923&r2=512924 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Feb 28 11:36:33 2007 @@ -203,7 +203,7 @@ // datanode networktoplogy NetworkTopology clusterMap = new NetworkTopology(); // for block replicas placement - Replicator replicator = new Replicator(); + ReplicationTargetChooser replicator = new ReplicationTargetChooser(); private HostsFileReader hostsReader; private Daemon dnthread = null; @@ -2691,7 +2691,7 @@ * @author hairong * */ - class Replicator { + class ReplicationTargetChooser { private class NotEnoughReplicasException extends Exception { NotEnoughReplicasException( String msg ) { super( msg ); @@ -2722,21 +2722,19 @@ new ArrayList<DatanodeDescriptor>(), 
excludedNodes, blocksize); } - /* - * re-replicate <i>numOfReplicas</i> - /** - * choose <i>numOfReplicas</i> data nodes for <i>writer</i> - * to re-replicate a block with size <i>blocksize</i> - * If not, return as many as we can. - * - * @param numOfReplicas: additional number of replicas wanted. - * @param writer: the writer's machine, null if not in the cluster. - * @param choosenNodes: datanodes that have been choosen as targets. - * @param excludedNodes: datanodesthat should not be considered targets. - * @param blocksize: size of the data to be written. - * @return array of DatanodeDescriptor instances chosen as target - * and sorted as a pipeline. - */ + /** + * choose <i>numOfReplicas</i> data nodes for <i>writer</i> + * to re-replicate a block with size <i>blocksize</i> + * If not, return as many as we can. + * + * @param numOfReplicas: additional number of replicas wanted. + * @param writer: the writer's machine, null if not in the cluster. + * @param choosenNodes: datanodes that have been choosen as targets. + * @param excludedNodes: datanodesthat should not be considered targets. + * @param blocksize: size of the data to be written. + * @return array of DatanodeDescriptor instances chosen as target + * and sorted as a pipeline. 
+ */ DatanodeDescriptor[] chooseTarget(int numOfReplicas, DatanodeDescriptor writer, List<DatanodeDescriptor> choosenNodes, @@ -2767,7 +2765,6 @@ writer=null; DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, - clusterMap.getLeaves(NodeBase.ROOT), excludedNodes, blocksize, maxNodesPerRack, results ); results.removeAll(choosenNodes); @@ -2776,16 +2773,17 @@ return getPipeline((writer==null)?localNode:writer, results); } - /* choose <i>numOfReplicas</i> from <i>clusterNodes</i> */ + /* choose <i>numOfReplicas</i> from all data nodes */ private DatanodeDescriptor chooseTarget(int numOfReplicas, DatanodeDescriptor writer, - DatanodeDescriptor[] clusterNodes, List<DatanodeDescriptor> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results) { - if( numOfReplicas == 0 ) return writer; + if( numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0 ) { + return writer; + } int numOfResults = results.size(); if(writer == null && (numOfResults==1 || numOfResults==2) ) { @@ -2795,28 +2793,28 @@ try { switch( numOfResults ) { case 0: - writer = chooseLocalNode(writer, clusterNodes, excludedNodes, + writer = chooseLocalNode(writer, excludedNodes, blocksize, maxNodesPerRack, results); if(--numOfReplicas == 0) break; case 1: - chooseRemoteRack(1, writer, clusterNodes, excludedNodes, + chooseRemoteRack(1, writer, excludedNodes, blocksize, maxNodesPerRack, results); if(--numOfReplicas == 0) break; case 2: if(clusterMap.isOnSameRack(results.get(0), results.get(1))) { - chooseRemoteRack(1, writer, clusterNodes, excludedNodes, + chooseRemoteRack(1, writer, excludedNodes, blocksize, maxNodesPerRack, results); } else { - chooseLocalRack(writer, clusterNodes, excludedNodes, + chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, results); } if(--numOfReplicas == 0) break; default: - chooseRandom(numOfReplicas, clusterNodes, excludedNodes, + chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results); 
} } catch (NotEnoughReplicasException e) { - LOG.warn("Not be able to place enough replicas, still in need of " + LOG.warn("Not able to place enough replicas, still in need of " + numOfReplicas ); } return writer; @@ -2829,7 +2827,6 @@ */ private DatanodeDescriptor chooseLocalNode( DatanodeDescriptor localMachine, - DatanodeDescriptor[] nodes, List<DatanodeDescriptor> excludedNodes, long blocksize, int maxNodesPerRack, @@ -2837,20 +2834,21 @@ throws NotEnoughReplicasException { // if no local machine, randomly choose one node if(localMachine == null) - return chooseRandom(nodes, excludedNodes, + return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results); // otherwise try local machine first if(!excludedNodes.contains(localMachine)) { excludedNodes.add(localMachine); - if( isGoodTarget(localMachine, blocksize, maxNodesPerRack, results)) { + if( isGoodTarget(localMachine, blocksize, + maxNodesPerRack, false, results)) { results.add(localMachine); return localMachine; } } // try a node on local rack - return chooseLocalRack(localMachine, nodes, excludedNodes, + return chooseLocalRack(localMachine, excludedNodes, blocksize, maxNodesPerRack, results); } @@ -2858,12 +2856,11 @@ * if no such node is availabe, choose one node from the rack where * a second replica is on. * if still no such node is available, choose a random node - * in the cluster <i>nodes</i>. + * in the cluster. 
* @return the choosen node */ private DatanodeDescriptor chooseLocalRack( DatanodeDescriptor localMachine, - DatanodeDescriptor[] nodes, List<DatanodeDescriptor> excludedNodes, long blocksize, int maxNodesPerRack, @@ -2871,14 +2868,14 @@ throws NotEnoughReplicasException { // no local machine, so choose a random machine if( localMachine == null ) { - return chooseRandom(nodes, excludedNodes, + return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results ); } // choose one from the local rack try { return chooseRandom( - clusterMap.getLeaves( localMachine.getNetworkLocation() ), + localMachine.getNetworkLocation(), excludedNodes, blocksize, maxNodesPerRack, results); } catch (NotEnoughReplicasException e1) { // find the second replica @@ -2894,16 +2891,16 @@ if( newLocal != null ) { try { return chooseRandom( - clusterMap.getLeaves( newLocal.getNetworkLocation() ), + newLocal.getNetworkLocation(), excludedNodes, blocksize, maxNodesPerRack, results); } catch( NotEnoughReplicasException e2 ) { //otherwise randomly choose one from the network - return chooseRandom(nodes, excludedNodes, + return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results); } } else { //otherwise randomly choose one from the network - return chooseRandom(nodes, excludedNodes, + return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results); } } @@ -2917,38 +2914,19 @@ private void chooseRemoteRack( int numOfReplicas, DatanodeDescriptor localMachine, - DatanodeDescriptor[] nodes, List<DatanodeDescriptor> excludedNodes, long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results) throws NotEnoughReplicasException { - // get all the nodes on the local rack - DatanodeDescriptor[] nodesOnRack = clusterMap.getLeaves( - localMachine.getNetworkLocation() ); - - // can we speed up this??? using hashing sets? 
- DatanodeDescriptor[] nodesOnRemoteRack - = new DatanodeDescriptor[nodes.length-nodesOnRack.length]; - HashSet<DatanodeDescriptor> set1 = new HashSet<DatanodeDescriptor>(nodes.length); - HashSet<DatanodeDescriptor> set2 = new HashSet<DatanodeDescriptor>(nodesOnRack.length); - for(int i=0; i<nodes.length; i++) { - set1.add(nodes[i]); - } - for(int i=0; i<nodesOnRack.length; i++) { - set2.add(nodesOnRack[i]); - } - set1.removeAll(set2); - nodesOnRemoteRack = set1.toArray(nodesOnRemoteRack); - int oldNumOfReplicas = results.size(); // randomly choose one node from remote racks try { - chooseRandom( numOfReplicas, nodesOnRemoteRack, excludedNodes, - blocksize, maxReplicasPerRack, results ); + chooseRandom( numOfReplicas, "~"+localMachine.getNetworkLocation(), + excludedNodes, blocksize, maxReplicasPerRack, results ); } catch (NotEnoughReplicasException e) { chooseRandom( numOfReplicas-(results.size()-oldNumOfReplicas), - nodesOnRack, excludedNodes, blocksize, + localMachine.getNetworkLocation(), excludedNodes, blocksize, maxReplicasPerRack, results); } } @@ -2957,7 +2935,7 @@ * @return the choosen node */ private DatanodeDescriptor chooseRandom( - DatanodeDescriptor[] nodes, + String nodes, List<DatanodeDescriptor> excludedNodes, long blocksize, int maxNodesPerRack, @@ -2980,7 +2958,7 @@ /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>. */ private void chooseRandom(int numOfReplicas, - DatanodeDescriptor[] nodes, + String nodes, List<DatanodeDescriptor> excludedNodes, long blocksize, int maxNodesPerRack, @@ -3008,24 +2986,20 @@ } } - /* Randomly choose one node from <i>nodes</i>. - * @return the choosen node + /* Randomly choose <i>numOfNodes</i> nodes from <i>scope</i>. 
+ * @return the choosen nodes */ private DatanodeDescriptor[] chooseRandom(int numOfReplicas, - DatanodeDescriptor[] nodes, + String nodes, List<DatanodeDescriptor> excludedNodes) { List<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>(); - int numOfAvailableNodes = 0; - for(int i=0; i<nodes.length; i++) { - if( !excludedNodes.contains(nodes[i]) ) { - numOfAvailableNodes++; - } - } + int numOfAvailableNodes = + clusterMap.countNumOfAvailableNodes(nodes, excludedNodes); numOfReplicas = (numOfAvailableNodes<numOfReplicas)? numOfAvailableNodes:numOfReplicas; while( numOfReplicas > 0 ) { - DatanodeDescriptor choosenNode = nodes[r.nextInt(nodes.length)]; + DatanodeDescriptor choosenNode = clusterMap.chooseRandom(nodes); if(!excludedNodes.contains(choosenNode)) { results.add( choosenNode ); excludedNodes.add(choosenNode); @@ -3041,27 +3015,42 @@ * does not have too much load, and the rack does not have too many nodes */ private boolean isGoodTarget( DatanodeDescriptor node, + long blockSize, int maxTargetPerLoc, + List<DatanodeDescriptor> results) { + return isGoodTarget(node, blockSize, maxTargetPerLoc, true, results); + } + + private boolean isGoodTarget( DatanodeDescriptor node, long blockSize, int maxTargetPerLoc, + boolean considerLoad, List<DatanodeDescriptor> results) { // check if the node is (being) decommissed if(node.isDecommissionInProgress() || node.isDecommissioned()) { + LOG.debug("Node "+node.getPath()+ + " is not chosen because the node is (being) decommissioned"); return false; } // check the remaining capacity of the target machine if(blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining() ) { + LOG.debug("Node "+node.getPath()+ + " is not chosen because the node does not have enough space"); return false; } // check the communication traffic of the target machine - double avgLoad = 0; - int size = clusterMap.getNumOfLeaves(); - if( size != 0 ) { - avgLoad = (double)totalLoad()/size; - } - if(node.getXceiverCount() > (2.0 * 
avgLoad)) { - return false; + if(considerLoad) { + double avgLoad = 0; + int size = clusterMap.getNumOfLeaves(); + if( size != 0 ) { + avgLoad = (double)totalLoad()/size; + } + if(node.getXceiverCount() > (2.0 * avgLoad)) { + LOG.debug("Node "+node.getPath()+ + " is not chosen because the node is too busy"); + return false; + } } // check if the target rack has chosen too many nodes @@ -3075,6 +3064,8 @@ } } if(counter>maxTargetPerLoc) { + LOG.debug("Node "+node.getPath()+ + " is not chosen because the rack has too many chosen nodes"); return false; } return true; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java?view=diff&rev=512924&r1=512923&r2=512924 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java Wed Feb 28 11:36:33 2007 @@ -19,8 +19,8 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; +import java.util.List; +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,8 +46,8 @@ * Different from a leave node, it has non-null children. 
*/ private class InnerNode extends NodeBase { - private HashMap<String, Node> children = - new HashMap<String, Node>(); // maps a name to a node + private ArrayList<Node> children=new ArrayList<Node>(); + private int numOfLeaves; /** Construct an InnerNode from a path-like string */ InnerNode( String path ) { @@ -60,7 +60,7 @@ } /** Get its children */ - HashMap<String, Node> getChildren() {return children;} + Collection<Node> getChildren() {return children;} /** Return the number of children this node has */ int getNumOfChildren() { @@ -75,7 +75,7 @@ return true; } - Node firstChild = children.values().iterator().next(); + Node firstChild = children.get(0); if(firstChild instanceof InnerNode) { return false; } @@ -89,7 +89,9 @@ * @return true if this node is an ancestor of <i>n</i> */ boolean isAncestor(Node n) { - return n.getNetworkLocation().startsWith(getPath()); + return getPath().equals(NodeBase.PATH_SEPARATOR_STR) || + (n.getNetworkLocation()+NodeBase.PATH_SEPARATOR_STR). + startsWith(getPath()+NodeBase.PATH_SEPARATOR_STR); } /** Judge if this node is the parent of node <i>n</i> @@ -121,7 +123,7 @@ * @param n node to be added * @return true if the node is added; false otherwise */ - boolean add( Node n ) { + boolean add( DatanodeDescriptor n ) { String parent = n.getNetworkLocation(); String currentPath = getPath(); if( !isAncestor( n ) ) @@ -129,18 +131,36 @@ +parent+", is not a decendent of "+currentPath); if( isParent( n ) ) { // this node is the parent of n; add n directly - return (null == children.put(n.getName(), n) ); + for(int i=0; i<children.size(); i++) { + if(children.get(i).getName().equals(n.getName())) { + children.set(i, n); + return false; + } + } + children.add(n); + numOfLeaves++; + return true; } else { // find the next ancestor node String parentName = getNextAncestorName( n ); - InnerNode parentNode = (InnerNode)children.get(parentName); + InnerNode parentNode = null; + for(int i=0; i<children.size(); i++) { + 
if(children.get(i).getName().equals(parentName)) { + parentNode = (InnerNode)children.get(i); + } + } if( parentNode == null ) { // create a new InnerNode parentNode = new InnerNode( parentName, currentPath ); - children.put(parentName, parentNode); + children.add(parentNode); } // add n to the subtree of the next ancestor node - return parentNode.add(n); + if( parentNode.add(n) ) { + numOfLeaves++; + return true; + } else { + return false; + } } } @@ -148,19 +168,34 @@ * @parameter n node to be deleted * @return true if the node is deleted; false otherwise */ - boolean remove( Node n ) { + boolean remove( DatanodeDescriptor n ) { String parent = n.getNetworkLocation(); String currentPath = getPath(); if(!isAncestor(n)) - throw new IllegalArgumentException( n.getName()+", which is located at " + throw new IllegalArgumentException( n.getName() + +", which is located at " +parent+", is not a decendent of "+currentPath); if( isParent(n) ) { // this node is the parent of n; remove n directly - return (n == children.remove(n.getName())); + for(int i=0; i<children.size(); i++) { + if(children.get(i).getName().equals(n.getName())) { + children.remove(i); + numOfLeaves--; + return true; + } + } + return false; } else { // find the next ancestor node: the parent node String parentName = getNextAncestorName( n ); - InnerNode parentNode = (InnerNode)children.get(parentName); + InnerNode parentNode = null; + int i; + for(i=0; i<children.size(); i++) { + if(children.get(i).getName().equals(parentName)) { + parentNode = (InnerNode)children.get(i); + break; + } + } if(parentNode==null) { throw new IllegalArgumentException( n.getName() + ", which is located at " @@ -169,8 +204,11 @@ // remove n from the parent node boolean isRemoved = parentNode.remove( n ); // if the parent node has no children, remove the parent node too - if(parentNode.getNumOfChildren() == 0 ) { - children.remove(parentName); + if(isRemoved) { + if(parentNode.getNumOfChildren() == 0 ) { + children.remove(i); + 
} + numOfLeaves--; } return isRemoved; } @@ -179,8 +217,14 @@ /** Given a node's string representation, return a reference to the node */ Node getLoc( String loc ) { if( loc == null || loc.length() == 0 ) return this; + String[] path = loc.split(PATH_SEPARATOR_STR, 2); - Node childnode = children.get( path[0] ); + Node childnode = null; + for(int i=0; i<children.size(); i++) { + if(children.get(i).getName().equals(path[0])) { + childnode = children.get(i); + } + } if(childnode == null ) return null; // non-existing node if( path.length == 1 ) return childnode; if( childnode instanceof InnerNode ) { @@ -190,22 +234,62 @@ } } - /** Get all the data nodes belonged to the subtree of this node */ - void getLeaves( Collection<DatanodeDescriptor> results ) { - for( Iterator<Node> iter = children.values().iterator(); - iter.hasNext(); ) { - Node childNode = iter.next(); - if( childNode instanceof InnerNode ) { - ((InnerNode)childNode).getLeaves(results); - } else { - results.add( (DatanodeDescriptor)childNode ); + /** get <i>leaveIndex</i> leaf of this subtree + * if it is not in the <i>excludedNode</i>*/ + private DatanodeDescriptor getLeaf(int leaveIndex, Node excludedNode) { + int count=0; + int numOfExcludedLeaves = 1; + if( excludedNode instanceof InnerNode ) { + numOfExcludedLeaves = ((InnerNode)excludedNode).getNumOfLeaves(); + } + if( isRack() ) { // children are leaves + // range check + if(leaveIndex<0 || leaveIndex>=this.getNumOfChildren()) { + return null; } + DatanodeDescriptor child = + (DatanodeDescriptor)children.get(leaveIndex); + if(excludedNode == null || excludedNode != child) { + // child is not the excludedNode + return child; + } else { // child is the excludedNode so return the next child + if(leaveIndex+1>=this.getNumOfChildren()) { + return null; + } else { + return (DatanodeDescriptor)children.get(leaveIndex+1); + } + } + } else { + for( int i=0; i<children.size(); i++ ) { + InnerNode child = (InnerNode)children.get(i); + if(excludedNode == null 
|| excludedNode != child) { + // not the excludedNode + int numOfLeaves = child.getNumOfLeaves(); + if( excludedNode != null && child.isAncestor(excludedNode) ) { + numOfLeaves -= numOfExcludedLeaves; + } + if( count+numOfLeaves > leaveIndex ) { + // the leaf is in the child subtree + return child.getLeaf(leaveIndex-count, excludedNode); + } else { + // go to the next child + count = count+numOfLeaves; + } + } else { // it is the excluededNode + // skip it and set the excludedNode to be null + excludedNode = null; + } + } + return null; } } + + int getNumOfLeaves() { + return numOfLeaves; + } } // end of InnerNode - InnerNode clusterMap = new InnerNode( InnerNode.ROOT ); //the root of the tree - private int numOfLeaves = 0; // data nodes counter + InnerNode clusterMap = new InnerNode( InnerNode.ROOT ); // the root private int numOfRacks = 0; // rack counter public NetworkTopology() { @@ -215,7 +299,7 @@ * Update data node counter & rack counter if neccessary * @param node * data node to be added - * @exception IllegalArgumentException if add a data node to an existing leave + * @exception IllegalArgumentException if add a data node to a leave */ public synchronized void add( DatanodeDescriptor node ) { if( node==null ) return; @@ -227,7 +311,6 @@ + " at an illegal network location"); } if( clusterMap.add( node) ) { - numOfLeaves++; if( rack == null ) { numOfRacks++; } @@ -244,7 +327,6 @@ if( node==null ) return; LOG.info("Removing a node: "+node.getPath()); if( clusterMap.remove( node ) ) { - numOfLeaves--; InnerNode rack = (InnerNode)getNode(node.getNetworkLocation()); if(rack == null) { numOfRacks--; @@ -265,7 +347,7 @@ return (rNode == node); } - /** Given a string representation of a node, return the reference to the node + /** Given a string representation of a node, return its reference * * @param loc * a path-like string representation of a node @@ -278,54 +360,14 @@ return clusterMap.getLoc( loc ); } - /* Add all the data nodes that belong to - * the 
subtree of the node <i>loc</i> to <i>results</i>*/ - private synchronized void getLeaves( String loc, - Collection<DatanodeDescriptor> results ) { - Node node = getNode(loc); - if( node instanceof InnerNode ) - ((InnerNode)node).getLeaves(results); - else { - results.add((DatanodeDescriptor)node); - } - } - - /** Return all the data nodes that belong to the subtree of <i>loc</i> - * @param loc - * a path-like string representation of a node - * @return an array of data nodes that belong to the subtree of <i>loc</i> - */ - public synchronized DatanodeDescriptor[] getLeaves( String loc ) { - Collection<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>(); - getLeaves(loc, results); - return results.toArray(new DatanodeDescriptor[results.size()]); - } - - /** Return all the data nodes that belong to the subtrees of <i>locs</i> - * @param locs - * a collection of strings representing nodes - * @return an array of data nodes that belong to subtrees of <i>locs</i> - */ - public synchronized DatanodeDescriptor[] getLeaves( - Collection<String> locs ) { - Collection<DatanodeDescriptor> nodes = new ArrayList<DatanodeDescriptor>(); - if( locs != null ) { - Iterator<String> iter = locs.iterator(); - while(iter.hasNext()) { - getLeaves( iter.next(), nodes ); - } - } - return nodes.toArray(new DatanodeDescriptor[nodes.size()]); - } - /** Return the total number of racks */ - public int getNumOfRacks( ) { + public synchronized int getNumOfRacks( ) { return numOfRacks; } /** Return the total number of data nodes */ - public int getNumOfLeaves() { - return numOfLeaves; + public synchronized int getNumOfLeaves() { + return clusterMap.getNumOfLeaves(); } private void checkArgument( DatanodeDescriptor node ) { @@ -354,11 +396,7 @@ public int getDistance(DatanodeDescriptor node1, DatanodeDescriptor node2 ) { checkArgument( node1 ); checkArgument( node2 ); - /* - if( !contains(node1) || !contains(node2) ) { - return Integer.MAX_VALUE; - } - */ + if( node1 == node2 || 
node1.equals(node2)) { return 0; } @@ -398,6 +436,85 @@ return location1.equals(location2); } + final private static Random r = new Random(); + /** randomly choose one node from <i>scope</i> + * if scope starts with ~, choose one from the all datanodes except for the + * ones in <i>scope</i>; otherwise, choose one from <i>scope</i> + * @param scope range of datanodes from which a node will be choosen + * @return the choosen data node + */ + public DatanodeDescriptor chooseRandom(String scope) { + if(scope.startsWith("~")) { + return chooseRandom(NodeBase.ROOT, scope.substring(1)); + } else { + return chooseRandom(scope, null); + } + } + + private DatanodeDescriptor chooseRandom(String scope, String excludedScope){ + if(excludedScope != null) { + if(scope.startsWith(excludedScope)) { + return null; + } + if(!excludedScope.startsWith(scope)) { + excludedScope = null; + } + } + Node node = getNode(scope); + if(node instanceof DatanodeDescriptor) { + return (DatanodeDescriptor)node; + } + InnerNode innerNode = (InnerNode)node; + int numOfDatanodes = innerNode.getNumOfLeaves(); + if(excludedScope == null) { + node = null; + } else { + node = getNode(excludedScope); + if(node instanceof DatanodeDescriptor) { + numOfDatanodes -= 1; + } else { + numOfDatanodes -= ((InnerNode)node).getNumOfLeaves(); + } + } + int leaveIndex = r.nextInt(numOfDatanodes); + return innerNode.getLeaf(leaveIndex, node); + } + + /** return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i> + * if scope starts with ~, return the number of datanodes that are not + * in <i>scope</i> and <i>excludedNodes</i>; + * @param scope a path string that may start with ~ + * @param excludedNodes a list of data nodes + * @return number of available data nodes + */ + public int countNumOfAvailableNodes(String scope, + List<DatanodeDescriptor> excludedNodes) { + boolean isExcluded=false; + if(scope.startsWith("~")) { + isExcluded=true; + scope=scope.substring(1); + } + scope = 
NodeBase.normalize(scope); + int count=0; // the number of nodes in both scope & excludedNodes + for( DatanodeDescriptor node:excludedNodes) { + if( (node.getPath()+NodeBase.PATH_SEPARATOR_STR). + startsWith(scope+NodeBase.PATH_SEPARATOR_STR)) { + count++; + } + } + Node n=getNode(scope); + int scopeNodeCount=1; + if(n instanceof InnerNode) { + scopeNodeCount=((InnerNode)n).getNumOfLeaves(); + } + if(isExcluded) { + return clusterMap.getNumOfLeaves()- + scopeNodeCount-excludedNodes.size()+count; + } else { + return scopeNodeCount-count; + } + } + /** convert a network tree to a string */ public String toString() { // print the number of racks @@ -406,18 +523,13 @@ tree.append( numOfRacks ); tree.append( "\n" ); // print the number of leaves + int numOfLeaves = getNumOfLeaves(); tree.append( "Expected number of leaves:" ); tree.append( numOfLeaves ); tree.append( "\n" ); - // get all datanodes - DatanodeDescriptor[] datanodes = getLeaves( NodeBase.ROOT ); - // print the number of leaves - tree.append( "Actual number of leaves:" ); - tree.append( datanodes.length ); - tree.append( "\n" ); // print datanodes - for( int i=0; i<datanodes.length; i++ ) { - tree.append( datanodes[i].getPath() ); + for( int i=0; i<numOfLeaves; i++ ) { + tree.append( clusterMap.getLeaf(i, null).getPath() ); tree.append( "\n"); } return tree.toString(); Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java?view=diff&rev=512924&r1=512923&r2=512924 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java Wed Feb 28 11:36:33 2007 @@ -1,6 +1,5 @@ package org.apache.hadoop.dfs; -import java.io.File; import java.io.IOException; import java.util.ArrayList; 
import java.util.List; @@ -16,7 +15,7 @@ private static final Configuration CONF = new Configuration(); private static final NetworkTopology cluster; private static NameNode namenode; - private static FSNamesystem.Replicator replicator; + private static FSNamesystem.ReplicationTargetChooser replicator; private static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] { new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"), @@ -33,6 +32,7 @@ static { try { CONF.set("fs.default.name", "localhost:8020"); + NameNode.format(CONF); namenode = new NameNode(CONF); } catch (IOException e) { // TODO Auto-generated catch block @@ -61,6 +61,10 @@ * @throws Exception */ public void testChooseTarget1() throws Exception { + dataNodes[0].updateHeartbeat( + 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, + FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded + DatanodeDescriptor[] targets; targets = replicator.chooseTarget( 0, dataNodes[0], null, BLOCK_SIZE); @@ -91,6 +95,10 @@ assertTrue(cluster.isOnSameRack(targets[0], targets[1])); assertFalse(cluster.isOnSameRack(targets[0], targets[2])); assertFalse(cluster.isOnSameRack(targets[0], targets[3])); + + dataNodes[0].updateHeartbeat( + 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, + FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0); } /** @@ -161,8 +169,8 @@ // make data node 0 to be not qualified to choose dataNodes[0].updateHeartbeat( 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, - FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded - + (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0); // no space + DatanodeDescriptor[] targets; targets = replicator.chooseTarget( 0, dataNodes[0], null, BLOCK_SIZE); Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java?view=diff&rev=512924&r1=512923&r2=512924 
============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java Wed Feb 28 11:36:33 2007 @@ -1,6 +1,6 @@ package org.apache.hadoop.net; -import java.util.HashSet; + import org.apache.hadoop.dfs.DatanodeDescriptor; import org.apache.hadoop.dfs.DatanodeID; import junit.framework.TestCase; @@ -40,20 +40,6 @@ assertEquals(cluster.getNumOfRacks(), 3); } - public void testGetLeaves() throws Exception { - DatanodeDescriptor [] leaves = cluster.getLeaves(NodeBase.ROOT); - assertEquals(leaves.length, dataNodes.length); - HashSet<DatanodeDescriptor> set1 = - new HashSet<DatanodeDescriptor>(leaves.length); - HashSet<DatanodeDescriptor> set2 = - new HashSet<DatanodeDescriptor>(dataNodes.length); - for(int i=0; i<leaves.length; i++) { - set1.add(leaves[i]); - set2.add(dataNodes[i]); - } - assertTrue(set1.equals(set2)); - } - public void testGetDistance() throws Exception { assertEquals(cluster.getDistance(dataNodes[0], dataNodes[0]), 0); assertEquals(cluster.getDistance(dataNodes[0], dataNodes[1]), 2); @@ -73,6 +59,4 @@ cluster.add( dataNodes[i] ); } } - - }