Author: cdouglas
Date: Mon Jun 29 06:44:46 2009
New Revision: 789232
URL: http://svn.apache.org/viewvc?rev=789232&view=rev
Log:
MAPREDUCE-179. Update progress in new RecordReaders.
Modified:
hadoop/common/branches/branch-0.20/CHANGES.txt
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=789232&r1=789231&r2=789232&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Mon Jun 29 06:44:46 2009
@@ -156,6 +156,8 @@
MAPREDUCE-657. Fix hardcoded filesystem problem in CompletedJobStatusStore.
(Amar Kamat via sharad)
+ MAPREDUCE-179. Update progress in new RecordReaders. (cdouglas)
+
Release 0.20.0 - 2009-04-15
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=789232&r1=789231&r2=789232&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Mon Jun 29 06:44:46 2009
@@ -380,10 +380,12 @@
extends org.apache.hadoop.mapreduce.RecordReader<K,V> {
private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
+ private final TaskReporter reporter;
NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real,
TaskReporter reporter) {
this.real = real;
+ this.reporter = reporter; // keep the reporter in the field this patch adds; the patch also adds a reporter.setProgress(getProgress()) call
this.inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS); // counter is obtained from the reporter once, at construction
}
@@ -420,6 +422,7 @@
if (result) {
inputRecordCounter.increment(1);
}
+ reporter.setProgress(getProgress());
return result;
}
}
Modified:
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=789232&r1=789231&r2=789232&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Mon Jun 29 06:44:46 2009
@@ -37,9 +37,12 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
@@ -93,6 +96,33 @@
}
}
+ public static class TrackingTextInputFormat extends TextInputFormat { // input format whose createRecordReader returns the self-checking reader below
+
+ public static class MonoProgressRecordReader extends LineRecordReader { // LineRecordReader that asserts on its own getProgress() values
+ private float last = 0.0f; // most recent value returned by getProgress()
+ private boolean progressCalled = false; // set to true by getProgress(); checked in close()
+ @Override
+ public float getProgress() {
+ progressCalled = true;
+ final float ret = super.getProgress();
+ assertTrue("getProgress decreased", ret >= last); // a reading lower than the previous one fails the test
+ last = ret;
+ return ret;
+ }
+ @Override
+ public synchronized void close() throws IOException {
+ assertTrue("getProgress never called", progressCalled); // fails if getProgress() was never invoked before close
+ super.close();
+ }
+ }
+
+ @Override
+ public RecordReader<LongWritable, Text> createRecordReader(
+ InputSplit split, TaskAttemptContext context) {
+ return new MonoProgressRecordReader(); // split and context are not used here
+ }
+ }
+
private void runWordCount(Configuration conf
) throws IOException,
InterruptedException,
@@ -109,6 +139,7 @@
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
+ job.setInputFormatClass(TrackingTextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
assertTrue(job.waitForCompletion(false));