Author: cutting
Date: Mon Nov 20 15:24:40 2006
New Revision: 477400

URL: http://svn.apache.org/viewvc?view=rev&rev=477400
Log:
HADOOP-725. Optimize DFS block placement algorithm. Contributed by Milind.
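In brief, and before the raw diff: instead of building, shuffling, and scanning a fresh
candidate list for every replica, chooseTargets now keeps one flat ArrayList of live
datanodes and, for each replica, walks it circularly from a random starting index,
rejecting nodes that are forbidden, already chosen, short on space, or carrying more
than twice the average transceiver load (tracked incrementally in the new totalLoad
counter). The sketch below illustrates that selection idea only; the Node type and the
MIN_BLOCKS_FOR_WRITE value are simplified stand-ins, not the committed FSNamesystem code.

    import java.util.List;
    import java.util.Random;

    // Sketch of the new selection loop: start at a random index in the list of
    // live datanodes and walk it circularly until a suitable node is found.
    // Node and MIN_BLOCKS_FOR_WRITE are stand-ins for the real DFS types/constants.
    class BlockPlacementSketch {
      static class Node {
        long remaining;    // free space in bytes
        int xceiverCount;  // active transfer connections (load)
      }

      static final Random r = new Random();
      static final int MIN_BLOCKS_FOR_WRITE = 5;  // placeholder value for the sketch

      // Pick one target with spare capacity and at most twice the average load, or null.
      static Node chooseOne(List<Node> live, List<Node> exclude,
                            long blockSize, double avgLoad) {
        int nNodes = live.size();
        if (nNodes == 0) {
          return null;
        }
        int idx = r.nextInt(nNodes);                 // random starting point
        for (int rejected = 0; rejected < nNodes; rejected++) {
          Node node = live.get(idx);
          if (!exclude.contains(node) &&
              node.remaining >= blockSize * MIN_BLOCKS_FOR_WRITE &&
              node.xceiverCount <= 2.0 * avgLoad) {
            return node;
          }
          idx = (idx + 1) % nNodes;                  // wrap around the list
        }
        return null;  // the committed code retries with a space-only check before giving up
      }
    }

The committed version additionally tries to place the first replica on the requesting
client's own host when that host is an eligible datanode, and makes a second, space-only
pass before logging a warning and giving up. The DatanodeID.equals fix in this commit
also matters for the list-based approach: the old implementation returned true for
unequal IDs, and the live-node list is now probed with contains().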
Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=477400&r1=477399&r2=477400
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Nov 20 15:24:40 2006
@@ -91,6 +91,9 @@
 27. HADOOP-652. In DFS, when a file is deleted, the block count is now
     decremented. (Vladimir Krokhmalyov via cutting)

+28. HADOOP-725. In DFS, optimize block placement algorithm,
+    previously a performance bottleneck. (Milind Bhandarkar via cutting)
+
 Release 0.8.0 - 2006-11-03


Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?view=diff&rev=477400&r1=477399&r2=477400
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Mon Nov 20 15:24:40 2006
@@ -30,6 +30,9 @@
 class DatanodeDescriptor extends DatanodeInfo {
   private volatile Collection<Block> blocks = new TreeSet<Block>();

+  // isAlive == heartbeats.contains(this)
+  // This is an optimization, because contains takes O(n) time on ArrayList
+  protected boolean isAlive = false;

   DatanodeDescriptor() {
     super();
@@ -81,17 +84,6 @@
     this.xceiverCount = xceiverCount;
   }

-  /**
-   * Verify whether the node is dead.
-   *
-   * A data node is considered dead if its last heartbeat was received
-   * EXPIRE_INTERVAL msecs ago.
-   */
-  boolean isDead() {
-    return getLastUpdate() <
-      System.currentTimeMillis() - FSConstants.EXPIRE_INTERVAL;
-  }
-
  Block[] getBlocks() {
    return (Block[]) blocks.toArray(new Block[blocks.size()]);
  }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java?view=diff&rev=477400&r1=477399&r2=477400
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java Mon Nov 20 15:24:40 2006
@@ -82,7 +82,8 @@
  }

  public boolean equals( Object to ) {
-    return (this.compareTo( to ) != 0);
+    return (name.equals(((DatanodeID)to).getName()) &&
+            storageID.equals(((DatanodeID)to).getStorageID()));
  }

  public int hashCode() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=477400&r1=477399&r2=477400
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Mon Nov 20 15:24:40 2006
@@ -103,7 +103,6 @@
    // Timeouts, constants
    //
    public static long HEARTBEAT_INTERVAL = 3;
-    public static long EXPIRE_INTERVAL = 10 * 60 * 1000;
    public static long BLOCKREPORT_INTERVAL = 60 * 60 * 1000;
    public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
    public static final long LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=477400&r1=477399&r2=477400
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Nov 20 15:24:40 2006
@@ -122,6 +122,9 @@
    // Stats on overall usage
    //
    long totalCapacity = 0, totalRemaining = 0;
+
+    // total number of connections per live datanode
+    int totalLoad = 0;

    //
@@ -136,26 +139,13 @@
    Random r = new Random();

    /**
-     * Stores a set of DatanodeDescriptor objects, sorted by heartbeat.
+     * Stores a set of DatanodeDescriptor objects.
     * This is a subset of {@link #datanodeMap}, containing nodes that are
     * considered alive.
     * The {@link HeartbeatMonitor} periodically checks for outdated entries,
-     * and removes them from the set.
+     * and removes them from the list.
     */
-    TreeSet<DatanodeDescriptor> heartbeats =
-      new TreeSet<DatanodeDescriptor>(
-        new Comparator<DatanodeDescriptor>() {
-          public int compare(DatanodeDescriptor d1, DatanodeDescriptor d2) {
-            long lu1 = d1.getLastUpdate();
-            long lu2 = d2.getLastUpdate();
-            if (lu1 < lu2)
-              return -1;
-            if (lu1 > lu2)
-              return 1;
-            return d1.getStorageID().compareTo(d2.getStorageID());
-          }
-        }
-      );
+    ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();

    //
    // Store set of Blocks that need to be replicated 1 or more times.
@@ -189,8 +179,11 @@
    private int maxReplicationStreams;
    // MIN_REPLICATION is how many copies we need in place or else we disallow the write
    private int minReplication;
-    // HEARTBEAT_RECHECK is how often a datanode sends its hearbeat
-    private int heartBeatRecheck;
+    // heartbeatRecheckInterval is how often namenode checks for expired datanodes
+    private long heartbeatRecheckInterval;
+    // heartbeatExpireInterval is how long namenode waits for datanode to report
+    // heartbeat
+    private long heartbeatExpireInterval;

    public static FSNamesystem fsNamesystemObject;
    private String localMachine;
@@ -222,7 +215,10 @@
                              + " must be less than dfs.replication.max = " + maxReplication );
        this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
-        this.heartBeatRecheck= 1000;
+        long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
+        this.heartbeatRecheckInterval = 5 * 60 * 1000; // 5 minutes
+        this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
+            10 * heartbeatInterval;
        this.localMachine = addr.getHostName();
        this.port = addr.getPort();
@@ -1206,7 +1202,7 @@
            // The same datanode has been just restarted to serve the same data
            // storage. We do not need to remove old data blocks, the delta will
            // be calculated on the next block report from the datanode
-            NameNode.stateChangeLog.debug(
+            NameNode.stateChangeLog.info(
                "BLOCK* NameSystem.registerDatanode: "
                + "node restarted." );
            return;
@@ -1249,6 +1245,8 @@
        }
        // register new datanode
        DatanodeDescriptor nodeDescr = new DatanodeDescriptor( nodeReg );
+        // unless we get a heartbeat from this datanode, we will not mark it Alive
+        nodeDescr.isAlive = false;
        unprotectedAddDatanode( nodeDescr );
        getEditLog().logAddDatanode( nodeDescr );
        return;
@@ -1283,6 +1281,11 @@
        return newID;
    }

+    private boolean isDatanodeDead(DatanodeDescriptor node) {
+        return (node.getLastUpdate() <
+                (System.currentTimeMillis() - heartbeatExpireInterval));
+    }
+
    /**
     * The given node has reported in.  This method should:
     * 1) Record the heartbeat, so the datanode isn't timed out
@@ -1303,19 +1306,36 @@
        synchronized (heartbeats) {
            synchronized (datanodeMap) {
                DatanodeDescriptor nodeinfo = getDatanode( nodeID );
-                needBlockReport = nodeinfo.isDead();
+                needBlockReport = isDatanodeDead(nodeinfo);
                if (nodeinfo == null)
                    // We do not accept unregistered guests
                    throw new UnregisteredDatanodeException( nodeID );
-                removeHeartbeat(nodeinfo);
+                if (nodeinfo.isAlive) {
+                    updateStats(nodeinfo, false);
+                }
                nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);
-                addHeartbeat(nodeinfo);
+                updateStats(nodeinfo, true);
+                if (!nodeinfo.isAlive) {
+                    heartbeats.add(nodeinfo);
+                    nodeinfo.isAlive = true;
+                }
            }
        }
        return needBlockReport;
    }

+    private void updateStats(DatanodeDescriptor node, boolean isAdded) {
+        if (isAdded) {
+            totalCapacity += node.getCapacity();
+            totalRemaining += node.getRemaining();
+            totalLoad += node.getXceiverCount();
+        } else {
+            totalCapacity -= node.getCapacity();
+            totalRemaining -= node.getRemaining();
+            totalLoad -= node.getXceiverCount();
+        }
+    }

    /**
     * Periodically calls heartbeatCheck().
     */
@@ -1326,7 +1346,7 @@
        while (fsRunning) {
            heartbeatCheck();
            try {
-                Thread.sleep(heartBeatRecheck);
+                Thread.sleep(heartbeatRecheckInterval);
            } catch (InterruptedException ie) {
            }
        }
@@ -1355,7 +1375,11 @@
     * @author hairong
     */
    private void removeDatanode( DatanodeDescriptor nodeInfo ) {
-        removeHeartbeat(nodeInfo);
+        if (nodeInfo.isAlive) {
+            updateStats(nodeInfo, false);
+            heartbeats.remove(nodeInfo);
+            nodeInfo.isAlive = false;
+        }

        Block deadblocks[] = nodeInfo.getBlocks();
        if( deadblocks != null )
@@ -1380,17 +1404,6 @@
                + "node " + nodeDescr.getName() + " is added to datanodeMap." );
    }

-    private void addHeartbeat( DatanodeDescriptor nodeDescr ) {
-        heartbeats.add(nodeDescr);
-        totalCapacity += nodeDescr.capacity;
-        totalRemaining += nodeDescr.remaining;
-    }
-
-    private void removeHeartbeat( DatanodeDescriptor nodeDescr ) {
-        totalCapacity -= nodeDescr.getCapacity();
-        totalRemaining -= nodeDescr.getRemaining();
-        heartbeats.remove(nodeDescr);
-    }

    /**
     * Physically remove node from datanodeMap.
@@ -1412,18 +1425,30 @@
    /**
     * Check if there are any expired heartbeats, and if so,
     * whether any blocks have to be re-replicated.
-     */
-    synchronized void heartbeatCheck() {
-        synchronized (heartbeats) {
-            DatanodeDescriptor nodeInfo = null;
-
-            while ((heartbeats.size() > 0) &&
-                   ((nodeInfo = heartbeats.first()) != null) &&
-                   (nodeInfo.isDead())) {
-                NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
-                    + "lost heartbeat from " + nodeInfo.getName());
-                removeDatanode( nodeInfo );
+     * While removing dead datanodes, make sure that only one datanode is marked
+     * dead at a time within the synchronized section. Otherwise, a cascading
+     * effect causes more datanodes to be declared dead.
+     */
+    void heartbeatCheck() {
+        boolean allAlive = false;
+        while (!allAlive) {
+            boolean foundDead = false;
+            synchronized(this) {
+                synchronized (heartbeats) {
+                    for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
+                         it.hasNext();) {
+                        DatanodeDescriptor nodeInfo = it.next();
+                        if (isDatanodeDead(nodeInfo)) {
+                            NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
+                                + "lost heartbeat from " + nodeInfo.getName());
+                            removeDatanode( nodeInfo );
+                            foundDead = true;
+                            break;
+                        }
+                    }
+                }
            }
+            allAlive = ! foundDead;
        }
    }
@@ -1744,7 +1769,7 @@
        synchronized (datanodeMap) {
            for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext(); ) {
                DatanodeDescriptor node = it.next();
-                if( node.isDead() )
+                if( isDatanodeDead(node))
                    dead.add( node );
                else
                    live.add( node );
@@ -1946,151 +1971,88 @@
                              Collection<DatanodeDescriptor> forbiddenNodes,
                              UTF8 clientMachine, long blockSize) {
+        Collection<DatanodeDescriptor> targets = new ArrayList<DatanodeDescriptor>();
+
        if (desiredReplicates > heartbeats.size()) {
            LOG.warn("Replication requested of "+desiredReplicates
                    +" is larger than cluster size ("+heartbeats.size()
Using cluster size."); desiredReplicates = heartbeats.size(); + if (desiredReplicates == 0) { + LOG.warn("While choosing target, totalMachines is " + desiredReplicates); + } } - - Collection<DatanodeDescriptor> alreadyChosen; - alreadyChosen = new TreeSet<DatanodeDescriptor>(); - Collection<DatanodeDescriptor> targets = new ArrayList<DatanodeDescriptor>(); - - for (int i = 0; i < desiredReplicates; i++) { - DatanodeDescriptor target = chooseTarget(forbiddenNodes, alreadyChosen, - clientMachine, blockSize); - if (target == null) - break; // calling chooseTarget again won't help - targets.add(target); - alreadyChosen.add(target); - } - return (DatanodeDescriptor[]) targets.toArray(new DatanodeDescriptor[targets.size()]); - } - - /** - * Choose a target from available machines, excepting the - * given ones. - * - * Right now it chooses randomly from available boxes. In future could - * choose according to capacity and load-balancing needs (or even - * network-topology, to avoid inter-switch traffic). - * @param forbidden1 DatanodeDescriptor targets not allowed, null allowed. - * @param forbidden2 DatanodeDescriptor targets not allowed, null allowed. - * @return DatanodeDescriptor instance to use or null if something went wrong - * (a log message is emitted if null is returned). - */ - DatanodeDescriptor chooseTarget(Collection<DatanodeDescriptor> forbidden1, - Collection<DatanodeDescriptor> forbidden2, - UTF8 clientMachine, - long blockSize) { - // - // Check if there are any available targets at all - // - int totalMachines = heartbeats.size(); - if (totalMachines == 0) { - LOG.warn("While choosing target, totalMachines is " + totalMachines); - return null; - } - - // - // Build a map of forbidden hostnames from the two forbidden sets. - // - Collection<DatanodeDescriptor> forbiddenMachines = new TreeSet(); - if (forbidden1 != null) { - for (Iterator<DatanodeDescriptor> it = forbidden1.iterator(); it.hasNext(); ) { - DatanodeDescriptor cur = it.next(); - forbiddenMachines.add(cur); - } - } - if (forbidden2 != null) { - for (Iterator<DatanodeDescriptor> it = forbidden2.iterator(); it.hasNext(); ) { - DatanodeDescriptor cur = it.next(); - forbiddenMachines.add(cur); - } - } - + double avgLoad = 0.0; - // - // Build list of machines we can actually choose from - // - List<DatanodeDescriptor> targetList = new ArrayList<DatanodeDescriptor>(); - for (Iterator<DatanodeDescriptor> it = heartbeats.iterator(); it.hasNext(); ) { - DatanodeDescriptor node = it.next(); - if (! forbiddenMachines.contains(node)) { - targetList.add(node); - avgLoad += node.getXceiverCount(); - } + if (heartbeats.size() != 0) { + avgLoad = (double) totalLoad / heartbeats.size(); } - if (targetList.size() > 0) { avgLoad = avgLoad/targetList.size(); } - - Collections.shuffle(targetList); - - // - // Now pick one - // - if (targetList.size() > 0) { - // - // If the requester's machine is in the targetList, - // and it's got the capacity, pick it. 
-            //
-            if (clientMachine != null && clientMachine.getLength() > 0) {
-                for (Iterator<DatanodeDescriptor> it = targetList.iterator(); it.hasNext(); ) {
-                    DatanodeDescriptor node = it.next();
-                    if (clientMachine.toString().equals(node.getHost())) {
-                        if ((node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
-                            (node.getXceiverCount() <= (2.0 * avgLoad))) {
-                            return node;
-                        }
-                    }
-                }
-            }
-
-            //
-            // Otherwise, choose node according to target capacity
-            //
-            for (Iterator<DatanodeDescriptor> it = targetList.iterator(); it.hasNext(); ) {
-                DatanodeDescriptor node = it.next();
+        // choose local replica first
+        if (desiredReplicates != 0) {
+            // make sure that the client machine is not forbidden
+            if (clientMachine != null && clientMachine.getLength() > 0) {
+                for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
+                     it.hasNext();) {
+                    DatanodeDescriptor node = it.next();
+                    if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
+                        clientMachine.toString().equals(node.getHost())) {
                        if ((node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
                            (node.getXceiverCount() <= (2.0 * avgLoad))) {
-                            return node;
+                            targets.add(node);
+                            desiredReplicates--;
+                            break;
                        }
+                    }
                }
+            }
+        }
-            //
-            // If we are still not able to find a good node, then see if
-            // we can pick the clientmachine itself.
-            //
-            if (clientMachine != null && clientMachine.getLength() > 0) {
-                for (Iterator<DatanodeDescriptor> it = targetList.iterator(); it.hasNext(); ) {
-                    DatanodeDescriptor node = it.next();
-                    if (clientMachine.toString().equals(node.getHost()) &&
-                        node.getRemaining() >= blockSize) {
-                        return node;
-                    }
-                }
+        for (int i = 0; i < desiredReplicates; i++) {
+            DatanodeDescriptor target = null;
+            //
+            // Otherwise, choose node according to target capacity
+            //
+            int nNodes = heartbeats.size();
+            int idx = r.nextInt(nNodes);
+            int rejected = 0;
+            while (target == null && rejected < nNodes ) {
+                DatanodeDescriptor node = heartbeats.get(idx);
+                if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
+                    !targets.contains(node) &&
+                    (node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
+                    (node.getXceiverCount() <= (2.0 * avgLoad))) {
+                    target = node;
+                    break;
+                } else {
+                    idx = (idx+1) % nNodes;
+                    rejected++;
                }
-
-            //
-            // That should do the trick. But we might not be able
-            // to pick any node if the target was out of bytes. As
-            // a last resort, pick the first valid one we can find.
-            //
-            for (Iterator<DatanodeDescriptor> it = targetList.iterator(); it.hasNext(); ) {
-                DatanodeDescriptor node = it.next();
-                if (node.getRemaining() >= blockSize) {
-                    return node;
-                }
+            }
+            if (target == null) {
+                idx = r.nextInt(nNodes);
+                rejected = 0;
+                while (target == null && rejected < nNodes ) {
+                    DatanodeDescriptor node = heartbeats.get(idx);
+                    if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
+                        !targets.contains(node) &&
+                        node.getRemaining() >= blockSize) {
+                        target = node;
+                        break;
+                    } else {
+                        idx = (idx + 1) % nNodes;
+                        rejected++;
+                    }
                }
+            }
+
+            if (target == null) {
                LOG.warn("Could not find any nodes with sufficient capacity");
-                return null;
-            } else {
-                LOG.warn("Zero targets found, forbidden1.size=" +
-                         ( forbidden1 != null ? forbidden1.size() : 0 ) +
-                         " forbidden2.size()=" +
-                         ( forbidden2 != null ? forbidden2.size() : 0 ));
-                return null;
+                break; // making one more pass over heartbeats would not help
+            }
+            targets.add(target);
        }
+
+        return (DatanodeDescriptor[]) targets.toArray(new DatanodeDescriptor[targets.size()]);
    }

    /**

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java?view=auto&rev=477400
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java Mon Nov 20 15:24:40 2006
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.Random;
+import java.net.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests the replication of a DFS file.
+ * @author Milind Bhandarkar
+ */
+public class TestReplication extends TestCase {
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 8192;
+  static final int fileSize = 16384;
+  static final int numDatanodes = 4;
+
+  private void writeFile(FileSystem fileSys, Path name, int repl)
+  throws IOException {
+    // create and write a file that contains three blocks of data
+    FSOutputStream stm = fileSys.createRaw(name, true, (short)repl,
+        (long)blockSize);
+    byte[] buffer = new byte[fileSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    stm.close();
+  }
+
+
+  private void checkFile(FileSystem fileSys, Path name, int repl)
+  throws IOException {
+    String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+    for (int idx = 0; idx < locations.length; idx++) {
+      assertEquals("Number of replicas for block" + idx,
+          Math.min(numDatanodes, repl), locations[idx].length);
+    }
+  }
+
+  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+    assertTrue(fileSys.exists(name));
+    fileSys.delete(name);
+    assertTrue(!fileSys.exists(name));
+  }
+
+  /**
+   * Tests replication in DFS.
+   */
+  public void testReplication() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false);
+    // Now wait for 15 seconds to give datanodes chance to register
+    // themselves and to report heartbeat
+    try {
+      Thread.sleep(15000L);
+    } catch (InterruptedException e) {
+      // nothing
+    }
+    InetSocketAddress addr = new InetSocketAddress("localhost", 65312);
+    DFSClient client = new DFSClient(addr, conf);
+    DatanodeInfo[] info = client.datanodeReport();
+    assertEquals("Number of Datanodes ", numDatanodes, info.length);
+    FileSystem fileSys = cluster.getFileSystem();
+    try {
+      Path file1 = new Path("smallblocktest.dat");
+      writeFile(fileSys, file1, 3);
+      checkFile(fileSys, file1, 3);
+      cleanupFile(fileSys, file1);
+      writeFile(fileSys, file1, 10);
+      checkFile(fileSys, file1, 10);
+      cleanupFile(fileSys, file1);
+      writeFile(fileSys, file1, 4);
+      checkFile(fileSys, file1, 4);
+      cleanupFile(fileSys, file1);
+      writeFile(fileSys, file1, 1);
+      checkFile(fileSys, file1, 1);
+      cleanupFile(fileSys, file1);
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+}
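A quick sanity check of the new liveness arithmetic (an illustrative calculation, not
part of the commit): with the default dfs.heartbeat.interval of 3 seconds and the
hard-coded 5-minute recheck, heartbeatExpireInterval works out to 630,000 ms, so a
datanode is declared dead roughly 10.5 minutes after its last heartbeat; that is close
to the fixed 10-minute EXPIRE_INTERVAL this change removes from FSConstants, but now
derived from the configured heartbeat rate.

    // Sketch reproducing the interval computed in the FSNamesystem constructor above.
    public class ExpireIntervalSketch {
      public static void main(String[] args) {
        long heartbeatInterval = 3 * 1000;              // dfs.heartbeat.interval default: 3 s
        long heartbeatRecheckInterval = 5 * 60 * 1000;  // hard-coded 5 minutes
        long heartbeatExpireInterval =
            2 * heartbeatRecheckInterval + 10 * heartbeatInterval;
        System.out.println(heartbeatExpireInterval + " ms");  // prints 630000, i.e. 10.5 minutes
      }
    }

TestReplication above also exercises the capping behaviour in chooseTargets: requesting
10 replicas on the 4-datanode mini-cluster is reduced to the cluster size, which is why
checkFile expects Math.min(numDatanodes, repl) replicas per block.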