Hi Piotr,

Thank you again for the response.

As I dig deeper into the existing code based on this discussion, I have one specific question that I hope you or others in the community might have context on:

*1.* I noticed that XmlResponsesSaxParser.java [1] was copied into the Flink codebase, seemingly as a workaround for a JDK bug (JDK-8015099). The main change appears to be switching from XMLReaderFactory.createXMLReader() to SAX_PARSER_FACTORY.newSAXParser().getXMLReader(). Was this bug the only reason for including this custom parser, or were there other modifications or behaviors it handles that are critical to retain?

Apart from that:

*2.* Are there any established, open-source integration test suites or benchmark tools within the Flink community specifically for measuring the performance of Flink's filesystems?

The feedback from this entire discussion is really helpful. Based on it, my next steps will be:

1. Build out the flink-s3-fs-native POC, incorporating the critical features and pointers from this thread to ensure we aim for feature parity.

2. Once the POC is in a stable state, conduct benchmarks for both sinking and checkpointing. I will then share the results here, comparing the performance against the existing flink-s3-fs-hadoop [2] and flink-s3-fs-presto [3] connectors.

Cheers,
Samrat

[1] https://github.com/apache/flink/blob/5e45679aab71104714698f1d3bf180d06a69d531/flink-filesystems/flink-s3-fs-base/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java#L4
[2] https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-hadoop
[3] https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-presto

On Wed, Oct 22, 2025 at 6:31 PM Piotr Nowojski <[email protected]> wrote:

> Hi Samrat,
>
> > *1.* Even if the specifics are hazy, could you recall the general nature of those concerns? For instance, were they related to S3's eventual consistency model, which has since improved, the atomicity of Multipart Upload commits, or perhaps complex failure/recovery scenarios during the commit phase?
>
> and
>
> > *8.* The flink-s3-fs-presto connector explicitly throws an `UnsupportedOperationException` when `createRecoverableWriter()` is called. Was this a deliberate design choice to keep the Presto connector lightweight and optimized specifically for checkpointing, or were there other technical challenges that prevented its implementation at the time? Any context on this would be very helpful.
>
> I very vaguely remember that at least one of those concerns was with respect to how long it takes for S3 to make certain operations visible. That is, you think you have uploaded and committed a file, but in reality it might not be visible for tens of seconds.
>
> Sorry, I don't remember more (or even if there was more). I was only superficially involved in the S3 connector back then - just participated/overheard some discussions.
>
> > *2.* It's clear that implementing an efficient PathsCopyingFileSystem [2] is a non-negotiable requirement for performance. Are there any benchmark numbers available that can be used as a reference to evaluate how a new implementation deviates?
>
> I only have the numbers that I put in the original FLIP [1]. I don't remember the benchmark setup, but it must have been something simple. Like just let some job accumulate 1 GB of state and measure how long the state downloading phase of recovery was taking.
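
Coming back to point *1.* of this mail for a moment, here is a minimal, self-contained sketch of the parser-creation change described there. This is an illustration only, not the actual code in XmlResponsesSaxParser.java; the class name is made up, and thread-safety/feature configuration of the factory is omitted:

import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.XMLReader;
import org.xml.sax.helpers.XMLReaderFactory;

public final class XmlReaderCreationSketch {

    // Shared, pre-configured factory, mirroring the SAX_PARSER_FACTORY constant
    // mentioned above.
    private static final SAXParserFactory SAX_PARSER_FACTORY = SAXParserFactory.newInstance();

    // Original creation path, the one affected by the JDK bug mentioned above.
    static XMLReader viaXmlReaderFactory() throws Exception {
        return XMLReaderFactory.createXMLReader();
    }

    // Replacement path: obtain the reader from an explicit SAXParserFactory instead.
    static XMLReader viaSaxParserFactory() throws Exception {
        return SAX_PARSER_FACTORY.newSAXParser().getXMLReader();
    }
}
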
>
> > *3.* Do you recall the workload characteristics for that PoC? Specifically, was the 30-40% performance advantage of s5cmd observed when copying many small files (like checkpoint state) or larger, multi-gigabyte files?
>
> It was just a regular mix of compacted RocksDB sst files, with a total state size of 1 GB or at most a couple of GBs. So most of the files were around ~64MB or ~128MB, with a couple of smaller L0 files, and maybe one larger L2 file.
>
> > *4.* The idea of a switchable implementation sounds great. Would you envision this as a configuration flag (e.g., s3.native.copy.strategy=s5cmd or s3.native.copy.strategy=sdk) that selects the backend implementation at runtime? And, on the contrary, is it worth adding configuration that exposes some level of implementation-level information?
>
> I think something like that should be fine, assuming that `s5cmd` will again prove significantly faster and/or more cpu efficient. If not, if the SDKv2 has already improved and caught up with `s5cmd`, then it probably doesn't make sense to keep `s5cmd` support.
>
> > *5.* My understanding is that the key takeaway here is to avoid the file-by-file stream-based copy used in the vanilla connector and leverage bulk operations, which PathsCopyingFileSystem [2] enables. This seems most critical during state download on recovery. Please suggest if my inference is in the right direction.
>
> Yes, but you should also make the bulk transfer configurable. How many bulk transfers can be happening in parallel etc.
>
> > *6.* The warning about `s5cmd` causing OOMs sounds like an indication to consider an `S3TransferManager` [3] implementation, which might offer more granular control over buffering and in-flight requests. Do you think exploring `S3TransferManager` further would be valuable?
>
> I'm pretty sure if you start hundreds of bulk transfers in parallel via the `S3TransferManager` you can get the same problems with running out of memory or exceeding available network throughput. I don't know if `S3TransferManager` is better or worse in that regard to be honest.
>
> > *7.* The insight on AWS aggressively dropping packets instead of gracefully throttling is invaluable. Currently I have limited understanding of how AWS behaves when throttling; I will dive deeper into it and look for clarification based on findings or doubts. To counter this, were you thinking of a configurable rate limiter within the filesystem itself (e.g., setting max bandwidth or max concurrent requests), or something more dynamic that could adapt to network conditions?
>
> Flat rate limiting is tricky because AWS offers burst network capacity, which comes in very handy, and in the vast majority of cases works fine. But for some jobs, if you exceed that burst capacity, AWS starts dropping your packets and then the problems happen. On the other hand, if you rate limit to your normal capacity, you are leaving a lot of network throughput unused during recoveries.
>
> At the same time, AWS doesn't share details of the burst capacity, so it's sometimes tricky to configure the whole system properly. I don't have a universal good answer for that :(
>
> Best,
> Piotrek
>
> On Tue, 21 Oct 2025 at 21:40, Samrat Deb <[email protected]> wrote:
>
> > Hi Gabor / Ferenc,
> >
> > Thank you for sharing the pointer and valuable feedback.
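
To make Piotr's point about configurable bulk transfers (point *5.* in the exchange above) a bit more concrete, here is a rough sketch of capping the number of parallel downloads with the SDK v2 async client. The class name and the config key in the comment are made up for illustration; this is not how the PoC is actually wired:

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

/** Hypothetical sketch: cap how many bulk downloads run in parallel. */
public final class BoundedBulkDownload {

    public static void downloadAll(S3AsyncClient s3,
                                   String bucket,
                                   List<String> keys,
                                   Path targetDir,
                                   int maxParallelTransfers) throws InterruptedException {
        // The limit would come from a (hypothetical) config option,
        // e.g. something like "s3.native.copy.max-parallelism".
        Semaphore permits = new Semaphore(maxParallelTransfers);
        List<CompletableFuture<?>> pending = new ArrayList<>();

        for (String key : keys) {
            permits.acquire(); // block until a transfer slot is free
            GetObjectRequest request =
                    GetObjectRequest.builder().bucket(bucket).key(key).build();
            Path target = targetDir.resolve(Paths.get(key).getFileName());
            CompletableFuture<?> download =
                    s3.getObject(request, AsyncResponseTransformer.toFile(target))
                            .whenComplete((response, error) -> permits.release());
            pending.add(download);
        }
        // Wait for every outstanding download to finish (errors surface here).
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    }
}
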
> >
> > The link to the custom `XmlResponsesSaxParser` [1] looks scary 😦 and contains hidden complexity.
> >
> > *1.* Could you share some context on why this custom parser was necessary? Was it to work around a specific bug, a performance issue, or an inconsistency in the S3 XML API responses that the default AWS SDK parser couldn't handle at the time? With SDK v2, what core functionality needs to be intensively tested?
> >
> > *2.* You mentioned it has no Hadoop dependency, which is great news. For a new native S3 connector, would integration simply require implementing a new S3DelegationTokenProvider/Receiver pair using the AWS SDK, or are there more subtle integration points with the framework that should be accounted for?
> >
> > *3.* I remember solving the Serialized Throwable exception issue [2] leading to a new bug [3], where an initial fix led to a regression that Gabor later solved, with Ferenc providing detailed root-cause insights [4] 😅. It's hard to be fully sure that all scenarios are covered properly. This is one example; there can be other unknowns. What would be the best approach to test for and prevent such regressions or unknown unknowns, especially in the most sensitive parts of the filesystem logic?
> >
> > Cheers,
> > Samrat
> >
> > [1] https://github.com/apache/flink/blob/0e4e6d7082e83f098d0c1a94351babb3ea407aa8/flink-filesystems/flink-s3-fs-base/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java
> > [2] https://issues.apache.org/jira/browse/FLINK-28513
> > [3] https://github.com/apache/flink/pull/25231
> > [4] https://github.com/apache/flink/pull/25231#issuecomment-2312059662
> >
> > On Tue, 21 Oct 2025 at 3:49 PM, Gabor Somogyi <[email protected]> wrote:
> >
> > > Hi Samrat,
> > >
> > > +1 on the direction that we move away from Hadoop.
> > >
> > > This is a long-standing discussion to replace the mentioned 2 connectors with something better. Both of them have their own weaknesses; I've fixed several blockers inside them.
> > >
> > > There is definitely magic inside them, please see this [1] for example, and there is more 🙂 I think the most sensitive part is the recovery, because it is hard to test all cases.
> > >
> > > @Ferenc
> > > > One thing that comes to my mind that will need some changes, and whose involvement in this change is not trivial, is the delegation token framework. Currently it is also tied to the Hadoop stuff and has some abstract classes in the base S3 FS module.
> > >
> > > The delegation token framework has no dependency on Hadoop, so there is no blocker on the road, but I'm here to help if any question appears.
> > >
> > > BR,
> > > G
> > >
> > > [1] https://github.com/apache/flink/blob/0e4e6d7082e83f098d0c1a94351babb3ea407aa8/flink-filesystems/flink-s3-fs-base/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java#L95-L104
> > >
> > > On Tue, Oct 14, 2025 at 8:19 PM Samrat Deb <[email protected]> wrote:
> > >
> > > > Hi All,
> > > >
> > > > Poorvank (cc'ed) and I are writing to start a discussion about a potential improvement for Flink: creating a new, native S3 filesystem independent of Hadoop/Presto.
> > > >
> > > > The goal of this proposal is to address several challenges related to Flink's S3 integration, simplifying flink-s3-filesystem. If this discussion gains positive traction, the next step would be to move forward with a formalised FLIP.
> > > >
> > > > The Challenges with the Current S3 Connectors
> > > >
> > > > Currently, Flink offers two primary S3 filesystems, flink-s3-fs-hadoop [1] and flink-s3-fs-presto [2]. While functional, this dual-connector approach has a few issues:
> > > >
> > > > 1. The flink-s3-fs-hadoop connector adds an additional dependency to manage. Upgrades like AWS SDK v2 depend on Hadoop/Presto supporting them first before they can be leveraged in flink-s3-filesystem. Sometimes it's restrictive to leverage features directly from the AWS SDK.
> > > >
> > > > 2. The flink-s3-fs-presto connector was introduced to mitigate the performance issues of the Hadoop connector, especially for checkpointing. However, it lacks a RecoverableWriter implementation. This is sometimes confusing for Flink users, highlighting the need for a single, unified solution.
> > > >
> > > > *Proposed Solution:* A Native, Hadoop-Free S3 Filesystem
> > > >
> > > > I propose we develop a new filesystem, let's call it flink-s3-fs-native, built directly on the modern AWS SDK for Java v2. This approach would be free of any Hadoop or Presto dependencies. I have done a small prototype to validate [3].
> > > >
> > > > This is motivated by trino<>s3 [4]. The Trino project successfully undertook a similar migration, moving from Hadoop-based object storage clients to their own native implementations.
> > > >
> > > > The new Flink S3 filesystem would:
> > > >
> > > > 1. Provide a single, unified connector for all S3 interactions, from state backends to sinks.
> > > >
> > > > 2. Implement a high-performance S3RecoverableWriter using S3's Multipart Upload feature, ensuring exactly-once sink semantics.
> > > >
> > > > 3. Offer a clean, self-contained dependency, drastically simplifying setup and eliminating external dependencies.
> > > >
> > > > A Phased Migration Path
> > > >
> > > > To ensure a smooth transition, we could adopt a phased approach, at a very high level:
> > > >
> > > > Phase 1: Introduce the new native S3 filesystem as an optional, parallel plugin. This would allow for community testing and adoption without breaking existing setups.
> > > >
> > > > Phase 2: Once the native connector achieves feature parity and proven stability, we will update the documentation to recommend it as the default choice for all S3 use cases.
> > > >
> > > > Phase 3: In a future major release, the legacy flink-s3-fs-hadoop and flink-s3-fs-presto connectors could be formally deprecated, with clear migration guides provided for users.
> > > >
> > > > I would love to hear the community's thoughts on this. A few questions to start the discussion:
> > > >
> > > > 1. What are the biggest pain points with the current S3 filesystems?
> > > >
> > > > 2. Are there any critical features from the Hadoop S3A client that are essential to replicate in a native implementation?
> > > >
> > > > 3. Would a simplified, dependency-free S3 experience be a valuable improvement for Flink use cases?
> > > >
> > > > Cheers,
> > > > Samrat
> > > >
> > > > [1] https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-hadoop
> > > > [2] https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-presto
> > > > [3] https://github.com/Samrat002/flink/pull/4
> > > > [4] https://github.com/trinodb/trino/tree/master/lib/trino-filesystem-s3
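
As a closing illustration of feature 2 from the quoted proposal (an S3RecoverableWriter built on S3's Multipart Upload), here is a minimal sketch of the SDK v2 call sequence involved. This is not the proposed flink-s3-fs-native code; the class name is made up, and a real recoverable writer would additionally persist the uploadId and part ETags in checkpoints and handle aborting incomplete uploads:

import java.util.ArrayList;
import java.util.List;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;

/** Hypothetical sketch of the multipart-upload lifecycle behind a recoverable writer. */
public final class MultipartUploadSketch {

    public static void writeObject(S3Client s3, String bucket, String key, List<byte[]> parts) {
        // 1. Start the multipart upload. The uploadId (plus the ETags of the parts
        //    uploaded so far) is what a recoverable writer would persist so the
        //    upload can be resumed after a failure.
        String uploadId = s3.createMultipartUpload(
                CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build())
                .uploadId();

        List<CompletedPart> completedParts = new ArrayList<>();
        int partNumber = 1;

        // 2. Upload parts (each at least 5 MiB except the last, per S3's rules).
        for (byte[] part : parts) {
            String eTag = s3.uploadPart(
                    UploadPartRequest.builder()
                            .bucket(bucket).key(key)
                            .uploadId(uploadId)
                            .partNumber(partNumber)
                            .build(),
                    RequestBody.fromBytes(part))
                    .eTag();
            completedParts.add(
                    CompletedPart.builder().partNumber(partNumber).eTag(eTag).build());
            partNumber++;
        }

        // 3. Commit: only after this call does the object become visible, which is
        //    what gives the sink its "publish on checkpoint" semantics.
        s3.completeMultipartUpload(
                CompleteMultipartUploadRequest.builder()
                        .bucket(bucket).key(key)
                        .uploadId(uploadId)
                        .multipartUpload(CompletedMultipartUpload.builder()
                                .parts(completedParts)
                                .build())
                        .build());
    }
}
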
