[
https://issues.apache.org/jira/browse/PIG-4594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14613163#comment-14613163
]
liyunzhang_intel commented on PIG-4594:
---------------------------------------
[~mohitsabharwal]:
Let's use an example to explain why we need to add PhysicalPlan#forceConnect and
OperatorPlan#forceConnect.
cat bin/testMultiQueryJiraPig983_2.pig
{code}
a = load './passwd' using PigStorage(':') as (uname:chararray,
passwd:chararray, uid:int, gid:int);
b = filter a by uid < 5;
c = filter a by uid >= 5;
d = join b by uname, c by uname;
{code}
{code}
#--------------------------------------------------
# Spark Plan
#--------------------------------------------------
Spark node scope-67
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1052928641/tmp1820070054:org.apache.pig.impl.io.InterStorage) - scope-68
|
|---a: New For Each(false,false,false,false)[bag] - scope-13
    |   |
    |   Cast[chararray] - scope-2
    |   |
    |   |---Project[bytearray][0] - scope-1
    |   |
    |   Cast[chararray] - scope-5
    |   |
    |   |---Project[bytearray][1] - scope-4
    |   |
    |   Cast[int] - scope-8
    |   |
    |   |---Project[bytearray][2] - scope-7
    |   |
    |   Cast[int] - scope-11
    |   |
    |   |---Project[bytearray][3] - scope-10
    |
    |---a: Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) - scope-0--------
Spark node scope-73
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1052928641/tmp-2075734880:org.apache.pig.impl.io.InterStorage) - scope-74
|
|---d: New For Each(true,true)[tuple] - scope-37
    |   |
    |   Project[bag][1] - scope-35
    |   |
    |   Project[bag][2] - scope-36
    |
    |---d: Package(Packager)[tuple]{chararray} - scope-30
        |
        |---d: Global Rearrange[tuple] - scope-29
            |
            |---d: Local Rearrange[tuple]{chararray}(false) - scope-31
            |   |   |
            |   |   Project[chararray][0] - scope-32
            |   |
            |   |---b: Filter[bag] - scope-17
            |       |   |
            |       |   Less Than[boolean] - scope-20
            |       |   |
            |       |   |---Project[int][2] - scope-18
            |       |   |
            |       |   |---Constant(5) - scope-19
            |       |
            |       |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1052928641/tmp1820070054:org.apache.pig.impl.io.InterStorage) - scope-69
            |
            |---d: Local Rearrange[tuple]{chararray}(false) - scope-33
                |   |
                |   Project[chararray][0] - scope-34
                |
                |---c: Filter[bag] - scope-23
                    |   |
                    |   Greater Than or Equal[boolean] - scope-26
                    |   |
                    |   |---Project[int][2] - scope-24
                    |   |
                    |   |---Constant(5) - scope-25
                    |
                    |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1052928641/tmp1820070054:org.apache.pig.impl.io.InterStorage) - scope-71--------
{code}
If multiquery optimization is enabled, SparkOperator(scope-67) and
SparkOperator(scope-73) need to be merged: remove scope-68, scope-69 and
scope-71, then connect scope-13 to scope-17 and scope-13 to scope-23.
Suppose we use
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan#connect
instead of
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan#forceConnect
in
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark#visitSparkOp.
After scope-13 and scope-17 are connected, connecting scope-13 and scope-23
throws an exception, because scope-13 is a POForEach and
POForEach#supportsMultipleOutputs() returns false. The exception info:
{code}
org.apache.pig.impl.plan.VisitorException: ERROR 0:
org.apache.pig.impl.plan.PlanException: ERROR 0: Attempt to give operator of type org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach multiple outputs. This operator does not support multiple outputs.
    at org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark.visitSparkOp(MultiQueryOptimizerSpark.java:131)
{code}
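A minimal sketch of the difference between the two calls, under stated assumptions: ToyPlan, Op and demo() are hypothetical and are not Pig classes. Its connect only approximates the supportsMultipleOutputs() check behind the PlanException above, and forceConnect is assumed to be the same edge insertion with that check skipped, which is how the patch uses it.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of an operator plan; NOT Pig's OperatorPlan/PhysicalPlan.
public class ToyPlan {

    /** A plan node; supportsMultipleOutputs mirrors the flag POForEach reports as false. */
    public static final class Op {
        final String name;
        final boolean supportsMultipleOutputs;

        public Op(String name, boolean supportsMultipleOutputs) {
            this.name = name;
            this.supportsMultipleOutputs = supportsMultipleOutputs;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Successor lists keyed by source operator, in insertion order.
    private final Map<Op, List<Op>> successors = new LinkedHashMap<>();

    /** Checked connect: refuses to give a second output to an operator that does not support it. */
    public void connect(Op from, Op to) {
        List<Op> outs = successors.getOrDefault(from, List.of());
        if (!outs.isEmpty() && !from.supportsMultipleOutputs) {
            throw new IllegalStateException("Attempt to give operator " + from
                    + " multiple outputs. This operator does not support multiple outputs.");
        }
        forceConnect(from, to);
    }

    /** Unchecked connect (assumed semantics): adds the edge without consulting the flag. */
    public void forceConnect(Op from, Op to) {
        successors.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
    }

    public List<Op> getSuccessors(Op op) {
        return successors.getOrDefault(op, List.of());
    }

    /** Replays the scope-13 / scope-17 / scope-23 scenario and records each outcome. */
    public static List<String> demo() {
        Op forEach = new Op("scope-13", false); // POForEach: single output only
        Op filterB = new Op("scope-17", true);
        Op filterC = new Op("scope-23", true);

        List<String> log = new ArrayList<>();
        ToyPlan plan = new ToyPlan();
        plan.connect(forEach, filterB);
        log.add("connect scope-13 -> scope-17: ok");
        try {
            plan.connect(forEach, filterC);
            log.add("connect scope-13 -> scope-23: ok");
        } catch (IllegalStateException e) {
            log.add("connect scope-13 -> scope-23: rejected");
        }
        plan.forceConnect(forEach, filterC);
        log.add("forceConnect scope-13 -> scope-23: ok, successors=" + plan.getSuccessors(forEach));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
{code}

Running main prints three lines: the first connect succeeds, the second is rejected just like the visitSparkOp failure, and forceConnect then leaves scope-13 with the successors [scope-17, scope-23].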
The relevant part of org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark#visitSparkOp:
{code}
@Override
public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
    …..
    if (splittees.size() == 1) {
        // We don't need a POSplit here; we can merge the splittee into the spliter.
        SparkOperator singleSplitee = splittees.get(0);
        POStore poStore = null;
        PhysicalOperator firstNodeLeaf = sparkOp.physicalPlan.getLeaves().get(0);
        if (firstNodeLeaf instanceof POStore) {
            poStore = (POStore) firstNodeLeaf;
        }
        PhysicalOperator firstNodeLeafPred =
                sparkOp.physicalPlan.getPredecessors(firstNodeLeaf).get(0);
        sparkOp.physicalPlan.remove(poStore); // remove unnecessary store
        List<PhysicalOperator> firstNodeRoots = singleSplitee.physicalPlan.getRoots();
        sparkOp.physicalPlan.merge(singleSplitee.physicalPlan);
        for (int j = 0; j < firstNodeRoots.size(); j++) {
            PhysicalOperator firstNodeRoot = firstNodeRoots.get(j);
            POLoad poLoad = null;
            if (firstNodeRoot instanceof POLoad && poStore != null) {
                poLoad = (POLoad) firstNodeRoot;
                if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
                    PhysicalOperator firstNodeRootSucc =
                            sparkOp.physicalPlan.getSuccessors(firstNodeRoot).get(0);
                    sparkOp.physicalPlan.remove(poLoad); // remove unnecessary load
                    // Using sparkOp.physicalPlan.connect(firstNodeLeafPred, firstNodeRootSucc)
                    // here throws "POForEach multiple outputs. This operator does not support
                    // multiple outputs." on the second matching load, so forceConnect is used.
                    sparkOp.physicalPlan.forceConnect(firstNodeLeafPred, firstNodeRootSucc);
                }
            }
        }
        addSubPlanPropertiesToParent(sparkOp, singleSplitee);
        removeSplittee(getPlan(), sparkOp, singleSplitee);
    } else {
        …..
    }
}
{code}
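To trace the splittees.size() == 1 branch on the Spark plan above, here is a hypothetical replay on a toy DAG. ToyMerge, Op, Plan and mergeSingleSplittee are invented for illustration and are not Pig's PhysicalPlan API. The splittee is truncated at the two Local Rearranges, and Plan.connect adds edges without any multiple-output check, so it behaves like forceConnect.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of the single-splittee branch of visitSparkOp on a toy DAG.
// Op and Plan are invented stand-ins, NOT Pig's PhysicalOperator/PhysicalPlan.
public class ToyMerge {

    /** kind is "load", "store" or "other"; file stands in for getLFile()/getSFile(). */
    static final class Op {
        final String name, kind, file;
        Op(String name, String kind, String file) { this.name = name; this.kind = kind; this.file = file; }
        @Override public String toString() { return name; }
    }

    /** Operators keyed by identity, each with an insertion-ordered successor list. */
    static final class Plan {
        final Map<Op, List<Op>> succ = new LinkedHashMap<>();

        void add(Op op) { succ.putIfAbsent(op, new ArrayList<>()); }

        // Unchecked edge insertion: plays the role of forceConnect.
        void connect(Op from, Op to) { add(from); add(to); succ.get(from).add(to); }

        List<Op> predecessors(Op op) {
            List<Op> preds = new ArrayList<>();
            succ.forEach((from, tos) -> { if (tos.contains(op)) preds.add(from); });
            return preds;
        }

        List<Op> roots() {
            List<Op> roots = new ArrayList<>();
            for (Op op : succ.keySet()) if (predecessors(op).isEmpty()) roots.add(op);
            return roots;
        }

        List<Op> leaves() {
            List<Op> leaves = new ArrayList<>();
            succ.forEach((op, tos) -> { if (tos.isEmpty()) leaves.add(op); });
            return leaves;
        }

        void remove(Op op) { succ.remove(op); succ.values().forEach(tos -> tos.remove(op)); }

        void merge(Plan other) { other.succ.forEach((from, tos) -> { add(from); succ.get(from).addAll(tos); }); }
    }

    /** Mirrors the steps of the splittees.size() == 1 branch. */
    static void mergeSingleSplittee(Plan spliter, Plan splittee) {
        Op leaf = spliter.leaves().get(0);
        Op store = "store".equals(leaf.kind) ? leaf : null;
        Op leafPred = spliter.predecessors(leaf).get(0); // scope-13 in the example
        List<Op> splitteeRoots = splittee.roots();       // captured before merging
        if (store != null) spliter.remove(store);        // remove unnecessary store
        spliter.merge(splittee);
        for (Op root : splitteeRoots) {
            if (store != null && "load".equals(root.kind) && root.file.equals(store.file)) {
                Op rootSucc = spliter.succ.get(root).get(0);
                spliter.remove(root);                    // remove unnecessary load
                spliter.connect(leafPred, rootSucc);     // second call gives scope-13 two outputs
            }
        }
    }

    /** Builds Spark nodes scope-67 and scope-73 (truncated at the Local Rearranges) and merges them. */
    public static String demo() {
        String tmp = "tmp1820070054";
        Op load0 = new Op("scope-0", "load", "passwd");
        Op forEach13 = new Op("scope-13", "other", null);
        Plan spliter = new Plan();
        spliter.connect(load0, forEach13);
        spliter.connect(forEach13, new Op("scope-68", "store", tmp));

        Op filter17 = new Op("scope-17", "other", null);
        Op filter23 = new Op("scope-23", "other", null);
        Plan splittee = new Plan();
        splittee.connect(new Op("scope-69", "load", tmp), filter17);
        splittee.connect(filter17, new Op("scope-31", "other", null));
        splittee.connect(new Op("scope-71", "load", tmp), filter23);
        splittee.connect(filter23, new Op("scope-33", "other", null));

        mergeSingleSplittee(spliter, splittee);
        return "roots=" + spliter.roots() + " scope-13 successors=" + spliter.succ.get(forEach13);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
{code}

demo() returns "roots=[scope-0] scope-13 successors=[scope-17, scope-23]". The store and both loads are gone, and scope-13 now feeds both filters. That second output is exactly the edge a checked connect would have refused.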
> Enable "TestMultiQuery" in spark mode
> -------------------------------------
>
> Key: PIG-4594
> URL: https://issues.apache.org/jira/browse/PIG-4594
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4594.patch, PIG-4594_1.patch
>
>
> In https://builds.apache.org/job/Pig-spark/211/#showFailuresLink, it shows
> that the following unit tests fail:
> org.apache.pig.test.TestMultiQuery.testMultiQueryJiraPig1068
> org.apache.pig.test.TestMultiQuery.testMultiQueryJiraPig1157
> org.apache.pig.test.TestMultiQuery.testMultiQueryJiraPig1252
> org.apache.pig.test.TestMultiQuery.testMultiQueryJiraPig1438
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)