Hi Poorvank, thanks for the FLIP :) Added some comments on the Google doc. Some of them might be irrelevant since I am not that familiar with the codebase at the moment.
Otherwise LGTM

On Wed, 5 Mar 2025 at 06:14, Poorvank Bhatia <puravbhat...@gmail.com> wrote:

> Hey everyone,
>
> Before raising a vote thread, can anyone please help with copying the
> contents of the modified doc
> <https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.1zdhrsm5oaad>
> to this FLIP
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-509+Add+pluggable+Batching+for+Async+Sink>
> page (mentioned on the proposal page
> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-Process>
> ).
>
> Thank you.
>
> On Tue, Mar 4, 2025 at 9:00 AM Poorvank Bhatia <puravbhat...@gmail.com>
> wrote:
>
> > Hey devs,
> >
> > Thank you all for participating in the discussion! I've updated this doc
> > based on feedback. Please feel free to share any further thoughts.
> >
> > If there are no additional suggestions, I plan to initiate a vote within
> > the next few days.
> >
> > On Wed, Feb 26, 2025 at 9:00 PM Poorvank Bhatia <puravbhat...@gmail.com>
> > wrote:
> >
> >> Thanks Arvid. As discussed offline, I have made the changes to doc.
> >>
> >> 1. Gotten rid of the protected methods in the constructor.
> >> 2. Added a new constructor that takes in BatchCreator/BufferWrapper
> >>    from the current one.
> >> 3. The new constructor defaults to the Default BatchCreator and
> >>    BufferWrapper (mimicking the current behaviour)
> >>
> >> So:
> >>
> >> - Since the old constructor remains unchanged, Flink 1.x will
> >>   continue using it without any issues.
> >> - Flink 1.x was compiled with the old constructor signature, so it
> >>   will only look for and call that constructor.
> >> - Even though the class contains references to BatchCreator and
> >>   BufferWrapper, they won’t be loaded if the constructor that
> >>   references them is never used.
> >> - If Flink 1.x explicitly tries to call the new constructor (which > >> includes BatchCreator and BufferWrapper), it will fail since those > classes > >> do not exist in Flink 1.x. But this will only happen when a connector > >> essentially is using the BatchCreator or the new interfaces right, > so what > >> I assume is it should be the connectors responsibility to provide > >> differentiation if it uses BatchCreator . As for Cassandra, we can > provide > >> 2 implementations one without Batch and other with Batch, so the one > with > >> Batch can check at runtime if the newer classes can be used. > >> > >> So the connectors that want to use customized batching would be > dependent > >> on 2.1. If it is used with (a previous flink runtime) a check can be > added > >> in the connector to make sure that BatchCreator is available. (simple > check > >> via reflection). > >> Please have a look and let me know your thoughts. > >> > >> Hey Danny. > >> In case you missed the last response, I have added a BufferWrapper and > >> removed the hardcoded Dequeue check. Please have a look and let me know > >> your thoughts. > >> > >> > >> On Tue, Feb 25, 2025 at 2:10 PM Arvid Heise <ar...@apache.org> wrote: > >> > >>> Hi Poorvank, > >>> > >>> I don't have strong feelings about the actual choice of technology. > >>> SPI is used rather heavily in Flink Table/SQL and is also used in > >>> similar projects more and more. But SPI also uses reflection under the > >>> hood, so it doesn't make a huge difference except from a usability > >>> perspective. SPI has some tool support. IDEA, for example, allows you > >>> to jump back and forth between the service definition and the > >>> implementations. > >>> > >>> In any case, protected methods that are called in the ctor cannot be > >>> overridden because at the point where the base object is initialized > >>> the VMT doesn't contain references to methods of the subclasses. 
That > >>> is a deliberate choice of the VM engineers to avoid cases where an > >>> overridden method accesses fields that have not been initialized yet > >>> because the subclass ctor wasn't invoked yet. > >>> > >>> Also I'm not sure if class loading would succeed if you have a method > >>> that returns an unknown class (let's say you want to run your > >>> connector on Flink 1.X, which was built against Flink 2.1). I'm > >>> assuming it will fail. > >>> > >>> So we need other choices. I'm curious if annotations could be used (in > >>> conjunction with reflection again). I'm guessing classloading will > >>> also fail if an unknown annotation is attached or loading of the > >>> annotation itself will fail if the referenced class cannot be loaded. > >>> If it works somehow, we could say: > >>> @BatchedWith(MyBatchCreator.class) > >>> class MyConnector extends Async... > >>> > >>> Unfortunately, I don't have time to try out these ideas. Maybe you > have. > >>> > >>> And in any case, if it gets too complicated, we can always forgo > >>> backwards compatibility and require specific builds for Flink 2.1+. > >>> This may also be the easier option in case the connector doesn't > >>> perform too well without the BatchCreator anyways. > >>> > >>> Best, > >>> > >>> Arvid > >>> > >>> On Thu, Feb 20, 2025 at 6:56 PM Poorvank Bhatia < > puravbhat...@gmail.com> > >>> wrote: > >>> > > >>> > Hey Arvid, > >>> > > >>> > Thank you for your feedback and for taking the time to review this > >>> > proposal. > >>> > To answer your concerns: > >>> > > >>> > *Naming Suggestion: BatchCreationResult → Batch * > >>> > This is a valid point, and I agree that BatchCreationResult is > >>> essentially > >>> > representing a Batch. The current name was chosen to reflect that it > >>> > encapsulates metadata such as batch size and count, in addition to > the > >>> > entries. I’ll update the proposal to reflect this suggestion and > >>> rename > >>> > the class accordingly. 
> >>> >
> >>> > *Compatibility Concerns*: You are on point regarding backward
> >>> > compatibility and avoiding API breakages in connectors.
> >>> > Based on our discussion offline, I have removed the BatchCreator
> >>> > interface from the constructor and it is not exposed in the public
> >>> > API of AsyncSinkWriter.
> >>> > Instead of requiring direct injection via the constructor, the
> >>> > implementation now uses protected factory methods
> >>> > (createBatchCreator()), allowing subclasses to override the behavior
> >>> > without modifying the base API.
> >>> > If no subclass overrides the methods, Flink will continue using the
> >>> > default SimpleBatchCreator and DequeBufferWrapper, maintaining full
> >>> > backward compatibility.
> >>> > You suggested using Java Reflection to dynamically instantiate
> >>> > BatchCreator. While reflection offers flexibility, I aimed to avoid it
> >>> > primarily due to performance overhead and the code style guidelines
> >>> > <https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-reflection>.
> >>> > That said, since reflection would only be used during initialization,
> >>> > the impact should be minimal. The alternative you suggested is the
> >>> > Service Provider Interface (SPI), which allows for dynamic discovery
> >>> > of implementations. However, it requires sink implementers to define a
> >>> > META-INF/services file, which I was trying to avoid for simplicity.
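A note on the protected factory methods described above, as a minimal sketch rather than anything taken from the FLIP: in Java, a factory method that the base constructor calls does dispatch to a subclass override, but the override runs before the subclass constructor body has assigned its fields. The class and method names below (BaseWriter, CustomWriter, describeBatchCreator) are hypothetical stand-ins and are not Flink classes.

```java
public class FactoryMethodInConstructor {

    /** Hypothetical stand-in for a base writer that calls a factory method in its constructor. */
    public static class BaseWriter {
        public final String creatorDescription;

        public BaseWriter() {
            // Virtual call from a constructor: this dispatches to the subclass override.
            this.creatorDescription = describeBatchCreator();
        }

        protected String describeBatchCreator() {
            return "default";
        }
    }

    /** Hypothetical subclass whose override depends on one of its own fields. */
    public static class CustomWriter extends BaseWriter {
        private final String partitionColumn;

        public CustomWriter(String partitionColumn) {
            super(); // describeBatchCreator() runs here, before the assignment below
            this.partitionColumn = partitionColumn;
        }

        @Override
        protected String describeBatchCreator() {
            // partitionColumn is still null when this is called from BaseWriter().
            return "partitioned-by-" + partitionColumn;
        }
    }

    public static void main(String[] args) {
        // prints "partitioned-by-null"
        System.out.println(new CustomWriter("id").creatorDescription);
    }
}
```

Under these assumptions the override only behaves as intended if it does not read subclass state, which is one argument for passing the creator in explicitly or discovering it some other way.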
> >>> > If you think SPI is a better approach compared to simple factory
> >>> > methods, I can add a simple loader:
> >>> >
> >>> > private BatchCreator<RequestEntryT> loadBatchCreator() {
> >>> >     // Discover implementations registered via META-INF/services.
> >>> >     ServiceLoader<BatchCreator> loader =
> >>> >             ServiceLoader.load(BatchCreator.class);
> >>> >     for (BatchCreator<RequestEntryT> creator : loader) {
> >>> >         return creator; // the first registered implementation wins
> >>> >     }
> >>> >     // Nothing registered: fall back to the default batching.
> >>> >     return new SimpleBatchCreator<>(maxBatchSizeInBytes);
> >>> > }
> >>> >
> >>> > Let me know what makes the most sense to you, and I can either replace
> >>> > the factory methods with SPI loaders or keep them as they are.
> >>> >
> >>> > On Thu, Feb 20, 2025 at 10:26 PM Poorvank Bhatia <puravbhat...@gmail.com>
> >>> > wrote:
> >>> >
> >>> > > Hey Danny,
> >>> > >
> >>> > > Thank you for the review. Your observation is absolutely valid, and
> >>> > > moving away from *Deque<RequestEntryWrapper<RequestEntryT>>
> >>> > > bufferedRequestEntries* in favor of a more flexible abstraction makes
> >>> > > sense.
> >>> > > *Given the exposure of batch creators, Deque may not be the most
> >>> > > semantically appropriate or performant choice. To address this, a more
> >>> > > adaptable buffering abstraction that better aligns with different
> >>> > > batching strategies can be used.*
> >>> > >
> >>> > > *So I updated the proposal and*
> >>> > > *Introduced BufferWrapper Interface:*
> >>> > >
> >>> > >    - *Defines an abstraction for managing buffered request entries.*
> >>> > >    - *Supports FIFO, priority-based inserts, and custom queuing
> >>> > >    mechanisms.*
> >>> > >    - *Provides essential operations like add, poll, peek, and
> >>> > >    getBufferedState.*
> >>> > >
> >>> > > *Implemented Default DequeBufferWrapper*
> >>> > >
> >>> > >    - *Uses ArrayDeque for efficient FIFO behavior while allowing
> >>> > >    prioritized inserts.*
> >>> > >    - *Maintains backward compatibility with minimal overhead.
* > >>> > > > >>> > > *Modified AsyncSinkWriter to Use BufferWrapper Instead of Deque * > >>> > > > >>> > > - *bufferedRequestEntries is now of type > >>> BufferWrapper<RequestEntryT>, > >>> > > making the choice of buffer implementation flexible. * > >>> > > - *A createBuffer() method initializes DequeBufferWrapper by > >>> default. * > >>> > > > >>> > > *Updated BatchCreator and SimpleBatchCreator * > >>> > > > >>> > > - *Modified method signatures to work with > >>> > > BufferWrapper<RequestEntryT> instead of > >>> > > Deque<RequestEntryWrapper<RequestEntryT>>. * > >>> > > - *Ensures better encapsulation and allows future optimizations > in > >>> > > buffering strategies.* > >>> > > > >>> > > I have added this interface and updated the doc. Please have a look > >>> and > >>> > > let me know if this makes sense. > >>> > > > >>> > > > >>> > > On Thu, Feb 20, 2025 at 5:34 PM Arvid Heise > >>> <ahe...@confluent.io.invalid> > >>> > > wrote: > >>> > > > >>> > >> Hi Poorvank, > >>> > >> > >>> > >> thanks for putting this together. It's obvious to me that this is > a > >>> > >> good addition. I wish Ahmed could check if his proposals are > >>> > >> compatible with yours, so we don't end up with two different ways > to > >>> > >> express the same thing. Ideally Ahmed's proposal could be > >>> retrofitted > >>> > >> to extend on yours. > >>> > >> > >>> > >> I have a small nit and a larger concern. The nit is to rename > >>> > >> BatchCreationResult into Batch because that's what it is. > >>> > >> > >>> > >> My main concern is around compatibility. If the new interfaces end > >>> up > >>> > >> in the signature of the AsyncSinkWriter, we will not be able to > run > >>> a > >>> > >> connector built on top of it with an older Flink version. Either > we > >>> > >> explicitly call it out or we find a way to avoid that scenario. > The > >>> > >> connector community had some negative experiences around evolving > >>> > >> APIs. 
> >>> > >>
> >>> > >> One way to avoid breaking compatibility is to provide add-ons where
> >>> > >> the interfaces are not baked into the AsyncSinkWriter's API. Consider
> >>> > >> your `BatchCreator` interface. If it's just used within the
> >>> > >> AsyncSinkWriter (private field, private initialization), then we could
> >>> > >> run the same connector on an older Flink version. In that case,
> >>> > >> batching just wouldn't be enabled. Of course, all of that only matters
> >>> > >> if it's even viable to run the connector without batching. If not,
> >>> > >> then just make the connector version depend on at least Flink 2.1+ or
> >>> > >> whenever this feature lands.
> >>> > >>
> >>> > >> Now the question arises how to initialize the BatchCreator. Your
> >>> > >> proposal injects it but maybe we can use some kind of discovery
> >>> > >> instead (e.g. SPI, convention, and/or annotations). Here is a naive
> >>> > >> sketch with reflection using a convention (I'd prefer SPI though).
> >>> > >>
> >>> > >> public AsyncSinkWriter(
> >>> > >>         ElementConverter<InputT, RequestEntryT> elementConverter,
> >>> > >>         WriterInitContext context,
> >>> > >>         AsyncSinkWriterConfiguration configuration,
> >>> > >>         Collection<BufferedRequestState<RequestEntryT>> states) {
> >>> > >>     this(elementConverter, new Sink.InitContextWrapper(context),
> >>> > >>             configuration, states);
> >>> > >>
> >>> > >>     try {
> >>> > >>         // Convention: look for <WriterClassName>BatchCreator.
> >>> > >>         batchCreator =
> >>> > >>                 (BatchCreator)
> >>> > >>                         Class.forName(getClass().getName() + "BatchCreator")
> >>> > >>                                 .getDeclaredConstructor()
> >>> > >>                                 .newInstance();
> >>> > >>     } catch (ClassNotFoundException e) {
> >>> > >>         // No custom creator on the classpath: keep the default batching.
> >>> > >>         batchCreator = BatchCreator.DEFAULT;
> >>> > >>     } catch (ReflectiveOperationException e) {
> >>> > >>         throw new RuntimeException(e);
> >>> > >>     }
> >>> > >> }
> >>> > >>
> >>> > >> class CassandraBatchCreator implements BatchCreator { ...
} > >>> > >> > >>> > >> Best, > >>> > >> > >>> > >> Arvid > >>> > >> > >>> > >> > >>> > >> On Wed, Feb 19, 2025 at 11:07 PM Danny Cranmer < > >>> dannycran...@apache.org> > >>> > >> wrote: > >>> > >> > > >>> > >> > Hello Poorvank, > >>> > >> > > >>> > >> > Thanks for opening this discussion/FLIP. I am +1 for the idea, > it > >>> seems > >>> > >> > like a great enhancement to the AsyncSink. > >>> > >> > > >>> > >> > Just one question/observation from me. We currently use a > >>> "Deque<..> > >>> > >> > bufferedRequestEntries". Now we are exposing these batch > >>> creators; it > >>> > >> might > >>> > >> > not make sense to use a queue anymore. On one hand it does not > >>> make as > >>> > >> much > >>> > >> > semantic sense because we could have frequent queue jumpers. On > >>> the > >>> > >> other > >>> > >> > hand it might not be performant, since Deque lookups provide > O(n) > >>> > >> > performance. What do you think? This is an internal data > >>> structure so we > >>> > >> > could consider replacing it with something else, but I might > just > >>> be > >>> > >> > prematurely optimising this. > >>> > >> > > >>> > >> > Thanks, > >>> > >> > Danny > >>> > >> > > >>> > >> > On Thu, Feb 13, 2025 at 5:31 PM Ahmed Hamdy < > hamdy10...@gmail.com > >>> > > >>> > >> wrote: > >>> > >> > > >>> > >> > > FYI: Created FLIP-509. > >>> > >> > > > >>> > >> > > Best Regards > >>> > >> > > Ahmed Hamdy > >>> > >> > > > >>> > >> > > > >>> > >> > > On Thu, 13 Feb 2025 at 16:30, Ahmed Hamdy < > hamdy10...@gmail.com > >>> > > >>> > >> wrote: > >>> > >> > > > >>> > >> > > > Hi Poorvank, > >>> > >> > > > thanks for the feedback, I can see your point and I kinda > >>> agree, I > >>> > >> would > >>> > >> > > > say, if you don't need the flushing trigger interface in > your > >>> use > >>> > >> case > >>> > >> > > > let's keep it out of the FLIP for now, Also let's wait for > the > >>> > >> feedback > >>> > >> > > > about that from other members. 
> >>> > >> > > > I will create a FLIP and assign it a number. > >>> > >> > > > > >>> > >> > > > Best Regards > >>> > >> > > > Ahmed Hamdy > >>> > >> > > > > >>> > >> > > > > >>> > >> > > > On Thu, 13 Feb 2025 at 09:15, Poorvank Bhatia < > >>> > >> puravbhat...@gmail.com> > >>> > >> > > > wrote: > >>> > >> > > > > >>> > >> > > >> Hello Ahmed, > >>> > >> > > >> Thank you for your insights! In fact, FLIP-284 > >>> > >> > > >> < > >>> > >> > > >> > >>> > >> > > > >>> > >> > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable > >>> > >> > > >> >'s > >>> > >> > > >> proposal aligns so well with this approach, I have added it > >>> has the > >>> > >> > > future > >>> > >> > > >> scope. > >>> > >> > > >> To answer your questions: > >>> > >> > > >> > >>> > >> > > >> I assume we will not expose batch creators to users but > only > >>> to > >>> > >> sink > >>> > >> > > >> implementers, am I correct? > >>> > >> > > >> > >>> > >> > > >> - *Yes, that is correct. The BatchCreator interface is > >>> intended > >>> > >> to be > >>> > >> > > >> used only by sink implementers, not end users. Since the > >>> > >> SinkWriter > >>> > >> > > is > >>> > >> > > >> responsible for defining how records are batched and > >>> written, > >>> > >> only > >>> > >> > > >> those > >>> > >> > > >> developing a custom sink would need access to this > >>> > >> functionality.* > >>> > >> > > >> > >>> > >> > > >> It is nit: but I would prefer adding the creator to the > >>> > >> > > >> `AsyncSinkWriterConfiguration` rather than overloading the > >>> > >> constructor, > >>> > >> > > >> there are some already implemented ways for instantiating > the > >>> > >> configs > >>> > >> > > with > >>> > >> > > >> defaults so would be simpler. > >>> > >> > > >> > >>> > >> > > >> > >>> > >> > > >> - *Yes, BatchCreator can be included in > >>> > >> AsyncSinkWriterConfiguration. 
> >>> > >> > > >> However, my initial reasoning for keeping it separate > was > >>> that > >>> > >> > > >> BatchCreator > >>> > >> > > >> is not just a configuration parameter—it's a functional > >>> > >> component > >>> > >> > > that > >>> > >> > > >> directly influences how records are batched. This makes > >>> it more > >>> > >> of a > >>> > >> > > >> behavioral dependency rather than a passive setting. > That > >>> said, > >>> > >> if > >>> > >> > > >> incorporating it into AsyncSinkWriterConfiguration > aligns > >>> > >> better with > >>> > >> > > >> the > >>> > >> > > >> overall design, it is a nit change and I'm open to > >>> updating the > >>> > >> > > >> approach > >>> > >> > > >> accordingly. Do let me know your thoughts, as > >>> > >> RateLimitingStartegy is > >>> > >> > > >> also > >>> > >> > > >> a part of it.* > >>> > >> > > >> > >>> > >> > > >> > >>> > >> > > >> I want to leverage some concept from FLIP-284 that we have > >>> > >> previously > >>> > >> > > >> seen in DynamoDbAsyncSink, do you think we can also > >>> customize the > >>> > >> > > trigger > >>> > >> > > >> so we can also add a "readyToFlush()" method or something > >>> similar > >>> > >> and > >>> > >> > > use > >>> > >> > > >> it in `nonBlockingFlush` methods instead of forcing the > >>> trigger on > >>> > >> only > >>> > >> > > >> total record count or buffer size. I see this useful in > your > >>> case > >>> > >> as > >>> > >> > > well > >>> > >> > > >> because if we have buffered 100 requests from 100 different > >>> > >> partitions > >>> > >> > > >> then > >>> > >> > > >> we will keep triggering and emitting batches of 1 for 100 > >>> times > >>> > >> (unless > >>> > >> > > >> maxTimeInBufferMS is used ofc) so you can use the custom > >>> trigger > >>> > >> for > >>> > >> > > >> minimum buffered records per partition. 
This might be a bit > >>> tricky > >>> > >> > > because > >>> > >> > > >> in order to not affect the performance we probably will > wrap > >>> the > >>> > >> whole > >>> > >> > > >> buffer in the batchCreator to maintain calculations like > >>> size in > >>> > >> bytes > >>> > >> > > or > >>> > >> > > >> per partition record count in the batch creator making it > >>> more > >>> > >> > > >> "bufferHandler" rather than a "batchCreator", let me know > >>> your > >>> > >> thoughts > >>> > >> > > >> about this > >>> > >> > > >> > >>> > >> > > >> > >>> > >> > > >> - > >>> > >> > > >> > >>> > >> > > >> *You bring up a valid concern about optimizing flush > >>> triggers to > >>> > >> > > >> prevent > >>> > >> > > >> inefficient batch emissions when multiple partitions > >>> exist, > >>> > >> > > >> particularly in > >>> > >> > > >> sinks like DynamoDB and Cassandra. IMO flushing (when to > >>> send > >>> > >> > > records) > >>> > >> > > >> and > >>> > >> > > >> batching (how to group records) should remain separate > >>> > >> concerns, as > >>> > >> > > >> they > >>> > >> > > >> serve distinct purposes.* > >>> > >> > > >> > >>> > >> > > >> *To structure this properly, I’ll use FLIP-284 as the > >>> > >> foundation:* > >>> > >> > > >> *🔹 BufferFlushTrigger (From FLIP-284) - Decides When to > >>> Flush* > >>> > >> > > >> > >>> > >> > > >> *The BufferFlushTrigger should be responsible for > >>> determining > >>> > >> when > >>> > >> > > the > >>> > >> > > >> buffer is ready to flush, based on configurable > >>> conditions such > >>> > >> as:* > >>> > >> > > >> - *Total record count (batch size threshold).* > >>> > >> > > >> - *Buffer size in bytes.* > >>> > >> > > >> - *Time-based constraints (maxTimeInBufferMS), or any > >>> other > >>> > >> custom > >>> > >> > > >> logic a sink may require.* > >>> > >> > > >> > >>> > >> > > >> *🔹 BatchCreator - Decides How to Form a Batch* > >>> > >> > > >> > >>> > >> > > >> *Once BufferFlushTrigger determines that a flush should > >>> 
occur, > >>> > >> > > >> BatchCreator is responsible for reading the buffered > >>> requests > >>> > >> and > >>> > >> > > >> forming a > >>> > >> > > >> batch based on partitioning rules.* > >>> > >> > > >> - *BatchCreator does not decide when to flush—it only > >>> handles > >>> > >> how > >>> > >> > > >> records are grouped when a flush is triggered.* > >>> > >> > > >> - *This separation allows partition-aware batching, > >>> ensuring > >>> > >> that > >>> > >> > > >> records from the same partition are grouped together > >>> before > >>> > >> > > >> submission.* > >>> > >> > > >> ------------------------------ > >>> > >> > > >> *🔹 Example: BatchCreator with Two Partitioning > >>> Keys**Scenario* > >>> > >> > > >> - *batchSize = 20* > >>> > >> > > >> - *The buffer contains records from two partitions.* > >>> > >> > > >> - *Assume either (a) we flush when at least 10 > records > >>> exist > >>> > >> per > >>> > >> > > >> partition.* > >>> > >> > > >> - *We apply a simple flush strategy that triggers a > >>> flush > >>> > >> when 20 > >>> > >> > > >> total records are buffered.* > >>> > >> > > >> *How BatchCreator Forms the Batch* > >>> > >> > > >> > >>> > >> > > >> *Even though BufferFlushTrigger initiates the flush when > >>> 20 > >>> > >> records > >>> > >> > > are > >>> > >> > > >> reached, BatchCreator must still decide how to structure > >>> the > >>> > >> batch. > >>> > >> > > It > >>> > >> > > >> does > >>> > >> > > >> so by:* > >>> > >> > > >> 1. *Reading the buffered records.* > >>> > >> > > >> 2. *Grouping them by partition key. (Find out the 10 > >>> batch)* > >>> > >> > > >> 3. *Creating a valid batch from these groups before > >>> > >> submitting > >>> > >> > > them > >>> > >> > > >> downstream.* > >>> > >> > > >> *What I mean is that, the buffered Requests should be > >>> there for > >>> > >> both > >>> > >> > > >> flushing and batching for optimized writes. 
* > >>> > >> > > >> ------------------------------ > >>> > >> > > >> *The key takeaway is that by keeping these two > interfaces > >>> > >> separate, > >>> > >> > > >> Sink > >>> > >> > > >> Writers gain flexibility—they can mix and match > different > >>> > >> batching > >>> > >> > > and > >>> > >> > > >> flushing strategies. Since there is no tight coupling > >>> between > >>> > >> them, > >>> > >> > > >> different sinks can:* > >>> > >> > > >> - *Implement a distinct batching strategy independent > >>> of the > >>> > >> > > >> flushing > >>> > >> > > >> mechanism.* > >>> > >> > > >> - *Customize flush triggers without impacting how > >>> records are > >>> > >> > > >> grouped > >>> > >> > > >> into batches.* > >>> > >> > > >> > >>> > >> > > >> I suppose these two serve complementary but different > >>> purposes, > >>> > >> *hence > >>> > >> > > >> they > >>> > >> > > >> can be implemented differently*. > >>> > >> > > >> What do you think? > >>> > >> > > >> > >>> > >> > > >> > >>> > >> > > >> On Wed, Feb 12, 2025 at 4:41 PM Ahmed Hamdy < > >>> hamdy10...@gmail.com> > >>> > >> > > wrote: > >>> > >> > > >> > >>> > >> > > >> > Hi Poorvank, > >>> > >> > > >> > Thanks for driving this, +1 (non-binding) for the FLIP in > >>> > >> general. > >>> > >> > > >> While I > >>> > >> > > >> > see some common grounds with FLIP-284 that I wrote, I > >>> prefer > >>> > >> going > >>> > >> > > with > >>> > >> > > >> a > >>> > >> > > >> > new FLIP since it is driven by a current use case and is > >>> > >> specific for > >>> > >> > > >> > tackling it. I have a couple of clarifying questions. > >>> > >> > > >> > > >>> > >> > > >> > 1- I assume we will not expose batch creators to users > but > >>> only > >>> > >> to > >>> > >> > > sink > >>> > >> > > >> > implementers, am I correct? 
> >>> > >> > > >> >
> >>> > >> > > >> > 2- It is a nit, but I would prefer adding the creator to the
> >>> > >> > > >> > `AsyncSinkWriterConfiguration` rather than overloading the
> >>> > >> > > >> > constructor. There are already implemented ways of instantiating
> >>> > >> > > >> > the configs with defaults, so it would be simpler.
> >>> > >> > > >> >
> >>> > >> > > >> > 3- I want to leverage a concept from FLIP-284 that we have
> >>> > >> > > >> > previously seen in DynamoDbAsyncSink. Do you think we can also
> >>> > >> > > >> > customize the trigger, so that we can add a "readyToFlush()"
> >>> > >> > > >> > method or something similar and use it in the `nonBlockingFlush`
> >>> > >> > > >> > methods instead of forcing the trigger on only total record count
> >>> > >> > > >> > or buffer size? I see this as useful in your case as well: if we
> >>> > >> > > >> > have buffered 100 requests from 100 different partitions, then we
> >>> > >> > > >> > will keep triggering and emitting batches of 1, 100 times (unless
> >>> > >> > > >> > maxTimeInBufferMS is used, of course), so you can use the custom
> >>> > >> > > >> > trigger for minimum buffered records per partition. This might be
> >>> > >> > > >> > a bit tricky because, in order not to affect performance, we will
> >>> > >> > > >> > probably wrap the whole buffer in the batchCreator to maintain
> >>> > >> > > >> > calculations like size in bytes or per-partition record count,
> >>> > >> > > >> > making it more of a "bufferHandler" than a "batchCreator". Let me
> >>> > >> > > >> > know your thoughts about this.
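To make the per-partition trigger in point 3 concrete, here is a minimal sketch, assuming a trigger that is told about every buffered and removed record and keeps a running count per partition. Only the name readyToFlush comes from the message above; PerPartitionFlushTrigger, onAdded and onRemoved are hypothetical, and none of this is an existing Flink or FLIP-284 API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical flush trigger: ready once any partition has enough buffered records. */
public class PerPartitionFlushTrigger {

    private final int minRecordsPerPartition;
    private final Map<String, Integer> bufferedPerPartition = new HashMap<>();

    public PerPartitionFlushTrigger(int minRecordsPerPartition) {
        this.minRecordsPerPartition = minRecordsPerPartition;
    }

    /** Called whenever a record for the given partition is added to the buffer. */
    public void onAdded(String partitionKey) {
        bufferedPerPartition.merge(partitionKey, 1, Integer::sum);
    }

    /** Called after {@code count} records of the given partition left the buffer. */
    public void onRemoved(String partitionKey, int count) {
        bufferedPerPartition.computeIfPresent(partitionKey, (key, current) -> {
            int remaining = current - count;
            // Returning null drops the entry so the map does not grow without bound.
            return remaining > 0 ? Integer.valueOf(remaining) : null;
        });
    }

    /** True if at least one partition has reached the configured minimum. */
    public boolean readyToFlush() {
        for (int buffered : bufferedPerPartition.values()) {
            if (buffered >= minRecordsPerPartition) {
                return true;
            }
        }
        return false;
    }
}
```

With a minimum of 10, buffering 100 records from 100 different partitions leaves readyToFlush() false, so no batches of 1 are emitted. A time-based bound such as maxTimeInBufferMS would still be needed alongside it so that sparse partitions are eventually flushed.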
> >>> > >> > > >> >
> >>> > >> > > >> >
> >>> > >> > > >> > Best Regards
> >>> > >> > > >> > Ahmed Hamdy
> >>> > >> > > >> >
> >>> > >> > > >> >
> >>> > >> > > >> > On Tue, 11 Feb 2025 at 10:45, Poorvank Bhatia <puravbhat...@gmail.com>
> >>> > >> > > >> > wrote:
> >>> > >> > > >> >
> >>> > >> > > >> > > Hey everyone,
> >>> > >> > > >> > >
> >>> > >> > > >> > > I’d like to propose adding a pluggable batching mechanism to
> >>> > >> > > >> > > AsyncSinkWriter
> >>> > >> > > >> > > <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351>
> >>> > >> > > >> > > to enable custom batch formation strategies.
> >>> > >> > > >> > > Currently, batching is based on batch size and record count, but
> >>> > >> > > >> > > this approach is suboptimal for sinks like Cassandra, which require
> >>> > >> > > >> > > partition-aware batching. Specifically, batches should be formed so
> >>> > >> > > >> > > that all requests within a batch belong to the same partition,
> >>> > >> > > >> > > ensuring more efficient writes.
> >>> > >> > > >> > >
> >>> > >> > > >> > > The proposal introduces a minimal `BatchCreator` interface, enabling
> >>> > >> > > >> > > users to define custom batching strategies while maintaining
> >>> > >> > > >> > > backward compatibility with a default implementation.
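As an illustration of the partition-aware batching described above, here is a minimal sketch, assuming the buffer is a plain Deque and each entry exposes a partition key. PartitionAwareBatching, Entry and nextBatch are hypothetical names chosen for this example; this is not the `BatchCreator` interface from the proposal document.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of forming a batch whose entries all share one partition. */
public class PartitionAwareBatching {

    /** Stand-in for a buffered request entry; the fields are assumptions. */
    public static final class Entry {
        public final String partitionKey;
        public final String payload;

        public Entry(String partitionKey, String payload) {
            this.partitionKey = partitionKey;
            this.payload = payload;
        }
    }

    /**
     * Removes up to maxBatchSize entries that share the partition key of the
     * oldest buffered entry and returns them in arrival order.
     */
    public static <T> List<T> nextBatch(
            Deque<T> buffer, Function<T, String> keyOf, int maxBatchSize) {
        List<T> batch = new ArrayList<>();
        if (buffer.isEmpty() || maxBatchSize <= 0) {
            return batch;
        }
        // The oldest entry decides which partition this batch belongs to.
        String key = keyOf.apply(buffer.peekFirst());
        Iterator<T> it = buffer.iterator();
        while (it.hasNext() && batch.size() < maxBatchSize) {
            T entry = it.next();
            if (key.equals(keyOf.apply(entry))) {
                batch.add(entry);
                it.remove(); // entries of other partitions stay buffered
            }
        }
        return batch;
    }
}
```

For a buffer holding entries for partitions p1, p2, p1 (in that order), nextBatch(buffer, e -> e.partitionKey, 10) returns the two p1 entries and leaves the p2 entry in the buffer. The scan is linear in the buffer size, so a real implementation would probably want a structure indexed by partition.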
> >>> > >> > > >> > >
> >>> > >> > > >> > > For full details, please refer to the proposal document
> >>> > >> > > >> > > <https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f>.
> >>> > >> > > >> > > Associated Jira <https://issues.apache.org/jira/browse/FLINK-37298>
> >>> > >> > > >> > >
> >>> > >> > > >> > > Thanks,
> >>> > >> > > >> > > Poorvank