Author: daijy
Date: Mon Jun  1 17:42:26 2015
New Revision: 1682957

URL: http://svn.apache.org/r1682957
Log:
PIG-4580: Fix TestTezAutoParallelism.testSkewedJoinIncreaseParallelism test failure

Modified:
    pig/branches/branch-0.15/CHANGES.txt
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
    pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
    pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
    pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
    pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
    pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
    pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
    pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
    pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
    pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezAutoParallelism.java

Modified: pig/branches/branch-0.15/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Mon Jun  1 17:42:26 2015
@@ -74,6 +74,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4580: Fix TestTezAutoParallelism.testSkewedJoinIncreaseParallelism test failure (daijy)
+
 PIG-4571: TestPigRunner.testGetHadoopCounters fail on Windows (daijy)
 
 PIG-4541: Skewed full outer join does not return records if any relation is 
empty. Outer join does not

Modified: 
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- 
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 (original)
+++ 
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 Mon Jun  1 17:42:26 2015
@@ -430,7 +430,7 @@ public class TezDagBuilder extends TezOp
         in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
         out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
-        if (edge.dataMovementType!=DataMovementType.BROADCAST && 
to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
+        if (edge.dataMovementType!=DataMovementType.BROADCAST && 
to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && 
(to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
             return EdgeProperty.create((EdgeManagerPluginDescriptor)null,
                     edge.dataSourceType, edge.schedulingType, out, in);
@@ -671,11 +671,15 @@ public class TezDagBuilder extends TezOp
         // Set the right VertexManagerPlugin
         if (tezOp.getEstimatedParallelism() != -1) {
             if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
-                // Set VertexManagerPlugin to PartitionerDefinedVertexManager, 
which is able
-                // to decrease/increase parallelism of sorting vertex 
dynamically
-                // based on the numQuantiles calculated by sample aggregation 
vertex
-                vmPluginName = PartitionerDefinedVertexManager.class.getName();
-                log.info("Set VertexManagerPlugin to 
PartitionerDefinedParallelismVertexManager for vertex " + 
tezOp.getOperatorKey().toString());
+                if (tezOp.getVertexParallelism()==-1 && (
+                        tezOp.isGlobalSort() 
&&getPlan().getPredecessors(tezOp).size()==1||
+                        tezOp.isSkewedJoin() 
&&getPlan().getPredecessors(tezOp).size()==2)) {
+                    // Set VertexManagerPlugin to 
PartitionerDefinedVertexManager, which is able
+                    // to decrease/increase parallelism of sorting vertex 
dynamically
+                    // based on the numQuantiles calculated by sample 
aggregation vertex
+                    vmPluginName = 
PartitionerDefinedVertexManager.class.getName();
+                    log.info("Set VertexManagerPlugin to 
PartitionerDefinedParallelismVertexManager for vertex " + 
tezOp.getOperatorKey().toString());
+                }
             } else {
                 boolean containScatterGather = false;
                 boolean containCustomPartitioner = false;

Modified: 
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- 
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 (original)
+++ 
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 Mon Jun  1 17:42:26 2015
@@ -1639,6 +1639,7 @@ public class TezCompiler extends PhyPlan
             List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
             List<Boolean> flat = new ArrayList<Boolean>();
 
+            boolean containsOuter = false;
             // Add corresponding POProjects
             for (int i=0; i < 2; i++) {
                 ep = new PhysicalPlan();
@@ -1651,6 +1652,7 @@ public class TezCompiler extends PhyPlan
                 if (!inner[i]) {
                     // Add an empty bag for outer join
                     CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), 
true, IsFirstReduceOfKeyTez.class.getName());
+                    containsOuter = true;
                 }
                 flat.add(true);
             }
@@ -1681,14 +1683,16 @@ public class TezCompiler extends PhyPlan
 
             POValueOutputTez sampleOut = (POValueOutputTez) 
sampleJobPair.first.plan.getLeaves().get(0);
             for (int i = 0; i <= 2; i++) {
-                // We need to send sample to left relation partitioner vertex, 
right relation load vertex,
-                // and join vertex (IsFirstReduceOfKey in join vertex need 
sample file as well)
-                joinJobs[i].setSampleOperator(sampleJobPair.first);
-
-                // Configure broadcast edges for distribution map
-                edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, 
joinJobs[i]);
-                TezCompilerUtil.configureValueOnlyTupleOutput(edge, 
DataMovementType.BROADCAST);
-                
sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
+                if (i != 2 || containsOuter) {
+                    // We need to send sample to left relation partitioner 
vertex, right relation load vertex,
+                    // and join vertex (IsFirstReduceOfKey in join vertex need 
sample file as well)
+                    joinJobs[i].setSampleOperator(sampleJobPair.first);
+    
+                    // Configure broadcast edges for distribution map
+                    edge = TezCompilerUtil.connect(tezPlan, 
sampleJobPair.first, joinJobs[i]);
+                    TezCompilerUtil.configureValueOnlyTupleOutput(edge, 
DataMovementType.BROADCAST);
+                    
sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
+                }
 
                 // Configure skewed partitioner for join
                 if (i != 2) {

Modified: 
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- 
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
 (original)
+++ 
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
 Mon Jun  1 17:42:26 2015
@@ -148,7 +148,12 @@ public class ParallelismSetter extends T
                             parallelism = tezOp.getEstimatedParallelism();
                         }
                         if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
