[ 
https://issues.apache.org/jira/browse/PIG-4871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liyunzhang_intel updated PIG-4871:
----------------------------------
    Description: 
In the current code base, we use OperatorPlan#forceConnect() while merging the 
physical plans of the splitter and the splittee in MultiQueryOptimizationSpark.
The difference between OperatorPlan#connect and OperatorPlan#forceConnect is 
that forceConnect does not check whether the operators support multiple 
outputs and multiple inputs.
{code}
 /**
     * Connects from and to, skipping the checks of whether the from operator
     * supports multiOutputs and whether the to operator supports multiInputs.
     *
     * @param from Operator data will flow from.
     * @param to   Operator data will flow to.
     * @throws PlanException if from or to is not in the plan
     */
    public void forceConnect(E from, E to) throws PlanException {
        markDirty();

        // Check that both nodes are in the plan.
        checkInPlan(from);
        checkInPlan(to);
        mFromEdges.put(from, to);
        mToEdges.put(to, from);
    }

{code}
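To make the difference concrete, here is a minimal, self-contained sketch of the two methods. SketchOp and SketchPlan are hypothetical stand-ins, not Pig's Operator and OperatorPlan, and the checks in connect are an assumption about what the real method verifies; only the edge bookkeeping in forceConnect mirrors the code above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a physical operator (not Pig's Operator class).
class SketchOp {
    final String name;
    final boolean multiInputs;
    final boolean multiOutputs;

    SketchOp(String name, boolean multiInputs, boolean multiOutputs) {
        this.name = name;
        this.multiInputs = multiInputs;
        this.multiOutputs = multiOutputs;
    }
}

// Hypothetical stand-in for OperatorPlan, reduced to edge bookkeeping.
class SketchPlan {
    private final Map<SketchOp, List<SketchOp>> fromEdges = new HashMap<>();
    private final Map<SketchOp, List<SketchOp>> toEdges = new HashMap<>();

    // Checked variant (assumed behaviour): refuses a second edge on an
    // operator that does not declare multi-output or multi-input support.
    void connect(SketchOp from, SketchOp to) {
        if (!from.multiOutputs && !successors(from).isEmpty()) {
            throw new IllegalStateException(from.name + " does not support multiple outputs");
        }
        if (!to.multiInputs && !predecessors(to).isEmpty()) {
            throw new IllegalStateException(to.name + " does not support multiple inputs");
        }
        forceConnect(from, to);
    }

    // Unchecked variant: records the edge in both directions without any check.
    void forceConnect(SketchOp from, SketchOp to) {
        fromEdges.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        toEdges.computeIfAbsent(to, k -> new ArrayList<>()).add(from);
    }

    List<SketchOp> successors(SketchOp op) {
        return fromEdges.getOrDefault(op, Collections.<SketchOp>emptyList());
    }

    List<SketchOp> predecessors(SketchOp op) {
        return toEdges.getOrDefault(op, Collections.<SketchOp>emptyList());
    }
}

class ForceConnectSketch {
    public static void main(String[] args) {
        SketchPlan plan = new SketchPlan();
        SketchOp forEachA = new SketchOp("ForEach(A)", false, false);
        SketchOp localRearrange = new SketchOp("LocalRearrange", false, false);
        SketchOp forEachB = new SketchOp("ForEach(B)", false, false);

        plan.connect(forEachA, localRearrange);   // first successor: accepted
        try {
            plan.connect(forEachA, forEachB);     // second successor: rejected
        } catch (IllegalStateException e) {
            System.out.println("connect refused: " + e.getMessage());
        }
        plan.forceConnect(forEachA, forEachB);    // same edge, checks skipped
        System.out.println("successors: " + plan.successors(forEachA).size());
    }
}
```

With these stand-ins, the second connect from a single-output operator is rejected, while forceConnect adds the edge regardless.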
Let's use an example to explain why forceConnect was added in the first place.
{code}
A = load './split5'  AS (a0:int, a1:int, a2:int);
B = foreach A generate a0, a1;
C = join A by a0, B by a0;
D = filter C by A::a1>=B::a1;
store D into './split5.out';
{code}
Before multiquery optimization:
{code}
scope-37->scope-43 
scope-43
#--------------------------------------------------
# Spark Plan                                  
#--------------------------------------------------

Spark node scope-37
Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-38
|
|---A: New For Each(false,false,false)[bag] - scope-10
    |   |
    |   Cast[int] - scope-2
    |   |
    |   |---Project[bytearray][0] - scope-1
    |   |
    |   Cast[int] - scope-5
    |   |
    |   |---Project[bytearray][1] - scope-4
    |   |
    |   Cast[int] - scope-8
    |   |
    |   |---Project[bytearray][2] - scope-7
    |
    |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0--------

Spark node scope-43
D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---D: Filter[bag] - scope-32
    |   |
    |   Greater Than or Equal[boolean] - scope-35
    |   |
    |   |---Project[int][1] - scope-33
    |   |
    |   |---Project[int][4] - scope-34
    |
    |---C: New For Each(true,true)[tuple] - scope-31
        |   |
        |   Project[bag][1] - scope-29
        |   |
        |   Project[bag][2] - scope-30
        |
        |---C: Package(Packager)[tuple]{int} - scope-24
            |
            |---C: Global Rearrange[tuple] - scope-23
                |
                |---C: Local Rearrange[tuple]{int}(false) - scope-25
                |   |   |
                |   |   Project[int][0] - scope-26
                |   |
                |   |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-39
                |
                |---C: Local Rearrange[tuple]{int}(false) - scope-27
                    |   |
                    |   Project[int][0] - scope-28
                    |
                    |---B: New For Each(false,false)[bag] - scope-20
                        |   |
                        |   Project[int][0] - scope-16
                        |   |
                        |   Project[int][1] - scope-18
                        |
                        |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-41--------
{code}

After multiquery optimization:
{code}
scope-37
#--------------------------------------------------
# Spark Plan                                  
#--------------------------------------------------

Spark node scope-37
D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---D: Filter[bag] - scope-32
    |   |
    |   Greater Than or Equal[boolean] - scope-35
    |   |
    |   |---Project[int][1] - scope-33
    |   |
    |   |---Project[int][4] - scope-34
    |
    |---C: New For Each(true,true)[tuple] - scope-31
        |   |
        |   Project[bag][1] - scope-29
        |   |
        |   Project[bag][2] - scope-30
        |
        |---C: Package(Packager)[tuple]{int} - scope-24
            |
            |---C: Global Rearrange[tuple] - scope-23
                |
                |---C: Local Rearrange[tuple]{int}(false) - scope-25
                |   |   |
                |   |   Project[int][0] - scope-26
                |   |
                |   |---A: New For Each(false,false,false)[bag] - scope-10
                |       |   |
                |       |   Cast[int] - scope-2
                |       |   |
                |       |   |---Project[bytearray][0] - scope-1
                |       |   |
                |       |   Cast[int] - scope-5
                |       |   |
                |       |   |---Project[bytearray][1] - scope-4
                |       |   |
                |       |   Cast[int] - scope-8
                |       |   |
                |       |   |---Project[bytearray][2] - scope-7
                |       |
                |       |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0
                |
                |---C: Local Rearrange[tuple]{int}(false) - scope-27
                    |   |
                    |   Project[int][0] - scope-28
                    |
                    |---B: New For Each(false,false)[bag] - scope-20
                        |   |
                        |   Project[int][0] - scope-16
                        |   |
                        |   Project[int][1] - scope-18
                        |
                        |---A: New For Each(false,false,false)[bag] - scope-10
                            |   |
                            |   Cast[int] - scope-2
                            |   |
                            |   |---Project[bytearray][0] - scope-1
                            |   |
                            |   Cast[int] - scope-5
                            |   |
                            |   |---Project[bytearray][1] - scope-4
                            |   |
                            |   Cast[int] - scope-8
                            |   |
                            |   |---Project[bytearray][2] - scope-7
                            |
                            |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0--------
{code}

We connect ForEach(scope-10) in SparkNode(scope-37) with ForEach(scope-20) and 
LocalRearrange(scope-25) in SparkNode(scope-43). After multiquery optimization, 
the successors of ForEach(scope-10) are scope-20 and scope-25. Here we need to 
use OperatorPlan#forceConnect(from, to) because 
POForEach#supportsMultipleOutputs returns false. *Why is there no problem in MR 
mode?* In MR mode, ForEach(scope-10) is cloned as ForEach(scope-xxx) for each 
successor, so each POForEach always has exactly one successor.
After multiquery optimization in MR mode:
{code}
#--------------------------------------------------
# Map Reduce Plan                                  
#--------------------------------------------------
MapReduce node scope-43
Map Plan
Union[tuple] - scope-44
|
|---C: Local Rearrange[tuple]{int}(false) - scope-25
|   |   |
|   |   Project[int][0] - scope-26
|   |
|   |---A: New For Each(false,false,false)[bag] - scope-53
|       |   |
|       |   Cast[int] - scope-48
|       |   |
|       |   |---Project[bytearray][0] - scope-47
|       |   |
|       |   Cast[int] - scope-50
|       |   |
|       |   |---Project[bytearray][1] - scope-49
|       |   |
|       |   Cast[int] - scope-52
|       |   |
|       |   |---Project[bytearray][2] - scope-51
|       |
|       |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-46
|
|---C: Local Rearrange[tuple]{int}(false) - scope-27
    |   |
    |   Project[int][0] - scope-28
    |
    |---B: New For Each(false,false)[bag] - scope-20
        |   |
        |   Project[int][0] - scope-16
        |   |
        |   Project[int][1] - scope-18
        |
        |---A: New For Each(false,false,false)[bag] - scope-61
            |   |
            |   Cast[int] - scope-56
            |   |
            |   |---Project[bytearray][0] - scope-55
            |   |
            |   Cast[int] - scope-58
            |   |
            |   |---Project[bytearray][1] - scope-57
            |   |
            |   Cast[int] - scope-60
            |   |
            |   |---Project[bytearray][2] - scope-59
            |
            |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-54--------
Reduce Plan
D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---D: Filter[bag] - scope-32
    |   |
    |   Greater Than or Equal[boolean] - scope-35
    |   |
    |   |---Project[int][1] - scope-33
    |   |
    |   |---Project[int][4] - scope-34
    |
    |---C: Package(JoinPackager(true,true))[tuple]{int} - scope-24--------
Global sort: false
----------------
{code}
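
The cloning approach used in MR mode can be sketched as follows. CloneOp, ClonePlan and fanOutByCloning are hypothetical names for illustration, not Pig classes or methods; the sketch only shows why giving each consumer its own copy of the shared operator keeps a checked connect usable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-output operator; the id plays the role of a fresh scope number.
class CloneOp {
    private static int nextId = 0;
    final String kind;
    final int id;

    CloneOp(String kind) {
        this.kind = kind;
        this.id = nextId++;
    }

    // A copy keeps the kind but receives a new id, like ForEach(scope-xxx).
    CloneOp copy() {
        return new CloneOp(kind);
    }
}

// Hypothetical plan that only allows one successor per operator.
class ClonePlan {
    private final Map<CloneOp, List<CloneOp>> fromEdges = new HashMap<>();

    // Checked connect: a second successor on the same operator is an error.
    void connect(CloneOp from, CloneOp to) {
        List<CloneOp> succ = fromEdges.computeIfAbsent(from, k -> new ArrayList<>());
        if (!succ.isEmpty()) {
            throw new IllegalStateException(from.kind + " already has a successor");
        }
        succ.add(to);
    }

    int successorCount(CloneOp op) {
        List<CloneOp> succ = fromEdges.get(op);
        return succ == null ? 0 : succ.size();
    }

    // Gives every consumer its own copy of the shared operator, so the
    // checked connect never sees more than one successor per operator.
    List<CloneOp> fanOutByCloning(CloneOp shared, List<CloneOp> consumers) {
        List<CloneOp> copies = new ArrayList<>();
        for (CloneOp consumer : consumers) {
            CloneOp copy = shared.copy();
            connect(copy, consumer);
            copies.add(copy);
        }
        return copies;
    }
}

class CloneFanOutSketch {
    public static void main(String[] args) {
        ClonePlan plan = new ClonePlan();
        CloneOp forEachA = new CloneOp("ForEach(A)");
        List<CloneOp> consumers = new ArrayList<>();
        consumers.add(new CloneOp("LocalRearrange"));
        consumers.add(new CloneOp("ForEach(B)"));

        for (CloneOp copy : plan.fanOutByCloning(forEachA, consumers)) {
            System.out.println(copy.kind + " copy has " + plan.successorCount(copy) + " successor(s)");
        }
    }
}
```

The trade-off is that the shared sub-plan is duplicated once per consumer, which is what the two copies of A in the Map Plan above show.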

  was:
In the current code base, we use OperatorPlan#forceConnect() while merging the 
physical plans of the splitter and the splittee in MultiQueryOptimizationSpark.
The difference between OperatorPlan#connect and OperatorPlan#forceConnect is 
that forceConnect does not check whether the operators support multiple 
outputs and multiple inputs.
{code}
 /**
     * Connects from and to, skipping the checks of whether the from operator
     * supports multiOutputs and whether the to operator supports multiInputs.
     *
     * @param from Operator data will flow from.
     * @param to   Operator data will flow to.
     * @throws PlanException if from or to is not in the plan
     */
    public void forceConnect(E from, E to) throws PlanException {
        markDirty();

        // Check that both nodes are in the plan.
        checkInPlan(from);
        checkInPlan(to);
        mFromEdges.put(from, to);
        mToEdges.put(to, from);
    }

{code}
Let's use an example to explain why forceConnect was added in the first place.
{code}
before multiquery optimization:
scope-102->scope-108 
scope-108
#--------------------------------------------------
# Spark Plan                                  
#--------------------------------------------------

Spark node scope-102
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp1293488480/tmp128312889:org.apache.pig.impl.io.InterStorage) - scope-103
|
|---A: New For Each(false,false,false)[bag] - scope-75
    |   |
    |   Cast[int] - scope-67
    |   |
    |   |---Project[bytearray][0] - scope-66
    |   |
    |   Cast[int] - scope-70
    |   |
    |   |---Project[bytearray][1] - scope-69
    |   |
    |   Cast[int] - scope-73
    |   |
    |   |---Project[bytearray][2] - scope-72
    |
    |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-65--------

Spark node scope-108
D: Store(hdfs://zly1.sh.intel.com:8020/user/root/split5.out:org.apache.pig.builtin.PigStorage) - scope-101
|
|---D: Filter[bag] - scope-97
    |   |
    |   Greater Than or Equal[boolean] - scope-100
    |   |
    |   |---Project[int][1] - scope-98
    |   |
    |   |---Project[int][4] - scope-99
    |
    |---C: New For Each(true,true)[tuple] - scope-96
        |   |
        |   Project[bag][1] - scope-94
        |   |
        |   Project[bag][2] - scope-95
        |
        |---C: Package(Packager)[tuple]{int} - scope-89
            |
            |---C: Global Rearrange[tuple] - scope-88
                |
                |---C: Local Rearrange[tuple]{int}(false) - scope-90
                |   |   |
                |   |   Project[int][0] - scope-91
                |   |
                |   |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp1293488480/tmp128312889:org.apache.pig.impl.io.InterStorage) - scope-104
                |
                |---C: Local Rearrange[tuple]{int}(false) - scope-92
                    |   |
                    |   Project[int][0] - scope-93
                    |
                    |---B: New For Each(false,false)[bag] - scope-85
                        |   |
                        |   Project[int][0] - scope-81
                        |   |
                        |   Project[int][1] - scope-83
                        |
                        |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp1293488480/tmp128312889:org.apache.pig.impl.io.InterStorage) - scope-106--------
after multiquery optimization:
scope-108
#--------------------------------------------------
# Spark Plan                                  
#--------------------------------------------------

Spark node scope-108
D: Store(hdfs://zly1.sh.intel.com:8020/user/root/split5.out:org.apache.pig.builtin.PigStorage) - scope-101
|
|---D: Filter[bag] - scope-97
    |   |
    |   Greater Than or Equal[boolean] - scope-100
    |   |
    |   |---Project[int][1] - scope-98
    |   |
    |   |---Project[int][4] - scope-99
    |
    |---C: New For Each(true,true)[tuple] - scope-96
        |   |
        |   Project[bag][1] - scope-94
        |   |
        |   Project[bag][2] - scope-95
        |
        |---C: Package(Packager)[tuple]{int} - scope-89
            |
            |---C: Global Rearrange[tuple] - scope-88
                |
                |---C: Local Rearrange[tuple]{int}(false) - scope-90
                |   |   |
                |   |   Project[int][0] - scope-91
                |   |
                |   |---A: New For Each(false,false,false)[bag] - scope-119
                |       |   |
                |       |   Cast[int] - scope-114
                |       |   |
                |       |   |---Project[bytearray][0] - scope-113
                |       |   |
                |       |   Cast[int] - scope-116
                |       |   |
                |       |   |---Project[bytearray][1] - scope-115
                |       |   |
                |       |   Cast[int] - scope-118
                |       |   |
                |       |   |---Project[bytearray][2] - scope-117
                |       |
                |       |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-112
                |
                |---C: Local Rearrange[tuple]{int}(false) - scope-92
                    |   |
                    |   Project[int][0] - scope-93
                    |
                    |---B: New For Each(false,false)[bag] - scope-85
                        |   |
                        |   Project[int][0] - scope-81
                        |   |
                        |   Project[int][1] - scope-83
                        |
                        |---A: New For Each(false,false,false)[bag] - scope-128
                            |   |
                            |   Cast[int] - scope-123
                            |   |
                            |   |---Project[bytearray][0] - scope-122
                            |   |
                            |   Cast[int] - scope-125
                            |   |
                            |   |---Project[bytearray][1] - scope-124
                            |   |
                            |   Cast[int] - scope-127
                            |   |
                            |   |---Project[bytearray][2] - scope-126
                            |
                            |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-121--------
{code}

We just copy the subplan (Load:scope-121) of SparkOperator(scope-102) to 
SparkOperator(scope-108) and connect POLoad(scope-121) with POForEach(scope-119) 
and POForEach(scope-128). Here POLoad supports multiOutputs, so we can use 
OperatorPlan#connect. If a physical operator does not support multiOutputs, we 
have to use OperatorPlan#forceConnect to make the connection. To solve this 
problem, we can now copy the subplan, generate a new subplan from it, and merge 
the new subplan into the final plan to do multiquery optimization, as follows:
{code}
scope-108
#--------------------------------------------------
# Spark Plan                                  
#--------------------------------------------------

Spark node scope-108
D: Store(hdfs://zly1.sh.intel.com:8020/user/root/split5.out:org.apache.pig.builtin.PigStorage) - scope-101
|
|---D: Filter[bag] - scope-97
    |   |
    |   Greater Than or Equal[boolean] - scope-100
    |   |
    |   |---Project[int][1] - scope-98
    |   |
    |   |---Project[int][4] - scope-99
    |
    |---C: New For Each(true,true)[tuple] - scope-96
        |   |
        |   Project[bag][1] - scope-94
        |   |
        |   Project[bag][2] - scope-95
        |
        |---C: Package(Packager)[tuple]{int} - scope-89
            |
            |---C: Global Rearrange[tuple] - scope-88
                |
                |---C: Local Rearrange[tuple]{int}(false) - scope-90
                |   |   |
                |   |   Project[int][0] - scope-91
                |   |
                |   |---A: New For Each(false,false,false)[bag] - scope-119
                |       |   |
                |       |   Cast[int] - scope-114
                |       |   |
                |       |   |---Project[bytearray][0] - scope-113
                |       |   |
                |       |   Cast[int] - scope-116
                |       |   |
                |       |   |---Project[bytearray][1] - scope-115
                |       |   |
                |       |   Cast[int] - scope-118
                |       |   |
                |       |   |---Project[bytearray][2] - scope-117
                |       |
                |       |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-112
                |
                |---C: Local Rearrange[tuple]{int}(false) - scope-92
                    |   |
                    |   Project[int][0] - scope-93
                    |
                    |---B: New For Each(false,false)[bag] - scope-85
                        |   |
                        |   Project[int][0] - scope-81
                        |   |
                        |   Project[int][1] - scope-83
                        |
                        |---A: New For Each(false,false,false)[bag] - scope-128
                            |   |
                            |   Cast[int] - scope-123
                            |   |
                            |   |---Project[bytearray][0] - scope-122
                            |   |
                            |   Cast[int] - scope-125
                            |   |
                            |   |---Project[bytearray][1] - scope-124
                            |   |
                            |   Cast[int] - scope-127
                            |   |
                            |   |---Project[bytearray][2] - scope-126
                            |
                            |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-132--------
{code}



>  Not use OperatorPlan#forceConnect in MultiQueryOptimizationSpark
> -----------------------------------------------------------------
>
>                 Key: PIG-4871
>                 URL: https://issues.apache.org/jira/browse/PIG-4871
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4871.patch
>
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
