Hi all
VertexRDD is partitioned with HashPartitioner, and I am seeing an
imbalance in the number of tasks per executor.
For example, running Connected Components with partition strategy Edge2D:
Aggregated Metrics by Executor

Executor ID  Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input     Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
1            10 s       10           0             10               234.6 MB  0.0 B         43.2 MB        0.0 B                   0.0 B
2            3 s        3            0             3                70.4 MB   0.0 B         13.0 MB        0.0 B                   0.0 B
3            6 s        6            0             6                140.7 MB  0.0 B         25.9 MB        0.0 B                   0.0 B
4            9 s        8            0             8                187.9 MB  0.0 B         34.6 MB        0.0 B                   0.0 B
5            10 s       9            0             9                211.4 MB  0.0 B         38.9 MB        0.0 B                   0.0 B

The metrics above are for the stage "mapPartitions at VertexRDD.scala:347":

344   /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
345   private[graphx] def shipVertexAttributes(
346       shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
347     partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
348   }

This stage is executed in every Pregel iteration, so the imbalance is bad
for performance.
However, when I run PageRank with Edge2D, the tasks are spread evenly across
executors (each executor finishes 6 tasks).
Our configuration is 6 nodes and 36 partitions.
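
As a quick sanity check on the executor table, here is a minimal sketch that recomputes two things from the figures I pasted: the total task count, and the average input per task on each executor. The object, case class, and field names are made up for illustration; only the numbers come from the Spark UI.

```scala
// Figures copied from the "Aggregated Metrics by Executor" table.
// TaskBalanceCheck and ExecutorRow are hypothetical names used only here.
object TaskBalanceCheck {
  final case class ExecutorRow(id: Int, tasks: Int, inputMb: Double)

  val rows: Seq[ExecutorRow] = Seq(
    ExecutorRow(1, 10, 234.6),
    ExecutorRow(2, 3, 70.4),
    ExecutorRow(3, 6, 140.7),
    ExecutorRow(4, 8, 187.9),
    ExecutorRow(5, 9, 211.4)
  )

  // Total number of tasks the stage ran across all listed executors.
  val totalTasks: Int = rows.map(_.tasks).sum

  // Average input read per task on each executor, in MB.
  val inputPerTask: Seq[(Int, Double)] =
    rows.map(r => r.id -> r.inputMb / r.tasks)

  def main(args: Array[String]): Unit = {
    println(s"total tasks = $totalTasks")
    inputPerTask.foreach { case (id, mb) =>
      println(f"executor $id: $mb%.2f MB per task")
    }
  }
}
```

The task counts add up to 36, which matches the number of partitions, and every executor reads between 23.4 and 23.5 MB per task. So the partitions themselves look about the same size; what differs is how many tasks each executor is given.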
My question is:
What decides the number of tasks assigned to each executor? And how can I
make it balanced?
Thanks!
Larry