-                            if (!overrideRequestedParallelism) {
+                            boolean additionalEdge = false;
+                            if (tezOp.isGlobalSort() && 
getPlan().getPredecessors(tezOp).size() != 1 ||
+                                    tezOp.isSkewedJoin() && 
getPlan().getPredecessors(tezOp).size() != 2) {
+                                additionalEdge = true;
+                            }
+                            if (!overrideRequestedParallelism && 
!additionalEdge) {
                                 incrementTotalParallelism(tezOp, parallelism);
                                 // PartitionerDefinedVertexManager will 
determine parallelism.
                                 // So call setVertexParallelism with -1
@@ -168,6 +173,7 @@ public class ParallelismSetter extends T
                                                 ParallelConstantVisitor 
visitor =
                                                         new 
ParallelConstantVisitor(partitionerPred.plan, parallelism);
                                                 visitor.visit();
+                                                
partitionerPred.setNeedEstimatedQuantile(false);
                                                 break;
                                             }
                                         }

Modified: 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
 (original)
+++ 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
 Mon Jun  1 17:42:26 2015
@@ -5,7 +5,7 @@
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-25    ->      Tez vertex scope-29,Tez vertex scope-38,Tez 
vertex scope-48,
-Tez vertex scope-38    ->      Tez vertex scope-29,Tez vertex scope-48,Tez 
vertex scope-52,
+Tez vertex scope-38    ->      Tez vertex scope-29,Tez vertex scope-48,
 Tez vertex scope-48    ->      Tez vertex scope-52,
 Tez vertex scope-29    ->      Tez vertex scope-52,
 Tez vertex scope-52
@@ -55,7 +55,7 @@ a: Split - scope-58
     |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-38
 # Plan on vertex
-POValueOutputTez - scope-47    ->       [scope-29, scope-48, scope-52]
+POValueOutputTez - scope-47    ->       [scope-29, scope-48]
 |
 |---New For Each(false)[tuple] - scope-46
     |   |

Modified: 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
 (original)
+++ 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
 Mon Jun  1 17:42:26 2015
@@ -5,7 +5,7 @@
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-27    ->      Tez vertex scope-36,Tez vertex scope-46,
-Tez vertex scope-36    ->      Tez vertex scope-28,Tez vertex scope-46,Tez 
vertex scope-50,
+Tez vertex scope-36    ->      Tez vertex scope-28,Tez vertex scope-46,
 Tez vertex scope-46    ->      Tez vertex scope-50,
 Tez vertex scope-28    ->      Tez vertex scope-50,
 Tez vertex scope-50
@@ -43,7 +43,7 @@ Local Rearrange[tuple]{tuple}(false) - s
                 |---a: 
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-36
 # Plan on vertex
-POValueOutputTez - scope-45    ->       [scope-28, scope-46, scope-50]
+POValueOutputTez - scope-45    ->       [scope-28, scope-46]
 |
 |---New For Each(false)[tuple] - scope-44
     |   |

Modified: 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
 (original)
+++ 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
 Mon Jun  1 17:42:26 2015
