[
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15521883#comment-15521883
]
liyunzhang_intel commented on PIG-5029:
---------------------------------------
[~vanzin]: Thanks for your comment. I have a question about using a salted
key to solve the skewed-data problem described in the link above: will *redundant*
data be generated? For example, suppose we salt the key (append a random integer
to make a new key) and pass it through several RDD transformations. Then the Spark
job/stage/task fails because of a fetch failure or a node failure while the
temporary output is still saved on disk, so Spark retries the task. Will Spark
aggregate the *temporary output* generated by the failed task into the final
result? I think this will *not* happen, because Spark removes the temporary
output when a task
[fails to fetch map outputs|https://github.com/apache/spark/blob/649fa4bf1d6fc9271ae56b6891bc93ebf57858d1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1278].
Can you confirm this? [~knoguchi] suggested that the salted-key solution
(appending a random integer to the key so that keys are distributed more evenly)
will generate redundant data after a fetch failure.
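
To make the salted-key idea concrete, here is a minimal Python sketch of a sort on a salted key. It is only an illustration under stated assumptions: it uses neither Spark nor Pig, the helper names (`range_bounds`, `salted_sort`), `SALT_RANGE` and the data are hypothetical, and the quantile bounds are a simplified stand-in for RangePartitioner. It models a single failure-free run, so it says nothing about retries or fetch failures. It only shows that sorting on (key, salt) lets a hot key span several partitions while the output, once the salt is dropped, is still sorted by the original key.

```python
import bisect
import random

SALT_RANGE = 100  # hypothetical: salts are drawn from 0..99


def range_bounds(sample, num_partitions):
    # Simplified range-partitioner bounds: evenly spaced quantiles of a sorted sample.
    s = sorted(sample)
    return [s[len(s) * i // num_partitions] for i in range(1, num_partitions)]


def salted_sort(keys, num_partitions, sample_size=200, seed=0):
    rng = random.Random(seed)  # seeded only so the sketch is reproducible
    # Salt each key by pairing it with a random integer: (key, salt).
    salted = [(key, rng.randrange(SALT_RANGE)) for key in keys]
    bounds = range_bounds(rng.sample(salted, sample_size), num_partitions)
    partitions = [[] for _ in range(num_partitions)]
    for salted_key in salted:
        # First partition whose upper bound is >= the salted key.
        partitions[bisect.bisect_left(bounds, salted_key)].append(salted_key)
    # Sort each partition locally, then drop the salt to recover the original keys.
    return [[key for key, _ in sorted(p)] for p in partitions]


records = ["hot"] * 900 + ["k%03d" % i for i in range(100)]
parts = salted_sort(records, 4)
print([len(p) for p in parts])

output = [key for p in parts for key in p]
assert output == sorted(records)  # still sorted by the original key
```

Because tuples compare by key first and salt second, the range bounds can fall between two salts of the same hot key, which is what splits it across partitions.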
[~knoguchi]:
As for node failure causing redundant keys, I think Spark will rerun the task on
other nodes, and the temporary output on the failed node will not be aggregated
into the final result.
As for missing data, I think this will *only* happen when we use the random
integer as the *only* key of the tuple (as in the case you provided in PIG-3257).
> Optimize sort case when data is skewed
> --------------------------------------
>
> Key: PIG-5029
> URL: https://issues.apache.org/jira/browse/PIG-5029
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-5029.patch, SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using
> org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
> as (user, action, timespent, query_term, ip_addr, timestamp,
> estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The Pig physical plan is converted to a Spark plan, which produces the following RDD lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter
> (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at
> map at StoreConverter.java:80 []
> | MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
> | ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
> +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
> | MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
> | MapPartitionsRDD[1] at map at LoadConverter.java:127 []
> | NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use
> [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
> to implement the sort feature. Although RDD.sortByKey uses
> [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106],
> which samples the data and splits the keys into roughly equal ranges, the test
> result (attached document) shows that one partition receives most of the keys
> and takes a long time to finish.
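
The skew can be reproduced outside Spark with a minimal Python sketch of range partitioning. This is a simplification, not Spark's RangePartitioner: the helper names (`range_bounds`, `range_partition_sizes`) and the data are hypothetical, and it skips Spark's weighted sampling and its handling of duplicate bounds. What it does show holds for any range partitioner: every record with the same key falls into the same range, so a heavily repeated key cannot be split across partitions, however evenly the bounds are chosen.

```python
import bisect
import random


def range_bounds(sample, num_partitions):
    # Simplified range-partitioner bounds: evenly spaced quantiles of a sorted sample.
    s = sorted(sample)
    return [s[len(s) * i // num_partitions] for i in range(1, num_partitions)]


def range_partition_sizes(keys, num_partitions, sample_size=200, seed=0):
    rng = random.Random(seed)  # seeded only so the sketch is reproducible
    bounds = range_bounds(rng.sample(keys, sample_size), num_partitions)
    sizes = [0] * num_partitions
    for k in keys:
        # First partition whose upper bound is >= k; equal keys always land together.
        sizes[bisect.bisect_left(bounds, k)] += 1
    return sizes


keys = ["hot"] * 900 + ["k%03d" % i for i in range(100)]
sizes = range_partition_sizes(keys, 4)
print(sizes)
```

With 900 of the 1000 records sharing the key `hot`, all 900 end up in a single partition, mirroring the long-running partition described above.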