liyunzhang_intel created PIG-4969:
-------------------------------------
Summary: Optimize combine case for spark mode
Key: PIG-4969
URL: https://issues.apache.org/jira/browse/PIG-4969
Project: Pig
Issue Type: Bug
Components: spark
Reporter: liyunzhang_intel
Assignee: liyunzhang_intel
Our 1 TB PigMix benchmark results show that the combine case runs slower in
Spark mode than in MR mode; L_1 takes about 24% longer.
||Script||MR||Spark||
|L_1|8089|10064|
L1.pig
{code}
register pigperf.jar;
A = load '/user/pig/tests/data/pigmix/page_views' using
org.apache.pig.test.udf.storefunc.PigPerformanceLoader()
as (user, action, timespent, query_term, ip_addr, timestamp,
estimated_revenue, page_info, page_links);
B = foreach A generate user, (int)action as action, (map[])page_info as
page_info,
flatten((bag{tuple(map[])})page_links) as page_links;
C = foreach B generate user,
(action == 1 ? page_info#'a' : page_links#'b') as header;
D = group C by user parallel 40;
E = foreach D generate group, COUNT(C) as cnt;
store E into 'L1out';
{code}
The resulting Spark plan:
{code}
[exec] #--------------------------------------------------
[exec] # Spark Plan
[exec] #--------------------------------------------------
[exec]
[exec] Spark node scope-38
[exec] E: Store(hdfs://bdpe81:8020/user/root/output/pig/L1out:org.apache.pig.builtin.PigStorage) - scope-37
[exec] |
[exec] |---E: New For Each(false,false)[tuple] - scope-42
[exec] | |
[exec] | Project[bytearray][0] - scope-39
[exec] | |
[exec] | Project[bag][1] - scope-40
[exec] |
[exec] | POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-41
[exec] | |
[exec] | |---Project[bag][1] - scope-57
[exec] |
[exec] |---Reduce By(false,false)[tuple] - scope-47
[exec] | |
[exec] | Project[bytearray][0] - scope-48
[exec] | |
[exec] | POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-49
[exec] | |
[exec] | |---Project[bag][1] - scope-50
[exec] |
[exec] |---D: Local Rearrange[tuple]{bytearray}(false) - scope-53
[exec] | |
[exec] | Project[bytearray][0] - scope-55
[exec] |
[exec] |---E: New For Each(false,false)[bag] - scope-43
[exec] | |
[exec] | Project[bytearray][0] - scope-44
[exec] | |
[exec] | POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-45
[exec] | |
[exec] | |---Project[bag][1] - scope-46
[exec] |
[exec] |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-56
[exec] |
[exec] |---C: New For Each(false,false)[bag] - scope-26
[exec] | |
[exec] | Project[bytearray][0] - scope-13
[exec] | |
[exec] | POBinCond[bytearray] - scope-22
[exec] | |
[exec] | |---Equal To[boolean] - scope-17
[exec] | | |
[exec] | | |---Project[int][1] - scope-15
[exec] | | |
[exec] | | |---Constant(1) - scope-16
[exec] | |
[exec] | |---POMapLookUp[bytearray] - scope-19
[exec] | | |
[exec] | | |---Project[map][2] - scope-18
[exec] | |
[exec] | |---POMapLookUp[bytearray] - scope-21
[exec] | |
[exec] | |---Project[map][3] - scope-20
[exec] |
[exec] |---B: New For Each(false,false,false,true)[bag] - scope-12
[exec] | |
[exec] | Project[bytearray][0] - scope-1
[exec] | |
[exec] | Cast[int] - scope-4
[exec] | |
[exec] | |---Project[bytearray][1] - scope-3
[exec] | |
[exec] | Cast[map:[]] - scope-7
[exec] | |
[exec] | |---Project[bytearray][2] - scope-6
[exec] | |
[exec] | Cast[bag:{([])}] - scope-10
[exec] | |
[exec] | |---Project[bytearray][3] - scope-9
[exec] |
[exec] |---A: Load(/user/pig/tests/data/pigmix/page_views:org.apache.pig.test.pigmix.udf.PigPerformanceLoader) - scope-0--------
{code}
We can merge LocalRearrange (scope-53) and ReduceBy (scope-47) into a single
physical operator to remove the redundant map operations, as we did in
PIG-4797 (Optimization for join/group case for spark mode).
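To illustrate the idea, here is a minimal sketch in plain Python. It is not Pig or Spark code. The function names (local_rearrange, reduce_by, fused_reduce_by, add_counts) and the sample data are hypothetical. It only shows how a separate keying pass followed by a reduce can be replaced by one pass that extracts the key while reducing, which is roughly what merging the two operators would achieve.

```python
def local_rearrange(records, key_index=0):
    """Separate pass: wrap every record as (key, record)."""
    return [(rec[key_index], rec) for rec in records]


def reduce_by(keyed, combine):
    """Merge the values of each key with an associative combine function."""
    acc = {}
    for key, value in keyed:
        acc[key] = value if key not in acc else combine(acc[key], value)
    return acc


def fused_reduce_by(records, combine, key_index=0):
    """Single pass: extract the key while reducing, with no intermediate keyed list."""
    acc = {}
    for rec in records:
        key = rec[key_index]
        acc[key] = rec if key not in acc else combine(acc[key], rec)
    return acc


def add_counts(a, b):
    """Combine two (user, partial_count) tuples for the same user."""
    return (a[0], a[1] + b[1])


# Hypothetical partial counts, shaped like (group, count) tuples.
partial_counts = [("alice", 2), ("bob", 1), ("alice", 3)]

two_step = reduce_by(local_rearrange(partial_counts), add_counts)
fused = fused_reduce_by(partial_counts, add_counts)
```

Both paths produce the same per-key result; the fused version simply avoids materializing the intermediate (key, record) pairs.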