@@ -7,7 +7,7 @@
 Tez vertex scope-30    ->      Tez vertex scope-34,Tez vertex scope-36,
 Tez vertex scope-34    ->      Tez vertex scope-36,
 Tez vertex scope-36    ->      Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48    ->      Tez vertex scope-40,Tez vertex scope-58,Tez 
vertex scope-62,
+Tez vertex scope-48    ->      Tez vertex scope-40,Tez vertex scope-58,
 Tez vertex scope-58    ->      Tez vertex scope-62,
 Tez vertex scope-40    ->      Tez vertex scope-62,
 Tez vertex scope-62
@@ -67,7 +67,7 @@ Local Rearrange[tuple]{tuple}(false) - s
             |---POShuffledValueInputTez - scope-37     <-       [scope-30, 
scope-34]
 Tez vertex scope-48
 # Plan on vertex
-POValueOutputTez - scope-57    ->       [scope-40, scope-58, scope-62]
+POValueOutputTez - scope-57    ->       [scope-40, scope-58]
 |
 |---New For Each(false)[tuple] - scope-56
     |   |

Modified: 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
 (original)
+++ 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
 Mon Jun  1 17:42:26 2015
@@ -5,7 +5,7 @@
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-30    ->      Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48    ->      Tez vertex scope-40,Tez vertex scope-58,Tez 
vertex scope-62,
+Tez vertex scope-48    ->      Tez vertex scope-40,Tez vertex scope-58,
 Tez vertex scope-58    ->      Tez vertex scope-62,
 Tez vertex scope-40    ->      Tez vertex scope-62,
 Tez vertex scope-62
@@ -71,7 +71,7 @@ a: Split - scope-68
     |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-48
 # Plan on vertex
-POValueOutputTez - scope-57    ->       [scope-40, scope-58, scope-62]
+POValueOutputTez - scope-57    ->       [scope-40, scope-58]
 |
 |---New For Each(false)[tuple] - scope-56
     |   |

Modified: 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
 (original)
+++ 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
 Mon Jun  1 17:42:26 2015
@@ -5,7 +5,7 @@
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-30    ->      Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48    ->      Tez vertex scope-37,Tez vertex scope-58,Tez 
vertex scope-62,
+Tez vertex scope-48    ->      Tez vertex scope-37,Tez vertex scope-58,
 Tez vertex scope-58    ->      Tez vertex scope-62,
 Tez vertex scope-31    ->      Tez vertex scope-35,Tez vertex scope-37,
 Tez vertex scope-35    ->      Tez vertex scope-37,
@@ -45,7 +45,7 @@ Local Rearrange[tuple]{tuple}(false) - s
                 |---d: 
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-48
 # Plan on vertex
-POValueOutputTez - scope-57    ->       [scope-37, scope-58, scope-62]
+POValueOutputTez - scope-57    ->       [scope-37, scope-58]
 |
 |---New For Each(false)[tuple] - scope-56
     |   |

Modified: 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
 (original)
+++ 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
 Mon Jun  1 17:42:26 2015
@@ -5,7 +5,7 @@
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-30    ->      Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48    ->      Tez vertex scope-31,Tez vertex scope-58,Tez 
vertex scope-62,
+Tez vertex scope-48    ->      Tez vertex scope-31,Tez vertex scope-58,
 Tez vertex scope-58    ->      Tez vertex scope-62,
 Tez vertex scope-31    ->      Tez vertex scope-62,
 Tez vertex scope-62
@@ -43,7 +43,7 @@ Local Rearrange[tuple]{tuple}(false) - s
                 |---d: 
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-48
 # Plan on vertex
-POValueOutputTez - scope-57    ->       [scope-31, scope-58, scope-62]
+POValueOutputTez - scope-57    ->       [scope-31, scope-58]
 |
 |---New For Each(false)[tuple] - scope-56
     |   |

Modified: 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
 (original)
+++ 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
 Mon Jun  1 17:42:26 2015
@@ -7,7 +7,7 @@
 Tez vertex scope-114   ->      Tez vertex scope-116,
 Tez vertex scope-115   ->      Tez vertex scope-116,
 Tez vertex scope-116   ->      Tez vertex scope-128,Tez vertex scope-138,
