Hello Poorvank,

Thanks for opening this discussion/FLIP. I am +1 for the idea; it seems like a great enhancement to the AsyncSink.

Just one question/observation from me. We currently use a "Deque<..> bufferedRequestEntries". Now that we are exposing these batch creators, it might not make sense to use a queue anymore. On the one hand, it makes less semantic sense, because we could have frequent queue jumpers. On the other hand, it might not be performant, since lookups in a Deque are O(n). What do you think? This is an internal data structure, so we could consider replacing it with something else, but I might just be prematurely optimising this.

Thanks,
Danny

On Thu, Feb 13, 2025 at 5:31 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> FYI: Created FLIP-509.
>
> Best Regards
> Ahmed Hamdy
>
>
> On Thu, 13 Feb 2025 at 16:30, Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>
> > Hi Poorvank,
> > thanks for the feedback, I can see your point and I kinda agree, I would
> > say, if you don't need the flushing trigger interface in your use case
> > let's keep it out of the FLIP for now. Also let's wait for the feedback
> > about that from other members.
> > I will create a FLIP and assign it a number.
> >
> > Best Regards
> > Ahmed Hamdy
> >
> >
> > On Thu, 13 Feb 2025 at 09:15, Poorvank Bhatia <puravbhat...@gmail.com>
> > wrote:
> >
> >> Hello Ahmed,
> >> Thank you for your insights! In fact, FLIP-284
> >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable>'s
> >> proposal aligns so well with this approach that I have added it as
> >> future scope.
> >> To answer your questions:
> >>
> >> I assume we will not expose batch creators to users but only to sink
> >> implementers, am I correct?
> >>
> >>    - *Yes, that is correct. The BatchCreator interface is intended to be
> >>    used only by sink implementers, not end users.
> >>    Since the SinkWriter is responsible for defining how records are
> >>    batched and written, only those developing a custom sink would need
> >>    access to this functionality.*
> >>
> >> It is nit: but I would prefer adding the creator to the
> >> `AsyncSinkWriterConfiguration` rather than overloading the constructor,
> >> there are some already implemented ways for instantiating the configs
> >> with defaults so would be simpler.
> >>
> >>    - *Yes, BatchCreator can be included in AsyncSinkWriterConfiguration.
> >>    However, my initial reasoning for keeping it separate was that
> >>    BatchCreator is not just a configuration parameter; it's a functional
> >>    component that directly influences how records are batched. This makes
> >>    it more of a behavioral dependency than a passive setting. That said,
> >>    if incorporating it into AsyncSinkWriterConfiguration aligns better
> >>    with the overall design, it is a nit change and I'm open to updating
> >>    the approach accordingly. Do let me know your thoughts, as
> >>    RateLimitingStrategy is also a part of it.*
> >>
> >> I want to leverage some concept from FLIP-284 that we have previously
> >> seen in DynamoDbAsyncSink, do you think we can also customize the trigger
> >> so we can also add a "readyToFlush()" method or something similar and use
> >> it in `nonBlockingFlush` methods instead of forcing the trigger on only
> >> total record count or buffer size. I see this useful in your case as well
> >> because if we have buffered 100 requests from 100 different partitions
> >> then we will keep triggering and emitting batches of 1 for 100 times
> >> (unless maxTimeInBufferMS is used ofc) so you can use the custom trigger
> >> for minimum buffered records per partition.
> >> This might be a bit tricky because in order to not affect the
> >> performance we probably will wrap the whole buffer in the batchCreator to
> >> maintain calculations like size in bytes or per partition record count in
> >> the batch creator making it more "bufferHandler" rather than a
> >> "batchCreator", let me know your thoughts about this
> >>
> >>    - *You bring up a valid concern about optimizing flush triggers to
> >>    prevent inefficient batch emissions when multiple partitions exist,
> >>    particularly in sinks like DynamoDB and Cassandra. IMO flushing (when
> >>    to send records) and batching (how to group records) should remain
> >>    separate concerns, as they serve distinct purposes.*
> >>
> >>    *To structure this properly, I’ll use FLIP-284 as the foundation:*
> >>    *🔹 BufferFlushTrigger (From FLIP-284) - Decides When to Flush*
> >>
> >>    *The BufferFlushTrigger should be responsible for determining when the
> >>    buffer is ready to flush, based on configurable conditions such as:*
> >>       - *Total record count (batch size threshold).*
> >>       - *Buffer size in bytes.*
> >>       - *Time-based constraints (maxTimeInBufferMS), or any other custom
> >>       logic a sink may require.*
> >>
> >>    *🔹 BatchCreator - Decides How to Form a Batch*
> >>
> >>    *Once BufferFlushTrigger determines that a flush should occur,
> >>    BatchCreator is responsible for reading the buffered requests and
> >>    forming a batch based on partitioning rules.*
> >>       - *BatchCreator does not decide when to flush; it only handles how
> >>       records are grouped when a flush is triggered.*
> >>       - *This separation allows partition-aware batching, ensuring that
> >>       records from the same partition are grouped together before
> >>       submission.*
> >>    ------------------------------
> >>    *🔹 Example: BatchCreator with Two Partitioning Keys*
> >>    *Scenario*
> >>       - *batchSize = 20*
> >>       - *The buffer contains records from two partitions.*
> >>       - *Assume either (a) we flush when at
> >>       least 10 records exist per partition,*
> >>       - *or (b) we apply a simple flush strategy that triggers a flush
> >>       when 20 total records are buffered.*
> >>    *How BatchCreator Forms the Batch*
> >>
> >>    *Even though BufferFlushTrigger initiates the flush when 20 records
> >>    are reached, BatchCreator must still decide how to structure the
> >>    batch. It does so by:*
> >>       1. *Reading the buffered records.*
> >>       2. *Grouping them by partition key (finding the group of 10 for
> >>       each partition).*
> >>       3. *Creating a valid batch from these groups before submitting them
> >>       downstream.*
> >>    *What I mean is that the buffered requests should be available to both
> >>    flushing and batching for optimized writes.*
> >>    ------------------------------
> >>    *The key takeaway is that by keeping these two interfaces separate,
> >>    Sink Writers gain flexibility: they can mix and match different
> >>    batching and flushing strategies. Since there is no tight coupling
> >>    between them, different sinks can:*
> >>       - *Implement a distinct batching strategy independent of the
> >>       flushing mechanism.*
> >>       - *Customize flush triggers without impacting how records are
> >>       grouped into batches.*
> >>
> >> I suppose these two serve complementary but different purposes, *hence
> >> they can be implemented differently*.
> >> What do you think?
> >>
> >>
> >> On Wed, Feb 12, 2025 at 4:41 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
> >>
> >> > Hi Poorvank,
> >> > Thanks for driving this, +1 (non-binding) for the FLIP in general.
> >> > While I see some common grounds with FLIP-284 that I wrote, I prefer
> >> > going with a new FLIP since it is driven by a current use case and is
> >> > specific for tackling it. I have a couple of clarifying questions.
> >> >
> >> > 1- I assume we will not expose batch creators to users but only to sink
> >> > implementers, am I correct?
> >> >
> >> > 2- It is nit: but I would prefer adding the creator to the
> >> > `AsyncSinkWriterConfiguration` rather than overloading the constructor,
> >> > there are some already implemented ways for instantiating the configs
> >> > with defaults so would be simpler.
> >> >
> >> > 3- I want to leverage some concept from FLIP-284 that we have
> >> > previously seen in DynamoDbAsyncSink, do you think we can also
> >> > customize the trigger so we can also add a "readyToFlush()" method or
> >> > something similar and use it in `nonBlockingFlush` methods instead of
> >> > forcing the trigger on only total record count or buffer size. I see
> >> > this useful in your case as well because if we have buffered 100
> >> > requests from 100 different partitions then we will keep triggering and
> >> > emitting batches of 1 for 100 times (unless maxTimeInBufferMS is used
> >> > ofc) so you can use the custom trigger for minimum buffered records per
> >> > partition. This might be a bit tricky because in order to not affect
> >> > the performance we probably will wrap the whole buffer in the
> >> > batchCreator to maintain calculations like size in bytes or per
> >> > partition record count in the batch creator making it more
> >> > "bufferHandler" rather than a "batchCreator", let me know your thoughts
> >> > about this.
> >> >
> >> >
> >> > Best Regards
> >> > Ahmed Hamdy
> >> >
> >> >
> >> > On Tue, 11 Feb 2025 at 10:45, Poorvank Bhatia <puravbhat...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hey everyone,
> >> > >
> >> > > I’d like to propose adding a pluggable batching mechanism to
> >> > > AsyncSinkWriter
> >> > > <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351>
> >> > > to enable custom batch formation strategies.
> >> > > Currently, batching is based on batch size and record count, but
> >> > > this approach is suboptimal for sinks like Cassandra, which require
> >> > > partition-aware batching. Specifically, batches should be formed so
> >> > > that all requests within a batch belong to the same partition,
> >> > > ensuring more efficient writes.
> >> > >
> >> > > The proposal introduces a minimal `BatchCreator` interface, enabling
> >> > > users to define custom batching strategies while maintaining backward
> >> > > compatibility with a default implementation.
> >> > >
> >> > > For full details, please refer to the proposal document
> >> > > <https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f>.
> >> > > Associated Jira <https://issues.apache.org/jira/browse/FLINK-37298>
> >> > >
> >> > > Thanks,
> >> > > Poorvank
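
To make the "queue jumper" and O(n) lookup concern from the top of this thread a bit more concrete, here is a minimal Java sketch of what partition-aware batch formation over a Deque could look like. It is only an illustration under assumptions: the `Entry` type, the `createBatch` method and the class name are hypothetical and are not taken from FLIP-509 or from the Flink code base, and the real `BatchCreator` interface may look quite different.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: none of these names come from FLIP-509 or from Flink.
class PartitionAwareBatchingSketch {

    // Hypothetical buffered entry: a partition key plus an opaque payload.
    static final class Entry {
        final String partitionKey;
        final String payload;

        Entry(String partitionKey, String payload) {
            this.partitionKey = partitionKey;
            this.payload = payload;
        }
    }

    /**
     * Removes up to maxBatchSize entries that share the partition key of the
     * entry at the head of the buffer and returns them in buffer order.
     * Entries from other partitions stay buffered in their original order.
     */
    static List<Entry> createBatch(Deque<Entry> buffer, int maxBatchSize) {
        List<Entry> batch = new ArrayList<>();
        if (buffer.isEmpty() || maxBatchSize <= 0) {
            return batch;
        }
        String key = buffer.peekFirst().partitionKey;
        // Linear scan: in the worst case every buffered entry is visited.
        Iterator<Entry> it = buffer.iterator();
        while (batch.size() < maxBatchSize && it.hasNext()) {
            Entry entry = it.next();
            if (entry.partitionKey.equals(key)) {
                batch.add(entry);
                // Removing from the middle of the deque is the "queue jump".
                it.remove();
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<Entry> buffer = new ArrayDeque<>();
        buffer.add(new Entry("A", "a1"));
        buffer.add(new Entry("B", "b1"));
        buffer.add(new Entry("A", "a2"));
        buffer.add(new Entry("B", "b2"));
        buffer.add(new Entry("A", "a3"));

        List<Entry> batch = createBatch(buffer, 2);
        for (Entry e : batch) {
            System.out.println("batched " + e.payload);
        }
        for (Entry e : buffer) {
            System.out.println("still buffered " + e.payload);
        }
    }
}
```

With the sample buffer above, the batch holds a1 and a2, while b1, b2 and a3 remain buffered; a2 has effectively overtaken b1. Every call scans the deque from the head, which is where the O(n) cost comes from. A structure keyed by partition (for example a map from partition key to a per-partition queue) would avoid the scan, at the price of extra bookkeeping for total size and ordering, so whether it is worth it probably depends on how large the buffer gets in practice.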