Hi,

I've been looking at the code of RDD.treeAggregate because we've seen a huge performance drop on a treeReduce between Spark 1.5.2 and 1.6.1. As far as I can tell, the treeAggregate code hasn't changed between those versions, so this message is not about the performance drop but a more general remark about treeAggregate.

In treeAggregate, once the aggregation has been applied within each original partition, we enter the tree:


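    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      // shrink the partition count by a factor of `scale` at each level
      numPartitions /= scale
      val curNumPartitions = numPartitions
      // key each partial aggregate by (source partition index % curNumPartitions),
      // then shuffle and combine the values that share a key
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }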
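
To make the loop's behaviour concrete, here is a minimal pure-Scala sketch that simulates it on in-memory collections, with no Spark involved. Each inner Vector stands for one partition. TreeLoopSim, scaleFor, levelSizes and shuffleLevel are hypothetical names for illustration, and the formula for scale is an assumption based on my reading of the Spark 1.6 source, since the excerpt above does not show it.

```scala
// Pure-Scala simulation of the tree loop in RDD.treeAggregate (no Spark needed).
// Each inner Vector stands for one partition's contents.
object TreeLoopSim {

  // ASSUMPTION: scale = max(ceil(n^(1/depth)), 2), as I read it in the
  // Spark 1.6 source; the excerpt does not show this line.
  def scaleFor(numPartitions: Int, depth: Int): Int =
    math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)

  // Partition count after each iteration of the while loop.
  def levelSizes(initial: Int, depth: Int): List[Int] = {
    val scale = scaleFor(initial, depth)
    var numPartitions = initial
    val sizes = List.newBuilder[Int]
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      sizes += numPartitions
    }
    sizes.result()
  }

  // One level: key every value by (source partition index % cur), then
  // combine values per key. HashPartitioner sends an Int key k in [0, cur)
  // to partition k, so output partition k holds the combined value for key k.
  def shuffleLevel[U](parts: Vector[Vector[U]], cur: Int, combOp: (U, U) => U): Vector[Vector[U]] = {
    val keyed = for {
      (part, i) <- parts.zipWithIndex
      u <- part
    } yield (i % cur, u)
    val reduced: Map[Int, U] =
      keyed.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(combOp) }
    Vector.tabulate(cur)(k => reduced.get(k).toList.toVector)
  }

  // Run every level of the loop, then reduce what is left.
  def treeAggregate[U](parts: Vector[Vector[U]], depth: Int)(combOp: (U, U) => U): U = {
    var current = parts
    for (cur <- levelSizes(parts.length, depth))
      current = shuffleLevel(current, cur, combOp)
    current.flatten.reduce(combOp) // final reduce over the remaining partials
  }
}
```

For example, under the assumed scale formula, 1000 partitions with depth = 2 give scale = 32, and the loop runs exactly once (TreeLoopSim.levelSizes(1000, 2) is List(31)): all 1000 partial aggregates are shuffled into 31 partitions before the final reduce.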


The two lines where the partitions are numbered, then renumbered, then reduced by key seem suboptimal to me: they incur a huge shuffle cost, whereas a simple coalesce followed by a partition-level aggregation would probably do the job perfectly well.
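
A similarly hedged sketch of the alternative, again simulated on plain collections: partitions are merged without any shuffle, in the spirit of coalesce(n, shuffle = false), and each merged partition is then combined locally. CoalesceLevelSim, coalesceGroups and coalesceLevel are hypothetical names. Grouping contiguous ranges of parent partitions is a simplification, because Spark's coalesce may group partitions by data locality instead.

```scala
// Pure-Scala simulation of the coalesce-based alternative (no Spark needed).
// Each inner Vector stands for one partition's contents.
object CoalesceLevelSim {

  // SIMPLIFICATION: output partition j takes a contiguous range of parent
  // partitions. Spark's coalesce may group parents by locality instead.
  def coalesceGroups[U](parts: Vector[Vector[U]], target: Int): Vector[Vector[U]] = {
    require(target >= 1, "target must be at least 1")
    val n = parts.length
    Vector.tabulate(target) { j =>
      val from = (j.toLong * n / target).toInt
      val until = ((j + 1).toLong * n / target).toInt
      parts.slice(from, until).flatten // concatenate the parents' contents, no shuffle
    }
  }

  // One level: coalesce (never growing the partition count, as with
  // shuffle = false), then combine each merged partition into one value.
  def coalesceLevel[U](parts: Vector[Vector[U]], target: Int)(combOp: (U, U) => U): Vector[Vector[U]] =
    coalesceGroups(parts, math.max(1, math.min(target, parts.length))).map { merged =>
      if (merged.isEmpty) Vector.empty[U] else Vector(merged.reduce(combOp))
    }
}
```

In Spark terms, one such level would roughly be partiallyAggregated.coalesce(curNumPartitions, shuffle = false).mapPartitions(iter => if (iter.hasNext) Iterator(iter.reduce(cleanCombOp)) else Iterator.empty). One trade-off to weigh: a coalesce without shuffle is a narrow dependency, so it is pipelined into its parent's stage, and the coalesce documentation warns that a drastic coalesce can make the upstream computation run on fewer nodes than you would like.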

Have I missed something that requires this reshuffle?

Best regards
Guillaume Pitel
