Hey everyone,

Before raising a vote thread, can anyone please help with copying the
contents of the modified doc
<https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.1zdhrsm5oaad>
to this FLIP
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-509+Add+pluggable+Batching+for+Async+Sink>
page (mentioned on the proposal page
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-Process>
).

Thank you.

On Tue, Mar 4, 2025 at 9:00 AM Poorvank Bhatia <puravbhat...@gmail.com>
wrote:

> Hey devs,
>
> Thank you all for participating in the discussion! I've updated this doc
> based on feedback. Please feel free to share any further thoughts.
>
> If there are no additional suggestions, I plan to initiate a vote within
> the next few days.
>
> On Wed, Feb 26, 2025 at 9:00 PM Poorvank Bhatia <puravbhat...@gmail.com>
> wrote:
>
>> Thanks Arvid. As discussed offline, I have made the changes to doc.
>>
>>
>>    1. Gotten rid of the protected methods in the constructor.
>>    2. Added a new constructor that takes in BatchCreator/BufferWrapper
>>    from the current one.
>>    3. The new constructor defaults to the Default BatchCreator and
>>    BufferWrapper (mimicking the current behaviour)
>>
>>
>> So:
>>
>>    - Since the old constructor remains unchanged, Flink 1.x will
>>    continue using it without any issues.
>>    - Flink 1.x was compiled with the old constructor signature, so it
>>    will only look for and call that constructor.
>>    - Even though the class contains references
>>    to BatchCreator and BufferWrapper, they won’t be loaded if the constructor
>>    that references them is never used.
>>    - If Flink 1.x explicitly tries to call the new constructor (which
>>    includes BatchCreator and BufferWrapper), it will fail since those classes
>>    do not exist in Flink 1.x. But this will only happen when a connector
>>    essentially is using the BatchCreator or the new interfaces right,  so 
>> what
>>    I assume is it should be the connectors responsibility to provide
>>    differentiation if it uses BatchCreator . As for Cassandra, we can provide
>>    2 implementations one without Batch and other with Batch, so the one with
>>    Batch can check at runtime if the newer classes can be used.
>>
>> So the connectors that want to use customized batching would be dependent
>> on 2.1. If it is used with (a previous flink runtime) a check can be added
>> in the connector to make sure that BatchCreator is available. (simple check
>> via reflection).
>> Please have a look and let me know your thoughts.
>>
>> Hey Danny.
>> In case you missed the last response, I have added a BufferWrapper and
>> removed the hardcoded Dequeue check. Please have a look and let me know
>> your thoughts.
>>
>>
>> On Tue, Feb 25, 2025 at 2:10 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Poorvank,
>>>
>>> I don't have strong feelings about the actual choice of technology.
>>> SPI is used rather heavily in Flink Table/SQL and is also used in
>>> similar projects more and more. But SPI also uses reflection under the
>>> hood, so it doesn't make a huge difference except from a usability
>>> perspective. SPI has some tool support. IDEA, for example, allows you
>>> to jump back and forth between the service definition and the
>>> implementations.
>>>
>>> In any case, protected methods that are called in the ctor cannot be
>>> overridden because at the point where the base object is initialized
>>> the VMT doesn't contain references to methods of the subclasses. That
>>> is a deliberate choice of the VM engineers to avoid cases where an
>>> overridden method accesses fields that have not been initialized yet
>>> because the subclass ctor wasn't invoked yet.
>>>
>>> Also I'm not sure if class loading would succeed if you have a method
>>> that returns an unknown class (let's say you want to run your
>>> connector on Flink 1.X, which was built against Flink 2.1). I'm
>>> assuming it will fail.
>>>
>>> So we need other choices. I'm curious if annotations could be used (in
>>> conjunction with reflection again). I'm guessing classloading will
>>> also fail if an unknown annotation is attached or loading of the
>>> annotation itself will fail if the referenced class cannot be loaded.
>>> If it works somehow, we could say:
>>> @BatchedWith(MyBatchCreator.class)
>>> class MyConnector extends Async...
>>>
>>> Unfortunately, I don't have time to try out these ideas. Maybe you have.
>>>
>>> And in any case, if it gets too complicated, we can always forgo
>>> backwards compatibility and require specific builds for Flink 2.1+.
>>> This may also be the easier option in case the connector doesn't
>>> perform too well without the BatchCreator anyways.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Thu, Feb 20, 2025 at 6:56 PM Poorvank Bhatia <puravbhat...@gmail.com>
>>> wrote:
>>> >
>>> > Hey Arvid,
>>> >
>>> > Thank you for your feedback and for taking the time to review this
>>> > proposal.
>>> > To answer your concerns:
>>> >
>>> > *Naming Suggestion: BatchCreationResult → Batch *
>>> > This is a valid point, and I agree that BatchCreationResult is
>>> essentially
>>> > representing a Batch. The current name was chosen to reflect that it
>>> > encapsulates metadata such as batch size and count, in addition to the
>>> > entries.  I’ll update the proposal to reflect this suggestion and
>>> rename
>>> > the class accordingly.
>>> >
>>> > *Compatibility Concerns*: You are on point regarding backward
>>> compatibility
>>> > and avoiding API breakages in connectors.
>>> > Based on our discussion offline, I have removed the BatchCreator
>>> interface
>>> > from the constructor and it is not exposed in the public API of
>>> > AsyncSinkWriter.
>>> > Instead of requiring direct injection via the constructor, the
>>> > implementation now uses protected factory methods
>>> (createBatchCreator()),
>>> > allowing subclasses to override the behavior without modifying the base
>>> > API.
>>> > If no subclass overrides the methods, Flink will continue using the
>>> default
>>> > SimpleBatchCreator and DequeBufferWrapper, maintaining full backward
>>> > compatibility.
>>> > You suggested using Java Reflection to dynamically instantiate
>>> > BatchCreator. While reflection offers flexibility, I aimed to avoid it
>>> > primarily due to performance overhead & guidelines
>>> > <
>>> https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-reflection
>>> >.
>>> > That said, since reflection would only be used during initialization,
>>> the
>>> > impact should be minimal. An alternative approach you suggested for
>>> Service
>>> > Provider Interface (SPI), which allows for dynamic discovery of
>>> > implementations. However, it requires sink implementers to define a
>>> > META-INF/services file, which I was trying to avoid for simplicity.
>>> > If you think SPI is a better approach as compared to simple factor
>>> methods
>>> > i can add a simple loader:
>>> >
>>> > *private BatchCreator<RequestEntryT> loadBatchCreator() {  *
>>> > *       ServiceLoader<BatchCreator> loader =
>>> > ServiceLoader.load(BatchCreator.class);  *
>>> > *      for (BatchCreator<RequestEntryT> creator : loader) { *
>>> > *          return creator; *
>>> > *      }   *
>>> > *      return new SimpleBatchCreator<>(maxBatchSizeInBytes);  *
>>> > *} *
>>> >
>>> > Let me know what makes the most sense to you, and I can either replace
>>> the
>>> > factory methods with SPI loaders or keep them as they are.
>>> >
>>> > On Thu, Feb 20, 2025 at 10:26 PM Poorvank Bhatia <
>>> puravbhat...@gmail.com>
>>> > wrote:
>>> >
>>> > > Hey Danny,
>>> > >
>>> > > Thank you for the review. Your observation is absolutely valid, and
>>> moving
>>> > > away from *Deque<RequestEntryWrapper<RequestEntryT>>
>>> > > bufferedRequestEntries* in favor of a more flexible abstraction makes
>>> > > sense.
>>> > > *Given the exposure of batch creators, Deque may not be the most
>>> > > semantically appropriate or performant choice. To address this, a
>>> more
>>> > > adaptable buffering abstraction that better aligns with different
>>> batching
>>> > > strategies can be used.*
>>> > >
>>> > > *So I Updated the proposal and*
>>> > > *Introduced BufferWrapper Interface:*
>>> > >
>>> > >    - *Defines an abstraction for managing buffered request entries. *
>>> > >    - *Supports FIFO, priority-based inserts, and custom queuing
>>> > >    mechanisms. *
>>> > >    - *Provides essential operations like add, poll, peek, and
>>> > >    getBufferedState. *
>>> > >
>>> > > *Implemented Default DequeBufferWrapper *
>>> > >
>>> > >    - *Uses ArrayDeque for efficient FIFO behavior while allowing
>>> > >    prioritized inserts. *
>>> > >    - *Maintains backward compatibility with minimal overhead. *
>>> > >
>>> > > *Modified AsyncSinkWriter to Use BufferWrapper Instead of Deque *
>>> > >
>>> > >    - *bufferedRequestEntries is now of type
>>> BufferWrapper<RequestEntryT>,
>>> > >    making the choice of buffer implementation flexible. *
>>> > >    - *A createBuffer() method initializes DequeBufferWrapper by
>>> default. *
>>> > >
>>> > > *Updated BatchCreator and SimpleBatchCreator *
>>> > >
>>> > >    - *Modified method signatures to work with
>>> > >    BufferWrapper<RequestEntryT> instead of
>>> > >    Deque<RequestEntryWrapper<RequestEntryT>>. *
>>> > >    - *Ensures better encapsulation and allows future optimizations in
>>> > >    buffering strategies.*
>>> > >
>>> > > I have added this interface and updated the doc. Please have a look
>>> and
>>> > > let me know if this makes sense.
>>> > >
>>> > >
>>> > > On Thu, Feb 20, 2025 at 5:34 PM Arvid Heise
>>> <ahe...@confluent.io.invalid>
>>> > > wrote:
>>> > >
>>> > >> Hi Poorvank,
>>> > >>
>>> > >> thanks for putting this together. It's obvious to me that this is a
>>> > >> good addition. I wish Ahmed could check if his proposals are
>>> > >> compatible with yours, so we don't end up with two different ways to
>>> > >> express the same thing. Ideally Ahmed's proposal could be
>>> retrofitted
>>> > >> to extend on yours.
>>> > >>
>>> > >> I have a small nit and a larger concern. The nit is to rename
>>> > >> BatchCreationResult into Batch because that's what it is.
>>> > >>
>>> > >> My main concern is around compatibility. If the new interfaces end
>>> up
>>> > >> in the signature of the AsyncSinkWriter, we will not be able to run
>>> a
>>> > >> connector built on top of it with an older Flink version. Either we
>>> > >> explicitly call it out or we find a way to avoid that scenario. The
>>> > >> connector community had some negative experiences around evolving
>>> > >> APIs.
>>> > >>
>>> > >> One way to avoid breaking compatibility is to provide add-ons where
>>> > >> the interfaces are not baked into the AsyncSinkWriter's API.
>>> Consider
>>> > >> your `BatchCreator` interface. If it's just used within the
>>> > >> AsyncSinkWriter (private field, private initialization), then we
>>> could
>>> > >> run the same connector on an older Flink version. In that case,
>>> > >> batching just wouldn't be enabled. Of course, all of that only
>>> matters
>>> > >> if it's even viable to run the connector without batching. If not
>>> > >> then, just make the connector version depend at least on Flink 2.1+
>>> or
>>> > >> whenever this feature lands.
>>> > >>
>>> > >> Now the question arises how to initialize the BatchCreator. Your
>>> > >> proposal injects it but maybe we can use some kind of discovery
>>> > >> instead (e.g. SPI, convention, and/or annotations). Here is a naive
>>> > >> sketch with reflection using a convention (I'd prefer SPI though).
>>> > >>
>>> > >> public AsyncSinkWriter(
>>> > >>         ElementConverter<InputT, RequestEntryT> elementConverter,
>>> > >>         WriterInitContext context,
>>> > >>         AsyncSinkWriterConfiguration configuration,
>>> > >>         Collection<BufferedRequestState<RequestEntryT>> states) {
>>> > >>     this(elementConverter, new Sink.InitContextWrapper(context),
>>> > >> configuration, states);
>>> > >>
>>> > >>     try {
>>> > >>         batchCreator =
>>> > >>                 (BatchCreator)
>>> > >>                         Class.forName(getClass().getName() +
>>> > >> "BatchCreator").newInstance();
>>> > >>     } catch (ClassNotFoundException e) {
>>> > >>         batchCreator = BatchCreator.DEFAULT;
>>> > >>     } catch (IllegalAccessException | InstantiationException e) {
>>> > >>         throw new RuntimeException(e);
>>> > >>     }
>>> > >> }
>>> > >>
>>> > >> class CassandraBatchCreator implements BatchCreator { ... }
>>> > >>
>>> > >> Best,
>>> > >>
>>> > >> Arvid
>>> > >>
>>> > >>
>>> > >> On Wed, Feb 19, 2025 at 11:07 PM Danny Cranmer <
>>> dannycran...@apache.org>
>>> > >> wrote:
>>> > >> >
>>> > >> > Hello Poorvank,
>>> > >> >
>>> > >> > Thanks for opening this discussion/FLIP. I am +1 for the idea, it
>>> seems
>>> > >> > like a great enhancement to the AsyncSink.
>>> > >> >
>>> > >> > Just one question/observation from me. We currently use a
>>> "Deque<..>
>>> > >> > bufferedRequestEntries". Now we are exposing these batch
>>> creators; it
>>> > >> might
>>> > >> > not make sense to use a queue anymore. On one hand it does not
>>> make as
>>> > >> much
>>> > >> > semantic sense because we could have frequent queue jumpers. On
>>> the
>>> > >> other
>>> > >> > hand it might not be performant, since Deque lookups provide O(n)
>>> > >> > performance. What do you think? This is an internal data
>>> structure so we
>>> > >> > could consider replacing it with something else, but I might just
>>> be
>>> > >> > prematurely optimising this.
>>> > >> >
>>> > >> > Thanks,
>>> > >> > Danny
>>> > >> >
>>> > >> > On Thu, Feb 13, 2025 at 5:31 PM Ahmed Hamdy <hamdy10...@gmail.com
>>> >
>>> > >> wrote:
>>> > >> >
>>> > >> > > FYI: Created FLIP-509.
>>> > >> > >
>>> > >> > > Best Regards
>>> > >> > > Ahmed Hamdy
>>> > >> > >
>>> > >> > >
>>> > >> > > On Thu, 13 Feb 2025 at 16:30, Ahmed Hamdy <hamdy10...@gmail.com
>>> >
>>> > >> wrote:
>>> > >> > >
>>> > >> > > > Hi Poorvank,
>>> > >> > > > thanks for the feedback, I can see your point and I kinda
>>> agree, I
>>> > >> would
>>> > >> > > > say, if you don't need the flushing trigger interface in your
>>> use
>>> > >> case
>>> > >> > > > let's keep it out of the FLIP for now, Also let's wait for the
>>> > >> feedback
>>> > >> > > > about that from other members.
>>> > >> > > > I will create a FLIP and assign it a number.
>>> > >> > > >
>>> > >> > > > Best Regards
>>> > >> > > > Ahmed Hamdy
>>> > >> > > >
>>> > >> > > >
>>> > >> > > > On Thu, 13 Feb 2025 at 09:15, Poorvank Bhatia <
>>> > >> puravbhat...@gmail.com>
>>> > >> > > > wrote:
>>> > >> > > >
>>> > >> > > >> Hello Ahmed,
>>> > >> > > >> Thank you for your insights! In fact, FLIP-284
>>> > >> > > >> <
>>> > >> > > >>
>>> > >> > >
>>> > >>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable
>>> > >> > > >> >'s
>>> > >> > > >> proposal aligns so well with this approach, I have added it
>>> has the
>>> > >> > > future
>>> > >> > > >> scope.
>>> > >> > > >> To answer your questions:
>>> > >> > > >>
>>> > >> > > >> I assume we will not expose batch creators to users but only
>>> to
>>> > >> sink
>>> > >> > > >> implementers, am I correct?
>>> > >> > > >>
>>> > >> > > >>    - *Yes, that is correct. The BatchCreator interface is
>>> intended
>>> > >> to be
>>> > >> > > >>    used only by sink implementers, not end users. Since the
>>> > >> SinkWriter
>>> > >> > > is
>>> > >> > > >>    responsible for defining how records are batched and
>>> written,
>>> > >> only
>>> > >> > > >> those
>>> > >> > > >>    developing a custom sink would need access to this
>>> > >> functionality.*
>>> > >> > > >>
>>> > >> > > >> It is nit: but I would prefer adding the creator to the
>>> > >> > > >> `AsyncSinkWriterConfiguration` rather than overloading the
>>> > >> constructor,
>>> > >> > > >> there are some already implemented ways for instantiating the
>>> > >> configs
>>> > >> > > with
>>> > >> > > >> defaults so would be simpler.
>>> > >> > > >>
>>> > >> > > >>
>>> > >> > > >>    - *Yes, BatchCreator can be included in
>>> > >> AsyncSinkWriterConfiguration.
>>> > >> > > >>    However, my initial reasoning for keeping it separate was
>>> that
>>> > >> > > >> BatchCreator
>>> > >> > > >>    is not just a configuration parameter—it's a functional
>>> > >> component
>>> > >> > > that
>>> > >> > > >>    directly influences how records are batched. This makes
>>> it more
>>> > >> of a
>>> > >> > > >>    behavioral dependency rather than a passive setting. That
>>> said,
>>> > >> if
>>> > >> > > >>    incorporating it into AsyncSinkWriterConfiguration aligns
>>> > >> better with
>>> > >> > > >> the
>>> > >> > > >>    overall design, it is a nit change and I'm open to
>>> updating the
>>> > >> > > >> approach
>>> > >> > > >>    accordingly. Do let me know your thoughts, as
>>> > >> RateLimitingStartegy is
>>> > >> > > >> also
>>> > >> > > >>    a part of it.*
>>> > >> > > >>
>>> > >> > > >>
>>> > >> > > >>  I want to leverage some concept from FLIP-284 that we have
>>> > >> previously
>>> > >> > > >> seen in DynamoDbAsyncSink, do you think we can also
>>> customize the
>>> > >> > > trigger
>>> > >> > > >> so we can also add a "readyToFlush()" method or something
>>> similar
>>> > >> and
>>> > >> > > use
>>> > >> > > >> it in `nonBlockingFlush` methods instead of forcing the
>>> trigger on
>>> > >> only
>>> > >> > > >> total record count or buffer size. I see this useful in your
>>> case
>>> > >> as
>>> > >> > > well
>>> > >> > > >> because if we have buffered 100 requests from 100 different
>>> > >> partitions
>>> > >> > > >> then
>>> > >> > > >> we will keep triggering and emitting batches of 1 for 100
>>> times
>>> > >> (unless
>>> > >> > > >> maxTimeInBufferMS is used ofc) so you can use the custom
>>> trigger
>>> > >> for
>>> > >> > > >> minimum buffered records per partition. This might be a bit
>>> tricky
>>> > >> > > because
>>> > >> > > >> in order to not affect the performance we probably will wrap
>>> the
>>> > >> whole
>>> > >> > > >> buffer in the batchCreator to maintain calculations like
>>> size in
>>> > >> bytes
>>> > >> > > or
>>> > >> > > >> per partition record count in the batch creator making it
>>> more
>>> > >> > > >> "bufferHandler" rather than a "batchCreator", let me know
>>> your
>>> > >> thoughts
>>> > >> > > >> about this
>>> > >> > > >>
>>> > >> > > >>
>>> > >> > > >>    -
>>> > >> > > >>
>>> > >> > > >>    *You bring up a valid concern about optimizing flush
>>> triggers to
>>> > >> > > >> prevent
>>> > >> > > >>    inefficient batch emissions when multiple partitions
>>> exist,
>>> > >> > > >> particularly in
>>> > >> > > >>    sinks like DynamoDB and Cassandra. IMO flushing (when to
>>> send
>>> > >> > > records)
>>> > >> > > >> and
>>> > >> > > >>    batching (how to group records) should remain separate
>>> > >> concerns, as
>>> > >> > > >> they
>>> > >> > > >>    serve distinct purposes.*
>>> > >> > > >>
>>> > >> > > >>    *To structure this properly, I’ll use FLIP-284 as the
>>> > >> foundation:*
>>> > >> > > >>    *🔹 BufferFlushTrigger (From FLIP-284) - Decides When to
>>> Flush*
>>> > >> > > >>
>>> > >> > > >>    *The BufferFlushTrigger should be responsible for
>>> determining
>>> > >> when
>>> > >> > > the
>>> > >> > > >>    buffer is ready to flush, based on configurable
>>> conditions such
>>> > >> as:*
>>> > >> > > >>    - *Total record count (batch size threshold).*
>>> > >> > > >>       - *Buffer size in bytes.*
>>> > >> > > >>       - *Time-based constraints (maxTimeInBufferMS), or any
>>> other
>>> > >> custom
>>> > >> > > >>       logic a sink may require.*
>>> > >> > > >>
>>> > >> > > >>    *🔹 BatchCreator - Decides How to Form a Batch*
>>> > >> > > >>
>>> > >> > > >>    *Once BufferFlushTrigger determines that a flush should
>>> occur,
>>> > >> > > >>    BatchCreator is responsible for reading the buffered
>>> requests
>>> > >> and
>>> > >> > > >> forming a
>>> > >> > > >>    batch based on partitioning rules.*
>>> > >> > > >>    - *BatchCreator does not decide when to flush—it only
>>> handles
>>> > >> how
>>> > >> > > >>       records are grouped when a flush is triggered.*
>>> > >> > > >>       - *This separation allows partition-aware batching,
>>> ensuring
>>> > >> that
>>> > >> > > >>       records from the same partition are grouped together
>>> before
>>> > >> > > >> submission.*
>>> > >> > > >>    ------------------------------
>>> > >> > > >>    *🔹 Example: BatchCreator with Two Partitioning
>>> Keys**Scenario*
>>> > >> > > >>       - *batchSize = 20*
>>> > >> > > >>       - *The buffer contains records from two partitions.*
>>> > >> > > >>       - *Assume either (a) we flush when at least 10 records
>>> exist
>>> > >> per
>>> > >> > > >>       partition.*
>>> > >> > > >>       - *We apply a simple flush strategy that triggers a
>>> flush
>>> > >> when 20
>>> > >> > > >>       total records are buffered.*
>>> > >> > > >>    *How BatchCreator Forms the Batch*
>>> > >> > > >>
>>> > >> > > >>    *Even though BufferFlushTrigger initiates the flush when
>>> 20
>>> > >> records
>>> > >> > > are
>>> > >> > > >>    reached, BatchCreator must still decide how to structure
>>> the
>>> > >> batch.
>>> > >> > > It
>>> > >> > > >> does
>>> > >> > > >>    so by:*
>>> > >> > > >>    1. *Reading the buffered records.*
>>> > >> > > >>       2. *Grouping them by partition key. (Find out the 10
>>> batch)*
>>> > >> > > >>       3. *Creating a valid batch from these groups before
>>> > >> submitting
>>> > >> > > them
>>> > >> > > >>       downstream.*
>>> > >> > > >>    *What I mean is that, the buffered Requests should be
>>> there for
>>> > >> both
>>> > >> > > >>    flushing and batching for optimized writes. *
>>> > >> > > >>    ------------------------------
>>> > >> > > >>    *The key takeaway is that by keeping these two interfaces
>>> > >> separate,
>>> > >> > > >> Sink
>>> > >> > > >>    Writers gain flexibility—they can mix and match different
>>> > >> batching
>>> > >> > > and
>>> > >> > > >>    flushing strategies. Since there is no tight coupling
>>> between
>>> > >> them,
>>> > >> > > >>    different sinks can:*
>>> > >> > > >>       - *Implement a distinct batching strategy independent
>>> of the
>>> > >> > > >> flushing
>>> > >> > > >>       mechanism.*
>>> > >> > > >>       - *Customize flush triggers without impacting how
>>> records are
>>> > >> > > >> grouped
>>> > >> > > >>       into batches.*
>>> > >> > > >>
>>> > >> > > >> I suppose these two serve complementary but different
>>> purposes,
>>> > >> *hence
>>> > >> > > >> they
>>> > >> > > >> can be implemented differently*.
>>> > >> > > >> What do you think?
>>> > >> > > >>
>>> > >> > > >>
>>> > >> > > >> On Wed, Feb 12, 2025 at 4:41 PM Ahmed Hamdy <
>>> hamdy10...@gmail.com>
>>> > >> > > wrote:
>>> > >> > > >>
>>> > >> > > >> > Hi Poorvank,
>>> > >> > > >> > Thanks for driving this, +1 (non-binding) for the FLIP in
>>> > >> general.
>>> > >> > > >> While I
>>> > >> > > >> > see some common grounds with FLIP-284 that I wrote, I
>>> prefer
>>> > >> going
>>> > >> > > with
>>> > >> > > >> a
>>> > >> > > >> > new FLIP since it is driven by a current use case and is
>>> > >> specific for
>>> > >> > > >> > tackling it. I have a couple of clarifying questions.
>>> > >> > > >> >
>>> > >> > > >> > 1- I assume we will not expose batch creators to users but
>>> only
>>> > >> to
>>> > >> > > sink
>>> > >> > > >> > implementers, am I correct?
>>> > >> > > >> >
>>> > >> > > >> > 2- It is nit: but I would prefer adding the creator to the
>>> > >> > > >> > `AsyncSinkWriterConfiguration` rather than overloading the
>>> > >> > > constructor,
>>> > >> > > >> > there are some already implemented ways for instantiating
>>> the
>>> > >> configs
>>> > >> > > >> with
>>> > >> > > >> > defaults so would be simpler.
>>> > >> > > >> >
>>> > >> > > >> > 3- I want to leverage some concept from FLIP-284 that we
>>> have
>>> > >> > > previously
>>> > >> > > >> > seen in DynamoDbAsyncSink, do you think we can also
>>> customize the
>>> > >> > > >> trigger
>>> > >> > > >> > so we can also add a "readyToFlush()" method or something
>>> > >> similar and
>>> > >> > > >> use
>>> > >> > > >> > it in `nonBlockingFlush` methods instead of forcing the
>>> trigger
>>> > >> on
>>> > >> > > only
>>> > >> > > >> > total record count or buffer size. I see this useful in
>>> your
>>> > >> case as
>>> > >> > > >> well
>>> > >> > > >> > because if we have buffered 100 requests from 100 different
>>> > >> partitions
>>> > >> > > >> then
>>> > >> > > >> > we will keep triggering and emitting batches of 1 for 100
>>> times
>>> > >> > > (unless
>>> > >> > > >> > maxTimeInBufferMS is used ofc) so you can use the custom
>>> trigger
>>> > >> for
>>> > >> > > >> > minimum buffered records per partition. This might be a bit
>>> > >> tricky
>>> > >> > > >> because
>>> > >> > > >> > in order to not affect the performance we probably will
>>> wrap the
>>> > >> whole
>>> > >> > > >> > buffer in the batchCreator to maintain calculations like
>>> size in
>>> > >> bytes
>>> > >> > > >> or
>>> > >> > > >> > per partition record count in the batch creator making it
>>> more
>>> > >> > > >> > "bufferHandler" rather than a "batchCreator", let me know
>>> your
>>> > >> > > thoughts
>>> > >> > > >> > about this.
>>> > >> > > >> >
>>> > >> > > >> >
>>> > >> > > >> > Best Regards
>>> > >> > > >> > Ahmed Hamdy
>>> > >> > > >> >
>>> > >> > > >> >
>>> > >> > > >> > On Tue, 11 Feb 2025 at 10:45, Poorvank Bhatia <
>>> > >> puravbhat...@gmail.com
>>> > >> > > >
>>> > >> > > >> > wrote:
>>> > >> > > >> >
>>> > >> > > >> > > Hey everyone,
>>> > >> > > >> > >
>>> > >> > > >> > > I’d like to propose adding a pluggable batching
>>> mechanism to
>>> > >> > > >> > > AsyncSinkWriter
>>> > >> > > >> > > <
>>> > >> > > >> > >
>>> > >> > > >> >
>>> > >> > > >>
>>> > >> > >
>>> > >>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351
>>> > >> > > >> > > >
>>> > >> > > >> > >  to enable custom batch formation strategies.
>>> > >> > > >> > > Currently, batching is based on batch size and record
>>> count,
>>> > >> but
>>> > >> > > this
>>> > >> > > >> > > approach is suboptimal for sinks like Cassandra, which
>>> require
>>> > >> > > >> > > partition-aware batching. Specifically, batches should be
>>> > >> formed so
>>> > >> > > >> that
>>> > >> > > >> > > all requests within a batch belong to the same partition,
>>> > >> ensuring
>>> > >> > > >> more
>>> > >> > > >> > > efficient writes.
>>> > >> > > >> > >
>>> > >> > > >> > > The proposal introduces a minimal `BatchCreator`
>>> interface,
>>> > >> enabling
>>> > >> > > >> > users
>>> > >> > > >> > > to define custom batching strategies while maintaining
>>> backward
>>> > >> > > >> > > compatibility with a default implementation.
>>> > >> > > >> > >
>>> > >> > > >> > > For full details, please refer to the proposal document
>>> > >> > > >> > > <
>>> > >> > > >> > >
>>> > >> > > >> >
>>> > >> > > >>
>>> > >> > >
>>> > >>
>>> https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f
>>> > >> > > >> > > >
>>> > >> > > >> > > .
>>> > >> > > >> > > Associated Jira <
>>> > >> https://issues.apache.org/jira/browse/FLINK-37298>
>>> > >> > > >> > >
>>> > >> > > >> > > Thanks,
>>> > >> > > >> > > Poorvank
>>> > >> > > >> > >
>>> > >> > > >> >
>>> > >> > > >>
>>> > >> > > >
>>> > >> > >
>>> > >>
>>> > >
>>>
>>

Reply via email to