liyunzhang_intel created PIG-4839:
-------------------------------------
Summary: MultiQueryOptimizerSpark doesn't remove all redudant
nodes in spark plan
Key: PIG-4839
URL: https://issues.apache.org/jira/browse/PIG-4839
Project: Pig
Issue Type: Sub-task
Reporter: liyunzhang_intel
Assignee: liyunzhang_intel
TestMultiQueryBasic#testMultiQueryWithFJ_2
{code}
a = load './passwd' using PigStorage(':') as (uname:chararray,passwd:chararray,
uid:int, gid:int);
b = load './passwd' using PigStorage(':') as (uname:chararray,passwd:chararray,
uid:int, gid:int);
c = filter a by uid > 5;
store c into './multiQueryFJ.output';
d = filter b by gid > 10;
store d into './multiQueryFJ.output.2';
e = join c by gid, d by gid using 'repl';
store e into './multiQueryFJ.output.3';
{code}
The spark plan:
{code}
before multiquery optimization:
scope-57->scope-60 scope-66
scope-60
scope-61->scope-64 scope-68
scope-64
scope-66
scope-68->scope-66
#--------------------------------------------------
# Spark Plan
#--------------------------------------------------
Spark node scope-61
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp-1814908586:org.apache.pig.impl.io.InterStorage)
- scope-62
|
|---d: Filter[bag] - scope-36
| |
| Greater Than[boolean] - scope-39
| |
| |---Project[int][3] - scope-37
| |
| |---Constant(10) - scope-38
|
|---b: New For Each(false,false,false,false)[bag] - scope-35
| |
| Cast[chararray] - scope-24
| |
| |---Project[bytearray][0] - scope-23
| |
| Cast[chararray] - scope-27
| |
| |---Project[bytearray][1] - scope-26
| |
| Cast[int] - scope-30
| |
| |---Project[bytearray][2] - scope-29
| |
| Cast[int] - scope-33
| |
| |---Project[bytearray][3] - scope-32
|
|---b:
Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) -
scope-22--------
Spark node scope-64
d:
Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output.2:org.apache.pig.builtin.PigStorage)
- scope-43
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp-1814908586:org.apache.pig.impl.io.InterStorage)
- scope-63--------
Spark node scope-68
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp-1233897062:org.apache.pig.impl.io.InterStorage)
- scope-69
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp-1814908586:org.apache.pig.impl.io.InterStorage)
- scope-67--------
Spark node scope-66
e:
Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output.3:org.apache.pig.builtin.PigStorage)
- scope-56
|
|---e: FRJoin[tuple] - scope-50
| |
| Project[int][3] - scope-48
| |
| Project[int][3] - scope-49
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp929915440:org.apache.pig.impl.io.InterStorage)
- scope-65--------
Spark node scope-57
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp929915440:org.apache.pig.impl.io.InterStorage)
- scope-58
|
|---c: Filter[bag] - scope-14
| |
| Greater Than[boolean] - scope-17
| |
| |---Project[int][2] - scope-15
| |
| |---Constant(5) - scope-16
|
|---a: New For Each(false,false,false,false)[bag] - scope-13
| |
| Cast[chararray] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Cast[chararray] - scope-5
| |
| |---Project[bytearray][1] - scope-4
| |
| Cast[int] - scope-8
| |
| |---Project[bytearray][2] - scope-7
| |
| Cast[int] - scope-11
| |
| |---Project[bytearray][3] - scope-10
|
|---a:
Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) -
scope-0--------
Spark node scope-60
c:
Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output:org.apache.pig.builtin.PigStorage)
- scope-21
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp929915440:org.apache.pig.impl.io.InterStorage)
- scope-59--------
{code}
After spark multiquery optimization, 6 spark nodes will be reduced to 4.
scope-60 should be combined with scope-57 but not.
{code}
scope-57->scope-60 scope-66
scope-60
scope-61->scope-66
scope-66
#--------------------------------------------------
# Spark Plan
#--------------------------------------------------
Spark node scope-61
Split - scope-70
| |
| d:
Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output.2:org.apache.pig.builtin.PigStorage)
- scope-43
| |
|
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp-1233897062:org.apache.pig.impl.io.InterStorage)
- scope-69
|
|---d: Filter[bag] - scope-36
| |
| Greater Than[boolean] - scope-39
| |
| |---Project[int][3] - scope-37
| |
| |---Constant(10) - scope-38
|
|---b: New For Each(false,false,false,false)[bag] - scope-35
| |
| Cast[chararray] - scope-24
| |
| |---Project[bytearray][0] - scope-23
| |
| Cast[chararray] - scope-27
| |
| |---Project[bytearray][1] - scope-26
| |
| Cast[int] - scope-30
| |
| |---Project[bytearray][2] - scope-29
| |
| Cast[int] - scope-33
| |
| |---Project[bytearray][3] - scope-32
|
|---b:
Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) -
scope-22--------
Spark node scope-66
e:
Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output.3:org.apache.pig.builtin.PigStorage)
- scope-56
|
|---e: FRJoin[tuple] - scope-50
| |
| Project[int][3] - scope-48
| |
| Project[int][3] - scope-49
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp929915440:org.apache.pig.impl.io.InterStorage)
- scope-65--------
Spark node scope-57
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp929915440:org.apache.pig.impl.io.InterStorage)
- scope-58
|
|---c: Filter[bag] - scope-14
| |
| Greater Than[boolean] - scope-17
| |
| |---Project[int][2] - scope-15
| |
| |---Constant(5) - scope-16
|
|---a: New For Each(false,false,false,false)[bag] - scope-13
| |
| Cast[chararray] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Cast[chararray] - scope-5
| |
| |---Project[bytearray][1] - scope-4
| |
| Cast[int] - scope-8
| |
| |---Project[bytearray][2] - scope-7
| |
| Cast[int] - scope-11
| |
| |---Project[bytearray][3] - scope-10
|
|---a:
Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) -
scope-0--------
Spark node scope-60
c:
Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output:org.apache.pig.builtin.PigStorage)
- scope-21
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp-1338833908/tmp929915440:org.apache.pig.impl.io.InterStorage)
- scope-59--------
{code}
Following is mr plan after multiquery optimization
{code}
scope57->scope-66
scope-61->scope-66
scope-66
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-61
Map Plan
Split - scope-70
| |
| d:
Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output.2:org.apache.pig.builtin.PigStorage)
- scope-43
| |
|
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp411366696/tmp-223707761:org.apache.pig.impl.io.InterStorage)
- scope-69
|
|---d: Filter[bag] - scope-36
| |
| Greater Than[boolean] - scope-39
| |
| |---Project[int][3] - scope-37
| |
| |---Constant(10) - scope-38
|
|---b: New For Each(false,false,false,false)[bag] - scope-35
| |
| Cast[chararray] - scope-24
| |
| |---Project[bytearray][0] - scope-23
| |
| Cast[chararray] - scope-27
| |
| |---Project[bytearray][1] - scope-26
| |
| Cast[int] - scope-30
| |
| |---Project[bytearray][2] - scope-29
| |
| Cast[int] - scope-33
| |
| |---Project[bytearray][3] - scope-32
|
|---b:
Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) -
scope-22--------
Global sort: false
----------------
MapReduce node scope-66
Map Plan
e:
Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output.3:org.apache.pig.builtin.PigStorage)
- scope-56
|
|---e: FRJoin[tuple] - scope-50
| |
| Project[int][3] - scope-48
| |
| Project[int][3] - scope-49
|
|---Load(hdfs://zly1.sh.intel.com:8020/tmp/temp411366696/tmp-729323405:org.apache.pig.impl.io.InterStorage)
- scope-65--------
Global sort: false
----------------
MapReduce node scope-57
Map Plan
Split - scope-71
| |
| c:
Store(hdfs://zly1.sh.intel.com:8020/user/root/multiQueryFJ.output:org.apache.pig.builtin.PigStorage)
- scope-21
| |
|
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp411366696/tmp-729323405:org.apache.pig.impl.io.InterStorage)
- scope-58
|
|---c: Filter[bag] - scope-14
| |
| Greater Than[boolean] - scope-17
| |
| |---Project[int][2] - scope-15
| |
| |---Constant(5) - scope-16
|
|---a: New For Each(false,false,false,false)[bag] - scope-13
| |
| Cast[chararray] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Cast[chararray] - scope-5
| |
| |---Project[bytearray][1] - scope-4
| |
| Cast[int] - scope-8
| |
| |---Project[bytearray][2] - scope-7
| |
| Cast[int] - scope-11
| |
| |---Project[bytearray][3] - scope-10
|
|---a:
Load(hdfs://zly1.sh.intel.com:8020/user/root/passwd:PigStorage(':')) -
scope-0--------
Global sort: false
----------------
{code}
After this jjra is fixed, [the
modification|https://github.com/apache/pig/blob/spark/test/org/apache/pig/test/TestPigRunner.java#L458]
in TestPigRunner can be removed.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)