Author: rohini
Date: Tue Nov 10 00:03:30 2015
New Revision: 1713571

URL: http://svn.apache.org/viewvc?rev=1713571&view=rev
Log:
PIG-4730: [Pig on Tez] Total parallelism estimation does not account load parallelism (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
    pig/trunk/test/org/apache/pig/test/Util.java
    pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
    pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Nov 10 00:03:30 2015
@@ -69,6 +69,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4730: [Pig on Tez] Total parallelism estimation does not account load parallelism (rohini)
+
 PIG-4689: CSV Writes incorrect header if two CSV files are created in one script (nielsbasjes via daijy)
 
 PIG-4727: Incorrect types table for AVG in docs (nsmith via daijy)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 Tue Nov 10 00:03:30 2015
@@ -103,7 +103,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
-import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
@@ -164,7 +163,7 @@ import org.apache.tez.runtime.library.in
  * A visitor to construct DAG out of Tez plan.
  */
 public class TezDagBuilder extends TezOpPlanVisitor {
-    private static final Log log = LogFactory.getLog(TezJobCompiler.class);
+    private static final Log log = LogFactory.getLog(TezDagBuilder.class);
 
     private DAG dag;
     private Map<String, LocalResource> localResources;

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
 Tue Nov 10 00:03:30 2015
@@ -23,7 +23,6 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -111,6 +110,7 @@ public class TezJobCompiler {
             }
             DAG tezDag = buildDAG(tezPlanNode, localResources);
             tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
+            log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
             return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
         } catch (Exception e) {
             int errCode = 2017;

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
 Tue Nov 10 00:03:30 2015
@@ -81,6 +81,7 @@ public class ParallelismSetter extends T
                 // requestedParallelism of Loader vertex is handled in LoaderProcessor
                 // propogate to vertexParallelism
                 tezOp.setVertexParallelism(tezOp.getRequestedParallelism());
+                incrementTotalParallelism(tezOp, tezOp.getRequestedParallelism());
                 return;
             } else {
                 int prevParallelism = -1;

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Tue Nov 10 00:03:30 2015
@@ -1382,19 +1382,21 @@ public class Util {
     }
 
     public static void createLogAppender(String appenderName, Writer writer, Class...clazzes) {
+        WriterAppender writerAppender = new WriterAppender(new PatternLayout("%d [%t] %-5p %c %x - %m%n"), writer);
+        writerAppender.setName(appenderName);
         for (Class clazz : clazzes) {
             Logger logger = Logger.getLogger(clazz);
-            WriterAppender writerAppender = new WriterAppender(new PatternLayout("%d [%t] %-5p %c %x - %m%n"), writer);
-            writerAppender.setName(appenderName);
             logger.addAppender(writerAppender);
         }
     }
 
-    public static void removeLogAppender(Class clazz, String appenderName) {
-        Logger logger = Logger.getLogger(clazz);
-        Appender appender = logger.getAppender(appenderName);
-        appender.close();
-        logger.removeAppender(appenderName);
+    public static void removeLogAppender(String appenderName, Class...clazzes) {
+        for (Class clazz : clazzes) {
+            Logger logger = Logger.getLogger(clazz);
+            Appender appender = logger.getAppender(appenderName);
+            appender.close();
+            logger.removeAppender(appenderName);
+        }
     }
 
     public static Path getFirstPartFile(Path path) throws Exception {

Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Tue Nov 10 
00:03:30 2015
@@ -40,6 +40,7 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -311,6 +312,7 @@ public class TestTezAutoParallelism {
         // Parallelism of C should be increased
         assertTrue(log.contains("Increased requested parallelism of scope-59 
to 4"));
         assertEquals(1, StringUtils.countMatches(log, "Increased requested 
parallelism"));
+        assertTrue(log.contains("Total estimated parallelism is 52"));
     }
 
     @Test
@@ -349,6 +351,7 @@ public class TestTezAutoParallelism {
             // Parallelism of C1 should be increased. C2 will not be increased 
due to order by
             assertEquals(1, StringUtils.countMatches(log, "Increased requested 
parallelism"));
             assertTrue(log.contains("Increased requested parallelism of 
scope-65 to 10"));
+            assertTrue(log.contains("Total estimated parallelism is 19"));
         } finally {
             pigServer.setDefaultParallel(-1);
         }
@@ -359,7 +362,7 @@ public class TestTezAutoParallelism {
         PigServer.resetScope();
         StringWriter writer = new StringWriter();
         // When there is a combiner operation involved user specified 
parallelism is overriden
-        Util.createLogAppender("testIncreaseIntermediateParallelism", writer, 
ParallelismSetter.class);
+        Util.createLogAppender("testIncreaseIntermediateParallelism", writer, 
ParallelismSetter.class, TezJobCompiler.class);
         try {
             
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
             
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "4000");
@@ -387,7 +390,7 @@ public class TestTezAutoParallelism {
             }
             return writer.toString();
         } finally {
-            Util.removeLogAppender(ParallelismSetter.class, 
"testIncreaseIntermediateParallelism");
+            Util.removeLogAppender("testIncreaseIntermediateParallelism", 
ParallelismSetter.class, TezJobCompiler.class);
             Util.deleteFile(cluster, outputDir);
         }
     }

Modified: pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1713571&r1=1713570&r2=1713571&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java Tue Nov 10 
00:03:30 2015
@@ -145,8 +145,7 @@ public class TestTezGraceParallelism {
             assertTrue(writer.toString().contains("Reduce auto parallelism for 
vertex: scope-52 to 1 from 20"));
             assertTrue(writer.toString().contains("Reduce auto parallelism for 
vertex: scope-61 to 1 from 100"));
         } finally {
-            Util.removeLogAppender(PigGraceShuffleVertexManager.class, 
"testDecreaseParallelism");
-            Util.removeLogAppender(ShuffleVertexManager.class, 
"testDecreaseParallelism");
+            Util.removeLogAppender("testDecreaseParallelism", 
PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
         }
     }
 
@@ -186,8 +185,7 @@ public class TestTezGraceParallelism {
             // There are randomness in which task finishes first, so the auto 
parallelism could result different result
             assertTrue(Pattern.compile("Reduce auto parallelism for vertex: 
scope-64 to (\\d+)* from 50").matcher(writer.toString()).find());
         } finally {
-            Util.removeLogAppender(PigGraceShuffleVertexManager.class, 
"testIncreaseParallelism");
-            Util.removeLogAppender(ShuffleVertexManager.class, 
"testIncreaseParallelism");
+            Util.removeLogAppender("testIncreaseParallelism", 
PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
         }
     }
 
@@ -222,7 +220,7 @@ public class TestTezGraceParallelism {
             assertTrue(writer.toString().contains("All predecessors for 
scope-84 are finished, time to set parallelism for scope-85"));
             assertTrue(writer.toString().contains("Initialize parallelism for 
scope-85 to 101"));
         } finally {
-            Util.removeLogAppender(PigGraceShuffleVertexManager.class, 
"testJoinWithDifferentDepth");
+            Util.removeLogAppender("testJoinWithDifferentDepth", 
PigGraceShuffleVertexManager.class);
         }
     }
 
@@ -252,7 +250,7 @@ public class TestTezGraceParallelism {
             assertEquals(count, 1000);
             assertFalse(writer.toString().contains("scope-68"));
         } finally {
-            Util.removeLogAppender(PigGraceShuffleVertexManager.class, 
"testJoinWithDifferentDepth2");
+            Util.removeLogAppender("testJoinWithDifferentDepth2", 
PigGraceShuffleVertexManager.class);
         }
     }
 
@@ -285,7 +283,7 @@ public class TestTezGraceParallelism {
             assertTrue(writer.toString().contains("time to set parallelism for 
scope-41"));
             assertTrue(writer.toString().contains("time to set parallelism for 
scope-54"));
         } finally {
-            Util.removeLogAppender(PigGraceShuffleVertexManager.class, 
"testJoinWithUnion");
+            Util.removeLogAppender("testJoinWithUnion", 
PigGraceShuffleVertexManager.class);
         }
     }
 


Reply via email to