Repository: tez Updated Branches: refs/heads/branch-0.7 26b3832b8 -> 2e368a523
TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct (Eric Badger via jeagles) (cherry picked from commit d0bafcef5617928d722d9763d53b68137bac192b) (cherry picked from commit 07efe93b849b2f97f4d153dafb423d4b41f002df) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2e368a52 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2e368a52 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2e368a52 Branch: refs/heads/branch-0.7 Commit: 2e368a523b915ff0a427059ae4d757faf0c724ff Parents: 26b3832 Author: Jonathan Eagles <[email protected]> Authored: Thu Nov 3 13:00:10 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu Nov 3 13:28:21 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/mapreduce/combine/MRCombiner.java | 24 ++++--- .../tez/mapreduce/combine/TestMRCombiner.java | 73 ++++++++++++++++++++ .../runtime/library/common/ValuesIterator.java | 4 +- 4 files changed, 90 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2e368a52/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 404a1e5..e27efda 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct TEZ-3487. Improvements in travis yml file to get builds to work. TEZ-3483. Create basic travis yml file for Tez. TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317. http://git-wip-us.apache.org/repos/asf/tez/blob/2e368a52/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java index 5ad3136..9514215 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java @@ -71,8 +71,8 @@ public class MRCombiner implements Combiner { private final RawComparator<?> comparator; private final boolean useNewApi; - private final TezCounter combineInputKeyCounter; - private final TezCounter combineInputValueCounter; + private final TezCounter combineInputRecordsCounter; + private final TezCounter combineOutputRecordsCounter; private final MRTaskReporter reporter; private final TaskAttemptID mrTaskAttemptID; @@ -95,8 +95,8 @@ public class MRCombiner implements Combiner { this.useNewApi = ConfigUtils.useNewApi(conf); - combineInputKeyCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS); - combineInputValueCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); + combineInputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS); + combineOutputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); boolean isMap = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR,false); this.mrTaskAttemptID = new TaskAttemptID( @@ -130,6 +130,7 @@ public class MRCombiner implements Combiner { @Override public void collect(Object key, Object value) throws IOException { writer.append(key, value); + combineOutputRecordsCounter.increment(1); } }; @@ -145,7 +146,7 @@ public class MRCombiner implements Combiner { Class<KEY> keyClass, Class<VALUE> valClass, RawComparator<KEY> comparator) throws IOException { super(rawIter, comparator, keyClass, valClass, conf, - combineInputKeyCounter, combineInputValueCounter); + null, combineInputRecordsCounter); } } @@ -161,6 +162,7 @@ public class MRCombiner implements Combiner { public void write(Object key, Object value) throws IOException, InterruptedException { writer.append(key, value); + combineOutputRecordsCounter.increment(1); } @Override @@ -180,8 +182,8 @@ public class MRCombiner implements Combiner { conf, mrTaskAttemptID, rawIter, - new MRCounters.MRCounter(combineInputKeyCounter), - new MRCounters.MRCounter(combineInputValueCounter), + new MRCounters.MRCounter(combineInputRecordsCounter), + new MRCounters.MRCounter(combineOutputRecordsCounter), recordWriter, reporter, (RawComparator)comparator, @@ -196,8 +198,8 @@ public class MRCombiner implements Combiner { Configuration conf, TaskAttemptID mrTaskAttemptID, final TezRawKeyValueIterator rawIter, - Counter combineInputKeyCounter, - Counter combineInputValueCounter, + Counter combineInputRecordsCounter, + Counter combineOutputRecordsCounter, RecordWriter<KEYOUT, VALUEOUT> recordWriter, MRTaskReporter reporter, RawComparator<KEYIN> comparator, @@ -233,8 +235,8 @@ public class MRCombiner implements Combiner { }; ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> rContext = new ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>( - conf, mrTaskAttemptID, r, combineInputKeyCounter, - combineInputValueCounter, recordWriter, null, reporter, comparator, + conf, mrTaskAttemptID, r, null, + combineInputRecordsCounter, recordWriter, null, reporter, comparator, keyClass, valClass); org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext = new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>() http://git-wip-us.apache.org/repos/asf/tez/blob/2e368a52/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java index a92f8dd..a796e59 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java @@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.util.Progress; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.TezUtils; @@ -44,6 +45,8 @@ import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; import org.junit.Test; import org.mockito.Mockito; +import static org.junit.Assert.assertEquals; + public class TestMRCombiner { @Test @@ -55,6 +58,10 @@ public class TestMRCombiner { MRCombiner combiner = new MRCombiner(taskContext); Writer writer = Mockito.mock(Writer.class); combiner.combine(new TezRawKeyValueIteratorTest(), writer); + long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue(); + long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue(); + assertEquals(6, inputRecords); + assertEquals(3, outputRecords); // verify combiner output keys and values verifyKeyAndValues(writer); } @@ -70,10 +77,46 @@ public class TestMRCombiner { MRCombiner combiner = new MRCombiner(taskContext); Writer writer = Mockito.mock(Writer.class); combiner.combine(new TezRawKeyValueIteratorTest(), writer); + long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue(); + long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue(); + assertEquals(6, inputRecords); + assertEquals(3, outputRecords); // verify combiner output keys and values verifyKeyAndValues(writer); } + @Test + public void testTop2RunOldCombiner() throws IOException, InterruptedException { + TezConfiguration conf = new TezConfiguration(); + setKeyAndValueClassTypes(conf); + conf.setClass("mapred.combiner.class", Top2OldReducer.class, Object.class); + TaskContext taskContext = getTaskContext(conf); + MRCombiner combiner = new MRCombiner(taskContext); + Writer writer = Mockito.mock(Writer.class); + combiner.combine(new TezRawKeyValueIteratorTest(), writer); + long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue(); + long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue(); + assertEquals(6, inputRecords); + assertEquals(5, outputRecords); + } + + @Test + public void testTop2RunNewCombiner() throws IOException, InterruptedException { + TezConfiguration conf = new TezConfiguration(); + setKeyAndValueClassTypes(conf); + conf.setBoolean("mapred.mapper.new-api", true); + conf.setClass(MRJobConfig.COMBINE_CLASS_ATTR, Top2NewReducer.class, + Object.class); + TaskContext taskContext = getTaskContext(conf); + MRCombiner combiner = new MRCombiner(taskContext); + Writer writer = Mockito.mock(Writer.class); + combiner.combine(new TezRawKeyValueIteratorTest(), writer); + long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue(); + long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue(); + assertEquals(6, inputRecords); + assertEquals(5, outputRecords); + } + private void setKeyAndValueClassTypes(TezConfiguration conf) { conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class, Object.class); @@ -185,4 +228,34 @@ public class TestMRCombiner { context.write(new Text(key.toString()), new IntWritable(count)); } } + + private static class Top2OldReducer extends OldReducer { + @Override + public void reduce(Text key, Iterator<IntWritable> value, + OutputCollector<Text, IntWritable> collector, Reporter reporter) + throws IOException { + int i = 0; + while (value.hasNext()) { + int val = value.next().get(); + if (i++ < 2) { + collector.collect(new Text(key.toString()), new IntWritable(val)); + } + } + } + } + + private static class Top2NewReducer extends NewReducer { + @Override + protected void reduce(Text key, Iterable<IntWritable> values, + Context context) throws IOException, InterruptedException { + int i = 0; + for (IntWritable value : values) { + if (i++ < 2) { + context.write(new Text(key.toString()), value); + } else { + break; + } + } + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/2e368a52/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java index f4da742..7add8c5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java @@ -189,7 +189,9 @@ public class ValuesIterator<KEY,VALUE> { if (key == null || false == hasMoreValues) { // invariant: more=true & there are no more values in an existing key group // so this indicates start of new key group - inputKeyCounter.increment(1); + if(inputKeyCounter != null) { + inputKeyCounter.increment(1); + } ++keyCtr; } } else {
