liyunzhang_intel created PIG-5029:
-------------------------------------

             Summary: Optimize sort case when data is skewed
                 Key: PIG-5029
                 URL: https://issues.apache.org/jira/browse/PIG-5029
             Project: Pig
          Issue Type: Sub-task
            Reporter: liyunzhang_intel
            Assignee: liyunzhang_intel


In PigMix L9.pig
{code}
register $PIGMIX_JAR
A = load '$HDFS_ROOT/page_views' using 
org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = order A by query_term parallel $PARALLEL;
store B into '$PIGMIX_OUTPUT/L9out';
{code}

The pig physical plan will be changed to spark plan and to spark lineage:
{code}
[main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter 
(StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at 
map at StoreConverter.java:80 []
 |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
 |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
 +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
    |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
    |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
    |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
{code}

We use 
[sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
 to implement the sort feature. Although 
[RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106]
 is used by RDD.sortByKey and RangePartitiner will sample data and ranges the 
key roughly into equal range, the test result(attached  picture) shows that one 
partition will load most keys and take long time to finish.








--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to