Author: ddas
Date: Thu Mar 19 11:20:47 2009
New Revision: 755927

URL: http://svn.apache.org/viewvc?rev=755927&view=rev
Log:
HADOOP-4842. Streaming now allows specifying a command for the combiner. 
Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/streaming.xml

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=755927&r1=755926&r2=755927&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Mar 19 11:20:47 2009
@@ -165,6 +165,9 @@
     HADOOP-5442. Paginate jobhistory display and added some search
     capabilities. (Amar Kamat via acmurthy) 
 
+    HADOOP-4842. Streaming now allows specifying a command for the combiner.
+    (Amareshwari Sriramadasu via ddas)
+
   OPTIMIZATIONS
 
   BUG FIXES

Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java?rev=755927&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java Thu Mar 19 11:20:47 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+
+import org.apache.hadoop.mapred.JobConf;
+
+public class PipeCombiner extends PipeReducer {
+  String getPipeCommand(JobConf job) {
+    String str = job.get("stream.combine.streamprocessor");
+    try {
+      if (str != null) {
+        return URLDecoder.decode(str, "UTF-8");
+      }
+    } catch (UnsupportedEncodingException e) {
+      System.err.println("stream.combine.streamprocessor" + 
+                         " in jobconf not found");
+    }
+    return null;
+  }
+  boolean getDoPipe() {
+    return (getPipeCommand(job_) != null);
+  }
+}
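
PipeCombiner reuses PipeReducer's pipe machinery and only changes where the
command comes from: the combiner command travels URL-encoded in the jobconf key
stream.combine.streamprocessor, set by StreamJob at submission time and decoded
here on the task side. A minimal sketch of that round trip (standalone
illustration, not part of this commit; the class name and example command are
hypothetical):

import java.net.URLDecoder;
import java.net.URLEncoder;

import org.apache.hadoop.mapred.JobConf;

// Standalone sketch of the encode/decode round trip between StreamJob
// (submission side) and PipeCombiner (task side). Illustration only.
public class CombinerCommandRoundTrip {
  public static void main(String[] args) throws Exception {
    String combinerCmd = "/usr/bin/uniq -c";   // hypothetical combiner command

    JobConf conf = new JobConf();
    // Submission side: StreamJob stores the command URL-encoded so spaces
    // and shell characters survive the trip through the jobconf.
    conf.set("stream.combine.streamprocessor",
             URLEncoder.encode(combinerCmd, "UTF-8"));

    // Task side: PipeCombiner.getPipeCommand() decodes the same key before
    // handing the command to the pipe machinery inherited from PipeReducer.
    String decoded =
        URLDecoder.decode(conf.get("stream.combine.streamprocessor"), "UTF-8");
    System.out.println(decoded);               // prints: /usr/bin/uniq -c
  }
}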

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=755927&r1=755926&r2=755927&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Thu Mar 19 11:20:47 2009
@@ -517,7 +517,8 @@
     System.out.println("  -input    <path>     DFS input file(s) for the Map 
step");
     System.out.println("  -output   <path>     DFS output directory for the 
Reduce step");
     System.out.println("  -mapper   <cmd|JavaClassName>      The streaming 
command to run");
-    System.out.println("  -combiner <JavaClassName> Combiner has to be a Java 
class");
+    System.out.println("  -combiner <cmd|JavaClassName>" + 
+                       " The streaming command to run");
     System.out.println("  -reducer  <cmd|JavaClassName>      The streaming 
command to run");
     System.out.println("  -file     <file>     File/dir to be shipped in the 
Job jar file");
     System.out.println("  -inputformat 
TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName 
Optional.");
@@ -805,7 +806,9 @@
       if (c != null) {
         jobConf_.setCombinerClass(c);
       } else {
-        fail("-combiner : class not found : " + comCmd_);
+        jobConf_.setCombinerClass(PipeCombiner.class);
+        jobConf_.set("stream.combine.streamprocessor", URLEncoder.encode(
+                comCmd_, "UTF-8"));
       }
     }
 

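The second StreamJob hunk above is what lets -combiner accept either form: the
argument is first resolved as a Java class, and only when that lookup fails is
it treated as a streaming command and routed through PipeCombiner. A rough
sketch of the resolution order (illustration only; resolveCombinerClass below
is a hypothetical stand-in for StreamJob's own class lookup):

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.streaming.PipeCombiner;

// Sketch of the -combiner resolution order after this change. Illustration
// only; resolveCombinerClass stands in for StreamJob's class lookup.
public class CombinerResolutionSketch {

  static Class<? extends Reducer> resolveCombinerClass(String name) {
    try {
      return Class.forName(name).asSubclass(Reducer.class);
    } catch (ClassNotFoundException e) {
      return null;   // not a loadable class: treat the argument as a command
    }
  }

  static void configureCombiner(JobConf conf, String comCmd)
      throws UnsupportedEncodingException {
    Class<? extends Reducer> c = resolveCombinerClass(comCmd);
    if (c != null) {
      conf.setCombinerClass(c);                   // -combiner JavaClassName
    } else {
      conf.setCombinerClass(PipeCombiner.class);  // -combiner <streaming cmd>
      conf.set("stream.combine.streamprocessor",
               URLEncoder.encode(comCmd, "UTF-8"));
    }
  }
}
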
Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=755927&r1=755926&r2=755927&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Thu Mar 19 11:20:47 2009
@@ -39,7 +39,7 @@
   protected String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
   protected String outputExpect = "Rare\t\nRblue\t\nRbunnies\t\nRpink\t\nRred\t\nRroses\t\nRviolets\t\n";
 
-  private StreamJob job;
+  protected StreamJob job;
 
   public TestStreaming() throws IOException
   {

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java?rev=755927&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java Thu Mar 19 11:20:47 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.Counters;
+
+public class TestStreamingCombiner extends TestStreaming {
+
+  protected String combine = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{""});
+  
+  public TestStreamingCombiner() throws IOException {
+    super();
+  }
+  
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", reduce,
+      "-combiner", combine,
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+
+  public void testCommandLine() throws IOException {
+    super.testCommandLine();
+    // validate combiner counters
+    String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
+    Counters counters = job.running_.getCounters();
+    assertTrue(counters.findCounter(
+               counterGrp, "COMBINE_INPUT_RECORDS").getValue() != 0);
+    assertTrue(counters.findCounter(
+               counterGrp, "COMBINE_OUTPUT_RECORDS").getValue() != 0);
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestStreamingCombiner().testCommandLine();
+  }
+
+}

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/streaming.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/streaming.xml?rev=755927&r1=755926&r2=755927&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/streaming.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/streaming.xml Thu Mar 19 11:20:47 2009
@@ -125,7 +125,7 @@
    -inputformat JavaClassName
    -outputformat JavaClassName
    -partitioner JavaClassName
-   -combiner JavaClassName
+   -combiner streamingCommand or JavaClassName
 </source>
 <p>
 The class you supply for the input format should return key/value pairs of Text class. If you do not specify an input format class, the TextInputFormat is used as the default. Since the TextInputFormat returns keys of LongWritable class, which are actually not part of the input data, the keys will be discarded; only the values will be piped to the streaming mapper.
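
The paragraph above states the Text-key requirement for streaming input
formats; stock choices such as SequenceFileAsTextInputFormat already satisfy
it. As a hedged sketch (not part of this commit; the class name
TextOffsetInputFormat is hypothetical), a custom input format for the old
mapred API could expose the byte offset as a Text key like this:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical input format returning Text/Text pairs, illustrating the
// "key/value pairs of Text class" requirement described in streaming.xml.
public class TextOffsetInputFormat extends FileInputFormat<Text, Text> {

  public RecordReader<Text, Text> getRecordReader(InputSplit split, JobConf job,
      Reporter reporter) throws IOException {
    final LineRecordReader lines = new LineRecordReader(job, (FileSplit) split);
    return new RecordReader<Text, Text>() {
      private final LongWritable offset = lines.createKey();

      public boolean next(Text key, Text value) throws IOException {
        if (!lines.next(offset, value)) {
          return false;                          // end of split
        }
        key.set(String.valueOf(offset.get()));   // byte offset rendered as Text
        return true;
      }
      public Text createKey() { return new Text(); }
      public Text createValue() { return lines.createValue(); }
      public long getPos() throws IOException { return lines.getPos(); }
      public float getProgress() throws IOException { return lines.getProgress(); }
      public void close() throws IOException { lines.close(); }
    };
  }
}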

