Hi Samrat,

> *1.*  Even if the specifics are hazy, could you recall the general
> nature of those concerns? For instance, were they related to S3's eventual
> consistency model, which has since improved, the atomicity of Multipart
> Upload commits, or perhaps complex failure/recovery scenarios during the
> commit phase?

and

> *8. *The flink-s3-fs-presto connector explicitly throws an
> `UnsupportedOperationException` when `createRecoverableWriter()` is
called.
> Was this a deliberate design choice to keep the Presto connector
> lightweight and optimized specifically for checkpointing, or were there
> other technical challenges that prevented its implementation at the time?
> Any context on this would be very helpful

I very vaguely remember that at least one of those concerns was with
respect to how long
does it take for the S3 to make some certain operations visible. That you
think you have
uploaded and committed a file, but in reality it might not be visible for
tens of seconds.

Sorry, I don't remember more (or even if there was more). I was only
superficially involved
in the S3 connector back then - just participated/overheard some
discussions.

> *2.*  It's clear that implementing an efficient PathsCopyingFileSystem[2]
is
> a non-negotiable requirement for performance. Is there any benchmark
> numbers available that can be used as reference and evaluate new
> implementation deviation ?

I only have the numbers that I put in the original Flip [1]. I don't
remember the benchmark
setup, but it must have been something simple. Like just let some job
accumulate 1GB of state
and measure how long the state downloading phase of recovery was taking.

> *3.* Do you recall the workload characteristics for that PoC?
Specifically,
> was the 30-40% performance advantage of s5cmd observed when copying many
> small files (like checkpoint state) or larger, multi-gigabyte files?

It was just a regular mix of compacted RocksDB sst files, with total state
size 1 or at most
a couple of GBs. So most of the files were around ~64MB or ~128MB, with a
couple of
smaller L0 files, and maybe one larger L2 file.

> *4.* The idea of a switchable implementation sounds great. Would you
> envision this as a configuration flag (e.g., s3.native.copy.strategy=s5cmd
> or s3.native.copy.strategy=sdk) that selects the backend implementation at
> runtime? Also on contrary is it worth adding configuration that exposes
> some level of implementation level information ?

I think something like that should be fine, assuming that `s5cmd` will again
prove significantly faster and/or more cpu efficient. If not, if the SDKv2
has
already improved and caught up with the `s5cmd`, then it probably doesn't
make sense to keep `s5cmd` support.

> *5.* My understanding is that the key takeaway here is to avoid the
> file-by-file stream-based copy used in the vanilla connector and leverage
> bulk operations, which PathsCopyingFileSystem[2] enables. This seems most
> critical during state download on recovery. please suggest if my inference
> is in right direction

Yes, but you should also make the bult transfer configurable. How many bulk
transfers
can be happening in parallel etc.

> *6.* The warning about `s5cmd` causing OOMs sounds like indication to
> consider `S3TransferManager`[3] implementation, which might offer more
> granular control over buffering and in-flight requests. Do you think
> exploring more on `S3TransferManager` would be valuable ?

I'm pretty sure if you start hundreds of bulk transfers in parallel via the
`S3TransferManager` you can get the same problems with running out of
memory or exceeding available network throughput. I don't know if
`S3TransferManager` is better or worse in that regard to be honest.

> *7.* The insight on AWS aggressively dropping packets instead of
gracefully
> throttling is invaluable. Currently i have limited understanding on how
aws
> behaves at throttling I will deep dive more into it and
> look for clarification based on findings or doubt.  To counter this, were
> you thinking of a configurable rate limiter within the filesystem itself
> (e.g., setting max bandwidth or max concurrent requests), or something
more
> dynamic that could adapt to network conditions?

Flat rate limiting is tricky because AWS offers burst network capacity,
which
comes very handy, and in the vast majority of cases works fine. But for
some jobs
if you exceed that burst capacity, AWS starts dropping your packets and
then the
problems happen. On the other hand, if rate limit to your normal capacity,
you
are leaving a lot of network throughput unused during recoveries.

