liyunzhang_intel created PIG-4553:
-------------------------------------
Summary: Implement secondary sort using one shuffle instead of two
Key: PIG-4553
URL: https://issues.apache.org/jira/browse/PIG-4553
Project: Pig
Issue Type: Sub-task
Reporter: liyunzhang_intel
Currently secondary key sort is implemented in
GlobalRearrangeConverter#convert with two shuffles: the first in
repartitionAndSortWithinPartitions and the second in groupBy.
{code}
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
                          POGlobalRearrangeSpark physicalOperator) throws IOException {
    ....
    if (predecessors.size() == 1) {
        // GROUP
        JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
        if (physicalOperator.isUseSecondaryKey()) {
            RDD<Tuple> rdd = predecessors.get(0);
            RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
                    SparkUtil.<Tuple, Object>getTuple2Manifest());
            JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
                    SparkUtil.getManifest(Tuple.class),
                    SparkUtil.getManifest(Object.class));
            // first sort the tuples by the secondary key when useSecondaryKey is enabled
            JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
                    new HashPartitioner(parallelism),
                    new PigSecondaryKeyComparatorSpark(physicalOperator.getSecondarySortOrder())); // first shuffle
            JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
            prdd = mapped.groupBy(new GetKeyFunction(physicalOperator), parallelism); // second shuffle
        } else {
            JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
            prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
        }
        JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
        return jrdd2.rdd();
    }
    ....
}
{code}
We can optimize this down to a single shuffle following the approach in
https://github.com/tresata/spark-sorted: partition by the primary key only, sort
within partitions by a composite (primary key, secondary key), and then build the
groups with a narrow per-partition pass instead of a second groupBy.
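A minimal sketch of what that could look like, in the spirit of spark-sorted's GroupSorted. CompositeKey, PrimaryKeyPartitioner, CompositeKeyComparator and groupWithSecondarySort are illustrative names, not existing Pig or spark-sorted classes; the sketch assumes the Spark 1.x Java API (FlatMapFunction.call returning an Iterable) and Comparable keys.
{code}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

import scala.Tuple2;

public class OneShuffleSecondarySort {

    /** Illustrative (primaryKey, secondaryKey) pair shipped through the single shuffle. */
    public static class CompositeKey implements Serializable {
        public final Object primary;
        public final Object secondary;
        public CompositeKey(Object primary, Object secondary) {
            this.primary = primary;
            this.secondary = secondary;
        }
    }

    /** Routes records by the primary key only, so a whole group lands in one partition. */
    public static class PrimaryKeyPartitioner extends Partitioner {
        private final HashPartitioner delegate;
        public PrimaryKeyPartitioner(int partitions) {
            this.delegate = new HashPartitioner(partitions);
        }
        @Override
        public int numPartitions() {
            return delegate.numPartitions();
        }
        @Override
        public int getPartition(Object key) {
            return delegate.getPartition(((CompositeKey) key).primary);
        }
    }

    /** Orders by primary key first, then by secondary key; assumes Comparable keys. */
    public static class CompositeKeyComparator implements Comparator<CompositeKey>, Serializable {
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public int compare(CompositeKey a, CompositeKey b) {
            int c = ((Comparable) a.primary).compareTo(b.primary);
            return c != 0 ? c : ((Comparable) a.secondary).compareTo(b.secondary);
        }
    }

    /**
     * Single shuffle: repartition by primary key and sort each partition by
     * (primary, secondary), then fold consecutive records with the same primary
     * key into one group in a narrow mapPartitions pass -- no second groupBy.
     */
    public static <V> JavaRDD<Tuple2<Object, Iterable<V>>> groupWithSecondarySort(
            JavaPairRDD<CompositeKey, V> input, int parallelism) {
        JavaPairRDD<CompositeKey, V> sorted = input.repartitionAndSortWithinPartitions(
                new PrimaryKeyPartitioner(parallelism), new CompositeKeyComparator()); // the only shuffle
        return sorted.mapPartitions(
                new FlatMapFunction<Iterator<Tuple2<CompositeKey, V>>, Tuple2<Object, Iterable<V>>>() {
                    @Override
                    public Iterable<Tuple2<Object, Iterable<V>>> call(Iterator<Tuple2<CompositeKey, V>> it) {
                        List<Tuple2<Object, Iterable<V>>> out = new ArrayList<Tuple2<Object, Iterable<V>>>();
                        Object currentKey = null;
                        List<V> values = null;
                        while (it.hasNext()) {
                            Tuple2<CompositeKey, V> rec = it.next();
                            Object primary = rec._1().primary;
                            if (values == null || !Objects.equals(primary, currentKey)) {
                                if (values != null) {
                                    out.add(new Tuple2<Object, Iterable<V>>(currentKey, values));
                                }
                                currentKey = primary;
                                values = new ArrayList<V>();
                            }
                            // values arrive already ordered by the secondary key
                            values.add(rec._2());
                        }
                        if (values != null) {
                            out.add(new Tuple2<Object, Iterable<V>>(currentKey, values));
                        }
                        return out;
                    }
                });
    }
}
{code}
Note this simplified sketch buffers each group's values in memory inside the partition pass; spark-sorted's GroupSorted instead streams the sorted values of a group lazily, which is what we would want for large groups.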