[ https://issues.apache.org/jira/browse/PIG-4771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
liyunzhang_intel updated PIG-4771:
----------------------------------
Attachment: PIG-4771.patch
Use the algorithm in POFRJoin to implement FR join for Pig on Spark.
Let's use an example to explain this feature:
frJoin.pig
{code}
A = load './SkewedJoinInput1.txt' as (id,name,n);
B = load './SkewedJoinInput2.txt' as (id,name);
C = filter B by id > 100;
D = join A by (id,name), C by (id,name) using 'replicated';
store D into './testFRJoin.out';
{code}
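Conceptually, a replicated (fragment-replicate) join holds every relation except the first in an in-memory hash table and streams the first relation past it, so no shuffle is needed. Below is a minimal Java sketch of that idea for the script above. It is not POFRJoin's actual code. The class and method names are hypothetical, and the in-memory rows are invented stand-ins for SkewedJoinInput1.txt (A) and SkewedJoinInput2.txt (B).

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a replicated hash join; not Pig's POFRJoin.
public class ReplicatedJoinSketch {

    // Join key (id, name), mirroring "join A by (id,name), C by (id,name)".
    static List<Object> key(Object[] row) {
        return Arrays.asList(row[0], row[1]);
    }

    // Build a hash table from the small (replicated) input, then stream the
    // large (fragment) input and probe it. Each output row is the fragment
    // row followed by the matching replicated row.
    static List<Object[]> replicatedJoin(List<Object[]> fragment, List<Object[]> replicated) {
        // Build phase: key -> all replicated rows with that key.
        Map<List<Object>, List<Object[]>> table = new HashMap<>();
        for (Object[] r : replicated) {
            table.computeIfAbsent(key(r), k -> new ArrayList<>()).add(r);
        }
        // Probe phase: a single pass over the fragment input.
        List<Object[]> out = new ArrayList<>();
        for (Object[] f : fragment) {
            List<Object[]> matches = table.get(key(f));
            if (matches == null) continue; // inner join: drop unmatched rows
            for (Object[] r : matches) {
                Object[] joined = new Object[f.length + r.length];
                System.arraycopy(f, 0, joined, 0, f.length);
                System.arraycopy(r, 0, joined, f.length, r.length);
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical rows standing in for A (id, name, n) and B (id, name).
        List<Object[]> a = List.of(
                new Object[]{200, "x", 1}, new Object[]{50, "y", 2}, new Object[]{300, "z", 3});
        List<Object[]> b = List.of(
                new Object[]{200, "x"}, new Object[]{50, "y"}, new Object[]{300, "w"});
        // C = filter B by id > 100
        List<Object[]> c = new ArrayList<>();
        for (Object[] row : b) if ((Integer) row[0] > 100) c.add(row);

        for (Object[] row : replicatedJoin(a, c)) {
            System.out.println(Arrays.toString(row)); // prints [200, x, 1, 200, x]
        }
    }
}
{code}

Only (200, x) survives both the filter and the key match; (300, z) is dropped because C only contains (300, w).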
Physical Plan
{code}
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-17
|
|---D: FRJoin[tuple] - scope-11
| |
| Project[bytearray][0] - scope-7
| |
| Project[bytearray][1] - scope-8
| |
| Project[bytearray][0] - scope-9
| |
| Project[bytearray][1] - scope-10
|
|---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0
|
|---C: Filter[bag] - scope-2
| |
| Greater Than[boolean] - scope-6
| |
| |---Cast[int] - scope-4
| | |
| | |---Project[bytearray][0] - scope-3
| |
| |---Constant(100) - scope-5
|
|---B: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-1
{code}
Spark plan
{code}
Spark node scope-40
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-862810646/tmp-1458847763:org.apache.pig.impl.io.InterStorage) - scope-41
|
|---C: Filter[bag] - scope-23
| |
| Greater Than[boolean] - scope-27
| |
| |---Cast[int] - scope-25
| | |
| | |---Project[bytearray][0] - scope-24
| |
| |---Constant(100) - scope-26
|
|---B: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-22
--------
Spark node scope-39
D: Store(hdfs://zly1.sh.intel.com:8020/user/root/testFRJoin.out:org.apache.pig.builtin.PigStorage) - scope-38
|
|---D: FRJoin[tuple] - scope-32
| |
| Project[bytearray][0] - scope-28
| |
| Project[bytearray][1] - scope-29
| |
| Project[bytearray][0] - scope-30
| |
| Project[bytearray][1] - scope-31
|
|---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-21
--------
{code}
In SparkCompiler#visitFRJoin, we create a new SparkOperator that saves the
result of the replicated input to a temporary file on HDFS, and
POFRJoin#setUpHashMap then loads that temporary file. *Why create a new
SparkOperator that loads the input and stores it to a temporary file, only for
POFRJoin#setUpHashMap to load it again? Why not just load the file directly in
POFRJoin#setUpHashMap?* Because we cannot guarantee that the predecessors of
FRJoin in the physical plan are POLoads. In the case above, the predecessors of
FRJoin are a POLoad (A) and a POFilter (C). The replicated input is the output
of C = filter B by id > 100, not a plain file, so the filter has to run first
(Spark node scope-40) and its output has to be materialized.
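The two-stage approach can be sketched in plain Java, assuming a local temporary file via java.nio in place of HDFS and a single join column (id) instead of (id, name) for brevity. Stage 1 runs an arbitrary pipeline for the replicated side and writes the result to a file; stage 2 only reads that file into a hash map, so it no longer matters which operators produced it. The class and method names are hypothetical and do not correspond to Pig's SparkCompiler or POFRJoin code.

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: local temp file instead of HDFS, single key column.
public class MaterializeReplicatedSketch {

    // Stage 1 (compare Spark node scope-40): run whatever pipeline produces the
    // replicated side, here a filter over already-loaded rows, and store the
    // result as tab-separated lines in a temporary file.
    static Path materialize(List<String[]> loaded, Predicate<String[]> pipeline) {
        try {
            Path tmp = Files.createTempFile("replicated-", ".tsv");
            List<String> lines = loaded.stream()
                    .filter(pipeline)
                    .map(row -> String.join("\t", row))
                    .collect(Collectors.toList());
            Files.write(tmp, lines, StandardCharsets.UTF_8);
            return tmp;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stage 2 (compare POFRJoin#setUpHashMap): the join side only reads a plain
    // file and builds key -> rows, no matter which operators produced the file.
    static Map<String, List<String[]>> loadHashMap(Path tmp, int keyCol) {
        try {
            Map<String, List<String[]>> table = new HashMap<>();
            for (String line : Files.readAllLines(tmp, StandardCharsets.UTF_8)) {
                String[] row = line.split("\t", -1);
                table.computeIfAbsent(row[keyCol], k -> new ArrayList<>()).add(row);
            }
            return table;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical rows standing in for relation B (id, name).
        List<String[]> b = List.of(new String[]{"200", "x"}, new String[]{"50", "y"});
        // C = filter B by id > 100, materialized to a temporary file.
        Path tmp = materialize(b, row -> Integer.parseInt(row[0]) > 100);
        Map<String, List<String[]>> table = loadHashMap(tmp, 0);
        System.out.println(table.keySet()); // prints [200]
        tmp.toFile().delete();
    }
}
{code}

The cost of this design is one extra write and read of the replicated data; the benefit is that the hash-map loading code only ever has to handle a stored file.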
*How do we guarantee that the replicated files are accessible to the Spark workers?*
Replicated files are stored in HDFS, which every Spark worker can access. We set
mapred.submit.replication to "10" so that the replicated files get more replicas
and Spark workers are more likely to read the data locally. We don't use the
Distributed Cache (a MapReduce feature) as MR mode does, because we cannot
guarantee that users have MapReduce installed when they run Pig on Spark.
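To give a rough feel for why more replicas help locality, here is a back-of-the-envelope Java calculation. It assumes a hypothetical 20-worker cluster where every worker also runs a DataNode, and that each replica lands on a distinct, uniformly chosen node. HDFS's real placement policy is rack-aware, so this is only an illustration. It compares HDFS's default replication factor of 3 with the 10 replicas set above.

{code:java}
// Hypothetical estimate; assumes uniform placement on distinct nodes.
public class ReplicaLocalitySketch {

    // Fraction of workers holding a local copy of one block, assuming each
    // replica sits on a different worker node (capped at 1.0).
    static double localFraction(int replicas, int workers) {
        if (workers <= 0) throw new IllegalArgumentException("workers must be > 0");
        return (double) Math.min(replicas, workers) / workers;
    }

    public static void main(String[] args) {
        int workers = 20; // hypothetical cluster size
        System.out.println(localFraction(3, workers));  // prints 0.15 (default of 3 replicas)
        System.out.println(localFraction(10, workers)); // prints 0.5 (10 replicas)
    }
}
{code}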
> Implement FR Join for spark engine
> ----------------------------------
>
> Key: PIG-4771
> URL: https://issues.apache.org/jira/browse/PIG-4771
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4771.patch
>
>
> We use a regular join to replace FR join in the current code base (fd31fda).
> We need to implement FR join.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)