Repository: tez Updated Branches: refs/heads/master 29b45bc11 -> a70e16326
TEZ-3698: UnorderedKV writer should be able to honor tez.runtime.enable.final-merge.in.output without pipelined shuffle (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a70e1632 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a70e1632 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a70e1632 Branch: refs/heads/master Commit: a70e163264c17ab7ee3184e79e8919c4e3bc744c Parents: 29b45bc Author: Rajesh Balamohan <[email protected]> Authored: Tue Jun 13 08:36:43 2017 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Tue Jun 13 08:36:43 2017 +0530 ---------------------------------------------------------------------- .../writers/UnorderedPartitionedKVWriter.java | 106 +++++++++++++------ .../TestUnorderedPartitionedKVWriter.java | 49 ++++++--- .../output/TestOnFileUnorderedKVOutput.java | 1 + 3 files changed, 112 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a70e1632/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index bcc9cf9..6bdb9e4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -178,6 +178,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private final Condition spillInProgress = spillLock.newCondition(); private final boolean pipelinedShuffle; + private 
final boolean isFinalMergeEnabled; + // To store events when final merge is disabled + private final List<Event> finalEvents; // How partition stats should be reported. final ReportPartitionStats reportPartitionStats; @@ -194,9 +197,14 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit this.destNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()); //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in // this case. Add it later if needed. - pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration + boolean pipelinedShuffleConf = this.conf.getBoolean(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT); + this.isFinalMergeEnabled = conf.getBoolean( + TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, + TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT); + this.pipelinedShuffle = pipelinedShuffleConf && !isFinalMergeEnabled; + this.finalEvents = Lists.newLinkedList(); if (availableMemoryBytes == 0) { Preconditions.checkArgument(((numPartitions == 1) && !pipelinedShuffle), "availableMemory " @@ -385,7 +393,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } - int valStart = currentBuffer.nextPosition; valSerializer.serialize(value); @@ -725,15 +732,29 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } //Regular code path. - if (numSpills.get() > 0) { + boolean updatedCounters = false; + if (numSpills.get() > 0 && isFinalMergeEnabled) { mergeAll(); } else { - finalSpill(); + if (finalSpill() && !isFinalMergeEnabled) { + //final spill generated some data. 
Add it to final events + updateTezCountersAndNotify(); + updatedCounters = true; + finalEvents.add(generateVMEvent()); + finalEvents.add(generateDMEvent()); + } + } + if (!updatedCounters) { + updateTezCountersAndNotify(); } - updateTezCountersAndNotify(); cleanupCurrentBuffer(); - eventList.add(generateVMEvent()); - eventList.add(generateDMEvent()); + if (isFinalMergeEnabled) { + eventList.add(generateVMEvent()); + eventList.add(generateDMEvent()); + } else { + //all events to be sent out are added in finalEvents. + eventList.addAll(finalEvents); + } return eventList; } @@ -741,7 +762,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit if (finalSpill()) { // VertexManagerEvent is only sent at the end and thus sizePerPartition is used // for the sum of all spills. - sendPipelinedEventForSpill(currentBuffer.recordsPerPartition, + mayBeSendEventsForSpill(currentBuffer.recordsPerPartition, sizePerPartition, numSpills.get() - 1, true); } updateTezCountersAndNotify(); @@ -828,7 +849,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private boolean finalSpill() throws IOException { if (currentBuffer.nextPosition == 0) { - if (pipelinedShuffle) { + if (pipelinedShuffle || !isFinalMergeEnabled) { List<Event> eventList = Lists.newLinkedList(); eventList.add(ShuffleUtils.generateVMEvent(outputContext, reportPartitionStats() ? 
new long[numPartitions] : null, @@ -838,7 +859,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit emptyPartitions.flip(0, numPartitions); eventList.add(generateDMEvent(true, numSpills.get(), true, null, emptyPartitions)); - outputContext.sendEvents(eventList); + if (pipelinedShuffle) { + outputContext.sendEvents(eventList); + } else if (!isFinalMergeEnabled) { + finalEvents.addAll(eventList); + } } return false; } else { @@ -1011,7 +1036,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit final Path outPath = spillPathDetails.outputFilePath; out = rfs.create(outPath); BitSet emptyPartitions = null; - if (pipelinedShuffle) { + if (pipelinedShuffle || !isFinalMergeEnabled) { emptyPartitions = new BitSet(numPartitions); } for (int i = 0; i < numPartitions; i++) { @@ -1049,7 +1074,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } handleSpillIndex(spillPathDetails, spillRecord); - sendPipelinedEventForSpill(emptyPartitions, sizePerPartition, + mayBeSendEventsForSpill(emptyPartitions, sizePerPartition, spillIndex, false); LOG.info(destNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " + spillIndex); @@ -1158,36 +1183,53 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } - private void sendPipelinedEventForSpill( - BitSet emptyPartitions, long[] sizePerPartition, int spillNumber, - boolean isFinalUpdate) { - if (!pipelinedShuffle) { - return; - } + private List<Event> generateEventForSpill(BitSet emptyPartitions, long[] sizePerPartition, + int spillNumber, + boolean isFinalUpdate) throws IOException { List<Event> eventList = Lists.newLinkedList(); //Send out an event for consuming. 
+ String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber); + if (isFinalUpdate) { + eventList.add(ShuffleUtils.generateVMEvent(outputContext, + sizePerPartition, reportDetailedPartitionStats(), deflater.get())); + } + Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate, + pathComponent, emptyPartitions); + eventList.add(compEvent); + return eventList; + } + + private void mayBeSendEventsForSpill( + BitSet emptyPartitions, long[] sizePerPartition, + int spillNumber, boolean isFinalUpdate) { + if (!pipelinedShuffle) { + if (isFinalMergeEnabled) { + return; + } + } + List<Event> events = null; try { - String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber); - if (isFinalUpdate) { - eventList.add(ShuffleUtils.generateVMEvent(outputContext, - sizePerPartition, reportDetailedPartitionStats(), deflater.get())); + events = generateEventForSpill(emptyPartitions, sizePerPartition, spillNumber, + isFinalUpdate); + LOG.info(destNameTrimmed + ": " + "Adding spill event for spill" + + " (final update=" + isFinalUpdate + "), spillId=" + spillNumber); + if (pipelinedShuffle) { + //Send out an event for consuming. 
+ outputContext.sendEvents(events); + } else if (!isFinalMergeEnabled) { + this.finalEvents.addAll(events); } - Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate, - pathComponent, emptyPartitions); - eventList.add(compEvent); - - LOG.info(destNameTrimmed + ": " + "Adding spill event for spill (final update=" + isFinalUpdate + "), spillId=" + spillNumber); - outputContext.sendEvents(eventList); } catch (IOException e) { LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", e); - outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Error in sending pipelined events"); + outputContext.reportFailure(TaskFailureType.NON_FATAL, e, + "Error in sending events."); } } - private void sendPipelinedEventForSpill(int[] recordsPerPartition, + private void mayBeSendEventsForSpill(int[] recordsPerPartition, long[] sizePerPartition, int spillNumber, boolean isFinalUpdate) { BitSet emptyPartitions = getEmptyPartitions(recordsPerPartition); - sendPipelinedEventForSpill(emptyPartitions, sizePerPartition, spillNumber, + mayBeSendEventsForSpill(emptyPartitions, sizePerPartition, spillNumber, isFinalUpdate); } @@ -1221,7 +1263,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } - sendPipelinedEventForSpill(recordsPerPartition, sizePerPartition, spillNumber, false); + mayBeSendEventsForSpill(recordsPerPartition, sizePerPartition, spillNumber, false); try { for (WrappedBuffer buffer : result.filledBuffers) { http://git-wip-us.apache.org/repos/asf/tez/blob/a70e1632/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 
1a0bbf0..bbe0992 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -287,31 +287,57 @@ public class TestUnorderedPartitionedKVWriter { @Test(timeout = 10000) public void testRandomText() throws IOException, InterruptedException { - textTest(100, 10, 2048, 0, 0, 0, false); + textTest(100, 10, 2048, 0, 0, 0, false, true); } @Test(timeout = 10000) public void testLargeKeys() throws IOException, InterruptedException { - textTest(0, 10, 2048, 10, 0, 0, false); + textTest(0, 10, 2048, 10, 0, 0, false, true); } @Test(timeout = 10000) public void testLargevalues() throws IOException, InterruptedException { - textTest(0, 10, 2048, 0, 10, 0, false); + textTest(0, 10, 2048, 0, 10, 0, false, true); } @Test(timeout = 10000) public void testLargeKvPairs() throws IOException, InterruptedException { - textTest(0, 10, 2048, 0, 0, 10, false); + textTest(0, 10, 2048, 0, 0, 10, false, true); } @Test(timeout = 10000) public void testTextMixedRecords() throws IOException, InterruptedException { - textTest(100, 10, 2048, 10, 10, 10, false); + textTest(100, 10, 2048, 10, 10, 10, false, true); + } + + @Test(timeout = 10000000) + public void testRandomTextWithoutFinalMerge() throws IOException, InterruptedException { + textTest(100, 10, 2048, 0, 0, 0, false, false); + } + + @Test(timeout = 10000) + public void testLargeKeysWithoutFinalMerge() throws IOException, InterruptedException { + textTest(0, 10, 2048, 10, 0, 0, false, false); + } + + @Test(timeout = 10000) + public void testLargevaluesWithoutFinalMerge() throws IOException, InterruptedException { + textTest(0, 10, 2048, 0, 10, 0, false, false); + } + + @Test(timeout = 10000) + public void testLargeKvPairsWithoutFinalMerge() throws IOException, InterruptedException { + textTest(0, 10, 2048, 0, 0, 10, false, false); + } + + 
@Test(timeout = 10000) + public void testTextMixedRecordsWithoutFinalMerge() throws IOException, InterruptedException { + textTest(100, 10, 2048, 10, 10, 10, false, false); } public void textTest(int numRegularRecords, int numPartitions, long availableMemory, - int numLargeKeys, int numLargevalues, int numLargeKvPairs, boolean pipeliningEnabled) throws IOException, + int numLargeKeys, int numLargevalues, int numLargeKvPairs, + boolean pipeliningEnabled, boolean isFinalMergeEnabled) throws IOException, InterruptedException { Partitioner partitioner = new HashPartitioner(); ApplicationId appId = ApplicationId.newInstance(10000000, 1); @@ -325,10 +351,9 @@ public class TestUnorderedPartitionedKVWriter { Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress, -1, HashPartitioner.class); - if (pipeliningEnabled) { - conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true); - conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); - } + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, pipeliningEnabled); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, isFinalMergeEnabled); + CompressionCodec codec = null; if (shouldCompress) { codec = new DefaultCodec(); @@ -432,7 +457,7 @@ public class TestUnorderedPartitionedKVWriter { assertEquals(numLargeKeys + numLargevalues + numLargeKvPairs, outputLargeRecordsCounter.getValue()); - if (pipeliningEnabled) { + if (pipeliningEnabled || !isFinalMergeEnabled) { return; } @@ -590,7 +615,7 @@ public class TestUnorderedPartitionedKVWriter { @Test(timeout = 10000) public void testLargeKvPairs_WithPipelinedShuffle() throws IOException, InterruptedException { - textTest(0, 10, 2048, 10, 20, 50, true); + textTest(0, 10, 2048, 10, 20, 50, true, false); } 
http://git-wip-us.apache.org/repos/asf/tez/blob/a70e1632/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index a591ee9..393ac2e 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -168,6 +168,7 @@ public class TestOnFileUnorderedKVOutput { conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 1);
