Author: rohini
Date: Tue Mar 22 10:39:53 2016
New Revision: 1736179
URL: http://svn.apache.org/viewvc?rev=1736179&view=rev
Log:
PIG-4843: Turn off combiner in reducer vertex for Tez if bags are in combine
plan (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/conf/pig.properties
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Mar 22 10:39:53 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4843: Turn off combiner in reducer vertex for Tez if bags are in combine plan (rohini)
+
PIG-4796: Authenticate with Kerberos using a keytab file (nielsbasjes via daijy)
PIG-4817: Bump HTTP Logparser to version 2.4 (nielsbasjes via daijy)
Modified: pig/trunk/conf/pig.properties
URL:
http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Tue Mar 22 10:39:53 2016
@@ -331,6 +331,15 @@
#
# pig.exec.nocombiner=false
+# Enable or disable use of combiners only in reducer shuffle-merge phase.
+# pig.exec.nocombiner turns off combiner for both map and reduce phases.
+# Valid values are auto, true or false. Default is auto in which Pig turns off combiner
+# on per combine plan basis when bags are present in a particular plan.
+# Value of true or false will apply to all combine plans in the script.
+# Currently this only applies to Tez as Mapreduce does not run combiners in reducer (MAPREDUCE-5221).
+
+# pig.exec.nocombiner.reducer=auto
+
# EXPERIMENTAL: Aggregate records in map task before sending to the combiner?
# (default: false, 10; recommended: true, 10). In cases where there is a massive
# reduction of data in the aggregation step, pig can do a first pass of
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Tue Mar 22 10:39:53 2016
@@ -94,6 +94,13 @@ public class PigConfiguration {
public static final String PIG_EXEC_NO_COMBINER = "pig.exec.nocombiner";
/**
+ * Enable or disable use of combiners in reducer shuffle-merge phase in Tez.
+ * Valid values are auto, true or false.
+ * Default is auto which turns off combiner if bags are present in the combine plan
+ */
+ public static final String PIG_EXEC_NO_COMBINER_REDUCER =
"pig.exec.nocombiner.reducer";
+
+ /**
* This key controls whether secondary sort key is used for optimization in case
* of nested distinct or sort
*/
@@ -328,7 +335,7 @@ public class PigConfiguration {
* Set the threshold for percentage of errors
*/
public static final String PIG_ERROR_THRESHOLD_PERCENT =
"pig.error.threshold.percent";
-
+
/**
* Comma-delimited entries of commands/operators that must be disallowed.
* This is a security feature to be used by administrators to block use of
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Tue Mar 22 10:39:53 2016
@@ -487,6 +487,11 @@ public class POUserFunc extends Expressi
}
}
+ public Type getOriginalReturnType() throws ExecException {
+ instantiateFunc(origFSpec);
+ return func.getReturnType();
+ }
+
public Type getReturnType() {
return func.getReturnType();
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Mar 22 10:39:53 2016
@@ -103,6 +103,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import
org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
@@ -507,8 +508,28 @@ public class TezDagBuilder extends TezOp
edge.partitionerClass.getName());
}
- in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
- out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+ UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+ out.setUserPayload(payLoad);
+
+ if (!combinePlan.isEmpty()) {
+ boolean noCombineInReducer = false;
+ String reducerNoCombiner =
globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
+ if (reducerNoCombiner == null || reducerNoCombiner.equals("auto"))
{
+ noCombineInReducer =
TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
+ } else {
+ noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
+ }
+ if (noCombineInReducer) {
+ log.info("Turning off combiner in reducer vertex " +
to.getOperatorKey() + " for edge from " + from.getOperatorKey());
+ conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
+ conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
+ conf.unset("pig.combinePlan");
+ conf.unset("pig.combine.package");
+ conf.unset("pig.map.keytype");
+ payLoad = TezUtils.createUserPayloadFromConf(conf);
+ }
+ }
+ in.setUserPayload(payLoad);
if (edge.dataMovementType!=DataMovementType.BROADCAST &&
to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 &&
(to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Tue Mar 22 10:39:53 2016
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.tez.util;
import java.io.IOException;
+import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
@@ -42,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.builtin.RoundRobinPartitioner;
+import org.apache.pig.builtin.TOBAG;
import org.apache.pig.data.DataType;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
@@ -273,4 +275,28 @@ public class TezCompilerUtil {
edge.setIntermediateOutputValueClass(TUPLE_CLASS);
}
+ public static boolean bagDataTypeInCombinePlan(PhysicalPlan combinePlan)
throws ExecException {
+ PhysicalOperator lr = combinePlan.getLeaves().get(0);
+ POForEach fe = (POForEach) combinePlan.getPredecessors(lr).get(0);
+
+ // Hack. class.getTypeName() is only available in JDK8
+ Type dataBagType = new TOBAG().getReturnType();
+
+ List<PhysicalPlan> inputPlans = fe.getInputPlans();
+ for (PhysicalPlan inputPlan: inputPlans) {
+ PhysicalOperator leaf = inputPlan.getLeaves().get(0);
+ if (leaf.getResultType() == DataType.BAG) {
+ return true;
+ } else if (leaf instanceof POUserFunc) {
+ POUserFunc func = (POUserFunc) leaf;
+ // Return type of Intermediate func in combiner plan is always Tuple.
+ // Need to check original or Final EvalFunc return type
+ if (dataBagType.equals(func.getOriginalReturnType())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
}
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=1736179&r1=1736178&r2=1736179&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Tue Mar 22 10:39:53 2016
@@ -22,6 +22,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -1057,6 +1058,10 @@ public class TestEvalPipeline {
}
Util.createInputFile(cluster, "table", inpString);
+ StringWriter writer = new StringWriter();
+ if (cluster.getExecType().name().equals("TEZ")) {
+ Util.createLogAppender("testNoCombinerInReducer", writer,
Class.forName("org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder"));
+ }
pigServer.registerQuery("a = LOAD 'table' AS (i:int);");
pigServer.registerQuery("b = group a ALL;");
@@ -1076,6 +1081,10 @@ public class TestEvalPipeline {
Assert.assertTrue(DataType.compare(expectedBag.size(),
resultBagSize) == 0);
}
+ if (cluster.getExecType().name().equals("TEZ")) {
+ Assert.assertTrue(writer.toString().contains("Turning off combiner
in reducer"));
+ Util.removeLogAppender("testNoCombinerInReducer",
Class.forName("org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder"));
+ }
Util.deleteFile(cluster, "table");
}