Author: rohini
Date: Tue Mar 22 10:39:53 2016
New Revision: 1736179

URL: http://svn.apache.org/viewvc?rev=1736179&view=rev
Log:
PIG-4843: Turn off combiner in reducer vertex for Tez if bags are in combine plan (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Mar 22 10:39:53 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4843: Turn off combiner in reducer vertex for Tez if bags are in combine plan (rohini)
+
 PIG-4796: Authenticate with Kerberos using a keytab file (nielsbasjes via daijy)
 
 PIG-4817: Bump HTTP Logparser to version 2.4 (nielsbasjes via daijy)

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Tue Mar 22 10:39:53 2016
@@ -331,6 +331,15 @@
 #
 # pig.exec.nocombiner=false
 
+# Enable or disable use of combiners only in reducer shuffle-merge phase. 
+# pig.exec.nocombiner turns off combiner for both map and reduce phases. 
+# Valid values are auto, true or false. Default is auto in which Pig turns off combiner
+# on per combine plan basis when bags are present in a particular plan.
+# Value of true or false will apply to all combine plans in the script.
+# Currently this only applies to Tez as Mapreduce does not run combiners in reducer (MAPREDUCE-5221).
+
+# pig.exec.nocombiner.reducer=auto
+
 # EXPERIMENTAL: Aggregate records in map task before sending to the combiner?
 # (default: false, 10; recommended: true, 10). In cases where there is a massive
 # reduction of data in the aggregation step, pig can do a first pass of

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Tue Mar 22 10:39:53 2016
@@ -94,6 +94,13 @@ public class PigConfiguration {
     public static final String PIG_EXEC_NO_COMBINER = "pig.exec.nocombiner";
 
     /**
+     * Enable or disable use of combiners in reducer shuffle-merge phase in Tez.
+     * Valid values are auto, true or false.
+     * Default is auto which turns off combiner if bags are present in the combine plan
+     */
+    public static final String PIG_EXEC_NO_COMBINER_REDUCER = "pig.exec.nocombiner.reducer";
+
+    /**
      * This key controls whether secondary sort key is used for optimization in case
      * of nested distinct or sort
      */
@@ -328,7 +335,7 @@ public class PigConfiguration {
      * Set the threshold for percentage of errors
      */
     public static final String PIG_ERROR_THRESHOLD_PERCENT = "pig.error.threshold.percent";
-    
+
     /**
      * Comma-delimited entries of commands/operators that must be disallowed.
      * This is a security feature to be used by administrators to block use of

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Tue Mar 22 10:39:53 2016
@@ -487,6 +487,11 @@ public class POUserFunc extends Expressi
         }
     }
 
+    public Type getOriginalReturnType() throws ExecException {
+        instantiateFunc(origFSpec);
+        return func.getReturnType();
+    }
+
     public Type getReturnType() {
         return func.getReturnType();
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Mar 22 10:39:53 2016
@@ -103,6 +103,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
@@ -507,8 +508,28 @@ public class TezDagBuilder extends TezOp
                     edge.partitionerClass.getName());
         }
 
-        in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-        out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+        UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+        out.setUserPayload(payLoad);
+
+        if (!combinePlan.isEmpty()) {
+            boolean noCombineInReducer = false;
+            String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
+            if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
+                noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
+            } else {
+                noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
+            }
+            if (noCombineInReducer) {
+                log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey());
+                conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
+                conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
+                conf.unset("pig.combinePlan");
+                conf.unset("pig.combine.package");
+                conf.unset("pig.map.keytype");
+                payLoad = TezUtils.createUserPayloadFromConf(conf);
+            }
+        }
+        in.setUserPayload(payLoad);
 
         if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Tue Mar 22 10:39:53 2016
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.util;
 
 import java.io.IOException;
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -42,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.builtin.RoundRobinPartitioner;
+import org.apache.pig.builtin.TOBAG;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
@@ -273,4 +275,28 @@ public class TezCompilerUtil {
         edge.setIntermediateOutputValueClass(TUPLE_CLASS);
     }
 
+    public static boolean bagDataTypeInCombinePlan(PhysicalPlan combinePlan) throws ExecException {
+        PhysicalOperator lr = combinePlan.getLeaves().get(0);
+        POForEach fe = (POForEach) combinePlan.getPredecessors(lr).get(0);
+
+        // Hack. class.getTypeName() is only available in JDK8
+        Type dataBagType = new TOBAG().getReturnType();
+
+        List<PhysicalPlan> inputPlans = fe.getInputPlans();
+        for (PhysicalPlan inputPlan: inputPlans) {
+            PhysicalOperator leaf = inputPlan.getLeaves().get(0);
+            if (leaf.getResultType() == DataType.BAG) {
+                return true;
+            } else if (leaf instanceof POUserFunc) {
+                POUserFunc func = (POUserFunc) leaf;
+                // Return type of Intermediate func in combiner plan is always Tuple.
+                // Need to check original or Final EvalFunc return type
+                if (dataBagType.equals(func.getOriginalReturnType())) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
 }

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Tue Mar 22 10:39:53 2016
@@ -22,6 +22,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1057,6 +1058,10 @@ public class TestEvalPipeline {
         }
 
         Util.createInputFile(cluster, "table", inpString);
+        StringWriter writer = new StringWriter();
+        if (cluster.getExecType().name().equals("TEZ")) {
+            Util.createLogAppender("testNoCombinerInReducer", writer, Class.forName("org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder"));
+        }
 
         pigServer.registerQuery("a = LOAD 'table' AS (i:int);");
         pigServer.registerQuery("b = group a ALL;");
@@ -1076,6 +1081,10 @@ public class TestEvalPipeline {
             Assert.assertTrue(DataType.compare(expectedBag.size(), resultBagSize) == 0);
         }
 
+        if (cluster.getExecType().name().equals("TEZ")) {
+            Assert.assertTrue(writer.toString().contains("Turning off combiner in reducer"));
+            Util.removeLogAppender("testNoCombinerInReducer", Class.forName("org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder"));
+        }
         Util.deleteFile(cluster, "table");
     }
 


Reply via email to