liyunzhang_intel commented on PIG-5029:

[~knoguchi]:  Thanks for your reply.
Here is a question about the example you provided in PIG-3257.
A = load ...
B = group A by UUID();
C = foreach B...
{quote}This job could successfully finish with output ranging from 0 to 2n.
For example, the sequence of events can be:
# mapper0_attempt0 finishes with n outputs, and say all n uuid keys were assigned to reducer0.
# reducer0_attempt0 pulls the map outputs and produces n outputs.
# reducer1_attempt0 tries to pull mapper0_attempt0's output and fails (could be a fetch failure or node failure).
# mapper0_attempt1 reruns, and this time all n uuid keys were assigned to reducer1.
# reducer1_attempt0 pulls mapper0_attempt1's output and produces n outputs.
# The job finishes successfully with 2n outputs.
This is certainly unexpected to users.{quote}
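To restate the scenario in runnable form, here is a minimal, hypothetical Scala sketch (the {{partitionFor}} helper is invented here to mimic a two-reducer hash partitioner; this is not Pig or Hadoop code) of why a nondeterministic key such as UUID() lets a rerun mapper route the same records to a different reducer:
{code}
import java.util.UUID

// Invented helper: mimics a hash partitioner with two reducers.
def partitionFor(key: String, numReducers: Int = 2): Int =
  math.abs(key.hashCode % numReducers)

val records = Seq("r1", "r2", "r3")

// mapper0_attempt0: every record is keyed with a fresh UUID.
val attempt0 = records.map(r => (UUID.randomUUID().toString, r))
// mapper0_attempt1 (rerun after the fetch failure): new UUIDs are drawn,
// so the same records may be routed to different reducers than in attempt0.
val attempt1 = records.map(r => (UUID.randomUUID().toString, r))

println(attempt0.map { case (k, _) => partitionFor(k) }) // e.g. List(0, 0, 0)
println(attempt1.map { case (k, _) => partitionFor(k) }) // e.g. List(1, 0, 1)
// If reducer0 already committed attempt0's records and reducer1 then
// consumes attempt1's records, the job can emit up to 2n outputs.
{code}
Running the sketch repeatedly usually prints different partition assignments, which is exactly the 0-to-2n nondeterminism described above.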
My questions are:
1. Do reducer0_attempt0 and reducer1_attempt0 both pull the map output and stand ready to produce n outputs because of the speculative execution mechanism?
2. If it is because of speculative execution, then after reducer0_attempt0 finishes producing its n outputs, Hadoop will kill reducer1_attempt0 (because reducer0_attempt0 succeeded), so there should be no possibility of generating 2n outputs.

> Optimize sort case when data is skewed
> --------------------------------------
>                 Key: PIG-5029
>                 URL: https://issues.apache.org/jira/browse/PIG-5029
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>         Attachments: PIG-5029.patch, SkewedData_L9.docx
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using 
> org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>         estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The Pig physical plan is converted to a Spark plan with the following RDD lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter 
> (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at 
> map at StoreConverter.java:80 []
>  |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
>     |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
>     |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
>     |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use 
> [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
>  to implement the sort feature. Although 
> [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106]
>  is used by RDD.sortByKey, and RangePartitioner samples the data to divide the 
> keys into roughly equal ranges, the test results (see the attached document) show 
> that one partition receives most of the keys and takes a long time to finish.
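> To make the skew concrete, the following is a minimal, hypothetical Spark shell sketch (not from the Pig codebase; it assumes an existing SparkContext named {{sc}}, and "hot_query" is an invented stand-in for a skewed query_term value) showing how sortByKey's RangePartitioner keeps all duplicates of a hot key in one partition:
> {code}
> // Build a skewed key set: one hot key plus a few rare ones.
> val skewed = sc.parallelize(
>   Seq.fill(1000000)(("hot_query", 1)) ++ Seq(("aa", 1), ("bb", 1), ("cc", 1)))
> 
> // sortByKey builds a RangePartitioner over sampled keys. All duplicates of
> // one key fall into the same range, so they all land in the same partition.
> val sorted = skewed.sortByKey(true, 4)
> 
> // Count records per partition to observe the skew.
> sorted.mapPartitionsWithIndex((i, it) => Iterator((i, it.size)))
>       .collect().foreach(println)
> // Expect one partition with ~1,000,000 records and the rest nearly empty.
> {code}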
