Author: cutting
Date: Wed Feb 21 12:11:00 2007
New Revision: 510181

URL: http://svn.apache.org/viewvc?view=rev&rev=510181
Log:
HADOOP-442.  Permit one to specify hosts allowed to connect to namenode
and jobtracker with include and exclude files.  Contributed by Wendy.
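All of the include/exclude handling below goes through the new org.apache.hadoop.util.HostsFileReader, which this commit adds but whose source does not appear in this message. A minimal sketch of what such a reader plausibly looks like, assuming one hostname token per whitespace-separated entry and a refresh() that rereads both files (the names and parsing rules here are assumptions, not the committed code):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    // Sketch only: the real HostsFileReader is added by this commit but is
    // not shown in this diff; parsing rules here are assumptions.
    public class HostsFileReaderSketch {
      private final String hostsFile;    // dfs.hosts / mapred.hosts
      private final String excludeFile;  // dfs.hosts.exclude / mapred.hosts.exclude
      private Set<String> hosts = new HashSet<String>();
      private Set<String> excludes = new HashSet<String>();

      public HostsFileReaderSketch(String hostsFile, String excludeFile)
          throws IOException {
        this.hostsFile = hostsFile;
        this.excludeFile = excludeFile;
        refresh();
      }

      private static Set<String> readHostNames(String filename)
          throws IOException {
        Set<String> names = new HashSet<String>();
        if (filename == null || filename.length() == 0) {
          return names;                      // empty config value => empty list
        }
        BufferedReader reader = new BufferedReader(new FileReader(filename));
        try {
          String line;
          while ((line = reader.readLine()) != null) {
            for (String token : line.trim().split("\\s+")) {
              if (token.length() > 0) {
                names.add(token);
              }
            }
          }
        } finally {
          reader.close();
        }
        return names;
      }

      public synchronized void refresh() throws IOException {
        hosts = readHostNames(hostsFile);
        excludes = readHostNames(excludeFile);
      }

      public synchronized Set<String> getHosts() { return hosts; }
      public synchronized Set<String> getExcludedHosts() { return excludes; }

      public static void main(String[] args) throws IOException {
        HostsFileReaderSketch r = new HostsFileReaderSketch(args[0], args[1]);
        System.out.println("hosts=" + r.getHosts()
            + " excluded=" + r.getExcludedHosts());
      }
    }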
Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DisallowedDatanodeException.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java.orig
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DisallowedTaskTrackerException.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/HostsFileReader.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Feb 21 12:11:00 2007
@@ -74,6 +74,10 @@
 22. HADOOP-947.  Improve performance of datanode decomissioning.
     (Dhruba Borthakur via cutting)
 
+23. HADOOP-442.  Permit one to specify hosts allowed to connect to
+    namenode and jobtracker with include and exclude files.  (Wendy
+    Chien via cutting)
+
 Release 0.11.2 - 2007-02-16

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Feb 21 12:11:00 2007
@@ -356,6 +356,24 @@
 </property>
 
 <property>
+  <name>dfs.hosts</name>
+  <value></value>
+  <description>Names a file that contains a list of hosts that are
+  permitted to connect to the namenode. The full pathname of the file
+  must be specified.  If the value is empty, all hosts are
+  permitted.</description>
+</property>
+
+<property>
+  <name>dfs.hosts.exclude</name>
+  <value></value>
+  <description>Names a file that contains a list of hosts that are
+  not permitted to connect to the namenode.  The full pathname of the
+  file must be specified.  If the value is empty, no hosts are
+  excluded.</description>
+</property>
+
+<property>
   <name>fs.s3.block.size</name>
   <value>1048576</value>
   <description>
@@ -698,6 +716,22 @@
   retained.
   </description>
 </property>
+
+<property>
+  <name>mapred.hosts</name>
+  <value></value>
+  <description>Names a file that contains the list of nodes that may
+  connect to the jobtracker.  If the value is empty, all hosts are
+  permitted.</description>
+</property>
+
+<property>
+  <name>mapred.hosts.exclude</name>
+  <value></value>
+  <description>Names a file that contains the list of hosts that
+  should be excluded by the jobtracker.  If the value is empty, no
+  hosts are excluded.</description>
+</property>
 
 <property>
   <name>jobclient.output.filter</name>
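Read together, the four properties define one admission rule: an empty hosts file admits every host, and the exclude file overrides the include file. The real checks are FSNamesystem.inHostsList() and inExcludedHostsList() near the end of this patch (they match either the name or the host of a node); a self-contained sketch of the decision they imply:

    import java.util.Set;

    public class AdmissionRule {
      public enum Status { ALLOWED, DECOMMISSION, REJECTED }

      // Sketch of the decision implied by the property descriptions above and
      // by FSNamesystem.verifyNodeRegistration later in this patch: an empty
      // include list admits every host; a host on the exclude list may still
      // connect, but only so that it can be decommissioned.
      public static Status check(Set<String> hosts, Set<String> excludes,
                                 String node) {
        if (!hosts.isEmpty() && !hosts.contains(node)) {
          return Status.REJECTED;       // not in dfs.hosts / mapred.hosts
        }
        if (excludes.contains(node)) {
          return Status.DECOMMISSION;   // in dfs.hosts.exclude / mapred.hosts.exclude
        }
        return Status.ALLOWED;
      }

      public static void main(String[] args) {
        Set<String> hosts = java.util.Collections.emptySet();
        Set<String> excludes =
            java.util.Collections.singleton("bad.example.com");
        System.out.println(check(hosts, excludes, "good.example.com")); // ALLOWED
        System.out.println(check(hosts, excludes, "bad.example.com"));  // DECOMMISSION
      }
    }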
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Wed Feb 21 12:11:00 2007
@@ -29,7 +29,7 @@
  **********************************************************************/
 interface ClientProtocol extends VersionedProtocol {
 
-  public static final long versionID = 7L;  // periodic checkpoint added
+  public static final long versionID = 8L;  // refreshNodes added
   
   ///////////////////////////////////////
   // File contents
@@ -313,7 +313,13 @@
      */
     public boolean setSafeMode( FSConstants.SafeModeAction action ) throws IOException;
 
-    public boolean decommission( FSConstants.DecommissionAction action, String[] nodenames) throws IOException;
+    /**
+     * Tells the namenode to reread the hosts and exclude files.
+     * @throws IOException
+     */
+    public void refreshNodes() throws IOException;
+
     /**
      * Get the size of the current edit log (in bytes).
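Replacing decommission() with refreshNodes() is an incompatible change to ClientProtocol, hence the versionID bump from 7 to 8 above: a 0.11-era client must fail fast against a new namenode rather than mis-dispatch. A self-contained sketch of the fail-fast check a VersionedProtocol client performs (the guard logic is illustrative, not code from this patch):

    import java.io.IOException;

    public class VersionCheck {
      static final long CLIENT_VERSION_ID = 8L;  // refreshNodes added

      // Sketch: if the server reports a different versionID for the
      // protocol, refuse to proceed rather than call a missing method.
      static void ensureCompatible(long serverVersionId) throws IOException {
        if (serverVersionId != CLIENT_VERSION_ID) {
          throw new IOException("ClientProtocol version mismatch: client="
              + CLIENT_VERSION_ID + ", server=" + serverVersionId);
        }
      }

      public static void main(String[] args) throws IOException {
        ensureCompatible(8L);   // ok
        try {
          ensureCompatible(7L); // a 0.11-era namenode: decommission(), not refreshNodes()
        } catch (IOException expected) {
          System.out.println(expected.getMessage());
        }
      }
    }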
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java Wed Feb 21 12:11:00 2007
@@ -132,61 +132,23 @@
     }
 
     /**
-     * Command related to decommission of a datanode.
-     * Usage: java DFSAdmin -decommission [enter | leave | get]
-     * @param argv List of command line parameters. Each of these items
-                   could be a hostname or a hostname:portname.
-     * @param idx The index of the command that is being processed.
-     * @exception IOException if the filesystem does not exist.
-     * @return 0 on success, non zero on error.
+     * Command to ask the namenode to reread the hosts and exclude files.
+     * Usage: java DFSAdmin -refreshNodes
+     * @exception IOException
      */
-    public int decommission(String[] argv, int idx) throws IOException {
+    public int refreshNodes() throws IOException {
       int exitCode = -1;
 
       if (!(fs instanceof DistributedFileSystem)) {
         System.out.println("FileSystem is " + fs.getName());
         return exitCode;
       }
 
-      if (idx >= argv.length - 1) {
-        printUsage("-decommission");
-        return exitCode;
-      }
-
-      //
-      // Copy all the datanode names to nodes[]
-      //
-      String[] nodes = new String[argv.length - idx - 1];
-      for (int i = idx + 1, j = 0; i < argv.length; i++, j++) {
-        nodes[j] = argv[i];
-      }
-      FSConstants.DecommissionAction action;
-
-      if ("set".equalsIgnoreCase(argv[idx])) {
-        action = FSConstants.DecommissionAction.DECOMMISSION_SET;
-      } else if ("clear".equalsIgnoreCase(argv[idx])) {
-        action = FSConstants.DecommissionAction.DECOMMISSION_CLEAR;
-      } else if ("get".equalsIgnoreCase(argv[idx])) {
-        action = FSConstants.DecommissionAction.DECOMMISSION_GET;
-      } else {
-        printUsage("-decommission");
-        return exitCode;
-      }
       DistributedFileSystem dfs = (DistributedFileSystem) fs;
-      boolean mode = dfs.decommission(action, nodes);
-
-      if (action == FSConstants.DecommissionAction.DECOMMISSION_GET) {
-        if (mode) {
-          System.out.println("Node(s) has finished decommission");
-        }
-        else {
-          System.out.println("Node(s) have not yet been decommissioned");
-        }
-        return 0;
-      }
-      if (mode) {
-        return 0; // success
-      }
+      dfs.refreshNodes();
+      exitCode = 0;
+
       return exitCode;
     }
 
@@ -197,19 +159,18 @@
     public void printUsage(String cmd) {
         if ("-report".equals(cmd)) {
             System.err.println("Usage: java DFSAdmin"
-                + " [report]");
+                + " [-report]");
         } else if ("-safemode".equals(cmd)) {
             System.err.println("Usage: java DFSAdmin"
                 + " [-safemode enter | leave | get | wait]");
-        } else if ("-decommission".equals(cmd)) {
+        } else if ("-refreshNodes".equals(cmd)) {
             System.err.println("Usage: java DFSAdmin"
-                + " [-decommission set | clear | get "
-                + "[datanode1[, datanode2..]]");
+                + " [-refreshNodes]");
         } else {
             System.err.println("Usage: java DFSAdmin");
             System.err.println("           [-report]");
             System.err.println("           [-safemode enter | leave | get | wait]");
-            System.err.println("           [-decommission set | clear | get]");
+            System.err.println("           [-refreshNodes]");
         }
     }
 
@@ -242,13 +203,14 @@
             printUsage(cmd);
             return exitCode;
           }
-        } else if ("-decommission".equals(cmd)) {
-          if (argv.length < 2) {
+        } else if ("-refreshNodes".equals(cmd)) {
+          if (argv.length != 1) {
             printUsage(cmd);
             return exitCode;
           }
         }
 
+        // initialize DFSAdmin
         try {
           init();
@@ -267,8 +229,8 @@
             report();
         } else if ("-safemode".equals(cmd)) {
             setSafeMode(argv, i);
-        } else if ("-decommission".equals(cmd)) {
-          exitCode = decommission(argv, i);
+        } else if ("-refreshNodes".equals(cmd)) {
+          exitCode = refreshNodes();
         } else {
             exitCode = -1;
             System.err.println(cmd.substring(1) + ": Unknown command");
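With the dispatch above, operators trigger the reread with `bin/hadoop dfsadmin -refreshNodes`. The same refresh is reachable programmatically through DistributedFileSystem.refreshNodes(), made public later in this patch; a hedged sketch of such a caller (untested scaffolding, not part of the commit):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.dfs.DistributedFileSystem;
    import org.apache.hadoop.fs.FileSystem;

    // Sketch: trigger the namenode's reread of the hosts/exclude files from
    // client code instead of the dfsadmin command line.
    public class RefreshNodesClient {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if (!(fs instanceof DistributedFileSystem)) {
          throw new IOException("FileSystem is not HDFS: " + fs.getName());
        }
        ((DistributedFileSystem) fs).refreshNodes();
      }
    }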
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Feb 21 12:11:00 2007
@@ -370,17 +370,16 @@
     }
 
     /**
-     * Set, clear decommission state of datnode(s).
-     * See {@link ClientProtocol#decommission(FSConstants.DecommissionAction)}
+     * Refresh the hosts and exclude files.  (Rereads them.)
+     * See {@link ClientProtocol#refreshNodes()}
      * for more details.
      * 
-     * @see ClientProtocol#decommission(FSConstants.DecommissionAction)
+     * @see ClientProtocol#refreshNodes()
      */
-    public boolean decommission(DecommissionAction action, String[] nodes)
-    throws IOException {
-      return namenode.decommission(action, nodes);
+    public void refreshNodes() throws IOException {
+      namenode.refreshNodes();
     }
-    
+
     /**
      */
     public boolean mkdirs(UTF8 src) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Feb 21 12:11:00 2007
@@ -499,7 +499,8 @@
         return;
       } catch( RemoteException re ) {
         String reClass = re.getClassName();
-        if( UnregisteredDatanodeException.class.getName().equals( reClass )) {
+        if( UnregisteredDatanodeException.class.getName().equals( reClass ) ||
+            DisallowedDatanodeException.class.getName().equals( reClass )) {
           LOG.warn( "DataNode is shutting down: " + 
                     StringUtils.stringifyException(re));
           shutdown();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java Wed Feb 21 12:11:00 2007
@@ -190,7 +190,6 @@
    * Sets the admin state to indicate that decommision is complete.
    */
   void setDecommissioned() {
-    assert isDecommissionInProgress();
     adminState = AdminStates.DECOMMISSIONED;
   }
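The DataNode change above matches exception class names as strings because exceptions cross Hadoop RPC wrapped in a RemoteException that carries only the remote class's name, so an instanceof test cannot work. A condensed sketch of the pattern (the handler class and its shutdown() stub are stand-ins for the surrounding DataNode code):

    import org.apache.hadoop.ipc.RemoteException;

    // Condensed sketch of the hunk above: decide whether a remote failure is
    // fatal by matching the exception's class name carried in the
    // RemoteException, rather than by type.
    public class RemoteFailureHandler {
      void shutdown() { /* stand-in for DataNode.shutdown() */ }

      void handle(RemoteException re) {
        String reClass = re.getClassName();
        if ("org.apache.hadoop.dfs.UnregisteredDatanodeException".equals(reClass) ||
            "org.apache.hadoop.dfs.DisallowedDatanodeException".equals(reClass)) {
          shutdown();  // the namenode no longer accepts this datanode
        }
      }
    }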
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DisallowedDatanodeException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DisallowedDatanodeException.java?view=auto&rev=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DisallowedDatanodeException.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DisallowedDatanodeException.java Wed Feb 21 12:11:00 2007
@@ -0,0 +1,18 @@
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+
+/**
+ * This exception is thrown when a datanode tries to register or communicate
+ * with the namenode when it does not appear on the list of included nodes,
+ * or has been specifically excluded.
+ * 
+ * @author Wendy Chien
+ */
+class DisallowedDatanodeException extends IOException {
+
+  public DisallowedDatanodeException( DatanodeID nodeID ) {
+    super("Datanode denied communication with namenode: " + nodeID.getName() );
+  }
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Wed Feb 21 12:11:00 2007
@@ -343,12 +343,11 @@
       return dfs.setSafeMode( action );
     }
 
-    /**
-     * Set, clear decommission of a set of datanodes.
+    /**
+     * Refreshes the list of hosts and excluded hosts from the configured
+     * files.
      */
-    public boolean decommission(FSConstants.DecommissionAction action,
-                                String[] nodes)
-    throws IOException {
-      return dfs.decommission(action, nodes);
+    public void refreshNodes() throws IOException {
+      dfs.refreshNodes();
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Feb 21 12:11:00 2007
@@ -122,9 +122,6 @@
     // SafeMode actions
     public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
 
-    // decommission administrative actions
-    public enum DecommissionAction{ DECOMMISSION_SET, DECOMMISSION_CLEAR, DECOMMISSION_GET; }
-
     // Version is reflected in the dfs image and edit log files.
     // Version is reflected in the data storage file.
     // Versions are negative.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Feb 21 12:11:00 2007
@@ -204,6 +204,9 @@
     // for block replicas placement
     Replicator replicator = new Replicator();
 
+    private HostsFileReader hostsReader;
+    private Daemon dnthread = null;
+
     /**
      * dirs is a list oif directories where the filesystem directory state
      * is stored
@@ -252,6 +255,11 @@
         replthread.start();
         this.systemStart = now();
         this.startTime = new Date(systemStart);
+
+        this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
+                                               conf.get("dfs.hosts.exclude",""));
+        this.dnthread = new Daemon(new DecommissionedMonitor());
+        dnthread.start();
 
         this.infoPort = conf.getInt("dfs.info.port", 50070);
         this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
@@ -292,6 +300,7 @@
             infoServer.stop();
             hbthread.join(3000);
             replthread.join(3000);
+            dnthread.join(3000);
         } catch (InterruptedException ie) {
         } finally {
             // using finally to ensure we also wait for lease daemon
@@ -1575,6 +1584,11 @@
                 (System.currentTimeMillis() - heartbeatExpireInterval));
     }
     
+    void setDatanodeDead(DatanodeID nodeID) throws IOException {
+      DatanodeDescriptor node = getDatanode(nodeID);
+      node.setLastUpdate(0);
+    }
+
     /**
      * The given node has reported in.  This method should:
      * 1) Record the heartbeat, so the datanode isn't timed out
@@ -1606,6 +1620,12 @@
                 return true;
             }
 
+            // Check if this datanode should actually be shutdown instead.
+            if (shouldNodeShutdown(nodeinfo)) {
+                setDatanodeDead(nodeinfo);
+                throw new DisallowedDatanodeException(nodeinfo);
+            }
+
             if( !nodeinfo.isAlive ) {
                 return true;
             } else {
@@ -1916,6 +1936,12 @@
                 +"from "+nodeID.getName()+" "+newReport.length+" blocks" );
         DatanodeDescriptor node = getDatanode( nodeID );
 
+        // Check if this datanode should actually be shutdown instead.
+        if (shouldNodeShutdown(node)) {
+            setDatanodeDead(node);
+            throw new DisallowedDatanodeException(node);
+        }
+
         //
         // Modify the (block-->datanode) map, according to the difference
         // between the old and new block report.
@@ -2198,8 +2224,16 @@
                                           "Unexpected exception.  Got blockReceived message from node "
                                           + block.getBlockName() + ", but there is no info for it");
         }
+
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
                 +block.getBlockName()+" is received from " + nodeID.getName() );
+
+        // Check if this datanode should actually be shutdown instead.
+        if (shouldNodeShutdown(node)) {
+            setDatanodeDead(node);
+            throw new DisallowedDatanodeException(node);
+        }
+
         //
         // Modify the blocks->datanode map and node's map.
         //
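Note the trick in setDatanodeDead(): instead of removing the node outright, it zeroes lastUpdate so the existing heartbeat machinery expires the node through its normal path; the staleness comparison against System.currentTimeMillis() - heartbeatExpireInterval is visible in the context lines of the @@ -1575 hunk above. A self-contained sketch of that interaction (the interval value is an assumption):

    // Sketch of why setLastUpdate(0) kills a node: the heartbeat monitor's
    // staleness test compares lastUpdate against now - heartbeatExpireInterval,
    // and 0 is older than any such cutoff.
    public class HeartbeatExpiry {
      static final long HEARTBEAT_EXPIRE_INTERVAL = 10 * 60 * 1000; // assumed

      static boolean isDatanodeDead(long lastUpdate) {
        return lastUpdate <
            (System.currentTimeMillis() - HEARTBEAT_EXPIRE_INTERVAL);
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(isDatanodeDead(now)); // false: freshly heartbeating
        System.out.println(isDatanodeDead(0L));  // true: setDatanodeDead() case
      }
    }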
@@ -2260,100 +2294,33 @@
         }
     }
 
+
     /**
-     * Start decommissioning the specified datanodes. If a datanode is
-     * already being decommissioned, then this is a no-op.
+     * Start decommissioning the specified datanode.
      */
-    public synchronized void startDecommission (String[] nodes)
-        throws IOException {
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot decommission node ", safeMode);
-      }
-      boolean isError = false;
-      String badnodes = "";
-
-      synchronized (datanodeMap) {
-        for (int i = 0; i < nodes.length; i++) {
-          boolean found = false;
-          for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
-               it.hasNext(); ) {
-            DatanodeDescriptor node = it.next();
+    private void startDecommission (DatanodeDescriptor node)
+        throws IOException {
 
-            //
-            // If this is a node that we are interested in, set its admin state.
-            //
-            if (node.getName().equals(nodes[i]) ||
-                node.getHost().equals(nodes[i])) {
-              found = true;
-              if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-                LOG.info("Start Decommissioning node " + node.name);
-                node.startDecommission();
-                //
-                // all those blocks that resides on this node has to be
-                // replicated.
-                Block decommissionBlocks[] = node.getBlocks();
-                for (int j = 0; j < decommissionBlocks.length; j++) {
-                  neededReplications.update(decommissionBlocks[j], -1, 0);
-                }
-              }
-              break;
-            }
-          }
-          //
-          // Record the fact that a specified node was not found
-          //
-          if (!found) {
-            badnodes += nodes[i] + " ";
-            isError = true;
-          }
+      if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+        LOG.info("Start Decommissioning node " + node.name);
+        node.startDecommission();
+        //
+        // all the blocks that reside on this node have to be
+        // replicated.
+        Block decommissionBlocks[] = node.getBlocks();
+        for (int j = 0; j < decommissionBlocks.length; j++) {
+          neededReplications.update(decommissionBlocks[j], -1, 0);
         }
       }
-      if (isError) {
-        throw new IOException("Nodes " + badnodes + " not found");
-      }
     }
 
     /**
      * Stop decommissioning the specified datanodes.
      */
-    public synchronized void stopDecommission (String[] nodes)
+    public void stopDecommission (DatanodeDescriptor node)
         throws IOException {
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot decommission node ", safeMode);
-      }
-      boolean isError = false;
-      String badnodes = "";
-
-      synchronized (datanodeMap) {
-        for (int i = 0; i < nodes.length; i++) {
-          boolean found = false;
-          for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
-               it.hasNext(); ) {
-            DatanodeDescriptor node = it.next();
-
-            //
-            // If this is a node that we are interested in, set its admin state.
-            //
-            if (node.getName().equals(nodes[i]) ||
-                node.getHost().equals(nodes[i])) {
-              LOG.info("Stop Decommissioning node " + node.name);
-              found = true;
-              node.stopDecommission();
-              break;
-            }
-          }
-          //
-          // Record the fact that a specified node was not found
-          //
-          if (!found) {
-            badnodes += nodes[i] + " ";
-            isError = true;
-          }
-        }
-      }
-      if (isError) {
-        throw new IOException("Nodes " + badnodes + " not found");
-      }
+      LOG.info("Stop Decommissioning node " + node.name);
+      node.stopDecommission();
     }
 
     /**
@@ -2620,7 +2587,7 @@
             && (excessBlocks == null || ! excessBlocks.contains(block))) {
           // filter out containingNodes that are marked for decommission.
           List<DatanodeDescriptor> nodes =
-              filterDecommissionedNodes(containingNodes);
+            filterDecommissionedNodes(containingNodes);
           int numCurrentReplica = nodes.size();
           DatanodeDescriptor targets[] = replicator.chooseTarget(
               Math.min( fileINode.getReplication() - numCurrentReplica,
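startDecommission() and stopDecommission() above drive a small per-node admin state machine, the AdminStates referenced in the DatanodeInfo hunk earlier. The assert dropped from setDecommissioned() matters for refreshNodes() below, which marks a node decommissioned directly when it falls off the include list, without ever passing through the in-progress state. A sketch of the transitions (all state names except DECOMMISSIONED, which appears verbatim in the DatanodeInfo hunk, are assumptions):

    // Sketch of the datanode admin-state machine implied by this patch.
    public class AdminStateMachine {
      enum AdminStates { NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED }

      private AdminStates adminState = AdminStates.NORMAL;

      void startDecommission() {           // refreshNodes case 3
        adminState = AdminStates.DECOMMISSION_INPROGRESS;
      }

      void stopDecommission() {            // refreshNodes case 4
        adminState = AdminStates.NORMAL;
      }

      void setDecommissioned() {           // refreshNodes case 2, or replication done;
        adminState = AdminStates.DECOMMISSIONED;  // may now skip INPROGRESS entirely
      }

      boolean isDecommissioned() {         // shouldNodeShutdown(): drop the node
        return adminState == AdminStates.DECOMMISSIONED;
      }
    }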
@@ -3157,6 +3124,124 @@
     }
     //end of Replicator
 
+    // Keeps track of which datanodes are allowed to connect to the namenode.
+
+    private boolean inHostsList(DatanodeID node) {
+      Set<String> hostsList = hostsReader.getHosts();
+      return (hostsList.isEmpty() ||
+              hostsList.contains(node.getName()) ||
+              hostsList.contains(node.getHost()));
+    }
+
+
+    private boolean inExcludedHostsList(DatanodeID node) {
+      Set<String> excludeList = hostsReader.getExcludedHosts();
+      return (excludeList.contains(node.getName()) ||
+              excludeList.contains(node.getHost()));
+    }
+
+    /**
+     * Rereads the files to update the hosts and exclude lists.  It
+     * checks if any of the hosts have changed states:
+     * 1. Added to hosts --> no further work needed here.
+     * 2. Removed from hosts --> mark AdminState as decommissioned.
+     * 3. Added to exclude --> start decommission.
+     * 4. Removed from exclude --> stop decommission.
+     */
+    void refreshNodes() throws IOException {
+      hostsReader.refresh();
+      synchronized (this) {
+        for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+             it.hasNext(); ) {
+          DatanodeDescriptor node = it.next();
+          // Check if not included.
+          if (!inHostsList(node)) {
+            node.setDecommissioned();  // case 2.
+          } else {
+            if (inExcludedHostsList(node)) {
+              if (!node.isDecommissionInProgress() &&
+                  !node.isDecommissioned()) {
+                startDecommission(node);   // case 3.
+              }
+            } else {
+              if (node.isDecommissionInProgress() ||
+                  node.isDecommissioned()) {
+                stopDecommission(node);   // case 4.
+              }
+            }
+          }
+        }
+      }
+
+    }
+
+
+    /**
+     * Checks if the node is not on the hosts list.  If it is not, then
+     * it will be ignored.  If the node is in the hosts list, but is also
+     * on the exclude list, then it will be decommissioned.
+     * Returns FALSE if node is rejected for registration.
+     * Returns TRUE if node is registered (including when it is on the
+     * exclude list and is being decommissioned).
+     */
+    public synchronized boolean verifyNodeRegistration(DatanodeRegistration nodeReg)
+      throws IOException {
+      if (!inHostsList(nodeReg)) {
+        return false;
+      }
+      if (inExcludedHostsList(nodeReg)) {
+        DatanodeDescriptor node = getDatanode(nodeReg);
+        if (!checkDecommissionStateInternal(node)) {
+          startDecommission(node);
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Checks if the Admin state bit is DECOMMISSIONED.  If so, then
+     * we should shut it down.
+     *
+     * Returns true if the node should be shutdown.
+     */
+    private boolean shouldNodeShutdown(DatanodeDescriptor node) {
+      return (node.isDecommissioned());
+    }
+
+    /**
+     * Check if any of the nodes being decommissioned has finished
+     * moving all its data blocks to another replica. This is a loose
+     * heuristic to determine when a decommission is really over.
+     */
+    public synchronized void decommissionedDatanodeCheck() {
+      for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+           it.hasNext(); ) {
+        DatanodeDescriptor node = it.next();
+        checkDecommissionStateInternal(node);
+      }
+    }
+
+    /**
+     * Periodically calls decommissionedDatanodeCheck().
+     */
+    class DecommissionedMonitor implements Runnable {
+
+        public void run() {
+          while (fsRunning) {
+            try {
+              decommissionedDatanodeCheck();
+            } catch (Exception e) {
+              FSNamesystem.LOG.info(StringUtils.stringifyException(e));
+            }
+            try {
+              Thread.sleep(1000 * 60 * 5);
+            } catch (InterruptedException ie) {
+            }
+          }
+        }
+    }
+
     /**
      * Information about the file while it is being written to.
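End to end, decommissioning under the new scheme is: add the node to the exclude file, run dfsadmin -refreshNodes, and let DecommissionedMonitor above (polling every five minutes) flip the node to DECOMMISSIONED once its blocks are re-replicated, at which point the node's next heartbeat draws a DisallowedDatanodeException and it shuts itself down. A hedged sketch of driving that flow, loosely in the spirit of the modified TestDecommission whose diff is not shown here (the exclude-file path is an assumption, and the namenode must already have been started with the same dfs.hosts.exclude setting for the refresh to pick the file up):

    import java.io.FileWriter;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.dfs.DistributedFileSystem;
    import org.apache.hadoop.fs.FileSystem;

    // Hedged sketch, not the actual TestDecommission changes: exclude one
    // datanode, ask the namenode to reread its lists, and let the monitor
    // thread finish the decommission in the background.
    public class DecommissionDriver {
      public static void main(String[] args) throws IOException {
        String excludePath = "/tmp/dfs.exclude";   // assumed location; must
                                                   // match the namenode's
                                                   // dfs.hosts.exclude setting
        String victim = args[0];                   // host[:port] of the datanode

        FileWriter out = new FileWriter(excludePath);
        out.write(victim + "\n");
        out.close();

        Configuration conf = new Configuration();
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(conf); // assumes HDFS default fs
        dfs.refreshNodes();  // case 3: node added to exclude -> start decommission
      }
    }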