how can i get a shuffle with 2048 partitions and 2048 tasks and then a map phase with 10 partitions and 10 tasks that writes to hdfs?
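roughly what i am trying looks like this (spark-shell style; the placeholder
rdds, the numbers and the hdfs path are made up, just to show the shape of the
job):

val rddA = sc.parallelize(1 to 1000000).map(i => (i % 100000, i))          // placeholder data
val rddB = sc.parallelize(1 to 1000000).map(i => (i % 100000, i.toString)) // placeholder data
val joined = rddA.join(rddB, 2048)    // shuffle: i want 2048 partitions and 2048 tasks here
joined
  .coalesce(10)                       // map-only reduction: i want 10 partitions and 10 write tasks
  .saveAsTextFile("hdfs:///tmp/out")  // placeholder output path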
every time i try to do this using coalesce the shuffle ends up having 10
tasks, which is unacceptable due to OOM. this makes coalesce somewhat
useless.

On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan <cloud0...@gmail.com> wrote:

> Note that RDD partitions and Spark tasks are not always a 1-1 mapping.
>
> Assuming `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`. Then
> `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and
> `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and
> this stage has 10 tasks (decided by the last RDD). This means each Spark
> task will process 10 partitions of `rdd1`.
>
> Looking at your example, I don't see where the problem is. Can you
> describe what is not expected?
>
> On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky <szh.s...@gmail.com>
> wrote:
>
>> Well, it seems that I can still extend the CoalesceRDD to make it
>> preserve the total number of partitions from the parent RDD, reduce some
>> partitions in the same way as the original coalesce does for map-only
>> jobs, and fill the gaps (partitions which should reside at the positions
>> of the coalesced ones) with a special kind of partition which does not
>> have any parent dependencies and always returns an empty iterator.
>>
>> I believe this should work as desired (at least the previous
>> ShuffleMapStage will think that the number of partitions in the next
>> stage, which it generates shuffle output for, has not changed).
>>
>> There are a few issues though - the existence of empty partitions, which
>> can be evaluated almost for free, and empty output files from these
>> empty partitions, which can be avoided by means of LazyOutputFormat in
>> the case of RDDs.
>>
>> On Mon, Oct 8, 2018, 23:57 Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> although i personally would describe this as a bug, the answer will be
>>> that this is the intended behavior. the coalesce "infects" the shuffle
>>> before it, making a coalesce useless for reducing output files after a
>>> shuffle with many partitions by design.
>>>
>>> your only option left is a repartition, for which you pay the price in
>>> that it introduces another expensive shuffle.
>>>
>>> interestingly, if you do a coalesce on a map-only job it knows how to
>>> reduce the partitions and output files without introducing a shuffle,
>>> so clearly it is possible, but i don't know how to get this behavior
>>> after a shuffle in an existing job.
>>>
>>> On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky <szh.s...@gmail.com>
>>> wrote:
>>>
>>>> Hello guys,
>>>>
>>>> Currently I'm a little bit confused by coalesce behaviour.
>>>>
>>>> Consider the following use case - I'd like to join two pretty big
>>>> RDDs. To make the join more stable and to prevent it from failing with
>>>> OOM, the RDDs are usually repartitioned to redistribute the data more
>>>> evenly and to prevent every partition from hitting the 2GB limit, so
>>>> the join runs with a lot of partitions.
>>>>
>>>> After a successful join I'd like to save the resulting dataset. But I
>>>> don't need as many files as there were partitions/tasks during the
>>>> join. Actually I'm fine with roughly as many files as the total number
>>>> of executor cores allocated to the job. So I've considered using a
>>>> coalesce.
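>>>>
>>>> Schematically the whole job looks something like this (the placeholder
>>>> RDDs, the partition count and the output path below are only for
>>>> illustration):
>>>>
>>>> // placeholder inputs standing in for the two big key-value RDDs
>>>> val bigRdd1 = sc.parallelize(1 to 1000000).map(i => (i, i * 2L))
>>>> val bigRdd2 = sc.parallelize(1 to 1000000).map(i => (i, i.toString))
>>>> // join with a lot of partitions so that no partition approaches the 2GB limit
>>>> val joined = bigRdd1.join(bigRdd2, 2048)
>>>> // write roughly one file per executor core instead of 2048 files
>>>> joined.coalesce(sc.defaultParallelism).saveAsTextFile("hdfs:///tmp/joined")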
>>>>
>>>> The problem is that coalesce with shuffling disabled prevents the join
>>>> from using the specified number of partitions and instead forces the
>>>> join to use the number of partitions provided to coalesce:
>>>>
>>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, false).toDebugString
>>>> res5: String =
>>>> (5) CoalescedRDD[15] at coalesce at <console>:25 []
>>>>  |  MapPartitionsRDD[14] at repartition at <console>:25 []
>>>>  |  CoalescedRDD[13] at repartition at <console>:25 []
>>>>  |  ShuffledRDD[12] at repartition at <console>:25 []
>>>>  +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
>>>>     |  ParallelCollectionRDD[10] at makeRDD at <console>:25 []
>>>>
>>>> With shuffling enabled everything is ok, e.g.
>>>>
>>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, true).toDebugString
>>>> res6: String =
>>>> (5) MapPartitionsRDD[24] at coalesce at <console>:25 []
>>>>  |  CoalescedRDD[23] at coalesce at <console>:25 []
>>>>  |  ShuffledRDD[22] at coalesce at <console>:25 []
>>>>  +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
>>>>     |  MapPartitionsRDD[20] at repartition at <console>:25 []
>>>>     |  CoalescedRDD[19] at repartition at <console>:25 []
>>>>     |  ShuffledRDD[18] at repartition at <console>:25 []
>>>>     +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
>>>>        |  ParallelCollectionRDD[16] at makeRDD at <console>:25 []
>>>>
>>>> In that case the problem is that for pretty huge datasets the
>>>> additional reshuffling can take hours, or at least an amount of time
>>>> comparable to the join itself.
>>>>
>>>> So I'd like to understand whether this is a bug or just the expected
>>>> behaviour?
>>>> In case it is expected, is there any way to insert an additional
>>>> ShuffleMapStage into the appropriate position of the DAG, but without
>>>> the reshuffling itself?
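>>>>
>>>> P.S. For comparison (and as far as I understand, repartition(n) is
>>>> just coalesce(n, shuffle = true) under the hood), an explicit
>>>> repartition after the wide stage, e.g.
>>>>
>>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).repartition(5).toDebugString
>>>>
>>>> produces the same kind of lineage as the shuffle-enabled coalesce
>>>> above, i.e. the same extra shuffle I'd like to avoid.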