Author: rohini
Date: Mon Mar 23 20:36:52 2015
New Revision: 1668723
URL: http://svn.apache.org/r1668723
Log:
PIG-4474: Increasing intermediate parallelism has issue with default
parallelism (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1668723&r1=1668722&r2=1668723&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Mar 23 20:36:52 2015
@@ -54,6 +54,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4474: Increasing intermediate parallelism has issue with default
parallelism (rohini)
+
PIG-4465: Pig streaming ship fails for relative paths on Tez (rohini)
PIG-4461: Use benchmarks for Windows Pig e2e tests (nmaheshwari via daijy)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1668723&r1=1668722&r2=1668723&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
Mon Mar 23 20:36:52 2015
@@ -262,9 +262,10 @@ public class TezOperator extends Operato
this.estimatedParallelism = estimatedParallelism;
}
- public int getEffectiveParallelism() {
+ public int getEffectiveParallelism(int defaultParallelism) {
// PIG-4162: For intermediate reducers, use estimated parallelism over
user set parallelism.
- return getEstimatedParallelism() == -1 ? getRequestedParallelism()
+ return getEstimatedParallelism() == -1
+ ? (getRequestedParallelism() == -1 ? defaultParallelism :
getRequestedParallelism())
: getEstimatedParallelism();
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1668723&r1=1668722&r2=1668723&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
Mon Mar 23 20:36:52 2015
@@ -99,7 +99,7 @@ public class ParallelismSetter extends T
for (Map.Entry<OperatorKey,TezEdgeDescriptor> entry :
tezOp.inEdges.entrySet()) {
if (entry.getValue().dataMovementType ==
DataMovementType.ONE_TO_ONE) {
TezOperator pred = mPlan.getOperator(entry.getKey());
- parallelism = pred.getEffectiveParallelism();
+ parallelism =
pred.getEffectiveParallelism(pc.defaultParallel);
if (prevParallelism == -1) {
prevParallelism = parallelism;
} else if (prevParallelism != parallelism) {
@@ -133,6 +133,9 @@ public class ParallelismSetter extends T
// if it is intermediate reducer
parallelism = estimator.estimateParallelism(mPlan,
tezOp, conf);
if (overrideRequestedParallelism) {
+ if (tezOp.getRequestedParallelism() !=
parallelism) {
+ LOG.info("Increased requested parallelism
of " + tezOp.getOperatorKey() + " to " + parallelism);
+ }
tezOp.setRequestedParallelism(parallelism);
} else {
tezOp.setEstimatedParallelism(parallelism);
@@ -157,8 +160,7 @@ public class ParallelismSetter extends T
if (pred.isSampleBasedPartitioner()) {
for (TezOperator partitionerPred :
mPlan.getPredecessors(pred)) {
if
(partitionerPred.isSampleAggregation()) {
- LOG.debug("Updating constant
value to " + parallelism + " in " + partitionerPred.plan);
- LOG.info("Increased requested
parallelism of " + partitionerPred.getOperatorKey() + " to " + parallelism);
+ LOG.debug("Updating
parallelism constant value to " + parallelism + " in " + partitionerPred.plan);
ParallelConstantVisitor
visitor =
new
ParallelConstantVisitor(partitionerPred.plan, parallelism);
visitor.visit();
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1668723&r1=1668722&r2=1668723&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
Mon Mar 23 20:36:52 2015
@@ -111,7 +111,7 @@ public class TezOperDependencyParallelis
// and sample/scalar (does not impact parallelism)
if
(entry.getValue().dataMovementType==DataMovementType.SCATTER_GATHER ||
entry.getValue().dataMovementType==DataMovementType.ONE_TO_ONE) {
- double predParallelism = pred.getEffectiveParallelism();
+ double predParallelism =
pred.getEffectiveParallelism(pc.defaultParallel);
if (predParallelism==-1) {
throw new IOException("Cannot estimate parallelism for " +
tezOper.getOperatorKey().toString()
+ ", effective parallelism for predecessor " +
tezOper.getOperatorKey().toString()
Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1668723&r1=1668722&r2=1668723&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Mon Mar 23
20:36:52 2015
@@ -20,6 +20,7 @@ package org.apache.pig.tez;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@@ -30,6 +31,7 @@ import java.util.List;
import java.util.Properties;
import java.util.Random;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -244,22 +246,79 @@ public class TestTezAutoParallelism {
}
@Test
- public void testSkewedJoinIncreaseIntermediateParallelism() throws
IOException{
+ public void testIncreaseIntermediateParallelism1() throws IOException{
+ // User specified parallelism is overridden for intermediate step
+ String outputDir = "/tmp/testIncreaseIntermediateParallelism";
+ String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray,
age:int);"
+ + "B = load '" + INPUT_FILE2 + "' as (name:chararray,
gender:chararray);"
+ + "C = join A by name, B by name using 'skewed' parallel 1;"
+ + "D = group C by A::name;"
+ + "E = foreach D generate group, COUNT(C.A::name);"
+ + "STORE E into '" + outputDir + "/finalout';";
+ String log = testIncreaseIntermediateParallelism(script, outputDir,
true);
+ // Parallelism of C should be increased
+ assertTrue(log.contains("Increased requested parallelism of scope-54
to 4"));
+ assertEquals(1, StringUtils.countMatches(log, "Increased requested
parallelism"));
+ }
+
+ @Test
+ public void testIncreaseIntermediateParallelism2() throws IOException{
+ // User specified parallelism should not be overridden for intermediate
step if there is a STORE
+ String outputDir = "/tmp/testIncreaseIntermediateParallelism";
+ String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray,
age:int);"
+ + "B = load '" + INPUT_FILE2 + "' as (name:chararray,
gender:chararray);"
+ + "C = join A by name, B by name using 'skewed' parallel 2;"
+ + "STORE C into '/tmp/testIncreaseIntermediateParallelism';"
+ + "D = group C by A::name parallel 2;"
+ + "E = foreach D generate group, COUNT(C.A::name);"
+ + "STORE E into '" + outputDir + "/finalout';";
+ String log = testIncreaseIntermediateParallelism(script, outputDir,
true);
+ // Parallelism of C will not be increased as the Split has a STORE
+ assertEquals(0, StringUtils.countMatches(log, "Increased requested
parallelism"));
+ }
+
+ @Test
+ public void testIncreaseIntermediateParallelism3() throws IOException{
+ // Multiple levels with default parallelism. Group by followed by
Group by
+ try {
+ String outputDir = "/tmp/testIncreaseIntermediateParallelism";
+ String script = "set default_parallel 1\n"
+ + "A = load '" + INPUT_FILE1 + "' as (name:chararray,
age:int);"
+ + "B = load '" + INPUT_FILE2 + "' as (name:chararray,
gender:chararray);"
+ + "C = join A by name, B by name;"
+ + "STORE C into
'/tmp/testIncreaseIntermediateParallelism';"
+ + "C1 = group C by A::name;"
+ + "C2 = FOREACH C1 generate group, FLATTEN(C);"
+ + "D = group C2 by group;"
+ + "E = foreach D generate group, COUNT(C2.A::name);"
+ + "F = order E by $0;"
+ + "STORE E into '" + outputDir + "/finalout';";
+ String log = testIncreaseIntermediateParallelism(script,
outputDir, false);
+ // Parallelism of C1 should be increased. C2 will not be increased
due to order by
+ assertEquals(1, StringUtils.countMatches(log, "Increased requested
parallelism"));
+ assertTrue(log.contains("Increased requested parallelism of
scope-63 to 10"));
+ } finally {
+ pigServer.setDefaultParallel(-1);
+ }
+ }
+
+ private String testIncreaseIntermediateParallelism(String script, String
outputDir, boolean sortAndCheck) throws IOException {
NodeIdGenerator.reset();
PigServer.resetScope();
StringWriter writer = new StringWriter();
// When there is a combiner operation involved user specified
parallelism is overridden
- Util.createLogAppender(ParallelismSetter.class,
"testSkewedJoinIncreaseIntermediateParallelism", writer);
+ Util.createLogAppender(ParallelismSetter.class,
"testIncreaseIntermediateParallelism", writer);
try {
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
"true");
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
"4000");
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
"80000");
- pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as
(name:chararray, age:int);");
- pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as
(name:chararray, gender:chararray);");
- pigServer.registerQuery("C = join A by name, B by name using
'skewed' parallel 1;");
- pigServer.registerQuery("D = group C by A::name;");
- pigServer.registerQuery("E = foreach D generate group,
COUNT(C.A::name);");
- Iterator<Tuple> iter = pigServer.openIterator("E");
+ pigServer.setBatchOn();
+ pigServer.registerScript(new
ByteArrayInputStream(script.getBytes()));
+ pigServer.executeBatch();
+
+ pigServer.registerQuery("A = load '" + outputDir + "/finalout' as
(name:chararray, count:long);");
+ Iterator<Tuple> iter = pigServer.openIterator("A");
+
List<Tuple> expectedResults = Util
.getTuplesFromConstantTupleStrings(new String[] {
"('Abigail',56L)", "('Alexander',45L)",
"('Ava',60L)",
@@ -269,11 +328,15 @@ public class TestTezAutoParallelism {
"('Liam',46L)", "('Madison',46L)", "('Mason',54L)",
"('Mia',51L)", "('Michael',47L)", "('Noah',38L)",
"('Olivia',50L)", "('Sophia',52L)",
"('William',43L)" });
-
- Util.checkQueryOutputsAfterSort(iter, expectedResults);
- assertTrue(writer.toString().contains("Increased requested
parallelism of scope-40 to 4"));
+ if (sortAndCheck) {
+ Util.checkQueryOutputsAfterSort(iter, expectedResults);
+ } else {
+ Util.checkQueryOutputs(iter, expectedResults);
+ }
+ return writer.toString();
} finally {
- Util.removeLogAppender(ParallelismSetter.class,
"testSkewedJoinIncreaseIntermediateParallelism");
+ Util.removeLogAppender(ParallelismSetter.class,
"testIncreaseIntermediateParallelism");
+ Util.deleteFile(cluster, outputDir);
}
}
}