[
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530065#comment-15530065
]
Thomas Graves commented on PIG-5029:
------------------------------------
Is the question whether Spark supports maps that aren't idempotent, i.e. maps
that could generate different output if run again? If so, I think that is a bad
assumption even if it did work. Maps should be idempotent because things like
speculative execution and the failure cases you have talked about rely, or
could rely, on this.
Spark does try to be intelligent about rerunning stages: if you get a fetch
failure for a map's output after one of the reducers has already finished, it
will rerun that map and then rerun only the reducers that haven't finished. So
if the rerun map produces different output, you could end up with inconsistent
results across those reducers.
The external shuffle service doesn't necessarily solve this problem. It helps,
but if the node holding the map output goes away, that output still has to be
recomputed.
I'm assuming in the presentation they are using a salt key that generates the
same random value if the map task is rerun, like Koji mentioned. For instance,
can you use the task id (not the attempt id) so that the key is always the same
if the task re-runs? A sketch of that idea is below. [~rohini], how is Pig
handling skew for MR/Tez?
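For illustration, a minimal sketch of that idea against the Spark 2.x Java API
(the class and helper names here are hypothetical, an assumption about the
approach rather than code from the presentation). Seeding the Random with
TaskContext.partitionId(), which identifies the task rather than the attempt,
makes the salts reproducible if the map is rerun:
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

import scala.Tuple2;

public class DeterministicSalting {

  // Hypothetical helper: spread a skewed key over numSalts sub-keys.
  public static JavaPairRDD<String, String> saltKeys(
      JavaRDD<Tuple2<String, String>> input, int numSalts) {
    return input.mapPartitionsToPair(records -> {
      // partitionId() is the task id, not the attempt id, so a rerun of
      // this map seeds the Random identically and emits the same salts.
      Random rand = new Random(TaskContext.get().partitionId());
      List<Tuple2<String, String>> out = new ArrayList<>();
      while (records.hasNext()) {
        Tuple2<String, String> rec = records.next();
        out.add(new Tuple2<>(rec._1() + "#" + rand.nextInt(numSalts), rec._2()));
      }
      return out.iterator();
    });
  }
}
{code}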
> Optimize sort case when data is skewed
> --------------------------------------
>
> Key: PIG-5029
> URL: https://issues.apache.org/jira/browse/PIG-5029
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-5029.patch, PIG-5029_2.patch, SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using
> org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
> as (user, action, timespent, query_term, ip_addr, timestamp,
> estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The Pig physical plan is converted to a Spark plan, which yields the following RDD lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter
> (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at
> map at StoreConverter.java:80 []
> | MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
> | ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
> +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
> | MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
> | MapPartitionsRDD[1] at map at LoadConverter.java:127 []
> | NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use
> [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
> to implement the sort feature. Although RDD.sortByKey uses
> [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106],
> which samples the data and splits the keys into roughly equal ranges, the test
> result (attached document) shows that one partition receives most of the keys
> and takes a long time to finish.
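> To see the effect, a hypothetical standalone demo (assuming the Spark 2.x
> Java API; this is not code from the spark branch or the attached document):
> sortByKey sends every duplicate of a key to the same range, so however well
> RangePartitioner samples the bounds, a dominant key fills one partition:
> {code}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Iterator;
> import java.util.List;
>
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.Function2;
>
> import scala.Tuple2;
>
> public class SkewDemo {
>   public static void main(String[] args) {
>     JavaSparkContext sc = new JavaSparkContext("local[4]", "skew-demo");
>     // 90% of the records share one hot key, mimicking a skewed query_term.
>     List<Tuple2<String, Integer>> data = new ArrayList<>();
>     for (int i = 0; i < 100000; i++) {
>       data.add(new Tuple2<>(i % 10 == 0 ? "key" + i : "hot", i));
>     }
>     JavaPairRDD<String, Integer> sorted =
>         sc.parallelizePairs(data).sortByKey(true, 8);
>     // Count the records that land in each of the 8 partitions; the
>     // partition holding "hot" ends up with roughly 90% of the data.
>     Function2<Integer, Iterator<Tuple2<String, Integer>>, Iterator<Integer>> countFn =
>         (idx, it) -> {
>           int n = 0;
>           while (it.hasNext()) { it.next(); n++; }
>           return Arrays.asList(n).iterator();
>         };
>     System.out.println("records per partition: "
>         + sorted.mapPartitionsWithIndex(countFn, false).collect());
>     sc.stop();
>   }
> }
> {code}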
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)