Author: xuefu
Date: Fri Dec 18 17:03:16 2015
New Revision: 1720826
URL: http://svn.apache.org/viewvc?rev=1720826&view=rev
Log:
PIG-4675: Operators with multiple predecessors fail under multiquery
optimization (Liyun via Xuefu)
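
For context: when the multiquery optimizer merges a splittee into its splitter, a physical
operator in the merged plan can have predecessors that live in a previous SparkOperator. The
old code carried those predecessors around as a bare List<RDD<Tuple>>, which could hand an
operator with several inputs the wrong RDDs. The patch instead tracks predecessor
OperatorKeys, records the cross-operator links in a MultiMap on SparkOperator, and resolves
the RDDs in sorted key order at conversion time. A schematic sketch of that bookkeeping with
plain collections (integers stand in for OperatorKeys; none of these names are Pig APIs):

    import java.util.*;

    public class MultiQueryFixSketch {
        public static void main(String[] args) {
            // 1. While merging splittees, record to -> from links for operators
            //    whose input is the leaf of a previously translated Spark op.
            Map<Integer, List<Integer>> connectionMap = new HashMap<Integer, List<Integer>>();
            connectionMap.put(8, Collections.singletonList(5));

            // 2. When translating operator 8, union its in-plan predecessors
            //    with the recorded cross-operator ones into one key set.
            Set<Integer> allPredKeys = new HashSet<Integer>();
            allPredKeys.add(2); // predecessor found in the current physical plan
            List<Integer> recorded = connectionMap.get(8);
            if (recorded != null) {
                allPredKeys.addAll(recorded);
            }

            // 3. Sort the keys before looking up RDDs so a converter with
            //    several inputs always sees them in the same order.
            List<Integer> orderedKeys = new ArrayList<Integer>(allPredKeys);
            Collections.sort(orderedKeys);
            System.out.println(orderedKeys); // prints [2, 5]
        }
    }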
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1720826&r1=1720825&r2=1720826&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri Dec 18 17:03:16 2015
@@ -569,7 +569,7 @@ public class SparkLauncher extends Launc
addUDFJarsToSparkJobWorkingDirectory(sparkOperator);
List<SparkOperator> predecessors = sparkPlan
.getPredecessors(sparkOperator);
- List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+ Set<OperatorKey> predecessorOfPreviousSparkOp = new HashSet<OperatorKey>();
if (predecessors != null) {
for (SparkOperator pred : predecessors) {
if (sparkOpRdds.get(pred.getOperatorKey()) == null) {
@@ -577,7 +577,7 @@ public class SparkLauncher extends Launc
physicalOpRdds, convertMap, seenJobIDs, sparkStats, conf);
}
- predecessorRDDs.add(sparkOpRdds.get(pred.getOperatorKey()));
+ predecessorOfPreviousSparkOp.add(pred.getOperatorKey());
}
}
@@ -594,8 +594,8 @@ public class SparkLauncher extends Launc
}
for (PhysicalOperator leafPO : leafPOs) {
try {
- physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
- predecessorRDDs, convertMap);
+ physicalToRDD(sparkOperator, sparkOperator.physicalPlan, leafPO, physicalOpRdds,
+ predecessorOfPreviousSparkOp, convertMap);
sparkOpRdds.put(sparkOperator.getOperatorKey(),
physicalOpRdds.get(leafPO.getOperatorKey()));
} catch (Exception e) {
@@ -626,34 +626,38 @@ public class SparkLauncher extends Launc
}
}
- private void physicalToRDD(PhysicalPlan plan,
+ private void physicalToRDD(SparkOperator sparkOperator, PhysicalPlan plan,
PhysicalOperator physicalOperator,
Map<OperatorKey, RDD<Tuple>> rdds,
- List<RDD<Tuple>> rddsFromPredeSparkOper,
+ Set<OperatorKey> predsFromPreviousSparkOper,
Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap)
throws IOException {
RDD<Tuple> nextRDD = null;
- List<PhysicalOperator> predecessors = plan
+ List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = plan
.getPredecessors(physicalOperator);
- if (predecessors != null && predecessors.size() > 1) {
- Collections.sort(predecessors);
+ if (predecessorsOfCurrentPhysicalOp != null && predecessorsOfCurrentPhysicalOp.size() > 1) {
+ Collections.sort(predecessorsOfCurrentPhysicalOp);
}
- List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
- if (predecessors != null) {
- for (PhysicalOperator predecessor : predecessors) {
- physicalToRDD(plan, predecessor, rdds, rddsFromPredeSparkOper,
+ Set<OperatorKey> operatorKeysOfAllPreds = new HashSet<OperatorKey>();
+ addPredsFromPreviousSparkOp(sparkOperator, physicalOperator, operatorKeysOfAllPreds);
+ if (predecessorsOfCurrentPhysicalOp != null) {
+ for (PhysicalOperator predecessor : predecessorsOfCurrentPhysicalOp) {
+ physicalToRDD(sparkOperator, plan, predecessor, rdds, predsFromPreviousSparkOper,
convertMap);
- predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
+ operatorKeysOfAllPreds.add(predecessor.getOperatorKey());
}
} else {
- if (rddsFromPredeSparkOper != null
- && rddsFromPredeSparkOper.size() > 0) {
- predecessorRdds.addAll(rddsFromPredeSparkOper);
+ if (predsFromPreviousSparkOper != null
+ && predsFromPreviousSparkOper.size() > 0) {
+ for (OperatorKey predFromPreviousSparkOper : predsFromPreviousSparkOper) {
+ operatorKeysOfAllPreds.add(predFromPreviousSparkOper);
+ }
}
}
+
if (physicalOperator instanceof POSplit) {
List<PhysicalPlan> successorPlans = ((POSplit) physicalOperator).getPlans();
for (PhysicalPlan successPlan : successorPlans) {
@@ -663,7 +667,7 @@ public class SparkLauncher extends Launc
break;
}
PhysicalOperator leafOfSuccessPlan = leavesOfSuccessPlan.get(0);
- physicalToRDD(successPlan, leafOfSuccessPlan, rdds, predecessorRdds, convertMap);
+ physicalToRDD(sparkOperator, successPlan, leafOfSuccessPlan, rdds, operatorKeysOfAllPreds, convertMap);
}
} else {
RDDConverter converter = convertMap.get(physicalOperator.getClass());
@@ -675,7 +679,8 @@ public class SparkLauncher extends Launc
LOG.info("Converting operator "
+ physicalOperator.getClass().getSimpleName() + " "
+ physicalOperator);
- nextRDD = converter.convert(predecessorRdds, physicalOperator);
+ List<RDD<Tuple>> allPredRDDs = sortPredecessorRDDs(operatorKeysOfAllPreds, rdds);
+ nextRDD = converter.convert(allPredRDDs, physicalOperator);
if (nextRDD == null) {
throw new IllegalArgumentException(
@@ -687,6 +692,30 @@ public class SparkLauncher extends Launc
}
}
+ // Get all RDDs of the predecessors, sorted by OperatorKey
+ private List<RDD<Tuple>> sortPredecessorRDDs(Set<OperatorKey> operatorKeysOfAllPreds, Map<OperatorKey, RDD<Tuple>> rdds) {
+ List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+ List<OperatorKey> operatorKeyOfAllPreds = Lists.newArrayList(operatorKeysOfAllPreds);
+ Collections.sort(operatorKeyOfAllPreds);
+ for (OperatorKey operatorKeyOfAllPred : operatorKeyOfAllPreds) {
+ predecessorRDDs.add(rdds.get(operatorKeyOfAllPred));
+ }
+ return predecessorRDDs;
+ }
+
+ // Handles the special case of operators with multiple predecessors when multiquery
+ // optimization is enabled: collects the predecessors of the given physicalOp that live in a previous SparkOp (see PIG-4675)
+ private void addPredsFromPreviousSparkOp(SparkOperator sparkOperator, PhysicalOperator physicalOperator, Set<OperatorKey> operatorKeysOfPredecessors) {
+ // the relationship is stored in sparkOperator.getMultiQueryOptimizeConnectionItem()
+ List<OperatorKey> predOperatorKeys = sparkOperator.getMultiQueryOptimizeConnectionItem().get(physicalOperator.getOperatorKey());
+ if (predOperatorKeys != null) {
+ for (OperatorKey predOperator : predOperatorKeys) {
+ LOG.debug(String.format("add predecessor(OperatorKey:%s) for OperatorKey:%s", predOperator, physicalOperator.getOperatorKey()));
+ operatorKeysOfPredecessors.add(predOperator);
+ }
+ }
+ }
+
@Override
public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
String format, boolean verbose) throws IOException {
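
A note on the new sortPredecessorRDDs helper above: predecessor keys are now accumulated in
a HashSet, whose iteration order is unspecified, so without the sort a converter with several
inputs (a join, say) could receive them in a different order from run to run. A minimal
stand-alone sketch of the same lookup (Strings stand in for RDD<Tuple>, integers for
OperatorKeys; illustrative names, not Pig APIs):

    import java.util.*;

    public class SortedLookupSketch {
        // Mirrors sortPredecessorRDDs: look the values up in sorted key order.
        static List<String> sortPredecessorRdds(Set<Integer> predKeys, Map<Integer, String> rdds) {
            List<Integer> sortedKeys = new ArrayList<Integer>(predKeys);
            Collections.sort(sortedKeys); // OperatorKey implements Comparable in Pig
            List<String> result = new ArrayList<String>();
            for (Integer key : sortedKeys) {
                result.add(rdds.get(key));
            }
            return result;
        }

        public static void main(String[] args) {
            Map<Integer, String> rdds = new HashMap<Integer, String>();
            rdds.put(7, "rdd-of-scope-7");
            rdds.put(3, "rdd-of-scope-3");
            // HashSet iteration order is unspecified; sorting restores determinism.
            System.out.println(sortPredecessorRdds(new HashSet<Integer>(rdds.keySet()), rdds));
            // prints [rdd-of-scope-3, rdd-of-scope-7]
        }
    }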
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java?rev=1720826&r1=1720825&r2=1720826&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java Fri Dec 18 17:03:16 2015
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -38,6 +40,9 @@ import org.apache.pig.impl.plan.VisitorE
* MultiQueryOptimizer for spark
*/
public class MultiQueryOptimizerSpark extends SparkOpPlanVisitor {
+
+ private static final Log LOG = LogFactory.getLog(MultiQueryOptimizerSpark.class);
+
private String scope;
private NodeIdGenerator nig;
@@ -104,18 +109,27 @@ public class MultiQueryOptimizerSpark ex
POStore poStore = null;
if (firstNodeLeaf != null && firstNodeLeaf instanceof POStore) {
poStore = (POStore) firstNodeLeaf;
+ PhysicalOperator predOfPoStore = sparkOp.physicalPlan.getPredecessors(poStore).get(0);
sparkOp.physicalPlan.remove(poStore); // remove unnecessary store
- POSplit split = getSplit();
+ POSplit poSplit = createSplit();
ArrayList<SparkOperator> spliteesCopy = new ArrayList<SparkOperator>(splittees);
for (SparkOperator splitee : spliteesCopy) {
- List<PhysicalOperator> firstNodeRoots = splitee.physicalPlan.getRoots();
- for (int i = 0; i < firstNodeRoots.size(); i++) {
- if (firstNodeRoots.get(i) instanceof POLoad) {
- POLoad poLoad = (POLoad) firstNodeRoots.get(i);
+ List<PhysicalOperator> rootsOfSplitee = splitee.physicalPlan.getRoots();
+ for (int i = 0; i < rootsOfSplitee.size(); i++) {
+ if (rootsOfSplitee.get(i) instanceof POLoad) {
+ POLoad poLoad = (POLoad) rootsOfSplitee.get(i);
if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
+ List<PhysicalOperator> successorsOfPoLoad = splitee.physicalPlan.getSuccessors(poLoad);
+ List<PhysicalOperator> successorsOfPoLoadCopy = new ArrayList<PhysicalOperator>(successorsOfPoLoad);
splitee.physicalPlan.remove(poLoad); // remove unnecessary load
- split.addPlan(splitee.physicalPlan);
+ for (PhysicalOperator successorOfPoLoad : successorsOfPoLoadCopy) {
+ // we store the to -> from relationship in SparkOperator#multiQueryOptimizeConnectionMap
+ sparkOp.addMultiQueryOptimizeConnectionItem(successorOfPoLoad.getOperatorKey(), predOfPoStore.getOperatorKey());
+ LOG.debug(String.format("add multiQueryOptimize connection item: to:%s, from:%s for %s",
+ successorOfPoLoad.toString(), predOfPoStore.getOperatorKey().toString(), splitee.getOperatorKey()));
+ }
+ poSplit.addPlan(splitee.physicalPlan);
addSubPlanPropertiesToParent(sparkOp, splitee);
removeSplittee(getPlan(), sparkOp, splitee);
}
@@ -123,7 +137,7 @@ public class MultiQueryOptimizerSpark ex
}
}
- sparkOp.physicalPlan.addAsLeaf(split);
+ sparkOp.physicalPlan.addAsLeaf(poSplit);
}
}
} catch (PlanException e) {
@@ -145,7 +159,7 @@ public class MultiQueryOptimizerSpark ex
getPlan().remove(splittee);
}
- private POSplit getSplit() {
+ private POSplit createSplit() {
return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
}
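
The connection items are recorded at the moment the optimizer deletes the temporary
store/load pair: each successor of the removed POLoad (the "to" side) is linked to the
predecessor of the removed POStore (the "from" side). A small sketch of that loop using
Pig's MultiMap and OperatorKey directly (assuming this branch's pig.jar on the classpath;
the scope/id values are made up):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.pig.impl.plan.OperatorKey;
    import org.apache.pig.impl.util.MultiMap;

    public class RecordConnectionsSketch {
        public static void main(String[] args) {
            // Made-up keys: 5 plays the predecessor of the removed POStore,
            // 8 and 9 play the successors of the removed POLoad.
            OperatorKey predOfPoStore = new OperatorKey("scope", 5L);
            List<OperatorKey> successorsOfPoLoad = Arrays.asList(
                    new OperatorKey("scope", 8L), new OperatorKey("scope", 9L));

            // to -> from, mirroring the recording loop in MultiQueryOptimizerSpark above
            MultiMap<OperatorKey, OperatorKey> connectionMap =
                    new MultiMap<OperatorKey, OperatorKey>();
            for (OperatorKey successorOfPoLoad : successorsOfPoLoad) {
                connectionMap.put(successorOfPoLoad, predOfPoStore);
            }
            // prints the "from" keys recorded for operator scope-8
            System.out.println(connectionMap.get(new OperatorKey("scope", 8L)));
        }
    }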
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1720826&r1=1720825&r2=1720826&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Fri Dec 18 17:03:16 2015
@@ -28,6 +28,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
/**
* An operator model for a Spark job. Acts as a host to the plans that will
@@ -81,12 +82,14 @@ public class SparkOperator extends Opera
private List<String> crossKeys = null;
- public SparkOperator(OperatorKey k) {
- super(k);
- physicalPlan = new PhysicalPlan();
- UDFs = new HashSet<String>();
- scalars = new HashSet<PhysicalOperator>();
- }
+ private MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionMap = new MultiMap<OperatorKey, OperatorKey>();
+
+ public SparkOperator(OperatorKey k) {
+ super(k);
+ physicalPlan = new PhysicalPlan();
+ UDFs = new HashSet<String>();
+ scalars = new HashSet<PhysicalOperator>();
+ }
@Override
public boolean supportsMultipleInputs() {
@@ -264,4 +267,14 @@ public class SparkOperator extends Opera
public void setRequestedParallelismByReference(SparkOperator oper) {
this.requestedParallelism = oper.requestedParallelism;
}
+
+ // If the multiquery optimizer is enabled, in some cases the predecessor (from) of a physicalOp (to)
+ // will be the leaf physicalOperator of a previous sparkOperator. See PIG-4675 for more detail.
+ public void addMultiQueryOptimizeConnectionItem(OperatorKey to, OperatorKey from) {
+ multiQueryOptimizeConnectionMap.put(to, from);
+ }
+
+ public MultiMap<OperatorKey, OperatorKey> getMultiQueryOptimizeConnectionItem() {
+ return multiQueryOptimizeConnectionMap;
+ }
}
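
Finally, a round-trip through the new SparkOperator accessors, as a minimal usage sketch
(again assuming this branch's pig.jar on the classpath; the keys are illustrative):

    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
    import org.apache.pig.impl.plan.OperatorKey;

    public class ConnectionItemDemo {
        public static void main(String[] args) {
            SparkOperator sparkOp = new SparkOperator(new OperatorKey("scope", 1L));
            OperatorKey to = new OperatorKey("scope", 12L);
            OperatorKey from = new OperatorKey("scope", 7L);
            sparkOp.addMultiQueryOptimizeConnectionItem(to, from);
            // prints the recorded "from" keys for "to"
            System.out.println(sparkOp.getMultiQueryOptimizeConnectionItem().get(to));
        }
    }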