Author: cutting Date: Thu Jan 4 11:14:39 2007 New Revision: 492695 URL: http://svn.apache.org/viewvc?view=rev&rev=492695 Log: HADOOP-681. Add to HDFS the ability to decommission nodes. Contributed by Dhruba.
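For context, the operator-facing entry point added by this patch is a new DFSAdmin subcommand. Going by the usage strings introduced below, a session might look roughly like the following (the hostname:port arguments are illustrative only, and in practice the class would normally be launched through the standard bin/hadoop wrapper rather than bare `java DFSAdmin`):

  java DFSAdmin -decommission set datanode1:50010 datanode2:50010
  java DFSAdmin -decommission get datanode1:50010 datanode2:50010
  java DFSAdmin -decommission clear datanode1:50010 datanode2:50010

The set action marks the listed datanodes for decommission and queues their blocks for re-replication, get reports whether all listed nodes have finished decommissioning, and clear returns a node to normal service.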
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp lucene/hadoop/trunk/src/webapps/static/hadoop.css Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Jan 4 11:14:39 2007 @@ -188,6 +188,10 @@ 53. HADOOP-840. In task tracker, queue task cleanups and perform them in a separate thread. (omalley & Mahadev Konar via cutting) +54. HADOOP-681. Add to HDFS the ability to decommission nodes. This + causes their blocks to be re-replicated on other nodes, so that + they may be removed from a cluster. (Dhruba Borthakur via cutting) + Release 0.9.2 - 2006-12-15 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Thu Jan 4 11:14:39 2007 @@ -29,7 +29,7 @@ **********************************************************************/ interface ClientProtocol extends VersionedProtocol { - public static final long versionID = 3L; // setSafeMode() added + public static final long versionID = 4L; // decommission node added /////////////////////////////////////// // File contents @@ -300,4 +300,6 @@ * @author Konstantin Shvachko */ public boolean setSafeMode( FSConstants.SafeModeAction action ) throws IOException; + + public boolean decommission( FSConstants.DecommissionAction action, String[] nodenames) throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java Thu Jan 4 11:14:39 2007 @@ -86,8 +86,6 @@ * @exception IOException if the filesystem does not exist. 
*/ public void setSafeMode(String[] argv, int idx) throws IOException { - final String safeModeUsage = "Usage: java DFSAdmin -safemode " - + "[enter | leave | get]"; if (!(fs instanceof DistributedFileSystem)) { System.out.println("FileSystem is " + fs.getName()); return; @@ -134,6 +132,65 @@ } /** + * Command related to decommission of a datanode. + * Usage: java DFSAdmin -decommission [enter | leave | get] + * @param argv List of command line parameters. Each of these items + could be a hostname or a hostname:portname. + * @param idx The index of the command that is being processed. + * @exception IOException if the filesystem does not exist. + * @return 0 on success, non zero on error. + */ + public int decommission(String[] argv, int idx) throws IOException { + int exitCode = -1; + + if (!(fs instanceof DistributedFileSystem)) { + System.out.println("FileSystem is " + fs.getName()); + return exitCode; + } + if (idx >= argv.length - 1) { + printUsage("-decommission"); + return exitCode; + } + + // + // Copy all the datanode names to nodes[] + // + String[] nodes = new String[argv.length - idx - 1]; + for (int i = idx + 1, j = 0; i < argv.length; i++, j++) { + nodes[j] = argv[i]; + } + + FSConstants.DecommissionAction action; + + if ("set".equalsIgnoreCase(argv[idx])) { + action = FSConstants.DecommissionAction.DECOMMISSION_SET; + } else if ("clear".equalsIgnoreCase(argv[idx])) { + action = FSConstants.DecommissionAction.DECOMMISSION_CLEAR; + } else if ("get".equalsIgnoreCase(argv[idx])) { + action = FSConstants.DecommissionAction.DECOMMISSION_GET; + } else { + printUsage("-decommission"); + return exitCode; + } + DistributedFileSystem dfs = (DistributedFileSystem) fs; + boolean mode = dfs.decommission(action, nodes); + + if (action == FSConstants.DecommissionAction.DECOMMISSION_GET) { + if (mode) { + System.out.println("Node(s) has finished decommission"); + } + else { + System.out.println("Node(s) have not yet been decommissioned"); + } + return 0; + } + if (mode) { + return 0; // success + } + return exitCode; + } + + /** * Displays format of commands. * @param cmd The command that is being executed. 
*/ @@ -144,10 +201,15 @@ } else if ("-safemode".equals(cmd)) { System.err.println("Usage: java DFSAdmin" + " [-safemode enter | leave | get | wait]"); + } else if ("-decommission".equals(cmd)) { + System.err.println("Usage: java DFSAdmin" + + " [-decommission set | clear | get " + + "[datanode1[, datanode2..]]"); + } else { System.err.println("Usage: java DFSAdmin"); System.err.println(" [-report]"); System.err.println(" [-safemode enter | leave | get | wait]"); + System.err.println(" [-decommission set | clear | get]"); } } @@ -180,6 +242,11 @@ printUsage(cmd); return exitCode; } + } else if ("-decommission".equals(cmd)) { + if (argv.length < 2) { + printUsage(cmd); + return exitCode; + } } // initialize DFSAdmin @@ -200,6 +267,8 @@ report(); } else if ("-safemode".equals(cmd)) { setSafeMode(argv, i); + } else if ("-decommission".equals(cmd)) { + exitCode = decommission(argv, i); } else { exitCode = -1; System.err.println(cmd.substring(1) + ": Unknown command"); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Jan 4 11:14:39 2007 @@ -362,6 +362,18 @@ } /** + * Set, clear decommission state of datnode(s). + * See {@link ClientProtocol#decommission(FSConstants.DecommissionAction)} + * for more details. + * + * @see ClientProtocol#decommission(FSConstants.DecommissionAction) + */ + public boolean decommission(DecommissionAction action, String[] nodes) + throws IOException { + return namenode.decommission(action, nodes); + } + + /** */ public boolean mkdirs(UTF8 src) throws IOException { checkOpen(); @@ -526,6 +538,14 @@ } this.blocks = newBlocks; this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]); + } + + /** + * Used by the automatic tests to detemine blocks locations of a + * file + */ + synchronized DatanodeInfo[][] getDataNodes() { + return nodes; } /** Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java Thu Jan 4 11:14:39 2007 @@ -27,6 +27,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.io.WritableFactory; +import org.apache.hadoop.io.WritableUtils; /** * DatanodeInfo represents the status of a DataNode.
@@ -42,8 +43,14 @@ protected long lastUpdate; protected int xceiverCount; + // administrative states of a datanode + public enum AdminStates {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; } + protected AdminStates adminState; + + DatanodeInfo() { super(); + adminState = null; } DatanodeInfo( DatanodeInfo from ) { @@ -52,6 +59,7 @@ this.remaining = from.getRemaining(); this.lastUpdate = from.getLastUpdate(); this.xceiverCount = from.getXceiverCount(); + this.adminState = from.adminState; } DatanodeInfo( DatanodeID nodeID ) { @@ -60,6 +68,7 @@ this.remaining = 0L; this.lastUpdate = 0L; this.xceiverCount = 0; + this.adminState = null; } /** The raw capacity. */ @@ -101,6 +110,13 @@ long r = getRemaining(); long u = c - r; buffer.append("Name: "+name+"\n"); + if (isDecommissioned()) { + buffer.append("State : Decommissioned\n"); + } else if (isDecommissionInProgress()) { + buffer.append("State : Decommission in progress\n"); + } else { + buffer.append("State : In Service\n"); + } buffer.append("Total raw bytes: "+c+" ("+FsShell.byteDesc(c)+")"+"\n"); buffer.append("Used raw bytes: "+u+" ("+FsShell.byteDesc(u)+")"+"\n"); buffer.append("% used: "+FsShell.limitDecimal(((1.0*u)/c)*100,2)+"%"+"\n"); @@ -108,6 +124,72 @@ return buffer.toString(); } + /** + * Start decommissioning a node. + * old state. + */ + void startDecommission() { + adminState = AdminStates.DECOMMISSION_INPROGRESS; + } + + /** + * Stop decommissioning a node. + * old state. + */ + void stopDecommission() { + adminState = null; + } + + /** + * Returns true if the node is in the process of being decommissioned + */ + boolean isDecommissionInProgress() { + if (adminState == AdminStates.DECOMMISSION_INPROGRESS) { + return true; + } + return false; + } + + /** + * Returns true if the node has been decommissioned. + */ + boolean isDecommissioned() { + if (adminState == AdminStates.DECOMMISSIONED) { + return true; + } + return false; + } + + /** + * Sets the admin state to indicate that decommision is complete. + */ + void setDecommissioned() { + assert isDecommissionInProgress(); + adminState = AdminStates.DECOMMISSIONED; + } + + /** + * Retrieves the admin state of this node. + */ + AdminStates getAdminState() { + if (adminState == null) { + return AdminStates.NORMAL; + } + return adminState; + } + + /** + * Sets the admin state of this node. 
+ */ + void setAdminState(AdminStates newState) { + if (newState == AdminStates.NORMAL) { + adminState = null; + } + else { + adminState = newState; + } + } + ///////////////////////////////////////////////// // Writable ///////////////////////////////////////////////// @@ -127,6 +209,7 @@ out.writeLong(remaining); out.writeLong(lastUpdate); out.writeInt(xceiverCount); + WritableUtils.writeEnum(out, getAdminState()); } /** @@ -137,5 +220,8 @@ this.remaining = in.readLong(); this.lastUpdate = in.readLong(); this.xceiverCount = in.readInt(); + AdminStates newState = (AdminStates) WritableUtils.readEnum(in, + AdminStates.class); + setAdminState(newState); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Thu Jan 4 11:14:39 2007 @@ -281,7 +281,7 @@ return used; } - /** Return statistics for each datanode.*/ + /** Return statistics for each datanode. */ public DatanodeInfo[] getDataNodeStats() throws IOException { return dfs.datanodeReport(); } @@ -294,5 +294,14 @@ public boolean setSafeMode( FSConstants.SafeModeAction action ) throws IOException { return dfs.setSafeMode( action ); + } + + /** + * Set, clear decommission of a set of datanodes. + */ + public boolean decommission(FSConstants.DecommissionAction action, + String[] nodes) + throws IOException { + return dfs.decommission(action, nodes); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Thu Jan 4 11:14:39 2007 @@ -122,6 +122,9 @@ // SafeMode actions public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; } + // decommission administrative actions + public enum DecommissionAction{ DECOMMISSION_SET, DECOMMISSION_CLEAR, DECOMMISSION_GET; } + // Version is reflected in the dfs image and edit log files. // Version is reflected in the data storage file. // Versions are negative. 
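Taken together, the pieces above (the new DecommissionAction enum in FSConstants, the decommission() call threaded through ClientProtocol, DFSClient and DistributedFileSystem, and the admin state carried by DatanodeInfo) form a small programmatic API. The sketch below shows how a client could drive it; it is illustrative only — the class name, the hostname:port string and the poll interval are invented here, and the class is placed in the org.apache.hadoop.dfs package (as the new unit test is) so that no package-private types get in the way.

package org.apache.hadoop.dfs;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

/**
 * Illustrative driver for the decommission API added by HADOOP-681.
 * Hostname, port and poll interval are placeholders.
 */
public class DecommissionExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) {
      System.err.println("FileSystem is " + fs.getName() + ", not HDFS");
      return;
    }
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    String[] nodes = { "datanode1.example.com:50010" };   // placeholder name:port

    // Mark the node for decommission; the namenode re-replicates its blocks.
    dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_SET, nodes);

    // Poll until the namenode reports that every listed node has finished.
    while (!dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_GET, nodes)) {
      try {
        Thread.sleep(5000L);                              // placeholder poll interval
      } catch (InterruptedException e) {
        // keep waiting
      }
    }
    System.out.println("Decommission complete; the node(s) can now be removed.");

    // To return a node to service instead, clear its decommission state:
    // dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_CLEAR, nodes);
  }
}

These are the same calls the new DFSAdmin -decommission subcommand issues on behalf of the operator.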
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Thu Jan 4 11:14:39 2007 @@ -258,8 +258,9 @@ if( logVersion > -3 ) throw new IOException("Unexpected opcode " + opcode + " for version " + logVersion ); - DatanodeDescriptor node = new DatanodeDescriptor(); - node.readFields(in); + FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage(); + nodeimage.readFields(in); + DatanodeDescriptor node = nodeimage.getDatanodeDescriptor(); fsNamesys.unprotectedAddDatanode( node ); break; } @@ -376,7 +377,7 @@ * registration event. */ void logAddDatanode( DatanodeDescriptor node ) { - logEdit( OP_DATANODE_ADD, node, null ); + logEdit( OP_DATANODE_ADD, new FSImage.DatanodeImage(node), null ); } /** Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Thu Jan 4 11:14:39 2007 @@ -74,7 +74,7 @@ } this.editLog = new FSEditLog( edits ); } - + FSEditLog getEditLog() { return editLog; } @@ -344,7 +344,7 @@ } } - class DatanodeImage implements WritableComparable { + static class DatanodeImage implements WritableComparable { /************************************************** * DatanodeImage is used to store persistent information Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Jan 4 11:14:39 2007 @@ -717,7 +717,10 @@ // the blocks. for (int i = 0; i < nrBlocks; i++) { SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]); - if (containingNodes.size() < pendingFile.getReplication()) { + // filter out containingNodes that are marked for decommission. + int numCurrentReplica = countContainingNodes(containingNodes); + + if (numCurrentReplica < pendingFile.getReplication()) { NameNode.stateChangeLog.debug( "DIR* NameSystem.completeFile:" + pendingBlocks[i].getBlockName()+" has only "+containingNodes.size() @@ -1585,20 +1588,25 @@ FSDirectory.INode fileINode = dir.getFileByBlock(block); if( fileINode == null ) // block does not belong to any file return; + + // filter out containingNodes that are marked for decommission. 
+ int numCurrentReplica = countContainingNodes(containingNodes); + // check whether safe replication is reached for the block // only if it is a part of a files - incrementSafeBlockCount( containingNodes.size() ); + incrementSafeBlockCount( numCurrentReplica ); short fileReplication = fileINode.getReplication(); - if (containingNodes.size() >= fileReplication ) { + if (numCurrentReplica >= fileReplication ) { neededReplications.remove(block); pendingReplications.remove(block); NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: " - +block.getBlockName()+" has "+containingNodes.size() + +block.getBlockName()+" has "+ numCurrentReplica +" replicas so is removed from neededReplications and pendingReplications" ); - } else {// containingNodes.size() < fileReplication + + } else {// numCurrentReplica < fileReplication neededReplications.add(block); NameNode.stateChangeLog.debug("BLOCK* NameSystem.addStoredBlock: " - +block.getBlockName()+" has only "+containingNodes.size() + +block.getBlockName()+" has only "+ numCurrentReplica +" replicas so is added to neededReplications" ); } @@ -1620,7 +1628,9 @@ DatanodeDescriptor cur = it.next(); Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID()); if (excessBlocks == null || ! excessBlocks.contains(block)) { + if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { nonExcess.add(cur); + } } } chooseExcessReplicates(nonExcess, block, replication); @@ -1811,6 +1821,145 @@ } } } + + /** + * Start decommissioning the specified datanodes. If a datanode is + * already being decommissioned, then this is a no-op. + */ + public synchronized void startDecommission (String[] nodes) + throws IOException { + if (isInSafeMode()) { + throw new SafeModeException("Cannot decommission node ", safeMode); + } + boolean isError = false; + String badnodes = ""; + + synchronized (datanodeMap) { + for (int i = 0; i < nodes.length; i++) { + boolean found = false; + for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); + it.hasNext(); ) { + DatanodeDescriptor node = it.next(); + + // + // If this is a node that we are interested in, set its admin state. + // + if (node.getName().equals(nodes[i]) || + node.getHost().equals(nodes[i])) { + found = true; + if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { + LOG.info("Start Decommissioning node " + node.name); + node.startDecommission(); + // + // all those blocks that resides on this node has to be + // replicated. + Block decommissionBlocks[] = node.getBlocks(); + for (int j = 0; j < decommissionBlocks.length; j++) { + synchronized (neededReplications) { + neededReplications.add(decommissionBlocks[j]); + } + } + } + break; + } + } + // + // Record the fact that a specified node was not found + // + if (!found) { + badnodes += nodes[i] + " "; + isError = true; + } + } + } + if (isError) { + throw new IOException("Nodes " + badnodes + " not found"); + } + } + + /** + * Stop decommissioning the specified datanodes. + */ + public synchronized void stopDecommission (String[] nodes) + throws IOException { + if (isInSafeMode()) { + throw new SafeModeException("Cannot decommission node ", safeMode); + } + boolean isError = false; + String badnodes = ""; + + synchronized (datanodeMap) { + for (int i = 0; i < nodes.length; i++) { + boolean found = false; + for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); + it.hasNext(); ) { + DatanodeDescriptor node = it.next(); + + // + // If this is a node that we are interested in, set its admin state. 
+ // + if (node.getName().equals(nodes[i]) || + node.getHost().equals(nodes[i])) { + LOG.info("Stop Decommissioning node " + node.name); + found = true; + node.stopDecommission(); + break; + } + } + // + // Record the fact that a specified node was not found + // + if (!found) { + badnodes += nodes[i] + " "; + isError = true; + } + } + } + if (isError) { + throw new IOException("Nodes " + badnodes + " not found"); + } + } + + /** + * Return true if all specified nodes are decommissioned. + * Otherwise return false. + */ + public synchronized boolean checkDecommissioned (String[] nodes) + throws IOException { + String badnodes = ""; + boolean isError = false; + + synchronized (datanodeMap) { + for (int i = 0; i < nodes.length; i++) { + boolean found = false; + for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); + it.hasNext(); ) { + DatanodeDescriptor node = it.next(); + + // + // If this is a node that we are interested in, check its admin state. + // + if (node.getName().equals(nodes[i]) || + node.getHost().equals(nodes[i])) { + found = true; + boolean isDecommissioned = checkDecommissionStateInternal(node); + if (!isDecommissioned) { + return false; + } + } + } + if (!found) { + badnodes += nodes[i] + " "; + isError = true; + } + } + } + if (isError) { + throw new IOException("Nodes " + badnodes + " not found"); + } + return true; + } + /** */ public DatanodeInfo getDataNodeInfo(String name) { @@ -1896,6 +2045,72 @@ return (Block[]) sendBlock.toArray(new Block[sendBlock.size()]); } + /* + * Counts the number of nodes in the given list. Skips over nodes + * that are marked for decommission. + */ + private int countContainingNodes(Collection<DatanodeDescriptor> nodelist) { + int count = 0; + for (Iterator<DatanodeDescriptor> it = nodelist.iterator(); + it.hasNext(); ) { + DatanodeDescriptor node = it.next(); + if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { + count++; + } + } + return count; + } + + /* + * Return true if there are any blocks in neededReplication that + * reside on the specified node. Otherwise returns false. + */ + private boolean isReplicationInProgress(DatanodeDescriptor srcNode) { + synchronized (neededReplications) { + for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();){ + Block block = it.next(); + Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block); + if (containingNodes.contains(srcNode)) { + return true; + } + } + } + return false; + } + + /** + * Change, if appropriate, the admin state of a datanode to + * decommission completed. Return true if decommission is complete. + */ + private boolean checkDecommissionStateInternal(DatanodeDescriptor node) { + // + // Check to see if there are any blocks in the neededReplication + // data structure that has a replica on the node being decommissioned. + // + if (node.isDecommissionInProgress()) { + if (!isReplicationInProgress(node)) { + node.setDecommissioned(); + LOG.info("Decommission complete for node " + node.name); + } + } + if (node.isDecommissioned()) { + return true; + } + return false; + } + + /** + * Change, if appropriate, the admin state of a datanode to + * decommission completed. + */ + public synchronized void checkDecommissionState(DatanodeID nodeReg) { + DatanodeDescriptor node = datanodeMap.get(nodeReg.getStorageID()); + if (node == null) { + return; + } + checkDecommissionStateInternal(node); + } + /** * Return with a list of Block/DataNodeInfo sets, indicating * where various Blocks should be copied, ASAP. 
@@ -1924,6 +2139,7 @@ // replicate them. // List<Block> replicateBlocks = new ArrayList<Block>(); + List<Integer> numCurrentReplicas = new ArrayList<Integer>(); List<DatanodeDescriptor[]> replicateTargetSets; replicateTargetSets = new ArrayList<DatanodeDescriptor[]>(); for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();) { @@ -1943,17 +2159,23 @@ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block); Collection<Block> excessBlocks = excessReplicateMap.get( srcNode.getStorageID() ); + // srcNode must contain the block, and the block must // not be scheduled for removal on that node if (containingNodes != null && containingNodes.contains(srcNode) && (excessBlocks == null || ! excessBlocks.contains(block))) { + + // filter out containingNodes that are marked for decommission. + int numCurrentReplica = countContainingNodes(containingNodes); + DatanodeDescriptor targets[] = chooseTargets( - Math.min( fileINode.getReplication() - containingNodes.size(), + Math.min( fileINode.getReplication() - numCurrentReplica, this.maxReplicationStreams - xmitsInProgress), containingNodes, null, blockSize); if (targets.length > 0) { // Build items to return replicateBlocks.add(block); + numCurrentReplicas.add(new Integer(numCurrentReplica)); replicateTargetSets.add(targets); scheduledXfers += targets.length; } @@ -1973,9 +2195,10 @@ Block block = it.next(); DatanodeDescriptor targets[] = (DatanodeDescriptor[]) replicateTargetSets.get(i); + int numCurrentReplica = numCurrentReplicas.get(i).intValue(); Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block); - if (containingNodes.size() + targets.length >= + if (numCurrentReplica + targets.length >= dir.getFileByBlock( block).getReplication() ) { neededReplications.remove(block); pendingReplications.add(block); @@ -2060,7 +2283,8 @@ it.hasNext();) { DatanodeDescriptor node = it.next(); if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) && - clientMachine.toString().equals(node.getHost())) { + clientMachine.toString().equals(node.getHost()) && + !node.isDecommissionInProgress() && !node.isDecommissioned()) { if ((node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) && (node.getXceiverCount() <= (2.0 * avgLoad))) { targets.add(node); @@ -2084,6 +2308,7 @@ DatanodeDescriptor node = heartbeats.get(idx); if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) && !targets.contains(node) && + !node.isDecommissionInProgress() && !node.isDecommissioned() && (node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) && (node.getXceiverCount() <= (2.0 * avgLoad))) { target = node; @@ -2100,6 +2325,7 @@ DatanodeDescriptor node = heartbeats.get(idx); if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) && !targets.contains(node) && + !node.isDecommissionInProgress() && !node.isDecommissioned() && node.getRemaining() >= blockSize) { target = node; break; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Thu Jan 4 11:14:39 2007 @@ -465,6 +465,27 @@ boolean isInSafeMode() { return namesystem.isInSafeMode(); } + + /** + * Set administrative commands to decommission datanodes. 
+ */ + public boolean decommission(DecommissionAction action, String[] nodes) + throws IOException { + boolean ret = true; + switch (action) { + case DECOMMISSION_SET: // decommission datanode(s) + namesystem.startDecommission(nodes); + break; + case DECOMMISSION_CLEAR: // remove decommission state of a datanode + namesystem.stopDecommission(nodes); + break; + case DECOMMISSION_GET: // are all the node decommissioned? + ret = namesystem.checkDecommissioned(nodes); + break; + } + return ret; + } + //////////////////////////////////////////////////////////////// // DatanodeProtocol @@ -513,6 +534,14 @@ if (blocks != null) { return new BlockCommand(blocks); } + // + // See if the decommissioned node has finished moving all + // its datablocks to another replica. This is a loose + // heuristic to determine when a decommission is really over. + // We can probbaly do it in a seperate thread rather than making + // the heartbeat thread do this. + // + namesystem.checkDecommissionState(nodeReg); return null; } Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=auto&rev=492695 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Thu Jan 4 11:14:39 2007 @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.dfs; + +import junit.framework.TestCase; +import java.io.*; +import java.util.Random; +import java.net.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FSOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * This class tests the decommissioning of nodes. 
+ * @author Dhruba Borthakur + */ +public class TestDecommission extends TestCase { + static final long seed = 0xDEADBEEFL; + static final int blockSize = 8192; + static final int fileSize = 16384; + static final int numDatanodes = 4; + + Random myrand = new Random(); + + private void writeFile(FileSystem fileSys, Path name, int repl) + throws IOException { + // create and write a file that contains three blocks of data + FSOutputStream stm = fileSys.createRaw(name, true, (short)repl, + (long)blockSize); + byte[] buffer = new byte[fileSize]; + Random rand = new Random(seed); + rand.nextBytes(buffer); + stm.write(buffer); + stm.close(); + } + + + private void checkFile(FileSystem fileSys, Path name, int repl) + throws IOException { + String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize); + for (int idx = 0; idx < locations.length; idx++) { + assertEquals("Number of replicas for block" + idx, + Math.min(numDatanodes, repl), locations[idx].length); + } + } + + /** + * For blocks that reside on the nodes that are down, verify that their + * replication factor is 1 more than the specified one. + */ + private void checkFile(FileSystem fileSys, Path name, int repl, + String[] downnodes) throws IOException { + FSInputStream is = fileSys.openRaw(name); + DFSClient.DFSInputStream dis = (DFSClient.DFSInputStream) is; + DatanodeInfo[][] dinfo = dis.getDataNodes(); + + for (int blk = 0; blk < dinfo.length; blk++) { // for each block + int hasdown = 0; + DatanodeInfo[] nodes = dinfo[blk]; + for (int j = 0; j < nodes.length; j++) { // for each replica + for (int k = 0; downnodes != null && k < downnodes.length; k++) { + if (nodes[j].getName().equals(downnodes[k])) { + hasdown++; + System.out.println("Block " + blk + " replica " + + nodes[j].getName() + " is decommissioned."); + } + } + } + System.out.println("Block " + blk + " has " + hasdown + + " decommissioned replica."); + assertEquals("Number of replicas for block" + blk, + Math.min(numDatanodes, repl+hasdown), nodes.length); + } + } + + private void cleanupFile(FileSystem fileSys, Path name) throws IOException { + assertTrue(fileSys.exists(name)); + fileSys.delete(name); + assertTrue(!fileSys.exists(name)); + } + + private void printDatanodeReport(DatanodeInfo[] info) { + System.out.println("-------------------------------------------------"); + for (int i = 0; i < info.length; i++) { + System.out.println(info[i].getDatanodeReport()); + System.out.println(); + } + } + + /* + * decommission one random node. + */ + private String[] decommissionNode(DFSClient client, FileSystem filesys) + throws IOException { + DistributedFileSystem dfs = (DistributedFileSystem) filesys; + DatanodeInfo[] info = client.datanodeReport(); + + // + // pick one datanode randomly. 
+ // + int index = myrand.nextInt(info.length); + String nodename = info[index].getName(); + System.out.println("Decommissioning node: " + nodename); + String[] nodes = new String[1]; + nodes[0] = nodename; + dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_SET, nodes); + return nodes; + } + + /* + * put node back in action + */ + private void commissionNode(DFSClient client, FileSystem filesys, + String[] nodes) throws IOException { + DistributedFileSystem dfs = (DistributedFileSystem) filesys; + DatanodeInfo[] info = client.datanodeReport(); + + for (int i = 0; i < nodes.length; i++) { + System.out.println("Putting node back in action: " + nodes[i]); + } + dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_CLEAR, nodes); + } + + /* + * Check that node(s) were decommissioned + */ + private void checkNodeDecommission(DFSClient client, FileSystem filesys, + String[] nodes) throws IOException { + DistributedFileSystem dfs = (DistributedFileSystem) filesys; + boolean ret = dfs.decommission( + FSConstants.DecommissionAction.DECOMMISSION_GET, nodes); + assertEquals("State of Decommissioned Datanode(s) ", ret, true); + } + + /* + * Wait till node is fully decommissioned. + */ + private void waitNodeDecommission(DFSClient client, FileSystem filesys, + String[] nodes) throws IOException { + DistributedFileSystem dfs = (DistributedFileSystem) filesys; + boolean done = dfs.decommission( + FSConstants.DecommissionAction.DECOMMISSION_GET, nodes); + while (!done) { + System.out.println("Waiting for nodes " + nodes[0] + + " to be fully decommissioned..."); + try { + Thread.sleep(5000L); + } catch (InterruptedException e) { + // nothing + } + done = dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_GET, + nodes); + } + // + // sleep an additional 10 seconds for the blockreports from the datanodes + // to arrive. + // + try { + Thread.sleep(10 * 1000L); + } catch (Exception e) { + } + } + + /** + * Tests Decommission in DFS. + */ + public void testDecommission() throws IOException { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false); + // Now wait for 15 seconds to give datanodes chance to register + // themselves and to report heartbeat + try { + Thread.sleep(15000L); + } catch (InterruptedException e) { + // nothing + } + InetSocketAddress addr = new InetSocketAddress("localhost", + cluster.getNameNodePort()); + DFSClient client = new DFSClient(addr, conf); + DatanodeInfo[] info = client.datanodeReport(); + assertEquals("Number of Datanodes ", numDatanodes, info.length); + FileSystem fileSys = cluster.getFileSystem(); + DistributedFileSystem dfs = (DistributedFileSystem) fileSys; + + try { + for (int iteration = 0; iteration < 2; iteration++) { + // + // Decommission one node. Verify that node is decommissioned. + // Verify that replication factor of file has increased from 3 + // to 4. This means one replica is on decommissioned node. 
+ // + Path file1 = new Path("smallblocktest.dat"); + writeFile(fileSys, file1, 3); + checkFile(fileSys, file1, 3); + String downnodes[] = decommissionNode(client, fileSys); + waitNodeDecommission(client, fileSys, downnodes); + checkFile(fileSys, file1, 3, downnodes); + commissionNode(client, fileSys, downnodes); + cleanupFile(fileSys, file1); + } + } catch (IOException e) { + info = client.datanodeReport(); + printDatanodeReport(info); + throw e; + } finally { + fileSys.close(); + cluster.shutdown(); + } + } +} Modified: lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp (original) +++ lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp Thu Jan 4 11:14:39 2007 @@ -82,9 +82,15 @@ percentUsed = FsShell.limitDecimal(((1.0 * u)/c)*100, 2); else percentUsed = "100"; + + String adminState = (d.isDecommissioned() ? "Decommissioned" : + (d.isDecommissionInProgress() ? "Decommission In Progress": + "In Service")); out.print("<td class=\"lastcontact\"> " + ((currentTime - d.getLastUpdate())/1000) + + "<td class=\"adminstate\">" + + adminState + "<td class=\"size\">" + FsShell.limitDecimal(c*1.0/diskBytes, 2) + "<td class=\"pcused\">" + percentUsed + @@ -167,6 +173,7 @@ out.print( "<tr class=\"headerRow\"> <th " + NodeHeaderStr("name") + "> Node <th " + NodeHeaderStr("lastcontact") + "> Last Contact <th " + + NodeHeaderStr("adminstate") + "> Admin State <th " + NodeHeaderStr("size") + "> Size (" + diskByteStr + ") <th " + NodeHeaderStr("pcused") + "> Used (%) <th " + NodeHeaderStr("blocks") + Modified: lucene/hadoop/trunk/src/webapps/static/hadoop.css URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/static/hadoop.css?view=diff&rev=492695&r1=492694&r2=492695 ============================================================================== --- lucene/hadoop/trunk/src/webapps/static/hadoop.css (original) +++ lucene/hadoop/trunk/src/webapps/static/hadoop.css Thu Jan 4 11:14:39 2007 @@ -41,7 +41,7 @@ cursor : pointer; } -div#dfsnodetable td.blocks, td.size, td.pcused, td.lastcontact { +div#dfsnodetable td.blocks, td.size, td.pcused, td.adminstate, td.lastcontact { text-align : right; }