Author: tomwhite
Date: Tue Apr 17 05:18:31 2007
New Revision: 529574

URL: http://svn.apache.org/viewvc?view=rev&rev=529574
Log:
HADOOP-1154. Fail a streaming task if the threads reading from or writing to the streaming process fail. Contributed by Koji Noguchi.

Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=529574&r1=529573&r2=529574 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Apr 17 05:18:31 2007 @@ -219,6 +219,9 @@ 66. HADOOP-1224. Fix "Browse the filesystem" link to no longer point to dead datanodes. (Enis Soztutar via tomwhite) +67. HADOOP-1154. Fail a streaming task if the threads reading from or + writing to the streaming process fail. (Koji Noguchi via tomwhite) + Release 0.12.3 - 2007-04-06 Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=529574&r1=529573&r2=529574 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Apr 17 05:18:31 2007 @@ -515,6 +515,9 @@ } } catch (IOException io) { io.printStackTrace(log_); + outerrThreadsThrowable = io; + } catch (Throwable th) { + outerrThreadsThrowable = th; } logprintln("MROutputThread done"); } @@ -551,6 +554,9 @@ } } catch (IOException io) { logStackTrace(io); + outerrThreadsThrowable = io; + } catch (Throwable th) { + outerrThreadsThrowable = th; } } long lastStderrReport = 0; @@ -718,6 +724,8 @@ private String 
sideOutputURI_; private OutputStream sideEffectOut_; + + protected volatile Throwable outerrThreadsThrowable; String LOGNAME; PrintStream log_; Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?view=diff&rev=529574&r1=529573&r2=529574 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Tue Apr 17 05:18:31 2007 @@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.Writable; @@ -68,6 +69,12 @@ // init if (outThread_ == null) { startOutputThreads(output, reporter); + } + if( outerrThreadsThrowable != null ) { + mapRedFinished(); + throw new IOException ("MROutput/MRErrThread failed:" + + StringUtils.stringifyException( + outerrThreadsThrowable)); } try { // 1/4 Hadoop in Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?view=diff&rev=529574&r1=529573&r2=529574 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Tue Apr 17 05:18:31 2007 @@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.Reducer; import 
org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.Writable; @@ -73,6 +74,12 @@ numRecRead_++; maybeLogRecord(); if (doPipe_) { + if( outerrThreadsThrowable != null ) { + mapRedFinished(); + throw new IOException ("MROutput/MRErrThread failed:" + + StringUtils.stringifyException( + outerrThreadsThrowable)); + } write(key); clientOut_.write('\t'); write(val);