[ 
https://issues.apache.org/jira/browse/PIG-4522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14528019#comment-14528019
 ] 

liyunzhang_intel commented on PIG-4522:
---------------------------------------

With multiquery enabled ("opt.multiquery=true" in conf/pig.properties), take the following script.
testSplit.pig:
{code}
A = load './testSplit.txt' as (f1:int, f2:int,f3:int);
split A into x if f1<7, y if f2==5, z if (f3<6 or f3>6);
store x into './testSplit_x.out';
store y into './testSplit_y.out';
store z into './testSplit_z.out';
{code}

Run it in mapreduce mode:
./pig -x mapred testSplit.pig
Before multiquery optimization, the MR plan will be:
{code}
There are 4 mapreduce nodes (scope-31, scope-34, scope-36, scope-38).
#--------------------------------------------------
# Map Reduce Plan                                  
#--------------------------------------------------
MapReduce node scope-31
Map Plan
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp207422844/tmp-1807440245:org.apache.pig.impl.io.InterStorage)
 - scope-32
|
|---A: New For Each(false,false,false)[bag] - scope-10
    |   |
    |   Cast[int] - scope-2
    |   |
    |   |---Project[bytearray][0] - scope-1
    |   |
    |   Cast[int] - scope-5
    |   |
    |   |---Project[bytearray][1] - scope-4
    |   |
    |   Cast[int] - scope-8
    |   |
    |   |---Project[bytearray][2] - scope-7
    |
    |---A: 
Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage)
 - scope-0--------
Global sort: false
----------------

MapReduce node scope-34
Map Plan
x: 
Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_x.out:org.apache.pig.builtin.PigStorage)
 - scope-16
|
|---x: Filter[bag] - scope-12
    |   |
    |   Less Than[boolean] - scope-15
    |   |
    |   |---Project[int][0] - scope-13
    |   |
    |   |---Constant(7) - scope-14
    |
    
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp207422844/tmp-1807440245:org.apache.pig.impl.io.InterStorage)
 - scope-33--------
Global sort: false
----------------

MapReduce node scope-36
Map Plan
y: 
Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_y.out:org.apache.pig.builtin.PigStorage)
 - scope-21
|
|---y: Filter[bag] - scope-17
    |   |
    |   Equal To[boolean] - scope-20
    |   |
    |   |---Project[int][1] - scope-18
    |   |
    |   |---Constant(5) - scope-19
    |
    
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp207422844/tmp-1807440245:org.apache.pig.impl.io.InterStorage)
 - scope-35--------
Global sort: false
----------------

MapReduce node scope-38
Map Plan
z: 
Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_z.out:org.apache.pig.builtin.PigStorage)
 - scope-30
|
|---z: Filter[bag] - scope-22
    |   |
    |   Or[boolean] - scope-29
    |   |
    |   |---Less Than[boolean] - scope-25
    |   |   |
    |   |   |---Project[int][2] - scope-23
    |   |   |
    |   |   |---Constant(6) - scope-24
    |   |
    |   |---Greater Than[boolean] - scope-28
    |       |
    |       |---Project[int][2] - scope-26
    |       |
    |       |---Constant(6) - scope-27
    |
    
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp207422844/tmp-1807440245:org.apache.pig.impl.io.InterStorage)
 - scope-37--------
Global sort: false
----------------
{code}

After executing
[org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer#visit|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java#L696]
the MR plan has only 1 mapreduce node:
{code}
Split - scope-39
|   |
|   x: 
Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_x.out:org.apache.pig.builtin.PigStorage)
 - scope-16
|   |
|   |---x: Filter[bag] - scope-12
|       |   |
|       |   Less Than[boolean] - scope-15
|       |   |
|       |   |---Project[int][0] - scope-13
|       |   |
|       |   |---Constant(7) - scope-14
|   |
|   y: 
Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_y.out:org.apache.pig.builtin.PigStorage)
 - scope-21
|   |
|   |---y: Filter[bag] - scope-17
|       |   |
|       |   Equal To[boolean] - scope-20
|       |   |
|       |   |---Project[int][1] - scope-18
|       |   |
|       |   |---Constant(5) - scope-19
|   |
|   z: 
Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_z.out:org.apache.pig.builtin.PigStorage)
 - scope-30
|   |
|   |---z: Filter[bag] - scope-22
|       |   |
|       |   Or[boolean] - scope-29
|       |   |
|       |   |---Less Than[boolean] - scope-25
|       |   |   |
|       |   |   |---Project[int][2] - scope-23
|       |   |   |
|       |   |   |---Constant(6) - scope-24
|       |   |
|       |   |---Greater Than[boolean] - scope-28
|       |       |
|       |       |---Project[int][2] - scope-26
|       |       |
|       |       |---Constant(6) - scope-27
|
|---A: New For Each(false,false,false)[bag] - scope-10
    |   |
    |   Cast[int] - scope-2
    |   |
    |   |---Project[bytearray][0] - scope-1
    |   |
    |   Cast[int] - scope-5
    |   |
    |   |---Project[bytearray][1] - scope-4
    |   |
    |   Cast[int] - scope-8
    |   |
    |   |---Project[bytearray][2] - scope-7
    |
    |---A: 
Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage)
 - scope-0
{code}
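
The merge shown in the two plans above can be modeled with a small Python sketch. This is a toy model only: the dict layout, the "tmp:" prefix and the function name merge_split_jobs are assumptions made for illustration, not Pig's MapReduceOper or MultiQueryOptimizer code. It folds every job that only re-reads a temp file into the job that wrote that file.

```python
# Toy model of the multiquery merge; jobs are plain dicts, not Pig classes.

def merge_split_jobs(jobs):
    """Merge each job that reads a temp file into the job that wrote it.

    Note: the input dicts are modified in place, and chains of temp files
    (a reader that is itself a temp writer) are not handled.
    """
    # Map temp path -> the job that stores it (the "splitter").
    writers = {j["store"]: j for j in jobs if j["store"].startswith("tmp:")}
    merged = []
    for job in jobs:
        if job["load"] in writers:
            # A "splittee": attach its pipeline as a branch of the splitter.
            splitter = writers[job["load"]]
            splitter.setdefault("branches", []).append(
                {"ops": job["ops"], "store": job["store"]}
            )
        else:
            merged.append(job)
    # Once a splitter has branches, its temp store is no longer needed.
    for job in merged:
        if job.get("branches"):
            job["store"] = None
    return merged


jobs = [
    {"name": "scope-31", "load": "testSplit.txt", "ops": ["foreach A"],
     "store": "tmp:1807440245"},
    {"name": "scope-34", "load": "tmp:1807440245", "ops": ["filter x"],
     "store": "testSplit_x.out"},
    {"name": "scope-36", "load": "tmp:1807440245", "ops": ["filter y"],
     "store": "testSplit_y.out"},
    {"name": "scope-38", "load": "tmp:1807440245", "ops": ["filter z"],
     "store": "testSplit_z.out"},
]

plan = merge_split_jobs(jobs)
print(len(plan))                                   # number of jobs after merging
print([b["store"] for b in plan[0]["branches"]])   # outputs of the split branches
```

In this model the four jobs collapse into one job that loads testSplit.txt once and writes the three outputs from split branches, which mirrors the single Split node in the optimized plan.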

We can also add a MultiQueryOptimizer for Spark. In my view, if the Spark plan
is optimized the way the MR plan is, there is no need to use RDD.cache() to
avoid the repeated LOADs, and a new SparkOperator is needed when POSplit is
encountered.

However, I found that in some cases a new SparkOperator should not be
created when POSplit is encountered:
*testAccumulator.join.sh*
{code}
A = load './testAccumulator.txt' as (id:int,f);
B = foreach A generate id, f, id as t;
C = group B by id;
D = foreach C {
E = order B by f desc;
F = E.f;
generate group, myudfs.AccumulativeSumBag(F);
};
G = foreach C {
E = order B by f desc;
F = E.f;
generate group, myudfs.AccumulativeSumBag(F);
};
H = join D by group, G by group; 
store H into 'testAccumulator.join.out';
explain H
{code}

*Physical Plan:*
An implicit POSplit is generated in the physical plan.
{code}
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: New For Each(true,true)[tuple] - scope-56
    |   |
    |   Project[bag][1] - scope-54
    |   |
    |   Project[bag][2] - scope-55
    |
    |---H: Package(Packager)[tuple]{int} - scope-49
        |
        |---H: Global Rearrange[tuple] - scope-48
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-50
            |   |   |
            |   |   Project[int][0] - scope-51
            |   |
            |   |---D: New For Each(false,false)[bag] - scope-32
            |       |   |
            |       |   Project[int][0] - scope-22
            |       |   |
            |       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - 
scope-25
            |       |   |
            |       |   |---RelationToExpressionProject[bag][*] - scope-24
            |       |       |
            |       |       |---F: New For Each(false)[bag] - scope-31
            |       |           |   |
            |       |           |   Project[bytearray][1] - scope-29
            |       |           |
            |       |           |---E: POSort[bag]() - scope-28
            |       |               |   |
            |       |               |   Project[bytearray][1] - scope-27
            |       |               |
            |       |               |---Project[bag][1] - scope-26
            |       |
            |       |---C: Filter[bag] - scope-20
            |           |   |
            |           |   Constant(true) - scope-21
            |           |
            |           |---C: Split - scope-19   // an implicit Split is generated here
            |               |
            |               |---C: Package(Packager)[tuple]{int} - scope-16
            |                   |
            |                   |---C: Global Rearrange[tuple] - scope-15
            |                       |
            |                       |---C: Local Rearrange[tuple]{int}(false) - 
scope-17
            |                           |   |
            |                           |   Project[int][0] - scope-18
            |                           |
            |                           |---B: New For 
Each(false,false,false)[bag] - scope-14
            |                               |   |
            |                               |   Project[int][0] - scope-7
            |                               |   |
            |                               |   Project[bytearray][1] - scope-9
            |                               |   |
            |                               |   
POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
            |                               |   |
            |                               |   |---Project[int][0] - scope-11
            |                               |
            |                               |---A: New For 
Each(false,false)[bag] - scope-6
            |                                   |   |
            |                                   |   Cast[int] - scope-2
            |                                   |   |
            |                                   |   |---Project[bytearray][0] - 
scope-1
            |                                   |   |
            |                                   |   Project[bytearray][1] - 
scope-4
            |                                   |
            |                                   |---A: 
Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage)
 - scope-0
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-52
                |   |
                |   Project[int][0] - scope-53
                |
                |---G: New For Each(false,false)[bag] - scope-45
                    |   |
                    |   Project[int][0] - scope-35
                    |   |
                    |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - 
scope-38
                    |   |
                    |   |---RelationToExpressionProject[bag][*] - scope-37
                    |       |
                    |       |---F: New For Each(false)[bag] - scope-44
                    |           |   |
                    |           |   Project[bytearray][1] - scope-42
                    |           |
                    |           |---E: POSort[bag]() - scope-41
                    |               |   |
                    |               |   Project[bytearray][1] - scope-40
                    |               |
                    |               |---Project[bag][1] - scope-39
                    |
                    |---C: Filter[bag] - scope-33
                        |   |
                        |   Constant(true) - scope-34
                        |
                        |---C: Split - scope-19
                            |
                            |---C: Package(Packager)[tuple]{int} - scope-16
                                |
                                |---C: Global Rearrange[tuple] - scope-15
                                    |
                                    |---C: Local Rearrange[tuple]{int}(false) - 
scope-17
                                        |   |
                                        |   Project[int][0] - scope-18
                                        |
                                        |---B: New For 
Each(false,false,false)[bag] - scope-14
                                            |   |
                                            |   Project[int][0] - scope-7
                                            |   |
                                            |   Project[bytearray][1] - scope-9
                                            |   |
                                            |   
POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
                                            |   |
                                            |   |---Project[int][0] - scope-11
                                            |
                                            |---A: New For 
Each(false,false)[bag] - scope-6
                                                |   |
                                                |   Cast[int] - scope-2
                                                |   |
                                                |   |---Project[bytearray][0] - 
scope-1
                                                |   |
                                                |   Project[bytearray][1] - 
scope-4
                                                |
                                                |---A: 
Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage)
 - scope-0

{code}

*Spark Plan*
POSort (scope-28) should be removed in
[SecondaryKeyOptimizerUtil.java#applySecondaryKeySort|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java#L329].
It is not removed here because the SecondaryKeyOptimizer only takes effect
when the group+foreach pattern is found within a single SparkOperator, while
POSplit (scope-19 in the physical plan) puts the group and the foreach in
different operators. If you run testAccumulator.join.sh in MR mode, you will
find that POSort is not removed even with secondary key optimization enabled,
for the same reason.

{code}
Spark node scope-58
Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-1952465309/tmp-1075850732:org.apache.pig.impl.io.InterStorage)
 - scope-59
|
|---C: Package(Packager)[tuple]{int} - scope-16
    |
    |---Global Rearrange[tuple] - scope-15
        |
        |---C: Local Rearrange[tuple]{int}(false) - scope-17
            |   |
            |   Project[int][0] - scope-18
            |
            |---B: New For Each(false,false,false)[bag] - scope-14
                |   |
                |   Project[int][0] - scope-7
                |   |
                |   Project[bytearray][1] - scope-9
                |   |
                |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] 
- scope-12
                |   |
                |   |---Project[int][0] - scope-11
                |
                |---A: New For Each(false,false)[bag] - scope-6
                    |   |
                    |   Cast[int] - scope-2
                    |   |
                    |   |---Project[bytearray][0] - scope-1
                    |   |
                    |   Project[bytearray][1] - scope-4
                    |
                    |---A: 
Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage)
 - scope-0--------
 
Spark node scope-64
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: New For Each(true,true)[tuple] - scope-56
    |   |
    |   Project[bag][1] - scope-54
    |   |
    |   Project[bag][2] - scope-55
    |
    |---H: Package(Packager)[tuple]{int} - scope-49
        |
        |---Global Rearrange[tuple] - scope-48
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-50
            |   |   |
            |   |   Project[int][0] - scope-51
            |   |
            |   |---D: New For Each(false,false)[bag] - scope-32
            |       |   |
            |       |   Project[int][0] - scope-22
            |       |   |
            |       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - 
scope-25
            |       |   |
            |       |   |---RelationToExpressionProject[bag][*] - scope-24
            |       |       |
            |       |       |---F: New For Each(false)[bag] - scope-31
            |       |           |   |
            |       |           |   Project[bytearray][1] - scope-29
            |       |           |
            |       |           |---E: POSort[bag]() - scope-28
            |       |               |   |
            |       |               |   Project[bytearray][1] - scope-27
            |       |               |
            |       |               |---Project[bag][1] - scope-26
            |       |
            |       |---C: Filter[bag] - scope-20
            |           |   |
            |           |   Constant(true) - scope-21
            |           |
            |           
|---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-1952465309/tmp-1075850732:org.apache.pig.impl.io.InterStorage)
 - scope-60
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-52
                |   |
                |   Project[int][0] - scope-53
                |
                |---G: New For Each(false,false)[bag] - scope-45
                    |   |
                    |   Project[int][0] - scope-35
                    |   |
                    |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - 
scope-38
                    |   |
                    |   |---RelationToExpressionProject[bag][*] - scope-37
                    |       |
                    |       |---F: New For Each(false)[bag] - scope-44
                    |           |   |
                    |           |   Project[bytearray][1] - scope-42
                    |           |
                    |           |---E: POSort[bag]() - scope-41
                    |               |   |
                    |               |   Project[bytearray][1] - scope-40
                    |               |
                    |               |---Project[bag][1] - scope-39
                    |
                    |---C: Filter[bag] - scope-33
                        |   |
                        |   Constant(true) - scope-34
                        |
                        
|---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-1952465309/tmp-1075850732:org.apache.pig.impl.io.InterStorage)
 - scope-62--------
{code}
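
The reason POSort survives in the Spark plan above can be modeled with a small Python sketch. It is a deliberate simplification, not the actual check in SecondaryKeyOptimizerUtil: each operator is reduced to an ordered list of node names (load first, store last), and the helper name group_feeds_nested_sort and the "ForEach+POSort" label are assumptions made for illustration.

```python
# Toy model: an operator is an ordered list of node names, load first.

def group_feeds_nested_sort(pipeline):
    """True if a Package is followed, inside the same operator,
    by a foreach that contains a nested sort."""
    seen_package = False
    for op in pipeline:
        if op == "Package":
            seen_package = True
        elif op == "ForEach+POSort" and seen_package:
            return True
    return False


# Spark node scope-58: the group (C) ends at the temp store.
scope_58 = ["Load", "ForEach", "ForEach", "LocalRearrange",
            "GlobalRearrange", "Package", "Store"]

# Branch D of Spark node scope-64: the nested sort comes right after the
# temp load, before the join's own Package.
scope_64_branch_d = ["Load", "Filter", "ForEach+POSort", "LocalRearrange",
                     "GlobalRearrange", "Package", "ForEach", "Store"]

# Hypothetical operator in which POSplit did not cut the plan.
unsplit = ["Load", "ForEach", "ForEach", "LocalRearrange",
           "GlobalRearrange", "Package", "ForEach+POSort", "Store"]

print(group_feeds_nested_sort(scope_58))
print(group_feeds_nested_sort(scope_64_branch_d))
print(group_feeds_nested_sort(unsplit))
```

Under this model neither real Spark node contains the group followed by the sorting foreach, so the optimization has nothing to match. Only the hypothetical unsplit operator does.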

> Remove unnecessary store and load when POSplit is encountered
> -------------------------------------------------------------
>
>                 Key: PIG-4522
>                 URL: https://issues.apache.org/jira/browse/PIG-4522
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4522.patch
>
>
> pig script:
> {code}
> A = load './testSplit.txt' as (f1:int, f2:int,f3:int);
> split A into x if f1<7, y if f2==5, z if (f3<6 or f3>6);
> store x into './testSplit_x.out';
> store y into './testSplit_y.out';
> store z into './testSplit_z.out';
> explain x; 
> explain y;
> explain z;
> {code}
> spark plan:
> {code}
> #The Spark node relations are:
> #-----------------------------------------------------#
> scope-17->scope-20 
> scope-20
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-17
> Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp-1477385839:org.apache.pig.impl.io.InterStorage)
>  - scope-18
> |
> |---A: New For Each(false,false,false)[bag] - scope-10
>     |   |
>     |   Cast[int] - scope-2
>     |   |
>     |   |---Project[bytearray][0] - scope-1
>     |   |
>     |   Cast[int] - scope-5
>     |   |
>     |   |---Project[bytearray][1] - scope-4
>     |   |
>     |   Cast[int] - scope-8
>     |   |
>     |   |---Project[bytearray][2] - scope-7
>     |
>     |---A: 
> Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage)
>  - scope-0--------
> Spark node scope-20
> x: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-16
> |
> |---x: Filter[bag] - scope-12
>     |   |
>     |   Less Than[boolean] - scope-15
>     |   |
>     |   |---Project[int][0] - scope-13
>     |   |
>     |   |---Constant(7) - scope-14
>     |
>     
> |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp-1477385839:org.apache.pig.impl.io.InterStorage)
>  - scope-19--------
> #-----------------------------------------------------#
> #The Spark node relations are:
> #-----------------------------------------------------#
> scope-38->scope-41 
> scope-41
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-38
> Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp-918933337:org.apache.pig.impl.io.InterStorage)
>  - scope-39
> |
> |---A: New For Each(false,false,false)[bag] - scope-31
>     |   |
>     |   Cast[int] - scope-23
>     |   |
>     |   |---Project[bytearray][0] - scope-22
>     |   |
>     |   Cast[int] - scope-26
>     |   |
>     |   |---Project[bytearray][1] - scope-25
>     |   |
>     |   Cast[int] - scope-29
>     |   |
>     |   |---Project[bytearray][2] - scope-28
>     |
>     |---A: 
> Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage)
>  - scope-21--------
> Spark node scope-41
> y: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-37
> |
> |---y: Filter[bag] - scope-33
>     |   |
>     |   Equal To[boolean] - scope-36
>     |   |
>     |   |---Project[int][1] - scope-34
>     |   |
>     |   |---Constant(5) - scope-35
>     |
>     
> |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp-918933337:org.apache.pig.impl.io.InterStorage)
>  - scope-40--------
> #-----------------------------------------------------#
> #The Spark node relations are:
> #-----------------------------------------------------#
> scope-63->scope-66 
> scope-66
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-63
> Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp1444529161:org.apache.pig.impl.io.InterStorage)
>  - scope-64
> |
> |---A: New For Each(false,false,false)[bag] - scope-52
>     |   |
>     |   Cast[int] - scope-44
>     |   |
>     |   |---Project[bytearray][0] - scope-43
>     |   |
>     |   Cast[int] - scope-47
>     |   |
>     |   |---Project[bytearray][1] - scope-46
>     |   |
>     |   Cast[int] - scope-50
>     |   |
>     |   |---Project[bytearray][2] - scope-49
>     |
>     |---A: 
> Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage)
>  - scope-42--------
> Spark node scope-66
> z: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-62
> |
> |---z: Filter[bag] - scope-54
>     |   |
>     |   Or[boolean] - scope-61
>     |   |
>     |   |---Less Than[boolean] - scope-57
>     |   |   |
>     |   |   |---Project[int][2] - scope-55
>     |   |   |
>     |   |   |---Constant(6) - scope-56
>     |   |
>     |   |---Greater Than[boolean] - scope-60
>     |       |
>     |       |---Project[int][2] - scope-58
>     |       |
>     |       |---Constant(6) - scope-59
>     |
>     
> |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp1444529161:org.apache.pig.impl.io.InterStorage)
>  - scope-65--------
> {code}
> Scope-18 (Store) and Scope-19 (Load) are not necessary and should be removed.
> Scope-39 (Store) and Scope-40 (Load) are not necessary and should be removed.
> Scope-64 (Store) and Scope-65 (Load) are not necessary and should be removed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
