TEZ-2213. For the ordered case, enabling pipelined shuffle should automatically 
disable final merge (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8be1602f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8be1602f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8be1602f

Branch: refs/heads/TEZ-2003
Commit: 8be1602f1648907abc76dfeb98bd3cfa6ea98c10
Parents: d42a3c7
Author: Rajesh Balamohan <[email protected]>
Authored: Fri Mar 27 04:55:43 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Fri Mar 27 04:55:43 2015 +0530

----------------------------------------------------------------------
 .../output/OrderedPartitionedKVOutput.java      | 23 +++++++++++---------
 .../library/output/TestOnFileSortedOutput.java  | 11 ++++------
 2 files changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8be1602f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 01df311..518d214 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -111,6 +111,19 @@ public class OrderedPartitionedKVOutput extends 
AbstractLogicalOutput {
           .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
           .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
 
+      if (pipelinedShuffle) {
+        if (finalMergeEnabled) {
+          LOG.info("Disabling final merge as "
+              + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED 
+ " is enabled.");
+          finalMergeEnabled = false;
+          
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
 false);
+        }
+
+        //TODO: Enable it for pipelinedsorter only and not for DefaultSorter
+        Preconditions.checkArgument((sortThreads > 1), TezRuntimeConfiguration
+            .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " works with 
PipelinedSorter.");
+      }
+
       if (sortThreads > 1) {
         sorter = new PipelinedSorter(getContext(), conf, 
getNumPhysicalOutputs(),
             memoryUpdateCallbackHandler.getMemoryAssigned());
@@ -119,16 +132,6 @@ public class OrderedPartitionedKVOutput extends 
AbstractLogicalOutput {
             memoryUpdateCallbackHandler.getMemoryAssigned());
       }
 
-      if (pipelinedShuffle) {
-        Preconditions.checkArgument(!finalMergeEnabled, TezRuntimeConfiguration
-            .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT + " has to be set to 
false for pipelined "
-            + "shuffle to work properly.");
-
-        //TODO: Enable it for pipelinedsorter only and not for DefaultSorter
-        Preconditions.checkArgument((sortThreads > 1), TezRuntimeConfiguration
-            .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " works with 
PipelinedSorter.");
-      }
-
       isStarted.set(true);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/8be1602f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 721673b..cfe1e6f 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -186,6 +186,7 @@ public class TestOnFileSortedOutput {
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
 
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2);
+    //wrong setting for final merge enable in output
     
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
 true);
     
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, 
true);
     OutputContext context = createTezOutputContext();
@@ -194,13 +195,9 @@ public class TestOnFileSortedOutput {
     sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
 
     sortedOutput.initialize();
-    try {
-      sortedOutput.start();
-      fail("Should have thrown illegal arguement exception as final merge & 
pipelining are "
-          + "enabled together");
-    } catch(IllegalArgumentException ie) {
-      assertTrue(ie.getMessage().contains("has to be set to false for 
pipelined"));
-    }
+    sortedOutput.start();
+    assertFalse(sortedOutput.finalMergeEnabled); //should be disabled as 
pipelining is on
+    assertTrue(sortedOutput.pipelinedShuffle);
   }
 
   @Test

Reply via email to