Author: zly
Date: Wed Apr 19 08:08:00 2017
New Revision: 1791870

URL: http://svn.apache.org/viewvc?rev=1791870&view=rev
Log:
PIG-5212: SkewedJoin_6 is failing on Spark (Liyun)

Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1791870&r1=1791869&r2=1791870&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
 Wed Apr 19 08:08:00 2017
@@ -264,9 +264,10 @@ public class SparkCompiler extends PhyPl
             for (PhysicalOperator pred : predecessors) {
                 if (pred instanceof POSplit
                         && splitsSeen.containsKey(pred.getOperatorKey())) {
+                    POSplit split = (POSplit) pred;
                     compiledInputs[++i] = startNew(
-                            ((POSplit) pred).getSplitStore(),
-                            splitsSeen.get(pred.getOperatorKey()));
+                            split.getSplitStore(),
+                            splitsSeen.get(pred.getOperatorKey()), null);
                     continue;
                 }
                 compile(pred);
@@ -321,9 +322,16 @@ public class SparkCompiler extends PhyPl
         }
     }
 
-    private SparkOperator startNew(FileSpec fSpec, SparkOperator old)
-            throws PlanException {
-        POLoad ld = getLoad();
+    /**
+     * @param fSpec
+     * @param old
+     * @param operatorKey If operatorKey is not null, we assign it to the 
POLoad in the new SparkOperator;
+     *                    otherwise the operatorKey of the POLoad will be 
generated automatically. For details see PIG-5212.
+     * @return
+     * @throws PlanException
+     */
+    private SparkOperator startNew(FileSpec fSpec, SparkOperator old, 
OperatorKey operatorKey) throws PlanException {
+        POLoad ld = getLoad(operatorKey);
         ld.setLFile(fSpec);
         SparkOperator ret = getSparkOp();
         ret.add(ld);
@@ -332,21 +340,31 @@ public class SparkCompiler extends PhyPl
         return ret;
     }
 
-    private POLoad getLoad() {
-        POLoad ld = new POLoad(new OperatorKey(scope, 
nig.getNextNodeId(scope)));
+    private POLoad getLoad(OperatorKey operatorKey) {
+        POLoad ld = null;
+        if (operatorKey != null) {
+            ld = new POLoad(operatorKey);
+        } else {
+            ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        }
         ld.setPc(pigContext);
         ld.setIsTmpLoad(true);
         return ld;
     }
-
+        
     @Override
     public void visitSplit(POSplit op) throws VisitorException {
         try {
+            List<PhysicalOperator> preds = 
this.physicalPlan.getPredecessors(op);
+            OperatorKey predOperatorKey = null;
+            if (preds != null && preds.size() > 0) {
+                predOperatorKey = preds.get(0).getOperatorKey();
+            }
             FileSpec fSpec = op.getSplitStore();
             SparkOperator sparkOp = endSingleInputPlanWithStr(fSpec);
             sparkOp.setSplitter(true);
             splitsSeen.put(op.getOperatorKey(), sparkOp);
-            curSparkOp = startNew(fSpec, sparkOp);
+            curSparkOp = startNew(fSpec, sparkOp, predOperatorKey);
             phyToSparkOpMap.put(op, curSparkOp);
         } catch (Exception e) {
             int errCode = 2034;
@@ -1151,7 +1169,7 @@ public class SparkCompiler extends PhyPl
         }
         indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
 
-        POLoad idxJobLoader = getLoad();
+        POLoad idxJobLoader = getLoad(null);
         idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
                 new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
         indexerSparkOp.physicalPlan.add(idxJobLoader);
@@ -1225,7 +1243,7 @@ public class SparkCompiler extends PhyPl
             FileSpec lFile,
             FileSpec quantFile,
             int rp, Pair<POProject, Byte>[] fields) throws PlanException {
-        SparkOperator sparkOper = startNew(lFile, quantJob);
+        SparkOperator sparkOper = startNew(lFile, quantJob, null);
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
         byte keyType = DataType.UNKNOWN;
         if (fields == null) {


Reply via email to