Please remove me from this distribution list. Thanks
On Mon, Dec 1, 2025 at 2:33 PM karuppayya <[email protected]> wrote:

Hi everyone,
Thank you all for your valuable comments and discussion on the design document/this email. I have replied to the comments/concerns raised. I welcome any further questions and challenges.

Also, *Sun Chao* has agreed to shepherd this proposal (thank you!).

If there are no other open questions by Wednesday morning (PST), I will request Chao to open the official voting thread (which, per the process <https://spark.apache.org/improvement-proposals.html>, should run for 72 hours).

- Karuppayya

On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <[email protected]> wrote:

One aspect that hasn't been mentioned yet (or so I think) is the thread-level behavior of shuffle. In large workloads with many small shuffle blocks, I've repeatedly observed executors spawning hundreds of threads tied to shuffle fetch operations, Netty client handlers, and block file access.
Since the proposal changes block granularity and fetch patterns, it would be valuable to explicitly consider how the consolidation stage affects:
– the number of concurrent fetch operations
– thread pool growth / saturation
– Netty transport threads
– memory pressure from large in-flight reads

Btw, I find your proposal quite interesting.

On Tue, Nov 18, 2025, 19:33, karuppayya <[email protected]> wrote:

Rishab, Wenchen, Murali,
Thank you very much for taking the time to review the proposal and for providing such thoughtful and insightful comments/questions.

*Rishab*,

> *suitable across all storage systems*

You are right that the suitability is somewhat subjective and dependent on the cloud provider used. In general, the goal of ShuffleVault is to utilize the standard Hadoop FileSystem APIs, which means it should work seamlessly with popular cloud and distributed file systems (like S3, HDFS, GFS, etc.). These systems share a similar nature and are designed for large files.

> *large file could create problems during retrieval*

We are mitigating this risk by ensuring that tasks do not read the entire consolidated file at once. Instead, the implementation is designed to read the data in configured blocks rather than relying on a single read (a rough sketch of what this could look like is included further below in this reply). *This behavior can be refined/validated* to make it more robust.

*Wenchen*,
I fully agree that the operational details around using cloud storage for shuffle (specifically traffic throttling, cleanup guarantees, and overall request-related network cost) are critical issues that must be solved. The consolidation stage is explicitly designed to mitigate the throttling and the accompanying cost issues.
*Throttling* - By consolidating shuffle data, this approach transforms the read pattern from a multitude of small, random requests into fewer, large, targeted ones, which is particularly beneficial for modern cloud object storage.
*Shuffle cleanup* - I am actively working to leverage the shuffle cleanup modes and to make them more robust. These cleanup improvements should be beneficial regardless of this proposal and should cover most cases. However, I agree that to ensure *no orphaned files* remain, we will still require other means (such as remote storage lifecycle policies or job-specific scripts) for a guaranteed cleanup.
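Coming back to the block-wise read point from the notes to Rishab above, here is a rough sketch of how a reducer could read its slice of a consolidated file in bounded chunks through the Hadoop FileSystem API. The object and method names are made up for illustration and do not reflect the PoC code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object BlockwiseReadSketch {
  // Hypothetical illustration: read [offset, offset + length) of a consolidated
  // shuffle file in fixed-size blocks instead of a single large read.
  def readSlice(path: Path, offset: Long, length: Long, blockSize: Int): Iterator[Array[Byte]] = {
    val fs: FileSystem = path.getFileSystem(new Configuration())
    val in = fs.open(path)               // FSDataInputStream: seekable, supports bounded reads
    in.seek(offset)
    var remaining = length
    new Iterator[Array[Byte]] {
      override def hasNext: Boolean = remaining > 0
      override def next(): Array[Byte] = {
        val toRead = math.min(blockSize.toLong, remaining).toInt
        val buf = new Array[Byte](toRead)
        in.readFully(buf, 0, toRead)     // one bounded request per block
        remaining -= toRead
        if (remaining == 0) in.close()
        buf
      }
    }
  }
}

The intent is simply that each request is bounded by the configured block size, so a broken connection only forces a retry of one block rather than a re-read of the whole consolidated file.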
Thank you again for your valuable feedback, especially the validation on synchronous scheduling and AQE integration.

*Murali*,

> *Doing an explicit sort stage*

To clarify, ShuffleVault does not introduce an explicit sort stage. Instead, it introduces a Shuffle Consolidation Stage. This stage is a pure pass-through operation that only aggregates scattered shuffle data for a given reducer partition. In simple terms, it functions as an additional reducer stage that reads the fragmented shuffle files from the mappers and writes them as a single, consolidated, durable file to remote storage.

> *but that would be a nontrivial change*

I agree that the change is significant, and I am actively working to ensure the benefits are leveraged across the stack. This PR <https://github.com/apache/spark/pull/53028> demonstrates integration with AQE and interactions with other rules (Exchange reuse, Shuffle Partition Coalescing, etc.). I would genuinely appreciate it if you could take a look at the PoC PR to see the scope of changes. The primary logic is encapsulated within a new Spark Physical Planner Rule <https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6> that injects the consolidation stage, which is the crux of the change.

I welcome any further questions or comments!

Thanks & Regards
Karuppayya

On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <[email protected]> wrote:

There are existing Apache projects which provide capabilities that largely address the problem statement - Apache Celeborn, Apache Uniffle, Zeus, etc.
Doing an explicit sort stage between "map" and "reduce" brings with it some nice advantages, especially if the output is durable, but that would be a nontrivial change - and should be attempted if the benefits are being leveraged throughout the stack (AQE, speculative execution, etc.).

Regards,
Mridul

On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]> wrote:

Hi Karuppayya,

Handling large shuffles in Spark is challenging, and it's great to see proposals addressing it. I think the extra "shuffle consolidation stage" is a good idea, and now I feel it's better for it to be synchronous, so that we can integrate it with AQE and leverage the accurate runtime shuffle status to make decisions about whether or not to launch this extra "shuffle consolidation stage" and how to consolidate. This is a key differentiator compared to the push-based shuffle.

However, there are many details to consider, and in general it's difficult to use cloud storage for shuffle. We need to deal with problems like traffic throttling, cleanup guarantees, cost control, and so on. Let's take a step back and see what the actual problems of large shuffles are.

A large shuffle usually starts with a large number of mappers that we can't adjust (e.g. a large table scan). We can adjust the number of reducers to reach two goals:
1. The input data size of each reducer shouldn't be too large, which is roughly *total_shuffle_size / num_reducers*. This is to avoid spilling/OOM during reducer task execution.
2. The data size of each shuffle block shouldn't be too small, which is roughly *total_shuffle_size / (num_mappers * num_reducers)*. This is for the sake of disk/network IO.

These two goals are actually contradictory, and sometimes we have to prioritize goal 1 (i.e. pick a large *num_reducers*) so that the query can finish. An extra "shuffle consolidation stage" can effectively decrease the number of mappers by merging the shuffle files from multiple mappers. This can be a clear win, as fetching many small shuffle blocks can be quite slow, even slower than running an extra "shuffle consolidation stage".

In addition, the nodes that host shuffle files shouldn't be too many (best to be 0, which means shuffle files are stored in a different storage). With a large number of mappers, likely every node in the cluster stores some shuffle files. By merging shuffle files via the extra "shuffle consolidation stage", we can decrease the number of nodes that host active shuffle data, so that the cluster is more elastic.

Thanks,
Wenchen

On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <[email protected]> wrote:

Hi Karuppayya,

Thanks for sharing the proposal; this looks very exciting.

I have a few questions; please correct me if I have misunderstood anything.

Would it be possible to clarify whether the consolidated shuffle file produced for each partition is suitable across all storage systems, especially when this file becomes extremely large? I am wondering if a very large file could create problems during retrieval. For example, if a connection breaks while reading the file, some storage systems may not support resuming reads from the point of failure and may start reading the file from the beginning again. This could lead to higher latency, repeated retries, or performance bottlenecks when a partition becomes too large or skewed.

Would it make sense to introduce a configurable upper bound on the maximum allowed file size? This might prevent the file from growing massively.
Should the consolidated shuffle file be compressed before being written to the storage system? Compression might introduce additional latency, but that too can be a configurable option.

Regards,
Rishab Joshi

On Thu, Nov 13, 2025 at 9:14 AM karuppayya <[email protected]> wrote:

Enrico,
Thank you very much for reviewing the doc.

> *Since the consolidation stage reads all the shuffle data, why not do the transformation in that stage? What is the point of deferring the transformations to another stage?*

The reason for deferring the final consolidation to a subsequent stage lies in the distributed nature of shuffle data. A reducer requires reading all of its corresponding shuffle data written across all map tasks. Since each mapper only holds its own local output, the consolidation cannot begin until the map stage completes.
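For illustration, a consolidation task for one reducer partition could conceptually look like the rough sketch below: a pure byte-level pass-through (no sorting or aggregation) that needs every map task's output for that partition to be available, which is why it runs only after the map stage. The names and structure are hypothetical and do not reflect the PoC code:

import java.io.InputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ConsolidationTaskSketch {
  // Hypothetical sketch: concatenate the shuffle blocks written by every map
  // task for one reducer partition into a single durable file on remote storage.
  def consolidate(mapOutputsForPartition: Seq[InputStream], targetPath: Path): Unit = {
    val fs: FileSystem = targetPath.getFileSystem(new Configuration())
    val out = fs.create(targetPath)            // single consolidated file
    val buffer = new Array[Byte](4 * 1024 * 1024)
    try {
      mapOutputsForPartition.foreach { in =>
        var read = in.read(buffer)
        while (read != -1) {                   // byte-for-byte copy, no transformation
          out.write(buffer, 0, read)
          read = in.read(buffer)
        }
        in.close()
      }
    } finally {
      out.close()
    }
  }
}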
However, your question is also aligned with one of the approaches mentioned (concurrent consolidation <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>), which was specifically considered.

While synchronous consolidation happens after all the data is available, concurrent consolidation can begin aggregating and persisting the already-generated shuffle data concurrently with the remaining map tasks, thereby making the shuffle durable much earlier instead of having to wait for all map tasks to complete.

- Karuppayya

On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <[email protected]> wrote:

Hi,

another remark regarding a remote shuffle storage solution:
As long as the map executors are alive, reduce executors should read from them to avoid any extra delay / overhead. On fetch failure from a map executor, the reduce executors should fall back to a remote storage that provides a copy (merged or not) of the shuffle data.

Cheers,
Enrico

On 13.11.25 at 09:42, Enrico Minack wrote:

Hi Karuppayya,

thanks for your proposal and for bringing up this issue.

I am very much in favour of a shuffle storage solution that allows for dynamic allocation and node failure in a K8S environment, without the burden of managing a Remote Shuffle Service.

I have the following comments:

Your proposed consolidation stage is equivalent to the next reducer stage in the sense that it reads shuffle data from the earlier map stage. This requires the executors of the map stage to survive until the shuffle data are consolidated ("merged" in Spark terminology). Therefore, I think this passage of your design document is not accurate:

> Executors that perform the initial map tasks (shuffle writers) can be immediately deallocated after writing their shuffle data ...

Since the consolidation stage reads all the shuffle data, why not do the transformation in that stage? What is the point of deferring the transformations to another stage?

You mention the "Native Shuffle Block Migration" and say its limitation is "It simply shifts the storage burden to other active executors". Please consider that the migration process can migrate to what Spark calls a fallback storage, which essentially copies the shuffle data to a remote storage.

Kind regards,
Enrico

On 13.11.25 at 01:40, karuppayya wrote:

Hi All,

I propose to utilize *Remote Storage as a Shuffle Store, natively in Spark*.

This approach would fundamentally decouple shuffle storage from compute nodes, mitigating *shuffle fetch failures* and also helping with *aggressive downscaling*.

The primary goal is to enhance the *elasticity and resilience* of Spark workloads, leading to substantial cost optimization opportunities.
*I welcome any initial thoughts or concerns regarding this idea.*
*Looking forward to your feedback!*

JIRA: SPARK-53484 <https://issues.apache.org/jira/browse/SPARK-54327>
SPIP doc <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>
Design doc <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
PoC PR <https://github.com/apache/spark/pull/53028>

Thanks,
Karuppayya
