Author: xuefu
Date: Thu Mar 31 02:50:34 2016
New Revision: 1737174

URL: http://svn.apache.org/viewvc?rev=1737174&view=rev
Log:
PIG-4848: pig.noSplitCombination=true should always be set internally for a 
merge join (Xianda via Xuefu)

Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1737174&r1=1737173&r2=1737174&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
 Thu Mar 31 02:50:34 2016
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.spark.SparkCounters;
 import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
@@ -92,6 +93,12 @@ public class LoadConverter implements RD
         jobConf.set("mapreduce.input.fileinputformat.inputdir",
                 op.getLFile().getFileName());
 
+        // internally set pig.noSplitCombination as true ONLY for
+        // the POLoad operator which has POMergeJoin successor.
+        if (hasMergeJoinSuccessor(op)) {
+            jobConf.set("pig.noSplitCombination", "true");
+        }
+
         RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopRDD(
                 jobConf, PigInputFormatSpark.class, Text.class, Tuple.class);
 
@@ -218,4 +225,20 @@ public class LoadConverter implements RD
         return jobConf;
     }
 
+    private static boolean hasMergeJoinSuccessor(PhysicalOperator op) { // Reports whether a POMergeJoin is reachable from op by following successor edges.
+        List<PhysicalOperator> successors = 
op.getParentPlan().getSuccessors(op); // successors of op, as reported by op's own parent plan
+        if (successors == null ) { // a null successor list is treated as "no successors"
+            return false;
+        }
+        for (PhysicalOperator successor : successors){
+            if (successor instanceof POMergeJoin){ // a direct successor is a merge join
+                return true;
+            }
+            if (hasMergeJoinSuccessor(successor)){ // otherwise search transitively; the recursive call looks the successor up via its own getParentPlan()
+                return true;
+            }
+        }
+        return false; // no POMergeJoin among the direct or transitive successors
+    }
+
 }


Reply via email to