Author: cutting Date: Thu May 31 12:14:57 2007 New Revision: 543222 URL: http://svn.apache.org/viewvc?view=rev&rev=543222 Log: HADOOP-1332. Fix so that TaskTracker exits reliably during unit tests on Windows. Contributed by Owen.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=543222&r1=543221&r2=543222 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu May 31 12:14:57 2007 @@ -508,6 +508,9 @@ 129. HADOOP-1242. Improve handling of DFS upgrades. (Konstantin Shvachko via cutting) +130. HADOOP-1332. Fix so that TaskTracker exits reliably during unit + tests on Windows. (omalley via cutting) + Release 0.12.3 - 2007-04-06 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=543222&r1=543221&r2=543222 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu May 31 12:14:57 2007 @@ -389,16 +389,19 @@ */ private void runChild(String[] args, File dir) throws IOException { this.process = Runtime.getRuntime().exec(args, null, dir); + + Thread logStdErrThread = null; + Thread logStdOutThread = null; try { - new Thread() { - public void run() { - // Copy stderr of the process - logStream(process.getErrorStream(), taskStdErrLogWriter); - } - }.start(); - - // Copy stderr of the process; normally empty - logStream(process.getInputStream(), taskStdOutLogWriter); + // Copy stderr of the child-process via a thread + logStdErrThread = logStream((t.getTaskId() + " - " + "stderr"), + process.getErrorStream(), + taskStdErrLogWriter); + + // Copy stdout of the child-process via a thread + logStdOutThread = logStream((t.getTaskId() + " - " + "stdout"), + process.getInputStream(), + 
taskStdOutLogWriter); int exit_code = process.waitFor(); @@ -411,8 +414,21 @@ throw new IOException(e.toString()); } finally { kill(); - taskStdOutLogWriter.close(); - taskStdErrLogWriter.close(); + + // Kill both stdout/stderr copying threads + if (logStdErrThread != null) { + logStdErrThread.interrupt(); + try { + logStdErrThread.join(); + } catch (InterruptedException ie) {} + } + + if (logStdOutThread != null) { + logStdOutThread.interrupt(); + try { + logStdOutThread.join(); + } catch (InterruptedException ie) {} + } } } @@ -427,24 +443,47 @@ } /** + * Spawn a new thread to copy the child-jvm's stdout/stderr streams + * via a {@link TaskLog.Writer} + * + * @param threadName thread name + * @param stream child-jvm's stdout/stderr stream + * @param writer {@link TaskLog.Writer} used to copy the child-jvm's data + * @return Return the newly created thread */ - private void logStream(InputStream output, TaskLog.Writer taskLog) { - try { - byte[] buf = new byte[512]; - int n = 0; - while ((n = output.read(buf, 0, buf.length)) != -1) { - // Write out to the task's log - taskLog.write(buf, 0, n); - } - } catch (IOException e) { - LOG.warn(t.getTaskId()+" Error reading child output", e); - } finally { - try { - output.close(); - } catch (IOException e) { - LOG.warn(t.getTaskId()+" Error closing child output", e); + private Thread logStream(String threadName, + final InputStream stream, + final TaskLog.Writer taskLog) { + Thread loggerThread = new Thread() { + public void run() { + try { + byte[] buf = new byte[512]; + while (!Thread.interrupted()) { + while (stream.available() > 0) { + int n = stream.read(buf, 0, buf.length); + taskLog.write(buf, 0, n); + } + Thread.sleep(1000); + } + } catch (IOException e) { + LOG.warn(t.getTaskId()+" Error reading child output", e); + } catch (InterruptedException e) { + // expected + } finally { + try { + stream.close(); + taskLog.close(); + } catch (IOException e) { + LOG.warn(t.getTaskId()+" Error closing 
child output", e); + } + } } - } + }; + loggerThread.setName(threadName); + loggerThread.setDaemon(true); + loggerThread.start(); + + return loggerThread; } }