Author: cutting Date: Tue Mar 27 13:17:25 2007 New Revision: 523062 URL: http://svn.apache.org/viewvc?view=rev&rev=523062 Log: HADOOP-1153. Fix HDFS daemons to correctly stop their threads. Contributed by Konstantin.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=523062&r1=523061&r2=523062 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Mar 27 13:17:25 2007 @@ -45,6 +45,9 @@ by probing for free ports, improving test reliability. (Arun C Murthy via cutting) +14. HADOOP-1153. Fix HDFS daemons to correctly stop their threads. + (Konstantin Shvachko via cutting) + Release 0.12.2 - 2007-23-17 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=523062&r1=523061&r2=523062 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Mar 27 13:17:25 2007 @@ -110,7 +110,7 @@ FSDataset data; DatanodeRegistration dnRegistration; private String networkLoc; - boolean shouldRun = true; + volatile boolean shouldRun = true; Vector receivedBlockList = new Vector(); int xmitsInProgress = 0; Daemon dataXceiveServer = null; @@ -318,7 +318,7 @@ * @throws IOException */ private void register() throws IOException { - while( true ) { + while( shouldRun ) { try { // reset name to machineName. Mainly for web interface. dnRegistration.name = machineName + ":" + dnRegistration.getPort(); @@ -342,15 +342,22 @@ * Returns only after shutdown is complete. 
*/ public void shutdown() { - try { - infoServer.stop(); - } catch (Exception e) { + if (infoServer != null) { + try { + infoServer.stop(); + } catch (Exception e) { + } } this.shouldRun = false; - ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill(); - try { - this.storage.closeAll(); - } catch (IOException ie) { + if (dataXceiveServer != null) { + ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill(); + this.dataXceiveServer.interrupt(); + } + if (storage != null) { + try { + this.storage.closeAll(); + } catch (IOException ie) { + } } } @@ -539,7 +546,6 @@ * Hadoop IPC mechanism. */ class DataXceiveServer implements Runnable { - boolean shouldListen = true; ServerSocket ss; public DataXceiveServer(ServerSocket ss) { this.ss = ss; @@ -549,7 +555,7 @@ */ public void run() { try { - while (shouldListen) { + while (shouldRun) { Socket s = ss.accept(); //s.setSoTimeout(READ_TIMEOUT); data.checkDataDir(); @@ -566,7 +572,8 @@ } } public void kill() { - this.shouldListen = false; + assert shouldRun == false : + "shouldRun should be set to false before killing"; try { this.ss.close(); } catch (IOException iex) { @@ -1071,16 +1078,16 @@ LOG.info("Finishing DataNode in: "+data); } - private static ArrayList dataNodeList = new ArrayList(); - private static ArrayList dataNodeThreadList = new ArrayList(); + private static ArrayList<DataNode> dataNodeList = new ArrayList<DataNode>(); + private static ArrayList<Thread> dataNodeThreadList = new ArrayList<Thread>(); /** Start datanode daemon. 
*/ public static void run(Configuration conf, String networkLoc) throws IOException { String[] dataDirs = conf.getStrings("dfs.data.dir"); DataNode dn = makeInstance(networkLoc, dataDirs, conf); - dataNodeList.add(dn); if (dn != null) { + dataNodeList.add(dn); Thread t = new Thread(dn, "DataNode: [" + StringUtils.arrayToString(dataDirs) + "]"); t.setDaemon(true); // needed for JUnit testing @@ -1090,15 +1097,14 @@ } /** - * Shut down all datanodes that where started via the run(conf) method. + * Shut down all datanodes that were started via the + * run(conf,networkLoc) method. * Returns only after shutdown is complete. */ public static void shutdownAll(){ - if(!dataNodeList.isEmpty()){ - for (Iterator iterator = dataNodeList.iterator(); iterator.hasNext();) { - DataNode dataNode = (DataNode) iterator.next(); - dataNode.shutdown(); - } + while (!dataNodeList.isEmpty()) { + dataNodeList.remove(0).shutdown(); + dataNodeThreadList.remove(0).interrupt(); } } @@ -1113,12 +1119,7 @@ Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1); try { t.join(); - } catch (InterruptedException e) { - if (Thread.currentThread().isInterrupted()) { - // did someone knock? 
- return; - } - } + } catch (InterruptedException e) {} } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=523062&r1=523061&r2=523062 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Mar 27 13:17:25 2007 @@ -174,7 +174,7 @@ Daemon lmthread = null; // LeaseMonitor thread Daemon smmthread = null; // SafeModeMonitor thread Daemon replthread = null; // Replication thread - boolean fsRunning = true; + volatile boolean fsRunning = true; long systemStart = 0; // The maximum number of replicates we should allow for a single block @@ -302,20 +302,22 @@ * them to finish, but a short timeout returns control back to caller. */ public void close() { - synchronized (this) { fsRunning = false; - } try { - pendingReplications.stop(); - infoServer.stop(); - hbthread.join(3000); - replthread.join(3000); - dnthread.join(3000); + if (pendingReplications != null) pendingReplications.stop(); + if (infoServer != null) infoServer.stop(); + if (hbthread != null) hbthread.interrupt(); + if (replthread != null) replthread.interrupt(); + if (dnthread != null) dnthread.interrupt(); + if (smmthread != null) smmthread.interrupt(); } catch (InterruptedException ie) { } finally { // using finally to ensure we also wait for lease daemon try { - lmthread.join(3000); + if (lmthread != null) { + lmthread.interrupt(); + lmthread.join(3000); + } } catch (InterruptedException ie) { } finally { try { @@ -3710,7 +3712,7 @@ /** */ public void run() { - while( ! 
safeMode.canLeave() ) { + while (fsRunning && !safeMode.canLeave()) { try { Thread.sleep(recheckInterval); } catch (InterruptedException ie) { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?view=diff&rev=523062&r1=523061&r2=523062 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Tue Mar 27 13:17:25 2007 @@ -38,7 +38,7 @@ private Map<Block, PendingBlockInfo> pendingReplications; private ArrayList<Block> timedOutItems; Daemon timerThread = null; - private boolean fsRunning = true; + private volatile boolean fsRunning = true; // // It might take anywhere between 5 to 10 minutes before Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=diff&rev=523062&r1=523061&r2=523062 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Tue Mar 27 13:17:25 2007 @@ -250,13 +250,7 @@ writeConfigFile(localFileSys, excludeFile, null); MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null); - // Now wait for 15 seconds to give datanodes chance to register - // themselves and to report heartbeat - try { - Thread.sleep(15000L); - } catch (InterruptedException e) { - // nothing - } + cluster.waitActive(); InetSocketAddress addr = new InetSocketAddress("localhost", cluster.getNameNodePort()); DFSClient client = new DFSClient(addr, conf);