There are existing Apache projects which provide capabilities that largely
address the problem statement - Apache Celeborn, Apache Uniffle, Zeus, etc.
Doing an explicit sort stage between "map" and "reduce" brings some nice
advantages, especially if the output is durable, but that would be a
nontrivial change - and should be attempted only if the benefits are
leveraged throughout the stack (AQE, speculative execution, etc.).

Regards,
Mridul

On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]> wrote:

> Hi Karuppayya,
>
> Handling large shuffles in Spark is challenging and it's great to see
> proposals addressing it. I think the extra "shuffle consolidation stage" is
> a good idea, and now I feel it's better for it to be synchronous, so that
> we can integrate it with AQE and leverage the accurate runtime shuffle
> status to make decisions about whether or not to launch this extra "shuffle
> consolidation stage" and how to consolidate. This is a key differentiator
> compared to the push-based shuffle.
>
> However, there are many details to consider, and in general it's difficult
> to use cloud storage for shuffle. We need to deal with problems like
> traffic throttling, cleanup guarantees, cost control, and so on. Let's take
> a step back and look at what the actual problems of large shuffles are.
>
> A large shuffle usually starts with a large number of mappers that we can't
> adjust (e.g. a large table scan). We can adjust the number of reducers to
> reach two goals:
> 1. The input data size of each reducer shouldn't be too large, which is
> roughly *total_shuffle_size / num_reducers*. This is to avoid
> spilling/OOM during reducer task execution.
> 2. The data size of each shuffle block shouldn't be too small, which is
> roughly *total_shuffle_size / (num_mappers * num_reducers)*. This keeps
> disk/network IO efficient.
>
> These two goals are actually contradictory and sometimes we have to
> prioritize goal 1 (i.e. pick a large *num_reducers*) so that the query
> can finish. An extra "shuffle consolidation stage" can effectively decrease
> the number of mappers, by merging the shuffle files from multiple mappers. This
> can be a clear win as fetching many small shuffle blocks can be quite slow,
> even slower than running an extra "shuffle consolidation stage".
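>
> To make the tension concrete, here is a back-of-the-envelope sketch with
> purely illustrative numbers (none of these thresholds come from the
> proposal):
>
>   // 10 TiB of shuffle data produced by 50,000 mappers (fixed by the scan).
>   val totalShuffleBytes = 10L * 1024 * 1024 * 1024 * 1024
>   val numMappers        = 50000L
>
>   // Goal 1: keep reducer input around 200 MiB => need ~52,000 reducers.
>   val targetReducerInput = 200L * 1024 * 1024
>   val numReducers        = totalShuffleBytes / targetReducerInput   // ~52,429
>
>   // Goal 2: the resulting average shuffle block is then only ~4 KiB.
>   val avgBlockBytes = totalShuffleBytes / (numMappers * numReducers)
>
>   // Merging the outputs of every 100 mappers in a consolidation stage
>   // divides the effective mapper count by 100, giving ~400 KiB blocks.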
>
> In addition, there shouldn't be too many nodes hosting shuffle files
> (ideally zero, meaning shuffle files live in separate storage). With
> a large number of mappers, likely every node in the cluster stores some
> shuffle files. By merging shuffle files via the extra "shuffle
> consolidation stage", we can decrease the number of nodes that host active
> shuffle data, so that the cluster is more elastic.
>
>
> Thanks,
> Wenchen
>
> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <[email protected]>
> wrote:
>
>> Hi Karuppayya,
>>
>> Thanks for sharing the proposal and this looks very exciting.
>>
>> I have a few questions; please correct me if I misunderstood anything.
>>
>> Would it be possible to clarify whether the consolidated shuffle file
>> produced for each partition is suitable across all storage systems,
>> especially when this file becomes extremely large? I am wondering if a very
>> large file could create problems during retrieval. For example, if a
>> connection breaks while reading the file, some storage systems may not
>> support resuming reads from the point of failure and would have to start
>> reading the file from the beginning again. This could lead to higher
>> latency, repeated retries, or performance bottlenecks when a partition
>> becomes too large or skewed.
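>>
>> (For what it's worth, stores exposed through Hadoop's FileSystem API,
>> e.g. s3a/abfs/gs, do support positioned reads, so a reader could in
>> principle resume at the last good offset rather than restart. A minimal
>> sketch; the path and offset below are purely illustrative:
>>
>>   import org.apache.hadoop.conf.Configuration
>>   import org.apache.hadoop.fs.{FileSystem, Path}
>>
>>   // Hypothetical consolidated shuffle file on an object store.
>>   val path = new Path("s3a://bucket/shuffle/consolidated-partition-42")
>>   val fs   = path.getFileSystem(new Configuration())
>>   val lastGoodOffset = 1L << 30   // e.g. the first GiB was already read
>>   val in   = fs.open(path)
>>   in.seek(lastGoodOffset)         // resume instead of re-reading from byte 0
>>   // ... continue reading from here ...
>>   in.close()
>>
>> Whether every target store behaves well under this pattern is of course
>> part of the question.)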
>>
>> Would it make sense to introduce a configurable upper bound on the
>> maximum allowed file size? This might prevent the file from growing
>> massively.
>> Should the consolidated shuffle file be compressed before being written
>> to the storage system? Compression might introduce additional latency, but
>> that too can be a configurable option.
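>>
>> Note that Spark already compresses shuffle output by default, so the
>> consolidated files could presumably reuse the same settings rather than
>> adding a new knob. For reference, the existing configs (whether the
>> consolidation stage would honour them is an open question):
>>
>>   val conf = new org.apache.spark.SparkConf()
>>     .set("spark.shuffle.compress", "true")      // compress map outputs (default: true)
>>     .set("spark.io.compression.codec", "zstd")  // default is lz4; zstd/snappy/lzf also available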
>>
>> Regards,
>> Rishab Joshi
>>
>>
>>
>>
>>
>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya <[email protected]>
>> wrote:
>>
>>> Enrico,
>>> Thank you very much for reviewing the doc.
>>>
>>> *Since the consolidation stage reads all the shuffle data, why not do
>>>> the transformation in that stage? What is the point in deferring the
>>>> transformations into another stage?*
>>>
>>>
>>> The reason for deferring the final consolidation to a subsequent stage
>>> lies in the distributed nature of shuffle data.
>>> A reducer needs to read all of its corresponding shuffle data written
>>> across all map tasks. Since each mapper only holds its own local output,
>>> the consolidation cannot begin until the entire map stage completes.
>>>
>>> However, your question also aligns with one of the approaches
>>> mentioned (concurrent consolidation
>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>),
>>> which was specifically considered.
>>>
>>> While synchronous consolidation happens after all the data is
>>> available, concurrent consolidation can begin aggregating and persisting
>>> already-generated shuffle data while the remaining map tasks are still
>>> running, thereby making the shuffle durable much earlier instead of having
>>> to wait for all map tasks to complete.
>>>
>>> - Karuppayya
>>>
>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> another remark regarding a remote shuffle storage solution:
>>>> As long as the map executors are alive, reduce executors should read
>>>> from them to avoid any extra delay / overhead.
>>>> On fetch failure from a map executor, the reduce executors should fall
>>>> back to a remote storage that provides a copy (merged or not) of the
>>>> shuffle data.
>>>>
>>>> Cheers,
>>>> Enrico
>>>>
>>>>
>>>> Am 13.11.25 um 09:42 schrieb Enrico Minack:
>>>>
>>>> Hi Karuppayya,
>>>>
>>>> thanks for your proposal and bringing up this issue.
>>>>
>>>> I am very much in favour of a shuffle storage solution that allows for
>>>> dynamic allocation and tolerates node failure in a K8S environment, without
>>>> the burden of managing a Remote Shuffle Service.
>>>>
>>>> I have the following comments:
>>>>
>>>> Your proposed consolidation stage is equivalent to the next reducer
>>>> stage in the sense that it reads shuffle data from the earlier map stage.
>>>> This requires the executors of the map stage to survive until the shuffle
>>>> data are consolidated ("merged" in Spark terminology). Therefore, I think
>>>> this passage of your design document is not accurate:
>>>>
>>>>     Executors that perform the initial map tasks (shuffle writers) can
>>>> be immediately deallocated after writing their shuffle data ...
>>>>
>>>> Since the consolidation stage reads all the shuffle data, why not do
>>>> the transformation in that stage? What is the point in deferring the
>>>> transformations into another stage?
>>>>
>>>> You mention the "Native Shuffle Block Migration" and say its limitation
>>>> is "It simply shifts the storage burden to other active executors".
>>>> Please consider that the migration process can migrate to what Spark calls
>>>> a fallback storage, which essentially copies the shuffle data to a
>>>> remote storage.
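>>>>
>>>> If it helps, the existing knobs for that migration path look roughly like
>>>> this in recent Spark releases (the bucket path below is just an example):
>>>>
>>>>   val conf = new org.apache.spark.SparkConf()
>>>>     .set("spark.decommission.enabled", "true")
>>>>     .set("spark.storage.decommission.enabled", "true")
>>>>     .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
>>>>     .set("spark.storage.decommission.fallbackStorage.path",
>>>>          "s3a://some-bucket/spark-fallback/")
>>>>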
>>>> Kind regards,
>>>> Enrico
>>>>
>>>> Am 13.11.25 um 01:40 schrieb karuppayya:
>>>>
>>>>  Hi All,
>>>>
>>>> I propose to utilize *Remote Storage as a Shuffle Store, natively in
>>>> Spark*.
>>>>
>>>> This approach would fundamentally decouple shuffle storage from compute
>>>> nodes, mitigating *shuffle fetch failures* and also helping with
>>>> *aggressive downscaling*.
>>>>
>>>> The primary goal is to enhance the *elasticity and resilience* of
>>>> Spark workloads, leading to substantial cost optimization opportunities.
>>>>
>>>> *I welcome any initial thoughts or concerns regarding this idea.*
>>>> *Looking forward to your feedback! *
>>>>
>>>> JIRA: SPARK-53484 <https://issues.apache.org/jira/browse/SPARK-54327>
>>>> SPIP doc
>>>> <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>
>>>> ,
>>>> Design doc
>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
>>>> PoC PR <https://github.com/apache/spark/pull/53028>
>>>>
>>>> Thanks,
>>>> Karuppayya
>>>>
>>>>
>>>>
>>>>
>>
>> --
>> Regards
>> Rishab Joshi
>>
>
