Author: cutting
Date: Thu Jun 14 14:53:06 2007
New Revision: 547419

URL: http://svn.apache.org/viewvc?view=rev&rev=547419
Log:
HADOOP-1269.  Finer grained locking in HDFS namenode.  Contributed by Dhruba.
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jun 14 14:53:06 2007
@@ -120,6 +120,9 @@
  38. HADOOP-1139.  Log HDFS block transitions at INFO level, to better
      enable diagnosis of problems.  (Dhruba Borthakur via cutting)
 
+ 39. HADOOP-1269.  Finer grained locking in HDFS namenode.
+     (Dhruba Borthakur via cutting)
+
 
 Release 0.13.0 - 2007-06-08

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Jun 14 14:53:06 2007
@@ -1273,9 +1273,10 @@
                                 " seconds");
           }
           try {
-            LOG.debug("NotReplicatedYetException sleeping " + src +
+            LOG.warn("NotReplicatedYetException sleeping " + src +
                      " retries left " + retries);
             Thread.sleep(sleeptime);
+            sleeptime *= 2;
           } catch (InterruptedException ie) {
           }
         }
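The DFSClient hunk above does two things: it promotes the retry message from debug to warn, and it doubles the sleep between NotReplicatedYetException retries (sleeptime *= 2), turning a fixed delay into exponential backoff. A minimal, self-contained sketch of that backoff shape — Attempt, retryWithBackoff, and the parameters are illustrative names, not identifiers from this patch:

    import java.io.IOException;

    public class BackoffRetry {

      /** Hypothetical operation that can fail while replication catches up. */
      interface Attempt {
        void run() throws IOException;
      }

      /** Retries op, doubling the sleep between attempts as the hunk above does. */
      static void retryWithBackoff(Attempt op, int retries, long initialSleepMs)
          throws IOException {
        long sleeptime = initialSleepMs;
        while (true) {
          try {
            op.run();
            return;                      // success
          } catch (IOException e) {
            if (--retries <= 0) {
              throw e;                   // out of retries: surface the failure
            }
            try {
              Thread.sleep(sleeptime);   // wait before the next attempt
              sleeptime *= 2;            // exponential backoff, as in the patch
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
            }
          }
        }
      }
    }

Doubling the sleep keeps quick failures cheap while backing off hard when the cluster is genuinely behind on replication.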
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Jun 14 14:53:06 2007
@@ -289,22 +289,6 @@
       }
     }
     String errorMsg = null;
-    // verify build version
-    if (!nsInfo.getBuildVersion().equals(Storage.getBuildVersion())) {
-      errorMsg = "Incompatible build versions: namenode BV = "
-        + nsInfo.getBuildVersion() + "; datanode BV = "
-        + Storage.getBuildVersion();
-      LOG.fatal(errorMsg);
-      try {
-        namenode.errorReport(dnRegistration,
-                             DatanodeProtocol.NOTIFY, errorMsg);
-      } catch(SocketTimeoutException e) {  // namenode is busy
-        LOG.info("Problem connecting to server: " + getNameNodeAddr());
-      }
-      throw new IOException(errorMsg);
-    }
-    assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
-      "Data-node and name-node layout versions must be the same.";
     return nsInfo;
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Jun 14 14:53:06 2007
@@ -429,28 +429,39 @@
    * @see ClientProtocol#open(String, long, long)
    * @see ClientProtocol#getBlockLocations(String, long, long)
    */
-  synchronized LocatedBlocks getBlockLocations(String clientMachine,
-                                               String src,
-                                               long offset,
-                                               long length
-                                               ) throws IOException {
+  LocatedBlocks getBlockLocations(String clientMachine,
+                                  String src,
+                                  long offset,
+                                  long length
+                                  ) throws IOException {
     if (offset < 0) {
       throw new IOException("Negative offset is not supported. File: " + src );
     }
     if (length < 0) {
       throw new IOException("Negative length is not supported. File: " + src );
     }
-    return getBlockLocations(clientMachine,
-                             dir.getFileINode(src),
-                             offset, length, Integer.MAX_VALUE);
+
+    DatanodeDescriptor client = null;
+    LocatedBlocks blocks = getBlockLocations(dir.getFileINode(src),
+                                             offset, length,
+                                             Integer.MAX_VALUE);
+    if (blocks == null) {
+      return null;
+    }
+    client = host2DataNodeMap.getDatanodeByHost(clientMachine);
+    for (Iterator<LocatedBlock> it = blocks.getLocatedBlocks().iterator();
+         it.hasNext();) {
+      LocatedBlock block = (LocatedBlock) it.next();
+      clusterMap.sortByDistance(client,
+                                (DatanodeDescriptor[])(block.getLocations()));
+    }
+    return blocks;
   }
 
-  private LocatedBlocks getBlockLocations(String clientMachine,
-                                          FSDirectory.INode inode,
-                                          long offset,
-                                          long length,
-                                          int nrBlocksToReturn
-                                          ) throws IOException {
+  private synchronized LocatedBlocks getBlockLocations(FSDirectory.INode inode,
+                                                       long offset,
+                                                       long length,
+                                                       int nrBlocksToReturn) {
     if(inode == null || inode.isDir()) {
       return null;
     }
@@ -479,8 +490,6 @@
 
     long endOff = offset + length;
 
-    DatanodeDescriptor client;
-    client = host2DataNodeMap.getDatanodeByHost(clientMachine);
     do {
       // get block locations
       int numNodes = blocksMap.numNodes(blocks[curBlk]);
@@ -491,7 +500,6 @@
            blocksMap.nodeIterator(blocks[curBlk]);
            it.hasNext();) {
         machineSet[numNodes++] = it.next();
       }
-      clusterMap.sortByDistance(client, machineSet);
       }
       results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos));
       curPos += blocks[curBlk].getNumBytes();
@@ -585,7 +593,54 @@
    * @throws IOException if the filename is invalid
    *         {@link FSDirectory#isValidToCreate(UTF8)}.
    */
-  public synchronized LocatedBlock startFile(UTF8 src,
+  public LocatedBlock startFile(UTF8 src,
+                                UTF8 holder,
+                                UTF8 clientMachine,
+                                boolean overwrite,
+                                short replication,
+                                long blockSize
+                                ) throws IOException {
+
+    //
+    // Create file into pendingCreates and get the first blockId
+    //
+    Block newBlock = startFileInternal(src, holder, clientMachine,
+                                       overwrite, replication,
+                                       blockSize);
+
+    //
+    // Get the array of replication targets
+    //
+    try {
+      DatanodeDescriptor clientNode =
+        host2DataNodeMap.getDatanodeByHost(clientMachine.toString());
+      DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
+                                            clientNode, null, blockSize);
+      if (targets.length < this.minReplication) {
+        if (clusterMap.getNumOfLeaves() == 0) {
+          throw new IOException("Failed to create file " + src
+                                + " on client " + clientMachine
+                                + " because this cluster has no datanodes.");
+        }
+        throw new IOException("Failed to create file " + src
+                              + " on client " + clientMachine
+                              + " because there were not enough datanodes available. "
+                              + "Found " + targets.length
+                              + " datanodes but MIN_REPLICATION for the cluster is "
+                              + "configured to be "
+                              + this.minReplication
+                              + ".");
+      }
+      return new LocatedBlock(newBlock, targets, 0L);
+
+    } catch (IOException ie) {
+      NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
+                                   + ie.getMessage());
+      throw ie;
+    }
+  }
+
+  public synchronized Block startFileInternal(UTF8 src,
                                               UTF8 holder,
                                               UTF8 clientMachine,
                                               boolean overwrite,
@@ -666,26 +721,8 @@
       }
     }
 
-    // Get the array of replication targets
     DatanodeDescriptor clientNode =
       host2DataNodeMap.getDatanodeByHost(clientMachine.toString());
-    DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
-                                          clientNode, null, blockSize);
-    if (targets.length < this.minReplication) {
-      if (clusterMap.getNumOfLeaves() == 0) {
-        throw new IOException("Failed to create file "+src
-                              + " on client " + clientMachine
-                              + " because this cluster has no datanodes.");
-      }
-      throw new IOException("Failed to create file "+src
-                            + " on client " + clientMachine
-                            + " because there were not enough datanodes available. "
-                            + "Found " + targets.length
-                            + " datanodes but MIN_REPLICATION for the cluster is "
-                            + "configured to be "
-                            + this.minReplication
-                            + ".");
-    }
 
     // Reserve space for this pending file
     pendingCreates.put(src,
@@ -709,9 +746,9 @@
         }
         lease.startedCreate(src);
       }
-      
+
       // Create first block
-      return new LocatedBlock(allocateBlock(src), targets, 0L);
+      return allocateBlock(src);
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
                                    +ie.getMessage());
@@ -730,38 +767,52 @@
    * are replicated.  Will return an empty 2-elt array if we want the
    * client to "try again later".
    */
-  public synchronized LocatedBlock getAdditionalBlock(UTF8 src,
-                                                      UTF8 clientName
-                                                      ) throws IOException {
+  public LocatedBlock getAdditionalBlock(UTF8 src,
+                                         UTF8 clientName
+                                         ) throws IOException {
+    long fileLength, blockSize;
+    int replication;
+    DatanodeDescriptor clientNode = null;
+    Block newBlock = null;
+
     NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
                                   +src+" for "+clientName);
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot add block to " + src, safeMode);
-    FileUnderConstruction pendingFile = pendingCreates.get(src);
-    // make sure that we still have the lease on this file
-    if (pendingFile == null) {
-      throw new LeaseExpiredException("No lease on " + src);
-    }
-    if (!pendingFile.getClientName().equals(clientName)) {
-      throw new LeaseExpiredException("Lease mismatch on " + src +
-                                      " owned by " + pendingFile.getClientName() +
-                                      " and appended by " + clientName);
-    }
-    //
-    // If we fail this, bad things happen!
-    //
-    if (!checkFileProgress(pendingFile, false)) {
-      throw new NotReplicatedYetException("Not replicated yet:" + src);
+
+    synchronized (this) {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot add block to " + src, safeMode);
+      }
+
+      //
+      // make sure that we still have the lease on this file
+      //
+      FileUnderConstruction pendingFile = pendingCreates.get(src);
+      if (pendingFile == null) {
+        throw new LeaseExpiredException("No lease on " + src);
+      }
+      if (!pendingFile.getClientName().equals(clientName)) {
+        throw new LeaseExpiredException("Lease mismatch on " + src +
+                                        " owned by " + pendingFile.getClientName() +
+                                        " and appended by " + clientName);
+      }
+
+      //
+      // If we fail this, bad things happen!
+      //
+      if (!checkFileProgress(pendingFile, false)) {
+        throw new NotReplicatedYetException("Not replicated yet:" + src);
+      }
+      fileLength = pendingFile.computeFileLength();
+      blockSize = pendingFile.getBlockSize();
+      clientNode = pendingFile.getClientNode();
+      replication = (int)pendingFile.getReplication();
+      newBlock = allocateBlock(src);
     }
 
-    // Get the array of replication targets
-    DatanodeDescriptor clientNode = pendingFile.getClientNode();
-    DatanodeDescriptor targets[] = replicator.chooseTarget(
-                                     (int)(pendingFile.getReplication()),
+    DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
                                                            clientNode, null,
-                                                           pendingFile.getBlockSize());
+                                                           blockSize);
     if (targets.length < this.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +
@@ -769,9 +820,7 @@
     }
 
     // Create next block
-    return new LocatedBlock(allocateBlock(src),
-                            targets,
-                            pendingFile.computeFileLength());
+    return new LocatedBlock(newBlock, targets, fileLength);
   }
 
   /**
@@ -930,7 +979,7 @@
   /**
    * Allocate a block at the given pending filename
    */
-  synchronized Block allocateBlock(UTF8 src) {
+  private Block allocateBlock(UTF8 src) {
     Block b = null;
     do {
       b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
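The FSNamesystem hunks above all apply one recipe: keep only the metadata reads and mutations under the namesystem lock (collecting block lists, checking leases, updating pendingCreates, allocating block ids) and move the expensive work — sorting replicas by distance, choosing replication targets — outside it, re-validating the result afterwards since the cluster can change once the lock is released. allocateBlock also loses its own synchronized keyword and becomes private: it is now only called with the lock already held. A compressed sketch of both ideas under invented names (TwoPhaseCreate, TargetPicker, and MIN_REPLICATION are placeholders, not HDFS identifiers):

    import java.io.IOException;
    import java.util.List;

    public class TwoPhaseCreate {

      /** Placeholder for the replica placement policy (replicator in the patch). */
      interface TargetPicker {
        List<String> choose(int replication);
      }

      private static final int MIN_REPLICATION = 1;  // assumed config value
      private final TargetPicker picker;
      private long nextBlockId = 0;                  // shared, guarded by "this"

      public TwoPhaseCreate(TargetPicker picker) {
        this.picker = picker;
      }

      /** Phase 1: a short synchronized mutation, startFileInternal's role. */
      private synchronized long createPendingFile(String src) {
        // ...record src as pending, then hand out a block id...
        return allocateBlockId();
      }

      /** Like allocateBlock after this patch: private, unsynchronized, and
       *  only legal to call while the caller already holds the monitor. */
      private long allocateBlockId() {
        assert Thread.holdsLock(this) : "caller must hold the namesystem lock";
        return ++nextBlockId;
      }

      /** Phase 2: expensive placement runs outside the lock, then the result
       *  is re-validated, because the cluster can change in the meantime. */
      public List<String> startFile(String src, int replication) throws IOException {
        long blockId = createPendingFile(src);
        List<String> targets = picker.choose(replication);
        if (targets.size() < MIN_REPLICATION) {
          throw new IOException("Failed to create " + src + " (block " + blockId
                                + "): only " + targets.size() + " targets found");
        }
        return targets;
      }
    }

The re-validation step is what makes shrinking the critical section safe: a placement decision made on a momentarily stale view is checked against minimum replication before it is handed to the client.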
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java Thu Jun 14 14:53:06 2007
@@ -18,26 +18,34 @@
 package org.apache.hadoop.dfs;
 
 import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 class Host2NodesMap {
   private HashMap<String, DatanodeDescriptor[]> map
     = new HashMap<String, DatanodeDescriptor[]>();
   private Random r = new Random();
+  private ReadWriteLock hostmapLock = new ReentrantReadWriteLock();
 
   /** Check if node is already in the map. */
-  synchronized boolean contains(DatanodeDescriptor node) {
+  boolean contains(DatanodeDescriptor node) {
     if (node==null) {
       return false;
     }
     String host = node.getHost();
-    DatanodeDescriptor[] nodes = map.get(host);
-    if (nodes != null) {
-      for(DatanodeDescriptor containedNode:nodes) {
-        if (node==containedNode) {
-          return true;
+    hostmapLock.readLock().lock();
+    try {
+      DatanodeDescriptor[] nodes = map.get(host);
+      if (nodes != null) {
+        for(DatanodeDescriptor containedNode:nodes) {
+          if (node==containedNode) {
+            return true;
+          }
         }
       }
+    } finally {
+      hostmapLock.readLock().unlock();
     }
     return false;
   }
@@ -45,85 +53,101 @@
   /** add node to the map
    * return true if the node is added; false otherwise.
    */
-  synchronized boolean add(DatanodeDescriptor node) {
-    if (node==null || contains(node)) {
-      return false;
-    }
+  boolean add(DatanodeDescriptor node) {
+    hostmapLock.writeLock().lock();
+    try {
+      if (node==null || contains(node)) {
+        return false;
+      }
 
-    String host = node.getHost();
-    DatanodeDescriptor[] nodes = map.get(host);
-    DatanodeDescriptor[] newNodes;
-    if (nodes==null) {
-      newNodes = new DatanodeDescriptor[1];
-      newNodes[0]=node;
-    } else { // rare case: more than one datanode on the host
-      newNodes = new DatanodeDescriptor[nodes.length+1];
-      System.arraycopy(nodes, 0, newNodes, 0, nodes.length);
-      newNodes[nodes.length] = node;
+      String host = node.getHost();
+      DatanodeDescriptor[] nodes = map.get(host);
+      DatanodeDescriptor[] newNodes;
+      if (nodes==null) {
+        newNodes = new DatanodeDescriptor[1];
+        newNodes[0]=node;
+      } else { // rare case: more than one datanode on the host
+        newNodes = new DatanodeDescriptor[nodes.length+1];
+        System.arraycopy(nodes, 0, newNodes, 0, nodes.length);
+        newNodes[nodes.length] = node;
+      }
+      map.put(host, newNodes);
+      return true;
+    } finally {
+      hostmapLock.writeLock().unlock();
     }
-    map.put(host, newNodes);
-    return true;
   }
 
   /** remove node from the map
    * return true if the node is removed; false otherwise.
   */
-  synchronized boolean remove(DatanodeDescriptor node) {
+  boolean remove(DatanodeDescriptor node) {
     if (node==null) {
       return false;
    }
    String host = node.getHost();
-    DatanodeDescriptor[] nodes = map.get(host);
-    if (nodes==null) {
-      return false;
-    }
-    if (nodes.length==1) {
-      if (nodes[0]==node) {
-        map.remove(host);
-        return true;
-      } else {
+    hostmapLock.writeLock().lock();
+    try {
+
+      DatanodeDescriptor[] nodes = map.get(host);
+      if (nodes==null) {
        return false;
      }
-    }
-    //rare case
-    int i=0;
-    for(; i<nodes.length; i++) {
-      if (nodes[i]==node) {
-        break;
+      if (nodes.length==1) {
+        if (nodes[0]==node) {
+          map.remove(host);
+          return true;
+        } else {
+          return false;
+        }
       }
-    }
-    if (i==nodes.length) {
-      return false;
-    } else {
-      DatanodeDescriptor[] newNodes;
-      newNodes = new DatanodeDescriptor[nodes.length-1];
-      System.arraycopy(nodes, 0, newNodes, 0, i);
-      System.arraycopy(nodes, i+1, newNodes, i, nodes.length-i-1);
-      map.put(host, newNodes);
-      return true;
+      //rare case
+      int i=0;
+      for(; i<nodes.length; i++) {
+        if (nodes[i]==node) {
+          break;
+        }
+      }
+      if (i==nodes.length) {
+        return false;
+      } else {
+        DatanodeDescriptor[] newNodes;
+        newNodes = new DatanodeDescriptor[nodes.length-1];
+        System.arraycopy(nodes, 0, newNodes, 0, i);
+        System.arraycopy(nodes, i+1, newNodes, i, nodes.length-i-1);
+        map.put(host, newNodes);
+        return true;
+      }
+    } finally {
+      hostmapLock.writeLock().unlock();
     }
   }
 
   /** get a data node by its host.
    * @return DatanodeDescriptor if found; otherwise null.
    */
-  synchronized DatanodeDescriptor getDatanodeByHost(String host) {
+  DatanodeDescriptor getDatanodeByHost(String host) {
     if (host==null) {
       return null;
     }
-    DatanodeDescriptor[] nodes = map.get(host);
-    // no entry
-    if (nodes== null) {
-      return null;
-    }
-    // one node
-    if (nodes.length == 1) {
-      return nodes[0];
+    hostmapLock.readLock().lock();
+    try {
+      DatanodeDescriptor[] nodes = map.get(host);
+      // no entry
+      if (nodes== null) {
+        return null;
+      }
+      // one node
+      if (nodes.length == 1) {
+        return nodes[0];
+      }
+      // more than one node
+      return nodes[r.nextInt(nodes.length)];
+    } finally {
+      hostmapLock.readLock().unlock();
     }
-    // more than one node
-    return nodes[r.nextInt(nodes.length)];
   }
 
   /**
@@ -144,16 +168,21 @@
       host = name.substring(0, colon);
     }
 
-    DatanodeDescriptor[] nodes = map.get(host);
-    // no entry
-    if (nodes== null) {
-      return null;
-    }
-    for(DatanodeDescriptor containedNode:nodes) {
-      if (name.equals(containedNode.getName())) {
-        return containedNode;
+    hostmapLock.readLock().lock();
+    try {
+      DatanodeDescriptor[] nodes = map.get(host);
+      // no entry
+      if (nodes== null) {
+        return null;
       }
+      for(DatanodeDescriptor containedNode:nodes) {
+        if (name.equals(containedNode.getName())) {
+          return containedNode;
+        }
+      }
+      return null;
+    } finally {
+      hostmapLock.readLock().unlock();
     }
-    return null;
   }
 }
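Host2NodesMap above (and NetworkTopology below) replace method-level synchronized with a ReentrantReadWriteLock, so lookups from many reader threads proceed in parallel and only membership changes serialize. The essential idiom, reduced to a sketch with placeholder types and names:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class ReadMostlyRegistry {
      private final Map<String, String> map = new HashMap<String, String>();
      private final ReadWriteLock lock = new ReentrantReadWriteLock();

      /** Any number of readers may hold the read lock at the same time. */
      public String get(String host) {
        lock.readLock().lock();
        try {
          return map.get(host);
        } finally {
          lock.readLock().unlock();   // always released, even on exception
        }
      }

      /** Writers are exclusive: they wait out readers and other writers. */
      public void put(String host, String node) {
        lock.writeLock().lock();
        try {
          map.put(host, node);
        } finally {
          lock.writeLock().unlock();
        }
      }
    }

The lock()/try/finally/unlock() shape matters: the unlock must sit in a finally block, as in the patch, or an exception inside the critical section would leave the lock held forever.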
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java Thu Jun 14 14:53:06 2007
@@ -23,6 +23,8 @@
 import java.util.List;
 import java.util.Random;
 import java.util.Arrays;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -224,7 +226,7 @@
   } // end of remove
 
   /** Given a node's string representation, return a reference to the node */
-  Node getLoc(String loc) {
+  private Node getLoc(String loc) {
     if (loc == null || loc.length() == 0) return this;
 
     String[] path = loc.split(PATH_SEPARATOR_STR, 2);
@@ -300,8 +302,10 @@
 
   InnerNode clusterMap = new InnerNode(InnerNode.ROOT); // the root
   private int numOfRacks = 0;  // rack counter
+  private ReadWriteLock netlock;
 
   public NetworkTopology() {
+    netlock = new ReentrantReadWriteLock();
   }
 
   /** Add a data node
@@ -310,21 +314,26 @@
    *          data node to be added
    * @exception IllegalArgumentException if add a data node to a leave
    */
-  public synchronized void add(DatanodeDescriptor node) {
+  public void add(DatanodeDescriptor node) {
     if (node==null) return;
+    netlock.writeLock().lock();
     LOG.info("Adding a new node: "+node.getPath());
-    Node rack = getNode(node.getNetworkLocation());
-    if (rack != null && !(rack instanceof InnerNode)) {
-      throw new IllegalArgumentException("Unexpected data node "
-                                         + node.toString()
-                                         + " at an illegal network location");
-    }
-    if (clusterMap.add(node)) {
-      if (rack == null) {
-        numOfRacks++;
+    try {
+      Node rack = getNode(node.getNetworkLocation());
+      if (rack != null && !(rack instanceof InnerNode)) {
+        throw new IllegalArgumentException("Unexpected data node "
+                                           + node.toString()
+                                           + " at an illegal network location");
+      }
+      if (clusterMap.add(node)) {
+        if (rack == null) {
+          numOfRacks++;
+        }
      }
+      LOG.debug("NetworkTopology became:\n" + this.toString());
+    } finally {
+      netlock.writeLock().unlock();
     }
-    LOG.debug("NetworkTopology became:\n" + this.toString());
  }
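Note that getLoc above (like getNode in the hunks below) drops to private: once the public surface owns the netlock, internal helpers must not re-acquire it and simply assume the caller already holds it. A small sketch of that convention under invented names; the assertion is an illustrative guard, not something this patch adds:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockOwningSurface {
      private final ReentrantReadWriteLock netlock = new ReentrantReadWriteLock();
      private int numOfRacks = 0;   // shared, guarded by netlock

      /** Public methods own the locking... */
      public int getNumOfRacks() {
        netlock.readLock().lock();
        try {
          return countRacks();
        } finally {
          netlock.readLock().unlock();
        }
      }

      /** ...so private helpers stay lock-free and trust the caller. */
      private int countRacks() {
        assert netlock.getReadHoldCount() > 0
            || netlock.isWriteLockedByCurrentThread()
            : "caller must hold netlock";
        return numOfRacks;
      }
    }

Narrowing the visibility of getLoc and getNode is what keeps this safe: no outside caller can reach the unlocked helpers directly.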
@@ -332,16 +341,21 @@
    * @param node
    *          data node to be removed
    */
-  public synchronized void remove(DatanodeDescriptor node) {
+  public void remove(DatanodeDescriptor node) {
     if (node==null) return;
+    netlock.writeLock().lock();
     LOG.info("Removing a node: "+node.getPath());
-    if (clusterMap.remove(node)) {
-      InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
-      if (rack == null) {
-        numOfRacks--;
+    try {
+      if (clusterMap.remove(node)) {
+        InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
+        if (rack == null) {
+          numOfRacks--;
+        }
       }
+      LOG.debug("NetworkTopology became:\n" + this.toString());
+    } finally {
+      netlock.writeLock().unlock();
     }
-    LOG.debug("NetworkTopology became:\n" + this.toString());
   }
 
   /** Check if the tree contains data node <i>node</i>
@@ -350,13 +364,18 @@
    *          a data node
    * @return true if <i>node</i> is already in the tree; false otherwise
    */
-  public synchronized boolean contains(DatanodeDescriptor node) {
+  public boolean contains(DatanodeDescriptor node) {
     if (node == null) return false;
-    Node parent = node.getParent();
-    for(int level=node.getLevel(); parent!=null&&level>0;
-        parent=parent.getParent(), level--) {
-      if (parent == clusterMap)
-        return true;
+    netlock.readLock().lock();
+    try {
+      Node parent = node.getParent();
+      for(int level=node.getLevel(); parent!=null&&level>0;
+          parent=parent.getParent(), level--) {
+        if (parent == clusterMap)
+          return true;
+      }
+    } finally {
+      netlock.readLock().unlock();
     }
     return false;
   }
@@ -367,7 +386,7 @@
    *          a path-like string representation of a node
    * @return a reference to the node; null if the node is not in the tree
    */
-  public synchronized Node getNode(String loc) {
+  private Node getNode(String loc) {
     loc = NodeBase.normalize(loc);
     if (!NodeBase.ROOT.equals(loc))
       loc = loc.substring(1);
@@ -375,13 +394,23 @@
   }
 
   /** Return the total number of racks */
-  public synchronized int getNumOfRacks() {
-    return numOfRacks;
+  public int getNumOfRacks() {
+    netlock.readLock().lock();
+    try {
+      return numOfRacks;
+    } finally {
+      netlock.readLock().unlock();
+    }
   }
 
   /** Return the total number of data nodes */
-  public synchronized int getNumOfLeaves() {
-    return clusterMap.getNumOfLeaves();
+  public int getNumOfLeaves() {
+    netlock.readLock().lock();
+    try {
+      return clusterMap.getNumOfLeaves();
+    } finally {
+      netlock.readLock().unlock();
+    }
   }
 
   /** Return the distance between two data nodes
@@ -397,24 +426,28 @@
     if (node1 == node2) {
       return 0;
     }
-    int i;
     Node n1=node1, n2=node2;
-    int level1=node1.getLevel(), level2=node2.getLevel();
     int dis = 0;
-    while(n1!=null && level1>level2) {
-      n1 = n1.getParent();
-      level1--;
-      dis++;
-    }
-    while(n2!=null && level2>level1) {
-      n2 = n2.getParent();
-      level2--;
-      dis++;
-    }
-    while(n1!=null && n2!=null && n1.getParent()!=n2.getParent()) {
-      n1=n1.getParent();
-      n2=n2.getParent();
-      dis+=2;
+    netlock.readLock().lock();
+    try {
+      int level1=node1.getLevel(), level2=node2.getLevel();
+      while(n1!=null && level1>level2) {
+        n1 = n1.getParent();
+        level1--;
+        dis++;
+      }
+      while(n2!=null && level2>level1) {
+        n2 = n2.getParent();
+        level2--;
+        dis++;
+      }
+      while(n1!=null && n2!=null && n1.getParent()!=n2.getParent()) {
+        n1=n1.getParent();
+        n2=n2.getParent();
+        dis+=2;
+      }
+    } finally {
+      netlock.readLock().unlock();
     }
     if (n1==null) {
      LOG.warn("The cluster does not contain data node: "+node1.getPath());
@@ -440,11 +473,16 @@
       return false;
     }
 
-    if (node1 == node2 || node1.equals(node2)) {
-      return true;
-    }
+    netlock.readLock().lock();
+    try {
+      if (node1 == node2 || node1.equals(node2)) {
+        return true;
+      }
 
-    return node1.getParent()==node2.getParent();
+      return node1.getParent()==node2.getParent();
+    } finally {
+      netlock.readLock().unlock();
+    }
   }
 
   final private static Random r = new Random();
@@ -455,10 +493,15 @@
    * @return the choosen data node
    */
   public DatanodeDescriptor chooseRandom(String scope) {
-    if (scope.startsWith("~")) {
-      return chooseRandom(NodeBase.ROOT, scope.substring(1));
-    } else {
-      return chooseRandom(scope, null);
+    netlock.readLock().lock();
+    try {
+      if (scope.startsWith("~")) {
+        return chooseRandom(NodeBase.ROOT, scope.substring(1));
+      } else {
+        return chooseRandom(scope, null);
+      }
+    } finally {
+      netlock.readLock().unlock();
     }
   }
@@ -507,22 +550,27 @@
     }
     scope = NodeBase.normalize(scope);
     int count=0; // the number of nodes in both scope & excludedNodes
-    for(DatanodeDescriptor node:excludedNodes) {
-      if ((node.getPath()+NodeBase.PATH_SEPARATOR_STR).
-          startsWith(scope+NodeBase.PATH_SEPARATOR_STR)) {
-        count++;
+    netlock.readLock().lock();
+    try {
+      for(DatanodeDescriptor node:excludedNodes) {
+        if ((node.getPath()+NodeBase.PATH_SEPARATOR_STR).
+            startsWith(scope+NodeBase.PATH_SEPARATOR_STR)) {
+          count++;
+        }
+      }
+      Node n=getNode(scope);
+      int scopeNodeCount=1;
+      if (n instanceof InnerNode) {
+        scopeNodeCount=((InnerNode)n).getNumOfLeaves();
+      }
+      if (isExcluded) {
+        return clusterMap.getNumOfLeaves()-
+          scopeNodeCount-excludedNodes.size()+count;
+      } else {
+        return scopeNodeCount-count;
       }
-    }
-    Node n=getNode(scope);
-    int scopeNodeCount=1;
-    if (n instanceof InnerNode) {
-      scopeNodeCount=((InnerNode)n).getNumOfLeaves();
-    }
-    if (isExcluded) {
-      return clusterMap.getNumOfLeaves()-
-        scopeNodeCount-excludedNodes.size()+count;
-    } else {
-      return scopeNodeCount-count;
+    } finally {
+      netlock.readLock().unlock();
     }
   }
@@ -549,21 +597,22 @@
   /* Set and used only inside sortByDistance.
    * This saves an allocation each time we sort.
    */
-  private DatanodeDescriptor distFrom = null;
+  private static ThreadLocal<DatanodeDescriptor> distFrom =
+    new ThreadLocal<DatanodeDescriptor>();
   private final Comparator<DatanodeDescriptor> nodeDistanceComparator =
     new Comparator<DatanodeDescriptor>() {
       public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
-        return getDistance(distFrom, n1) - getDistance(distFrom, n2);
+        return getDistance(distFrom.get(), n1) - getDistance(distFrom.get(), n2);
       }
     };
 
   /** Sorts nodes array by their distances to <i>reader</i>. */
-  public synchronized void sortByDistance(final DatanodeDescriptor reader,
-                                          DatanodeDescriptor[] nodes) {
+  public void sortByDistance(final DatanodeDescriptor reader,
+                             DatanodeDescriptor[] nodes) {
     if (reader != null && contains(reader)) {
-      distFrom = reader;
+      distFrom.set(reader);
       Arrays.sort(nodes, nodeDistanceComparator);
-      distFrom = null;
+      distFrom.set(null);
     }
   }
 }
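Finally, sortByDistance loses synchronized, so the scratch field feeding its comparator becomes a ThreadLocal: each sorting thread gets a private slot for the reader node instead of racing on one shared field. A standalone sketch of the technique, with integer "distance" standing in for network distance and all names invented:

    import java.util.Arrays;
    import java.util.Comparator;

    public class PerThreadComparatorState {

      /** Per-thread scratch slot; replaces a shared field that an
       *  unsynchronized sort could otherwise see mid-update. */
      private static final ThreadLocal<Integer> origin = new ThreadLocal<Integer>();

      private static final Comparator<Integer> byDistance = new Comparator<Integer>() {
        public int compare(Integer a, Integer b) {
          int from = origin.get();   // safe: set by this thread before sorting
          return Math.abs(a - from) - Math.abs(b - from);
        }
      };

      /** Sorts values by distance from reader; safe to call concurrently. */
      public static void sortFrom(int reader, Integer[] values) {
        origin.set(reader);          // visible only to the current thread
        try {
          Arrays.sort(values, byDistance);
        } finally {
          origin.remove();           // clean up the per-thread slot
        }
      }
    }

The patch clears the slot with set(null) after sorting; remove() as used here is the equivalent cleanup.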