Author: daijy
Date: Mon Jun 1 17:43:02 2015
New Revision: 1682958
URL: http://svn.apache.org/r1682958
Log:
PIG-4580: Fix TestTezAutoParallelism.testSkewedJoinIncreaseParallelism test failure
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jun 1 17:43:02 2015
@@ -82,6 +82,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4580: Fix TestTezAutoParallelism.testSkewedJoinIncreaseParallelism test
failure (daijy)
+
PIG-4571: TestPigRunner.testGetHadoopCounters fail on Windows (daijy)
PIG-4541: Skewed full outer join does not return records if any relation is
empty. Outer join does not
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Mon Jun 1 17:43:02 2015
@@ -430,7 +430,7 @@ public class TezDagBuilder extends TezOp
in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
- if (edge.dataMovementType!=DataMovementType.BROADCAST &&
to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
+ if (edge.dataMovementType!=DataMovementType.BROADCAST &&
to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 &&
(to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
return EdgeProperty.create((EdgeManagerPluginDescriptor)null,
edge.dataSourceType, edge.schedulingType, out, in);
@@ -671,11 +671,15 @@ public class TezDagBuilder extends TezOp
// Set the right VertexManagerPlugin
if (tezOp.getEstimatedParallelism() != -1) {
if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
- // Set VertexManagerPlugin to PartitionerDefinedVertexManager,
which is able
- // to decrease/increase parallelism of sorting vertex
dynamically
- // based on the numQuantiles calculated by sample aggregation
vertex
- vmPluginName = PartitionerDefinedVertexManager.class.getName();
- log.info("Set VertexManagerPlugin to
PartitionerDefinedParallelismVertexManager for vertex " +
tezOp.getOperatorKey().toString());
+ if (tezOp.getVertexParallelism()==-1 && (
+ tezOp.isGlobalSort()
&&getPlan().getPredecessors(tezOp).size()==1||
+ tezOp.isSkewedJoin()
&&getPlan().getPredecessors(tezOp).size()==2)) {
+ // Set VertexManagerPlugin to
PartitionerDefinedVertexManager, which is able
+ // to decrease/increase parallelism of sorting vertex
dynamically
+ // based on the numQuantiles calculated by sample
aggregation vertex
+ vmPluginName =
PartitionerDefinedVertexManager.class.getName();
+ log.info("Set VertexManagerPlugin to
PartitionerDefinedParallelismVertexManager for vertex " +
tezOp.getOperatorKey().toString());
+ }
} else {
boolean containScatterGather = false;
boolean containCustomPartitioner = false;
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
Mon Jun 1 17:43:02 2015
@@ -1639,6 +1639,7 @@ public class TezCompiler extends PhyPlan
List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
List<Boolean> flat = new ArrayList<Boolean>();
+ boolean containsOuter = false;
// Add corresponding POProjects
for (int i=0; i < 2; i++) {
ep = new PhysicalPlan();
@@ -1651,6 +1652,7 @@ public class TezCompiler extends PhyPlan
if (!inner[i]) {
// Add an empty bag for outer join
CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i),
true, IsFirstReduceOfKeyTez.class.getName());
+ containsOuter = true;
}
flat.add(true);
}
@@ -1681,14 +1683,16 @@ public class TezCompiler extends PhyPlan
POValueOutputTez sampleOut = (POValueOutputTez)
sampleJobPair.first.plan.getLeaves().get(0);
for (int i = 0; i <= 2; i++) {
- // We need to send sample to left relation partitioner vertex,
right relation load vertex,
- // and join vertex (IsFirstReduceOfKey in join vertex need
sample file as well)
- joinJobs[i].setSampleOperator(sampleJobPair.first);
-
- // Configure broadcast edges for distribution map
- edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first,
joinJobs[i]);
- TezCompilerUtil.configureValueOnlyTupleOutput(edge,
DataMovementType.BROADCAST);
-
sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
+ if (i != 2 || containsOuter) {
+ // We need to send sample to left relation partitioner
vertex, right relation load vertex,
+ // and join vertex (IsFirstReduceOfKey in join vertex need
sample file as well)
+ joinJobs[i].setSampleOperator(sampleJobPair.first);
+
+ // Configure broadcast edges for distribution map
+ edge = TezCompilerUtil.connect(tezPlan,
sampleJobPair.first, joinJobs[i]);
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge,
DataMovementType.BROADCAST);
+
sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
+ }
// Configure skewed partitioner for join
if (i != 2) {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
Mon Jun 1 17:43:02 2015
@@ -148,7 +148,12 @@ public class ParallelismSetter extends T
parallelism = tezOp.getEstimatedParallelism();
}
if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
- if (!overrideRequestedParallelism) {
+ boolean additionalEdge = false;
+ if (tezOp.isGlobalSort() &&
getPlan().getPredecessors(tezOp).size() != 1 ||
+ tezOp.isSkewedJoin() &&
getPlan().getPredecessors(tezOp).size() != 2) {
+ additionalEdge = true;
+ }
+ if (!overrideRequestedParallelism &&
!additionalEdge) {
incrementTotalParallelism(tezOp, parallelism);
// PartitionerDefinedVertexManager will
determine parallelism.
// So call setVertexParallelism with -1
@@ -168,6 +173,7 @@ public class ParallelismSetter extends T
ParallelConstantVisitor
visitor =
new
ParallelConstantVisitor(partitionerPred.plan, parallelism);
visitor.visit();
+
partitionerPred.setNeedEstimatedQuantile(false);
break;
}
}
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
(original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
Mon Jun 1 17:43:02 2015
@@ -5,7 +5,7 @@
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-25 -> Tez vertex scope-29,Tez vertex scope-38,Tez
vertex scope-48,
-Tez vertex scope-38 -> Tez vertex scope-29,Tez vertex scope-48,Tez
vertex scope-52,
+Tez vertex scope-38 -> Tez vertex scope-29,Tez vertex scope-48,
Tez vertex scope-48 -> Tez vertex scope-52,
Tez vertex scope-29 -> Tez vertex scope-52,
Tez vertex scope-52
@@ -55,7 +55,7 @@ a: Split - scope-58
|---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-38
# Plan on vertex
-POValueOutputTez - scope-47 -> [scope-29, scope-48, scope-52]
+POValueOutputTez - scope-47 -> [scope-29, scope-48]
|
|---New For Each(false)[tuple] - scope-46
| |
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
(original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
Mon Jun 1 17:43:02 2015
@@ -5,7 +5,7 @@
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-27 -> Tez vertex scope-36,Tez vertex scope-46,
-Tez vertex scope-36 -> Tez vertex scope-28,Tez vertex scope-46,Tez
vertex scope-50,
+Tez vertex scope-36 -> Tez vertex scope-28,Tez vertex scope-46,
Tez vertex scope-46 -> Tez vertex scope-50,
Tez vertex scope-28 -> Tez vertex scope-50,
Tez vertex scope-50
@@ -43,7 +43,7 @@ Local Rearrange[tuple]{tuple}(false) - s
|---a:
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-36
# Plan on vertex
-POValueOutputTez - scope-45 -> [scope-28, scope-46, scope-50]
+POValueOutputTez - scope-45 -> [scope-28, scope-46]
|
|---New For Each(false)[tuple] - scope-44
| |
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
---
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
(original)
+++
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
Mon Jun 1 17:43:02 2015
@@ -7,7 +7,7 @@
Tez vertex scope-30 -> Tez vertex scope-34,Tez vertex scope-36,
Tez vertex scope-34 -> Tez vertex scope-36,
Tez vertex scope-36 -> Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48 -> Tez vertex scope-40,Tez vertex scope-58,Tez
vertex scope-62,
+Tez vertex scope-48 -> Tez vertex scope-40,Tez vertex scope-58,
Tez vertex scope-58 -> Tez vertex scope-62,
Tez vertex scope-40 -> Tez vertex scope-62,
Tez vertex scope-62
@@ -67,7 +67,7 @@ Local Rearrange[tuple]{tuple}(false) - s
|---POShuffledValueInputTez - scope-37 <- [scope-30,
scope-34]
Tez vertex scope-48
# Plan on vertex
-POValueOutputTez - scope-57 -> [scope-40, scope-58, scope-62]
+POValueOutputTez - scope-57 -> [scope-40, scope-58]
|
|---New For Each(false)[tuple] - scope-56
| |
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
(original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
Mon Jun 1 17:43:02 2015
@@ -5,7 +5,7 @@
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-30 -> Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48 -> Tez vertex scope-40,Tez vertex scope-58,Tez
vertex scope-62,
+Tez vertex scope-48 -> Tez vertex scope-40,Tez vertex scope-58,
Tez vertex scope-58 -> Tez vertex scope-62,
Tez vertex scope-40 -> Tez vertex scope-62,
Tez vertex scope-62
@@ -71,7 +71,7 @@ a: Split - scope-68
|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-48
# Plan on vertex
-POValueOutputTez - scope-57 -> [scope-40, scope-58, scope-62]
+POValueOutputTez - scope-57 -> [scope-40, scope-58]
|
|---New For Each(false)[tuple] - scope-56
| |
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
---
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
(original)
+++
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
Mon Jun 1 17:43:02 2015
@@ -5,7 +5,7 @@
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-30 -> Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48 -> Tez vertex scope-37,Tez vertex scope-58,Tez
vertex scope-62,
+Tez vertex scope-48 -> Tez vertex scope-37,Tez vertex scope-58,
Tez vertex scope-58 -> Tez vertex scope-62,
Tez vertex scope-31 -> Tez vertex scope-35,Tez vertex scope-37,
Tez vertex scope-35 -> Tez vertex scope-37,
@@ -45,7 +45,7 @@ Local Rearrange[tuple]{tuple}(false) - s
|---d:
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-48
# Plan on vertex
-POValueOutputTez - scope-57 -> [scope-37, scope-58, scope-62]
+POValueOutputTez - scope-57 -> [scope-37, scope-58]
|
|---New For Each(false)[tuple] - scope-56
| |
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
(original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
Mon Jun 1 17:43:02 2015
@@ -5,7 +5,7 @@
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-30 -> Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48 -> Tez vertex scope-31,Tez vertex scope-58,Tez
vertex scope-62,
+Tez vertex scope-48 -> Tez vertex scope-31,Tez vertex scope-58,
Tez vertex scope-58 -> Tez vertex scope-62,
Tez vertex scope-31 -> Tez vertex scope-62,
Tez vertex scope-62
@@ -43,7 +43,7 @@ Local Rearrange[tuple]{tuple}(false) - s
|---d:
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-48
# Plan on vertex
-POValueOutputTez - scope-57 -> [scope-31, scope-58, scope-62]
+POValueOutputTez - scope-57 -> [scope-31, scope-58]
|
|---New For Each(false)[tuple] - scope-56
| |
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
---
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
(original)
+++
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
Mon Jun 1 17:43:02 2015
@@ -7,7 +7,7 @@
Tez vertex scope-114 -> Tez vertex scope-116,
Tez vertex scope-115 -> Tez vertex scope-116,
Tez vertex scope-116 -> Tez vertex scope-128,Tez vertex scope-138,
-Tez vertex scope-128 -> Tez vertex scope-120,Tez vertex scope-138,Tez
vertex scope-142,
+Tez vertex scope-128 -> Tez vertex scope-120,Tez vertex scope-138,
Tez vertex scope-138 -> Tez vertex scope-142,
Tez vertex scope-120 -> Tez vertex scope-142,
Tez vertex scope-142
@@ -65,7 +65,7 @@ Local Rearrange[tuple]{tuple}(false) - s
|---POShuffledValueInputTez - scope-117 <- [scope-114,
scope-115]
Tez vertex scope-128
# Plan on vertex
-POValueOutputTez - scope-137 -> [scope-120, scope-138, scope-142]
+POValueOutputTez - scope-137 -> [scope-120, scope-138]
|
|---New For Each(false)[tuple] - scope-136
| |
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
(original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
Mon Jun 1 17:43:02 2015
@@ -7,7 +7,7 @@
Tez vertex scope-29 -> Tez vertex group scope-63,Tez vertex group
scope-64,
Tez vertex scope-30 -> Tez vertex group scope-63,Tez vertex group
scope-64,
Tez vertex group scope-64 -> Tez vertex scope-43,
-Tez vertex scope-43 -> Tez vertex scope-35,Tez vertex scope-53,Tez
vertex scope-57,
+Tez vertex scope-43 -> Tez vertex scope-35,Tez vertex scope-53,
Tez vertex group scope-63 -> Tez vertex scope-53,
Tez vertex scope-53 -> Tez vertex scope-57,
Tez vertex scope-35 -> Tez vertex scope-57,
@@ -79,7 +79,7 @@ Tez vertex group scope-64 <- [scope-29,
# No plan on vertex group
Tez vertex scope-43
# Plan on vertex
-POValueOutputTez - scope-52 -> [scope-35, scope-53, scope-57]
+POValueOutputTez - scope-52 -> [scope-35, scope-53]
|
|---New For Each(false)[tuple] - scope-51
| |
Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1682958&r1=1682957&r2=1682958&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Mon Jun 1
17:43:02 2015
@@ -246,6 +246,58 @@ public class TestTezAutoParallelism {
}
@Test
+ public void testSkewedFullJoinIncreaseParallelism() throws IOException{
+ // skewed full join parallelism takes the initial setting, since the
join vertex has a broadcast(sample) dependency,
+ // which prevents it from changing parallelism
+
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
"true");
+
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
"3000");
+
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
"40000");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as
(name:chararray, age:int);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as
(name:chararray, gender:chararray);");
+ pigServer.registerQuery("C = join A by name full, B by name using
'skewed';");
+ pigServer.store("C", "output6");
+ FileSystem fs = cluster.getFileSystem();
+ // Must list the same location C was stored to above
+ FileStatus[] files = fs.listStatus(new Path("output6"), new
PathFilter(){
+ @Override
+ public boolean accept(Path path) {
+ if (path.getName().startsWith("part")) {
+ return true;
+ }
+ return false;
+ }
+ });
+ assertEquals(5, files.length);
+ }
+
+ @Test
+ public void testSkewedJoinIncreaseParallelismWithScalar() throws
IOException{
+ // skewed join parallelism takes the initial setting, since the join
vertex has a broadcast(scalar) dependency,
+ // which prevents it from changing parallelism
+
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
"true");
+
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
"3000");
+
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
"40000");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as
(name:chararray, age:int);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as
(name:chararray, gender:chararray);");
+ pigServer.registerQuery("C = join A by name, B by name using
'skewed';");
+ pigServer.registerQuery("D = load
'org.apache.pig.tez.TestTezAutoParallelism_1' as (name:chararray, age:int);");
+ pigServer.registerQuery("E = group D all;");
+ pigServer.registerQuery("F = foreach E generate COUNT(D) as count;");
+ pigServer.registerQuery("G = foreach C generate age/F.count, gender;");
+ pigServer.store("G", "output7");
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus[] files = fs.listStatus(new Path("output7"), new
PathFilter(){
+ @Override
+ public boolean accept(Path path) {
+ if (path.getName().startsWith("part")) {
+ return true;
+ }
+ return false;
+ }
+ });
+ assertEquals(4, files.length);
+ }
+
+ @Test
public void testIncreaseIntermediateParallelism1() throws IOException{
// User specified parallelism is overriden for intermediate step
String outputDir = "/tmp/testIncreaseIntermediateParallelism";