Hi Samrat,

First of all, thanks for the proposal. It's long overdue to get this in a
better state.

With regards to the schemes, I would say to ship an initial release that
does not include support for s3a and s3p, and focus first on getting this
new implementation into a stable state. When that's done, as a follow-up,
we can consider adding support for s3a and s3p in this implementation, and
once that's in place, consider deprecating the older implementations. It
will probably take multiple releases before we have this in a stable state.

Not directly related to this, but given that MinIO decided to change their
license, do we also need to refactor the existing tests to use something
other than MinIO?

Thanks,

Martijn

On Sat, Oct 25, 2025 at 1:38 AM Samrat Deb <[email protected]> wrote:

> Hi all,
>
> One clarifying question regarding the URI schemes:
>
> Currently, the Flink ecosystem uses multiple schemes to differentiate
> between S3 implementations: s3a:// for the Hadoop-based connector and
> s3p://[1] for the Presto-based one, which is often recommended for
> checkpointing.
>
> A key goal of the proposed flink-s3-fs-native is to unify these into a
> single implementation. With that in mind, what should be the strategy for
> scheme support? Should the new native s3 filesystem register only for the
> simple s3:// scheme, aiming to deprecate the others? Or would it be
> beneficial to also support s3a:// and s3p:// to provide a smoother
> migration path for users who may have these schemes in their existing job
> configurations?
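>
> For reference, scheme registration goes through Flink's FileSystemFactory
> SPI (factories are discovered via
> META-INF/services/org.apache.flink.core.fs.FileSystemFactory), so the
> question is mostly about how many factories we ship. A minimal sketch,
> with placeholder class names rather than anything from the prototype:
>
> import java.io.IOException;
> import java.net.URI;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.core.fs.FileSystemFactory;
>
> public class NativeS3FileSystemFactory implements FileSystemFactory {
>
>     @Override
>     public String getScheme() {
>         // An s3a:// or s3p:// alias could be a thin subclass overriding this.
>         return "s3";
>     }
>
>     @Override
>     public FileSystem create(URI fsUri) throws IOException {
>         // Construction of the new native S3 FileSystem would go here.
>         throw new UnsupportedOperationException("not implemented in this sketch");
>     }
> }
>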
> Cheers,
> Samrat
>
> [1] https://github.com/generalui/s3p
>
> On Wed, Oct 22, 2025 at 6:31 PM Piotr Nowojski <[email protected]>
> wrote:
>
> > Hi Samrat,
> >
> > > *1.* Even if the specifics are hazy, could you recall the general
> > > nature of those concerns? For instance, were they related to S3's
> > > eventual consistency model, which has since improved, the atomicity of
> > > Multipart Upload commits, or perhaps complex failure/recovery scenarios
> > > during the commit phase?
> >
> > and
> >
> > > *8.* The flink-s3-fs-presto connector explicitly throws an
> > > `UnsupportedOperationException` when `createRecoverableWriter()` is
> > > called. Was this a deliberate design choice to keep the Presto connector
> > > lightweight and optimized specifically for checkpointing, or were there
> > > other technical challenges that prevented its implementation at the
> > > time? Any context on this would be very helpful.
> >
> > I very vaguely remember that at least one of those concerns was with
> > respect to how long it takes for S3 to make certain operations visible:
> > you think you have uploaded and committed a file, but in reality it might
> > not be visible for tens of seconds.
> >
> > Sorry, I don't remember more (or even if there was more). I was only
> > superficially involved
> > in the S3 connector back then - just participated/overheard some
> > discussions.
> >
> > > *2.* It's clear that implementing an efficient PathsCopyingFileSystem[2]
> > > is a non-negotiable requirement for performance. Are there any benchmark
> > > numbers available that can be used as a reference to evaluate how far
> > > the new implementation deviates?
> >
> > I only have the numbers that I put in the original Flip [1]. I don't
> > remember the benchmark
> > setup, but it must have been something simple. Like just let some job
> > accumulate 1GB of state
> > and measure how long the state downloading phase of recovery was taking.
> >
> > > *3.* Do you recall the workload characteristics for that PoC?
> > > Specifically, was the 30-40% performance advantage of s5cmd observed
> > > when copying many small files (like checkpoint state) or larger,
> > > multi-gigabyte files?
> >
> > It was just a regular mix of compacted RocksDB sst files, with a total
> > state size of 1 or at most a couple of GBs. So most of the files were
> > around ~64MB or ~128MB, with a couple of smaller L0 files, and maybe one
> > larger L2 file.
> >
> > > *4.* The idea of a switchable implementation sounds great. Would you
> > > envision this as a configuration flag (e.g.,
> > > s3.native.copy.strategy=s5cmd or s3.native.copy.strategy=sdk) that
> > > selects the backend implementation at runtime? Also, on the contrary,
> > > is it worth adding configuration that exposes some level of
> > > implementation-level information?
> >
> > I think something like that should be fine, assuming that `s5cmd` will
> > again prove significantly faster and/or more CPU efficient. If not, if
> > the SDK v2 has already improved and caught up with `s5cmd`, then it
> > probably doesn't make sense to keep `s5cmd` support.
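> >
> > In terms of shape, I would imagine a single ConfigOption on the new
> > filesystem, roughly like this (the option key and class name below are
> > made up for illustration, nothing like this exists today):
> >
> > import org.apache.flink.configuration.ConfigOption;
> > import org.apache.flink.configuration.ConfigOptions;
> >
> > public class S3NativeOptions {
> >     // Hypothetical key; selects which backend performs bulk copies.
> >     public static final ConfigOption<String> COPY_STRATEGY =
> >             ConfigOptions.key("s3.copy-strategy")
> >                     .stringType()
> >                     .defaultValue("sdk") // plain AWS SDK transfers
> >                     .withDescription(
> >                             "Backend for bulk copies: 'sdk' or 's5cmd'.");
> > }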
> >
> > > *5.* My understanding is that the key takeaway here is to avoid the
> > > file-by-file stream-based copy used in the vanilla connector and
> > > leverage bulk operations, which PathsCopyingFileSystem[2] enables. This
> > > seems most critical during state download on recovery. Please suggest
> > > whether my inference is in the right direction.
> >
> > Yes, but you should also make the bulk transfer configurable: how many
> > bulk transfers can be happening in parallel, etc.
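> >
> > To make that concrete, if the copying is delegated to `s5cmd`, the
> > parallelism could simply be surfaced as a setting and passed through to
> > the process. A rough sketch (flag names are from memory, so please double
> > check them against the s5cmd docs):
> >
> > import java.io.IOException;
> > import java.io.OutputStreamWriter;
> > import java.io.Writer;
> > import java.nio.charset.StandardCharsets;
> > import java.util.List;
> >
> > public final class S5CmdBulkCopy {
> >
> >     /** Feeds one "cp" command per file to a single s5cmd batch run. */
> >     public static void download(List<String> s3Uris, String targetDir, int numWorkers)
> >             throws IOException, InterruptedException {
> >         Process process =
> >                 new ProcessBuilder(
> >                                 "s5cmd", "--numworkers", String.valueOf(numWorkers), "run")
> >                         .redirectOutput(ProcessBuilder.Redirect.INHERIT)
> >                         .redirectError(ProcessBuilder.Redirect.INHERIT)
> >                         .start();
> >         try (Writer commands =
> >                 new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8)) {
> >             for (String uri : s3Uris) {
> >                 commands.write("cp " + uri + " " + targetDir + "\n");
> >             }
> >         }
> >         if (process.waitFor() != 0) {
> >             throw new IOException("s5cmd exited with a non-zero code");
> >         }
> >     }
> > }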
> >
> > > *6.* The warning about `s5cmd` causing OOMs sounds like an indication
> > > to consider an `S3TransferManager`[3]-based implementation, which might
> > > offer more granular control over buffering and in-flight requests. Do
> > > you think exploring `S3TransferManager` more would be valuable?
> >
> > I'm pretty sure that if you start hundreds of bulk transfers in parallel
> > via the `S3TransferManager` you can get the same problems with running
> > out of memory or exceeding the available network throughput. I don't know
> > if `S3TransferManager` is better or worse in that regard, to be honest.
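> >
> > If someone evaluates it, the knobs worth looking at first are on the
> > CRT-based async client underneath it, something along these lines (just
> > a sketch, not something I have verified):
> >
> > import java.nio.file.Paths;
> > import software.amazon.awssdk.services.s3.S3AsyncClient;
> > import software.amazon.awssdk.transfer.s3.S3TransferManager;
> > import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
> > import software.amazon.awssdk.transfer.s3.model.FileDownload;
> >
> > public final class TransferManagerSketch {
> >     public static void main(String[] args) {
> >         // The CRT client is where in-flight requests and throughput are
> >         // capped, which is exactly the OOM / packet-drop concern here.
> >         try (S3AsyncClient crtClient =
> >                         S3AsyncClient.crtBuilder()
> >                                 .maxConcurrency(64)
> >                                 .targetThroughputInGbps(5.0)
> >                                 .build();
> >                 S3TransferManager transferManager =
> >                         S3TransferManager.builder().s3Client(crtClient).build()) {
> >             FileDownload download =
> >                     transferManager.downloadFile(
> >                             DownloadFileRequest.builder()
> >                                     .getObjectRequest(
> >                                             b -> b.bucket("my-bucket").key("state/000123.sst"))
> >                                     .destination(Paths.get("/tmp/000123.sst"))
> >                                     .build());
> >             download.completionFuture().join();
> >         }
> >     }
> > }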
> >
> > > *7.* The insight on AWS aggressively dropping packets instead of
> > > gracefully throttling is invaluable. Currently I have a limited
> > > understanding of how AWS behaves when throttling; I will deep dive more
> > > into it and look for clarification based on my findings or doubts. To
> > > counter this, were you thinking of a configurable rate limiter within
> > > the filesystem itself (e.g., setting max bandwidth or max concurrent
> > > requests), or something more dynamic that could adapt to network
> > > conditions?
> >
> > Flat rate limiting is tricky because AWS offers burst network capacity,
> > which comes in very handy and in the vast majority of cases works fine.
> > But for some jobs, if you exceed that burst capacity, AWS starts dropping
> > your packets and then the problems happen. On the other hand, if you rate
> > limit to your normal capacity, you are leaving a lot of network
> > throughput unused during recoveries.
> >
> > At the same time AWS doesn't share details of the burst capacity, so it's
> > sometimes tricky to configure the whole system properly. I don't have a
> > universal good answer for that :(
> >
> > Best,
> > Piotrek
> >
> >
> >
> > On Tue, Oct 21, 2025 at 9:40 PM Samrat Deb <[email protected]> wrote:
> >
> > > Hi Gabor / Ferenc,
> > >
> > > Thank you for sharing the pointer and valuable feedback.
> > >
> > > The link to the custom `XmlResponsesSaxParser`[1] looks scary 😦
> > > and contains hidden complexity.
> > >
> > > *1.* Could you share some context on why this custom parser was
> > > necessary? Was it to work around a specific bug, a performance issue, or
> > > an inconsistency in the S3 XML API responses that the default AWS SDK
> > > parser couldn't handle at the time? With SDK v2, what core functionality
> > > would need to be intensively tested?
> > >
> > > *2.* You mentioned it has no Hadoop dependency, which is great news.
> > > For a new native S3 connector, would integration simply require
> > > implementing a new S3DelegationTokenProvider/Receiver pair using the
> > > AWS SDK, or are there more subtle integration points with the framework
> > > that should be accounted for?
> > >
> > > *3.* I remember solving the Serialized Throwable exception issue [2],
> > > leading to a new bug [3], where an initial fix led to a regression that
> > > Gabor later solved, with Ferenc providing detailed root cause insights
> > > [4] 😅. It's hard to be fully sure that all scenarios are covered
> > > properly. This is one example; there can be other unknowns.
> > > What would be the best approach to test for and prevent such regressions
> > > or unknown unknowns, especially in the most sensitive parts of the
> > > filesystem logic?
> > >
> > > Cheers,
> > > Samrat
> > >
> > > [1]
> > > https://github.com/apache/flink/blob/0e4e6d7082e83f098d0c1a94351babb3ea407aa8/flink-filesystems/flink-s3-fs-base/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java
> > > [2] https://issues.apache.org/jira/browse/FLINK-28513
> > > [3] https://github.com/apache/flink/pull/25231
> > > [4] https://github.com/apache/flink/pull/25231#issuecomment-2312059662
> > >
> > > On Tue, 21 Oct 2025 at 3:49 PM, Gabor Somogyi <[email protected]>
> > > wrote:
> > >
> > > > Hi Samrat,
> > > >
> > > > +1 on the direction that we move away from Hadoop.
> > > >
> > > > This is a long-standing discussion, to replace the two mentioned
> > > > connectors with something better. Both of them have their own
> > > > weaknesses; I've fixed several blockers inside them.
> > > >
> > > > There is definitely magic inside them, please see [1] for example, and
> > > > there is more 🙂
> > > > I think the most sensitive part is recovery, because it's hard to test
> > > > all the cases.
> > > >
> > > > @Ferenc
> > > > > One thing that comes to my mind that will need some changes, and
> > > > > whose involvement in this change is not trivial, is the delegation
> > > > > token framework. Currently it is also tied to the Hadoop stuff and
> > > > > has some abstract classes in the base S3 FS module.
> > > >
> > > > The delegation token framework has no dependency on Hadoop, so there
> > > > is no blocker on the road, but I'm here to help if any questions
> > > > appear.
> > > >
> > > > BR,
> > > > G
> > > >
> > > > [1]
> > > > https://github.com/apache/flink/blob/0e4e6d7082e83f098d0c1a94351babb3ea407aa8/flink-filesystems/flink-s3-fs-base/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java#L95-L104
> > > >
> > > > On Tue, Oct 14, 2025 at 8:19 PM Samrat Deb <[email protected]>
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > Poorvank (cc'ed) and I are writing to start a discussion about a
> > > > > potential improvement for Flink, creating a new, native S3 filesystem
> > > > > independent of Hadoop/Presto.
> > > > >
> > > > > The goal of this proposal is to address several challenges related
> > > > > to Flink's S3 integration, simplifying flink-s3-filesystem. If this
> > > > > discussion gains positive traction, the next step would be to move
> > > > > forward with a formalised FLIP.
> > > > >
> > > > > The Challenges with the Current S3 Connectors
> > > > > Currently, Flink offers two primary S3 filesystems,
> > > > > flink-s3-fs-hadoop[1] and flink-s3-fs-presto[2]. While functional,
> > > > > this dual-connector approach has a few issues:
> > > > >
> > > > > 1. The flink-s3-fs-hadoop connector adds an additional dependency to
> > > > > manage. Upgrades such as AWS SDK v2 depend on Hadoop/Presto
> > > > > supporting them first before they can be leveraged in
> > > > > flink-s3-filesystem, which sometimes makes it restrictive to use
> > > > > features directly from the AWS SDK.
> > > > >
> > > > > 2. The flink-s3-fs-presto connector was introduced to mitigate the
> > > > > performance issues of the Hadoop connector, especially for
> > > > > checkpointing. However, it lacks a RecoverableWriter implementation.
> > > > > This is sometimes confusing for Flink users, highlighting the need
> > > > > for a single, unified solution.
> > > > >
> > > > > *Proposed Solution:*
> > > > > A Native, Hadoop-Free S3 Filesystem
> > > > >
> > > > > I propose we develop a new filesystem, let's call it
> > > > > flink-s3-fs-native, built directly on the modern AWS SDK for Java
> > > > > v2. This approach would be free of any Hadoop or Presto
> > > > > dependencies. I have done a small prototype to validate the idea [3].
> > > > >
> > > > > This is motivated by trino<>s3 [4]. The Trino project successfully
> > > > > undertook a similar migration, moving from Hadoop-based object
> > > > > storage clients to their own native implementations.
> > > > >
> > > > > The new Flink S3 filesystem would:
> > > > >
> > > > > 1. Provide a single, unified connector for all S3 interactions, from
> > > > > state backends to sinks.
> > > > >
> > > > > 2. Implement a high-performance S3RecoverableWriter using S3's
> > > > > Multipart Upload feature, ensuring exactly-once sink semantics (a
> > > > > rough sketch follows below).
> > > > >
> > > > > 3. Offer a clean, self-contained dependency, drastically simplifying
> > > > > setup and eliminating external dependencies.
> > > > >
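> > > > > To make point 2 a bit more concrete, the writer would sit on top of
> > > > > the SDK v2 multipart calls, roughly like the sketch below. This is
> > > > > only an illustration of the happy path; the real RecoverableWriter
> > > > > additionally has to persist the upload id and part ETags so an
> > > > > upload can be resumed after a failure.
> > > > >
> > > > > import java.nio.file.Path;
> > > > > import java.util.ArrayList;
> > > > > import java.util.List;
> > > > > import software.amazon.awssdk.core.sync.RequestBody;
> > > > > import software.amazon.awssdk.services.s3.S3Client;
> > > > > import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
> > > > > import software.amazon.awssdk.services.s3.model.CompletedPart;
> > > > >
> > > > > public final class MultipartUploadSketch {
> > > > >
> > > > >     public static void upload(S3Client s3, String bucket, String key, List<Path> parts) {
> > > > >         String uploadId =
> > > > >                 s3.createMultipartUpload(b -> b.bucket(bucket).key(key)).uploadId();
> > > > >
> > > > >         List<CompletedPart> completed = new ArrayList<>();
> > > > >         for (int i = 0; i < parts.size(); i++) {
> > > > >             int partNumber = i + 1; // part numbers are 1-based
> > > > >             String eTag =
> > > > >                     s3.uploadPart(
> > > > >                                     b -> b.bucket(bucket).key(key)
> > > > >                                             .uploadId(uploadId)
> > > > >                                             .partNumber(partNumber),
> > > > >                                     RequestBody.fromFile(parts.get(i)))
> > > > >                             .eTag();
> > > > >             completed.add(
> > > > >                     CompletedPart.builder().partNumber(partNumber).eTag(eTag).build());
> > > > >         }
> > > > >
> > > > >         // Only this call makes the object visible; until then the upload
> > > > >         // can be resumed or aborted, which is what the recoverable writer
> > > > >         // relies on.
> > > > >         s3.completeMultipartUpload(
> > > > >                 b -> b.bucket(bucket).key(key).uploadId(uploadId)
> > > > >                         .multipartUpload(
> > > > >                                 CompletedMultipartUpload.builder().parts(completed).build()));
> > > > >     }
> > > > > }
> > > > >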
> > > > > A Phased Migration Path
> > > > > To ensure a smooth transition, we could adopt a phased approach, on a
> > > > > very high level:
> > > > >
> > > > > Phase 1:
> > > > > Introduce the new native S3 filesystem as an optional, parallel
> > > > > plugin. This would allow for community testing and adoption without
> > > > > breaking existing setups.
> > > > >
> > > > > Phase 2:
> > > > > Once the native connector achieves feature parity and proven
> > > > > stability, we will update the documentation to recommend it as the
> > > > > default choice for all S3 use cases.
> > > > >
> > > > > Phase 3:
> > > > > In a future major release, the legacy flink-s3-fs-hadoop and
> > > > > flink-s3-fs-presto connectors could be formally deprecated, with
> > > > > clear migration guides provided for users.
> > > > >
> > > > > I would love to hear the community's thoughts on this.
> > > > >
> > > > > A few questions to start the discussion:
> > > > >
> > > > > 1. What are the biggest pain points with the current S3 filesystem?
> > > > >
> > > > > 2. Are there any critical features from the Hadoop S3A client that
> > > > > are essential to replicate in a native implementation?
> > > > >
> > > > > 3. Would a simplified, dependency-free S3 experience be a valuable
> > > > > improvement for Flink use cases?
> > > > >
> > > > >
> > > > > Cheers,
> > > > > Samrat
> > > > >
> > > > >
> > > > > [1]
> > > > > https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-hadoop
> > > > > [2]
> > > > > https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-presto
> > > > > [3] https://github.com/Samrat002/flink/pull/4
> > > > > [4]
> > > > > https://github.com/trinodb/trino/tree/master/lib/trino-filesystem-s3
> > > > >
> > > >
> > >
> >
>