-Tez vertex scope-128   ->      Tez vertex scope-120,Tez vertex scope-138,Tez 
vertex scope-142,
+Tez vertex scope-128   ->      Tez vertex scope-120,Tez vertex scope-138,
 Tez vertex scope-138   ->      Tez vertex scope-142,
 Tez vertex scope-120   ->      Tez vertex scope-142,
 Tez vertex scope-142
@@ -65,7 +65,7 @@ Local Rearrange[tuple]{tuple}(false) - s
             |---POShuffledValueInputTez - scope-117    <-       [scope-114, 
scope-115]
 Tez vertex scope-128
 # Plan on vertex
-POValueOutputTez - scope-137   ->       [scope-120, scope-138, scope-142]
+POValueOutputTez - scope-137   ->       [scope-120, scope-138]
 |
 |---New For Each(false)[tuple] - scope-136
     |   |

Modified: 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
 (original)
+++ 
pig/branches/branch-0.15/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
 Mon Jun  1 17:42:26 2015
@@ -7,7 +7,7 @@
 Tez vertex scope-29    ->      Tez vertex group scope-63,Tez vertex group 
scope-64,
 Tez vertex scope-30    ->      Tez vertex group scope-63,Tez vertex group 
scope-64,
 Tez vertex group scope-64      ->      Tez vertex scope-43,
-Tez vertex scope-43    ->      Tez vertex scope-35,Tez vertex scope-53,Tez 
vertex scope-57,
+Tez vertex scope-43    ->      Tez vertex scope-35,Tez vertex scope-53,
 Tez vertex group scope-63      ->      Tez vertex scope-53,
 Tez vertex scope-53    ->      Tez vertex scope-57,
 Tez vertex scope-35    ->      Tez vertex scope-57,
@@ -79,7 +79,7 @@ Tez vertex group scope-64     <-       [scope-29,
 # No plan on vertex group
 Tez vertex scope-43
 # Plan on vertex
-POValueOutputTez - scope-52    ->       [scope-35, scope-53, scope-57]
+POValueOutputTez - scope-52    ->       [scope-35, scope-53]
 |
 |---New For Each(false)[tuple] - scope-51
     |   |

Modified: 
pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1682957&r1=1682956&r2=1682957&view=diff
==============================================================================
--- 
pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezAutoParallelism.java 
(original)
+++ 
pig/branches/branch-0.15/test/org/apache/pig/tez/TestTezAutoParallelism.java 
Mon Jun  1 17:42:26 2015
@@ -246,6 +246,58 @@ public class TestTezAutoParallelism {
     }
 
     @Test
+    public void testSkewedFullJoinIncreaseParallelism() throws IOException{
+        // A skewed full join keeps its initial parallelism: the join vertex has a broadcast (sample) dependency,
+        // which prevents it from changing parallelism at runtime. Checks the part files of this test's own output.
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+        pigServer.registerQuery("C = join A by name full, B by name using 'skewed';");
+        pigServer.store("C", "output6");
+        FileSystem fs = cluster.getFileSystem();
+        FileStatus[] files = fs.listStatus(new Path("output6"), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith("part")) {
+                    return true;
+                }
+                return false;
+            }
+        });
+        assertEquals(5, files.length);
+    }
+
+    @Test
+    public void testSkewedJoinIncreaseParallelismWithScalar() throws IOException{
+        // A skewed join keeps its initial parallelism here: the join vertex has a broadcast (scalar) dependency,
+        // which prevents it from changing parallelism at runtime
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+        pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
+        pigServer.registerQuery("D = load 'org.apache.pig.tez.TestTezAutoParallelism_1' as (name:chararray, age:int);");
+        pigServer.registerQuery("E = group D all;");
+        pigServer.registerQuery("F = foreach E generate COUNT(D) as count;");
+        pigServer.registerQuery("G = foreach C generate age/F.count, gender;");
+        pigServer.store("G", "output7");
+        FileSystem fs = cluster.getFileSystem();
+        FileStatus[] files = fs.listStatus(new Path("output7"), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith("part")) {
+                    return true;
+                }
+                return false;
+            }
+        });
+        assertEquals(4, files.length);
+    }
+
+    @Test
     public void testIncreaseIntermediateParallelism1() throws IOException{
         // User specified parallelism is overriden for intermediate step
         String outputDir = "/tmp/testIncreaseIntermediateParallelism";


Reply via email to