[ https://issues.apache.org/jira/browse/PIG-4871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-4871:
----------------------------------
    Attachment: PIG-4871_2.patch

[~kexianda],[~mohitsabharwal],[~pallavi.rao], [~xuefuz]:

Changes in PIG-4871_2.patch:
1. Remove OperatorPlan#forceConnect.
2. Clone the physical plan of the splitter and merge the cloned plan into the 
splittee's plan, as MR mode does.

All unit tests pass with this patch applied. The patch for PIG-4797 depends on 
this one.
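
To illustrate change 2, here is a minimal sketch of the clone-then-merge idea. It is not the code in the patch: CloneMergeSketch, cloneWithSuffix, merge and buildExample are hypothetical names, operators are plain strings, and every operator is assumed to allow only one output. Each splittee branch gets its own copy of the splitter plan, so the strict connect never sees a second outgoing edge.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy plan keyed by operator id; not Pig's PhysicalPlan. */
final class CloneMergeSketch {
    // operator id -> ids of its successors
    private final Map<String, List<String>> successors = new LinkedHashMap<>();

    void add(String op) {
        successors.putIfAbsent(op, new ArrayList<>());
    }

    /** Strict connect: every operator here is assumed to allow one output only. */
    void connect(String from, String to) {
        if (!successors.containsKey(from) || !successors.containsKey(to)) {
            throw new IllegalArgumentException("operator not in plan");
        }
        if (!successors.get(from).isEmpty()) {
            throw new IllegalStateException(from + " already has a successor");
        }
        successors.get(from).add(to);
    }

    List<String> successorsOf(String op) {
        return successors.getOrDefault(op, Collections.emptyList());
    }

    /** Copies the plan, giving every operator a fresh id by appending a suffix. */
    CloneMergeSketch cloneWithSuffix(String suffix) {
        CloneMergeSketch copy = new CloneMergeSketch();
        for (String op : successors.keySet()) {
            copy.add(op + suffix);
        }
        for (Map.Entry<String, List<String>> e : successors.entrySet()) {
            for (String to : e.getValue()) {
                copy.connect(e.getKey() + suffix, to + suffix);
            }
        }
        return copy;
    }

    /** Adds all operators and edges of other to this plan; ids must not collide. */
    void merge(CloneMergeSketch other) {
        for (String op : other.successors.keySet()) {
            if (successors.containsKey(op)) {
                throw new IllegalArgumentException("duplicate operator " + op);
            }
            add(op);
        }
        for (Map.Entry<String, List<String>> e : other.successors.entrySet()) {
            for (String to : e.getValue()) {
                connect(e.getKey(), to);
            }
        }
    }

    /** Builds the merged plan: one splitter clone per splittee branch. */
    static CloneMergeSketch buildExample() {
        CloneMergeSketch splitter = new CloneMergeSketch();
        splitter.add("Load");
        splitter.add("ForEach-A");
        splitter.connect("Load", "ForEach-A");

        CloneMergeSketch splittee = new CloneMergeSketch();
        splittee.add("LocalRearrange-25");
        splittee.add("ForEach-B");

        int n = 0;
        for (String branchRoot : new String[] {"LocalRearrange-25", "ForEach-B"}) {
            String suffix = "#" + (++n);
            splittee.merge(splitter.cloneWithSuffix(suffix));
            // The clone's leaf has no successor yet, so the strict connect succeeds.
            splittee.connect("ForEach-A" + suffix, branchRoot);
        }
        return splittee;
    }

    public static void main(String[] args) {
        CloneMergeSketch merged = buildExample();
        System.out.println(merged.successorsOf("ForEach-A#1"));
        System.out.println(merged.successorsOf("ForEach-A#2"));
    }
}
```

The cost of this design is that the splitter's operators are duplicated once per branch; the benefit is that no operator needs a second outgoing edge, so the unchecked connect is not required.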

>  Not use OperatorPlan#forceConnect in MultiQueryOptimizationSpark
> -----------------------------------------------------------------
>
>                 Key: PIG-4871
>                 URL: https://issues.apache.org/jira/browse/PIG-4871
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4871_2.patch
>
>
> The current code base uses OperatorPlan#forceConnect() when merging the 
> physical plans of the splitter and the splittee in MultiQueryOptimizationSpark.
> Unlike OperatorPlan#connect, OperatorPlan#forceConnect does not check whether 
> the "from" operator supports multiple outputs or whether the "to" operator 
> supports multiple inputs.
> {code}
>     /**
>      * Connects from and to while skipping some checks: it does not verify
>      * whether the from operator supports multiple outputs or whether the
>      * to operator supports multiple inputs.
>      *
>      * @param from Operator data will flow from.
>      * @param to   Operator data will flow to.
>      * @throws PlanException if from or to is not in the plan
>      */
>     public void forceConnect(E from, E to) throws PlanException {
>         markDirty();
>         // Check that both nodes are in the plan.
>         checkInPlan(from);
>         checkInPlan(to);
>         mFromEdges.put(from, to);
>         mToEdges.put(to, from);
>     }
> {code}
> The following example shows why forceConnect was added in the first place.
> {code}
> A = load './split5'  AS (a0:int, a1:int, a2:int);
> B = foreach A generate a0, a1;
> C = join A by a0, B by a0;
> D = filter C by A::a1>=B::a1;
> store D into './split5.out';
> {code}
> Before multiquery optimization:
> {code}
> scope-37->scope-43 
> scope-43
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-37
> Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-38
> |
> |---A: New For Each(false,false,false)[bag] - scope-10
>     |   |
>     |   Cast[int] - scope-2
>     |   |
>     |   |---Project[bytearray][0] - scope-1
>     |   |
>     |   Cast[int] - scope-5
>     |   |
>     |   |---Project[bytearray][1] - scope-4
>     |   |
>     |   Cast[int] - scope-8
>     |   |
>     |   |---Project[bytearray][2] - scope-7
>     |
>     |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0--------
> Spark node scope-43
> D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
> |
> |---D: Filter[bag] - scope-32
>     |   |
>     |   Greater Than or Equal[boolean] - scope-35
>     |   |
>     |   |---Project[int][1] - scope-33
>     |   |
>     |   |---Project[int][4] - scope-34
>     |
>     |---C: New For Each(true,true)[tuple] - scope-31
>         |   |
>         |   Project[bag][1] - scope-29
>         |   |
>         |   Project[bag][2] - scope-30
>         |
>         |---C: Package(Packager)[tuple]{int} - scope-24
>             |
>             |---C: Global Rearrange[tuple] - scope-23
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-25
>                 |   |   |
>                 |   |   Project[int][0] - scope-26
>                 |   |
>                 |   |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-39
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-27
>                     |   |
>                     |   Project[int][0] - scope-28
>                     |
>                     |---B: New For Each(false,false)[bag] - scope-20
>                         |   |
>                         |   Project[int][0] - scope-16
>                         |   |
>                         |   Project[int][1] - scope-18
>                         |
>                         |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-41--------
> {code}
> After multiquery optimization:
> {code}
> scope-37
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-37
> D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
> |
> |---D: Filter[bag] - scope-32
>     |   |
>     |   Greater Than or Equal[boolean] - scope-35
>     |   |
>     |   |---Project[int][1] - scope-33
>     |   |
>     |   |---Project[int][4] - scope-34
>     |
>     |---C: New For Each(true,true)[tuple] - scope-31
>         |   |
>         |   Project[bag][1] - scope-29
>         |   |
>         |   Project[bag][2] - scope-30
>         |
>         |---C: Package(Packager)[tuple]{int} - scope-24
>             |
>             |---C: Global Rearrange[tuple] - scope-23
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-25
>                 |   |   |
>                 |   |   Project[int][0] - scope-26
>                 |   |
>                 |   |---A: New For Each(false,false,false)[bag] - scope-10
>                 |       |   |
>                 |       |   Cast[int] - scope-2
>                 |       |   |
>                 |       |   |---Project[bytearray][0] - scope-1
>                 |       |   |
>                 |       |   Cast[int] - scope-5
>                 |       |   |
>                 |       |   |---Project[bytearray][1] - scope-4
>                 |       |   |
>                 |       |   Cast[int] - scope-8
>                 |       |   |
>                 |       |   |---Project[bytearray][2] - scope-7
>                 |       |
>                 |       |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-27
>                     |   |
>                     |   Project[int][0] - scope-28
>                     |
>                     |---B: New For Each(false,false)[bag] - scope-20
>                         |   |
>                         |   Project[int][0] - scope-16
>                         |   |
>                         |   Project[int][1] - scope-18
>                         |
>                         |---A: New For Each(false,false,false)[bag] - scope-10
>                             |   |
>                             |   Cast[int] - scope-2
>                             |   |
>                             |   |---Project[bytearray][0] - scope-1
>                             |   |
>                             |   Cast[int] - scope-5
>                             |   |
>                             |   |---Project[bytearray][1] - scope-4
>                             |   |
>                             |   Cast[int] - scope-8
>                             |   |
>                             |   |---Project[bytearray][2] - scope-7
>                             |
>                             |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0--------
> {code}
> We connect ForEach(scope-10) in SparkNode(scope-37) with ForEach(scope-20) 
> and LocalRearrange(scope-25) in SparkNode(scope-43). After multiquery 
> optimization, the successors of ForEach(scope-10) are scope-20 and scope-25. 
> This requires OperatorPlan#forceConnect(from, to) because 
> POForEach#supportsMultipleOutputs returns false. *Why is there no problem in MR 
> mode?* In MR mode, ForEach(scope-10) is cloned as ForEach(scope-xxx) for each 
> branch, so each POForEach always has exactly one successor.
> Before multiquery optimization in MR mode:
> {code}
> #--------------------------------------------------
> # Map Reduce Plan                                  
> #--------------------------------------------------
> MapReduce node scope-37
> Map Plan
> Store(hdfs://zly2.sh.intel.com:8020/tmp/temp825700611/tmp-47636243:org.apache.pig.impl.io.InterStorage) - scope-38
> |
> |---A: New For Each(false,false,false)[bag] - scope-10
>     |   |
>     |   Cast[int] - scope-2
>     |   |
>     |   |---Project[bytearray][0] - scope-1
>     |   |
>     |   Cast[int] - scope-5
>     |   |
>     |   |---Project[bytearray][1] - scope-4
>     |   |
>     |   Cast[int] - scope-8
>     |   |
>     |   |---Project[bytearray][2] - scope-7
>     |
>     |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0--------
> Global sort: false
> ----------------
> MapReduce node scope-43
> Map Plan
> Union[tuple] - scope-44
> |
> |---C: Local Rearrange[tuple]{int}(false) - scope-25
> |   |   |
> |   |   Project[int][0] - scope-26
> |   |
> |   |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp825700611/tmp-47636243:org.apache.pig.impl.io.InterStorage) - scope-39
> |
> |---C: Local Rearrange[tuple]{int}(false) - scope-27
>     |   |
>     |   Project[int][0] - scope-28
>     |
>     |---B: New For Each(false,false)[bag] - scope-20
>         |   |
>         |   Project[int][0] - scope-16
>         |   |
>         |   Project[int][1] - scope-18
>         |
>         |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp825700611/tmp-47636243:org.apache.pig.impl.io.InterStorage) - scope-41--------
> Reduce Plan
> D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
> |
> |---D: Filter[bag] - scope-32
>     |   |
>     |   Greater Than or Equal[boolean] - scope-35
>     |   |
>     |   |---Project[int][1] - scope-33
>     |   |
>     |   |---Project[int][4] - scope-34
>     |
>     |---C: Package(JoinPackager(true,true))[tuple]{int} - scope-24--------
> Global sort: false
> ----------------
> {code}
> After multiquery optimization in MR mode (scope-53 and scope-61 are clones of 
> scope-10):
> {code}
> #--------------------------------------------------
> # Map Reduce Plan                                  
> #--------------------------------------------------
> MapReduce node scope-43
> Map Plan
> Union[tuple] - scope-44
> |
> |---C: Local Rearrange[tuple]{int}(false) - scope-25
> |   |   |
> |   |   Project[int][0] - scope-26
> |   |
> |   |---A: New For Each(false,false,false)[bag] - scope-53
> |       |   |
> |       |   Cast[int] - scope-48
> |       |   |
> |       |   |---Project[bytearray][0] - scope-47
> |       |   |
> |       |   Cast[int] - scope-50
> |       |   |
> |       |   |---Project[bytearray][1] - scope-49
> |       |   |
> |       |   Cast[int] - scope-52
> |       |   |
> |       |   |---Project[bytearray][2] - scope-51
> |       |
> |       |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-46
> |
> |---C: Local Rearrange[tuple]{int}(false) - scope-27
>     |   |
>     |   Project[int][0] - scope-28
>     |
>     |---B: New For Each(false,false)[bag] - scope-20
>         |   |
>         |   Project[int][0] - scope-16
>         |   |
>         |   Project[int][1] - scope-18
>         |
>         |---A: New For Each(false,false,false)[bag] - scope-61
>             |   |
>             |   Cast[int] - scope-56
>             |   |
>             |   |---Project[bytearray][0] - scope-55
>             |   |
>             |   Cast[int] - scope-58
>             |   |
>             |   |---Project[bytearray][1] - scope-57
>             |   |
>             |   Cast[int] - scope-60
>             |   |
>             |   |---Project[bytearray][2] - scope-59
>             |
>             |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-54--------
> Reduce Plan
> D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
> |
> |---D: Filter[bag] - scope-32
>     |   |
>     |   Greater Than or Equal[boolean] - scope-35
>     |   |
>     |   |---Project[int][1] - scope-33
>     |   |
>     |   |---Project[int][4] - scope-34
>     |
>     |---C: Package(JoinPackager(true,true))[tuple]{int} - scope-24--------
> Global sort: false
> ----------------
> {code}
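
The difference between the checked and unchecked connect described in the issue can be modeled roughly as follows. This is a toy sketch, not Pig's OperatorPlan: EdgeCheckSketch, Op and the two boolean flags are hypothetical stand-ins for the real operator classes and their supportsMultipleOutputs / supportsMultipleInputs methods, and the exact checks in Pig may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model of the two connect variants; Pig's OperatorPlan differs. */
final class EdgeCheckSketch {
    /** Toy operator carrying only an id and its multi-output/multi-input flags. */
    static final class Op {
        final String id;
        final boolean multipleOutputs;
        final boolean multipleInputs;

        Op(String id, boolean multipleOutputs, boolean multipleInputs) {
            this.id = id;
            this.multipleOutputs = multipleOutputs;
            this.multipleInputs = multipleInputs;
        }
    }

    private final Map<Op, List<Op>> fromEdges = new HashMap<>();
    private final Map<Op, List<Op>> toEdges = new HashMap<>();

    List<Op> successors(Op op) {
        return fromEdges.getOrDefault(op, Collections.emptyList());
    }

    List<Op> predecessors(Op op) {
        return toEdges.getOrDefault(op, Collections.emptyList());
    }

    /** Checked variant: rejects an extra edge the operator does not declare support for. */
    void connect(Op from, Op to) {
        if (!from.multipleOutputs && !successors(from).isEmpty()) {
            throw new IllegalStateException(from.id + " does not support multiple outputs");
        }
        if (!to.multipleInputs && !predecessors(to).isEmpty()) {
            throw new IllegalStateException(to.id + " does not support multiple inputs");
        }
        forceConnect(from, to);
    }

    /** Unchecked variant: records the edge without consulting the flags. */
    void forceConnect(Op from, Op to) {
        fromEdges.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        toEdges.computeIfAbsent(to, k -> new ArrayList<>()).add(from);
    }

    public static void main(String[] args) {
        EdgeCheckSketch plan = new EdgeCheckSketch();
        Op forEach10 = new Op("scope-10", false, false);
        Op localRearrange25 = new Op("scope-25", false, false);
        Op forEach20 = new Op("scope-20", false, false);

        plan.connect(forEach10, localRearrange25);
        try {
            plan.connect(forEach10, forEach20); // second output: rejected
        } catch (IllegalStateException e) {
            System.out.println("connect rejected: " + e.getMessage());
        }
        plan.forceConnect(forEach10, forEach20); // accepted without any check
        System.out.println("successors of scope-10: " + plan.successors(forEach10).size());
    }
}
```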



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
