Repository: tez Updated Branches: refs/heads/master cac9237b5 -> d0bafcef5
TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct (Eric Badger via jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d0bafcef Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d0bafcef Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d0bafcef Branch: refs/heads/master Commit: d0bafcef5617928d722d9763d53b68137bac192b Parents: cac9237 Author: Jonathan Eagles <[email protected]> Authored: Thu Nov 3 13:00:10 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu Nov 3 13:00:10 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../tez/mapreduce/combine/MRCombiner.java | 24 ++++--- .../tez/mapreduce/combine/TestMRCombiner.java | 73 ++++++++++++++++++++ .../runtime/library/common/ValuesIterator.java | 4 +- 4 files changed, 92 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d0bafcef/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 48c0f1e..58a3346 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct TEZ-3247. Add more unit test coverage for container reuse. TEZ-3215. Support for MultipleOutputs. TEZ-3097. Flaky test: TestCommit.testDAGCommitStartedEventFail_OnDAGSuccess. @@ -139,6 +140,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct TEZ-3097. Flaky test: TestCommit.testDAGCommitStartedEventFail_OnDAGSuccess. TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317. TEZ-3317. Speculative execution starts too early due to 0 progress. @@ -640,6 +642,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317. TEZ-3317. Speculative execution starts too early due to 0 progress. TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs http://git-wip-us.apache.org/repos/asf/tez/blob/d0bafcef/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java index 5ad3136..9514215 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java @@ -71,8 +71,8 @@ public class MRCombiner implements Combiner { private final RawComparator<?> comparator; private final boolean useNewApi; - private final TezCounter combineInputKeyCounter; - private final TezCounter combineInputValueCounter; + private final TezCounter combineInputRecordsCounter; + private final TezCounter combineOutputRecordsCounter; private final MRTaskReporter reporter; private final TaskAttemptID mrTaskAttemptID; @@ -95,8 +95,8 @@ public class MRCombiner implements Combiner { this.useNewApi = ConfigUtils.useNewApi(conf); - combineInputKeyCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS); - combineInputValueCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); + combineInputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS); + combineOutputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); boolean isMap = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR,false); this.mrTaskAttemptID = new TaskAttemptID( @@ -130,6 +130,7 @@ public class MRCombiner implements Combiner { @Override public void collect(Object key, Object value) throws IOException { writer.append(key, value); + combineOutputRecordsCounter.increment(1); } }; @@ -145,7 +146,7 @@ public class MRCombiner implements Combiner { Class<KEY> keyClass, Class<VALUE> valClass, RawComparator<KEY> comparator) throws IOException { super(rawIter, comparator, keyClass, valClass, conf, - combineInputKeyCounter, combineInputValueCounter); + null, combineInputRecordsCounter); } } @@ -161,6 +162,7 @@ public class MRCombiner implements Combiner { public void write(Object key, Object value) throws IOException, InterruptedException { writer.append(key, value); + combineOutputRecordsCounter.increment(1); } @Override @@ -180,8 +182,8 @@ public class MRCombiner implements Combiner { conf, mrTaskAttemptID, rawIter, - new MRCounters.MRCounter(combineInputKeyCounter), - new MRCounters.MRCounter(combineInputValueCounter), + new MRCounters.MRCounter(combineInputRecordsCounter), + new MRCounters.MRCounter(combineOutputRecordsCounter), recordWriter, reporter, (RawComparator)comparator, @@ -196,8 +198,8 @@ public class MRCombiner implements Combiner { Configuration conf, TaskAttemptID mrTaskAttemptID, final TezRawKeyValueIterator rawIter, - Counter combineInputKeyCounter, - Counter combineInputValueCounter, + Counter combineInputRecordsCounter, + Counter combineOutputRecordsCounter, RecordWriter<KEYOUT, VALUEOUT> recordWriter, MRTaskReporter reporter, RawComparator<KEYIN> comparator, @@ -233,8 +235,8 @@ public class MRCombiner implements Combiner { }; ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> rContext = new ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>( - conf, mrTaskAttemptID, r, combineInputKeyCounter, - combineInputValueCounter, recordWriter, null, reporter, comparator, + conf, mrTaskAttemptID, r, null, + combineInputRecordsCounter, recordWriter, null, reporter, comparator, keyClass, valClass); org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext = new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>() http://git-wip-us.apache.org/repos/asf/tez/blob/d0bafcef/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java index a92f8dd..a796e59 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java @@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.util.Progress; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.TezUtils; @@ -44,6 +45,8 @@ import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; import org.junit.Test; import org.mockito.Mockito; +import static org.junit.Assert.assertEquals; + public class TestMRCombiner { @Test @@ -55,6 +58,10 @@ public class TestMRCombiner { MRCombiner combiner = new MRCombiner(taskContext); Writer writer = Mockito.mock(Writer.class); combiner.combine(new TezRawKeyValueIteratorTest(), writer); + long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue(); + long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue(); + assertEquals(6, inputRecords); + assertEquals(3, outputRecords); // verify combiner output keys and values verifyKeyAndValues(writer); } @@ -70,10 +77,46 @@ public class TestMRCombiner { MRCombiner combiner = new MRCombiner(taskContext); Writer writer = Mockito.mock(Writer.class); combiner.combine(new TezRawKeyValueIteratorTest(), writer); + long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue(); + long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue(); + assertEquals(6, inputRecords); + assertEquals(3, outputRecords); // verify combiner output keys and values verifyKeyAndValues(writer); } + @Test + public void testTop2RunOldCombiner() throws IOException, InterruptedException { + TezConfiguration conf = new TezConfiguration(); + setKeyAndValueClassTypes(conf); + conf.setClass("mapred.combiner.class", Top2OldReducer.class, Object.class); + TaskContext taskContext = getTaskContext(conf); + MRCombiner combiner = new MRCombiner(taskContext); + Writer writer = Mockito.mock(Writer.class); + combiner.combine(new TezRawKeyValueIteratorTest(), writer); + long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue(); + long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue(); + assertEquals(6, inputRecords); + assertEquals(5, outputRecords); + } + + @Test + public void testTop2RunNewCombiner() throws IOException, InterruptedException { + TezConfiguration conf = new TezConfiguration(); + setKeyAndValueClassTypes(conf); + conf.setBoolean("mapred.mapper.new-api", true); + conf.setClass(MRJobConfig.COMBINE_CLASS_ATTR, Top2NewReducer.class, + Object.class); + TaskContext taskContext = getTaskContext(conf); + MRCombiner combiner = new MRCombiner(taskContext); + Writer writer = Mockito.mock(Writer.class); + combiner.combine(new TezRawKeyValueIteratorTest(), writer); + long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue(); + long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue(); + assertEquals(6, inputRecords); + assertEquals(5, outputRecords); + } + private void setKeyAndValueClassTypes(TezConfiguration conf) { conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class, Object.class); @@ -185,4 +228,34 @@ public class TestMRCombiner { context.write(new Text(key.toString()), new IntWritable(count)); } } + + private static class Top2OldReducer extends OldReducer { + @Override + public void reduce(Text key, Iterator<IntWritable> value, + OutputCollector<Text, IntWritable> collector, Reporter reporter) + throws IOException { + int i = 0; + while (value.hasNext()) { + int val = value.next().get(); + if (i++ < 2) { + collector.collect(new Text(key.toString()), new IntWritable(val)); + } + } + } + } + + private static class Top2NewReducer extends NewReducer { + @Override + protected void reduce(Text key, Iterable<IntWritable> values, + Context context) throws IOException, InterruptedException { + int i = 0; + for (IntWritable value : values) { + if (i++ < 2) { + context.write(new Text(key.toString()), value); + } else { + break; + } + } + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/d0bafcef/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java index f4da742..7add8c5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java @@ -189,7 +189,9 @@ public class ValuesIterator<KEY,VALUE> { if (key == null || false == hasMoreValues) { // invariant: more=true & there are no more values in an existing key group // so this indicates start of new key group - inputKeyCounter.increment(1); + if(inputKeyCounter != null) { + inputKeyCounter.increment(1); + } ++keyCtr; } } else {
