[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15624556#comment-15624556
 ] 

liyunzhang_intel commented on PIG-5029:
---------------------------------------

[~knoguchi]:  

Earlier we discussed the skewed-key sort issue in spark mode at length.  
In PIG-5029_1.patch I proposed appending a random integer to the original 
key so that the skewed keys are distributed evenly (the [salted key 
solution|http://www.slideshare.net/SparkSummit/top-5-mistakes-when-writing-spark-applications-by-mark-grover-and-ted-malaska]).
You pointed out issues this patch can bring, such as duplicated and missing 
results. Later [~tgraves] commented that
{quote}
I'm assuming in the presentation they are using a salt key such that it 
generates the same random if the map task is reran like Koji mentioned. For 
instance can you use the task id (not attempt) such that the key is always the 
same if it re-runs.
{quote}
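For context, the salted-key idea mentioned above can be sketched roughly as follows (an illustrative snippet, not the actual patch code; the string key layout and the {noformat}salt{noformat} helper are my own assumptions):

```java
import java.util.Random;

public class SaltedKey {
    // Append a random integer (the "salt") to the original key so that
    // records sharing one hot key spread over several reduce partitions.
    static String salt(String key, Random random, int parallelism) {
        return key + "#" + random.nextInt(parallelism);
    }

    public static void main(String[] args) {
        Random random = new Random();
        // The same skewed key now maps to up to `parallelism` distinct salted keys.
        for (int i = 0; i < 3; i++) {
            System.out.println(salt("hot_query_term", random, 23));
        }
    }
}
```

After sorting, the salt has to be stripped again, and this is exactly where non-deterministic salts hurt: if a failed task is re-run and draws different random values, records can end up duplicated or missing, which is the problem the seeded approach below addresses.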
So in PIG-5051_5029_5.patch I now generate the random integers with 
{noformat}new Random(seed){noformat}, where the value of 
{noformat}seed{noformat} is 
PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX). Because 
that task index stays the same across different attempts of a spark task, 
the generated random integers are identical if retries happen.
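A minimal sketch of the deterministic seeding described above, assuming the task index has already been read from PigMapReduce.sJobConfInternal and parsed to an int (the method and class names here are illustrative, not the patch's actual code):

```java
import java.util.Arrays;
import java.util.Random;

public class DeterministicSalt {
    // Seed the generator with the task index so every attempt of the same
    // task produces the identical salt sequence, avoiding duplicated or
    // missing records when a failed task is re-run.
    static int[] saltsForTask(int taskIndex, int numRecords, int parallelism) {
        Random random = new Random(taskIndex); // same seed -> same sequence
        int[] salts = new int[numRecords];
        for (int i = 0; i < numRecords; i++) {
            salts[i] = random.nextInt(parallelism);
        }
        return salts;
    }

    public static void main(String[] args) {
        int[] firstAttempt = saltsForTask(7, 5, 23);
        int[] retryAttempt = saltsForTask(7, 5, 23); // simulated retry of task 7
        System.out.println(Arrays.equals(firstAttempt, retryAttempt)); // prints "true"
    }
}
```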

Do you think this solution is OK? If so, [~kexianda] will start to review 
this patch.

Thanks again for [~knoguchi]'s patience on this jira; it is a really 
interesting issue.



> Optimize sort case when data is skewed
> --------------------------------------
>
>                 Key: PIG-5029
>                 URL: https://issues.apache.org/jira/browse/PIG-5029
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-5029.patch, PIG-5029_2.patch, PIG-5029_3.patch, 
> SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using 
> org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>         estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The pig physical plan will be changed to spark plan and to spark lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter 
> (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at 
> map at StoreConverter.java:80 []
>  |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
>     |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
>     |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
>     |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use 
> [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
>  to implement the sort feature. Although 
> [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106]
>  is used by RDD.sortByKey and samples the data to split the keys into 
> roughly equal ranges, the test result (attached document) shows that one 
> partition receives most of the keys and takes a long time to finish.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
