[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524780#comment-15524780
 ] 

liyunzhang_intel commented on PIG-5029:
---------------------------------------

[~rohini] and [~knoguchi]: thanks for your patience on this jira.
bq. Key is making Random less random so that it'll produce same sequence of 
values when retried for the given task.

You mean that different key which does not belongs to the same partition will 
be distributed to the same partition after we use (key+random integer) after 
retried the given task, is my understanding right?

If my understanding is right, i wonder why salted key(appending random to the 
key) is  widely used in 
[HBase|https://sematext.com/blog/2012/04/09/hbasewd-avoid-regionserver-hotspotting-despite-writing-records-with-sequential-keys/]
 and even proposed in the [Spark Submit 
2016|http://www.slideshare.net/SparkSummit/top-5-mistakes-when-writing-spark-applications-by-mark-grover-and-ted-malaska]
 to make the key distributed evenly if salted key will generate problems in the 
retried task case.

> Optimize sort case when data is skewed
> --------------------------------------
>
>                 Key: PIG-5029
>                 URL: https://issues.apache.org/jira/browse/PIG-5029
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-5029.patch, SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using 
> org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>         estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The pig physical plan will be changed to spark plan and to spark lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter 
> (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at 
> map at StoreConverter.java:80 []
>  |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
>     |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
>     |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
>     |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use 
> [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
>  to implement the sort feature. Although 
> [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106]
>  is used by RDD.sortByKey and RangePartitiner will sample data and ranges the 
> key roughly into equal range, the test result(attached  document) shows that 
> one partition will load most keys and take long time to finish.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to