Hi Piotr, Thank you so much for taking the time to reply. Your perspective as one of the original authors is incredibly valuable 😃. I'd like to make sure I fully understand your points and ask a few clarifying questions. This will be extremely helpful.
But I don't remember the details, so it's hard for me to provide any feedback besides some vague warnings from the original authors of the current connector, that there might be some special corner cases that are hard to cover. So I guess proceed with caution? 😅 Thank you for the heads-up regarding the "special corner cases" you mentioned. *1.* Even if the specifics are hazy, could you recall the general nature of those concerns? For instance, were they related to S3's eventual consistency model, which has since improved, the atomicity of Multipart Upload commits, or perhaps complex failure/recovery scenarios during the commit phase? I implemented a PoC of SDKv2 bulk file copy vs using `s5cmd`. Both were significantly faster than the standard implementation copy one file at a time using file streams. `s5cmd` was ~30%-40% faster than SDKv2. Due to that I've chosen to use `s5cmd`, but it has some downsides (relaying on external dependency and complicating the setup). *2.* It's clear that implementing an efficient PathsCopyingFileSystem[2] is a non-negotiable requirement for performance. Is there any benchmark numbers available that can be used as reference and evaluate new implementation deviation ? *3.* Do you recall the workload characteristics for that PoC? Specifically, was the 30-40% performance advantage of s5cmd observed when copying many small files (like checkpoint state) or larger, multi-gigabyte files? *4.* The idea of a switchable implementation sounds great. Would you envision this as a configuration flag (e.g., s3.native.copy.strategy=s5cmd or s3.native.copy.strategy=sdk) that selects the backend implementation at runtime? Also on contrary is it worth adding configuration that exposes some level of implementation level information ? b) Try to keep memory usage under control. I think the biggest weakness of the current `s5cmd` approach is that it doesn't control well how much memory is used for the state download, leading to occasional OOM issues. *5.* My understanding is that the key takeaway here is to avoid the file-by-file stream-based copy used in the vanilla connector and leverage bulk operations, which PathsCopyingFileSystem[2] enables. This seems most critical during state download on recovery. please suggest if my inference is in right direction *6.* The warning about `s5cmd` causing OOMs sounds like indication to consider `S3TransferManager`[3] implementation, which might offer more granular control over buffering and in-flight requests. Do you think exploring more on `S3TransferManager` would be valuable ? c) In AWS it can be very painful if you download/upload the state/files too quickly. If you exceed your burst network quota, AWS doesn't gracefully slow down the network but very aggressively starts dropping packets, very easily leading to TaskManagers losing connection with the JobManager. Some rate limiting capability is thus necessary. For example the current `s5cmd` has some (very crude) control over that, via controlling how many parallel downloads you can have etc. *7.* The insight on AWS aggressively dropping packets instead of gracefully throttling is invaluable. Currently i have limited understanding on how aws behaves at throttling I will deep dive more into it and look for clarification based on findings or doubt. To counter this, were you thinking of a configurable rate limiter within the filesystem itself (e.g., setting max bandwidth or max concurrent requests), or something more dynamic that could adapt to network conditions? I vaguely remember there were some old concerns about supporting FileSystem based sink (is it the one using `S3RecoverableWriter`?). As per my understanding from the codebase, S3RecoverableWriter[1] implements Flink’s recoverable file sink contract for S3, so application can tolerate task failures while writing checkpointed or transactional output. It wraps S3 multipart uploads and exposing recover and commit serializers back to Flink’s RecoverableWriter APIs. Flink jobs that write to s3:// paths via FileSink or StreamingFileSink rely on this writer whenever the configured filesystem is `flink-s3-fs-hadoop` but not available for `flink-s3-fs-presto` , code reference[1.1]. *8. *The flink-s3-fs-presto connector explicitly throws an `UnsupportedOperationException` when `createRecoverableWriter()` is called. Was this a deliberate design choice to keep the Presto connector lightweight and optimized specifically for checkpointing, or were there other technical challenges that prevented its implementation at the time? Any context on this would be very helpful I will have more queries and open questions as i progress. Bests, Samrat [1] https://github.com/apache/flink/blob/1a7c0f1bcdc92d550d38a85fee1900572e6aaac5/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java [1.1] https://github.com/apache/flink/blob/1a7c0f1bcdc92d550d38a85fee1900572e6aaac5/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java#L459 [2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java#L37 [3] https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/transfer/s3/S3TransferManager.html On Mon, Oct 20, 2025 at 1:47 PM Piotr Nowojski <[email protected]> wrote: > Hi, > > Thanks for looking into this. I vaguely remember there were some old > concerns about > supporting FileSystem based sink (is it the one using > `S3RecoverableWriter`?). But I don't > remember the details, so it's hard for me to provide any feedback besides > some vague > warnings from the original authors of the current connector, that there > might be some > special corner cases that are hard to cover. So I guess proceed with > caution? 😅 > > > A few questions to start the discussion: > > > > 2. Are there any critical features from the Hadoop S3A client that are > essential to replicate in a native implementation? > > Please implement `PathsCopyingFileSystem` [1] for the new connector. Either > keep > the current existing implementation using `s5cmd` or implement a new one > using AWS > SDKv2 `S3TransferManager` (or something equivalent). Or ideally add support > to switch > between both `s5cmd` and SDKv2. > > A couple of years ago to address some of the performance concerns of the S3 > connector > I implemented a PoC of SDKv2 bulk file copy vs using `s5cmd`. Both were > significantly > faster than the standard implementation copy one file at a time using file > streams. > `s5cmd` was ~30%-40% faster than SDKv2. Due to that I've chosen to use > `s5cmd`, but > it has some downsides (relaying on external dependency and complicating the > setup). > > > 1. What are the biggest pain points with the current S3 filesystem? > > a) Download speed performance/CPU efficiency. Without using > PathsCopyingFileSystem [1] > the vanilla connector is very inefficient - to saturate available network > it must use much > much more CPU compared to for example `s5cmd`. > > b) Try to keep memory usage under control. I think the biggest weakness of > the current > `s5cmd` approach is that it doesn't control well how much memory is used > for the state > download, leading to occasional OOM issues. > > c) In AWS it can be very painful if you download/upload the state/files too > quickly. If you > exceed your burst network quota, AWS doesn't gracefully slow down the > network but > very aggressively starts dropping packets, very easily leading to > TaskManagers losing > connection with the JobManager. Some rate limiting capability is thus > necessary. For > example the current `s5cmd` has some (very crude) control over that, via > controlling > how many parallel downloads you can have etc. > > That's based on our (Confluent) perspective & experience. > > Best, > Piotrek > > [1] > > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java#L37 > > pt., 17 paź 2025 o 19:49 Ferenc Csaky <[email protected]> > napisał(a): > > > Hi, > > > > Thanks for starting this discussion. Heavy +1 from me. > > > > AWS v1 is EOL at the end of 2025, so the Hadoop S3 FS has to be updated > > pretty > > soonish as well. But that's not really news fer you guys [1] :) > Personally > > I > > don't think that would make this proposal even a tiny bit less important. > > > > What I see nowadays there are more and more Hadoop-less use-cases, so > > eliminating Hadoop bloat where it is not a must, IMO is a net gain, > period. > > > > One thing that comes to my mind that will need some changes and its > > involvement > > to this change is not trivial is the delegation token framework. > Currently > > it > > is also tied to the Hadoop stuff and has some abstract classes in the > base > > S3 FS > > module. > > > > Another funny thing I personally experienced and also points out problems > > with > > the current setup is if you use Iceberg with an AWS Glue catalog, you > must > > also > > bundle the AWS SDK v2, cause Iceberg depends on that version. So if > someone > > would like to do that currently they cannot really escape bloating their > cp > > with both AWS SDK. > > > > Best, > > Ferenc > > > > [1] https://issues.apache.org/jira/browse/FLINK-30975 > > > > > > > > On Friday, October 17th, 2025 at 17:12, Tom Cooper <[email protected]> > > wrote: > > > > > > > > > > > Hi Samrat, > > > > > > +1 from me. I think this would be a brilliant contribution. The Hadoop > > libraries are often full of CVEs and updating them can be, IMHO, one of > the > > hardest chores in the Flink code base. > > > So from a purely maintenance POV I think this work would be valuable. > > Also having the most up to date AWS Java SDK means we keep up with all > the > > auth requirements and opens up more options for using advanced features > in > > future. > > > > > > Frankly, I think in the long term, Flink would be better off moving > away > > from Hadoop altogether (but that is a much bigger discussion). > > > > > > Thanks, > > > > > > Tom Cooper > > > @tomcooper.dev | https://tomcooper.dev > > > > > > > > > On Tuesday, 14 October 2025 at 19:19, Samrat Deb [email protected] > > wrote: > > > > > > > Hi All, > > > > > > > > Poorvank (cc'ed) and I are writing to start a discussion about a > > potential > > > > improvement for Flink, creating a new, native S3 filesystem > > independent of > > > > Hadoop/Presto. > > > > > > > > The goal of this proposal is to address several challenges related to > > > > Flink's S3 integration, simplifying flink-s3-filesystem. If this > > discussion > > > > gains positive traction, the next step would be to move forward with > a > > > > formalised FLIP. > > > > > > > > The Challenges with the Current S3 Connectors > > > > Currently, Flink offers two primary S3 filesystems, > > flink-s3-fs-hadoop[1] > > > > and flink-s3-fs-presto[2]. While functional, this dual-connector > > approach > > > > has few issues: > > > > > > > > 1. The flink-s3-fs-hadoop connector adds an additional dependency to > > > > manage. Upgrades like AWS SDK v2 are more dependent on Hadoop/Presto > to > > > > support first and leverage in flink-s3-filesystem. Sometimes it's > > > > restrictive to leverage features directly from the AWS SDK. > > > > > > > > 2. The flink-s3-fs-presto connector was introduced to mitigate the > > > > performance issues of the Hadoop connector, especially for > > checkpointing. > > > > However, it lacks a RecoverableWriter implementation. > > > > Sometimes it's confusing for Flink users, highlighting the need for a > > > > single, unified solution. > > > > > > > > Proposed Solution: > > > > A Native, Hadoop-Free S3 Filesystem > > > > > > > > I propose we develop a new filesystem, let's call it > > flink-s3-fs-native, > > > > built directly on the modern AWS SDK for Java v2. This approach would > > be > > > > free of any Hadoop or Presto dependencies. I have done a small > > prototype to > > > > validate [3] > > > > > > > > This is motivated by trino<>s3 [4]. The Trino project successfully > > > > > > > > undertook a similar migration, moving from Hadoop-based object > storage > > > > clients to their own native implementations. > > > > > > > > The new Flink S3 filesystem would: > > > > > > > > 1. Provide a single, unified connector for all S3 interactions, from > > state > > > > backends to sinks. > > > > > > > > 2. Implement a high-performance S3RecoverableWriter using S3's > > Multipart > > > > Upload feature, ensuring exactly-once sink semantics. > > > > > > > > 3. Offer a clean, self-contained dependency, drastically simplifying > > setup > > > > and eliminating external dependencies. > > > > > > > > A Phased Migration Path > > > > To ensure a smooth transition, we could adopt a phased approach on a > > very > > > > high level : > > > > > > > > Phase 1: > > > > Introduce the new native S3 filesystem as an optional, parallel > plugin. > > > > This would allow for community testing and adoption without breaking > > > > existing setups. > > > > > > > > Phase 2: > > > > Once the native connector achieves feature parity and proven > > stability, we > > > > will update the documentation to recommend it as the default choice > > for all > > > > S3 use cases. > > > > > > > > Phase 3: > > > > In a future major release, the legacy flink-s3-fs-hadoop and > > > > flink-s3-fs-presto connectors could be formally deprecated, with > clear > > > > migration guides provided for users. > > > > > > > > I would love to hear the community's thoughts on this. > > > > > > > > A few questions to start the discussion: > > > > > > > > 1. What are the biggest pain points with the current S3 filesystem? > > > > > > > > 2. Are there any critical features from the Hadoop S3A client that > are > > > > essential to replicate in a native implementation? > > > > > > > > 3. Would a simplified, non-dependent S3 experience be a valuable > > > > improvement for Flink use cases? > > > > > > > > Cheers, > > > > Samrat > > > > > > > > [1] > > > > > > > https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-hadoop > > > > [2] > > > > > > > https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-presto > > > > [3] https://github.com/Samrat002/flink/pull/4 > > > > [4] > > https://github.com/trinodb/trino/tree/master/lib/trino-filesystem-s3 > > >
