[
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
liyunzhang_intel updated PIG-5029:
----------------------------------
Description:
In PigMix L9.pig
{code}
register $PIGMIX_JAR
A = load '$HDFS_ROOT/page_views' using
org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
as (user, action, timespent, query_term, ip_addr, timestamp,
estimated_revenue, page_info, page_links);
B = order A by query_term parallel $PARALLEL;
store B into '$PIGMIX_OUTPUT/L9out';
{code}
The Pig physical plan is converted to a Spark plan and then to the following Spark RDD lineage:
{code}
[main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at map at StoreConverter.java:80 []
 |  MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
 |  ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
 +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
    |  MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
    |  MapPartitionsRDD[1] at map at LoadConverter.java:127 []
    |  NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
{code}
We use
[sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
to implement the sort feature. RDD.sortByKey uses
[RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106],
which samples the data and divides the keys into roughly equal ranges. Even so,
the test result (attached document) shows that one partition receives most of
the keys and takes a long time to finish.
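To illustrate why sampling-based range partitioning does not help when a single key dominates, here is a minimal Python sketch. It is a simplified stand-in for the idea behind RangePartitioner, not Spark's actual implementation. The function names, the sample size, and the 90% "hot_term" data set are all hypothetical and chosen only for illustration. The point is that every record with the same key must land in the same range, so a hot key cannot be split no matter where the bounds fall.

```python
import bisect
import random
from collections import Counter


def range_bounds(sample, num_partitions):
    """Pick up to num_partitions - 1 upper bounds at evenly spaced quantiles of the sample."""
    ordered = sorted(sample)
    step = len(ordered) / num_partitions
    bounds = []
    for i in range(1, num_partitions):
        candidate = ordered[min(int(i * step), len(ordered) - 1)]
        # Keep bounds strictly increasing; a dominant key can occupy several quantiles.
        if not bounds or candidate > bounds[-1]:
            bounds.append(candidate)
    return bounds


def partition_of(key, bounds):
    """Return the index of the first bound >= key, so equal keys always share a partition."""
    return bisect.bisect_left(bounds, key)


def partition_sizes(keys, num_partitions, sample_size=200, seed=0):
    """Sample the keys, derive range bounds, and count the records per partition."""
    rng = random.Random(seed)
    sample = rng.sample(keys, min(sample_size, len(keys)))
    bounds = range_bounds(sample, num_partitions)
    counts = Counter(partition_of(k, bounds) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


# Hypothetical skewed input: one query term accounts for 90% of the records.
skewed = ["hot_term"] * 9000 + ["term_%04d" % i for i in range(1000)]
sizes = partition_sizes(skewed, num_partitions=4)
print("records per partition (skewed):", sizes)

# For comparison: unique keys spread roughly evenly across the ranges.
uniform = partition_sizes(["term_%05d" % i for i in range(10000)], num_partitions=4)
print("records per partition (uniform):", uniform)
```

In the skewed case the partition that owns "hot_term" holds at least 9000 of the 10000 records, whatever the sample looks like. This matches the behavior described above, where one partition loads most keys.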
was:
In PigMix L9.pig
{code}
register $PIGMIX_JAR
A = load '$HDFS_ROOT/page_views' using
org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
as (user, action, timespent, query_term, ip_addr, timestamp,
estimated_revenue, page_info, page_links);
B = order A by query_term parallel $PARALLEL;
store B into '$PIGMIX_OUTPUT/L9out';
{code}
The Pig physical plan is converted to a Spark plan and then to the following Spark RDD lineage:
{code}
[main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at map at StoreConverter.java:80 []
 |  MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
 |  ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
 +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
    |  MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
    |  MapPartitionsRDD[1] at map at LoadConverter.java:127 []
    |  NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
{code}
We use
[sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
to implement the sort feature. RDD.sortByKey uses
[RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106],
which samples the data and divides the keys into roughly equal ranges. Even so,
the test result (attached picture) shows that one partition receives most of
the keys and takes a long time to finish.
> Optimize sort case when data is skewed
> --------------------------------------
>
> Key: PIG-5029
> URL: https://issues.apache.org/jira/browse/PIG-5029
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using
> org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
> as (user, action, timespent, query_term, ip_addr, timestamp,
> estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The Pig physical plan is converted to a Spark plan and then to the following Spark RDD lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at map at StoreConverter.java:80 []
>  |  MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |  ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
>     |  MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
>     |  MapPartitionsRDD[1] at map at LoadConverter.java:127 []
>     |  NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use
> [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
> to implement the sort feature. RDD.sortByKey uses
> [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106],
> which samples the data and divides the keys into roughly equal ranges. Even so,
> the test result (attached document) shows that one partition receives most of
> the keys and takes a long time to finish.