Hello guys,

I'm currently a bit confused by the behaviour of coalesce.

Consider the following use case: I'd like to join two fairly large RDDs.
To make the join more stable and to avoid OOM failures, the RDDs are
usually given a large number of partitions so that the data is spread
more evenly and no single partition gets close to the 2GB limit. The
join therefore runs with a lot of partitions, roughly as in the sketch below.

Then, after a successful join, I'd like to save the resulting dataset.
But I don't need as many output files as there were partitions/tasks
during the join. A number of files equal to the total number of
executor cores allocated to the job would actually be fine, so I
considered using coalesce, roughly as below.
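
Roughly what I had in mind for the save step (again a sketch; deriving
the core count from spark.executor.instances and spark.executor.cores
is just one option and only applies to some deployment modes):

// Total executor cores allocated to the job (assumes the two settings
// below are populated, e.g. when running on YARN).
val totalExecutorCores =
  sc.getConf.getInt("spark.executor.instances", 1) *
  sc.getConf.getInt("spark.executor.cores", 1)

// Write only as many files as there are executor cores.
joined.coalesce(totalExecutorCores, shuffle = false)
      .saveAsTextFile("/data/joined")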

The problem is that coalesce with shuffling disabled prevents the
preceding stage (the join in my case) from running with the specified
number of partitions; instead that whole stage runs with the number of
partitions passed to coalesce:

scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, false).toDebugString
res5: String =
(5) CoalescedRDD[15] at coalesce at <console>:25 []
 |  MapPartitionsRDD[14] at repartition at <console>:25 []
 |  CoalescedRDD[13] at repartition at <console>:25 []
 |  ShuffledRDD[12] at repartition at <console>:25 []
 +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
    |   ParallelCollectionRDD[10] at makeRDD at <console>:25 []

With shuffling enabled everything is OK and the upstream stage keeps
its 100 partitions, e.g.

scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, true).toDebugString
res6: String =
(5) MapPartitionsRDD[24] at coalesce at <console>:25 []
 |  CoalescedRDD[23] at coalesce at <console>:25 []
 |  ShuffledRDD[22] at coalesce at <console>:25 []
 +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
     |   MapPartitionsRDD[20] at repartition at <console>:25 []
     |   CoalescedRDD[19] at repartition at <console>:25 []
     |   ShuffledRDD[18] at repartition at <console>:25 []
     +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
        |   ParallelCollectionRDD[16] at makeRDD at <console>:25 []

In that case the problem is that, for really large datasets, the
additional shuffle can take hours, or at least an amount of time
comparable to the join itself.

So I'd like to understand whether this is a bug or expected behaviour.
If it is expected, is there any way to insert an additional
ShuffleMapStage (i.e. a stage boundary) at the appropriate position in
the DAG, but without actually reshuffling the data? The sketch below
illustrates the effect I'm after.
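
One workaround I can think of, just to illustrate the shape I'm after
(reusing names from the sketches above), is to materialize the join
output before coalescing, so the join still runs at full parallelism,
at the cost of caching the whole joined dataset; I haven't validated
whether that cost is acceptable in practice:

import org.apache.spark.storage.StorageLevel

// Materialize the join at full parallelism first...
val joined = left.join(right, numJoinPartitions)
  .persist(StorageLevel.MEMORY_AND_DISK)
joined.count()

// ...so that the narrow coalesce only reads the cached partitions
// instead of pulling the join itself into a handful of tasks.
joined.coalesce(totalExecutorCores, shuffle = false)
      .saveAsTextFile("/data/joined")

joined.unpersist()

If there is a cleaner way to get that stage boundary, that's exactly
what I'm looking for.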
