Hi all,

One clarifying question regarding the URI schemes:

Currently, the Flink ecosystem uses multiple schemes to differentiate
between S3 implementations: s3a:// for the Hadoop-based connector and
s3p://[1] for the Presto-based one, which is often recommended for
checkpointing.

A key goal of the proposed flink-s3-fs-native is to unify these into a
single implementation. With that in mind, what should be the strategy for
scheme support? Should the new native S3 filesystem register only for the
simple s3:// scheme, aiming to deprecate the others? Or would it be
beneficial to also support s3a:// and s3p:// to provide a smoother
migration path for users who may have these schemes in their existing job
configurations?
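
To make the second option concrete, here is a minimal sketch of how legacy
schemes could be rewritten to s3:// before the filesystem sees them. It is
plain Java with no Flink dependency; the class name S3SchemeAliases, the
normalize helper, and the set of accepted aliases are all hypothetical and
not part of any existing Flink API.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper (not a Flink class): maps legacy scheme aliases to s3://.
final class S3SchemeAliases {

    // Assumption: the aliases the native filesystem would accept for migration.
    static final Set<String> LEGACY_SCHEMES = Set.of("s3a", "s3p");

    private S3SchemeAliases() {}

    // Returns the same URI with the scheme replaced by "s3" when it is a
    // legacy alias; any other URI is returned unchanged.
    static URI normalize(URI uri) {
        String scheme = uri.getScheme();
        if (scheme == null
                || !LEGACY_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT))) {
            return uri;
        }
        try {
            // getAuthority() keeps bucket names that are not valid hostnames.
            return new URI("s3", uri.getAuthority(), uri.getPath(),
                    uri.getQuery(), uri.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot normalize " + uri, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(normalize(URI.create("s3a://my-bucket/checkpoints/chk-42")));
        System.out.println(normalize(URI.create("s3p://my-bucket/checkpoints/chk-42")));
    }
}
```

With something like this, existing s3a:// and s3p:// paths in job
configurations would keep resolving, while other schemes pass through
untouched.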

Cheers,
Samrat

[1] https://github.com/generalui/s3p

On Wed, Oct 22, 2025 at 6:31 PM Piotr Nowojski <[email protected]> wrote:

> Hi Samrat,
>
> > *1.* Even if the specifics are hazy, could you recall the general
> > nature of those concerns? For instance, were they related to S3's
> > eventual consistency model, which has since improved, the atomicity of
> > Multipart Upload commits, or perhaps complex failure/recovery scenarios
> > during the commit phase?
>
> and
>
> > *8.* The flink-s3-fs-presto connector explicitly throws an
> > `UnsupportedOperationException` when `createRecoverableWriter()` is
> > called. Was this a deliberate design choice to keep the Presto connector
> > lightweight and optimized specifically for checkpointing, or were there
> > other technical challenges that prevented its implementation at the time?
> > Any context on this would be very helpful.
>
> I very vaguely remember that at least one of those concerns was about
> how long it takes for S3 to make certain operations visible: you think
> you have uploaded and committed a file, but in reality it might not be
> visible for tens of seconds.
>
> Sorry, I don't remember more (or even if there was more). I was only
> superficially involved
> in the S3 connector back then - just participated/overheard some
> discussions.
>
> > *2.* It's clear that implementing an efficient PathsCopyingFileSystem[2]
> > is a non-negotiable requirement for performance. Are there any benchmark
> > numbers available that can be used as a reference to evaluate how the
> > new implementation deviates?
>
> I only have the numbers that I put in the original FLIP [1]. I don't
> remember the benchmark
> setup, but it must have been something simple. Like just let some job
> accumulate 1GB of state
> and measure how long the state downloading phase of recovery was taking.
>
> > *3.* Do you recall the workload characteristics for that PoC?
> > Specifically, was the 30-40% performance advantage of s5cmd observed
> > when copying many small files (like checkpoint state) or larger,
> > multi-gigabyte files?
>
> It was just a regular mix of compacted RocksDB sst files, with total state
> size 1 or at most
> a couple of GBs. So most of the files were around ~64MB or ~128MB, with a
> couple of
> smaller L0 files, and maybe one larger L2 file.
>
> > *4.* The idea of a switchable implementation sounds great. Would you
> > envision this as a configuration flag (e.g.,
> > s3.native.copy.strategy=s5cmd or s3.native.copy.strategy=sdk) that
> > selects the backend implementation at runtime? Conversely, is it worth
> > adding configuration that exposes some implementation-level detail?
>
> I think something like that should be fine, assuming that `s5cmd` again
> proves significantly faster and/or more CPU efficient. If not, that is, if
> the SDK v2 has already improved and caught up with `s5cmd`, then it
> probably doesn't make sense to keep `s5cmd` support.
>
> > *5.* My understanding is that the key takeaway here is to avoid the
> > file-by-file stream-based copy used in the vanilla connector and leverage
> > bulk operations, which PathsCopyingFileSystem[2] enables. This seems most
> > critical during state download on recovery. Please let me know if my
> > inference is in the right direction.
>
> Yes, but you should also make the bulk transfer configurable, e.g. how
> many bulk transfers can happen in parallel.
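
If I read this correctly, the cap on parallelism could look roughly like the
sketch below: a fixed-size pool bounds how many transfers run at once. The
class BoundedBulkCopy, the copyAll method, and the maxParallelTransfers
parameter are hypothetical names for illustration; the copier callback stands
in for whatever actually moves the bytes (SDK or `s5cmd`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical sketch (not a Flink class): runs bulk transfers with a
// configurable upper bound on how many are in flight at the same time.
final class BoundedBulkCopy {

    private BoundedBulkCopy() {}

    // Runs copier once per batch, never more than maxParallelTransfers at once,
    // and returns only after every batch has finished.
    static <T> void copyAll(List<T> batches, int maxParallelTransfers, Consumer<T> copier) {
        if (maxParallelTransfers < 1) {
            throw new IllegalArgumentException("maxParallelTransfers must be >= 1");
        }
        // The pool size is the concurrency limit; extra batches wait in its queue.
        ExecutorService pool = Executors.newFixedThreadPool(maxParallelTransfers);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (T batch : batches) {
                pending.add(pool.submit(() -> copier.accept(batch)));
            }
            for (Future<?> f : pending) {
                f.get(); // surfaces the first failure, if any
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for transfers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Bulk transfer failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }
}
```

This only bounds the number of concurrent transfers; it does not limit
bandwidth or memory per transfer, which would need separate knobs.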
>
> > *6.* The warning about `s5cmd` causing OOMs sounds like an indication to
> > consider an `S3TransferManager`[3] implementation, which might offer more
> > granular control over buffering and in-flight requests. Do you think
> > exploring `S3TransferManager` further would be valuable?
>
> I'm pretty sure if you start hundreds of bulk transfers in parallel via the
> `S3TransferManager` you can get the same problems with running out of
> memory or exceeding available network throughput. I don't know if
> `S3TransferManager` is better or worse in that regard to be honest.
>
> > *7.* The insight on AWS aggressively dropping packets instead of
> > gracefully throttling is invaluable. Currently I have a limited
> > understanding of how AWS behaves when throttling. I will dig deeper into
> > it and follow up with questions based on my findings. To counter this,
> > were you thinking of a configurable rate limiter within the filesystem
> > itself (e.g., setting max bandwidth or max concurrent requests), or
> > something more dynamic that could adapt to network conditions?
>
> Flat rate limiting is tricky because AWS offers burst network capacity,
> which comes in very handy, and in the vast majority of cases works fine.
> But for some jobs, if you exceed that burst capacity, AWS starts dropping
> your packets and then the problems happen. On the other hand, if you rate
> limit to your normal capacity, you are leaving a lot of network throughput
> unused during recoveries.
>
> At the same time, AWS doesn't share details of the burst capacity, so it's
> sometimes tricky to configure the whole system properly. I don't have a
> universally good answer for that :(
>
> Best,
> Piotrek
>
>
>
> On Tue, Oct 21, 2025 at 9:40 PM Samrat Deb <[email protected]> wrote:
>
> > Hi Gabor/ Ferenc
> >
> > Thank you for sharing the pointer and valuable feedback.
> >
> > The link to the custom `XmlResponsesSaxParser`[1] looks scary 😦
> > and contains hidden complexity.
> >
> > *1.* Could you share some context on why this custom parser was
> > necessary? Was it to work around a specific bug, a performance issue, or
> > an inconsistency in the S3 XML API responses that the default AWS SDK
> > parser couldn't handle at the time? With SDK v2, what core functionality
> > needs to be tested intensively?
> >
> > *2.* You mentioned it has no Hadoop dependency, which is great news. For
> > a new native S3 connector, would integration simply require implementing
> > a new S3DelegationTokenProvider/Receiver pair using the AWS SDK, or are
> > there more subtle integration points with the framework that should be
> > accounted for?
> >
> > *3.* I remember solving the Serialized Throwable exception issue [2]
> > leading to a new bug [3], where an initial fix led to a regression that
> > Gabor later solved, with Ferenc providing detailed root-cause insights
> > [4] 😅.
> > It's hard to be fully sure that all scenarios are covered properly. This
> > is one example; there can be other unknowns.
> > What would be the best approach to test for and prevent such regressions
> > or unknown unknowns, especially in the most sensitive parts of the
> > filesystem logic?
> >
> > Cheers,
> > Samrat
> >
> > [1]
> > https://github.com/apache/flink/blob/0e4e6d7082e83f098d0c1a94351babb3ea407aa8/flink-filesystems/flink-s3-fs-base/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java
> > [2] https://issues.apache.org/jira/browse/FLINK-28513
> > [3] https://github.com/apache/flink/pull/25231
> > [4] https://github.com/apache/flink/pull/25231#issuecomment-2312059662
> >
> > On Tue, 21 Oct 2025 at 3:49 PM, Gabor Somogyi <[email protected]> wrote:
> >
> > > Hi Samrat,
> > >
> > > +1 on the direction of moving away from Hadoop.
> > >
> > > Replacing the two connectors mentioned with something better is a
> > > long-standing discussion.
> > > Both of them have their own weaknesses; I've fixed several blockers
> > > inside them.
> > >
> > > There is definitely magic inside them, please see this [1] for
> > > example, and there is more 🙂
> > > I think the most sensitive part is the recovery, because it is hard to
> > > test all cases.
> > >
> > > @Ferenc
> > > > One thing that comes to my mind that will need some changes and its
> > > involvement
> > > to this change is not trivial is the delegation token framework.
> > Currently
> > > it
> > > is also tied to the Hadoop stuff and has some abstract classes in the
> > base
> > > S3 FS
> > > module.
> > >
> > > The delegation token framework has no dependency on Hadoop, so there
> > > is no blocker on the road, but I'm here to help if any questions come
> > > up.
> > >
> > > BR,
> > > G
> > >
> > > [1]
> > > https://github.com/apache/flink/blob/0e4e6d7082e83f098d0c1a94351babb3ea407aa8/flink-filesystems/flink-s3-fs-base/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java#L95-L104
> > >
> > > On Tue, Oct 14, 2025 at 8:19 PM Samrat Deb <[email protected]>
> > wrote:
> > >
> > > > Hi All,
> > > >
> > > > Poorvank (cc'ed) and I are writing to start a discussion about a
> > > potential
> > > > improvement for Flink, creating a new, native S3 filesystem
> independent
> > > of
> > > > Hadoop/Presto.
> > > >
> > > > The goal of this proposal is to address several challenges related to
> > > > Flink's S3 integration, simplifying flink-s3-filesystem. If this
> > > discussion
> > > > gains positive traction, the next step would be to move forward with
> a
> > > > formalised FLIP.
> > > >
> > > > The Challenges with the Current S3 Connectors
> > > > Currently, Flink offers two primary S3 filesystems,
> > > > flink-s3-fs-hadoop[1] and flink-s3-fs-presto[2]. While functional,
> > > > this dual-connector approach has a few issues:
> > > >
> > > > 1. The flink-s3-fs-hadoop connector adds an additional dependency to
> > > > manage. Upgrades such as AWS SDK v2 depend on Hadoop/Presto
> > > > supporting them first before flink-s3-filesystem can leverage them.
> > > > This sometimes makes it hard to use features directly from the AWS
> > > > SDK.
> > > >
> > > > 2. The flink-s3-fs-presto connector was introduced to mitigate the
> > > > performance issues of the Hadoop connector, especially for
> > > > checkpointing. However, it lacks a RecoverableWriter implementation.
> > > > This split is sometimes confusing for Flink users, which highlights
> > > > the need for a single, unified solution.
> > > >
> > > > *Proposed Solution:*
> > > > A Native, Hadoop-Free S3 Filesystem
> > > >
> > > > I propose we develop a new filesystem, let's call it
> > > > flink-s3-fs-native, built directly on the modern AWS SDK for Java v2.
> > > > This approach would be free of any Hadoop or Presto dependencies. I
> > > > have done a small prototype to validate the idea [3].
> > > >
> > > > This is motivated by trino<>s3 [4]. The Trino project successfully
> > > > undertook a similar migration, moving from Hadoop-based object
> storage
> > > > clients to their own native implementations.
> > > >
> > > > The new Flink S3 filesystem would:
> > > >
> > > > 1. Provide a single, unified connector for all S3 interactions, from
> > > state
> > > > backends to sinks.
> > > >
> > > > 2. Implement a high-performance S3RecoverableWriter using S3's
> > Multipart
> > > > Upload feature, ensuring exactly-once sink semantics.
> > > >
> > > > 3. Offer a clean, self-contained dependency, drastically simplifying
> > > setup
> > > > and eliminating external dependencies.
> > > >
> > > > A Phased Migration Path
> > > > To ensure a smooth transition, we could adopt a phased approach, at
> > > > a very high level:
> > > >
> > > > Phase 1:
> > > > Introduce the new native S3 filesystem as an optional, parallel
> plugin.
> > > > This would allow for community testing and adoption without breaking
> > > > existing setups.
> > > >
> > > > Phase 2:
> > > > Once the native connector achieves feature parity and proven
> stability,
> > > we
> > > > will update the documentation to recommend it as the default choice
> for
> > > all
> > > > S3 use cases.
> > > >
> > > > Phase 3:
> > > > In a future major release, the legacy flink-s3-fs-hadoop and
> > > > flink-s3-fs-presto connectors could be formally deprecated, with
> clear
> > > > migration guides provided for users.
> > > >
> > > > I would love to hear the community's thoughts on this.
> > > >
> > > > A few questions to start the discussion:
> > > >
> > > > 1. What are the biggest pain points with the current S3 filesystem?
> > > >
> > > > 2. Are there any critical features from the Hadoop S3A client that
> are
> > > > essential to replicate in a native implementation?
> > > >
> > > > 3. Would a simplified, non-dependent S3 experience be a valuable
> > > > improvement for Flink use cases?
> > > >
> > > >
> > > > Cheers,
> > > > Samrat
> > > >
> > > >
> > > > [1]
> > > > https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-hadoop
> > > > [2]
> > > > https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-presto
> > > > [3] https://github.com/Samrat002/flink/pull/4
> > > > [4] https://github.com/trinodb/trino/tree/master/lib/trino-filesystem-s3
> > > >
> > >
> >
>
