Hi Piotr,

Thank you so much for taking the time to reply. Your perspective as one of
the original authors is incredibly valuable 😃.
I'd like to make sure I fully understand your points and ask a few
clarifying questions. This will be extremely helpful.

> But I don't remember the details, so it's hard for me to provide any
> feedback besides some vague warnings from the original authors of the
> current connector, that there might be some special corner cases that are
> hard to cover. So I guess proceed with caution? 😅

Thank you for the heads-up regarding the "special corner cases" you
mentioned.

*1.* Even if the specifics are hazy, could you recall the general nature of
those concerns? For instance, were they related to S3's former eventual
consistency model (S3 has provided strong read-after-write consistency since
December 2020), the atomicity of Multipart Upload commits, or perhaps
complex failure/recovery scenarios during the commit phase?


> I implemented a PoC of SDKv2 bulk file copy vs using `s5cmd`. Both were
> significantly faster than the standard implementation copy one file at a
> time using file streams. `s5cmd` was ~30%-40% faster than SDKv2. Due to
> that I've chosen to use `s5cmd`, but it has some downsides (relaying on
> external dependency and complicating the setup).


*2.* It's clear that implementing an efficient PathsCopyingFileSystem[2] is
a non-negotiable requirement for performance. Are there any benchmark
numbers available that could serve as a reference for evaluating how far a
new implementation deviates from the current one?

*3.* Do you recall the workload characteristics for that PoC? Specifically,
was the 30-40% performance advantage of s5cmd observed when copying many
small files (like checkpoint state) or larger, multi-gigabyte files?

*4.* The idea of a switchable implementation sounds great. Would you
envision this as a configuration flag (e.g., s3.native.copy.strategy=s5cmd
or s3.native.copy.strategy=sdk) that selects the backend implementation at
runtime? Conversely, is it worth adding configuration that exposes some
implementation-level details to users?
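
To make the question concrete, here is a minimal sketch of how such a flag
could be resolved. The key name `s3.native.copy.strategy`, the enum
`CopyStrategy`, and the default of `sdk` are all hypothetical; none of them
exist in Flink today.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical bulk-copy backends; the names are illustrative only. */
enum CopyStrategy {
    S5CMD,
    SDK;

    /** Hypothetical configuration key, not an existing Flink option. */
    static final String KEY = "s3.native.copy.strategy";

    /** Resolves the strategy from a flat config map; falls back to SDK when the key is unset. */
    static CopyStrategy fromConfig(Map<String, String> config) {
        String raw = config.get(KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return SDK;
        }
        try {
            // Accept "s5cmd" / "sdk" case-insensitively.
            return valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unsupported value '" + raw + "' for " + KEY + "; expected s5cmd or sdk", e);
        }
    }
}
```

Failing fast on an unknown value, rather than silently falling back, seems
preferable here because the two backends have different setup requirements.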


> b) Try to keep memory usage under control. I think the biggest weakness of
> the current `s5cmd` approach is that it doesn't control well how much
> memory is used for the state download, leading to occasional OOM issues.


*5.* My understanding is that the key takeaway here is to avoid the
file-by-file stream-based copy used in the vanilla connector and leverage
bulk operations, which PathsCopyingFileSystem[2] enables. This seems most
critical during state download on recovery. Please let me know if my
inference is heading in the right direction.

*6.* The warning about `s5cmd` causing OOMs sounds like an indication to
consider an `S3TransferManager`[3] implementation, which might offer more
granular control over buffering and in-flight requests. Do you think
exploring `S3TransferManager` further would be valuable?
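
Whichever backend is used, one way I could imagine bounding memory is a
shared byte budget that every download must draw from before it buffers a
chunk. The sketch below is only an illustration of that idea under my own
assumptions: `InFlightByteBudget` is a hypothetical helper, and I have not
yet verified which equivalent knobs `S3TransferManager` actually exposes.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical helper: caps the total bytes buffered by concurrent downloads. */
final class InFlightByteBudget {
    private final Semaphore permits;
    private final int maxBytes;

    InFlightByteBudget(int maxBytes) {
        if (maxBytes <= 0) {
            throw new IllegalArgumentException("maxBytes must be positive");
        }
        this.maxBytes = maxBytes;
        this.permits = new Semaphore(maxBytes, true);
    }

    /** Blocks until the requested number of bytes fits into the budget. */
    void acquire(int bytes) throws InterruptedException {
        permits.acquire(clamp(bytes));
    }

    /** Non-blocking variant; returns false when the budget is currently exhausted. */
    boolean tryAcquire(int bytes) {
        return permits.tryAcquire(clamp(bytes));
    }

    /** Returns budget once the chunk has been flushed to disk. */
    void release(int bytes) {
        permits.release(clamp(bytes));
    }

    int availableBytes() {
        return permits.availablePermits();
    }

    /** Requests larger than the whole budget are clamped so they cannot block forever. */
    private int clamp(int bytes) {
        if (bytes <= 0) {
            throw new IllegalArgumentException("bytes must be positive");
        }
        return Math.min(bytes, maxBytes);
    }
}
```

A downloader would call `acquire(chunkSize)` before reading a chunk into
memory and `release(chunkSize)` after writing it out, so the buffered total
never exceeds the configured limit.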

> c) In AWS it can be very painful if you download/upload the state/files too
> quickly. If you exceed your burst network quota, AWS doesn't gracefully
> slow down the network but very aggressively starts dropping packets, very
> easily leading to TaskManagers losing connection with the JobManager. Some
> rate limiting capability is thus necessary. For example the current
> `s5cmd` has some (very crude) control over that, via controlling how many
> parallel downloads you can have etc.


*7.* The insight on AWS aggressively dropping packets instead of gracefully
throttling is invaluable. I currently have a limited understanding of how
AWS behaves when throttling, so I will dig deeper into it and come back
with follow-up questions based on what I find. To counter this, were you
thinking of a configurable rate limiter within the filesystem itself
(e.g., setting max bandwidth or max concurrent requests), or something more
dynamic that could adapt to network conditions?
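
For the static "max bandwidth" variant, a token bucket is the shape I have
in mind. This is a rough sketch under my own assumptions, not a proposal for
the final design: `ByteRateLimiter` and its parameters are hypothetical, and
the clock is injected only so the behaviour can be checked deterministically.

```java
import java.util.function.LongSupplier;

/** Hypothetical token-bucket limiter expressed in bytes per second. */
final class ByteRateLimiter {
    private final long bytesPerSecond;
    private final long burstBytes;
    private final LongSupplier nanoClock;
    private double availableBytes;
    private long lastRefillNanos;

    ByteRateLimiter(long bytesPerSecond, long burstBytes, LongSupplier nanoClock) {
        if (bytesPerSecond <= 0 || burstBytes <= 0) {
            throw new IllegalArgumentException("rate and burst must be positive");
        }
        this.bytesPerSecond = bytesPerSecond;
        this.burstBytes = burstBytes;
        this.nanoClock = nanoClock;
        this.availableBytes = burstBytes; // start with a full bucket
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    /**
     * Reserves the given number of bytes and returns how many nanoseconds the
     * caller should wait before transferring them (0 means "go now").
     */
    synchronized long reserve(long bytes) {
        refill();
        availableBytes -= bytes; // may go negative: the debt is repaid by waiting
        if (availableBytes >= 0) {
            return 0L;
        }
        return (long) Math.ceil(-availableBytes * 1e9 / bytesPerSecond);
    }

    /** Adds tokens for the time elapsed since the last call, capped at the burst size. */
    private void refill() {
        long now = nanoClock.getAsLong();
        double refilled = (now - lastRefillNanos) / 1e9 * bytesPerSecond;
        availableBytes = Math.min(burstBytes, availableBytes + refilled);
        lastRefillNanos = now;
    }
}
```

In production the clock would be `System::nanoTime`, and each transfer
thread would sleep for the returned duration before issuing its next read
or write. A dynamic variant could adjust `bytesPerSecond` at runtime, but
that is beyond this sketch.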

> I vaguely remember there were some old concerns about supporting
> FileSystem based sink (is it the one using `S3RecoverableWriter`?).

As per my understanding of the codebase, S3RecoverableWriter[1] implements
Flink's recoverable file sink contract for S3, so that applications can
tolerate task failures while writing checkpointed or transactional output.
It wraps S3 multipart uploads and exposes the recoverable and commit
serializers back to Flink's RecoverableWriter APIs.

Flink jobs that write to s3:// paths via FileSink or StreamingFileSink rely
on this writer whenever the configured filesystem is `flink-s3-fs-hadoop`;
it is not available for `flink-s3-fs-presto` (code reference [1.1]).

*8.* The flink-s3-fs-presto connector explicitly throws an
`UnsupportedOperationException` when `createRecoverableWriter()` is called.
Was this a deliberate design choice to keep the Presto connector
lightweight and optimized specifically for checkpointing, or were there
other technical challenges that prevented its implementation at the time?
Any context on this would be very helpful.


I will have more queries and open questions as I progress.

Best,
Samrat

[1]
https://github.com/apache/flink/blob/1a7c0f1bcdc92d550d38a85fee1900572e6aaac5/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
[1.1]
https://github.com/apache/flink/blob/1a7c0f1bcdc92d550d38a85fee1900572e6aaac5/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java#L459

[2]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java#L37

[3]
https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/transfer/s3/S3TransferManager.html

On Mon, Oct 20, 2025 at 1:47 PM Piotr Nowojski <[email protected]> wrote:

> Hi,
>
> Thanks for looking into this. I vaguely remember there were some old
> concerns about
> supporting FileSystem based sink (is it the one using
> `S3RecoverableWriter`?). But I don't
> remember the details, so it's hard for me to provide any feedback besides
> some vague
> warnings from the original authors of the current connector, that there
> might be some
> special corner cases that are hard to cover. So I guess proceed with
> caution? 😅
>
> > A few questions to start the discussion:
> >
> > 2. Are there any critical features from the Hadoop S3A client that are
> essential to replicate in a native implementation?
>
> Please implement `PathsCopyingFileSystem` [1] for the new connector. Either
> keep
> the current existing implementation using `s5cmd` or implement a new one
> using AWS
> SDKv2 `S3TransferManager` (or something equivalent). Or ideally add support
> to switch
> between both `s5cmd` and SDKv2.
>
> A couple of years ago to address some of the performance concerns of the S3
> connector
> I implemented a PoC of SDKv2 bulk file copy vs using `s5cmd`. Both were
> significantly
> faster than the standard implementation copy one file at a time using file
> streams.
> `s5cmd` was ~30%-40% faster than SDKv2. Due to that I've chosen to use
> `s5cmd`, but
> it has some downsides (relaying on external dependency and complicating the
> setup).
>
> > 1. What are the biggest pain points with the current S3 filesystem?
>
> a) Download speed performance/CPU efficiency. Without using
> PathsCopyingFileSystem [1]
> the vanilla connector is very inefficient - to saturate available network
> it must use much
> much more CPU compared to for example `s5cmd`.
>
> b) Try to keep memory usage under control. I think the biggest weakness of
> the current
> `s5cmd` approach is that it doesn't control well how much memory is used
> for the state
> download, leading to occasional OOM issues.
>
> c) In AWS it can be very painful if you download/upload the state/files too
> quickly. If you
> exceed your burst network quota, AWS doesn't gracefully slow down the
> network but
> very aggressively starts dropping packets, very easily leading to
> TaskManagers losing
> connection with the JobManager. Some rate limiting capability is thus
> necessary. For
> example the current `s5cmd` has some (very crude) control over that, via
> controlling
> how many parallel downloads you can have etc.
>
> That's based on our (Confluent) perspective & experience.
>
> Best,
> Piotrek
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java#L37
>
> pt., 17 paź 2025 o 19:49 Ferenc Csaky <[email protected]>
> napisał(a):
>
> > Hi,
> >
> > Thanks for starting this discussion. Heavy +1 from me.
> >
> > AWS v1 is EOL at the end of 2025, so the Hadoop S3 FS has to be updated
> > pretty
> > soonish as well. But that's not really news fer you guys [1] :)
> Personally
> > I
> > don't think that would make this proposal even a tiny bit less important.
> >
> > What I see nowadays there are more and more Hadoop-less use-cases, so
> > eliminating Hadoop bloat where it is not a must, IMO is a net gain,
> period.
> >
> > One thing that comes to my mind that will need some changes and its
> > involvement
> > to this change is not trivial is the delegation token framework.
> Currently
> > it
> > is also tied to the Hadoop stuff and has some abstract classes in the
> base
> > S3 FS
> > module.
> >
> > Another funny thing I personally experienced and also points out problems
> > with
> > the current setup is if you use Iceberg with an AWS Glue catalog, you
> must
> > also
> > bundle the AWS SDK v2, cause Iceberg depends on that version. So if
> someone
> > would like to do that currently they cannot really escape bloating their
> cp
> > with both AWS SDK.
> >
> > Best,
> > Ferenc
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-30975
> >
> >
> >
> > On Friday, October 17th, 2025 at 17:12, Tom Cooper <[email protected]>
> > wrote:
> >
> > >
> > >
> > > Hi Samrat,
> > >
> > > +1 from me. I think this would be a brilliant contribution. The Hadoop
> > libraries are often full of CVEs and updating them can be, IMHO, one of
> the
> > hardest chores in the Flink code base.
> > > So from a purely maintenance POV I think this work would be valuable.
> > Also having the most up to date AWS Java SDK means we keep up with all
> the
> > auth requirements and opens up more options for using advanced features
> in
> > future.
> > >
> > > Frankly, I think in the long term, Flink would be better off moving
> away
> > from Hadoop altogether (but that is a much bigger discussion).
> > >
> > > Thanks,
> > >
> > > Tom Cooper
> > > @tomcooper.dev | https://tomcooper.dev
> > >
> > >
> > > On Tuesday, 14 October 2025 at 19:19, Samrat Deb [email protected]
> > wrote:
> > >
> > > > Hi All,
> > > >
> > > > Poorvank (cc'ed) and I are writing to start a discussion about a
> > potential
> > > > improvement for Flink, creating a new, native S3 filesystem
> > independent of
> > > > Hadoop/Presto.
> > > >
> > > > The goal of this proposal is to address several challenges related to
> > > > Flink's S3 integration, simplifying flink-s3-filesystem. If this
> > discussion
> > > > gains positive traction, the next step would be to move forward with
> a
> > > > formalised FLIP.
> > > >
> > > > The Challenges with the Current S3 Connectors
> > > > Currently, Flink offers two primary S3 filesystems,
> > flink-s3-fs-hadoop[1]
> > > > and flink-s3-fs-presto[2]. While functional, this dual-connector
> > approach
> > > > has few issues:
> > > >
> > > > 1. The flink-s3-fs-hadoop connector adds an additional dependency to
> > > > manage. Upgrades like AWS SDK v2 are more dependent on Hadoop/Presto
> to
> > > > support first and leverage in flink-s3-filesystem. Sometimes it's
> > > > restrictive to leverage features directly from the AWS SDK.
> > > >
> > > > 2. The flink-s3-fs-presto connector was introduced to mitigate the
> > > > performance issues of the Hadoop connector, especially for
> > checkpointing.
> > > > However, it lacks a RecoverableWriter implementation.
> > > > Sometimes it's confusing for Flink users, highlighting the need for a
> > > > single, unified solution.
> > > >
> > > > Proposed Solution:
> > > > A Native, Hadoop-Free S3 Filesystem
> > > >
> > > > I propose we develop a new filesystem, let's call it
> > flink-s3-fs-native,
> > > > built directly on the modern AWS SDK for Java v2. This approach would
> > be
> > > > free of any Hadoop or Presto dependencies. I have done a small
> > prototype to
> > > > validate [3]
> > > >
> > > > This is motivated by trino<>s3 [4]. The Trino project successfully
> > > >
> > > > undertook a similar migration, moving from Hadoop-based object
> storage
> > > > clients to their own native implementations.
> > > >
> > > > The new Flink S3 filesystem would:
> > > >
> > > > 1. Provide a single, unified connector for all S3 interactions, from
> > state
> > > > backends to sinks.
> > > >
> > > > 2. Implement a high-performance S3RecoverableWriter using S3's
> > Multipart
> > > > Upload feature, ensuring exactly-once sink semantics.
> > > >
> > > > 3. Offer a clean, self-contained dependency, drastically simplifying
> > setup
> > > > and eliminating external dependencies.
> > > >
> > > > A Phased Migration Path
> > > > To ensure a smooth transition, we could adopt a phased approach on a
> > very
> > > > high level :
> > > >
> > > > Phase 1:
> > > > Introduce the new native S3 filesystem as an optional, parallel
> plugin.
> > > > This would allow for community testing and adoption without breaking
> > > > existing setups.
> > > >
> > > > Phase 2:
> > > > Once the native connector achieves feature parity and proven
> > stability, we
> > > > will update the documentation to recommend it as the default choice
> > for all
> > > > S3 use cases.
> > > >
> > > > Phase 3:
> > > > In a future major release, the legacy flink-s3-fs-hadoop and
> > > > flink-s3-fs-presto connectors could be formally deprecated, with
> clear
> > > > migration guides provided for users.
> > > >
> > > > I would love to hear the community's thoughts on this.
> > > >
> > > > A few questions to start the discussion:
> > > >
> > > > 1. What are the biggest pain points with the current S3 filesystem?
> > > >
> > > > 2. Are there any critical features from the Hadoop S3A client that
> are
> > > > essential to replicate in a native implementation?
> > > >
> > > > 3. Would a simplified, non-dependent S3 experience be a valuable
> > > > improvement for Flink use cases?
> > > >
> > > > Cheers,
> > > > Samrat
> > > >
> > > > [1]
> > > >
> >
> https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-hadoop
> > > > [2]
> > > >
> >
> https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-presto
> > > > [3] https://github.com/Samrat002/flink/pull/4
> > > > [4]
> > https://github.com/trinodb/trino/tree/master/lib/trino-filesystem-s3
> >
>
