i realize it is unlikely all data will be local to tasks, so placement will not be optimal and there will be some network traffic, but is this the same as a shuffle?

in CoalescedRDD it shows a NarrowDependency, which i thought meant it could be implemented without a shuffle.
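For reference, a minimal sketch of what that looks like from the API side (this assumes an existing SparkContext `sc`; the partition counts are made up):

import org.apache.spark.NarrowDependency

// coalesce without shuffle: the dependency on the parent is narrow
val parent = sc.parallelize(1 to 1000, 100)
val coalesced = parent.coalesce(10)   // shuffle = false by default

// each of the 10 output partitions simply reads a group of parent partitions,
// possibly over the network, but no shuffle stage boundary is introduced
println(coalesced.dependencies.head.isInstanceOf[NarrowDependency[_]])  // true
println(coalesced.toDebugString)  // a single stage, no ShuffledRDD in the lineage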
On Mon, Oct 15, 2018 at 2:49 AM Jörn Franke <jornfra...@gmail.com> wrote:

This is not fully correct. If you have fewer files then you need to move some data to some other nodes, because not all the data is there for writing (even in the case of the same node, but then it is easier from a network perspective). Hence a shuffle is needed.

On 15.10.2018 at 05:04, Koert Kuipers <ko...@tresata.com> wrote:

sure, i understand currently the workaround is to add a shuffle. but that's just a workaround, not a satisfactory solution: we shouldn't have to introduce another shuffle (an expensive operation) just to reduce the number of files.

logically all you need is a map phase with fewer tasks after the reduce phase with many tasks to reduce the number of files, but there is currently no way to express this in spark. it seems the map operation always gets tagged onto the end of the previous reduce operation, which is generally a reasonable optimization, but not here, since it causes the number of tasks for the reduce to go down, which is unacceptable.

On Sun, Oct 14, 2018 at 10:06 PM Wenchen Fan <cloud0...@gmail.com> wrote:

You have a heavy workload: you want to run it with many tasks for better performance and stability (no OOM), but you also want to run it with few tasks to avoid too many small files. The reality is, mostly you can't reach these two goals together; they conflict with each other. The solution I can think of is to sacrifice performance a little: run the workload with many tasks at first, and then merge the many small files. Generally this is what `coalesce(n, shuffle = true)` does.

On Sat, Oct 13, 2018 at 10:05 PM Koert Kuipers <ko...@tresata.com> wrote:

we have a collection of programs in the dataframe api that all do big shuffles, for which we use 2048+ partitions. this works fine, but it produces a lot of (small) output files, which puts pressure on the memory of the driver program of any spark program that reads this data in again.

so one of our developers stuck a .coalesce at the end of every program just before writing to disk to reduce the output files, thinking this would solve the many-files issue. to his surprise the coalesce caused the existing shuffles to run with fewer tasks, leading to unacceptable slowdowns and OOMs. so this is not a solution.

how can we insert a coalesce as a new map phase (a new job on the application manager with a narrow dependency) instead of modifying the existing reduce phase? i am saying map phase because it should not introduce a new shuffle: this is wasteful and unnecessary.

On Sat, Oct 13, 2018 at 1:39 AM Wenchen Fan <cloud0...@gmail.com> wrote:

In your first example, the root RDD has 1000 partitions, then you do a shuffle (with repartitionAndSortWithinPartitions), which shuffles data to 1000 reducers. Then you do coalesce, which asks Spark to launch only 20 reducers to process the data that was prepared for 1000 reducers. Since the reducers have heavy work (sorting), you OOM. In general, your workflow is: 1000 mappers -> 20 reducers.

In your second example, the coalesce introduces a shuffle, so your workflow is: 1000 mappers -> 1000 reducers (also mappers) -> 20 reducers. The sorting is done by 1000 tasks, so no OOM.

BTW have you tried the DataFrame API? With Spark SQL, the memory management is more precise, so even if we only have 20 tasks to do the heavy sorting, the system should just have more disk spills instead of OOM.
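As a rough sketch of the two plans described above (the RDD, key space and partition counts below are made up, and an existing SparkContext `sc` is assumed):

import org.apache.spark.HashPartitioner

// hypothetical heavy key-value RDD with 1000 partitions
val pairs = sc.parallelize(1 to 1000000, 1000).map(i => (i % 12345, i))

// plan 1: 1000 mappers -> 20 reducers.
// The narrow coalesce shrinks the reduce stage itself, so only 20 tasks
// end up doing the heavy sorting.
val onePass = pairs
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20)

// plan 2: 1000 mappers -> 1000 reducers (also mappers) -> 20 reducers.
// The extra shuffle lets 1000 tasks do the sorting; the final 20 tasks
// only merge and write their share of the output.
val twoPass = pairs
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20, shuffle = true)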
On Sat, Oct 13, 2018 at 11:35 AM Koert Kuipers <ko...@tresata.com> wrote:

how can i get a shuffle with 2048 partitions and 2048 tasks and then a map phase with 10 partitions and 10 tasks that writes to hdfs?

every time i try to do this using coalesce, the shuffle ends up having 10 tasks, which is unacceptable due to OOM. this makes coalesce somewhat useless.

On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan <cloud0...@gmail.com> wrote:

Note that RDD partitions and Spark tasks are not always a 1-1 mapping.

Assume `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`. Then `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and this stage has 10 tasks (decided by the last RDD). This means each Spark task will process 10 partitions of `rdd1`.

Looking at your example, I don't see where the problem is. Can you describe what is not expected?

On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:

Well, it seems that I can still extend CoalescedRDD to make it preserve the total number of partitions from the parent RDD, reduce some partitions in the same way the original coalesce does for map-only jobs, and fill the gaps (partitions which should reside in the positions of the coalesced ones) with a special kind of partition which has no parent dependencies and always returns an empty iterator.

I believe this should work as desired (at least the previous ShuffleMapStage will think that the number of partitions in the next stage, which it generates shuffle output for, is unchanged).

There are a few issues though: the existence of empty partitions, which can be evaluated almost for free, and the empty output files from these empty partitions, which can be avoided by means of LazyOutputFormat in the case of RDDs.

On Mon, Oct 8, 2018, 23:57 Koert Kuipers <ko...@tresata.com> wrote:

although i personally would describe this as a bug, the answer will be that this is the intended behavior. the coalesce "infects" the shuffle before it, making a coalesce useless for reducing output files after a shuffle with many partitions, by design.

your only option left is a repartition, for which you pay the price in that it introduces another expensive shuffle.

interestingly, if you do a coalesce on a map-only job it knows how to reduce the partitions and output files without introducing a shuffle, so clearly it is possible, but i don't know how to get this behavior after a shuffle in an existing job.
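For comparison, a sketch of the map-only case mentioned above, where coalesce does reduce the number of output files without adding a shuffle (the paths are hypothetical, and an existing SparkContext `sc` is assumed):

// a map-only job: the narrow coalesce groups the input partitions into
// 10 output partitions, so only 10 files are written and no ShuffledRDD
// appears in the lineage
sc.textFile("hdfs:///some/input", minPartitions = 100)
  .map(_.toUpperCase)
  .coalesce(10)
  .saveAsTextFile("hdfs:///some/output")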
On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:

Hello guys,

Currently I'm a little bit confused by coalesce behaviour.

Consider the following use case: I'd like to join two pretty big RDDs. To make the join more stable and to prevent it from failing with OOM, the RDDs are usually repartitioned to redistribute data more evenly and to prevent every partition from hitting the 2GB limit. So the join ends up running with a lot of partitions.

Then, after a successful join, I'd like to save the resulting dataset. But I don't need as huge a number of files as the number of partitions/tasks used during the join. Actually I'm fine with the number of files being the total number of executor cores allocated to the job. So I've considered using a coalesce.

The problem is that coalesce with shuffling disabled prevents the join from using the specified number of partitions and instead forces the join to use the number of partitions provided to coalesce:

scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, false).toDebugString
res5: String =
(5) CoalescedRDD[15] at coalesce at <console>:25 []
 |  MapPartitionsRDD[14] at repartition at <console>:25 []
 |  CoalescedRDD[13] at repartition at <console>:25 []
 |  ShuffledRDD[12] at repartition at <console>:25 []
 +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
    |  ParallelCollectionRDD[10] at makeRDD at <console>:25 []

With shuffling enabled everything is ok, e.g.

scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, true).toDebugString
res6: String =
(5) MapPartitionsRDD[24] at coalesce at <console>:25 []
 |  CoalescedRDD[23] at coalesce at <console>:25 []
 |  ShuffledRDD[22] at coalesce at <console>:25 []
 +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
    |  MapPartitionsRDD[20] at repartition at <console>:25 []
    |  CoalescedRDD[19] at repartition at <console>:25 []
    |  ShuffledRDD[18] at repartition at <console>:25 []
    +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
       |  ParallelCollectionRDD[16] at makeRDD at <console>:25 []

In that case the problem is that for pretty huge datasets the additional reshuffling can take hours, or at least an amount of time comparable to the join itself.

So I'd like to understand whether this is a bug or just the expected behaviour.

In case it is expected, is there any way to insert an additional ShuffleMapStage into the appropriate position of the DAG, but without the reshuffling itself?

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
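To close out the thread, a sketch of the pragmatic workaround Wenchen describes above: run the shuffle-heavy job with many tasks, then compact the small files in a separate, map-only pass. The table name, paths and partition counts are made up, and an existing SparkSession `spark` is assumed.

// pass 1: let the heavy shuffle run with as many tasks as it needs,
// accepting the many small files for now
spark.table("some_big_table")
  .groupBy("key")
  .count()
  .write.parquet("hdfs:///tmp/intermediate")

// pass 2: a cheap map-only job whose narrow coalesce merges the small files
spark.read.parquet("hdfs:///tmp/intermediate")
  .coalesce(20)
  .write.parquet("hdfs:///final/output")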