TEZ-2213. For the ordered case, enabling pipelined shuffle should automatically disable final merge (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8be1602f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8be1602f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8be1602f

Branch: refs/heads/TEZ-2003
Commit: 8be1602f1648907abc76dfeb98bd3cfa6ea98c10
Parents: d42a3c7
Author: Rajesh Balamohan <[email protected]>
Authored: Fri Mar 27 04:55:43 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Fri Mar 27 04:55:43 2015 +0530

----------------------------------------------------------------------
 .../output/OrderedPartitionedKVOutput.java | 23 +++++++++++---------
 .../library/output/TestOnFileSortedOutput.java | 11 ++++------
 2 files changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/8be1602f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 01df311..518d214 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -111,6 +111,19 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
             .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
             .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
 
+      if (pipelinedShuffle) {
+        if (finalMergeEnabled) {
+          LOG.info("Disabling final merge as "
+              + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " is enabled.");
+          finalMergeEnabled = false;
+          conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+        }
+
+        //TODO: Enable it for pipelinedsorter only and not for DefaultSorter
+        Preconditions.checkArgument((sortThreads > 1), TezRuntimeConfiguration
+            .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " works with PipelinedSorter.");
+      }
+
       if (sortThreads > 1) {
         sorter = new PipelinedSorter(getContext(), conf, getNumPhysicalOutputs(),
             memoryUpdateCallbackHandler.getMemoryAssigned());
@@ -119,16 +132,6 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
             memoryUpdateCallbackHandler.getMemoryAssigned());
       }
 
-      if (pipelinedShuffle) {
-        Preconditions.checkArgument(!finalMergeEnabled, TezRuntimeConfiguration
-            .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT + " has to be set to false for pipelined "
-            + "shuffle to work properly.");
-
-        //TODO: Enable it for pipelinedsorter only and not for DefaultSorter
-        Preconditions.checkArgument((sortThreads > 1), TezRuntimeConfiguration
-            .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " works with PipelinedSorter.");
-      }
-
       isStarted.set(true);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/8be1602f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 721673b..cfe1e6f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -186,6 +186,7 @@ public class TestOnFileSortedOutput {
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
 
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2);
+    //wrong setting for final merge enable in output
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
     OutputContext context = createTezOutputContext();
@@ -194,13 +195,9 @@ public class TestOnFileSortedOutput {
 
     sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
     sortedOutput.initialize();
-    try {
-      sortedOutput.start();
-      fail("Should have thrown illegal arguement exception as final merge & pipelining are "
-          + "enabled together");
-    } catch(IllegalArgumentException ie) {
-      assertTrue(ie.getMessage().contains("has to be set to false for pipelined"));
-    }
+    sortedOutput.start();
+    assertFalse(sortedOutput.finalMergeEnabled); //should be disabled as pipelining is on
+    assertTrue(sortedOutput.pipelinedShuffle);
   }
 
   @Test
