i realize it is unlikely all data will be local to tasks, so placement will
not be optimal and there will be some network traffic, but is this the same
as a shuffle?

in CoalescedRDD the dependency shows up as a NarrowDependency, which i thought
meant it could be implemented without a shuffle.
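
for example, a quick check in the shell does seem to confirm the dependency is
narrow (not a ShuffleDependency):

  scala> val rdd = sc.parallelize(1 to 100, 20).coalesce(5)
  scala> rdd.dependencies.head.isInstanceOf[org.apache.spark.NarrowDependency[_]]
  res0: Boolean = true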

On Mon, Oct 15, 2018 at 2:49 AM Jörn Franke <jornfra...@gmail.com> wrote:

> This is not fully correct. If you have fewer files then you need to move
> some data to other nodes, because not all of the data needed for writing is
> already there (this even happens within the same node, but then it is easier
> from a network perspective). Hence a shuffle is needed.
>
>
> Am 15.10.2018 um 05:04 schrieb Koert Kuipers <ko...@tresata.com>:
>
> sure, i understand that currently the workaround is to add a shuffle. but
> that's just a workaround, not a satisfactory solution: we shouldn't have to
> introduce another shuffle (an expensive operation) just to reduce the
> number of files.
>
> logically all you need is a map-phase with fewer tasks after the reduce
> phase with many tasks, just to reduce the number of files, but there is
> currently no way to express this in spark. it seems the map operation always
> gets tacked onto the end of the previous reduce operation, which is generally
> a reasonable optimization, but not here, since it brings down the number of
> tasks for the reduce, which is unacceptable.
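>
> for example, with the dataframe api the pattern is roughly this (column
> names and path made up):
>
>   import org.apache.spark.sql.functions.sum
>
>   df.groupBy("key").agg(sum("value"))  // reduce phase, spark.sql.shuffle.partitions tasks...
>     .coalesce(20)                      // ...except this makes that same reduce run with only 20 tasks
>     .write.parquet("out")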
>
> On Sun, Oct 14, 2018 at 10:06 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>
>> You have a heavy workload: you want to run it with many tasks for better
>> performance and stability (no OOM), but you also want to run it with few
>> tasks to avoid too many small files. The reality is that you mostly can't
>> reach these two goals together; they conflict with each other. The solution
>> I can think of is to sacrifice performance a little: run the workload with
>> many tasks first, and then merge the many small files. Generally this is
>> what `coalesce(n, shuffle = true)` does.
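>>
>> For example, with the RDD API this would look roughly like the following
>> (pairRdd being some key-value RDD with ordered keys, path made up):
>>
>>   import org.apache.spark.HashPartitioner
>>
>>   val result = pairRdd.repartitionAndSortWithinPartitions(new HashPartitioner(2048)) // heavy work, 2048 tasks
>>   result.coalesce(20, shuffle = true).saveAsTextFile("out") // extra shuffle, only to merge into 20 files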
>>
>> On Sat, Oct 13, 2018 at 10:05 PM Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> we have a collection of programs in the dataframe api that all do big
>>> shuffles for which we use 2048+ partitions. this works fine but it produces
>>> a lot of (small) output files, which puts pressure on the memory of the
>>> driver of any spark program that reads this data in again.
>>>
>>> so one of our developers stuck a .coalesce at the end of every program,
>>> just before writing to disk, to reduce the number of output files, thinking
>>> this would solve the many-files issue. to his surprise the coalesce caused
>>> the existing shuffles to run with fewer tasks, leading to unacceptable
>>> slowdowns and OOMs. so this is not a solution.
>>>
>>> how can we insert a coalesce as a new map-phase (a new job on the
>>> application manager with a narrow dependency) instead of modifying the
>>> existing reduce phase? i am saying map-phase because it should not
>>> introduce a new shuffle: that would be wasteful and unnecessary.
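>>>
>>> for illustration, a shuffle-free but wasteful alternative is a second pass
>>> that just rewrites the files (paths made up):
>>>
>>>   df.write.parquet("tmp_out")             // shuffle keeps its 2048 tasks
>>>   spark.read.parquet("tmp_out")
>>>     .coalesce(10)                         // narrow, map-only job with 10 tasks
>>>     .write.parquet("final_out")
>>>
>>> but that rereads and rewrites all of the data.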
>>>
>>>
>>> On Sat, Oct 13, 2018 at 1:39 AM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>
>>>> In your first example, the root RDD has 1000 partitions, then you do a
>>>> shuffle (with repartitionAndSortWithinPartitions), which shuffles data
>>>> to 1000 reducers. Then you do coalesce, which asks Spark to launch
>>>> only 20 reducers to process the data that was prepared for 1000
>>>> reducers. Since the reducers do heavy work (sorting), you OOM. Overall,
>>>> your workflow is: 1000 mappers -> 20 reducers.
>>>>
>>>> In your second example, the coalesce introduces a shuffle, so your
>>>> workflow is: 1000 mappers -> 1000 reducers (also mappers) -> 20 reducers.
>>>> The sorting is done by 1000 tasks, so there is no OOM.
>>>>
>>>> BTW have you tried the DataFrame API? With Spark SQL, the memory
>>>> management is more precise, so even if we only have 20 tasks doing the
>>>> heavy sorting, the system should just spill more to disk instead of
>>>> OOMing.
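>>>>
>>>> For example, the rough DataFrame equivalent of your first example would
>>>> be something like this (column name and path made up):
>>>>
>>>>   import org.apache.spark.sql.functions.col
>>>>
>>>>   df.repartition(col("key"))
>>>>     .sortWithinPartitions(col("key"))
>>>>     .coalesce(20)          // final stage runs with only 20 tasks, but spills instead of OOMing
>>>>     .write.parquet("out")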
>>>>
>>>>
>>>> On Sat, Oct 13, 2018 at 11:35 AM Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
>>>>> how can i get a shuffle with 2048 partitions and 2048 tasks and then a
>>>>> map phase with 10 partitions and 10 tasks that writes to hdfs?
>>>>>
>>>>> every time i try to do this using coalesce the shuffle ends up running
>>>>> with 10 tasks, which is unacceptable due to OOMs. this makes coalesce
>>>>> somewhat useless.
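>>>>>
>>>>> for example, something along these lines (pairRdd being some key-value
>>>>> rdd with ordered keys, path made up):
>>>>>
>>>>>   import org.apache.spark.HashPartitioner
>>>>>
>>>>>   pairRdd.repartitionAndSortWithinPartitions(new HashPartitioner(2048))
>>>>>     .coalesce(10)
>>>>>     .saveAsTextFile("hdfs:///out")
>>>>>
>>>>> ends up with the shuffle-read stage (including the sort) running with
>>>>> only 10 tasks.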
>>>>>
>>>>> On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan <cloud0...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Note that RDD partitions and Spark tasks do not always map 1-to-1.
>>>>>>
>>>>>> Assume `rdd1` has 100 partitions and `rdd2 = rdd1.coalesce(10)`. Then
>>>>>> `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and
>>>>>> `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and
>>>>>> this stage has 10 tasks (decided by the last RDD). This means each Spark
>>>>>> task will process 10 partitions of `rdd1`.
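>>>>>>
>>>>>> For example, in spark-shell:
>>>>>>
>>>>>> scala> val rdd1 = sc.parallelize(1 to 1000, 100)
>>>>>> scala> val rdd2 = rdd1.coalesce(10)
>>>>>> scala> rdd2.getNumPartitions       // 10
>>>>>> scala> rdd2.toDebugString          // no ShuffledRDD anywhere in the lineage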
>>>>>>
>>>>>> Looking at your example, I don't see where the problem is. Can you
>>>>>> describe what is unexpected?
>>>>>>
>>>>>> On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky <
>>>>>> szh.s...@gmail.com> wrote:
>>>>>>
>>>>>>> Well, it seems that I can still extend CoalescedRDD to make it
>>>>>>> preserve the total number of partitions of the parent RDD: reduce some
>>>>>>> partitions in the same way the original coalesce does for map-only
>>>>>>> jobs, and fill the gaps (the positions where the coalesced partitions
>>>>>>> used to be) with a special kind of partition that has no parent
>>>>>>> dependencies and always returns an empty iterator.
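>>>>>>>
>>>>>>> A very rough sketch of what I have in mind (untested, class and field
>>>>>>> names are made up):
>>>>>>>
>>>>>>> import scala.reflect.ClassTag
>>>>>>> import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
>>>>>>> import org.apache.spark.rdd.RDD
>>>>>>>
>>>>>>> // partition that remembers which parent partitions it reads
>>>>>>> // (empty for the padding partitions)
>>>>>>> case class PaddedPartition(index: Int, parents: Seq[Partition]) extends Partition
>>>>>>>
>>>>>>> // keeps parent.partitions.length partitions: the first n read contiguous
>>>>>>> // groups of parent partitions (like a narrow coalesce), the rest have no
>>>>>>> // parent dependencies and always return an empty iterator
>>>>>>> class PaddedCoalescedRDD[T: ClassTag](parent: RDD[T], n: Int)
>>>>>>>     extends RDD[T](parent.context, Nil) {
>>>>>>>
>>>>>>>   private val step = math.ceil(parent.partitions.length.toDouble / n).toInt
>>>>>>>
>>>>>>>   private def parentIndices(i: Int): Seq[Int] =
>>>>>>>     if (i < n) (i * step) until math.min((i + 1) * step, parent.partitions.length)
>>>>>>>     else Nil
>>>>>>>
>>>>>>>   override protected def getPartitions: Array[Partition] =
>>>>>>>     Array.tabulate[Partition](parent.partitions.length) { i =>
>>>>>>>       PaddedPartition(i, parentIndices(i).map(parent.partitions(_)))
>>>>>>>     }
>>>>>>>
>>>>>>>   override def compute(split: Partition, context: TaskContext): Iterator[T] =
>>>>>>>     split.asInstanceOf[PaddedPartition].parents.iterator
>>>>>>>       .flatMap(p => parent.iterator(p, context))
>>>>>>>
>>>>>>>   override protected def getDependencies: Seq[Dependency[_]] = Seq(
>>>>>>>     new NarrowDependency[T](parent) {
>>>>>>>       override def getParents(partitionId: Int): Seq[Int] = parentIndices(partitionId)
>>>>>>>     }
>>>>>>>   )
>>>>>>> }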
>>>>>>>
>>>>>>> I believe this should work as desired (at least the previous
>>>>>>> ShuffleMapStage will think that the number of partitions in the next
>>>>>>> stage it generates shuffle output for has not changed).
>>>>>>>
>>>>>>> There are a few issues though - the empty partitions, which can be
>>>>>>> evaluated almost for free, and the empty output files produced by these
>>>>>>> empty partitions, which can be avoided by means of LazyOutputFormat in
>>>>>>> the case of RDDs.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Oct 8, 2018, 23:57 Koert Kuipers <ko...@tresata.com> wrote:
>>>>>>>
>>>>>>>> although i personally would describe this as a bug, the answer will
>>>>>>>> be that this is the intended behavior. the coalesce "infects" the
>>>>>>>> shuffle before it, making a coalesce useless for reducing output files
>>>>>>>> after a shuffle with many partitions, by design.
>>>>>>>>
>>>>>>>> your only option left is a repartition, for which you pay the price
>>>>>>>> of another expensive shuffle.
>>>>>>>>
>>>>>>>> interestingly, if you do a coalesce on a map-only job it knows how
>>>>>>>> to reduce the partitions and output files without introducing a
>>>>>>>> shuffle, so clearly it is possible, but i don't know how to get this
>>>>>>>> behavior after a shuffle in an existing job.
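>>>>>>>>
>>>>>>>> for example something like this (paths made up) produces 10 output
>>>>>>>> files in a single map-only stage, without any shuffle:
>>>>>>>>
>>>>>>>>   sc.textFile("in").map(_.toUpperCase).coalesce(10).saveAsTextFile("out")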
>>>>>>>>
>>>>>>>> On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky <
>>>>>>>> szh.s...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello guys,
>>>>>>>>>
>>>>>>>>> Currently I'm a little bit confused with coalesce behaviour.
>>>>>>>>>
>>>>>>>>> Consider the following use case - I'd like to join two pretty big
>>>>>>>>> RDDs. To make the join more stable and to prevent it from failing
>>>>>>>>> with OOMs, the RDDs are usually repartitioned to redistribute data
>>>>>>>>> more evenly and to prevent any partition from hitting the 2GB limit.
>>>>>>>>> So the join runs with a lot of partitions.
>>>>>>>>>
>>>>>>>>> Then after a successful join I'd like to save the resulting dataset.
>>>>>>>>> But I don't need as huge a number of files as the number of
>>>>>>>>> partitions/tasks used during the join. Actually I'm fine with as many
>>>>>>>>> files as the total number of executor cores allocated to the job. So
>>>>>>>>> I've considered using a coalesce.
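>>>>>>>>>
>>>>>>>>> In code the use case looks roughly like this (names made up, with an
>>>>>>>>> explicit partitioner standing in for the repartitioning):
>>>>>>>>>
>>>>>>>>> import org.apache.spark.HashPartitioner
>>>>>>>>>
>>>>>>>>> val joined = left.join(right, new HashPartitioner(2048)) // many partitions for a stable join
>>>>>>>>> joined.coalesce(totalExecutorCores).saveAsObjectFile("out") // far fewer output files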
>>>>>>>>>
>>>>>>>>> The problem is that coalesce with shuffling disabled prevents the
>>>>>>>>> join from using the specified number of partitions and instead forces
>>>>>>>>> the join to use the number of partitions provided to coalesce:
>>>>>>>>>
>>>>>>>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
>>>>>>>>> false).toDebugString
>>>>>>>>> res5: String =
>>>>>>>>> (5) CoalescedRDD[15] at coalesce at <console>:25 []
>>>>>>>>>  |  MapPartitionsRDD[14] at repartition at <console>:25 []
>>>>>>>>>  |  CoalescedRDD[13] at repartition at <console>:25 []
>>>>>>>>>  |  ShuffledRDD[12] at repartition at <console>:25 []
>>>>>>>>>  +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
>>>>>>>>>     |   ParallelCollectionRDD[10] at makeRDD at <console>:25 []
>>>>>>>>>
>>>>>>>>> With shuffling enabled everything is ok, e.g.
>>>>>>>>>
>>>>>>>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
>>>>>>>>> true).toDebugString
>>>>>>>>> res6: String =
>>>>>>>>> (5) MapPartitionsRDD[24] at coalesce at <console>:25 []
>>>>>>>>>  |  CoalescedRDD[23] at coalesce at <console>:25 []
>>>>>>>>>  |  ShuffledRDD[22] at coalesce at <console>:25 []
>>>>>>>>>  +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
>>>>>>>>>      |   MapPartitionsRDD[20] at repartition at <console>:25 []
>>>>>>>>>      |   CoalescedRDD[19] at repartition at <console>:25 []
>>>>>>>>>      |   ShuffledRDD[18] at repartition at <console>:25 []
>>>>>>>>>      +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
>>>>>>>>>         |   ParallelCollectionRDD[16] at makeRDD at <console>:25 []
>>>>>>>>>
>>>>>>>>> In that case the problem is that for pretty huge datasets the
>>>>>>>>> additional reshuffling can take hours, or at least an amount of time
>>>>>>>>> comparable to the join itself.
>>>>>>>>>
>>>>>>>>> So I'd like to understand whether this is a bug or just expected
>>>>>>>>> behaviour. In case it is expected, is there any way to insert an
>>>>>>>>> additional ShuffleMapStage at an appropriate position in the DAG but
>>>>>>>>> without the reshuffling itself?
>>>>>>>>>
>>>>>>>>>