Author: rohini
Date: Tue Nov 10 00:03:30 2015
New Revision: 1713571
URL: http://svn.apache.org/viewvc?rev=1713571&view=rev
Log:
PIG-4730: [Pig on Tez] Total parallelism estimation does not account load
parallelism (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
pig/trunk/test/org/apache/pig/test/Util.java
pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Nov 10 00:03:30 2015
@@ -69,6 +69,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4730: [Pig on Tez] Total parallelism estimation does not account load parallelism (rohini)
+
PIG-4689: CSV Writes incorrect header if two CSV files are created in one
script (nielsbasjes via daijy)
PIG-4727: Incorrect types table for AVG in docs (nsmith via daijy)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Tue Nov 10 00:03:30 2015
@@ -103,7 +103,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
-import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import
org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
@@ -164,7 +163,7 @@ import org.apache.tez.runtime.library.in
* A visitor to construct DAG out of Tez plan.
*/
public class TezDagBuilder extends TezOpPlanVisitor {
- private static final Log log = LogFactory.getLog(TezJobCompiler.class);
+ private static final Log log = LogFactory.getLog(TezDagBuilder.class);
private DAG dag;
private Map<String, LocalResource> localResources;
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
Tue Nov 10 00:03:30 2015
@@ -23,7 +23,6 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -111,6 +110,7 @@ public class TezJobCompiler {
}
DAG tezDag = buildDAG(tezPlanNode, localResources);
tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
+ log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
return new TezJob(tezConf, tezDag, localResources,
tezPlan.getEstimatedTotalParallelism());
} catch (Exception e) {
int errCode = 2017;
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
Tue Nov 10 00:03:30 2015
@@ -81,6 +81,7 @@ public class ParallelismSetter extends T
// requestedParallelism of Loader vertex is handled in
LoaderProcessor
// propogate to vertexParallelism
tezOp.setVertexParallelism(tezOp.getRequestedParallelism());
+ incrementTotalParallelism(tezOp, tezOp.getRequestedParallelism());
return;
} else {
int prevParallelism = -1;
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Tue Nov 10 00:03:30 2015
@@ -1382,19 +1382,21 @@ public class Util {
}
public static void createLogAppender(String appenderName, Writer writer,
Class...clazzes) {
+ WriterAppender writerAppender = new WriterAppender(new PatternLayout("%d [%t] %-5p %c %x - %m%n"), writer);
+ writerAppender.setName(appenderName);
for (Class clazz : clazzes) {
Logger logger = Logger.getLogger(clazz);
- WriterAppender writerAppender = new WriterAppender(new
PatternLayout("%d [%t] %-5p %c %x - %m%n"), writer);
- writerAppender.setName(appenderName);
logger.addAppender(writerAppender);
}
}
- public static void removeLogAppender(Class clazz, String appenderName) {
- Logger logger = Logger.getLogger(clazz);
- Appender appender = logger.getAppender(appenderName);
- appender.close();
- logger.removeAppender(appenderName);
+ public static void removeLogAppender(String appenderName, Class...clazzes) {
+ for (Class clazz : clazzes) {
+ Logger logger = Logger.getLogger(clazz);
+ Appender appender = logger.getAppender(appenderName);
+ appender.close();
+ logger.removeAppender(appenderName);
+ }
}
public static Path getFirstPartFile(Path path) throws Exception {
Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Tue Nov 10
00:03:30 2015
@@ -40,6 +40,7 @@ import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -311,6 +312,7 @@ public class TestTezAutoParallelism {
// Parallelism of C should be increased
assertTrue(log.contains("Increased requested parallelism of scope-59
to 4"));
assertEquals(1, StringUtils.countMatches(log, "Increased requested
parallelism"));
+ assertTrue(log.contains("Total estimated parallelism is 52"));
}
@Test
@@ -349,6 +351,7 @@ public class TestTezAutoParallelism {
// Parallelism of C1 should be increased. C2 will not be increased
due to order by
assertEquals(1, StringUtils.countMatches(log, "Increased requested
parallelism"));
assertTrue(log.contains("Increased requested parallelism of
scope-65 to 10"));
+ assertTrue(log.contains("Total estimated parallelism is 19"));
} finally {
pigServer.setDefaultParallel(-1);
}
@@ -359,7 +362,7 @@ public class TestTezAutoParallelism {
PigServer.resetScope();
StringWriter writer = new StringWriter();
// When there is a combiner operation involved user specified
parallelism is overriden
- Util.createLogAppender("testIncreaseIntermediateParallelism", writer,
ParallelismSetter.class);
+ Util.createLogAppender("testIncreaseIntermediateParallelism", writer,
ParallelismSetter.class, TezJobCompiler.class);
try {
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
"true");
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
"4000");
@@ -387,7 +390,7 @@ public class TestTezAutoParallelism {
}
return writer.toString();
} finally {
- Util.removeLogAppender(ParallelismSetter.class,
"testIncreaseIntermediateParallelism");
+ Util.removeLogAppender("testIncreaseIntermediateParallelism",
ParallelismSetter.class, TezJobCompiler.class);
Util.deleteFile(cluster, outputDir);
}
}
Modified: pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java Tue Nov 10
00:03:30 2015
@@ -145,8 +145,7 @@ public class TestTezGraceParallelism {
assertTrue(writer.toString().contains("Reduce auto parallelism for
vertex: scope-52 to 1 from 20"));
assertTrue(writer.toString().contains("Reduce auto parallelism for
vertex: scope-61 to 1 from 100"));
} finally {
- Util.removeLogAppender(PigGraceShuffleVertexManager.class,
"testDecreaseParallelism");
- Util.removeLogAppender(ShuffleVertexManager.class,
"testDecreaseParallelism");
+ Util.removeLogAppender("testDecreaseParallelism",
PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
}
}
@@ -186,8 +185,7 @@ public class TestTezGraceParallelism {
// There are randomness in which task finishes first, so the auto
parallelism could result different result
assertTrue(Pattern.compile("Reduce auto parallelism for vertex:
scope-64 to (\\d+)* from 50").matcher(writer.toString()).find());
} finally {
- Util.removeLogAppender(PigGraceShuffleVertexManager.class,
"testIncreaseParallelism");
- Util.removeLogAppender(ShuffleVertexManager.class,
"testIncreaseParallelism");
+ Util.removeLogAppender("testIncreaseParallelism",
PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
}
}
@@ -222,7 +220,7 @@ public class TestTezGraceParallelism {
assertTrue(writer.toString().contains("All predecessors for
scope-84 are finished, time to set parallelism for scope-85"));
assertTrue(writer.toString().contains("Initialize parallelism for
scope-85 to 101"));
} finally {
- Util.removeLogAppender(PigGraceShuffleVertexManager.class,
"testJoinWithDifferentDepth");
+ Util.removeLogAppender("testJoinWithDifferentDepth",
PigGraceShuffleVertexManager.class);
}
}
@@ -252,7 +250,7 @@ public class TestTezGraceParallelism {
assertEquals(count, 1000);
assertFalse(writer.toString().contains("scope-68"));
} finally {
- Util.removeLogAppender(PigGraceShuffleVertexManager.class,
"testJoinWithDifferentDepth2");
+ Util.removeLogAppender("testJoinWithDifferentDepth2",
PigGraceShuffleVertexManager.class);
}
}
@@ -285,7 +283,7 @@ public class TestTezGraceParallelism {
assertTrue(writer.toString().contains("time to set parallelism for
scope-41"));
assertTrue(writer.toString().contains("time to set parallelism for
scope-54"));
} finally {
- Util.removeLogAppender(PigGraceShuffleVertexManager.class,
"testJoinWithUnion");
+ Util.removeLogAppender("testJoinWithUnion",
PigGraceShuffleVertexManager.class);
}
}