Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=510181&r1=510180&r2=510181 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Wed Feb 21 12:11:00 2007 @@ -515,24 +515,12 @@ return namesystem.isInSafeMode(); } - /** - * Set administrative commands to decommission datanodes. + /* + * Refresh the list of datanodes that the namenode should allow to + * connect. Uses the files list in the configuration to update the list. */ - public boolean decommission(DecommissionAction action, String[] nodes) - throws IOException { - boolean ret = true; - switch (action) { - case DECOMMISSION_SET: // decommission datanode(s) - namesystem.startDecommission(nodes); - break; - case DECOMMISSION_CLEAR: // remove decommission state of a datanode - namesystem.stopDecommission(nodes); - break; - case DECOMMISSION_GET: // are all the node decommissioned? - ret = namesystem.checkDecommissioned(nodes); - break; - } - return ret; + public void refreshNodes() throws IOException { + namesystem.refreshNodes(); } /** @@ -564,8 +552,12 @@ public DatanodeRegistration register( DatanodeRegistration nodeReg, String networkLocation ) throws IOException { + if (!namesystem.verifyNodeRegistration(nodeReg)) { + throw new DisallowedDatanodeException( nodeReg ); + } verifyVersion( nodeReg.getVersion() ); namesystem.registerDatanode( nodeReg, networkLocation ); + return nodeReg; } @@ -650,7 +642,8 @@ /** * Verify request. * - * Verifies correctness of the datanode version and registration ID. + * Verifies correctness of the datanode version, registration ID, and + * if the datanode does not need to be shutdown. * * @param nodeReg data node registration * @throws IOException
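The decommission(DecommissionAction, String[]) RPC removed above is replaced by refreshNodes(): the namenode re-reads its include/exclude host files on demand, so decommissioning is now driven by editing the exclude file rather than by a dedicated admin action. Below is a minimal sketch of the administrator-side flow. It assumes the DistributedFileSystem.refreshNodes() client wrapper and the dfs.hosts.exclude key used by TestDecommission further down, and that it runs on the machine that holds the exclude file; the class name and argument here are illustrative and are not part of the commit.

// Sketch only: drive decommissioning through the new exclude-file mechanism.
// Assumes fs.default.name points at the DFS cluster and that dfs.hosts.exclude
// names a locally writable file on this machine.
import java.io.FileWriter;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.DatanodeInfo;
import org.apache.hadoop.dfs.DistributedFileSystem;
import org.apache.hadoop.fs.FileSystem;

public class DecommissionSketch {
  public static void main(String[] args) throws IOException, InterruptedException {
    String nodeName = args[0];        // the name reported by DatanodeInfo.getName()
    Configuration conf = new Configuration();
    String excludeFile = conf.get("dfs.hosts.exclude", "");

    // 1. Append the datanode's name to the exclude file.
    FileWriter out = new FileWriter(excludeFile, true /* append */);
    out.write(nodeName + "\n");
    out.close();

    // 2. Ask the namenode to re-read the include/exclude files.
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    dfs.refreshNodes();

    // 3. Poll the datanode report until the node is fully decommissioned.
    boolean done = false;
    while (!done) {
      for (DatanodeInfo dn : dfs.getDataNodeStats()) {
        if (dn.getName().equals(nodeName) && dn.isDecommissioned()) {
          done = true;
        }
      }
      if (!done) {
        Thread.sleep(1000);
      }
    }
    System.out.println(nodeName + " is decommissioned.");
  }
}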
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DisallowedTaskTrackerException.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DisallowedTaskTrackerException.java?view=auto&rev=510181 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DisallowedTaskTrackerException.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DisallowedTaskTrackerException.java Wed Feb 21 12:11:00 2007 @@ -0,0 +1,18 @@ +package org.apache.hadoop.mapred; + +import java.io.IOException; + + +/** + * This exception is thrown when a tasktracker tries to register or communicate + * with the jobtracker when it does not appear on the list of included nodes, + * or has been specifically excluded. + * + * @author Wendy Chien + */ +class DisallowedTaskTrackerException extends IOException { + + public DisallowedTaskTrackerException(TaskTrackerStatus tracker) { + super("Tasktracker denied communication with jobtracker: " + tracker.getTrackerName()); + } +} Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=510181&r1=510180&r2=510181 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Feb 21 12:11:00 2007 @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.ipc.*; import org.apache.hadoop.conf.*; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.*; import java.io.*; import java.net.*; @@ -449,6 +449,7 @@ Random r = new Random(); private int maxCurrentTasks; + private HostsFileReader hostsReader; // // Properties to maintain while running Jobs and Tasks: @@ -572,6 +573,10 @@ // Same with 'localDir' except it's always on the local disk. jobConf.deleteLocalFiles(SUBDIR); + // Read the hosts/exclude files to restrict access to the jobtracker. + this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""), + conf.get("mapred.hosts.exclude", "")); + // Set ports, start RPC servers, etc. InetSocketAddress addr = getAddress(conf); this.localMachine = addr.getHostName(); @@ -962,7 +967,12 @@ " (initialContact: " + initialContact + " acceptNewTasks: " + acceptNewTasks + ")" + " with responseId: " + responseId); - + + // Make sure heartbeat is from a tasktracker allowed by the jobtracker. + if (!acceptTaskTracker(status)) { + throw new DisallowedTaskTrackerException(status); + } + // First check if the last heartbeat response got through String trackerName = status.getTrackerName(); HeartbeatResponse prevHeartbeatResponse = @@ -1033,6 +1043,32 @@ removeMarkedTasks(trackerName); return response; + } + + /** + * Return if the specified tasktracker is in the hosts list, + * if one was configured. If none was configured, then this + * returns true. + */ + private boolean inHostsList(TaskTrackerStatus status) { + Set<String> hostsList = hostsReader.getHosts(); + return (hostsList.isEmpty() || hostsList.contains(status.getHost())); + } + + /** + * Return if the specified tasktracker is in the exclude list. 
+ */ + private boolean inExcludedHostsList(TaskTrackerStatus status) { + Set<String> excludeList = hostsReader.getExcludedHosts(); + return excludeList.contains(status.getHost()); + } + + /** + * Returns true if the tasktracker is in the hosts list and + * not in the exclude list. + */ + private boolean acceptTaskTracker(TaskTrackerStatus status) { + return (inHostsList(status) && !inExcludedHostsList(status)); } /** Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=510181&r1=510180&r2=510181 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Feb 21 12:11:00 2007 @@ -55,7 +55,7 @@ static final long WAIT_FOR_DONE = 3 * 1000; private int httpPort; - static enum State {NORMAL, STALE, INTERRUPTED} + static enum State {NORMAL, STALE, INTERRUPTED, DENIED} public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.TaskTracker"); @@ -529,6 +529,12 @@ jobClient.reportTaskTrackerError(taskTrackerName, "DiskErrorException", msg); return State.STALE; + } catch (RemoteException re) { + String reClass = re.getClassName(); + if (DisallowedTaskTrackerException.class.getName().equals(reClass)) { + LOG.info("Tasktracker disallowed by JobTracker."); + return State.DENIED; + } } catch (Exception except) { String msg = "Caught exception: " + StringUtils.stringifyException(except); @@ -855,14 +861,18 @@ */ public void run() { try { - while (running && !shuttingDown) { + boolean denied = false; + while (running && !shuttingDown && !denied) { boolean staleState = false; try { // This while-loop attempts reconnects if we get network errors - while (running && ! staleState && !shuttingDown ) { + while (running && ! staleState && !shuttingDown && !denied) { try { - if (offerService() == State.STALE) { + State osState = offerService(); + if (osState == State.STALE) { staleState = true; + } else if (osState == State.DENIED) { + denied = true; } } catch (Exception ex) { if (!shuttingDown) { @@ -881,6 +891,9 @@ if (shuttingDown) { return; } LOG.warn("Reinitializing local state"); initialize(); + } + if (denied) { + shutdown(); } } catch (IOException iex) { LOG.error("Got fatal exception while reinitializing TaskTracker: " + Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/HostsFileReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/HostsFileReader.java?view=auto&rev=510181 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/HostsFileReader.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/HostsFileReader.java Wed Feb 21 12:11:00 2007 @@ -0,0 +1,62 @@ +package org.apache.hadoop.util; + +import java.io.*; +import java.util.Set; +import java.util.HashSet; + + +// Keeps track of which datanodes are allowed to connect to the namenode. 
+public class HostsFileReader { + private Set<String> includes; + private Set<String> excludes; + private String includesFile; + private String excludesFile; + + public HostsFileReader(String inFile, + String exFile) throws IOException { + includes = new HashSet<String>(); + excludes = new HashSet<String>(); + includesFile = inFile; + excludesFile = exFile; + refresh(); + } + + private void readFileToSet(String filename, Set<String> set) throws IOException { + FileInputStream fis = new FileInputStream(new File(filename)); + try { + BufferedReader reader = new BufferedReader(new InputStreamReader(fis)); + String line; + while ((line = reader.readLine()) != null) { + String[] nodes = line.split("[ \t\n\f\r]+"); + if (nodes != null) { + for (int i = 0; i < nodes.length; i++) { + set.add(nodes[i]); // might need to add canonical name + } + } + } + } finally { + fis.close(); + } + } + + public void refresh() throws IOException { + includes.clear(); + excludes.clear(); + + if (!includesFile.equals("")) { + readFileToSet(includesFile, includes); + } + if (!excludesFile.equals("")) { + readFileToSet(excludesFile, excludes); + } + } + + public Set<String> getHosts() { + return includes; + } + + public Set<String> getExcludedHosts() { + return excludes; + } + +} Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=diff&rev=510181&r1=510180&r2=510181 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Wed Feb 21 12:11:00 2007 @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSOutputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -35,9 +36,25 @@ static final long seed = 0xDEADBEEFL; static final int blockSize = 8192; static final int fileSize = 16384; - static final int numDatanodes = 4; + static final int numDatanodes = 5; Random myrand = new Random(); + Path hostsFile; + Path excludeFile; + + private enum NodeState {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; } + + private void writeConfigFile(FileSystem fs, Path name, String node) + throws IOException { + // delete if it already exists + if (fs.exists(name)) { + fs.delete(name); + } + FSDataOutputStream stm = fs.create(name); + stm.writeBytes(node); + stm.writeBytes("\n"); + stm.close(); + } private void writeFile(FileSystem fileSys, Path name, int repl) throws IOException { @@ -66,7 +83,11 @@ * replication factor is 1 more than the specified one. */ private void checkFile(FileSystem fileSys, Path name, int repl, - String[] downnodes) throws IOException { + String downnode) throws IOException { + // + // sleep an additional 10 seconds for the blockreports from the datanodes + // to arrive. 
+ // FSInputStream is = fileSys.openRaw(name); DFSClient.DFSInputStream dis = (DFSClient.DFSInputStream) is; DatanodeInfo[][] dinfo = dis.getDataNodes(); @@ -75,12 +96,10 @@ int hasdown = 0; DatanodeInfo[] nodes = dinfo[blk]; for (int j = 0; j < nodes.length; j++) { // for each replica - for (int k = 0; downnodes != null && k < downnodes.length; k++) { - if (nodes[j].getName().equals(downnodes[k])) { - hasdown++; - System.out.println("Block " + blk + " replica " + - nodes[j].getName() + " is decommissioned."); - } + if (nodes[j].getName().equals(downnode)) { + hasdown++; + System.out.println("Block " + blk + " replica " + + nodes[j].getName() + " is decommissioned."); } } System.out.println("Block " + blk + " has " + hasdown + @@ -107,74 +126,92 @@ /* * decommission one random node. */ - private String[] decommissionNode(DFSClient client, FileSystem filesys) - throws IOException { + private String decommissionNode(DFSClient client, + FileSystem filesys, + FileSystem localFileSys) + throws IOException { DistributedFileSystem dfs = (DistributedFileSystem) filesys; DatanodeInfo[] info = client.datanodeReport(); // // pick one datanode randomly. // - int index = myrand.nextInt(info.length); + int index = 0; + boolean found = false; + while (!found) { + index = myrand.nextInt(info.length); + if (!info[index].isDecommissioned()) { + found = true; + } + } String nodename = info[index].getName(); System.out.println("Decommissioning node: " + nodename); - String[] nodes = new String[1]; - nodes[0] = nodename; - dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_SET, nodes); - return nodes; + + // write nodename into the exclude file. + writeConfigFile(localFileSys, excludeFile, nodename); + dfs.refreshNodes(); + return nodename; } /* * put node back in action */ - private void commissionNode(DFSClient client, FileSystem filesys, - String[] nodes) throws IOException { + private void commissionNode(FileSystem filesys, FileSystem localFileSys, + String node) throws IOException { DistributedFileSystem dfs = (DistributedFileSystem) filesys; - DatanodeInfo[] info = client.datanodeReport(); - for (int i = 0; i < nodes.length; i++) { - System.out.println("Putting node back in action: " + nodes[i]); - } - dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_CLEAR, nodes); + System.out.println("Commissioning nodes."); + writeConfigFile(localFileSys, excludeFile, ""); + dfs.refreshNodes(); } - /* - * Check that node(s) were decommissioned + /* + * Check if node is in the requested state. 
*/ - private void checkNodeDecommission(DFSClient client, FileSystem filesys, - String[] nodes) throws IOException { + private boolean checkNodeState(FileSystem filesys, + String node, + NodeState state) throws IOException { DistributedFileSystem dfs = (DistributedFileSystem) filesys; - boolean ret = dfs.decommission( - FSConstants.DecommissionAction.DECOMMISSION_GET, nodes); - assertEquals("State of Decommissioned Datanode(s) ", ret, true); + boolean done = false; + boolean foundNode = false; + DatanodeInfo[] datanodes = dfs.getDataNodeStats(); + for (int i = 0; i < datanodes.length; i++) { + DatanodeInfo dn = datanodes[i]; + if (dn.getName().equals(node)) { + if (state == NodeState.DECOMMISSIONED) { + done = dn.isDecommissioned(); + } else if (state == NodeState.DECOMMISSION_INPROGRESS) { + done = dn.isDecommissionInProgress(); + } else { + done = (!dn.isDecommissionInProgress() && !dn.isDecommissioned()); + } + System.out.println(dn.getDatanodeReport()); + foundNode = true; + } + } + if (!foundNode) { + throw new IOException("Could not find node: " + node); + } + return done; } /* * Wait till node is fully decommissioned. */ - private void waitNodeDecommission(DFSClient client, FileSystem filesys, - String[] nodes) throws IOException { + private void waitNodeState(FileSystem filesys, + String node, + NodeState state) throws IOException { DistributedFileSystem dfs = (DistributedFileSystem) filesys; - boolean done = dfs.decommission( - FSConstants.DecommissionAction.DECOMMISSION_GET, nodes); + boolean done = checkNodeState(filesys, node, state); while (!done) { - System.out.println("Waiting for nodes " + nodes[0] + - " to be fully decommissioned..."); + System.out.println("Waiting for node " + node + + " to change state..."); try { - Thread.sleep(5000L); + Thread.sleep(1000); } catch (InterruptedException e) { // nothing } - done = dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_GET, - nodes); - } - // - // sleep an additional 10 seconds for the blockreports from the datanodes - // to arrive. - // - try { - Thread.sleep(10 * 1000L); - } catch (Exception e) { + done = checkNodeState(filesys, node, state); } } @@ -183,6 +220,17 @@ */ public void testDecommission() throws IOException { Configuration conf = new Configuration(); + + // Set up the hosts/exclude files. 
+ FileSystem localFileSys = FileSystem.getLocal(conf); + Path workingDir = localFileSys.getWorkingDirectory(); + Path dir = new Path(workingDir, "build/test/data/work-dir/decommission"); + assertTrue(localFileSys.mkdirs(dir)); + hostsFile = new Path(dir, "hosts"); + excludeFile = new Path(dir, "exclude"); + conf.set("dfs.hosts.exclude", excludeFile.toString()); + writeConfigFile(localFileSys, excludeFile, ""); + MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false); // Now wait for 15 seconds to give datanodes chance to register // themselves and to report heartbeat @@ -209,11 +257,16 @@ Path file1 = new Path("smallblocktest.dat"); writeFile(fileSys, file1, 3); checkFile(fileSys, file1, 3); - String downnodes[] = decommissionNode(client, fileSys); - waitNodeDecommission(client, fileSys, downnodes); - checkFile(fileSys, file1, 3, downnodes); - commissionNode(client, fileSys, downnodes); + + String downnode = decommissionNode(client, fileSys, localFileSys); + waitNodeState(fileSys, downnode, NodeState.DECOMMISSION_INPROGRESS); + commissionNode(fileSys, localFileSys, downnode); + waitNodeState(fileSys, downnode, NodeState.NORMAL); + downnode = decommissionNode(client, fileSys, localFileSys); + waitNodeState(fileSys, downnode, NodeState.DECOMMISSIONED); + checkFile(fileSys, file1, 3, downnode); cleanupFile(fileSys, file1); + cleanupFile(localFileSys, dir); } } catch (IOException e) { info = client.datanodeReport();
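For reference, a minimal self-contained sketch of how the new HostsFileReader is intended to be used; it mirrors the inHostsList/inExcludedHostsList/acceptTaskTracker logic the JobTracker now applies to heartbeating tasktrackers. The temp files and hostnames below are illustrative only and are not part of the commit.

// Sketch only: exercise HostsFileReader with throwaway include/exclude files.
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Set;

import org.apache.hadoop.util.HostsFileReader;

public class HostsFileReaderSketch {
  public static void main(String[] args) throws IOException {
    // Create throwaway include/exclude files (hostnames are hypothetical).
    File include = File.createTempFile("hosts", ".include");
    File exclude = File.createTempFile("hosts", ".exclude");
    write(include, "tracker01.example.com tracker02.example.com\n");
    write(exclude, "tracker02.example.com\n");

    // In the JobTracker these paths come from mapred.hosts and
    // mapred.hosts.exclude; an empty string means "no file configured", and an
    // empty include set is treated by the callers as "allow everyone".
    HostsFileReader reader = new HostsFileReader(include.getPath(), exclude.getPath());

    Set<String> included = reader.getHosts();
    Set<String> excluded = reader.getExcludedHosts();

    // Same accept rule the JobTracker applies before servicing a heartbeat.
    String host = "tracker02.example.com";
    boolean accepted =
        (included.isEmpty() || included.contains(host)) && !excluded.contains(host);
    System.out.println(host + (accepted ? " accepted" : " denied"));  // prints "denied"

    // When the files change on disk, re-read them in place; on the DFS side
    // this is what NameNode.refreshNodes() triggers.
    reader.refresh();
  }

  private static void write(File f, String contents) throws IOException {
    FileWriter out = new FileWriter(f);
    out.write(contents);
    out.close();
  }
}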