At the same time AWS doesn't share details for the burst  capacity, so it's
sometimes
tricky to configure the whole system properly. I don't have an universal
good answer
for that :(

Best,
Piotrek



wt., 21 paź 2025 o 21:40 Samrat Deb <[email protected]> napisał(a):

> Hi Gabor/ Ferenc
>
> Thank you for sharing the pointer and valuable feedback.
>
> The link to the custom `XmlResponsesSaxParser`[1] looks scary 😦
> and contains hidden complexity.
>
> *1.* Could you share some context on why this custom parser was necessary?
> Was it to work around a specific bug, a performance issue, or an
> inconsistency in the S3 XML API responses that the default AWS SDK parser
> couldn't handle at the time? With sdk v2 what are core functionality that
> is required to be intensively tested ?
>
> *2.* You mentioned it has no Hadoop dependency, which is great news. For a
> new native S3 connector, would integration simply require implementing a
> new S3DelegationTokenProvider/Receiver pair using the AWS SDK, or are there
> more subtle integration points with the framework that should be accounted?
>
> *3.* I remember solving Serialized Throwable exception issue [2] leading to
> a new bug [3], where an initial fix led to a regression that Gabor later
> solved with Ferenc providing a detailed root cause insights [4] 😅.
> Its hard to fully sure that all scenarios are covered properly. This is one
> of the example, there can be other unknowns.
> what would be the best approach to test for and prevent such regressions or
> unknown unknowns, especially in the most sensitive parts of the filesystem
> logic?
>
> Cheers,
> Samrat
>
> [1]
>
> https://github.com/apache/flink/blob/0e4e6d7082e83f098d0c1a94351babb3ea407aa8/flink-filesystems/flink-s3-fs-base/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java
> [2] https://issues.apache.org/jira/browse/FLINK-28513
> [3] https://github.com/apache/flink/pull/25231
> [4] https://github.com/apache/flink/pull/25231#issuecomment-2312059662
>
> On Tue, 21 Oct 2025 at 3:49 PM, Gabor Somogyi <[email protected]>
> wrote:
>
> > Hi Samrat,
> >
> > +1 on the direction that we move away from hadoop.
> >
> > This is a long standing discussion to replace the mentioned 2 connectors
> > with something better.
> > Both of them has it's own weaknesses, I've fixed several blockers inside
> > them.
> >
> > There are definitely magic inside them, please see this [1] for example
> and
> > there are more🙂
> > I think the most sensitive part is the recovery because hard to test all
> > cases.
> >
> > @Ferenc
> > > One thing that comes to my mind that will need some changes and its
> > involvement
> > to this change is not trivial is the delegation token framework.
> Currently
> > it
> > is also tied to the Hadoop stuff and has some abstract classes in the
> base
> > S3 FS
> > module.
> >
> > The delegation token framework has no dependency on hadoop so there is no
> > blocker on the road,
> > but I'm here to help if any question appears.
> >
> > BR,
> > G
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/0e4e6d7082e83f098d0c1a94351babb3ea407aa8/flink-filesystems/flink-s3-fs-base/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java#L95-L104
> >
> > On Tue, Oct 14, 2025 at 8:19 PM Samrat Deb <[email protected]>
> wrote:
> >
> > > Hi All,
> > >
> > > Poorvank (cc'ed) and I are writing to start a discussion about a
> > potential
> > > improvement for Flink, creating a new, native S3 filesystem independent
> > of
> > > Hadoop/Presto.
> > >
> > > The goal of this proposal is to address several challenges related to
> > > Flink's S3 integration, simplifying flink-s3-filesystem. If this
> > discussion
> > > gains positive traction, the next step would be to move forward with a
> > > formalised FLIP.
> > >
> > > The Challenges with the Current S3 Connectors
> > > Currently, Flink offers two primary S3 filesystems,
> flink-s3-fs-hadoop[1]
> > > and flink-s3-fs-presto[2]. While functional, this dual-connector
> approach
> > > has few issues:
> > >
> > > 1. The flink-s3-fs-hadoop connector adds an additional dependency to
> > > manage. Upgrades like AWS SDK v2 are more dependent on Hadoop/Presto to
> > > support first and leverage in flink-s3-filesystem. Sometimes it's
> > > restrictive to leverage features directly from the AWS SDK.
> > >
> > > 2. The flink-s3-fs-presto connector was introduced to mitigate the
> > > performance issues of the Hadoop connector, especially for
> checkpointing.
> > > However, it lacks a RecoverableWriter implementation.
> > > Sometimes it's confusing for Flink users, highlighting the need for a
> > > single, unified solution.
> > >
> > > *Proposed Solution:*
> > > A Native, Hadoop-Free S3 Filesystem
> > >
> > > I propose we develop a new filesystem, let's call it
> flink-s3-fs-native,
> > > built directly on the modern AWS SDK for Java v2. This approach would
> be
> > > free of any Hadoop or Presto dependencies. I have done a small
> prototype
> > to
> > > validate [3]
> > >
> > > This is motivated by trino<>s3 [4]. The Trino project successfully
> > > undertook a similar migration, moving from Hadoop-based object storage
> > > clients to their own native implementations.
> > >
> > > The new Flink S3 filesystem would:
> > >
> > > 1. Provide a single, unified connector for all S3 interactions, from
> > state
> > > backends to sinks.
> > >
> > > 2. Implement a high-performance S3RecoverableWriter using S3's
> Multipart
> > > Upload feature, ensuring exactly-once sink semantics.
> > >
> > > 3. Offer a clean, self-contained dependency, drastically simplifying
> > setup
> > > and eliminating external dependencies.
> > >
> > > A Phased Migration Path
> > > To ensure a smooth transition, we could adopt a phased approach on a
> very
> > > high level :
> > >
> > > Phase 1:
> > > Introduce the new native S3 filesystem as an optional, parallel plugin.
> > > This would allow for community testing and adoption without breaking
> > > existing setups.
> > >
> > > Phase 2:
> > > Once the native connector achieves feature parity and proven stability,
> > we
> > > will update the documentation to recommend it as the default choice for
> > all
> > > S3 use cases.
> > >
> > > Phase 3:
> > > In a future major release, the legacy flink-s3-fs-hadoop and
> > > flink-s3-fs-presto connectors could be formally deprecated, with clear
> > > migration guides provided for users.
> > >
> > > I would love to hear the community's thoughts on this.
> > >
> > > A few questions to start the discussion:
> > >
> > > 1. What are the biggest pain points with the current S3 filesystem?
> > >
> > > 2. Are there any critical features from the Hadoop S3A client that are
> > > essential to replicate in a native implementation?
> > >
> > > 3. Would a simplified, non-dependent S3 experience be a valuable
> > > improvement for Flink use cases?
> > >
> > >
> > > Cheers,
> > > Samrat
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-hadoop
> > > [2]
> > >
> > >
> >
> https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-presto
> > > [3] https://github.com/Samrat002/flink/pull/4
> > > [4]
> > https://github.com/trinodb/trino/tree/master/lib/trino-filesystem-s3
> > >
> >
>

Reply via email to