Author: cdouglas
Date: Tue Sep 15 06:30:56 2009
New Revision: 815163
URL: http://svn.apache.org/viewvc?rev=815163&view=rev
Log:
MAPREDUCE-112. Add counters for reduce input, output records to the new API.
Contributed by Jothi Padmanabhan
Modified:
hadoop/common/branches/branch-0.20/CHANGES.txt
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=815163&r1=815162&r2=815163&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Tue Sep 15 06:30:56 2009
@@ -1,5 +1,12 @@
Hadoop Change Log
+Release 0.20.2 - Unreleased
+
+ BUG FIXES
+
+    MAPREDUCE-112. Add counters for reduce input, output records to the new
+    API. (Jothi Padmanabhan via cdouglas)
+
Release 0.20.1 - 2009-09-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=815163&r1=815162&r2=815163&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Sep 15 06:30:56 2009
@@ -553,11 +553,14 @@
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
(org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
outputFormat.getRecordWriter(taskContext);
+      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
+        new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
job.setBoolean("mapred.skip.on", isSkipping());
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, getTaskID(),
- rIter, reduceInputValueCounter,
- output, committer,
+ rIter, reduceInputKeyCounter,
+ reduceInputValueCounter,
+ trackedRW, committer,
reporter, comparator, keyClass,
valueClass);
reducer.run(reducerContext);
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java?rev=815163&r1=815162&r2=815163&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java Tue Sep 15 06:30:56 2009
@@ -1022,6 +1022,7 @@
org.apache.hadoop.mapreduce.TaskAttemptID.class,
RawKeyValueIterator.class,
org.apache.hadoop.mapreduce.Counter.class,
+ org.apache.hadoop.mapreduce.Counter.class,
org.apache.hadoop.mapreduce.RecordWriter.class,
org.apache.hadoop.mapreduce.OutputCommitter.class,
org.apache.hadoop.mapreduce.StatusReporter.class,
@@ -1041,7 +1042,8 @@
Configuration job,
org.apache.hadoop.mapreduce.TaskAttemptID taskId,
RawKeyValueIterator rIter,
- org.apache.hadoop.mapreduce.Counter inputCounter,
+ org.apache.hadoop.mapreduce.Counter inputKeyCounter,
+ org.apache.hadoop.mapreduce.Counter inputValueCounter,
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
org.apache.hadoop.mapreduce.OutputCommitter committer,
org.apache.hadoop.mapreduce.StatusReporter reporter,
@@ -1051,7 +1053,8 @@
try {
return contextConstructor.newInstance(reducer, job, taskId,
- rIter, inputCounter, output,
+ rIter, inputKeyCounter,
+ inputValueCounter, output,
committer, reporter, comparator,
keyClass, valueClass);
} catch (InstantiationException e) {
@@ -1206,7 +1209,7 @@
ReflectionUtils.newInstance(reducerClass, job);
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, taskId,
- iterator, inputCounter,
+ iterator, null, inputCounter,
new OutputConverter(collector),
committer,
reporter, comparator, keyClass,
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=815163&r1=815162&r2=815163&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java Tue Sep 15 06:30:56 2009
@@ -41,7 +41,8 @@
public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
private RawKeyValueIterator input;
- private Counter inputCounter;
+ private Counter inputKeyCounter;
+ private Counter inputValueCounter;
private RawComparator<KEYIN> comparator;
private KEYIN key; // current key
private VALUEIN value; // current value
@@ -57,7 +58,8 @@
public ReduceContext(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
- Counter inputCounter,
+ Counter inputKeyCounter,
+ Counter inputValueCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
@@ -67,7 +69,8 @@
) throws InterruptedException, IOException{
super(conf, taskid, output, committer, reporter);
this.input = input;
- this.inputCounter = inputCounter;
+ this.inputKeyCounter = inputKeyCounter;
+ this.inputValueCounter = inputValueCounter;
this.comparator = comparator;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
@@ -83,6 +86,9 @@
nextKeyValue();
}
if (hasMore) {
+ if (inputKeyCounter != null) {
+ inputKeyCounter.increment(1);
+ }
return nextKeyValue();
} else {
return false;
@@ -109,7 +115,6 @@
buffer.reset(next.getData(), next.getPosition(), next.getLength());
value = valueDeserializer.deserialize(value);
hasMore = input.next();
- inputCounter.increment(1);
if (hasMore) {
next = input.getKey();
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
@@ -121,6 +126,7 @@
} else {
nextKeyIsSame = false;
}
+ inputValueCounter.increment(1);
return true;
}
@@ -189,4 +195,4 @@
Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
return iterable;
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java?rev=815163&r1=815162&r2=815163&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Tue Sep 15 06:30:56 2009
@@ -121,7 +121,8 @@
extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
public Context(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
- Counter inputCounter,
+ Counter inputKeyCounter,
+ Counter inputValueCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
@@ -129,7 +130,8 @@
Class<KEYIN> keyClass,
Class<VALUEIN> valueClass
) throws IOException, InterruptedException {
- super(conf, taskid, input, inputCounter, output, committer, reporter,
+ super(conf, taskid, input, inputKeyCounter, inputValueCounter,
+ output, committer, reporter,
comparator, keyClass, valueClass);
}
}
@@ -175,4 +177,4 @@
}
cleanup(context);
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=815163&r1=815162&r2=815163&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Sep 15 06:30:56 2009
@@ -152,9 +152,14 @@
"REDUCE_INPUT_RECORDS").getValue();
long mapOut = ctrs.findCounter(COUNTER_GROUP,
"MAP_OUTPUT_RECORDS").getValue();
+ long reduceOut = ctrs.findCounter(COUNTER_GROUP,
+ "REDUCE_OUTPUT_RECORDS").getValue();
+ long reduceGrps = ctrs.findCounter(COUNTER_GROUP,
+ "REDUCE_INPUT_GROUPS").getValue();
assertEquals("map out = combine in", mapOut, combineIn);
assertEquals("combine out = reduce in", combineOut, reduceIn);
assertTrue("combine in > combine out", combineIn > combineOut);
+ assertEquals("reduce groups = reduce out", reduceGrps, reduceOut);
String group = "Random Group";
CounterGroup ctrGrp = ctrs.getGroup(group);
assertEquals(0, ctrGrp.size());