Hi Poorvank, thanks for the FLIP :) Added some comments on the Google doc.
Some of them might be irrelevant since I am not that familiar with the
codebase at the moment.

Otherwise LGTM

On Wed, 5 Mar 2025 at 06:14, Poorvank Bhatia <puravbhat...@gmail.com> wrote:

> Hey everyone,
>
> Before raising a vote thread, can anyone please help with copying the
> contents of the modified doc
> <
> https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.1zdhrsm5oaad
> >
> to this FLIP
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-509+Add+pluggable+Batching+for+Async+Sink
> >
> page (mentioned on the proposal page
> <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-Process
> >
> ).
>
> Thank you.
>
> On Tue, Mar 4, 2025 at 9:00 AM Poorvank Bhatia <puravbhat...@gmail.com>
> wrote:
>
> > Hey devs,
> >
> > Thank you all for participating in the discussion! I've updated this doc
> > based on feedback. Please feel free to share any further thoughts.
> >
> > If there are no additional suggestions, I plan to initiate a vote within
> > the next few days.
> >
> > On Wed, Feb 26, 2025 at 9:00 PM Poorvank Bhatia <puravbhat...@gmail.com>
> > wrote:
> >
> >> Thanks Arvid. As discussed offline, I have made the changes to doc.
> >>
> >>
> >>    1. Gotten rid of the protected methods in the constructor.
> >>    2. Added a new constructor that takes in BatchCreator/BufferWrapper
> >>    from the current one.
> >>    3. The new constructor defaults to the Default BatchCreator and
> >>    BufferWrapper (mimicking the current behaviour)
> >>
> >>
> >> So:
> >>
> >>    - Since the old constructor remains unchanged, Flink 1.x will
> >>    continue using it without any issues.
> >>    - Flink 1.x was compiled with the old constructor signature, so it
> >>    will only look for and call that constructor.
> >>    - Even though the class contains references
> >>    to BatchCreator and BufferWrapper, they won’t be loaded if the
> constructor
> >>    that references them is never used.
> >>    - If Flink 1.x explicitly tries to call the new constructor (which
> >>    includes BatchCreator and BufferWrapper), it will fail since those
> classes
> >>    do not exist in Flink 1.x. But this will only happen when a connector
> >>    essentially is using the BatchCreator or the new interfaces right,
> so what
> >>    I assume is it should be the connectors responsibility to provide
> >>    differentiation if it uses BatchCreator . As for Cassandra, we can
> provide
> >>    2 implementations one without Batch and other with Batch, so the one
> with
> >>    Batch can check at runtime if the newer classes can be used.
> >>
> >> So the connectors that want to use customized batching would be
> dependent
> >> on 2.1. If it is used with (a previous flink runtime) a check can be
> added
> >> in the connector to make sure that BatchCreator is available. (simple
> check
> >> via reflection).
> >> Please have a look and let me know your thoughts.
> >>
> >> Hey Danny.
> >> In case you missed the last response, I have added a BufferWrapper and
> >> removed the hardcoded Dequeue check. Please have a look and let me know
> >> your thoughts.
> >>
> >>
> >> On Tue, Feb 25, 2025 at 2:10 PM Arvid Heise <ar...@apache.org> wrote:
> >>
> >>> Hi Poorvank,
> >>>
> >>> I don't have strong feelings about the actual choice of technology.
> >>> SPI is used rather heavily in Flink Table/SQL and is also used in
> >>> similar projects more and more. But SPI also uses reflection under the
> >>> hood, so it doesn't make a huge difference except from a usability
> >>> perspective. SPI has some tool support. IDEA, for example, allows you
> >>> to jump back and forth between the service definition and the
> >>> implementations.
> >>>
> >>> In any case, protected methods that are called in the ctor cannot be
> >>> overridden because at the point where the base object is initialized
> >>> the VMT doesn't contain references to methods of the subclasses. That
> >>> is a deliberate choice of the VM engineers to avoid cases where an
> >>> overridden method accesses fields that have not been initialized yet
> >>> because the subclass ctor wasn't invoked yet.
> >>>
> >>> Also I'm not sure if class loading would succeed if you have a method
> >>> that returns an unknown class (let's say you want to run your
> >>> connector on Flink 1.X, which was built against Flink 2.1). I'm
> >>> assuming it will fail.
> >>>
> >>> So we need other choices. I'm curious if annotations could be used (in
> >>> conjunction with reflection again). I'm guessing classloading will
> >>> also fail if an unknown annotation is attached or loading of the
> >>> annotation itself will fail if the referenced class cannot be loaded.
> >>> If it works somehow, we could say:
> >>> @BatchedWith(MyBatchCreator.class)
> >>> class MyConnector extends Async...
> >>>
> >>> Unfortunately, I don't have time to try out these ideas. Maybe you
> have.
> >>>
> >>> And in any case, if it gets too complicated, we can always forgo
> >>> backwards compatibility and require specific builds for Flink 2.1+.
> >>> This may also be the easier option in case the connector doesn't
> >>> perform too well without the BatchCreator anyways.
> >>>
> >>> Best,
> >>>
> >>> Arvid
> >>>
> >>> On Thu, Feb 20, 2025 at 6:56 PM Poorvank Bhatia <
> puravbhat...@gmail.com>
> >>> wrote:
> >>> >
> >>> > Hey Arvid,
> >>> >
> >>> > Thank you for your feedback and for taking the time to review this
> >>> > proposal.
> >>> > To answer your concerns:
> >>> >
> >>> > *Naming Suggestion: BatchCreationResult → Batch *
> >>> > This is a valid point, and I agree that BatchCreationResult is
> >>> essentially
> >>> > representing a Batch. The current name was chosen to reflect that it
> >>> > encapsulates metadata such as batch size and count, in addition to
> the
> >>> > entries.  I’ll update the proposal to reflect this suggestion and
> >>> rename
> >>> > the class accordingly.
> >>> >
> >>> > *Compatibility Concerns*: You are on point regarding backward
> >>> compatibility
> >>> > and avoiding API breakages in connectors.
> >>> > Based on our discussion offline, I have removed the BatchCreator
> >>> interface
> >>> > from the constructor and it is not exposed in the public API of
> >>> > AsyncSinkWriter.
> >>> > Instead of requiring direct injection via the constructor, the
> >>> > implementation now uses protected factory methods
> >>> (createBatchCreator()),
> >>> > allowing subclasses to override the behavior without modifying the
> base
> >>> > API.
> >>> > If no subclass overrides the methods, Flink will continue using the
> >>> default
> >>> > SimpleBatchCreator and DequeBufferWrapper, maintaining full backward
> >>> > compatibility.
> >>> > You suggested using Java Reflection to dynamically instantiate
> >>> > BatchCreator. While reflection offers flexibility, I aimed to avoid
> it
> >>> > primarily due to performance overhead & guidelines
> >>> > <
> >>>
> https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-reflection
> >>> >.
> >>> > That said, since reflection would only be used during initialization,
> >>> the
> >>> > impact should be minimal. An alternative approach you suggested for
> >>> Service
> >>> > Provider Interface (SPI), which allows for dynamic discovery of
> >>> > implementations. However, it requires sink implementers to define a
> >>> > META-INF/services file, which I was trying to avoid for simplicity.
> >>> > If you think SPI is a better approach as compared to simple factor
> >>> methods
> >>> > i can add a simple loader:
> >>> >
> >>> > *private BatchCreator<RequestEntryT> loadBatchCreator() {  *
> >>> > *       ServiceLoader<BatchCreator> loader =
> >>> > ServiceLoader.load(BatchCreator.class);  *
> >>> > *      for (BatchCreator<RequestEntryT> creator : loader) { *
> >>> > *          return creator; *
> >>> > *      }   *
> >>> > *      return new SimpleBatchCreator<>(maxBatchSizeInBytes);  *
> >>> > *} *
> >>> >
> >>> > Let me know what makes the most sense to you, and I can either
> replace
> >>> the
> >>> > factory methods with SPI loaders or keep them as they are.
> >>> >
> >>> > On Thu, Feb 20, 2025 at 10:26 PM Poorvank Bhatia <
> >>> puravbhat...@gmail.com>
> >>> > wrote:
> >>> >
> >>> > > Hey Danny,
> >>> > >
> >>> > > Thank you for the review. Your observation is absolutely valid, and
> >>> moving
> >>> > > away from *Deque<RequestEntryWrapper<RequestEntryT>>
> >>> > > bufferedRequestEntries* in favor of a more flexible abstraction
> makes
> >>> > > sense.
> >>> > > *Given the exposure of batch creators, Deque may not be the most
> >>> > > semantically appropriate or performant choice. To address this, a
> >>> more
> >>> > > adaptable buffering abstraction that better aligns with different
> >>> batching
> >>> > > strategies can be used.*
> >>> > >
> >>> > > *So I Updated the proposal and*
> >>> > > *Introduced BufferWrapper Interface:*
> >>> > >
> >>> > >    - *Defines an abstraction for managing buffered request
> entries. *
> >>> > >    - *Supports FIFO, priority-based inserts, and custom queuing
> >>> > >    mechanisms. *
> >>> > >    - *Provides essential operations like add, poll, peek, and
> >>> > >    getBufferedState. *
> >>> > >
> >>> > > *Implemented Default DequeBufferWrapper *
> >>> > >
> >>> > >    - *Uses ArrayDeque for efficient FIFO behavior while allowing
> >>> > >    prioritized inserts. *
> >>> > >    - *Maintains backward compatibility with minimal overhead. *
> >>> > >
> >>> > > *Modified AsyncSinkWriter to Use BufferWrapper Instead of Deque *
> >>> > >
> >>> > >    - *bufferedRequestEntries is now of type
> >>> BufferWrapper<RequestEntryT>,
> >>> > >    making the choice of buffer implementation flexible. *
> >>> > >    - *A createBuffer() method initializes DequeBufferWrapper by
> >>> default. *
> >>> > >
> >>> > > *Updated BatchCreator and SimpleBatchCreator *
> >>> > >
> >>> > >    - *Modified method signatures to work with
> >>> > >    BufferWrapper<RequestEntryT> instead of
> >>> > >    Deque<RequestEntryWrapper<RequestEntryT>>. *
> >>> > >    - *Ensures better encapsulation and allows future optimizations
> in
> >>> > >    buffering strategies.*
> >>> > >
> >>> > > I have added this interface and updated the doc. Please have a look
> >>> and
> >>> > > let me know if this makes sense.
> >>> > >
> >>> > >
> >>> > > On Thu, Feb 20, 2025 at 5:34 PM Arvid Heise
> >>> <ahe...@confluent.io.invalid>
> >>> > > wrote:
> >>> > >
> >>> > >> Hi Poorvank,
> >>> > >>
> >>> > >> thanks for putting this together. It's obvious to me that this is
> a
> >>> > >> good addition. I wish Ahmed could check if his proposals are
> >>> > >> compatible with yours, so we don't end up with two different ways
> to
> >>> > >> express the same thing. Ideally Ahmed's proposal could be
> >>> retrofitted
> >>> > >> to extend on yours.
> >>> > >>
> >>> > >> I have a small nit and a larger concern. The nit is to rename
> >>> > >> BatchCreationResult into Batch because that's what it is.
> >>> > >>
> >>> > >> My main concern is around compatibility. If the new interfaces end
> >>> up
> >>> > >> in the signature of the AsyncSinkWriter, we will not be able to
> run
> >>> a
> >>> > >> connector built on top of it with an older Flink version. Either
> we
> >>> > >> explicitly call it out or we find a way to avoid that scenario.
> The
> >>> > >> connector community had some negative experiences around evolving
> >>> > >> APIs.
> >>> > >>
> >>> > >> One way to avoid breaking compatibility is to provide add-ons
> where
> >>> > >> the interfaces are not baked into the AsyncSinkWriter's API.
> >>> Consider
> >>> > >> your `BatchCreator` interface. If it's just used within the
> >>> > >> AsyncSinkWriter (private field, private initialization), then we
> >>> could
> >>> > >> run the same connector on an older Flink version. In that case,
> >>> > >> batching just wouldn't be enabled. Of course, all of that only
> >>> matters
> >>> > >> if it's even viable to run the connector without batching. If not
> >>> > >> then, just make the connector version depend at least on Flink
> 2.1+
> >>> or
> >>> > >> whenever this feature lands.
> >>> > >>
> >>> > >> Now the question arises how to initialize the BatchCreator. Your
> >>> > >> proposal injects it but maybe we can use some kind of discovery
> >>> > >> instead (e.g. SPI, convention, and/or annotations). Here is a
> naive
> >>> > >> sketch with reflection using a convention (I'd prefer SPI though).
> >>> > >>
> >>> > >> public AsyncSinkWriter(
> >>> > >>         ElementConverter<InputT, RequestEntryT> elementConverter,
> >>> > >>         WriterInitContext context,
> >>> > >>         AsyncSinkWriterConfiguration configuration,
> >>> > >>         Collection<BufferedRequestState<RequestEntryT>> states) {
> >>> > >>     this(elementConverter, new Sink.InitContextWrapper(context),
> >>> > >> configuration, states);
> >>> > >>
> >>> > >>     try {
> >>> > >>         batchCreator =
> >>> > >>                 (BatchCreator)
> >>> > >>                         Class.forName(getClass().getName() +
> >>> > >> "BatchCreator").newInstance();
> >>> > >>     } catch (ClassNotFoundException e) {
> >>> > >>         batchCreator = BatchCreator.DEFAULT;
> >>> > >>     } catch (IllegalAccessException | InstantiationException e) {
> >>> > >>         throw new RuntimeException(e);
> >>> > >>     }
> >>> > >> }
> >>> > >>
> >>> > >> class CassandraBatchCreator implements BatchCreator { ... }
> >>> > >>
> >>> > >> Best,
> >>> > >>
> >>> > >> Arvid
> >>> > >>
> >>> > >>
> >>> > >> On Wed, Feb 19, 2025 at 11:07 PM Danny Cranmer <
> >>> dannycran...@apache.org>
> >>> > >> wrote:
> >>> > >> >
> >>> > >> > Hello Poorvank,
> >>> > >> >
> >>> > >> > Thanks for opening this discussion/FLIP. I am +1 for the idea,
> it
> >>> seems
> >>> > >> > like a great enhancement to the AsyncSink.
> >>> > >> >
> >>> > >> > Just one question/observation from me. We currently use a
> >>> "Deque<..>
> >>> > >> > bufferedRequestEntries". Now we are exposing these batch
> >>> creators; it
> >>> > >> might
> >>> > >> > not make sense to use a queue anymore. On one hand it does not
> >>> make as
> >>> > >> much
> >>> > >> > semantic sense because we could have frequent queue jumpers. On
> >>> the
> >>> > >> other
> >>> > >> > hand it might not be performant, since Deque lookups provide
> O(n)
> >>> > >> > performance. What do you think? This is an internal data
> >>> structure so we
> >>> > >> > could consider replacing it with something else, but I might
> just
> >>> be
> >>> > >> > prematurely optimising this.
> >>> > >> >
> >>> > >> > Thanks,
> >>> > >> > Danny
> >>> > >> >
> >>> > >> > On Thu, Feb 13, 2025 at 5:31 PM Ahmed Hamdy <
> hamdy10...@gmail.com
> >>> >
> >>> > >> wrote:
> >>> > >> >
> >>> > >> > > FYI: Created FLIP-509.
> >>> > >> > >
> >>> > >> > > Best Regards
> >>> > >> > > Ahmed Hamdy
> >>> > >> > >
> >>> > >> > >
> >>> > >> > > On Thu, 13 Feb 2025 at 16:30, Ahmed Hamdy <
> hamdy10...@gmail.com
> >>> >
> >>> > >> wrote:
> >>> > >> > >
> >>> > >> > > > Hi Poorvank,
> >>> > >> > > > thanks for the feedback, I can see your point and I kinda
> >>> agree, I
> >>> > >> would
> >>> > >> > > > say, if you don't need the flushing trigger interface in
> your
> >>> use
> >>> > >> case
> >>> > >> > > > let's keep it out of the FLIP for now, Also let's wait for
> the
> >>> > >> feedback
> >>> > >> > > > about that from other members.
> >>> > >> > > > I will create a FLIP and assign it a number.
> >>> > >> > > >
> >>> > >> > > > Best Regards
> >>> > >> > > > Ahmed Hamdy
> >>> > >> > > >
> >>> > >> > > >
> >>> > >> > > > On Thu, 13 Feb 2025 at 09:15, Poorvank Bhatia <
> >>> > >> puravbhat...@gmail.com>
> >>> > >> > > > wrote:
> >>> > >> > > >
> >>> > >> > > >> Hello Ahmed,
> >>> > >> > > >> Thank you for your insights! In fact, FLIP-284
> >>> > >> > > >> <
> >>> > >> > > >>
> >>> > >> > >
> >>> > >>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable
> >>> > >> > > >> >'s
> >>> > >> > > >> proposal aligns so well with this approach, I have added it
> >>> has the
> >>> > >> > > future
> >>> > >> > > >> scope.
> >>> > >> > > >> To answer your questions:
> >>> > >> > > >>
> >>> > >> > > >> I assume we will not expose batch creators to users but
> only
> >>> to
> >>> > >> sink
> >>> > >> > > >> implementers, am I correct?
> >>> > >> > > >>
> >>> > >> > > >>    - *Yes, that is correct. The BatchCreator interface is
> >>> intended
> >>> > >> to be
> >>> > >> > > >>    used only by sink implementers, not end users. Since the
> >>> > >> SinkWriter
> >>> > >> > > is
> >>> > >> > > >>    responsible for defining how records are batched and
> >>> written,
> >>> > >> only
> >>> > >> > > >> those
> >>> > >> > > >>    developing a custom sink would need access to this
> >>> > >> functionality.*
> >>> > >> > > >>
> >>> > >> > > >> It is nit: but I would prefer adding the creator to the
> >>> > >> > > >> `AsyncSinkWriterConfiguration` rather than overloading the
> >>> > >> constructor,
> >>> > >> > > >> there are some already implemented ways for instantiating
> the
> >>> > >> configs
> >>> > >> > > with
> >>> > >> > > >> defaults so would be simpler.
> >>> > >> > > >>
> >>> > >> > > >>
> >>> > >> > > >>    - *Yes, BatchCreator can be included in
> >>> > >> AsyncSinkWriterConfiguration.
> >>> > >> > > >>    However, my initial reasoning for keeping it separate
> was
> >>> that
> >>> > >> > > >> BatchCreator
> >>> > >> > > >>    is not just a configuration parameter—it's a functional
> >>> > >> component
> >>> > >> > > that
> >>> > >> > > >>    directly influences how records are batched. This makes
> >>> it more
> >>> > >> of a
> >>> > >> > > >>    behavioral dependency rather than a passive setting.
> That
> >>> said,
> >>> > >> if
> >>> > >> > > >>    incorporating it into AsyncSinkWriterConfiguration
> aligns
> >>> > >> better with
> >>> > >> > > >> the
> >>> > >> > > >>    overall design, it is a nit change and I'm open to
> >>> updating the
> >>> > >> > > >> approach
> >>> > >> > > >>    accordingly. Do let me know your thoughts, as
> >>> > >> RateLimitingStartegy is
> >>> > >> > > >> also
> >>> > >> > > >>    a part of it.*
> >>> > >> > > >>
> >>> > >> > > >>
> >>> > >> > > >>  I want to leverage some concept from FLIP-284 that we have
> >>> > >> previously
> >>> > >> > > >> seen in DynamoDbAsyncSink, do you think we can also
> >>> customize the
> >>> > >> > > trigger
> >>> > >> > > >> so we can also add a "readyToFlush()" method or something
> >>> similar
> >>> > >> and
> >>> > >> > > use
> >>> > >> > > >> it in `nonBlockingFlush` methods instead of forcing the
> >>> trigger on
> >>> > >> only
> >>> > >> > > >> total record count or buffer size. I see this useful in
> your
> >>> case
> >>> > >> as
> >>> > >> > > well
> >>> > >> > > >> because if we have buffered 100 requests from 100 different
> >>> > >> partitions
> >>> > >> > > >> then
> >>> > >> > > >> we will keep triggering and emitting batches of 1 for 100
> >>> times
> >>> > >> (unless
> >>> > >> > > >> maxTimeInBufferMS is used ofc) so you can use the custom
> >>> trigger
> >>> > >> for
> >>> > >> > > >> minimum buffered records per partition. This might be a bit
> >>> tricky
> >>> > >> > > because
> >>> > >> > > >> in order to not affect the performance we probably will
> wrap
> >>> the
> >>> > >> whole
> >>> > >> > > >> buffer in the batchCreator to maintain calculations like
> >>> size in
> >>> > >> bytes
> >>> > >> > > or
> >>> > >> > > >> per partition record count in the batch creator making it
> >>> more
> >>> > >> > > >> "bufferHandler" rather than a "batchCreator", let me know
> >>> your
> >>> > >> thoughts
> >>> > >> > > >> about this
> >>> > >> > > >>
> >>> > >> > > >>
> >>> > >> > > >>    -
> >>> > >> > > >>
> >>> > >> > > >>    *You bring up a valid concern about optimizing flush
> >>> triggers to
> >>> > >> > > >> prevent
> >>> > >> > > >>    inefficient batch emissions when multiple partitions
> >>> exist,
> >>> > >> > > >> particularly in
> >>> > >> > > >>    sinks like DynamoDB and Cassandra. IMO flushing (when to
> >>> send
> >>> > >> > > records)
> >>> > >> > > >> and
> >>> > >> > > >>    batching (how to group records) should remain separate
> >>> > >> concerns, as
> >>> > >> > > >> they
> >>> > >> > > >>    serve distinct purposes.*
> >>> > >> > > >>
> >>> > >> > > >>    *To structure this properly, I’ll use FLIP-284 as the
> >>> > >> foundation:*
> >>> > >> > > >>    *🔹 BufferFlushTrigger (From FLIP-284) - Decides When to
> >>> Flush*
> >>> > >> > > >>
> >>> > >> > > >>    *The BufferFlushTrigger should be responsible for
> >>> determining
> >>> > >> when
> >>> > >> > > the
> >>> > >> > > >>    buffer is ready to flush, based on configurable
> >>> conditions such
> >>> > >> as:*
> >>> > >> > > >>    - *Total record count (batch size threshold).*
> >>> > >> > > >>       - *Buffer size in bytes.*
> >>> > >> > > >>       - *Time-based constraints (maxTimeInBufferMS), or any
> >>> other
> >>> > >> custom
> >>> > >> > > >>       logic a sink may require.*
> >>> > >> > > >>
> >>> > >> > > >>    *🔹 BatchCreator - Decides How to Form a Batch*
> >>> > >> > > >>
> >>> > >> > > >>    *Once BufferFlushTrigger determines that a flush should
> >>> occur,
> >>> > >> > > >>    BatchCreator is responsible for reading the buffered
> >>> requests
> >>> > >> and
> >>> > >> > > >> forming a
> >>> > >> > > >>    batch based on partitioning rules.*
> >>> > >> > > >>    - *BatchCreator does not decide when to flush—it only
> >>> handles
> >>> > >> how
> >>> > >> > > >>       records are grouped when a flush is triggered.*
> >>> > >> > > >>       - *This separation allows partition-aware batching,
> >>> ensuring
> >>> > >> that
> >>> > >> > > >>       records from the same partition are grouped together
> >>> before
> >>> > >> > > >> submission.*
> >>> > >> > > >>    ------------------------------
> >>> > >> > > >>    *🔹 Example: BatchCreator with Two Partitioning
> >>> Keys**Scenario*
> >>> > >> > > >>       - *batchSize = 20*
> >>> > >> > > >>       - *The buffer contains records from two partitions.*
> >>> > >> > > >>       - *Assume either (a) we flush when at least 10
> records
> >>> exist
> >>> > >> per
> >>> > >> > > >>       partition.*
> >>> > >> > > >>       - *We apply a simple flush strategy that triggers a
> >>> flush
> >>> > >> when 20
> >>> > >> > > >>       total records are buffered.*
> >>> > >> > > >>    *How BatchCreator Forms the Batch*
> >>> > >> > > >>
> >>> > >> > > >>    *Even though BufferFlushTrigger initiates the flush when
> >>> 20
> >>> > >> records
> >>> > >> > > are
> >>> > >> > > >>    reached, BatchCreator must still decide how to structure
> >>> the
> >>> > >> batch.
> >>> > >> > > It
> >>> > >> > > >> does
> >>> > >> > > >>    so by:*
> >>> > >> > > >>    1. *Reading the buffered records.*
> >>> > >> > > >>       2. *Grouping them by partition key. (Find out the 10
> >>> batch)*
> >>> > >> > > >>       3. *Creating a valid batch from these groups before
> >>> > >> submitting
> >>> > >> > > them
> >>> > >> > > >>       downstream.*
> >>> > >> > > >>    *What I mean is that, the buffered Requests should be
> >>> there for
> >>> > >> both
> >>> > >> > > >>    flushing and batching for optimized writes. *
> >>> > >> > > >>    ------------------------------
> >>> > >> > > >>    *The key takeaway is that by keeping these two
> interfaces
> >>> > >> separate,
> >>> > >> > > >> Sink
> >>> > >> > > >>    Writers gain flexibility—they can mix and match
> different
> >>> > >> batching
> >>> > >> > > and
> >>> > >> > > >>    flushing strategies. Since there is no tight coupling
> >>> between
> >>> > >> them,
> >>> > >> > > >>    different sinks can:*
> >>> > >> > > >>       - *Implement a distinct batching strategy independent
> >>> of the
> >>> > >> > > >> flushing
> >>> > >> > > >>       mechanism.*
> >>> > >> > > >>       - *Customize flush triggers without impacting how
> >>> records are
> >>> > >> > > >> grouped
> >>> > >> > > >>       into batches.*
> >>> > >> > > >>
> >>> > >> > > >> I suppose these two serve complementary but different
> >>> purposes,
> >>> > >> *hence
> >>> > >> > > >> they
> >>> > >> > > >> can be implemented differently*.
> >>> > >> > > >> What do you think?
> >>> > >> > > >>
> >>> > >> > > >>
> >>> > >> > > >> On Wed, Feb 12, 2025 at 4:41 PM Ahmed Hamdy <
> >>> hamdy10...@gmail.com>
> >>> > >> > > wrote:
> >>> > >> > > >>
> >>> > >> > > >> > Hi Poorvank,
> >>> > >> > > >> > Thanks for driving this, +1 (non-binding) for the FLIP in
> >>> > >> general.
> >>> > >> > > >> While I
> >>> > >> > > >> > see some common grounds with FLIP-284 that I wrote, I
> >>> prefer
> >>> > >> going
> >>> > >> > > with
> >>> > >> > > >> a
> >>> > >> > > >> > new FLIP since it is driven by a current use case and is
> >>> > >> specific for
> >>> > >> > > >> > tackling it. I have a couple of clarifying questions.
> >>> > >> > > >> >
> >>> > >> > > >> > 1- I assume we will not expose batch creators to users
> but
> >>> only
> >>> > >> to
> >>> > >> > > sink
> >>> > >> > > >> > implementers, am I correct?
> >>> > >> > > >> >
> >>> > >> > > >> > 2- It is nit: but I would prefer adding the creator to
> the
> >>> > >> > > >> > `AsyncSinkWriterConfiguration` rather than overloading
> the
> >>> > >> > > constructor,
> >>> > >> > > >> > there are some already implemented ways for instantiating
> >>> the
> >>> > >> configs
> >>> > >> > > >> with
> >>> > >> > > >> > defaults so would be simpler.
> >>> > >> > > >> >
> >>> > >> > > >> > 3- I want to leverage some concept from FLIP-284 that we
> >>> have
> >>> > >> > > previously
> >>> > >> > > >> > seen in DynamoDbAsyncSink, do you think we can also
> >>> customize the
> >>> > >> > > >> trigger
> >>> > >> > > >> > so we can also add a "readyToFlush()" method or something
> >>> > >> similar and
> >>> > >> > > >> use
> >>> > >> > > >> > it in `nonBlockingFlush` methods instead of forcing the
> >>> trigger
> >>> > >> on
> >>> > >> > > only
> >>> > >> > > >> > total record count or buffer size. I see this useful in
> >>> your
> >>> > >> case as
> >>> > >> > > >> well
> >>> > >> > > >> > because if we have buffered 100 requests from 100
> different
> >>> > >> partitions
> >>> > >> > > >> then
> >>> > >> > > >> > we will keep triggering and emitting batches of 1 for 100
> >>> times
> >>> > >> > > (unless
> >>> > >> > > >> > maxTimeInBufferMS is used ofc) so you can use the custom
> >>> trigger
> >>> > >> for
> >>> > >> > > >> > minimum buffered records per partition. This might be a
> bit
> >>> > >> tricky
> >>> > >> > > >> because
> >>> > >> > > >> > in order to not affect the performance we probably will
> >>> wrap the
> >>> > >> whole
> >>> > >> > > >> > buffer in the batchCreator to maintain calculations like
> >>> size in
> >>> > >> bytes
> >>> > >> > > >> or
> >>> > >> > > >> > per partition record count in the batch creator making it
> >>> more
> >>> > >> > > >> > "bufferHandler" rather than a "batchCreator", let me know
> >>> your
> >>> > >> > > thoughts
> >>> > >> > > >> > about this.
> >>> > >> > > >> >
> >>> > >> > > >> >
> >>> > >> > > >> > Best Regards
> >>> > >> > > >> > Ahmed Hamdy
> >>> > >> > > >> >
> >>> > >> > > >> >
> >>> > >> > > >> > On Tue, 11 Feb 2025 at 10:45, Poorvank Bhatia <
> >>> > >> puravbhat...@gmail.com
> >>> > >> > > >
> >>> > >> > > >> > wrote:
> >>> > >> > > >> >
> >>> > >> > > >> > > Hey everyone,
> >>> > >> > > >> > >
> >>> > >> > > >> > > I’d like to propose adding a pluggable batching
> >>> mechanism to
> >>> > >> > > >> > > AsyncSinkWriter
> >>> > >> > > >> > > <
> >>> > >> > > >> > >
> >>> > >> > > >> >
> >>> > >> > > >>
> >>> > >> > >
> >>> > >>
> >>>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351
> >>> > >> > > >> > > >
> >>> > >> > > >> > >  to enable custom batch formation strategies.
> >>> > >> > > >> > > Currently, batching is based on batch size and record
> >>> count,
> >>> > >> but
> >>> > >> > > this
> >>> > >> > > >> > > approach is suboptimal for sinks like Cassandra, which
> >>> require
> >>> > >> > > >> > > partition-aware batching. Specifically, batches should
> be
> >>> > >> formed so
> >>> > >> > > >> that
> >>> > >> > > >> > > all requests within a batch belong to the same
> partition,
> >>> > >> ensuring
> >>> > >> > > >> more
> >>> > >> > > >> > > efficient writes.
> >>> > >> > > >> > >
> >>> > >> > > >> > > The proposal introduces a minimal `BatchCreator`
> >>> interface,
> >>> > >> enabling
> >>> > >> > > >> > users
> >>> > >> > > >> > > to define custom batching strategies while maintaining
> >>> backward
> >>> > >> > > >> > > compatibility with a default implementation.
> >>> > >> > > >> > >
> >>> > >> > > >> > > For full details, please refer to the proposal document
> >>> > >> > > >> > > <
> >>> > >> > > >> > >
> >>> > >> > > >> >
> >>> > >> > > >>
> >>> > >> > >
> >>> > >>
> >>>
> https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f
> >>> > >> > > >> > > >
> >>> > >> > > >> > > .
> >>> > >> > > >> > > Associated Jira <
> >>> > >> https://issues.apache.org/jira/browse/FLINK-37298>
> >>> > >> > > >> > >
> >>> > >> > > >> > > Thanks,
> >>> > >> > > >> > > Poorvank
> >>> > >> > > >> > >
> >>> > >> > > >> >
> >>> > >> > > >>
> >>> > >> > > >
> >>> > >> > >
> >>> > >>
> >>> > >
> >>>
> >>
>

Reply via email to