Author: cutting
Date: Fri Dec  1 14:32:48 2006
New Revision: 481430

URL: http://svn.apache.org/viewvc?view=rev&rev=481430
Log:
HADOOP-728.  Fix contrib/streaming issues, including '-reducer NONE'.  
Contributed by Sanjay Dahiya.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=481430&r1=481429&r2=481430
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Dec  1 14:32:48 2006
@@ -146,6 +146,9 @@
 43. HADOOP-750.  Fix a potential race condition during mapreduce
     shuffle.  (omalley via cutting)
 
+44. HADOOP-728.  Fix contrib/streaming-related issues, including
+    '-reducer NONE'.  (Sanjay Dahiya via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

Modified: 
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=481430&r1=481429&r2=481430
==============================================================================
--- 
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
 (original)
+++ 
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
 Fri Dec  1 14:32:48 2006
@@ -36,6 +36,7 @@
 
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.PhasedFileSystem;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.StringUtils;
@@ -192,10 +193,6 @@
     }
   }
 
-  String makeUniqueFileSuffix() {
-    return "." + System.currentTimeMillis() + "." + job_.get("mapred.task.id");
-  }
-
   public void configure(JobConf job) {
     try {
       String argv = getPipeCommand(job);
@@ -259,20 +256,21 @@
         // See StreamJob.setOutputSpec(): if reducerNone_ aka optSideEffect 
then: 
         // client has renamed outputPath and saved the argv's original output 
path as:
         if (useSingleSideOutputURI_) {
-          sideEffectURI_ = new URI(sideOutputURI_);
+          finalOutputURI = new URI(sideOutputURI_);
           sideEffectPathFinal_ = null; // in-place, no renaming to final
         } else {
+          sideFs_ = new PhasedFileSystem(sideFs_, job);
           String sideOutputPath = job_.get("stream.sideoutput.dir"); // was: 
job_.getOutputPath() 
           String fileName = getSideEffectFileName(); // see HADOOP-444 for 
rationale
           sideEffectPathFinal_ = new Path(sideOutputPath, fileName);
-          sideEffectURI_ = new URI(sideEffectPathFinal_ + 
makeUniqueFileSuffix()); // implicit dfs: 
+          finalOutputURI = new URI(sideEffectPathFinal_.toString()); // 
implicit dfs: 
         }
         // apply default scheme
-        if(sideEffectURI_.getScheme() == null) {
-          sideEffectURI_ = new URI("file", 
sideEffectURI_.getSchemeSpecificPart(), null);
+        if(finalOutputURI.getScheme() == null) {
+          finalOutputURI = new URI("file", 
finalOutputURI.getSchemeSpecificPart(), null);
         }
         boolean allowSocket = useSingleSideOutputURI_;
-        sideEffectOut_ = getURIOutputStream(sideEffectURI_, allowSocket);
+        sideEffectOut_ = getURIOutputStream(finalOutputURI, allowSocket);
       }
 
       // 
@@ -292,7 +290,7 @@
           f = null;
       }
       logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
-      logprintln("sideEffectURI_=" + sideEffectURI_);
+      logprintln("sideEffectURI_=" + finalOutputURI);
 
       Environment childEnv = (Environment) StreamUtil.env().clone();
       addJobConfToEnvironment(job_, childEnv);
@@ -505,6 +503,7 @@
           if (optSideEffect_) {
             sideEffectOut_.write(answer);
             sideEffectOut_.write('\n');
+            sideEffectOut_.flush();
           } else {
             splitKeyVal(answer, key, val);
             output.collect(key, val);
@@ -576,17 +575,11 @@
       waitOutputThreads();
       try {
         if (optSideEffect_) {
-          logprintln("closing " + sideEffectURI_);
+          logprintln("closing " + finalOutputURI);
           if (sideEffectOut_ != null) sideEffectOut_.close();
-          logprintln("closed  " + sideEffectURI_);
-          if (useSingleSideOutputURI_) {
-            // With sideEffectPath_ we wrote in-place. 
-            // Possibly a named pipe set up by user or a socket.
-          } else {
-            boolean del = sideFs_.delete(sideEffectPathFinal_);
-            logprintln("deleted  (" + del + ") " + sideEffectPathFinal_);
-            sideFs_.rename(new Path(sideEffectURI_.getSchemeSpecificPart()), 
sideEffectPathFinal_);
-            logprintln("renamed  " + sideEffectPathFinal_);
+          logprintln("closed  " + finalOutputURI);
+          if ( ! useSingleSideOutputURI_) {
+            ((PhasedFileSystem)sideFs_).commit(); 
           }
         }
       } catch (IOException io) {
@@ -725,7 +718,7 @@
   boolean optUseKey_ = true;
 
   private boolean optSideEffect_;
-  private URI sideEffectURI_;
+  private URI finalOutputURI;
   private Path sideEffectPathFinal_;
 
   private boolean useSingleSideOutputURI_;

Modified: 
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=481430&r1=481429&r2=481430
==============================================================================
--- 
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
 (original)
+++ 
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
 Fri Dec  1 14:32:48 2006
@@ -701,8 +701,6 @@
         } catch (URISyntaxException e) {
           throw (IOException) new IOException().initCause(e);
         }
-      } else {
-        mapsideoutURI_ = primary;
       }
       // an empty reduce output named "part-00002" will go here and not 
collide.
       channel0 = primary + ".NONE";

Modified: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java?view=diff&rev=481430&r1=481429&r2=481430
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java 
Fri Dec  1 14:32:48 2006
@@ -25,7 +25,7 @@
  * better to commit(Path) individual files when done. Otherwise
  * commit() can be used to commit all open files at once. 
  */
-class PhasedFileSystem extends FileSystem {
+public class PhasedFileSystem extends FileSystem {
 
   private FileSystem baseFS ;
   // Map from final file name to temporary file name
@@ -93,7 +93,9 @@
         }catch(IOException ioe){
           // ignore if already closed
         }
-        baseFS.delete( fInfo.getTempPath() ); 
+        if( baseFS.exists(fInfo.getTempPath())){
+          baseFS.delete( fInfo.getTempPath() );
+        }
         finalNameToFileInfo.remove(finalFile); 
       }
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=481430&r1=481429&r2=481430
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri 
Dec  1 14:32:48 2006
@@ -1030,9 +1030,11 @@
             // Delete temp directory in case any task used PhasedFileSystem.
             try{
               String systemDir = task.getConf().get("mapred.system.dir");
-              String taskTempDir = systemDir + "/" + 
-                  task.getJobId() + "/" + task.getTipId();
-              fs.delete(new Path(taskTempDir)) ;
+              Path taskTempDir = new Path(systemDir + "/" + 
+                  task.getJobId() + "/" + task.getTipId());
+              if( fs.exists(taskTempDir)){
+                fs.delete(taskTempDir) ;
+              }
             }catch(IOException e){
               LOG.warn("Error in deleting reduce temporary output",e); 
             }


Reply via email to