Author: zly
Date: Wed Apr 19 08:08:00 2017
New Revision: 1791870
URL: http://svn.apache.org/viewvc?rev=1791870&view=rev
Log:
PIG-5212: SkewedJoin_6 is failing on Spark (Liyun)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1791870&r1=1791869&r2=1791870&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Wed Apr 19 08:08:00 2017
@@ -264,9 +264,10 @@ public class SparkCompiler extends PhyPl
for (PhysicalOperator pred : predecessors) {
if (pred instanceof POSplit
&& splitsSeen.containsKey(pred.getOperatorKey())) {
+ POSplit split = (POSplit) pred;
compiledInputs[++i] = startNew(
- ((POSplit) pred).getSplitStore(),
- splitsSeen.get(pred.getOperatorKey()));
+ split.getSplitStore(),
+ splitsSeen.get(pred.getOperatorKey()), null);
continue;
}
compile(pred);
@@ -321,9 +322,16 @@ public class SparkCompiler extends PhyPl
}
}
- private SparkOperator startNew(FileSpec fSpec, SparkOperator old)
- throws PlanException {
- POLoad ld = getLoad();
+ /**
+ * @param fSpec
+ * @param old
+ * @param operatorKey: If operatorKey is not null, we assign the
operatorKey to POLoad in the new SparkOperator
+ * ,otherwise the operatorKey of POLoad will be
created by the program. Detail see PIG-5212
+ * @return
+ * @throws PlanException
+ */
+ private SparkOperator startNew(FileSpec fSpec, SparkOperator old,
OperatorKey operatorKey) throws PlanException {
+ POLoad ld = getLoad(operatorKey);
ld.setLFile(fSpec);
SparkOperator ret = getSparkOp();
ret.add(ld);
@@ -332,21 +340,31 @@ public class SparkCompiler extends PhyPl
return ret;
}
- private POLoad getLoad() {
- POLoad ld = new POLoad(new OperatorKey(scope,
nig.getNextNodeId(scope)));
+ private POLoad getLoad(OperatorKey operatorKey) {
+ POLoad ld = null;
+ if (operatorKey != null) {
+ ld = new POLoad(operatorKey);
+ } else {
+ ld = new POLoad(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ }
ld.setPc(pigContext);
ld.setIsTmpLoad(true);
return ld;
}
-
+
@Override
public void visitSplit(POSplit op) throws VisitorException {
try {
+ List<PhysicalOperator> preds =
this.physicalPlan.getPredecessors(op);
+ OperatorKey predOperatorKey = null;
+ if (preds != null && preds.size() > 0) {
+ predOperatorKey = preds.get(0).getOperatorKey();
+ }
FileSpec fSpec = op.getSplitStore();
SparkOperator sparkOp = endSingleInputPlanWithStr(fSpec);
sparkOp.setSplitter(true);
splitsSeen.put(op.getOperatorKey(), sparkOp);
- curSparkOp = startNew(fSpec, sparkOp);
+ curSparkOp = startNew(fSpec, sparkOp, predOperatorKey);
phyToSparkOpMap.put(op, curSparkOp);
} catch (Exception e) {
int errCode = 2034;
@@ -1151,7 +1169,7 @@ public class SparkCompiler extends PhyPl
}
indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
- POLoad idxJobLoader = getLoad();
+ POLoad idxJobLoader = getLoad(null);
idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
indexerSparkOp.physicalPlan.add(idxJobLoader);
@@ -1225,7 +1243,7 @@ public class SparkCompiler extends PhyPl
FileSpec lFile,
FileSpec quantFile,
int rp, Pair<POProject, Byte>[] fields) throws PlanException {
- SparkOperator sparkOper = startNew(lFile, quantJob);
+ SparkOperator sparkOper = startNew(lFile, quantJob, null);
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
byte keyType = DataType.UNKNOWN;
if (fields == null) {