Author: sharad
Date: Wed Jun  3 06:29:23 2009
New Revision: 781276

URL: http://svn.apache.org/viewvc?rev=781276&view=rev
Log:
HADOOP-5882. Fixes a reducer progress update problem for new mapreduce api. 
Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=781276&r1=781275&r2=781276&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Wed Jun  3 06:29:23 2009
@@ -115,6 +115,9 @@
     tasks that report back but the corresponding job hasn't been initialized
     yet. (Amar Kamat via ddas)
 
+    HADOOP-5882. Fixes a reducer progress update problem for new mapreduce
+    api. (Amareshwari Sriramadasu via sharad)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=781276&r1=781275&r2=781276&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Jun  3 06:29:23 2009
@@ -523,6 +523,28 @@
                      Class<INVALUE> valueClass
                      ) throws IOException,InterruptedException, 
                               ClassNotFoundException {
+    // wrap value iterator to report progress.
+    final RawKeyValueIterator rawIter = rIter;
+    rIter = new RawKeyValueIterator() {
+      public void close() throws IOException {
+        rawIter.close();
+      }
+      public DataInputBuffer getKey() throws IOException {
+        return rawIter.getKey();
+      }
+      public Progress getProgress() {
+        return rawIter.getProgress();
+      }
+      public DataInputBuffer getValue() throws IOException {
+        return rawIter.getValue();
+      }
+      public boolean next() throws IOException {
+        boolean ret = rawIter.next();
+        reducePhase.set(rawIter.getProgress().get());
+        reporter.progress();
+        return ret;
+      }
+    };
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
       new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
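
The hunk above wraps the reducer's raw iterator so that every call to next()
also updates the reduce phase and pings the reporter. The standalone sketch
below illustrates that wrapping pattern only. It does not use the Hadoop API:
RecordIterator, Tracker, withProgress, overRecords and drain are hypothetical
names invented for this example, standing in loosely for RawKeyValueIterator,
reducePhase and reporter.

```java
public class ProgressWrapSketch {
    /** Hypothetical stand-in for the iterator being wrapped (not Hadoop's interface). */
    interface RecordIterator {
        boolean next();
        float getProgress();
    }

    /** Hypothetical stand-in for the reduce phase value and the reporter. */
    static class Tracker {
        float phase = 0f; // last progress value copied from the raw iterator
        int pings = 0;    // number of times progress was reported
    }

    /** Wraps raw so that each next() call also records progress in tracker. */
    static RecordIterator withProgress(final RecordIterator raw, final Tracker tracker) {
        return new RecordIterator() {
            public boolean next() {
                boolean ret = raw.next();
                tracker.phase = raw.getProgress(); // analogous to reducePhase.set(...)
                tracker.pings++;                   // analogous to reporter.progress()
                return ret;
            }
            public float getProgress() {
                return raw.getProgress();
            }
        };
    }

    /** In-memory iterator over a fixed number of records, for demonstration. */
    static RecordIterator overRecords(final int total) {
        return new RecordIterator() {
            private int consumed = 0;
            public boolean next() {
                if (consumed < total) {
                    consumed++;
                    return true;
                }
                return false;
            }
            public float getProgress() {
                return total == 0 ? 1f : (float) consumed / total;
            }
        };
    }

    /** Consumes every record through the wrapper and returns the tracker. */
    static Tracker drain(int total) {
        Tracker tracker = new Tracker();
        RecordIterator it = withProgress(overRecords(total), tracker);
        while (it.next()) {
            // a real reducer would process the current record here
        }
        return tracker;
    }

    public static void main(String[] args) {
        Tracker tracker = drain(4);
        System.out.println("pings=" + tracker.pings + " phase=" + tracker.phase);
    }
}
```

Because the wrapper delegates every call and only adds bookkeeping in next(),
code that consumes the iterator needs no changes, which appears to be why the
patch reassigns rIter rather than touching the new-API reducer path itself.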

