Repository: tez Updated Branches: refs/heads/master 020a7c873 -> 24475acc7
TEZ-3777. Avoid buffer copies by passing RLE flag to TezMerger from PipelinedSorter (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/24475acc Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/24475acc Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/24475acc Branch: refs/heads/master Commit: 24475acc772f23248938a89ff50766d88c4a7da7 Parents: 020a7c8 Author: Rajesh Balamohan <[email protected]> Authored: Wed Jun 28 12:49:04 2017 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Wed Jun 28 12:49:04 2017 +0530 ---------------------------------------------------------------------- .../common/sort/impl/PipelinedSorter.java | 9 +- .../library/common/sort/impl/TezMerger.java | 103 +++++++++++++------ .../common/sort/impl/TestPipelinedSorter.java | 52 ++++++++++ 3 files changed, 130 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/24475acc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 90bb4d7..3d4d29b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; @@ -767,7 +768,7 @@ public class PipelinedSorter extends ExternalSorter { (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf), progressable, sortSegments, true, null, spilledRecordsCounter, additionalSpillBytesRead, - null); // Not using any Progress in TezMerger. Should just work. + null, merger.needsRLE()); // Not using any Progress in TezMerger. Should just work. //write merged output to disk long segmentStart = finalOut.getPos(); Writer writer = @@ -1269,6 +1270,12 @@ public class PipelinedSorter extends ExternalSorter { } } + @InterfaceAudience.Private + @VisibleForTesting + public boolean needsRLE() { + return merger.needsRLE(); + } + private final class SpanMerger implements PartitionedRawKeyValueIterator { InputByteBuffer key = new InputByteBuffer(); InputByteBuffer value = new InputByteBuffer(); http://git-wip-us.apache.org/repos/asf/tez/blob/24475acc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java index c811455..6eb9a40 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java @@ -130,6 +130,29 @@ public class TezMerger { public static <K extends Object, V extends Object> TezRawKeyValueIterator merge(Configuration conf, FileSystem fs, + Class keyClass, Class valueClass, + CompressionCodec codec, + List<Segment> segments, + int mergeFactor, Path tmpDir, + RawComparator comparator, Progressable reporter, + boolean sortSegments, + boolean considerFinalMergeForProgress, + TezCounter readsCounter, + TezCounter writesCounter, + TezCounter bytesReadCounter, + Progress mergePhase, boolean checkForSameKeys) + throws IOException, InterruptedException { + return new MergeQueue(conf, fs, segments, comparator, reporter, + sortSegments, codec, considerFinalMergeForProgress, checkForSameKeys). + merge(keyClass, valueClass, + mergeFactor, tmpDir, + readsCounter, writesCounter, + bytesReadCounter, + mergePhase); + } + + public static <K extends Object, V extends Object> + TezRawKeyValueIterator merge(Configuration conf, FileSystem fs, Class keyClass, Class valueClass, CompressionCodec codec, List<Segment> segments, @@ -424,17 +447,18 @@ public class TezMerger { @VisibleForTesting static class MergeQueue<K extends Object, V extends Object> extends PriorityQueue<Segment> implements TezRawKeyValueIterator { - Configuration conf; - FileSystem fs; - CompressionCodec codec; - boolean ifileReadAhead = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT; - int ifileReadAheadLength = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT; - int ifileBufferSize = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT; - long recordsBeforeProgress = TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT; + final Configuration conf; + final FileSystem fs; + final CompressionCodec codec; + final boolean checkForSameKeys; + static final boolean ifileReadAhead = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT; + static final int ifileReadAheadLength = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT; + static final int ifileBufferSize = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT; + static final long recordsBeforeProgress = TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT; List<Segment> segments = new ArrayList<Segment>(); - RawComparator comparator; + final RawComparator comparator; private long totalBytesProcessed; private float progPerByte; @@ -444,7 +468,7 @@ public class TezMerger { // used in calculating mergeProgress. private final boolean considerFinalMergeForProgress; - Progressable reporter; + final Progressable reporter; final DataInputBuffer key = new DataInputBuffer(); final DataInputBuffer value = new DataInputBuffer(); @@ -475,6 +499,7 @@ public class TezMerger { TezCounter mergedMapOutputsCounter) throws IOException { this.conf = conf; + this.checkForSameKeys = true; // this.recordsBeforeProgress = // conf.getLong(TezJobConfig.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS, // TezJobConfig.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); @@ -500,9 +525,25 @@ public class TezMerger { Collections.sort(segments, segmentComparator); } - public MergeQueue(Configuration conf, FileSystem fs, + public MergeQueue(Configuration conf, FileSystem fs, List<Segment> segments, RawComparator comparator, Progressable reporter, boolean sortSegments, boolean considerFinalMergeForProgress) { + this(conf, fs, segments, comparator, reporter, sortSegments, null, + considerFinalMergeForProgress); + } + + public MergeQueue(Configuration conf, FileSystem fs, + List<Segment> segments, RawComparator comparator, + Progressable reporter, boolean sortSegments, CompressionCodec codec, + boolean considerFinalMergeForProgress) { + this(conf, fs, segments, comparator, reporter, sortSegments, null, + considerFinalMergeForProgress, true); + } + + public MergeQueue(Configuration conf, FileSystem fs, + List<Segment> segments, RawComparator comparator, + Progressable reporter, boolean sortSegments, CompressionCodec codec, + boolean considerFinalMergeForProgress, boolean checkForSameKeys) { this.conf = conf; this.fs = fs; this.comparator = comparator; @@ -512,13 +553,7 @@ public class TezMerger { if (sortSegments) { Collections.sort(segments, segmentComparator); } - } - - public MergeQueue(Configuration conf, FileSystem fs, - List<Segment> segments, RawComparator comparator, - Progressable reporter, boolean sortSegments, CompressionCodec codec, - boolean considerFinalMergeForProgress) { - this(conf, fs, segments, comparator, reporter, sortSegments, considerFinalMergeForProgress); + this.checkForSameKeys = checkForSameKeys; this.codec = codec; } @@ -544,24 +579,26 @@ public class TezMerger { private void adjustPriorityQueue(Segment reader) throws IOException{ long startPos = reader.getPosition(); - if (hasNext == null) { - /** - * hasNext can be null during first iteration & prevKey is initialized here. - * In cases of NO_KEY/NEW_KEY, we readjust the queue later. If new segment/file is found - * during this process, we need to compare keys for RLE across segment boundaries. - * prevKey can't be empty at that time (e.g custom comparators) - */ - populatePreviousKey(); - } else { - //indicates a key has been read already - if (hasNext != KeyState.SAME_KEY) { + if (checkForSameKeys) { + if (hasNext == null) { /** - * Store previous key before reading next for later key comparisons. - * If all keys in a segment are unique, it would always hit this code path and key copies - * are wasteful in such condition, as these comparisons are mainly done for RLE. - * TODO: When better stats are available, this condition can be avoided. + * hasNext can be null during first iteration & prevKey is initialized here. + * In cases of NO_KEY/NEW_KEY, we readjust the queue later. If new segment/file is found + * during this process, we need to compare keys for RLE across segment boundaries. + * prevKey can't be empty at that time (e.g custom comparators) */ populatePreviousKey(); + } else { + //indicates a key has been read already + if (hasNext != KeyState.SAME_KEY) { + /** + * Store previous key before reading next for later key comparisons. + * If all keys in a segment are unique, it would always hit this code path and key copies + * are wasteful in such condition, as these comparisons are mainly done for RLE. + * TODO: When better stats are available, this condition can be avoided. + */ + populatePreviousKey(); + } } } hasNext = reader.readRawKey(nextKey); @@ -589,7 +626,7 @@ public class TezMerger { */ void compareKeyWithNextTopKey(Segment current) throws IOException { Segment nextTop = top(); - if (nextTop != current) { + if (checkForSameKeys && nextTop != current) { //we have a different file. Compare it with previous key KeyValueBuffer nextKey = nextTop.getKey(); int compare = compare(nextKey, prevKey); http://git-wip-us.apache.org/repos/asf/tez/blob/24475acc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index e985292..15fae07 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -58,6 +58,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.UUID; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyListOf; import static org.mockito.Mockito.atLeastOnce; @@ -367,6 +368,38 @@ public class TestPipelinedSorter { verifyCounters(sorter, outputContext); } + @Test + public void testMultipleSpills() throws IOException { + Configuration conf = getConf(); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); + this.numOutputs = 5; + this.initialAvailableMem = 5 * 1024 * 1024; + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3); + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, + initialAvailableMem); + + writeData(sorter, 25000, 1000); + assertFalse("Expecting needsRLE to be false", sorter.needsRLE()); + verifyCounters(sorter, outputContext); + } + + @Test + public void testMultipleSpills_WithRLE() throws IOException { + Configuration conf = getConf(); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); + this.numOutputs = 5; + this.initialAvailableMem = 5 * 1024 * 1024; + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3); + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, + initialAvailableMem); + + writeSimilarKeys(sorter, 25000, 1000, true); + assertTrue("Expecting needsRLE to be true", sorter.needsRLE()); + verifyCounters(sorter, outputContext); + } + public void basicTest2(int partitions, int[] numkeys, int[] keysize, long initialAvailableMem, int blockSize) throws IOException { this.numOutputs = partitions; // single output @@ -704,6 +737,25 @@ public class TestPipelinedSorter { writeData(sorter, numKeys, keyLen, true); } + // duplicate some of the keys + private void writeSimilarKeys(ExternalSorter sorter, int numKeys, int keyLen, + boolean autoClose) throws IOException { + sortedDataMap.clear(); + String keyStr = RandomStringUtils.randomAlphanumeric(keyLen); + for (int i = 0; i < numKeys; i++) { + if (i % 4 == 0) { + keyStr = RandomStringUtils.randomAlphanumeric(keyLen); + } + Text key = new Text(keyStr); + Text value = new Text(RandomStringUtils.randomAlphanumeric(keyLen)); + sorter.write(key, value); + sortedDataMap.put(key.toString(), value.toString()); //for verifying data later + } + if (autoClose) { + closeSorter(sorter); + } + } + private void writeData(ExternalSorter sorter, int numKeys, int keyLen, boolean autoClose) throws IOException { sortedDataMap.clear();
