FYI: Created FLIP-509.

Best Regards
Ahmed Hamdy

On Thu, 13 Feb 2025 at 16:30, Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Poorvank,
> Thanks for the feedback, I can see your point and I largely agree. I would say, if you don't need the flushing trigger interface in your use case, let's keep it out of the FLIP for now. Also, let's wait for feedback on that from other members.
> I will create a FLIP and assign it a number.
>
> Best Regards
> Ahmed Hamdy
>
> On Thu, 13 Feb 2025 at 09:15, Poorvank Bhatia <puravbhat...@gmail.com> wrote:
>
>> Hello Ahmed,
>> Thank you for your insights! In fact, FLIP-284
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable>'s
>> proposal aligns so well with this approach that I have added it as future scope.
>> To answer your questions:
>>
>> I assume we will not expose batch creators to users but only to sink implementers, am I correct?
>>
>> - *Yes, that is correct. The BatchCreator interface is intended to be used only by sink implementers, not end users. Since the SinkWriter is responsible for defining how records are batched and written, only those developing a custom sink would need access to this functionality.*
>>
>> It's a nit, but I would prefer adding the creator to the `AsyncSinkWriterConfiguration` rather than overloading the constructor; there are some already implemented ways of instantiating the configs with defaults, so it would be simpler.
>>
>> - *Yes, BatchCreator can be included in AsyncSinkWriterConfiguration. However, my initial reasoning for keeping it separate was that BatchCreator is not just a configuration parameter—it's a functional component that directly influences how records are batched. This makes it more of a behavioral dependency than a passive setting. That said, if incorporating it into AsyncSinkWriterConfiguration aligns better with the overall design, it is a small change and I'm open to updating the approach accordingly. Do let me know your thoughts, as RateLimitingStrategy is also a part of it.*
>>
>> I want to leverage a concept from FLIP-284 that we have previously seen in DynamoDbAsyncSink: do you think we can also customize the trigger, e.g. add a "readyToFlush()" method or something similar and use it in the `nonBlockingFlush` methods, instead of forcing the trigger on only total record count or buffer size? I see this as useful in your case as well, because if we have buffered 100 requests from 100 different partitions we will keep triggering and emitting batches of 1 a hundred times (unless maxTimeInBufferMS kicks in, of course), so you could use the custom trigger for a minimum number of buffered records per partition. This might be a bit tricky, because in order not to affect performance we would probably wrap the whole buffer in the batchCreator to maintain calculations such as size in bytes or per-partition record count, making it more of a "bufferHandler" than a "batchCreator". Let me know your thoughts about this.
>>
>> - *You bring up a valid concern about optimizing flush triggers to prevent inefficient batch emissions when multiple partitions exist, particularly in sinks like DynamoDB and Cassandra. IMO flushing (when to send records) and batching (how to group records) should remain separate concerns, as they serve distinct purposes.*
>>
>> *To structure this properly, I'll use FLIP-284 as the foundation:*
>>
>> *🔹 BufferFlushTrigger (from FLIP-284) - decides when to flush*
>>
>> *The BufferFlushTrigger should be responsible for determining when the buffer is ready to flush, based on configurable conditions such as:*
>> - *Total record count (batch size threshold).*
>> - *Buffer size in bytes.*
>> - *Time-based constraints (maxTimeInBufferMS), or any other custom logic a sink may require.*
>>
>> *🔹 BatchCreator - decides how to form a batch*
>>
>> *Once BufferFlushTrigger determines that a flush should occur, BatchCreator is responsible for reading the buffered requests and forming a batch based on partitioning rules.*
>> - *BatchCreator does not decide when to flush—it only handles how records are grouped once a flush is triggered.*
>> - *This separation allows partition-aware batching, ensuring that records from the same partition are grouped together before submission.*
>>
>> ------------------------------
>> *🔹 Example: BatchCreator with two partitioning keys*
>>
>> *Scenario:*
>> - *batchSize = 20.*
>> - *The buffer contains records from two partitions.*
>> - *Assume either (a) we flush when at least 10 records exist per partition, or (b) we apply a simple flush strategy that triggers a flush when 20 total records are buffered.*
>>
>> *How BatchCreator forms the batch:*
>>
>> *Even though BufferFlushTrigger initiates the flush when 20 records are reached, BatchCreator must still decide how to structure the batch. It does so by:*
>> 1. *Reading the buffered records.*
>> 2. *Grouping them by partition key (in this example, the two groups of ~10 records each).*
>> 3. *Creating valid batches from these groups before submitting them downstream.*
>>
>> *What I mean is that the buffered requests should be available to both flushing and batching for optimized writes.*
>> ------------------------------
>> *The key takeaway is that by keeping these two interfaces separate, sink writers gain flexibility—they can mix and match different batching and flushing strategies. Since there is no tight coupling between them, different sinks can:*
>> - *Implement a distinct batching strategy independent of the flushing mechanism.*
>> - *Customize flush triggers without impacting how records are grouped into batches.*
>>
>> I suppose these two serve complementary but different purposes, *hence they can be implemented differently*.
>> What do you think?
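
To make the separation discussed above concrete, here is a minimal, self-contained Java sketch. The interface names follow the discussion, but the method signatures, the Deque-based buffer view, the partition-key extractor, and the example trigger are illustrative assumptions for this sketch, not the actual FLIP-509 or FLIP-284 API.

import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical trigger (in the spirit of FLIP-284): decides WHEN the buffer is flushed. */
interface BufferFlushTrigger<T> {
    boolean readyToFlush(Deque<T> bufferedRequests, long bufferedSizeInBytes);
}

/** Hypothetical creator (as discussed above): decides HOW the next batch is formed. */
interface BatchCreator<T> {
    List<T> createNextBatch(Deque<T> bufferedRequests, int maxBatchSize);
}

/** Simple trigger: flush once a total record count is reached (20 in the example above). */
class RecordCountFlushTrigger<T> implements BufferFlushTrigger<T> {
    private final int threshold;

    RecordCountFlushTrigger(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean readyToFlush(Deque<T> bufferedRequests, long bufferedSizeInBytes) {
        return bufferedRequests.size() >= threshold;
    }
}

/**
 * Partition-aware creator: groups buffered requests by a partition key and emits the
 * largest group as one batch, so a single batch only carries requests for one partition.
 */
class PartitionAwareBatchCreator<T, K> implements BatchCreator<T> {
    private final Function<T, K> partitionKeyExtractor;

    PartitionAwareBatchCreator(Function<T, K> partitionKeyExtractor) {
        this.partitionKeyExtractor = partitionKeyExtractor;
    }

    @Override
    public List<T> createNextBatch(Deque<T> bufferedRequests, int maxBatchSize) {
        Map<K, List<T>> byPartition =
                bufferedRequests.stream().collect(Collectors.groupingBy(partitionKeyExtractor));
        List<T> largest = byPartition.values().stream()
                .max((a, b) -> Integer.compare(a.size(), b.size()))
                .orElseGet(ArrayList::new);
        List<T> batch = new ArrayList<>(largest.subList(0, Math.min(maxBatchSize, largest.size())));
        // Simplified removal; assumes distinct request-entry objects in the buffer.
        batch.forEach(bufferedRequests::remove);
        return batch;
    }
}

In this sketch the trigger only answers whether the buffer is ready, while the creator owns the grouping, so a sink could pair, for example, a per-partition-minimum trigger with the same partition-aware creator without touching either side.
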
>>
>> On Wed, Feb 12, 2025 at 4:41 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>
>> > Hi Poorvank,
>> > Thanks for driving this, +1 (non-binding) for the FLIP in general. While I see some common ground with FLIP-284, which I wrote, I prefer going with a new FLIP since it is driven by a current use case and is specific to tackling it. I have a couple of clarifying questions.
>> >
>> > 1- I assume we will not expose batch creators to users but only to sink implementers, am I correct?
>> >
>> > 2- It's a nit, but I would prefer adding the creator to the `AsyncSinkWriterConfiguration` rather than overloading the constructor; there are some already implemented ways of instantiating the configs with defaults, so it would be simpler.
>> >
>> > 3- I want to leverage a concept from FLIP-284 that we have previously seen in DynamoDbAsyncSink: do you think we can also customize the trigger, e.g. add a "readyToFlush()" method or something similar and use it in the `nonBlockingFlush` methods, instead of forcing the trigger on only total record count or buffer size? I see this as useful in your case as well, because if we have buffered 100 requests from 100 different partitions we will keep triggering and emitting batches of 1 a hundred times (unless maxTimeInBufferMS kicks in, of course), so you could use the custom trigger for a minimum number of buffered records per partition. This might be a bit tricky, because in order not to affect performance we would probably wrap the whole buffer in the batchCreator to maintain calculations such as size in bytes or per-partition record count, making it more of a "bufferHandler" than a "batchCreator". Let me know your thoughts about this.
>> >
>> > Best Regards
>> > Ahmed Hamdy
>> >
>> > On Tue, 11 Feb 2025 at 10:45, Poorvank Bhatia <puravbhat...@gmail.com> wrote:
>> >
>> > > Hey everyone,
>> > >
>> > > I'd like to propose adding a pluggable batching mechanism to AsyncSinkWriter
>> > > <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351>
>> > > to enable custom batch formation strategies.
>> > > Currently, batching is based on batch size and record count, but this approach is suboptimal for sinks like Cassandra, which require partition-aware batching. Specifically, batches should be formed so that all requests within a batch belong to the same partition, ensuring more efficient writes.
>> > >
>> > > The proposal introduces a minimal `BatchCreator` interface, enabling users to define custom batching strategies while maintaining backward compatibility with a default implementation.
>> > >
>> > > For full details, please refer to the proposal document
>> > > <https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f>.
>> > > Associated Jira <https://issues.apache.org/jira/browse/FLINK-37298>
>> > >
>> > > Thanks,
>> > > Poorvank
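
As a rough illustration of what the original proposal describes, here is a minimal, self-contained sketch of a pluggable batch-creation hook together with a backward-compatible default. The interface shape, the createNextBatch signature, and the FIFO default are assumptions made for this example; the actual interface in FLIP-509 and the linked design document may differ.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical minimal hook: how the next batch is formed from the writer's buffer. */
interface BatchCreator<T> {
    List<T> createNextBatch(Deque<T> bufferedRequests, int maxBatchSize);
}

/**
 * Hypothetical backward-compatible default: take buffered entries in FIFO order up to
 * the configured maximum batch size, mirroring today's count-based batching.
 */
class DefaultBatchCreator<T> implements BatchCreator<T> {
    @Override
    public List<T> createNextBatch(Deque<T> bufferedRequests, int maxBatchSize) {
        List<T> batch = new ArrayList<>(maxBatchSize);
        while (!bufferedRequests.isEmpty() && batch.size() < maxBatchSize) {
            batch.add(bufferedRequests.poll());
        }
        return batch;
    }
}

class BatchCreatorDemo {
    public static void main(String[] args) {
        Deque<String> buffer = new ArrayDeque<>(List.of("a", "b", "c", "d", "e"));
        BatchCreator<String> creator = new DefaultBatchCreator<>();
        System.out.println(creator.createNextBatch(buffer, 3)); // [a, b, c]
        System.out.println(buffer);                             // [d, e]
    }
}

The demo simply shows that such a default would preserve FIFO, count-bounded batches, while a sink implementer could swap in a partition-aware creator without touching the writer's flush logic.
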