[ https://issues.apache.org/jira/browse/PIG-4771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206010#comment-15206010 ]

Pallavi Rao commented on PIG-4771:
----------------------------------

[~kellyzly], a couple of comments on the approach:
{quote}
Replicated files are stored in HDFS and Spark workers can access them. We set 
mapred.submit.replication to "10" to create more replicas of the replicated files 
so that Spark workers are likely to access the data locally.
{quote}
Why set mapred.submit.replication to "10"? Per the documentation 
(https://hadoop.apache.org/docs/r2.7.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml),
the optimal value is around the square root of the number of nodes.
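(By that heuristic, 10 is optimal only for a cluster of roughly 100 nodes, since 
square_root(100) = 10; for other cluster sizes the value should be derived from the 
actual node count rather than hard-coded.)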

mapred.submit.replication is a Hadoop 1.x param. In Hadoop 2, it is 
mapreduce.client.submit.file.replication.
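
If the knob is kept, a minimal sketch of setting the Hadoop 2 property with the 
value derived from the cluster size (numNodes is a hypothetical input here, not 
something the patch currently computes):
{code:java}
import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
int numNodes = 100; // hypothetical: obtain the real node count from the cluster
conf.setInt("mapreduce.client.submit.file.replication",
        (int) Math.ceil(Math.sqrt(numNodes)));
{code}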

Also, mapred.submit.replication is the replication factor for job jars, libjars, 
and distributed-cache files, not the replication factor of arbitrary HDFS files. So 
how does this help Spark workers access the replicated data locally? The replication 
factor of data files can be controlled during creation of the files using the 
FileSystem API.
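
For example (a sketch only; the paths are made up):
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
// raise the replication of an existing file
fs.setReplication(new Path("/user/pig/tiny_data"), (short) 10);
// or set the replication factor at creation time
fs.create(new Path("/user/pig/tiny_data_copy"), (short) 10).close();
{code}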

Did you explore doing this the native way in Spark, i.e., broadcasting the small 
relation and doing a map-side join? Were there challenges there?
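
For reference, a minimal sketch of that approach against the Spark 1.x Java API 
(the data, key types, and class name are made up for illustration; the real 
implementation would feed tuples from the FR join operator):
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class BroadcastJoinSketch {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("broadcast-join").setMaster("local[2]"));

        // large relation: stays distributed and is never shuffled
        JavaPairRDD<String, Integer> big = jsc.parallelizePairs(Arrays.asList(
                new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("c", 3)));

        // small relation: collected to the driver, then broadcast to every executor
        List<Tuple2<String, String>> tiny = Arrays.asList(
                new Tuple2<>("a", "x"), new Tuple2<>("c", "y"));
        Map<String, String> tinyMap = new HashMap<>();
        for (Tuple2<String, String> t : tiny) {
            tinyMap.put(t._1(), t._2());
        }
        Broadcast<Map<String, String>> bc = jsc.broadcast(tinyMap);

        // map-side inner join: each partition of big probes the broadcast map
        JavaPairRDD<String, Tuple2<Integer, String>> joined = big.flatMapToPair(p -> {
            List<Tuple2<String, Tuple2<Integer, String>>> out = new ArrayList<>();
            String match = bc.value().get(p._1());
            if (match != null) {
                out.add(new Tuple2<>(p._1(), new Tuple2<>(p._2(), match)));
            }
            return out; // Spark 1.x PairFlatMapFunction returns an Iterable
        });

        System.out.println(joined.collect()); // [(a,(1,x)), (c,(3,y))]
        jsc.stop();
    }
}
{code}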


> Implement FR Join for spark engine
> ----------------------------------
>
>                 Key: PIG-4771
>                 URL: https://issues.apache.org/jira/browse/PIG-4771
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4771.patch
>
>
> We use a regular join in place of FR join in the current code base (fd31fda). We 
> need to implement FR join.
> Some info collected from 
> https://pig.apache.org/docs/r0.11.0/perf.html#replicated-joins:
> *Replicated Joins*
> Fragment replicate join is a special type of join that works well if one or 
> more relations are small enough to fit into main memory. In such cases, Pig 
> can perform a very efficient join because all of the Hadoop work is done on 
> the map side. In this type of join the large relation is followed by one or 
> more small relations. The small relations must be small enough to fit into 
> main memory; if they don't, the process fails and an error is generated.
> *Usage*
> Perform a replicated join with the USING clause (see JOIN (inner) and JOIN 
> (outer)). In this example, a large relation is joined with two smaller 
> relations. Note that the large relation comes first followed by the smaller 
> relations; and, all small relations together must fit into main memory, 
> otherwise an error is generated.
> big = LOAD 'big_data' AS (b1,b2,b3);
> tiny = LOAD 'tiny_data' AS (t1,t2,t3);
> mini = LOAD 'mini_data' AS (m1,m2,m3);
> C = JOIN big BY b1, tiny BY t1, mini BY m1 USING 'replicated';
> *Conditions*
> Fragment replicate joins are experimental; we don't have a strong sense of 
> how small the small relation must be to fit into memory. In our tests with a 
> simple query that involves just a JOIN, a relation of up to 100 MB can be used 
> if the process overall gets 1 GB of memory. 


