Hi,
I've been looking at the code of RDD.treeAggregate, because we've seen a
huge performance drop on a treeReduce between Spark 1.5.2 and 1.6.1. As far
as I can tell, the treeAggregate code hasn't changed between those versions,
so this message is not about the performance drop itself, but a more general
remark about treeAggregate.
In treeAggregate, after the aggregate has been applied within the original
partitions, we enter the tree:
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
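To make the effect of this loop concrete, here is a plain-Scala sketch (no Spark involved) that replays the loop condition and shows, for one pass, which source partitions are sent to the same target partition. `scale` is passed in directly as an assumption, and `levels` and `targets` are hypothetical helper names. Because an Int key k below curNumPartitions hashes to partition k under HashPartitioner, the key `i % curNumPartitions` is effectively the target partition index. With 1000 partitions and a scale of 10, the loop runs twice (1000 to 100, then 100 to 10), and in the first pass target 0 collects source partitions 0, 100, ..., 900.

```scala
import scala.collection.mutable.ListBuffer

object TreeLevels {
  // Partition count after each pass of the quoted while loop.
  // Assumption: `scale` is given directly; the loop itself does not show
  // how it is derived.
  def levels(initialPartitions: Int, scale: Int): List[Int] = {
    var numPartitions = initialPartitions
    val counts = ListBuffer.empty[Int]
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      counts += numPartitions
    }
    counts.toList
  }

  // For one pass: every record of source partition i gets key
  // i % curNumPartitions, and with HashPartitioner a non-negative Int key k
  // (k < curNumPartitions) lands in partition k. Groups source indices by target.
  def targets(sourcePartitions: Int, curNumPartitions: Int): Map[Int, Seq[Int]] =
    (0 until sourcePartitions).groupBy(_ % curNumPartitions)

  def main(args: Array[String]): Unit = {
    println(levels(1000, 10))                         // List(100, 10)
    println(targets(1000, 100)(0).mkString(", "))     // 0, 100, 200, 300, 400, 500, 600, 700, 800, 900
  }
}
```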
The two lines where the partitions are numbered, renumbered modulo the new
partition count, and then passed through reduceByKey seem suboptimal to me.
They incur a huge shuffle cost, whereas a simple coalesce followed by a
partition-level aggregation would probably do the job just as well.
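A rough model of the suggested alternative, again in plain Scala without Spark: `coalesce` below is a hypothetical stand-in that merges runs of adjacent partitions (Spark's real `coalesce(n, shuffle = false)` also takes data locality into account, which this ignores), and each merged partition is then reduced locally. It is compared with a model of the modulo-keyed regrouping from the loop above. For an associative and commutative combine function both produce the same total; the sketch only models which values meet, not network or shuffle cost.

```scala
object CoalesceVsShuffle {
  // Partitions modelled as nested Vectors (assumption, not Spark's RDD).
  type Partitions[T] = Vector[Vector[T]]

  // Hypothetical stand-in for coalesce(n, shuffle = false): concatenates
  // runs of adjacent parent partitions into at most n groups.
  def coalesce[T](parts: Partitions[T], n: Int): Partitions[T] = {
    val groupSize = math.ceil(parts.size.toDouble / n).toInt
    parts.grouped(groupSize).map(_.flatten).toVector
  }

  // Partition-level aggregation: reduce each non-empty partition locally.
  def aggregateEach[T](parts: Partitions[T], combOp: (T, T) => T): Partitions[T] =
    parts.map(p => Vector(p.reduce(combOp)))

  // Model of the quoted loop body: source partition i feeds target i % n,
  // and each target combines everything it receives.
  // Assumes n <= parts.size so that no target is empty.
  def modRegroup[T](parts: Partitions[T], n: Int, combOp: (T, T) => T): Partitions[T] =
    Vector.tabulate(n) { target =>
      parts.indices.filter(_ % n == target).flatMap(parts(_)).reduce(combOp)
    }.map(Vector(_))

  def main(args: Array[String]): Unit = {
    // One partially aggregated value per partition, as after the first step.
    val parts: Partitions[Long] = Vector.tabulate(1000)(i => Vector(i.toLong))
    val add = (a: Long, b: Long) => a + b
    val viaCoalesce = aggregateEach(coalesce(parts, 100), add)
    val viaRegroup = modRegroup(parts, 100, add)
    println(viaCoalesce.size)                                   // 100
    println(viaCoalesce.flatten.sum == viaRegroup.flatten.sum)  // true
  }
}
```

The main difference in grouping is that the coalesced groups are contiguous runs of parent partitions, while the loop groups strided partitions (i, i + n, i + 2n, ...).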
Have I missed something that makes this reshuffle necessary?
Best regards
Guillaume Pitel