Hi everyone,

Thank you all for your valuable comments and discussion on the design document and in this thread. I have replied to the comments and concerns raised, and I welcome any further questions and challenges.
Also, *Sun Chao* has accepted to shepherd this proposal (thank you!). If there are no other open questions by Wednesday morning (PST), I will ask Chao to open the official voting thread (which allows 72 hours for the process <https://spark.apache.org/improvement-proposals.html>).

- Karuppayya

On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <[email protected]> wrote:

> One aspect that hasn't been mentioned yet (or so I think) is the thread-level behavior of shuffle. In large workloads with many small shuffle blocks, I've repeatedly observed executors spawning hundreds of threads tied to shuffle fetch operations, Netty client handlers, and block file access.
> Since the proposal changes block granularity and fetch patterns, it would be valuable to explicitly consider how the consolidation stage affects:
> – the number of concurrent fetch operations
> – thread pool growth / saturation
> – Netty transport threads
> – memory pressure from large in-flight reads
>
> Btw, I find your proposal quite interesting.
>
> On Tue, Nov 18, 2025, 19:33, karuppayya <[email protected]> wrote:
>
>> Rishab, Wenchen, Murali,
>> Thank you very much for taking the time to review the proposal and for providing such thoughtful and insightful comments and questions.
>>
>> *Rishab*,
>>
>>> *suitable across all storage systems*
>>
>> You are right that the suitability is somewhat subjective and depends on the cloud provider used.
>> In general, the goal of ShuffleVault is to use the standard Hadoop FileSystem APIs, which means it should work seamlessly with popular cloud and distributed file systems (like S3, HDFS, GFS, etc.). These systems share a similar nature and are designed for large files.
>>
>> *large file could create problems during retrieval.*
>>
>> We are mitigating this risk by ensuring that tasks do not read the entire consolidated file at once. Instead, the implementation is designed to read the data in configured blocks rather than relying on a single read. *This behavior can be refined/validated* to make it more robust.
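>>
>> For illustration, here is a minimal sketch of such a block-wise read using the standard Hadoop FileSystem API (the object name, the default block size, and the consume callback are hypothetical, not the PoC's actual code):
>>
>>   import org.apache.hadoop.conf.Configuration
>>   import org.apache.hadoop.fs.Path
>>
>>   object BlockwiseShuffleRead {
>>     // Streams [offset, offset + length) of a large consolidated file in
>>     // bounded ranged reads instead of a single huge read.
>>     def read(path: Path, offset: Long, length: Long,
>>              blockSize: Int = 8 * 1024 * 1024)
>>             (consume: (Array[Byte], Int) => Unit): Unit = {
>>       val fs = path.getFileSystem(new Configuration())
>>       val in = fs.open(path)
>>       try {
>>         in.seek(offset)
>>         val buf = new Array[Byte](blockSize)
>>         var remaining = length
>>         while (remaining > 0) {
>>           val toRead = math.min(remaining, blockSize.toLong).toInt
>>           in.readFully(buf, 0, toRead) // each call fetches one bounded block
>>           consume(buf, toRead)
>>           remaining -= toRead
>>         }
>>       } finally in.close()
>>     }
>>   }
>>
>> A nice side effect of this pattern is that a broken connection only costs re-reading the current block after reopening at the last good offset, which also speaks to the resume-from-failure concern above.
>>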
>> *Wenchen,*
>> I fully agree that the operational details around using cloud storage for shuffle (traffic throttling, cleanup guarantees, and overall request-related network cost) are critical issues that must be solved. The consolidation stage is explicitly designed to mitigate the throttling and accompanying cost issues.
>> *Throttling* - By consolidating shuffle data, this approach transforms the read pattern from a multitude of small, random requests into fewer, large, targeted ones. This is particularly beneficial for modern cloud object storage.
>> *Shuffle cleanup* - I am actively working to leverage the shuffle cleanup modes and to make them more robust. These cleanup improvements should be beneficial regardless of this proposal and cover most cases.
>> However, I agree that to ensure *no orphaned files* remain, we will still require other means (such as remote storage lifecycle policies or job-specific scripts) for guaranteed cleanup.
>> Thank you again for your valuable feedback, especially the validation on synchronous scheduling and AQE integration.
>>
>> *Murali,*
>>
>>> *Doing an explicit sort stage*
>>
>> To clarify, ShuffleVault does not introduce an explicit sort stage. Instead, it introduces a Shuffle Consolidation Stage.
>> This stage is a pure passthrough operation that only aggregates scattered shuffle data for a given reducer partition.
>> In simple terms, it functions as an additional reducer stage that reads the fragmented shuffle files from the mappers and writes them as a single, consolidated, durable file to remote storage.
>>
>>> *but that would be a nontrivial change*
>>
>> I agree that the change is significant, and I am actively working to ensure the benefits are leveraged across the stack. This PR <https://github.com/apache/spark/pull/53028> demonstrates integration with AQE and interactions with other rules (Exchange reuse, Shuffle Partition Coalescing, etc.).
>> I would genuinely appreciate it if you could take a look at the PoC PR to see the scope of changes. The primary logic is encapsulated within a new Spark Physical Planner Rule <https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6> that injects the consolidation stage, which is the crux of the change.
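>>
>> As a rough, simplified sketch of the shape such a rule can take (the node and rule names below are hypothetical; please refer to the PR for the actual code):
>>
>>   import org.apache.spark.rdd.RDD
>>   import org.apache.spark.sql.catalyst.InternalRow
>>   import org.apache.spark.sql.catalyst.expressions.Attribute
>>   import org.apache.spark.sql.catalyst.plans.physical.Partitioning
>>   import org.apache.spark.sql.catalyst.rules.Rule
>>   import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
>>   import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
>>
>>   // Hypothetical pass-through node: for each reduce partition it would
>>   // persist the fetched map output as one consolidated remote file
>>   // before handing the rows to the parent stage.
>>   case class ConsolidateShuffleExec(child: SparkPlan) extends UnaryExecNode {
>>     override def output: Seq[Attribute] = child.output
>>     override def outputPartitioning: Partitioning = child.outputPartitioning
>>     override protected def doExecute(): RDD[InternalRow] =
>>       child.execute() // sketch only: a real node would write to remote storage here
>>     override protected def withNewChildInternal(newChild: SparkPlan): ConsolidateShuffleExec =
>>       copy(child = newChild)
>>   }
>>
>>   // Hypothetical planner rule that wraps every shuffle exchange with
>>   // the consolidation stage.
>>   object InjectShuffleConsolidation extends Rule[SparkPlan] {
>>     override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
>>       case e: ShuffleExchangeExec => ConsolidateShuffleExec(e)
>>     }
>>   }
>>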
>> I welcome any further questions or comments!
>>
>> Thanks & Regards
>> Karuppayya
>>
>> On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <[email protected]> wrote:
>>
>>> There are existing Apache projects which provide capabilities that largely address the problem statement: Apache Celeborn, Apache Uniffle, Zeus, etc.
>>> Doing an explicit sort stage between "map" and "reduce" brings with it some nice advantages, especially if the output is durable, but that would be a nontrivial change, and it should be attempted only if the benefits are leveraged throughout the stack (AQE, speculative execution, etc.).
>>>
>>> Regards,
>>> Mridul
>>>
>>> On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]> wrote:
>>>
>>>> Hi Karuppayya,
>>>>
>>>> Handling large shuffles in Spark is challenging, and it's great to see proposals addressing it. I think the extra "shuffle consolidation stage" is a good idea, and now I feel it's better for it to be synchronous, so that we can integrate it with AQE and leverage the accurate runtime shuffle status to make decisions about whether or not to launch this extra "shuffle consolidation stage" and how to consolidate. This is a key differentiator compared to push-based shuffle.
>>>>
>>>> However, there are many details to consider, and in general it's difficult to use cloud storage for shuffle. We need to deal with problems like traffic throttling, cleanup guarantees, cost control, and so on. Let's take a step back and look at the actual problems of large shuffles.
>>>>
>>>> A large shuffle usually starts with a large number of mappers that we can't adjust (e.g. a large table scan). We can adjust the number of reducers to reach two goals:
>>>> 1. The input data size of each reducer shouldn't be too large, which is roughly *total_shuffle_size / num_reducers*. This is to avoid spilling/OOM during reducer task execution.
>>>> 2. The data size of each shuffle block shouldn't be too small, which is roughly *total_shuffle_size / (num_mappers * num_reducers)*. This is for the good of disk/network IO.
>>>>
>>>> These two goals are actually contradictory, and sometimes we have to prioritize goal 1 (i.e. pick a large *num_reducers*) so that the query can finish. An extra "shuffle consolidation stage" can effectively decrease the number of mappers by merging the shuffle files from multiple mappers. This can be a clear win, as fetching many small shuffle blocks can be quite slow, even slower than running an extra "shuffle consolidation stage".
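>>>>
>>>> As a concrete (purely illustrative) example: with *total_shuffle_size* = 1 TB, *num_mappers* = 20,000 and *num_reducers* = 10,000, each reducer reads about 1 TB / 10,000 = 100 MB, which satisfies goal 1, but each shuffle block is only about 1 TB / (20,000 * 10,000) = 5 KB, far too small for efficient disk/network IO. If a consolidation stage merges the output of every 100 mappers, the effective block size grows to about 500 KB without changing *num_reducers*.
>>>>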
>>>> In addition, the number of nodes that host shuffle files shouldn't be too large (ideally 0, meaning shuffle files are stored in a different storage). With a large number of mappers, likely every node in the cluster stores some shuffle files. By merging shuffle files via the extra "shuffle consolidation stage", we can decrease the number of nodes that host active shuffle data, so that the cluster is more elastic.
>>>>
>>>> Thanks,
>>>> Wenchen
>>>>
>>>> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <[email protected]> wrote:
>>>>
>>>>> Hi Karuppayya,
>>>>>
>>>>> Thanks for sharing the proposal; this looks very exciting.
>>>>>
>>>>> I have a few questions; please correct me if I have misunderstood anything.
>>>>>
>>>>> Would it be possible to clarify whether the consolidated shuffle file produced for each partition is suitable across all storage systems, especially when this file becomes extremely large? I am wondering if a very large file could create problems during retrieval. For example, if a connection breaks while reading the file, some storage systems may not support resuming reads from the point of failure and will start reading the file from the beginning again. This could lead to higher latency, repeated retries, or performance bottlenecks when a partition becomes too large or skewed.
>>>>>
>>>>> Would it make sense to introduce a configurable upper bound on the maximum allowed file size? This might prevent the file from growing massively.
>>>>> Should the consolidated shuffle file be compressed before being written to the storage system? Compression might introduce additional latency, but that too can be a configurable option.
>>>>>
>>>>> Regards,
>>>>> Rishab Joshi
>>>>>
>>>>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya <[email protected]> wrote:
>>>>>
>>>>>> Enrico,
>>>>>> Thank you very much for reviewing the doc.
>>>>>>
>>>>>>> *Since the consolidation stage reads all the shuffle data, why not do the transformation in that stage? What is the point in deferring the transformations to another stage?*
>>>>>>
>>>>>> The reason for deferring the final consolidation to a subsequent stage lies in the distributed nature of shuffle data.
>>>>>> A reducer requires reading all corresponding shuffle data written across all map tasks. Since each mapper only holds its own local output, the consolidation cannot begin until the entire map stage completes.
>>>>>>
>>>>>> However, your question also aligns with one of the approaches mentioned (concurrent consolidation <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>), which was specifically considered.
>>>>>>
>>>>>> While synchronous consolidation happens after all the data is available, concurrent consolidation can begin aggregating and persisting already-generated shuffle data while the remaining map tasks run, thereby making the shuffle durable much earlier instead of having to wait for all map tasks to complete.
>>>>>>
>>>>>> - Karuppayya
>>>>>>
>>>>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> another remark regarding a remote shuffle storage solution:
>>>>>>> As long as the map executors are alive, reduce executors should read from them to avoid any extra delay/overhead.
>>>>>>> On fetch failure from a map executor, the reduce executors should fall back to a remote storage that provides a copy (merged or not) of the shuffle data.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Enrico
>>>>>>>
>>>>>>> On 13.11.25 at 09:42, Enrico Minack wrote:
>>>>>>>
>>>>>>> Hi Karuppayya,
>>>>>>>
>>>>>>> thanks for your proposal and for bringing up this issue.
>>>>>>>
>>>>>>> I am very much in favour of a shuffle storage solution that allows for dynamic allocation and node failure in a K8S environment, without the burden of managing a Remote Shuffle Service.
>>>>>>>
>>>>>>> I have the following comments:
>>>>>>>
>>>>>>> Your proposed consolidation stage is equivalent to the next reducer stage in the sense that it reads shuffle data from the earlier map stage. This requires the executors of the map stage to survive until the shuffle data are consolidated ("merged" in Spark terminology). Therefore, I think this passage of your design document is not accurate:
>>>>>>>
>>>>>>> Executors that perform the initial map tasks (shuffle writers) can be immediately deallocated after writing their shuffle data ...
>>>>>>>
>>>>>>> Since the consolidation stage reads all the shuffle data, why not do the transformation in that stage? What is the point in deferring the transformations to another stage?
>>>>>>>
>>>>>>> You mention the "Native Shuffle Block Migration" and say its limitation is "It simply shifts the storage burden to other active executors". Please consider that the migration process can migrate to what Spark calls a fallback storage (spark.storage.decommission.fallbackStorage.path), which essentially copies the shuffle data to a remote storage.
>>>>>>>
>>>>>>> Kind regards,
>>>>>>> Enrico
>>>>>>>
>>>>>>> On 13.11.25 at 01:40, karuppayya wrote:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I propose to utilize *Remote Storage as a Shuffle Store, natively in Spark*.
>>>>>>>
>>>>>>> This approach would fundamentally decouple shuffle storage from compute nodes, mitigating *shuffle fetch failures* and also helping with *aggressive downscaling*.
>>>>>>>
>>>>>>> The primary goal is to enhance the *elasticity and resilience* of Spark workloads, leading to substantial cost-optimization opportunities.
>>>>>>>
>>>>>>> *I welcome any initial thoughts or concerns regarding this idea.*
>>>>>>> *Looking forward to your feedback!*
>>>>>>>
>>>>>>> JIRA: SPARK-53484 <https://issues.apache.org/jira/browse/SPARK-54327>
>>>>>>> SPIP doc <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>
>>>>>>> Design doc <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
>>>>>>> PoC PR <https://github.com/apache/spark/pull/53028>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Karuppayya
>>>>>
>>>>> --
>>>>> Regards
>>>>> Rishab Joshi
