Author: zly Date: Wed Apr 19 08:08:00 2017 New Revision: 1791870 URL: http://svn.apache.org/viewvc?rev=1791870&view=rev Log: PIG-5212: SkewedJoin_6 is failing on Spark (Liyun)
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1791870&r1=1791869&r2=1791870&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Wed Apr 19 08:08:00 2017 @@ -264,9 +264,10 @@ public class SparkCompiler extends PhyPl for (PhysicalOperator pred : predecessors) { if (pred instanceof POSplit && splitsSeen.containsKey(pred.getOperatorKey())) { + POSplit split = (POSplit) pred; compiledInputs[++i] = startNew( - ((POSplit) pred).getSplitStore(), - splitsSeen.get(pred.getOperatorKey())); + split.getSplitStore(), + splitsSeen.get(pred.getOperatorKey()), null); continue; } compile(pred); @@ -321,9 +322,16 @@ public class SparkCompiler extends PhyPl } } - private SparkOperator startNew(FileSpec fSpec, SparkOperator old) - throws PlanException { - POLoad ld = getLoad(); + /** + * @param fSpec + * @param old + * @param operatorKey: If operatorKey is not null, we assign the operatorKey to POLoad in the new SparkOperator + * ,otherwise the operatorKey of POLoad will be created by the program. 
Detail see PIG-5212 + * @return + * @throws PlanException + */ + private SparkOperator startNew(FileSpec fSpec, SparkOperator old, OperatorKey operatorKey) throws PlanException { + POLoad ld = getLoad(operatorKey); ld.setLFile(fSpec); SparkOperator ret = getSparkOp(); ret.add(ld); @@ -332,21 +340,31 @@ public class SparkCompiler extends PhyPl return ret; } - private POLoad getLoad() { - POLoad ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope))); + private POLoad getLoad(OperatorKey operatorKey) { + POLoad ld = null; + if (operatorKey != null) { + ld = new POLoad(operatorKey); + } else { + ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope))); + } ld.setPc(pigContext); ld.setIsTmpLoad(true); return ld; } - + @Override public void visitSplit(POSplit op) throws VisitorException { try { + List<PhysicalOperator> preds = this.physicalPlan.getPredecessors(op); + OperatorKey predOperatorKey = null; + if (preds != null && preds.size() > 0) { + predOperatorKey = preds.get(0).getOperatorKey(); + } FileSpec fSpec = op.getSplitStore(); SparkOperator sparkOp = endSingleInputPlanWithStr(fSpec); sparkOp.setSplitter(true); splitsSeen.put(op.getOperatorKey(), sparkOp); - curSparkOp = startNew(fSpec, sparkOp); + curSparkOp = startNew(fSpec, sparkOp, predOperatorKey); phyToSparkOpMap.put(op, curSparkOp); } catch (Exception e) { int errCode = 2034; @@ -1151,7 +1169,7 @@ public class SparkCompiler extends PhyPl } indexerArgs[2] = ObjectSerializer.serialize(phyPlan); - POLoad idxJobLoader = getLoad(); + POLoad idxJobLoader = getLoad(null); idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(), new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs))); indexerSparkOp.physicalPlan.add(idxJobLoader); @@ -1225,7 +1243,7 @@ public class SparkCompiler extends PhyPl FileSpec lFile, FileSpec quantFile, int rp, Pair<POProject, Byte>[] fields) throws PlanException { - SparkOperator sparkOper = startNew(lFile, quantJob); + SparkOperator sparkOper = 
startNew(lFile, quantJob, null); List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>(); byte keyType = DataType.UNKNOWN; if (fields == null) {