Hi everyone,
Thank you all for your valuable comments and discussion on the design
document/this email. I have replied to the comments/concerns raised.
I welcome any other questions and to be challenged further.

Also *Sun Chao* accepted to shepherd this proposal(Thank you!)

If there are no other open questions by Wednesday morning (PST), I will
request Chao to open the official voting thread (which should give 72 hours
for the process <https://spark.apache.org/improvement-proposals.html>).

- Karuppayya

On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <
[email protected]> wrote:

> One aspect that hasn’t been mentioned yet (or so I think) is the
> thread-level behavior of shuffle. In large workloads with many small
> shuffle blocks, I’ve repeatedly observed executors spawning hundreds of
> threads tied to shuffle fetch operations, Netty client handlers, and block
> file access.
> Since the proposal changes block granularity and fetch patterns, it would
> be valuable to explicitly consider how the consolidation stage affects:
> – the number of concurrent fetch operations
> – thread pool growth / saturation
> – Netty transport threads
> – memory pressure from large in-flight reads
>
> Btw, I find your proposal quite interesting.
>
> El mar, 18 nov 2025, 19:33, karuppayya <[email protected]>
> escribió:
>
>> Rishab, Wenchen, Murali,
>> Thank you very much for taking the time to review the proposal and for
>> providing such thoughtful and insightful comments/questions.
>>
>> *Rishab*,
>>
>>> * suitable across all storage systems*
>>
>> You are right that the suitability is somewhat subjective and dependent
>> on cloud provider used.
>> In general, the goal of ShuffleVault is to utilize the standard Hadoop
>> FileSystem APIs, which means it should work seamlessly with popular cloud
>> and distributed file systems (like S3, HDFS, GFS, etc.).
>> These systems share a similar nature and are designed for large files.
>>
>> *large file could create problems during retrieval.*
>>
>> We are mitigating this risk by ensuring that  tasks do not read the
>> entire consolidated file at once.
>> Instead, the implementation is designed to read the data in configured
>> blocks, rather than relying on a single read. *This behavior can be
>> refined/validated* to make it more robust.
>>
>> *Wenchen,*
>> I fully agree that the operational details around using cloud storage for
>> shuffle—specifically traffic throttling, cleanup guarantees
>> and overall request-related network cost — these are critical issues that
>> must be solved.
>> The consolidation stage is explicitly designed to mitigate the throttling
>> and accompanying cost issues .
>> *Throttling* - By consolidating shuffle data, this approach transforms
>> the read pattern from a multitude of small, random requests into fewer,
>> large, targeted ones. Particularly beneficial for modern cloud object
>> storage.
>> *Shuffle cleanup* -  I am actively trying to leverage the Shufffle clean
>> up mode and also making an effort to make them robust .These cleanup
>> improvements should be beneficial, regardless of this proposal and cover
>> most cases.
>> However, I agree that to ensure *no orphaned files* remain, we will
>> still require other means (such as remote storage lifecycle policies or
>> job-specific scripts) for a guaranteed cleanup.
>> Thank you again for your valuable feedback, especially the validation on
>> synchronous scheduling and AQE integration.
>>
>> *Murali,*
>>
>>> * Doing an explicit sort stage*
>>
>> To clarify, ShuffleVault does not introduce an explicit sort stage.
>> Instead, it introduces a Shuffle Consolidation Stage.
>> This stage is a pure passthrough operation that only aggregates scattered
>> shuffle data for a given reducer partition.
>> In simple terms, it functions as an additional reducer stage that reads
>> the fragmented shuffle files from the mappers and writes them as a single,
>> consolidated, durable file to remote storage.
>>
>> *but that would be a nontrivial change *
>>
>> I agree that the change is significant, I am  actively working to ensure
>> the benefits are leveraged across the stack. This PR
>> <https://github.com/apache/spark/pull/53028> demonstrates integration
>> with AQE and interactions with other rules(Exchange reuse, Shuffle
>> Partition Coalescing ect).
>> I would genuinely appreciate it if you could take a look at the POC PR to
>> see the scope00 of changes. The primary logic is encapsulated within a new 
>> Spark
>> Physical Planner Rule
>> <https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6>
>> that injects the consolidation stage, which is the main crux.
>>
>> I welcome any further questions or comments!
>>
>> Thanks & Regards
>> Karuppayya
>>
>> On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <[email protected]>
>> wrote:
>>
>>>
>>> There are existing Apache projects which provide the capabilities which
>>> largely addresses the problem statement - Apache Celeborn, Apache Uniffle,
>>> Zeus, etc.
>>> Doing an explicit sort stage, between "map" and "reduce" brings with it
>>> some nice advantages, especially if the output is durable, but that would
>>> be a nontrivial change - and should be attempted if the benefits are being
>>> leveraged throughout the stack (AQE, speculative execution, etc)
>>>
>>> Regards,
>>> Mridul
>>>
>>> On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]>
>>> wrote:
>>>
>>>> Hi Karuppayya,
>>>>
>>>> Handling large shuffles in Spark is challenging and it's great to see
>>>> proposals addressing it. I think the extra "shuffle consolidation stage" is
>>>> a good idea, and now I feel it's better for it to be synchronous, so that
>>>> we can integrate it with AQE and leverage the accurate runtime shuffle
>>>> status to make decisions about whether or not to launch this extra "shuffle
>>>> consolidation stage" and how to consolidate. This is a key differentiator
>>>> compared to the push-based shuffle.
>>>>
>>>> However, there are many details to consider, and in general it's
>>>> difficult to use cloud storage for shuffle. We need to deal with problems
>>>> like traffic throttling, cleanup guarantee, cost control, and so on. Let's
>>>> take a step back and see what are the actual problems of large shuffles.
>>>>
>>>> Large shuffle usually starts with a large number of mappers that we
>>>> can't adjust (e.g. large table scan). We can adjust the number of reducers
>>>> to reach two goals:
>>>> 1. The input data size of each reducer shouldn't be too large, which is
>>>> roughly *total_shuffle_size / num_reducers*. This is to avoid
>>>> spilling/OOM during reducer task execution.
>>>> 2. The data size of each shuffle block shoudn't be too small, which is
>>>> roughly *total_shuffle_size / (num_mappers * num_reducers)*. This is
>>>> for the good of disk/network IO.
>>>>
>>>> These two goals are actually contradictory and sometimes we have to
>>>> prioritize goal 1 (i.e. pick a large *num_reducers*) so that the query
>>>> can finish. An extra "shuffle consolidation stage" can kind of decrease the
>>>> number of mappers, by merging the shuffle files from multiple mappers. This
>>>> can be a clear win as fetching many small shuffle blocks can be quite slow,
>>>> even slower than running an extra "shuffle consolidation stage".
>>>>
>>>> In addition, the nodes that host shuffle files shouldn't be too many
>>>> (best to be 0 which means shuffle files are stored in a different storage).
>>>> With a large number of mappers, likely every node in the cluster stores
>>>> some shuffle files. By merging shuffle files via the extra "shuffle
>>>> consolidation stage", we can decrease the number of nodes that host active
>>>> shuffle data, so that the cluster is more elastic.
>>>>
>>>>
>>>> Thanks,
>>>> Wenchen
>>>>
>>>> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Karuppayya,
>>>>>
>>>>> Thanks for sharing the proposal and this looks very exciting.
>>>>>
>>>>> I have a few questions and please correct me if I misunderstood
>>>>> anything.
>>>>>
>>>>> Would it be possible to clarify whether the consolidated shuffle file
>>>>> produced for each partition is suitable across all storage systems,
>>>>> especially when this file becomes extremely large? I am wondering if a 
>>>>> very
>>>>> large file could create problems during retrieval. For example, if a
>>>>> connection breaks while reading the file, some storage systems may not
>>>>> support resuming reads from the point of failure and start reading the 
>>>>> file
>>>>> from the beginning again. This could lead to higher latency, repeated
>>>>> retries, or performance bottlenecks when a partition becomes too large or
>>>>> skewed?
>>>>>
>>>>> Would it make sense to introduce a configurable upper-bound on the
>>>>> maximum allowed file size? This might prevent the file from growing
>>>>> massively.
>>>>> Should the consolidated shuffle file be compressed before being
>>>>> written to the storage system. Compression might introduce additional
>>>>> latency but that too can be a configurable option.
>>>>>
>>>>> Regards,
>>>>> Rishab Joshi
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Enrico,
>>>>>> Thank you very much for reviewing the doc.
>>>>>>
>>>>>> *Since the consolidation stage reads all the shuffle data, why not
>>>>>>> doing the transformation in that stage? What is the point in deferring 
>>>>>>> the
>>>>>>> transformations into another stage?*
>>>>>>
>>>>>>
>>>>>> The reason for deferring the final consolidation to a subsequent
>>>>>> stage lies in the distributed nature of shuffle data.
>>>>>> Reducer requires reading all corresponding shuffle data written
>>>>>> across all map tasks. Since each mapper only holds its own local output,
>>>>>> the consolidation cannot begin until all the map stage completes.
>>>>>>
>>>>>> However, your question is also aligned to one of the approaches
>>>>>> mentioned (concurrent consolidation
>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>),
>>>>>> which was specifically considered.
>>>>>>
>>>>>> While the synchronous consolidation happens afetr all the data is
>>>>>> available , concurrent consolidation can aggregate and persist the
>>>>>> already-generated shuffle data to begin concurrently with the remaining 
>>>>>> map
>>>>>> tasks, thereby making the shuffle durable much earlier instead of having 
>>>>>> to
>>>>>> wait for all map tasks to complete.
>>>>>>
>>>>>> - Karuppayya
>>>>>>
>>>>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> another remark regarding a remote shuffle storage solution:
>>>>>>> As long as the map executors are alive, reduce executors should read
>>>>>>> from them to avoid any extra delay / overhead.
>>>>>>> On fetch failure from a map executor, the reduce executors should
>>>>>>> fall back to a remote storage that provides a copy (merged or not) of 
>>>>>>> the
>>>>>>> shuffle data.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Enrico
>>>>>>>
>>>>>>>
>>>>>>> Am 13.11.25 um 09:42 schrieb Enrico Minack:
>>>>>>>
>>>>>>> Hi Karuppayya,
>>>>>>>
>>>>>>> thanks for your proposal and bringing up this issue.
>>>>>>>
>>>>>>> I am very much in favour of a shuffle storage solution that allows
>>>>>>> for dynamic allocation and node failure in a K8S environment, without 
>>>>>>> the
>>>>>>> burden of managing an Remote Shuffle Service.
>>>>>>>
>>>>>>> I have the following comments:
>>>>>>>
>>>>>>> Your proposed consolidation stage is equivalent to the next reducer
>>>>>>> stage in the sense that it reads shuffle data from the earlier map 
>>>>>>> stage.
>>>>>>> This requires the executors of the map stage to survive until the 
>>>>>>> shuffle
>>>>>>> data are consolidated ("merged" in Spark terminology). Therefore, I 
>>>>>>> think
>>>>>>> this passage of your design document is not accurate:
>>>>>>>
>>>>>>>     Executors that perform the initial map tasks (shuffle writers)
>>>>>>> can be immediately deallocated after writing their shuffle data ...
>>>>>>>
>>>>>>> Since the consolidation stage reads all the shuffle data, why not
>>>>>>> doing the transformation in that stage? What is the point in deferring 
>>>>>>> the
>>>>>>> transformations into another stage?
>>>>>>>
>>>>>>> You mention the "Native Shuffle Block Migration" and say its
>>>>>>> limitation is "It simply shifts the storage burden to other active
>>>>>>> executors".
>>>>>>> Please consider that the migration process can migrate to a (in
>>>>>>> Spark called) fallback storage, which essentially copies the shuffle 
>>>>>>> data
>>>>>>> to a remote storage.
>>>>>>> Kind regards,
>>>>>>> Enrico
>>>>>>>
>>>>>>> Am 13.11.25 um 01:40 schrieb karuppayya:
>>>>>>>
>>>>>>>  Hi All,
>>>>>>>
>>>>>>> I propose to utilize *Remote Storage as a Shuffle Store, natively
>>>>>>> in Spark* .
>>>>>>>
>>>>>>> This approach would fundamentally decouple shuffle storage from
>>>>>>> compute nodes, mitigating *shuffle fetch failures and also help
>>>>>>> with aggressive downscaling*.
>>>>>>>
>>>>>>> The primary goal is to enhance the *elasticity and resilience* of
>>>>>>> Spark workloads, leading to substantial cost optimization opportunities.
>>>>>>>
>>>>>>> *I welcome any initial thoughts or concerns regarding this idea.*
>>>>>>> *Looking forward to your feedback! *
>>>>>>>
>>>>>>> JIRA: SPARK-53484
>>>>>>> <https://issues.apache.org/jira/browse/SPARK-54327>
>>>>>>> SPIP doc
>>>>>>> <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>
>>>>>>> ,
>>>>>>> Design doc
>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
>>>>>>> PoC PR <https://github.com/apache/spark/pull/53028>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Karuppayya
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>> Regards
>>>>> Rishab Joshi
>>>>>
>>>>

Reply via email to