Author: ddas
Date: Thu Dec 11 21:17:23 2008
New Revision: 725905
URL: http://svn.apache.org/viewvc?rev=725905&view=rev
Log:
HADOOP-4620. Fixes Streaming to handle well the cases of map/reduce with empty
input/output. Contributed by Ravi Gummadi.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=725905&r1=725904&r2=725905&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Dec 11 21:17:23 2008
@@ -1446,6 +1446,9 @@
HADOOP-4795. Prevent lease monitor getting into an infinite loop when
leases and the namespace tree does not match. (szetszwo)
+ HADOOP-4620. Fixes Streaming to handle well the cases of map/reduce with empty
+ input/output. (Ravi Gummadi via ddas)
+
Release 0.18.2 - 2008-11-03
BUG FIXES
Modified:
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=725905&r1=725904&r2=725905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Thu Dec 11 21:17:23 2008
@@ -288,6 +288,22 @@
void waitOutputThreads() {
try {
+ if (outThread_ == null) {
+ // This happens only when the reducer has empty input (so reduce() is not
+ // called at all in this task). If the reducer still generates output,
+ // which is very uncommon, we may not have to support this case.
+ // So we don't write this output to HDFS, but we consume/collect
+ // this output just to avoid the reducer hanging forever.
+
+ OutputCollector collector = new OutputCollector() {
+ public void collect(Object key, Object value)
+ throws IOException {
+ //just consume it, no need to write the record anywhere
+ }
+ };
+ Reporter reporter = Reporter.NULL;//dummy reporter
+ startOutputThreads(collector, reporter);
+ }
int exitVal = sim.waitFor();
// how'd it go?
if (exitVal != 0) {
@@ -506,9 +522,11 @@
}
public void mapRedFinished() {
- logprintln("mapRedFinished");
try {
- if (!doPipe_) return;
+ if (!doPipe_) {
+ logprintln("mapRedFinished");
+ return;
+ }
try {
if (clientOut_ != null) {
clientOut_.flush();
@@ -518,6 +536,7 @@
}
waitOutputThreads();
if (sim != null) sim.destroy();
+ logprintln("mapRedFinished");
} catch (RuntimeException e) {
logStackTrace(e);
throw e;
Modified:
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=725905&r1=725904&r2=725905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Thu Dec 11 21:17:23 2008
@@ -82,10 +82,6 @@
// (MapRed creates it reflectively)
public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
- // init
- if (outThread_ == null) {
- startOutputThreads(output, reporter);
- }
if (outerrThreadsThrowable != null) {
mapRedFinished();
throw new IOException ("MROutput/MRErrThread failed:"
Modified:
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=725905&r1=725904&r2=725905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Thu Dec 11 21:17:23 2008
@@ -750,6 +750,7 @@
jobConf_.setMapperClass(c);
} else {
jobConf_.setMapperClass(PipeMapper.class);
+ jobConf_.setMapRunnerClass(PipeMapRunner.class);
jobConf_.set("stream.map.streamprocessor",
URLEncoder.encode(mapCmd_, "UTF-8"));
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java?rev=725905&r1=725904&r2=725905&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java Thu Dec 11 21:17:23 2008
@@ -58,4 +58,7 @@
}
}
+ protected Mapper<K1, V1, K2, V2> getMapper() {
+ return mapper;
+ }
}