Repository: tez Updated Branches: refs/heads/master bbe0f96a7 -> d5ac3b75f
TEZ-3849. Combiner+PipelinedSorter silently drops records (Jacob Tolar via kshukla) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d5ac3b75 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d5ac3b75 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d5ac3b75 Branch: refs/heads/master Commit: d5ac3b75f73b908b292f4f7e1a3d619696d957b8 Parents: bbe0f96 Author: Kuhu Shukla <[email protected]> Authored: Wed Oct 25 11:01:28 2017 -0500 Committer: Kuhu Shukla <[email protected]> Committed: Wed Oct 25 11:01:28 2017 -0500 ---------------------------------------------------------------------- .../processor/reduce/ReduceProcessor.java | 6 +++ .../tez/mapreduce/combine/TestMRCombiner.java | 11 ++++- .../common/sort/impl/PipelinedSorter.java | 50 ++++++++++++++++++-- .../library/common/sort/impl/TezMerger.java | 36 +++++++++----- .../sort/impl/TezRawKeyValueIterator.java | 11 ++++- .../common/sort/impl/dflt/DefaultSorter.java | 6 +++ .../input/OrderedGroupedInputLegacy.java | 5 ++ .../common/sort/impl/TestPipelinedSorter.java | 39 +++++++++++++++ 8 files changed, 148 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java index 4b79c78..63b168f 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java @@ -309,6 +309,12 @@ public class ReduceProcessor extends MRTask { public DataInputBuffer getValue() throws IOException { return rawIter.getValue(); } + + @Override + public boolean hasNext() throws IOException { + return rawIter.hasNext(); + } + public boolean next() throws IOException { boolean ret = rawIter.next(); reporter.setProgress(rawIter.getProgress().getProgress()); http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java index a796e59..7668d96 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java @@ -152,7 +152,16 @@ public class TestMRCombiner { @Override public boolean next() throws IOException { - if (i++ < keys.length - 1) { + boolean hasNext = hasNext(); + if (hasNext) { + i += 1; + } + + return hasNext; + } + + public boolean hasNext() throws IOException { + if (i < (keys.length - 1)) { return true; } return false; http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 88d10d0..07c2fe2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -574,15 +574,14 @@ public class PipelinedSorter extends ExternalSorter { //write merged output to disk long segmentStart = out.getPos(); Writer writer = null; - boolean hasNext = kvIter.next(); + boolean hasNext = kvIter.hasNext(); if (hasNext || !sendEmptyPartitionDetails) { writer = new Writer(conf, out, keyClass, valClass, codec, spilledRecordsCounter, null, merger.needsRLE()); } if (combiner == null) { - while (hasNext) { + while (kvIter.next()) { writer.append(kvIter.getKey(), kvIter.getValue()); - hasNext = kvIter.next(); } } else { if (hasNext) { @@ -842,6 +841,7 @@ public class PipelinedSorter extends ExternalSorter { private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator { int getPartition(); + Integer peekPartition(); } private static class BufferStreamWrapper extends OutputStream @@ -1129,6 +1129,11 @@ public class PipelinedSorter extends ExternalSorter { return true; } + @Override + public boolean hasNext() { + return (kvindex == maxindex); + } + public void close() { } @@ -1146,6 +1151,14 @@ public class PipelinedSorter extends ExternalSorter { return partition; } + public Integer peekPartition() { + if (!hasNext()) { + return null; + } else { + return kvmeta.get(span.offsetFor(kvindex + 1) + PARTITION); + } + } + @SuppressWarnings("unused") public int size() { return (maxindex - kvindex); @@ -1264,6 +1277,23 @@ public class PipelinedSorter extends ExternalSorter { return false; } + @Override + public boolean hasNext() throws IOException { + if (dirty || iter.hasNext()) { + Integer part; + if (dirty) { + part = iter.getPartition(); + } else { + part = iter.peekPartition(); + } + + if (part != null) { + return (part >>> (32 - partitionBits)) == partition; + } + } + return false; + } + public void reset(int partition) { this.partition = partition; } @@ -1403,6 +1433,20 @@ public class PipelinedSorter extends ExternalSorter { return false; } + @Override + public boolean hasNext() { + return peek() != null; + } + + public Integer peekPartition() { + if (!hasNext()) { + return null; + } else { + SpanIterator peek = peek(); + return peek.getPartition(); + } + } + public DataInputBuffer getKey() { return key; } public DataInputBuffer getValue() { return value; } public int getPartition() { return partition; } http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java index 6eb9a40..0e18ead 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java @@ -638,19 +638,10 @@ public class TezMerger { } public boolean next() throws IOException { - if (size() == 0) + if (!hasNext()) { return false; - - if (minSegment != null) { - //minSegment is non-null for all invocations of next except the first - //one. For the first invocation, the priority queue is ready for use - //but for the subsequent invocations, first adjust the queue - adjustPriorityQueue(minSegment); - if (size() == 0) { - minSegment = null; - return false; - } } + minSegment = top(); long startPos = minSegment.getPosition(); KeyValueBuffer nextKey = minSegment.getKey(); @@ -1036,6 +1027,24 @@ public class TezMerger { return (hasNext != null) && (hasNext == KeyState.SAME_KEY); } + public boolean hasNext() throws IOException { + if (size() == 0) + return false; + + if (minSegment != null) { + //minSegment is non-null for all invocations of next except the first + //one. For the first invocation, the priority queue is ready for use + //but for the subsequent invocations, first adjust the queue + adjustPriorityQueue(minSegment); + if (size() == 0) { + minSegment = null; + return false; + } + } + + return true; + } + } private static class EmptyIterator implements TezRawKeyValueIterator { @@ -1061,6 +1070,11 @@ public class TezMerger { } @Override + public boolean hasNext() throws IOException { + return false; + } + + @Override public void close() throws IOException { } http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java index 4e2ce3a..683c9b9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java @@ -55,7 +55,16 @@ public interface TezRawKeyValueIterator { * @throws IOException */ boolean next() throws IOException; - + + /** + * Returns true if any items are left in the iterator. + * + * @return <code>true</code> if a call to next will succeed + * <code>false</code> otherwise. + * @throws IOException + */ + boolean hasNext() throws IOException; + /** * Closes the iterator so that the underlying streams can be closed. * http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 268e237..85e0003 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -1122,6 +1122,12 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab this.end = end; current = start - 1; } + + @Override + public boolean hasNext() throws IOException { + return (current + 1) < end; + } + public boolean next() throws IOException { return ++current < end; } http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java index 6ae156a..b697be5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java @@ -58,6 +58,11 @@ public class OrderedGroupedInputLegacy extends OrderedGroupedKVInput { } @Override + public boolean hasNext() throws IOException { + return false; + } + + @Override public void close() throws IOException { } http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index f85272b..d6f6273 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -39,9 +39,11 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.OutputStatisticsReporter; +import org.apache.tez.runtime.api.TaskContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats; +import org.apache.tez.runtime.library.common.combine.Combiner; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; import org.apache.tez.runtime.library.partitioner.HashPartitioner; @@ -439,6 +441,43 @@ public class TestPipelinedSorter { } @Test + public void testWithCombiner() throws IOException { + Configuration conf = getConf(); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, DummyCombiner.class.getName()); + this.numOutputs = 5; + this.initialAvailableMem = 5 * 1024 * 1024; + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3); + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, + initialAvailableMem); + + writeData(sorter, 1, 20); + + Path outputFile = sorter.finalOutputFile; + FileSystem fs = outputFile.getFileSystem(conf); + IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, false, -1, 4096); + verifyData(reader); + reader.close(); + + verifyCounters(sorter, outputContext); + } + + // for testWithCombiner + public static class DummyCombiner implements Combiner { + public DummyCombiner(TaskContext ctx) { + // do nothing + } + + @Override + public void combine(TezRawKeyValueIterator rawIter, IFile.Writer writer) throws InterruptedException, IOException { + while (rawIter.next()) { + writer.append(rawIter.getKey(), rawIter.getValue()); + } + } + } + + @Test public void testMultipleSpills_WithRLE() throws IOException { Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
