[ https://issues.apache.org/jira/browse/PIG-4891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15839327#comment-15839327 ]
liyunzhang_intel commented on PIG-4891:
---------------------------------------
Here is my understanding of this JIRA; let's use an example to explain it.
{code}
A = load './SkewedJoinInput1.txt' as (id,name,n);
B = load './SkewedJoinInput2.txt' as (id,name);
D = join A by (id,name), B by (id,name) using 'replicated';
explain D;
{code}
Before the patch, the Spark plan is:
{code}
#--------------------------------------------------
# Spark Plan
#--------------------------------------------------
Spark node scope-26
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp1749487848/tmp1731009936:org.apache.pig.impl.io.InterStorage) - scope-27
|
|---B: New For Each(false,false)[bag] - scope-13
| |
| Project[bytearray][0] - scope-9
| |
| Project[bytearray][1] - scope-11
|
|---B: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-8--------
Spark node scope-25
D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-24
|
|---D: FRJoin[tuple] - scope-18
| |
| Project[bytearray][0] - scope-14
| |
| Project[bytearray][1] - scope-15
| |
| Project[bytearray][0] - scope-16
| |
| Project[bytearray][1] - scope-17
|
|---A: New For Each(false,false,false)[bag] - scope-7
| |
| Project[bytearray][0] - scope-1
| |
| Project[bytearray][1] - scope-3
| |
| Project[bytearray][2] - scope-5
|
|---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
{code}
After the patch, the Spark plan is:
{code}
Spark node scope-28
D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-24
|
|---D: FRJoinSpark[tuple] - scope-18
| |
| Project[bytearray][0] - scope-14
| |
| Project[bytearray][1] - scope-15
| |
| Project[bytearray][0] - scope-16
| |
| Project[bytearray][1] - scope-17
|
|---A: New For Each(false,false,false)[bag] - scope-7
| | |
| | Project[bytearray][0] - scope-1
| | |
| | Project[bytearray][1] - scope-3
| | |
| | Project[bytearray][2] - scope-5
| |
| |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---BroadcastSpark - scope-27
|
|---B: New For Each(false,false)[bag] - scope-13
| |
| Project[bytearray][0] - scope-9
| |
| Project[bytearray][1] - scope-11
|
|---B: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope
{code}
In the patch:
1. We no longer load the small table into the distributed cache and start a separate job to read the data back from the distributed cache.
2. We load the small table as an RDD and broadcast it with SparkContext.broadcast().
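To make item 2 more concrete, here is a minimal Python sketch of the broadcast (map-side hash) join idea, with no Spark dependency. The small relation is collected once into a hash table keyed by the join columns, and each partition of the large relation probes that table locally, so the large side is never shuffled. This is an illustration under assumptions, not the patch's FRJoinSpark/BroadcastSpark code: partitions are modeled as Python lists, the dict stands in for the value one would pass to SparkContext.broadcast(), and the function names (`key_of`, `build_broadcast_table`, `fr_join_partition`, `fr_join`) are hypothetical.
{code:python}
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Sequence, Tuple

Row = Tuple[str, ...]  # a Pig tuple modeled as a Python tuple (bytearray fields as str)
Key = Tuple[str, ...]  # a possibly composite join key, e.g. (id, name)


def key_of(row: Row, key_idx: Sequence[int]) -> Key:
    """Extract the join key columns from a row."""
    return tuple(row[i] for i in key_idx)


def build_broadcast_table(small_rows: Iterable[Row],
                          key_idx: Sequence[int]) -> Dict[Key, List[Row]]:
    """Collect the small relation into an in-memory hash table, built once.

    Hypothetical stand-in for the value a Spark job would broadcast.
    """
    table: Dict[Key, List[Row]] = defaultdict(list)
    for row in small_rows:
        table[key_of(row, key_idx)].append(row)
    return dict(table)


def fr_join_partition(big_partition: Iterable[Row],
                      table: Dict[Key, List[Row]],
                      key_idx: Sequence[int]) -> Iterator[Row]:
    """Inner-join one partition of the large relation against the table.

    The table is only read, so every partition can share the same copy.
    """
    for row in big_partition:
        for match in table.get(key_of(row, key_idx), []):
            # Large-relation (A) fields first, then small-relation (B) fields.
            yield row + match


def fr_join(big_partitions: Iterable[Iterable[Row]],
            small_rows: Iterable[Row],
            big_key: Sequence[int],
            small_key: Sequence[int]) -> List[Row]:
    table = build_broadcast_table(small_rows, small_key)
    result: List[Row] = []
    # Roughly what a mapPartitions call over the large RDD would do per task.
    for partition in big_partitions:
        result.extend(fr_join_partition(partition, table, big_key))
    return result
{code}
For example, joining two partitions of A by (id,name) against B as in the script above: {{fr_join([[("1","a","x"), ("2","b","y")], [("3","c","z")]], [("1","a"), ("3","c")], (0, 1), (0, 1))}} returns {{[("1","a","x","1","a"), ("3","c","z","3","c")]}}. Duplicate keys in the small relation produce one output row per match, as an inner join should.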
> Implement FR join by broadcasting small rdd not making more copys of data
> -------------------------------------------------------------------------
>
> Key: PIG-4891
> URL: https://issues.apache.org/jira/browse/PIG-4891
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: Nandor Kollar
> Fix For: spark-branch
>
>
> In the current implementation of FRJoin (PIG-4771), we just set the
> replication factor of the data to 10 to make data access more efficient,
> because the current FRJoin algorithm can be reused this way. We need to
> figure out how to implement FRJoin by broadcasting the small RDD in the
> current code base, if we find that broadcasting the RDD improves performance significantly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)