Author: xuefu
Date: Thu Mar 31 02:50:34 2016
New Revision: 1737174
URL: http://svn.apache.org/viewvc?rev=1737174&view=rev
Log:
PIG-4848: pig.noSplitCombination=true should always be set internally for a
merge join (Xianda via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1737174&r1=1737173&r2=1737174&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
Thu Mar 31 02:50:34 2016
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.spark.SparkCounters;
import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
@@ -92,6 +93,12 @@ public class LoadConverter implements RD
jobConf.set("mapreduce.input.fileinputformat.inputdir",
op.getLFile().getFileName());
+ // Internally force pig.noSplitCombination=true, but ONLY for a POLoad that
+ // has a POMergeJoin downstream of it (a direct or transitive successor).
+ if (hasMergeJoinSuccessor(op)) {
+ jobConf.set("pig.noSplitCombination", "true");
+ }
+
RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopRDD(
jobConf, PigInputFormatSpark.class, Text.class, Tuple.class);
@@ -218,4 +225,20 @@ public class LoadConverter implements RD
return jobConf;
}
+    /**
+     * Reports whether a {@link POMergeJoin} appears anywhere downstream of
+     * {@code op}, i.e. as a direct successor or as a successor of any
+     * successor, walking the operator's parent plan recursively.
+     *
+     * <p>The recursion keeps no visited set, so it relies on the plan being
+     * acyclic (NOTE(review): physical plans are expected to be DAGs — confirm).
+     *
+     * @param op the operator to start from; may be {@code null}
+     * @return {@code true} if a POMergeJoin is reachable from {@code op};
+     *         {@code false} if none is, or if {@code op} is {@code null} or
+     *         has no parent plan set
+     */
+    private static boolean hasMergeJoinSuccessor(PhysicalOperator op) {
+        // Without this guard, an operator whose parent plan is unset (including
+        // one reached during recursion) would throw an NPE and abort the load.
+        if (op == null || op.getParentPlan() == null) {
+            return false;
+        }
+        List<PhysicalOperator> successors = op.getParentPlan().getSuccessors(op);
+        // getSuccessors may return null; treat that as "no successors".
+        if (successors == null) {
+            return false;
+        }
+        for (PhysicalOperator successor : successors) {
+            if (successor instanceof POMergeJoin || hasMergeJoinSuccessor(successor)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
}