[ 
https://issues.apache.org/jira/browse/PIG-4553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liyunzhang_intel updated PIG-4553:
----------------------------------
    Attachment: PIG-4553_2.patch

[~mohitsabharwal], [~pallavi.rao] and [~kexianda],
please help review PIG-4553_2.patch.
The changes are:
1. Modify IndexedKey#compareTo to order by the first key, then by the secondary key.
2. Create a custom partitioner, IndexedKeyPartitioner, which places tuples with the same
first key in the same partition. Then use
JavaPairRDD.repartitionAndSortWithinPartitions to repartition the RDD
according to IndexedKeyPartitioner and sort the tuples within each resulting partition.
3. Add AccumulateByKey, which wraps POPackage to package tuples with the same key
into the result: (key, (val1, val2, val3, ...)).
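For reviewers' context, the ordering/partitioning contract that changes 1 and 2 rely on can be sketched as below. This is a minimal, stdlib-only sketch: the class and field names are illustrative rather than the patch's actual code, and the real IndexedKeyPartitioner would extend org.apache.spark.Partitioner instead of the inlined partition() helper shown here.

```java
import java.io.Serializable;
import java.util.Objects;

// Simplified stand-in for Pig's IndexedKey (names are illustrative).
// compareTo orders by the first key and then the secondary key, while
// partitioning hashes only the first key, so a single
// repartitionAndSortWithinPartitions leaves every partition grouped by
// first key with values sorted by secondary key -- one shuffle in total.
class IndexedKeySketch implements Comparable<IndexedKeySketch>, Serializable {
    @SuppressWarnings("rawtypes")
    private final Comparable firstKey;
    @SuppressWarnings("rawtypes")
    private final Comparable secondaryKey; // null when secondary sort is disabled

    @SuppressWarnings("rawtypes")
    IndexedKeySketch(Comparable firstKey, Comparable secondaryKey) {
        this.firstKey = firstKey;
        this.secondaryKey = secondaryKey;
    }

    @Override
    @SuppressWarnings("unchecked")
    public int compareTo(IndexedKeySketch other) {
        // Order by first key first...
        int c = firstKey.compareTo(other.firstKey);
        if (c != 0 || secondaryKey == null || other.secondaryKey == null) {
            return c;
        }
        // ...then break ties with the secondary key.
        return secondaryKey.compareTo(other.secondaryKey);
    }

    // In the patch this logic would live in IndexedKeyPartitioner#getPartition
    // (a org.apache.spark.Partitioner); inlined here to stay self-contained.
    // Only the first key decides the partition, so all tuples sharing a first
    // key land together.
    int partition(int numPartitions) {
        return Math.floorMod(Objects.hashCode(firstKey), numPartitions);
    }
}
```

Keeping the partition function blind to the secondary key is what makes the grouping correct: the sort within each partition handles the secondary order, and the partitioner guarantees co-location.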



> Implement secondary sort using 1 shuffle not twice
> --------------------------------------------------
>
>                 Key: PIG-4553
>                 URL: https://issues.apache.org/jira/browse/PIG-4553
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4553_1.patch, PIG-4553_2.patch
>
>
> Currently we implement the secondary key sort in
> GlobalRearrangeConverter#convert with two shuffles:
> the first shuffle in repartitionAndSortWithinPartitions, the second in groupBy.
> {code}
> public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
>         POGlobalRearrangeSpark physicalOperator) throws IOException {
>     ....
>     if (predecessors.size() == 1) {
>         // GROUP
>         JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
>         if (physicalOperator.isUseSecondaryKey()) {
>             RDD<Tuple> rdd = predecessors.get(0);
>             RDD<Tuple2<Tuple, Object>> rddPair =
>                     rdd.map(new ToKeyNullValueFunction(),
>                             SparkUtil.<Tuple, Object>getTuple2Manifest());
>             JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
>                     SparkUtil.getManifest(Tuple.class),
>                     SparkUtil.getManifest(Object.class));
>             // first sort the tuples by secondary key if useSecondaryKey is enabled
>             JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
>                     new HashPartitioner(parallelism),
>                     new PigSecondaryKeyComparatorSpark(
>                             physicalOperator.getSecondarySortOrder())); // first shuffle
>             JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
>             prdd = mapped.groupBy(new GetKeyFunction(physicalOperator),
>                     parallelism); // second shuffle
>         } else {
>             JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
>             prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
>         }
>         JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
>         return jrdd2.rdd();
>     }
>     ....
> }
> {code}
> We can optimize this by following the approach used in
> https://github.com/tresata/spark-sorted.
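(For reference, the spark-sorted approach quoted above amounts to one repartitionAndSortWithinPartitions followed by a streaming group over each already-sorted partition, rather than a second groupBy shuffle. A stdlib-only sketch of that per-partition grouping step, roughly what AccumulateByKey does after the single shuffle; the class and method names here are illustrative, not Pig's actual API.)

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Streaming group-by over a partition that repartitionAndSortWithinPartitions
// has already sorted by (first key, secondary key): consecutive entries with
// the same key are folded into one (key, [values]) pair in a single pass,
// so no second shuffle is needed. Values keep their secondary-sorted order.
class SortedPartitionGrouper {
    static <K, V> List<Map.Entry<K, List<V>>> groupSorted(Iterator<Map.Entry<K, V>> sorted) {
        List<Map.Entry<K, List<V>>> out = new ArrayList<>();
        K currentKey = null;
        List<V> currentValues = null;
        while (sorted.hasNext()) {
            Map.Entry<K, V> e = sorted.next();
            // Start a new group whenever the key changes (or at the start).
            if (currentValues == null || !Objects.equals(currentKey, e.getKey())) {
                currentKey = e.getKey();
                currentValues = new ArrayList<>();
                out.add(new AbstractMap.SimpleEntry<>(currentKey, currentValues));
            }
            currentValues.add(e.getValue());
        }
        return out;
    }
}
```

This only works because the shuffle guaranteed that all entries for a key are contiguous within the partition; on unsorted input the same key could produce several groups.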



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
