Author: omalley
Date: Fri Mar 4 03:29:23 2011
New Revision: 1077001
URL: http://svn.apache.org/viewvc?rev=1077001&view=rev
Log:
commit 30a4e464ec146926dbefe2d75bb4be063c584254
Author: Christopher Douglas <[email protected]>
Date: Tue Sep 15 06:30:56 2009 +0000
MAPREDUCE-112. Add counters for reduce input, output records to the new API.
Contributed by Jothi Padmanabhan
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@815163 13f79535-47bb-0310-9956-ffa450edef68
Modified:
hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Modified: hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt?rev=1077001&r1=1077000&r2=1077001&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt Fri Mar 4 03:29:23 2011
@@ -1,5 +1,12 @@
Hadoop Change Log
+Release 0.20.2 - Unreleased
+
+ BUG FIXES
+
+ MAPREDUCE-112. Add counters for reduce input, output records to the new API.
+ (Jothi Padmanabhan via cdouglas)
+
Release 0.20.1 - 2009-09-01
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077001&r1=1077000&r2=1077001&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar 4 03:29:23 2011
@@ -554,11 +554,14 @@ class ReduceTask extends Task {
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
(org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
outputFormat.getRecordWriter(taskContext);
+ org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
+ new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
job.setBoolean("mapred.skip.on", isSkipping());
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, getTaskID(),
- rIter, reduceInputValueCounter,
- output, committer,
+ rIter, reduceInputKeyCounter,
+ reduceInputValueCounter,
+ trackedRW, committer,
reporter, comparator, keyClass,
valueClass);
reducer.run(reducerContext);
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077001&r1=1077000&r2=1077001&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar 4 03:29:23 2011
@@ -1031,6 +1031,7 @@ abstract public class Task implements Wr
org.apache.hadoop.mapreduce.TaskAttemptID.class,
RawKeyValueIterator.class,
org.apache.hadoop.mapreduce.Counter.class,
+ org.apache.hadoop.mapreduce.Counter.class,
org.apache.hadoop.mapreduce.RecordWriter.class,
org.apache.hadoop.mapreduce.OutputCommitter.class,
org.apache.hadoop.mapreduce.StatusReporter.class,
@@ -1050,7 +1051,8 @@ abstract public class Task implements Wr
Configuration job,
org.apache.hadoop.mapreduce.TaskAttemptID taskId,
RawKeyValueIterator rIter,
- org.apache.hadoop.mapreduce.Counter inputCounter,
+ org.apache.hadoop.mapreduce.Counter inputKeyCounter,
+ org.apache.hadoop.mapreduce.Counter inputValueCounter,
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
org.apache.hadoop.mapreduce.OutputCommitter committer,
org.apache.hadoop.mapreduce.StatusReporter reporter,
@@ -1060,7 +1062,8 @@ abstract public class Task implements Wr
try {
return contextConstructor.newInstance(reducer, job, taskId,
- rIter, inputCounter, output,
+ rIter, inputKeyCounter,
+ inputValueCounter, output,
committer, reporter, comparator,
keyClass, valueClass);
} catch (InstantiationException e) {
@@ -1215,7 +1218,7 @@ abstract public class Task implements Wr
ReflectionUtils.newInstance(reducerClass, job);
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, taskId,
- iterator, inputCounter,
+ iterator, null, inputCounter,
new OutputConverter(collector),
committer,
reporter, comparator, keyClass,
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=1077001&r1=1077000&r2=1077001&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java Fri Mar 4 03:29:23 2011
@@ -41,7 +41,8 @@ import org.apache.hadoop.util.Progressab
public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
private RawKeyValueIterator input;
- private Counter inputCounter;
+ private Counter inputKeyCounter;
+ private Counter inputValueCounter;
private RawComparator<KEYIN> comparator;
private KEYIN key; // current key
private VALUEIN value; // current value
@@ -57,7 +58,8 @@ public class ReduceContext<KEYIN,VALUEIN
public ReduceContext(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
- Counter inputCounter,
+ Counter inputKeyCounter,
+ Counter inputValueCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
@@ -67,7 +69,8 @@ public class ReduceContext<KEYIN,VALUEIN
) throws InterruptedException, IOException{
super(conf, taskid, output, committer, reporter);
this.input = input;
- this.inputCounter = inputCounter;
+ this.inputKeyCounter = inputKeyCounter;
+ this.inputValueCounter = inputValueCounter;
this.comparator = comparator;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
@@ -83,6 +86,9 @@ public class ReduceContext<KEYIN,VALUEIN
nextKeyValue();
}
if (hasMore) {
+ if (inputKeyCounter != null) {
+ inputKeyCounter.increment(1);
+ }
return nextKeyValue();
} else {
return false;
@@ -109,7 +115,6 @@ public class ReduceContext<KEYIN,VALUEIN
buffer.reset(next.getData(), next.getPosition(), next.getLength());
value = valueDeserializer.deserialize(value);
hasMore = input.next();
- inputCounter.increment(1);
if (hasMore) {
next = input.getKey();
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
@@ -121,6 +126,7 @@ public class ReduceContext<KEYIN,VALUEIN
} else {
nextKeyIsSame = false;
}
+ inputValueCounter.increment(1);
return true;
}
@@ -189,4 +195,4 @@ public class ReduceContext<KEYIN,VALUEIN
Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
return iterable;
}
-}
\ No newline at end of file
+}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Reducer.java?rev=1077001&r1=1077000&r2=1077001&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Fri Mar 4 03:29:23 2011
@@ -121,7 +121,8 @@ public class Reducer<KEYIN,VALUEIN,KEYOU
extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
public Context(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
- Counter inputCounter,
+ Counter inputKeyCounter,
+ Counter inputValueCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
@@ -129,7 +130,8 @@ public class Reducer<KEYIN,VALUEIN,KEYOU
Class<KEYIN> keyClass,
Class<VALUEIN> valueClass
) throws IOException, InterruptedException {
- super(conf, taskid, input, inputCounter, output, committer, reporter,
+ super(conf, taskid, input, inputKeyCounter, inputValueCounter,
+ output, committer, reporter,
comparator, keyClass, valueClass);
}
}
@@ -175,4 +177,4 @@ public class Reducer<KEYIN,VALUEIN,KEYOU
}
cleanup(context);
}
-}
\ No newline at end of file
+}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=1077001&r1=1077000&r2=1077001&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Fri Mar 4 03:29:23 2011
@@ -152,9 +152,14 @@ public class TestMapReduceLocal extends
"REDUCE_INPUT_RECORDS").getValue();
long mapOut = ctrs.findCounter(COUNTER_GROUP,
"MAP_OUTPUT_RECORDS").getValue();
+ long reduceOut = ctrs.findCounter(COUNTER_GROUP,
+ "REDUCE_OUTPUT_RECORDS").getValue();
+ long reduceGrps = ctrs.findCounter(COUNTER_GROUP,
+ "REDUCE_INPUT_GROUPS").getValue();
assertEquals("map out = combine in", mapOut, combineIn);
assertEquals("combine out = reduce in", combineOut, reduceIn);
assertTrue("combine in > combine out", combineIn > combineOut);
+ assertEquals("reduce groups = reduce out", reduceGrps, reduceOut);
String group = "Random Group";
CounterGroup ctrGrp = ctrs.getGroup(group);
assertEquals(0, ctrGrp.size());