[ https://issues.apache.org/jira/browse/PIG-4871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-4871:
----------------------------------
    Attachment: PIG-4871.patch

>  Not use OperatorPlan#forceConnect in MultiQueryOptimizationSpark
> -----------------------------------------------------------------
>
>                 Key: PIG-4871
>                 URL: https://issues.apache.org/jira/browse/PIG-4871
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4871.patch
>
>
> In the current code base, we use OperatorPlan#forceConnect() when merging the 
> physical plans of the splitter and the splittee in MultiQueryOptimizationSpark.
> The difference between OperatorPlan#connect and OperatorPlan#forceConnect is 
> that forceConnect does not check whether the from operator supports multiple 
> outputs or whether the to operator supports multiple inputs.
> {code}
>     /**
>      * Connect from and to, skipping some checks: it does not verify whether
>      * the from operator supports multiple outputs or whether the to operator
>      * supports multiple inputs.
>      *
>      * @param from Operator data will flow from.
>      * @param to   Operator data will flow to.
>      * @throws PlanException if from or to is not in the plan
>      */
>     public void forceConnect(E from, E to) throws PlanException {
>         markDirty();
>         // Check that both nodes are in the plan.
>         checkInPlan(from);
>         checkInPlan(to);
>         mFromEdges.put(from, to);
>         mToEdges.put(to, from);
>     }
> {code}
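> For comparison, here is a simplified sketch of what the regular OperatorPlan#connect 
> checks before adding the edge (based on the description above; the real method in 
> org.apache.pig.impl.plan.OperatorPlan also carries error codes and longer messages):
> {code}
>     public void connect(E from, E to) throws PlanException {
>         markDirty();
>         // Check that both nodes are in the plan.
>         checkInPlan(from);
>         checkInPlan(to);
>         // Unlike forceConnect, refuse the edge if it would give an operator
>         // more outputs or inputs than it declares it can handle.
>         if (!from.supportsMultipleOutputs() && mFromEdges.get(from) != null) {
>             throw new PlanException("Operator " + from.getClass().getName()
>                     + " does not support multiple outputs.");
>         }
>         if (!to.supportsMultipleInputs() && mToEdges.get(to) != null) {
>             throw new PlanException("Operator " + to.getClass().getName()
>                     + " does not support multiple inputs.");
>         }
>         mFromEdges.put(from, to);
>         mToEdges.put(to, from);
>     }
> {code}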
> Let's use an example to explain why forceConnect was added in the first place.
> {code}
> before multiquery optimization:
> scope-102->scope-108 
> scope-108
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-102
> Store(hdfs://zly1.sh.intel.com:8020/tmp/temp1293488480/tmp128312889:org.apache.pig.impl.io.InterStorage) - scope-103
> |
> |---A: New For Each(false,false,false)[bag] - scope-75
>     |   |
>     |   Cast[int] - scope-67
>     |   |
>     |   |---Project[bytearray][0] - scope-66
>     |   |
>     |   Cast[int] - scope-70
>     |   |
>     |   |---Project[bytearray][1] - scope-69
>     |   |
>     |   Cast[int] - scope-73
>     |   |
>     |   |---Project[bytearray][2] - scope-72
>     |
>     |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-65--------
> Spark node scope-108
> D: Store(hdfs://zly1.sh.intel.com:8020/user/root/split5.out:org.apache.pig.builtin.PigStorage) - scope-101
> |
> |---D: Filter[bag] - scope-97
>     |   |
>     |   Greater Than or Equal[boolean] - scope-100
>     |   |
>     |   |---Project[int][1] - scope-98
>     |   |
>     |   |---Project[int][4] - scope-99
>     |
>     |---C: New For Each(true,true)[tuple] - scope-96
>         |   |
>         |   Project[bag][1] - scope-94
>         |   |
>         |   Project[bag][2] - scope-95
>         |
>         |---C: Package(Packager)[tuple]{int} - scope-89
>             |
>             |---C: Global Rearrange[tuple] - scope-88
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-90
>                 |   |   |
>                 |   |   Project[int][0] - scope-91
>                 |   |
>                 |   |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp1293488480/tmp128312889:org.apache.pig.impl.io.InterStorage) - scope-104
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-92
>                     |   |
>                     |   Project[int][0] - scope-93
>                     |
>                     |---B: New For Each(false,false)[bag] - scope-85
>                         |   |
>                         |   Project[int][0] - scope-81
>                         |   |
>                         |   Project[int][1] - scope-83
>                         |
>                         |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp1293488480/tmp128312889:org.apache.pig.impl.io.InterStorage) - scope-106--------
> after multiquery optimization:
> scope-108
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-108
> D: Store(hdfs://zly1.sh.intel.com:8020/user/root/split5.out:org.apache.pig.builtin.PigStorage) - scope-101
> |
> |---D: Filter[bag] - scope-97
>     |   |
>     |   Greater Than or Equal[boolean] - scope-100
>     |   |
>     |   |---Project[int][1] - scope-98
>     |   |
>     |   |---Project[int][4] - scope-99
>     |
>     |---C: New For Each(true,true)[tuple] - scope-96
>         |   |
>         |   Project[bag][1] - scope-94
>         |   |
>         |   Project[bag][2] - scope-95
>         |
>         |---C: Package(Packager)[tuple]{int} - scope-89
>             |
>             |---C: Global Rearrange[tuple] - scope-88
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-90
>                 |   |   |
>                 |   |   Project[int][0] - scope-91
>                 |   |
>                 |   |---A: New For Each(false,false,false)[bag] - scope-119
>                 |       |   |
>                 |       |   Cast[int] - scope-114
>                 |       |   |
>                 |       |   |---Project[bytearray][0] - scope-113
>                 |       |   |
>                 |       |   Cast[int] - scope-116
>                 |       |   |
>                 |       |   |---Project[bytearray][1] - scope-115
>                 |       |   |
>                 |       |   Cast[int] - scope-118
>                 |       |   |
>                 |       |   |---Project[bytearray][2] - scope-117
>                 |       |
>                 |       |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-112
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-92
>                     |   |
>                     |   Project[int][0] - scope-93
>                     |
>                     |---B: New For Each(false,false)[bag] - scope-85
>                         |   |
>                         |   Project[int][0] - scope-81
>                         |   |
>                         |   Project[int][1] - scope-83
>                         |
>                         |---A: New For Each(false,false,false)[bag] - scope-128
>                             |   |
>                             |   Cast[int] - scope-123
>                             |   |
>                             |   |---Project[bytearray][0] - scope-122
>                             |   |
>                             |   Cast[int] - scope-125
>                             |   |
>                             |   |---Project[bytearray][1] - scope-124
>                             |   |
>                             |   Cast[int] - scope-127
>                             |   |
>                             |   |---Project[bytearray][2] - scope-126
>                             |
>                             |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-121--------
> {code}
> We just copy the subplan (Load: scope-121) of SparkOperator(scope-102) into 
> SparkOperator(scope-108) and connect POLoad(scope-121) with POForEach(scope-119) 
> and POForEach(scope-128). Here POLoad supports multiOutputs, so we can use 
> OperatorPlan#connect; but if a physical operator does not support multiOutputs, 
> we have to use OperatorPlan#forceConnect to make the connection. Instead, we can 
> copy the subplan, generate a new subplan for each successor, and merge it into 
> the final plan during multiquery optimization as shown below, which solves the 
> problem (a rough sketch of this approach follows the plan):
> {code}
> scope-108
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-108
> D: Store(hdfs://zly1.sh.intel.com:8020/user/root/split5.out:org.apache.pig.builtin.PigStorage) - scope-101
> |
> |---D: Filter[bag] - scope-97
>     |   |
>     |   Greater Than or Equal[boolean] - scope-100
>     |   |
>     |   |---Project[int][1] - scope-98
>     |   |
>     |   |---Project[int][4] - scope-99
>     |
>     |---C: New For Each(true,true)[tuple] - scope-96
>         |   |
>         |   Project[bag][1] - scope-94
>         |   |
>         |   Project[bag][2] - scope-95
>         |
>         |---C: Package(Packager)[tuple]{int} - scope-89
>             |
>             |---C: Global Rearrange[tuple] - scope-88
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-90
>                 |   |   |
>                 |   |   Project[int][0] - scope-91
>                 |   |
>                 |   |---A: New For Each(false,false,false)[bag] - scope-119
>                 |       |   |
>                 |       |   Cast[int] - scope-114
>                 |       |   |
>                 |       |   |---Project[bytearray][0] - scope-113
>                 |       |   |
>                 |       |   Cast[int] - scope-116
>                 |       |   |
>                 |       |   |---Project[bytearray][1] - scope-115
>                 |       |   |
>                 |       |   Cast[int] - scope-118
>                 |       |   |
>                 |       |   |---Project[bytearray][2] - scope-117
>                 |       |
>                 |       |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-112
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-92
>                     |   |
>                     |   Project[int][0] - scope-93
>                     |
>                     |---B: New For Each(false,false)[bag] - scope-85
>                         |   |
>                         |   Project[int][0] - scope-81
>                         |   |
>                         |   Project[int][1] - scope-83
>                         |
>                         |---A: New For Each(false,false,false)[bag] - scope-128
>                             |   |
>                             |   Cast[int] - scope-123
>                             |   |
>                             |   |---Project[bytearray][0] - scope-122
>                             |   |
>                             |   Cast[int] - scope-125
>                             |   |
>                             |   |---Project[bytearray][1] - scope-124
>                             |   |
>                             |   Cast[int] - scope-127
>                             |   |
>                             |   |---Project[bytearray][2] - scope-126
>                             |
>                             |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-132--------
> {code}
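> A rough, illustrative sketch of that copy-and-connect approach (not the actual 
> PIG-4871.patch; copySubPlan() is a hypothetical stand-in for whatever clone logic 
> the optimizer uses to assign fresh scope ids) could look like this:
> {code}
>     private void mergeCopiedSubPlan(PhysicalPlan splitteePlan,
>                                     PhysicalPlan splitterSubPlan,
>                                     List<PhysicalOperator> successors)
>             throws PlanException {
>         for (PhysicalOperator successor : successors) {
>             // Give each successor its own copy of the splitter subplan, e.g.
>             // Load scope-112 and Load scope-132 in the plan above.
>             // copySubPlan() is hypothetical: it clones the operators with
>             // fresh scope ids.
>             PhysicalPlan copy = copySubPlan(splitterSubPlan);
>             // In this example the copied subplan is just the POLoad, so the
>             // copied leaf is the load itself.
>             PhysicalOperator copiedLeaf = copy.getLeaves().get(0);
>             // merge() adds the copied operators as a disjoint subgraph ...
>             splitteePlan.merge(copy);
>             // ... and the plain connect() is now sufficient: no operator ends
>             // up with more outputs than it supports.
>             splitteePlan.connect(copiedLeaf, successor);
>         }
>     }
> {code}
> Because every copied POLoad now has exactly one successor, the connect() check 
> passes and forceConnect can be dropped from MultiQueryOptimizationSpark.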



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
