Author: rohini
Date: Mon Mar 23 20:36:52 2015
New Revision: 1668723

URL: http://svn.apache.org/r1668723
Log:
PIG-4474: Increasing intermediate parallelism has issue with default parallelism (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
    pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1668723&r1=1668722&r2=1668723&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Mar 23 20:36:52 2015
@@ -54,6 +54,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4474: Increasing intermediate parallelism has issue with default parallelism (rohini)
+
 PIG-4465: Pig streaming ship fails for relative paths on Tez (rohini)
 
 PIG-4461: Use benchmarks for Windows Pig e2e tests (nmaheshwari via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1668723&r1=1668722&r2=1668723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Mon Mar 23 20:36:52 2015
@@ -262,9 +262,10 @@ public class TezOperator extends Operato
         this.estimatedParallelism = estimatedParallelism;
     }
 
-    public int getEffectiveParallelism() {
+    public int getEffectiveParallelism(int defaultParallelism) {
         // PIG-4162: For intermediate reducers, use estimated parallelism over 
user set parallelism.
-        return getEstimatedParallelism() == -1 ? getRequestedParallelism()
+        return getEstimatedParallelism() == -1
+                ? (getRequestedParallelism() == -1 ? defaultParallelism : 
getRequestedParallelism())
                 : getEstimatedParallelism();
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1668723&r1=1668722&r2=1668723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Mon Mar 23 20:36:52 2015
@@ -99,7 +99,7 @@ public class ParallelismSetter extends T
                 for (Map.Entry<OperatorKey,TezEdgeDescriptor> entry : 
tezOp.inEdges.entrySet()) {
                     if (entry.getValue().dataMovementType == 
DataMovementType.ONE_TO_ONE) {
                         TezOperator pred = mPlan.getOperator(entry.getKey());
-                        parallelism = pred.getEffectiveParallelism();
+                        parallelism = 
pred.getEffectiveParallelism(pc.defaultParallel);
                         if (prevParallelism == -1) {
                             prevParallelism = parallelism;
                         } else if (prevParallelism != parallelism) {
@@ -133,6 +133,9 @@ public class ParallelismSetter extends T
                             // if it is intermediate reducer
                             parallelism = estimator.estimateParallelism(mPlan, 
tezOp, conf);
                             if (overrideRequestedParallelism) {
+                                if (tezOp.getRequestedParallelism() != 
parallelism) {
+                                    LOG.info("Increased requested parallelism 
of " + tezOp.getOperatorKey() + " to " + parallelism);
+                                }
                                 tezOp.setRequestedParallelism(parallelism);
                             } else {
                                 tezOp.setEstimatedParallelism(parallelism);
@@ -157,8 +160,7 @@ public class ParallelismSetter extends T
                                     if (pred.isSampleBasedPartitioner()) {
                                         for (TezOperator partitionerPred : 
mPlan.getPredecessors(pred)) {
                                             if 
(partitionerPred.isSampleAggregation()) {
-                                                LOG.debug("Updating constant 
value to " + parallelism + " in " + partitionerPred.plan);
-                                                LOG.info("Increased requested 
parallelism of " + partitionerPred.getOperatorKey() + " to " + parallelism);
+                                                LOG.debug("Updating 
parallelism constant value to " + parallelism + " in " + partitionerPred.plan);
                                                 ParallelConstantVisitor 
visitor =
                                                         new 
ParallelConstantVisitor(partitionerPred.plan, parallelism);
                                                 visitor.visit();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1668723&r1=1668722&r2=1668723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Mon Mar 23 20:36:52 2015
@@ -111,7 +111,7 @@ public class TezOperDependencyParallelis
             // and sample/scalar (does not impact parallelism)
             if 
(entry.getValue().dataMovementType==DataMovementType.SCATTER_GATHER ||
                     
entry.getValue().dataMovementType==DataMovementType.ONE_TO_ONE) {
-                double predParallelism = pred.getEffectiveParallelism();
+                double predParallelism = 
pred.getEffectiveParallelism(pc.defaultParallel);
                 if (predParallelism==-1) {
                     throw new IOException("Cannot estimate parallelism for " + 
tezOper.getOperatorKey().toString()
                             + ", effective parallelism for predecessor " + 
tezOper.getOperatorKey().toString()

Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1668723&r1=1668722&r2=1668723&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Mon Mar 23 20:36:52 2015
@@ -20,6 +20,7 @@ package org.apache.pig.tez;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -30,6 +31,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -244,22 +246,79 @@ public class TestTezAutoParallelism {
     }
 
     @Test
-    public void testSkewedJoinIncreaseIntermediateParallelism() throws 
IOException{
+    public void testIncreaseIntermediateParallelism1() throws IOException{
+        // User specified parallelism is overriden for intermediate step
+        String outputDir = "/tmp/testIncreaseIntermediateParallelism";
+        String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, 
age:int);"
+                + "B = load '" + INPUT_FILE2 + "' as (name:chararray, 
gender:chararray);"
+                + "C = join A by name, B by name using 'skewed' parallel 1;"
+                + "D = group C by A::name;"
+                + "E = foreach D generate group, COUNT(C.A::name);"
+                + "STORE E into '" + outputDir + "/finalout';";
+        String log = testIncreaseIntermediateParallelism(script, outputDir, 
true);
+        // Parallelism of C should be increased
+        assertTrue(log.contains("Increased requested parallelism of scope-54 
to 4"));
+        assertEquals(1, StringUtils.countMatches(log, "Increased requested 
parallelism"));
+    }
+
+    @Test
+    public void testIncreaseIntermediateParallelism2() throws IOException{
+        // User specified parallelism should not be overriden for intermediate 
step if there is a STORE
+        String outputDir = "/tmp/testIncreaseIntermediateParallelism";
+        String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, 
age:int);"
+                + "B = load '" + INPUT_FILE2 + "' as (name:chararray, 
gender:chararray);"
+                + "C = join A by name, B by name using 'skewed' parallel 2;"
+                + "STORE C into '/tmp/testIncreaseIntermediateParallelism';"
+                + "D = group C by A::name parallel 2;"
+                + "E = foreach D generate group, COUNT(C.A::name);"
+                + "STORE E into '" + outputDir + "/finalout';";
+        String log = testIncreaseIntermediateParallelism(script, outputDir, 
true);
+        // Parallelism of C will not be increased as the Split has a STORE
+        assertEquals(0, StringUtils.countMatches(log, "Increased requested 
parallelism"));
+    }
+
+    @Test
+    public void testIncreaseIntermediateParallelism3() throws IOException{
+        // Multiple levels with default parallelism. Group by followed by 
Group by
+        try {
+            String outputDir = "/tmp/testIncreaseIntermediateParallelism";
+            String script = "set default_parallel 1\n"
+                    + "A = load '" + INPUT_FILE1 + "' as (name:chararray, 
age:int);"
+                    + "B = load '" + INPUT_FILE2 + "' as (name:chararray, 
gender:chararray);"
+                    + "C = join A by name, B by name;"
+                    + "STORE C into 
'/tmp/testIncreaseIntermediateParallelism';"
+                    + "C1 = group C by A::name;"
+                    + "C2 = FOREACH C1 generate group, FLATTEN(C);"
+                    + "D = group C2 by group;"
+                    + "E = foreach D generate group, COUNT(C2.A::name);"
+                    + "F = order E by $0;"
+                    + "STORE E into '" + outputDir + "/finalout';";
+            String log = testIncreaseIntermediateParallelism(script, 
outputDir, false);
+            // Parallelism of C1 should be increased. C2 will not be increased 
due to order by
+            assertEquals(1, StringUtils.countMatches(log, "Increased requested 
parallelism"));
+            assertTrue(log.contains("Increased requested parallelism of 
scope-63 to 10"));
+        } finally {
+            pigServer.setDefaultParallel(-1);
+        }
+    }
+
+    private String testIncreaseIntermediateParallelism(String script, String 
outputDir, boolean sortAndCheck) throws IOException {
         NodeIdGenerator.reset();
         PigServer.resetScope();
         StringWriter writer = new StringWriter();
         // When there is a combiner operation involved user specified 
parallelism is overriden
-        Util.createLogAppender(ParallelismSetter.class, 
"testSkewedJoinIncreaseIntermediateParallelism", writer);
+        Util.createLogAppender(ParallelismSetter.class, 
"testIncreaseIntermediateParallelism", writer);
         try {
             
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
             
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "4000");
             
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
 "80000");
-            pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as 
(name:chararray, age:int);");
-            pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as 
(name:chararray, gender:chararray);");
-            pigServer.registerQuery("C = join A by name, B by name using 
'skewed' parallel 1;");
-            pigServer.registerQuery("D = group C by A::name;");
-            pigServer.registerQuery("E = foreach D generate group, 
COUNT(C.A::name);");
-            Iterator<Tuple> iter = pigServer.openIterator("E");
+            pigServer.setBatchOn();
+            pigServer.registerScript(new 
ByteArrayInputStream(script.getBytes()));
+            pigServer.executeBatch();
+
+            pigServer.registerQuery("A = load '" + outputDir + "/finalout' as 
(name:chararray, count:long);");
+            Iterator<Tuple> iter = pigServer.openIterator("A");
+
             List<Tuple> expectedResults = Util
                     .getTuplesFromConstantTupleStrings(new String[] {
                             "('Abigail',56L)", "('Alexander',45L)", 
"('Ava',60L)",
@@ -269,11 +328,15 @@ public class TestTezAutoParallelism {
                             "('Liam',46L)", "('Madison',46L)", "('Mason',54L)",
                             "('Mia',51L)", "('Michael',47L)", "('Noah',38L)",
                             "('Olivia',50L)", "('Sophia',52L)", 
"('William',43L)" });
-
-            Util.checkQueryOutputsAfterSort(iter, expectedResults);
-            assertTrue(writer.toString().contains("Increased requested 
parallelism of scope-40 to 4"));
+            if (sortAndCheck) {
+                Util.checkQueryOutputsAfterSort(iter, expectedResults);
+            } else {
+                Util.checkQueryOutputs(iter, expectedResults);
+            }
+            return writer.toString();
         } finally {
-            Util.removeLogAppender(ParallelismSetter.class, 
"testSkewedJoinIncreaseIntermediateParallelism");
+            Util.removeLogAppender(ParallelismSetter.class, 
"testIncreaseIntermediateParallelism");
+            Util.deleteFile(cluster, outputDir);
         }
     }
 }


Reply via email to