[
https://issues.apache.org/jira/browse/PIG-4522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14528019#comment-14528019
]
liyunzhang_intel commented on PIG-4522:
---------------------------------------
With multiquery enabled (set "opt.multiquery=true" in conf/pig.properties), take the following script, testSplit.pig:
{code}
A = load './testSplit.txt' as (f1:int, f2:int,f3:int);
split A into x if f1<7, y if f2==5, z if (f3<6 or f3>6);
store x into './testSplit_x.out';
store y into './testSplit_y.out';
store z into './testSplit_z.out';
{code}
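For reference, the three split conditions are evaluated independently, so one row can be routed to more than one output. With a small annotated input like the one below (the contents are only an illustrative example, not from this issue):
{code}
1   5   3    -- f1<7, f2==5 and f3<6 all hold: row goes to x, y and z
8   5   6    -- only f2==5 holds: row goes to y only
9   9   9    -- only f3>6 holds: row goes to z only
{code}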
Run it in mapreduce mode:
./pig -x mapred testSplit.pig
The resulting MR plan has 4 MapReduce nodes (scope-31, scope-34, scope-36, scope-38):
{code}
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-31
Map Plan
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp207422844/tmp-1807440245:org.apache.pig.impl.io.InterStorage) - scope-32
|
|---A: New For Each(false,false,false)[bag] - scope-10
| |
| Cast[int] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Cast[int] - scope-5
| |
| |---Project[bytearray][1] - scope-4
| |
| Cast[int] - scope-8
| |
| |---Project[bytearray][2] - scope-7
|
|---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
Global sort: false
----------------
MapReduce node scope-34
Map Plan
x: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_x.out:org.apache.pig.builtin.PigStorage) - scope-16
|
|---x: Filter[bag] - scope-12
| |
| Less Than[boolean] - scope-15
| |
| |---Project[int][0] - scope-13
| |
| |---Constant(7) - scope-14
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp207422844/tmp-1807440245:org.apache.pig.impl.io.InterStorage) - scope-33--------
Global sort: false
----------------
MapReduce node scope-36
Map Plan
y: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_y.out:org.apache.pig.builtin.PigStorage) - scope-21
|
|---y: Filter[bag] - scope-17
| |
| Equal To[boolean] - scope-20
| |
| |---Project[int][1] - scope-18
| |
| |---Constant(5) - scope-19
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp207422844/tmp-1807440245:org.apache.pig.impl.io.InterStorage) - scope-35--------
Global sort: false
----------------
MapReduce node scope-38
Map Plan
z: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_z.out:org.apache.pig.builtin.PigStorage) - scope-30
|
|---z: Filter[bag] - scope-22
| |
| Or[boolean] - scope-29
| |
| |---Less Than[boolean] - scope-25
| | |
| | |---Project[int][2] - scope-23
| | |
| | |---Constant(6) - scope-24
| |
| |---Greater Than[boolean] - scope-28
| |
| |---Project[int][2] - scope-26
| |
| |---Constant(6) - scope-27
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp207422844/tmp-1807440245:org.apache.pig.impl.io.InterStorage) - scope-37--------
Global sort: false
----------------
{code}
After executing [org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer#visit|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java#L696], the MR plan has only 1 MapReduce node:
{code}
Split - scope-39
| |
| x: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_x.out:org.apache.pig.builtin.PigStorage) - scope-16
| |
| |---x: Filter[bag] - scope-12
| | |
| | Less Than[boolean] - scope-15
| | |
| | |---Project[int][0] - scope-13
| | |
| | |---Constant(7) - scope-14
| |
| y: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_y.out:org.apache.pig.builtin.PigStorage) - scope-21
| |
| |---y: Filter[bag] - scope-17
| | |
| | Equal To[boolean] - scope-20
| | |
| | |---Project[int][1] - scope-18
| | |
| | |---Constant(5) - scope-19
| |
| z: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSplit_z.out:org.apache.pig.builtin.PigStorage) - scope-30
| |
| |---z: Filter[bag] - scope-22
| | |
| | Or[boolean] - scope-29
| | |
| | |---Less Than[boolean] - scope-25
| | | |
| | | |---Project[int][2] - scope-23
| | | |
| | | |---Constant(6) - scope-24
| | |
| | |---Greater Than[boolean] - scope-28
| | |
| | |---Project[int][2] - scope-26
| | |
| | |---Constant(6) - scope-27
|
|---A: New For Each(false,false,false)[bag] - scope-10
| |
| Cast[int] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Cast[int] - scope-5
| |
| |---Project[bytearray][1] - scope-4
| |
| Cast[int] - scope-8
| |
| |---Project[bytearray][2] - scope-7
|
|---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage) - scope-0
{code}
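Conceptually, the optimizer merges the splitter node with its map-only successors: the temporary InterStorage Store/Load pair disappears and each successor's remaining map plan becomes an inner plan of a single POSplit. A minimal, self-contained sketch of that idea follows; the classes and names below are simplified stand-ins for illustration only, not Pig's real MROperPlan/MapReduceOper API:
{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MultiQueryMergeSketch {

    // Stand-in for a map-only MapReduceOper: just an ordered list of operator names.
    static class MRNode {
        final List<String> mapPlan;
        MRNode(String... ops) { mapPlan = new ArrayList<>(Arrays.asList(ops)); }
    }

    // Merge the splitter's map-only successors into the splitter itself:
    // drop the tmp Store/Load (InterStorage) pair and nest each successor's
    // remaining map plan as an inner plan of one POSplit.
    static MRNode merge(MRNode splitter, List<MRNode> successors) {
        splitter.mapPlan.remove(splitter.mapPlan.size() - 1);   // drop tmp Store
        List<String> innerPlans = new ArrayList<>();
        for (MRNode succ : successors) {
            succ.mapPlan.remove(0);                             // drop tmp Load
            innerPlans.add(String.join("->", succ.mapPlan));
        }
        splitter.mapPlan.add("POSplit[" + String.join(" | ", innerPlans) + "]");
        return splitter;                                        // 1 node instead of 1+N
    }

    public static void main(String[] args) {
        MRNode splitter = new MRNode("Load(testSplit.txt)", "ForEach(A)", "Store(tmp)");
        List<MRNode> successors = Arrays.asList(
                new MRNode("Load(tmp)", "Filter(f1<7)", "Store(testSplit_x.out)"),
                new MRNode("Load(tmp)", "Filter(f2==5)", "Store(testSplit_y.out)"),
                new MRNode("Load(tmp)", "Filter(f3<6 or f3>6)", "Store(testSplit_z.out)"));
        System.out.println(merge(splitter, successors).mapPlan);
    }
}
{code}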
We can also add a MultiQueryOptimizer to the Spark branch. In my view, if the Spark plan is optimized the way the MR plan is, there is no need to use RDD.cache() to avoid the repetitive LOADs, and a new SparkOperator is needed when a POSplit is encountered.
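For comparison, the RDD.cache() alternative mentioned above would look roughly like the following in plain Spark; the paths and the tab-separated parsing are illustrative assumptions only:
{code}
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SplitWithCache {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("split"));
        // A = load './testSplit.txt' as (f1:int, f2:int, f3:int);
        JavaRDD<int[]> a = sc.textFile("testSplit.txt")
                .map(line -> Arrays.stream(line.split("\t"))
                        .mapToInt(Integer::parseInt).toArray())
                .cache();   // computed once, shared by the three branches below
        a.filter(t -> t[0] < 7)
         .map(Arrays::toString).saveAsTextFile("testSplit_x.out");            // x
        a.filter(t -> t[1] == 5)
         .map(Arrays::toString).saveAsTextFile("testSplit_y.out");            // y
        a.filter(t -> t[2] < 6 || t[2] > 6)
         .map(Arrays::toString).saveAsTextFile("testSplit_z.out");            // z
        sc.close();
    }
}
{code}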
But I found that in some cases a new SparkOperator should not be created when a POSplit is encountered:
*testAccumulator.join.sh*
{code}
A = load './testAccumulator.txt' as (id:int,f);
B = foreach A generate id, f, id as t;
C = group B by id;
D = foreach C {
E = order B by f desc;
F = E.f;
generate group, myudfs.AccumulativeSumBag(F);
};
G = foreach C {
E = order B by f desc;
F = E.f;
generate group, myudfs.AccumulativeSumBag(F);
};
H = join D by group, G by group;
store H into 'testAccumulator.join.out';
explain H;
{code}
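The real myudfs.AccumulativeSumBag is a test UDF whose source is not shown in this issue; a hedged sketch of what such an accumulative UDF might look like follows, using Pig's Accumulator interface. It also shows why the inner "order B by f desc" matters: the UDF consumes the bag of each group incrementally, in the order the tuples arrive.
{code}
import java.io.IOException;

import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Hypothetical reconstruction for illustration, not the actual test UDF.
public class AccumulativeSumBag extends EvalFunc<String> implements Accumulator<String> {
    private StringBuilder sb = new StringBuilder();

    @Override
    public void accumulate(Tuple input) throws IOException {
        // Each call receives a chunk of the (sorted) bag for the current group.
        DataBag bag = (DataBag) input.get(0);
        for (Tuple t : bag) {
            sb.append(t.get(0));
        }
    }

    @Override
    public String getValue() { return sb.toString(); }

    @Override
    public void cleanup() { sb = new StringBuilder(); }

    @Override
    public String exec(Tuple input) throws IOException {
        // Non-accumulative fallback: consume the whole bag at once.
        accumulate(input);
        String result = getValue();
        cleanup();
        return result;
    }
}
{code}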
*Physical Plan:*
An implicit POSplit is generated in the physical plan.
{code}
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: New For Each(true,true)[tuple] - scope-56
| |
| Project[bag][1] - scope-54
| |
| Project[bag][2] - scope-55
|
|---H: Package(Packager)[tuple]{int} - scope-49
|
|---H: Global Rearrange[tuple] - scope-48
|
|---H: Local Rearrange[tuple]{int}(false) - scope-50
| | |
| | Project[int][0] - scope-51
| |
| |---D: New For Each(false,false)[bag] - scope-32
| | |
| | Project[int][0] - scope-22
| | |
| | POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
| | |
| | |---RelationToExpressionProject[bag][*] - scope-24
| | |
| | |---F: New For Each(false)[bag] - scope-31
| | | |
| | | Project[bytearray][1] - scope-29
| | |
| | |---E: POSort[bag]() - scope-28
| | | |
| | | Project[bytearray][1] - scope-27
| | |
| | |---Project[bag][1] - scope-26
| |
| |---C: Filter[bag] - scope-20
| | |
| | Constant(true) - scope-21
| |
| |---C: Split - scope-19 // here an implicit Split is generated
| |
| |---C: Package(Packager)[tuple]{int} - scope-16
| |
| |---C: Global Rearrange[tuple] - scope-15
| |
| |---C: Local Rearrange[tuple]{int}(false) - scope-17
| | |
| | Project[int][0] - scope-18
| |
| |---B: New For Each(false,false,false)[bag] - scope-14
| | |
| | Project[int][0] - scope-7
| | |
| | Project[bytearray][1] - scope-9
| | |
| | POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
| | |
| | |---Project[int][0] - scope-11
| |
| |---A: New For Each(false,false)[bag] - scope-6
| | |
| | Cast[int] - scope-2
| | |
| | |---Project[bytearray][0] - scope-1
| | |
| | Project[bytearray][1] - scope-4
| |
| |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---H: Local Rearrange[tuple]{int}(false) - scope-52
| |
| Project[int][0] - scope-53
|
|---G: New For Each(false,false)[bag] - scope-45
| |
| Project[int][0] - scope-35
| |
| POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
| |
| |---RelationToExpressionProject[bag][*] - scope-37
| |
| |---F: New For Each(false)[bag] - scope-44
| | |
| | Project[bytearray][1] - scope-42
| |
| |---E: POSort[bag]() - scope-41
| | |
| | Project[bytearray][1] - scope-40
| |
| |---Project[bag][1] - scope-39
|
|---C: Filter[bag] - scope-33
| |
| Constant(true) - scope-34
|
|---C: Split - scope-19
|
|---C: Package(Packager)[tuple]{int} - scope-16
|
|---C: Global Rearrange[tuple] - scope-15
|
|---C: Local Rearrange[tuple]{int}(false) - scope-17
| |
| Project[int][0] - scope-18
|
|---B: New For Each(false,false,false)[bag] - scope-14
| |
| Project[int][0] - scope-7
| |
| Project[bytearray][1] - scope-9
| |
| POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
| |
| |---Project[int][0] - scope-11
|
|---A: New For Each(false,false)[bag] - scope-6
| |
| Cast[int] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Project[bytearray][1] - scope-4
|
|---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0
{code}
*Spark Plan*
POSort (scope-28) should be deleted by [SecondaryKeyOptimizerUtil.java#applySecondaryKeySort|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java#L329], but it is not deleted in this situation, because the SecondaryKeyOptimizer is only enabled when the group+foreach pattern is found inside a single SparkOperator, while POSplit (scope-19 in the physical plan) puts the group and the foreach into different operators. If you run testAccumulator.join.sh in MR mode, you will find that POSort is not deleted there either, even though secondary key optimization is enabled, for the same reason.
{code}
Spark node scope-58
Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-1952465309/tmp-1075850732:org.apache.pig.impl.io.InterStorage) - scope-59
|
|---C: Package(Packager)[tuple]{int} - scope-16
|
|---Global Rearrange[tuple] - scope-15
|
|---C: Local Rearrange[tuple]{int}(false) - scope-17
| |
| Project[int][0] - scope-18
|
|---B: New For Each(false,false,false)[bag] - scope-14
| |
| Project[int][0] - scope-7
| |
| Project[bytearray][1] - scope-9
| |
| POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
| |
| |---Project[int][0] - scope-11
|
|---A: New For Each(false,false)[bag] - scope-6
| |
| Cast[int] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Project[bytearray][1] - scope-4
|
|---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
Spark node scope-64
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: New For Each(true,true)[tuple] - scope-56
| |
| Project[bag][1] - scope-54
| |
| Project[bag][2] - scope-55
|
|---H: Package(Packager)[tuple]{int} - scope-49
|
|---Global Rearrange[tuple] - scope-48
|
|---H: Local Rearrange[tuple]{int}(false) - scope-50
| | |
| | Project[int][0] - scope-51
| |
| |---D: New For Each(false,false)[bag] - scope-32
| | |
| | Project[int][0] - scope-22
| | |
| | POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
| | |
| | |---RelationToExpressionProject[bag][*] - scope-24
| | |
| | |---F: New For Each(false)[bag] - scope-31
| | | |
| | | Project[bytearray][1] - scope-29
| | |
| | |---E: POSort[bag]() - scope-28
| | | |
| | | Project[bytearray][1] - scope-27
| | |
| | |---Project[bag][1] - scope-26
| |
| |---C: Filter[bag] - scope-20
| | |
| | Constant(true) - scope-21
| |
| |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-1952465309/tmp-1075850732:org.apache.pig.impl.io.InterStorage) - scope-60
|
|---H: Local Rearrange[tuple]{int}(false) - scope-52
| |
| Project[int][0] - scope-53
|
|---G: New For Each(false,false)[bag] - scope-45
| |
| Project[int][0] - scope-35
| |
| POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
| |
| |---RelationToExpressionProject[bag][*] - scope-37
| |
| |---F: New For Each(false)[bag] - scope-44
| | |
| | Project[bytearray][1] - scope-42
| |
| |---E: POSort[bag]() - scope-41
| | |
| | Project[bytearray][1] - scope-40
| |
| |---Project[bag][1] - scope-39
|
|---C: Filter[bag] - scope-33
| |
| Constant(true) - scope-34
|
|---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-1952465309/tmp-1075850732:org.apache.pig.impl.io.InterStorage) - scope-62--------
{code}
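For context on the secondary key sort discussed above: the idea is to fold the inner "order B by f desc" into the shuffle itself, so each group already arrives ordered and the trailing POSort becomes unnecessary. A rough, illustrative Spark analog follows (not the Pig implementation); all names and data here are assumptions for the sketch:
{code}
import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SecondarySortSketch {

    // Composite key: group on id, order on f descending within the group.
    static class Key implements Serializable, Comparable<Key> {
        final int id;
        final String f;
        Key(int id, String f) { this.id = id; this.f = f; }
        @Override public int compareTo(Key o) {
            int c = Integer.compare(id, o.id);
            return c != 0 ? c : o.f.compareTo(f);   // descending on f
        }
        @Override public String toString() { return "(" + id + "," + f + ")"; }
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("secondary-sort").setMaster("local[2]"));
        JavaPairRDD<Key, String> rows = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>(new Key(1, "b"), "row1"),
                new Tuple2<>(new Key(1, "a"), "row2"),
                new Tuple2<>(new Key(2, "c"), "row3")));
        // Partition by id only, but let the shuffle sort by the full (id, f)
        // key: each group then arrives already ordered, no later sort needed.
        JavaPairRDD<Key, String> sorted = rows.repartitionAndSortWithinPartitions(
                new HashPartitioner(2) {
                    @Override public int getPartition(Object key) {
                        return super.getPartition(((Key) key).id);
                    }
                });
        sorted.collect().forEach(System.out::println);
        sc.close();
    }
}
{code}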
> Remove unnecessary store and load when POSplit is encounted
> -----------------------------------------------------------
>
> Key: PIG-4522
> URL: https://issues.apache.org/jira/browse/PIG-4522
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4522.patch
>
>
> pig script:
> {code}
> A = load './testSplit.txt' as (f1:int, f2:int,f3:int);
> split A into x if f1<7, y if f2==5, z if (f3<6 or f3>6);
> store x into './testSplit_x.out';
> store y into './testSplit_y.out';
> store z into './testSplit_z.out';
> explain x;
> explain y;
> explain z;
> {code}
> spark plan:
> {code}
> #The Spark node relations are:
> #-----------------------------------------------------#
> scope-17->scope-20
> scope-20
> #--------------------------------------------------
> # Spark Plan
> #--------------------------------------------------
> Spark node scope-17
> Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp-1477385839:org.apache.pig.impl.io.InterStorage) - scope-18
> |
> |---A: New For Each(false,false,false)[bag] - scope-10
> | |
> | Cast[int] - scope-2
> | |
> | |---Project[bytearray][0] - scope-1
> | |
> | Cast[int] - scope-5
> | |
> | |---Project[bytearray][1] - scope-4
> | |
> | Cast[int] - scope-8
> | |
> | |---Project[bytearray][2] - scope-7
> |
> |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
> Spark node scope-20
> x: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-16
> |
> |---x: Filter[bag] - scope-12
> | |
> | Less Than[boolean] - scope-15
> | |
> | |---Project[int][0] - scope-13
> | |
> | |---Constant(7) - scope-14
> |
> |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp-1477385839:org.apache.pig.impl.io.InterStorage) - scope-19--------
> #-----------------------------------------------------#
> #The Spark node relations are:
> #-----------------------------------------------------#
> scope-38->scope-41
> scope-41
> #--------------------------------------------------
> # Spark Plan
> #--------------------------------------------------
> Spark node scope-38
> Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp-918933337:org.apache.pig.impl.io.InterStorage) - scope-39
> |
> |---A: New For Each(false,false,false)[bag] - scope-31
> | |
> | Cast[int] - scope-23
> | |
> | |---Project[bytearray][0] - scope-22
> | |
> | Cast[int] - scope-26
> | |
> | |---Project[bytearray][1] - scope-25
> | |
> | Cast[int] - scope-29
> | |
> | |---Project[bytearray][2] - scope-28
> |
> |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage) - scope-21--------
> Spark node scope-41
> y: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-37
> |
> |---y: Filter[bag] - scope-33
> | |
> | Equal To[boolean] - scope-36
> | |
> | |---Project[int][1] - scope-34
> | |
> | |---Constant(5) - scope-35
> |
> |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp-918933337:org.apache.pig.impl.io.InterStorage) - scope-40--------
> #-----------------------------------------------------#
> #The Spark node relations are:
> #-----------------------------------------------------#
> scope-63->scope-66
> scope-66
> #--------------------------------------------------
> # Spark Plan
> #--------------------------------------------------
> Spark node scope-63
> Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp1444529161:org.apache.pig.impl.io.InterStorage) - scope-64
> |
> |---A: New For Each(false,false,false)[bag] - scope-52
> | |
> | Cast[int] - scope-44
> | |
> | |---Project[bytearray][0] - scope-43
> | |
> | Cast[int] - scope-47
> | |
> | |---Project[bytearray][1] - scope-46
> | |
> | Cast[int] - scope-50
> | |
> | |---Project[bytearray][2] - scope-49
> |
> |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/testSplit.txt:org.apache.pig.builtin.PigStorage) - scope-42--------
> Spark node scope-66
> z: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-62
> |
> |---z: Filter[bag] - scope-54
> | |
> | Or[boolean] - scope-61
> | |
> | |---Less Than[boolean] - scope-57
> | | |
> | | |---Project[int][2] - scope-55
> | | |
> | | |---Constant(6) - scope-56
> | |
> | |---Greater Than[boolean] - scope-60
> | |
> | |---Project[int][2] - scope-58
> | |
> | |---Constant(6) - scope-59
> |
> |---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1920285848/tmp1444529161:org.apache.pig.impl.io.InterStorage) - scope-65--------
> {code}
> Scope-18 (Store) and Scope-19 (Load) are not necessary and should be removed.
> Scope-39 (Store) and Scope-40 (Load) are not necessary and should be removed.
> Scope-64 (Store) and Scope-65 (Load) are not necessary and should be removed.