Author: cutting Date: Fri Dec 1 14:32:48 2006 New Revision: 481430 URL: http://svn.apache.org/viewvc?view=rev&rev=481430 Log: HADOOP-728. Fix contrib/streaming issues, including '-reducer NONE'. Contributed by Sanjay Dahiya.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=481430&r1=481429&r2=481430 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Dec 1 14:32:48 2006 @@ -146,6 +146,9 @@ 43. HADOOP-750. Fix a potential race condition during mapreduce shuffle. (omalley via cutting) +44. HADOOP-728. Fix contrib/streaming-related issues, including + '-reducer NONE'. (Sanjay Dahiya via cutting) + Release 0.8.0 - 2006-11-03 Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=481430&r1=481429&r2=481430 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Fri Dec 1 14:32:48 2006 @@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.PhasedFileSystem; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.util.StringUtils; @@ -192,10 +193,6 @@ } } - String makeUniqueFileSuffix() { - return "." + System.currentTimeMillis() + "." 
+ job_.get("mapred.task.id"); - } - public void configure(JobConf job) { try { String argv = getPipeCommand(job); @@ -259,20 +256,21 @@ // See StreamJob.setOutputSpec(): if reducerNone_ aka optSideEffect then: // client has renamed outputPath and saved the argv's original output path as: if (useSingleSideOutputURI_) { - sideEffectURI_ = new URI(sideOutputURI_); + finalOutputURI = new URI(sideOutputURI_); sideEffectPathFinal_ = null; // in-place, no renaming to final } else { + sideFs_ = new PhasedFileSystem(sideFs_, job); String sideOutputPath = job_.get("stream.sideoutput.dir"); // was: job_.getOutputPath() String fileName = getSideEffectFileName(); // see HADOOP-444 for rationale sideEffectPathFinal_ = new Path(sideOutputPath, fileName); - sideEffectURI_ = new URI(sideEffectPathFinal_ + makeUniqueFileSuffix()); // implicit dfs: + finalOutputURI = new URI(sideEffectPathFinal_.toString()); // implicit dfs: } // apply default scheme - if(sideEffectURI_.getScheme() == null) { - sideEffectURI_ = new URI("file", sideEffectURI_.getSchemeSpecificPart(), null); + if(finalOutputURI.getScheme() == null) { + finalOutputURI = new URI("file", finalOutputURI.getSchemeSpecificPart(), null); } boolean allowSocket = useSingleSideOutputURI_; - sideEffectOut_ = getURIOutputStream(sideEffectURI_, allowSocket); + sideEffectOut_ = getURIOutputStream(finalOutputURI, allowSocket); } // @@ -292,7 +290,7 @@ f = null; } logprintln("PipeMapRed exec " + Arrays.asList(argvSplit)); - logprintln("sideEffectURI_=" + sideEffectURI_); + logprintln("sideEffectURI_=" + finalOutputURI); Environment childEnv = (Environment) StreamUtil.env().clone(); addJobConfToEnvironment(job_, childEnv); @@ -505,6 +503,7 @@ if (optSideEffect_) { sideEffectOut_.write(answer); sideEffectOut_.write('\n'); + sideEffectOut_.flush(); } else { splitKeyVal(answer, key, val); output.collect(key, val); @@ -576,17 +575,11 @@ waitOutputThreads(); try { if (optSideEffect_) { - logprintln("closing " + sideEffectURI_); + 
logprintln("closing " + finalOutputURI); if (sideEffectOut_ != null) sideEffectOut_.close(); - logprintln("closed " + sideEffectURI_); - if (useSingleSideOutputURI_) { - // With sideEffectPath_ we wrote in-place. - // Possibly a named pipe set up by user or a socket. - } else { - boolean del = sideFs_.delete(sideEffectPathFinal_); - logprintln("deleted (" + del + ") " + sideEffectPathFinal_); - sideFs_.rename(new Path(sideEffectURI_.getSchemeSpecificPart()), sideEffectPathFinal_); - logprintln("renamed " + sideEffectPathFinal_); + logprintln("closed " + finalOutputURI); + if ( ! useSingleSideOutputURI_) { + ((PhasedFileSystem)sideFs_).commit(); } } } catch (IOException io) { @@ -725,7 +718,7 @@ boolean optUseKey_ = true; private boolean optSideEffect_; - private URI sideEffectURI_; + private URI finalOutputURI; private Path sideEffectPathFinal_; private boolean useSingleSideOutputURI_; Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=481430&r1=481429&r2=481430 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Fri Dec 1 14:32:48 2006 @@ -701,8 +701,6 @@ } catch (URISyntaxException e) { throw (IOException) new IOException().initCause(e); } - } else { - mapsideoutURI_ = primary; } // an empty reduce output named "part-00002" will go here and not collide. 
channel0 = primary + ".NONE"; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java?view=diff&rev=481430&r1=481429&r2=481430 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java Fri Dec 1 14:32:48 2006 @@ -25,7 +25,7 @@ * better to commit(Path) individual files when done. Otherwise * commit() can be used to commit all open files at once. */ -class PhasedFileSystem extends FileSystem { +public class PhasedFileSystem extends FileSystem { private FileSystem baseFS ; // Map from final file name to temporary file name @@ -93,7 +93,9 @@ }catch(IOException ioe){ // ignore if already closed } - baseFS.delete( fInfo.getTempPath() ); + if( baseFS.exists(fInfo.getTempPath())){ + baseFS.delete( fInfo.getTempPath() ); + } finalNameToFileInfo.remove(finalFile); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=481430&r1=481429&r2=481430 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Dec 1 14:32:48 2006 @@ -1030,9 +1030,11 @@ // Delete temp directory in case any task used PhasedFileSystem. 
try{ String systemDir = task.getConf().get("mapred.system.dir"); - String taskTempDir = systemDir + "/" + - task.getJobId() + "/" + task.getTipId(); - fs.delete(new Path(taskTempDir)) ; + Path taskTempDir = new Path(systemDir + "/" + + task.getJobId() + "/" + task.getTipId()); + if( fs.exists(taskTempDir)){ + fs.delete(taskTempDir) ; + } }catch(IOException e){ LOG.warn("Error in deleting reduce temporary output",e); }