Repository: tez Updated Branches: refs/heads/master 3b669f895 -> 00508f898
TEZ-2643. Minimize number of empty spills in Pipelined Sorter (Saikat via rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/00508f89 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/00508f89 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/00508f89 Branch: refs/heads/master Commit: 00508f8989566c1a8723b51c8621c20789b05969 Parents: 3b669f8 Author: Rajesh Balamohan <[email protected]> Authored: Wed Sep 9 14:55:44 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Wed Sep 9 14:55:44 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/sort/impl/PipelinedSorter.java | 52 ++++++++++++++------ .../common/sort/impl/TestPipelinedSorter.java | 18 +++++++ 3 files changed, 55 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/00508f89/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bd04fb4..2439370 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2643. Minimize number of empty spills in Pipelined Sorter TEZ-2783. Refactor analyzers to extend TezAnalyzerBase TEZ-2784. optimize TaskImpl.isFinished() TEZ-2788. Allow TezAnalyzerBase to parse SimpleHistory logs http://git-wip-us.apache.org/repos/asf/tez/blob/00508f89/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index b35efc7..c4b2b3d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -215,10 +215,10 @@ public class PipelinedSorter extends ExternalSorter { stopWatch.start(); // sort in the same thread, do not wait for the thread pool merger.add(span.sort(sorter)); - spill(); + boolean ret = spill(true); stopWatch.stop(); LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms"); - if (pipelinedShuffle) { + if (pipelinedShuffle && ret) { sendPipelinedShuffleEvents(); } //safe to reset the iterator @@ -425,28 +425,34 @@ public class PipelinedSorter extends ExternalSorter { } } - public void spill() throws IOException { - // create spill file - final long size = capacity + - + (partitions * APPROX_HEADER_LENGTH); - final TezSpillRecord spillRec = new TezSpillRecord(partitions); - final Path filename = - mapOutputFile.getSpillFileForWrite(numSpills, size); - spillFilePaths.put(numSpills, filename); - FSDataOutputStream out = rfs.create(filename, true, 4096); - + public boolean spill(boolean ignoreEmptySpills) throws IOException { + FSDataOutputStream out = null; try { try { - merger.ready(); // wait for all the future results from sort threads + boolean ret = merger.ready(); + // if merger returned false and ignore merge is true, + // then return directly without spilling + if (!ret && ignoreEmptySpills){ + return false; + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.info("Interrupted while waiting for mergers to complete"); throw new IOInterruptedException("Interrupted while waiting for mergers to complete", e); } + + // create spill file + final long size = capacity + + + (partitions * APPROX_HEADER_LENGTH); + final TezSpillRecord spillRec = new TezSpillRecord(partitions); + final Path filename = + mapOutputFile.getSpillFileForWrite(numSpills, size); + spillFilePaths.put(numSpills, filename); + out = rfs.create(filename, true, 4096); LOG.info("Spilling to " + filename.toString()); for (int i = 0; i < partitions; ++i) { if (isThreadInterrupted()) { - return; + return false; } TezRawKeyValueIterator kvIter = merger.filter(i); //write merged output to disk @@ -489,8 +495,11 @@ public class PipelinedSorter extends ExternalSorter { //No final merge. Set the number of files offered via shuffle-handler numShuffleChunks.setValue(numSpills); } + return true; } finally { - out.close(); + if (out != null) { + out.close(); + } } } @@ -524,7 +533,15 @@ public class PipelinedSorter extends ExternalSorter { LOG.info("Starting flush of map output"); span.end(); merger.add(span.sort(sorter)); - spill(); + // force a spill in flush() + // case 1: we want to force because of following scenarios: + // we have no keys written, and flush got called + // we want atleast one spill(be it empty) + // case 2: in pipeline shuffle case, we have no way of + // knowing the last key being written until flush is called + // so for flush()->spill() we want to force spill so that + // we can send pipeline shuffle event with last event true. + spill(false); sortmaster.shutdown(); //safe to clean up @@ -1158,6 +1175,9 @@ public class PipelinedSorter extends ExternalSorter { } StringBuilder sb = new StringBuilder(); + if (heap.size() == 0) { + return false; + } for(SpanIterator sp: heap) { sb.append(sp.toString()); sb.append(","); http://git-wip-us.apache.org/repos/asf/tez/blob/00508f89/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index ebec304..92163c4 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -147,6 +147,24 @@ public class TestPipelinedSorter { } @Test + public void testEmptyDataWithPipelinedShuffle() throws IOException { + this.numOutputs = 1; + this.initialAvailableMem = 1 *1024 * 1024; + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, + initialAvailableMem, 1 << 20); + + writeData(sorter, 0, 1<<20); + + // final merge is disabled. Final output file would not be populated in this case. + assertTrue(sorter.finalOutputFile == null); + TezCounter numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT); + assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue()); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); + + } + + @Test public void basicTestWithSmallBlockSize() throws IOException { //3 MB key & 3 MB value, whereas block size is just 3 MB basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20);
