Hi Poorvank,

I don't have strong feelings about the actual choice of technology. SPI is used rather heavily in Flink Table/SQL and is also used in similar projects more and more. But SPI also uses reflection under the hood, so it doesn't make a huge difference except from a usability perspective. SPI has some tool support. IDEA, for example, allows you to jump back and forth between the service definition and the implementations.
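
For reference, a minimal sketch of what SPI-based discovery with a default fallback could look like. The `BatchCreator` interface and `SimpleBatchCreator` class below are simplified, hypothetical stand-ins, not the types from the proposal; only `java.util.ServiceLoader` is the real JDK API.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

public class BatchCreatorDiscovery {

    /** Hypothetical stand-in for the proposed interface, not Flink's actual API. */
    public interface BatchCreator {
        String name();
    }

    /** Hypothetical default used when no provider is registered. */
    public static final class SimpleBatchCreator implements BatchCreator {
        @Override
        public String name() {
            return "simple";
        }
    }

    /** Returns the first provider found via META-INF/services, else the default. */
    public static BatchCreator loadBatchCreator() {
        Iterator<BatchCreator> providers =
                ServiceLoader.load(BatchCreator.class).iterator();
        return providers.hasNext() ? providers.next() : new SimpleBatchCreator();
    }

    public static void main(String[] args) {
        System.out.println(loadBatchCreator().name());
    }
}
```

Without a provider file on the classpath the lookup finds nothing and the default is used, which is the behaviour an older connector would keep.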

In any case, having the ctor call protected methods that subclasses override is fragile. Java does dispatch to the override, but at the point where the base object is initialized the subclass ctor wasn't invoked yet, so an overridden method can access fields that have not been initialized yet. Also I'm not sure if class loading would succeed if you have a method that returns an unknown class (let's say you want to run your connector, which was built against Flink 2.1, on Flink 1.X). I'm assuming it will fail.

So we need other choices. I'm curious if annotations could be used (in conjunction with reflection again). I'm guessing classloading will also fail if an unknown annotation is attached, or loading of the annotation itself will fail if the referenced class cannot be loaded. If it works somehow, we could say:

    @BatchedWith(MyBatchCreator.class)
    class MyConnector extends Async...

Unfortunately, I don't have time to try out these ideas. Maybe you have. And in any case, if it gets too complicated, we can always forgo backwards compatibility and require specific builds for Flink 2.1+. This may also be the easier option in case the connector doesn't perform too well without the BatchCreator anyways.

Best,

Arvid

On Thu, Feb 20, 2025 at 6:56 PM Poorvank Bhatia <puravbhat...@gmail.com> wrote:
>
> Hey Arvid,
>
> Thank you for your feedback and for taking the time to review this proposal.
> To answer your concerns:
>
> *Naming Suggestion: BatchCreationResult → Batch*
> This is a valid point, and I agree that BatchCreationResult is essentially representing a Batch. The current name was chosen to reflect that it encapsulates metadata such as batch size and count, in addition to the entries. I’ll update the proposal to reflect this suggestion and rename the class accordingly.
>
> *Compatibility Concerns*: You are on point regarding backward compatibility and avoiding API breakages in connectors.
> Based on our discussion offline, I have removed the BatchCreator interface from the constructor and it is not exposed in the public API of AsyncSinkWriter.
> Instead of requiring direct injection via the constructor, the implementation now uses protected factory methods (createBatchCreator()), allowing subclasses to override the behavior without modifying the base API.
> If no subclass overrides the methods, Flink will continue using the default SimpleBatchCreator and DequeBufferWrapper, maintaining full backward compatibility.
> You suggested using Java Reflection to dynamically instantiate BatchCreator. While reflection offers flexibility, I aimed to avoid it primarily due to performance overhead & guidelines <https://flink.apache.org/how-to-contribute/code-style-and-quality-java/#java-reflection>.
> That said, since reflection would only be used during initialization, the impact should be minimal. An alternative approach you suggested is the Service Provider Interface (SPI), which allows for dynamic discovery of implementations. However, it requires sink implementers to define a META-INF/services file, which I was trying to avoid for simplicity.
> If you think SPI is a better approach compared to simple factory methods, I can add a simple loader:
>
> private BatchCreator<RequestEntryT> loadBatchCreator() {
>     // Raw type on purpose: ServiceLoader cannot carry the type argument.
>     ServiceLoader<BatchCreator> loader = ServiceLoader.load(BatchCreator.class);
>     for (BatchCreator<RequestEntryT> creator : loader) {
>         return creator;
>     }
>     return new SimpleBatchCreator<>(maxBatchSizeInBytes);
> }
>
> Let me know what makes the most sense to you, and I can either replace the factory methods with SPI loaders or keep them as they are.
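
On the protected factory methods: a small sketch of what happens when a base ctor calls an overridable factory method. All class names here are hypothetical and only illustrate the Java initialization order; they are not the AsyncSinkWriter code. The subclass override is invoked, but it runs before the subclass ctor has assigned its fields.

```java
public class CtorFactoryDemo {

    /** Hypothetical base class that calls a protected factory method from its ctor. */
    public static class BaseWriter {
        public final String batchCreator;

        public BaseWriter() {
            // Virtual call: resolves to the subclass override, if there is one.
            this.batchCreator = createBatchCreator();
        }

        protected String createBatchCreator() {
            return "simple";
        }
    }

    /** Hypothetical subclass whose override depends on one of its own fields. */
    public static class PartitionedWriter extends BaseWriter {
        private final String partitionKey;

        public PartitionedWriter(String partitionKey) {
            super(); // createBatchCreator() runs here, before the next line
            this.partitionKey = partitionKey;
        }

        @Override
        protected String createBatchCreator() {
            return "partitioned-by-" + partitionKey;
        }
    }

    public static void main(String[] args) {
        // Prints "partitioned-by-null": the override ran, but partitionKey was not assigned yet.
        System.out.println(new PartitionedWriter("user_id").batchCreator);
    }
}
```

So an overriding createBatchCreator() would have to avoid touching any state of the subclass, which is easy to get wrong.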
>
> On Thu, Feb 20, 2025 at 10:26 PM Poorvank Bhatia <puravbhat...@gmail.com> wrote:
>
> > Hey Danny,
> >
> > Thank you for the review. Your observation is absolutely valid, and moving away from *Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries* in favor of a more flexible abstraction makes sense.
> > *Given the exposure of batch creators, Deque may not be the most semantically appropriate or performant choice. To address this, a more adaptable buffering abstraction that better aligns with different batching strategies can be used.*
> >
> > *So I updated the proposal and*
> > *Introduced BufferWrapper Interface:*
> >
> >    - *Defines an abstraction for managing buffered request entries.*
> >    - *Supports FIFO, priority-based inserts, and custom queuing mechanisms.*
> >    - *Provides essential operations like add, poll, peek, and getBufferedState.*
> >
> > *Implemented Default DequeBufferWrapper*
> >
> >    - *Uses ArrayDeque for efficient FIFO behavior while allowing prioritized inserts.*
> >    - *Maintains backward compatibility with minimal overhead.*
> >
> > *Modified AsyncSinkWriter to Use BufferWrapper Instead of Deque*
> >
> >    - *bufferedRequestEntries is now of type BufferWrapper<RequestEntryT>, making the choice of buffer implementation flexible.*
> >    - *A createBuffer() method initializes DequeBufferWrapper by default.*
> >
> > *Updated BatchCreator and SimpleBatchCreator*
> >
> >    - *Modified method signatures to work with BufferWrapper<RequestEntryT> instead of Deque<RequestEntryWrapper<RequestEntryT>>.*
> >    - *Ensures better encapsulation and allows future optimizations in buffering strategies.*
> >
> > I have added this interface and updated the doc. Please have a look and let me know if this makes sense.
> >
> >
> > On Thu, Feb 20, 2025 at 5:34 PM Arvid Heise <ahe...@confluent.io.invalid> wrote:
> >
> >> Hi Poorvank,
> >>
> >> thanks for putting this together.
> >> It's obvious to me that this is a good addition. I wish Ahmed could check if his proposals are compatible with yours, so we don't end up with two different ways to express the same thing. Ideally Ahmed's proposal could be retrofitted to extend on yours.
> >>
> >> I have a small nit and a larger concern. The nit is to rename BatchCreationResult into Batch because that's what it is.
> >>
> >> My main concern is around compatibility. If the new interfaces end up in the signature of the AsyncSinkWriter, we will not be able to run a connector built on top of it with an older Flink version. Either we explicitly call it out or we find a way to avoid that scenario. The connector community had some negative experiences around evolving APIs.
> >>
> >> One way to avoid breaking compatibility is to provide add-ons where the interfaces are not baked into the AsyncSinkWriter's API. Consider your `BatchCreator` interface. If it's just used within the AsyncSinkWriter (private field, private initialization), then we could run the same connector on an older Flink version. In that case, batching just wouldn't be enabled. Of course, all of that only matters if it's even viable to run the connector without batching. If not, then just make the connector version depend at least on Flink 2.1+ or whenever this feature lands.
> >>
> >> Now the question arises how to initialize the BatchCreator. Your proposal injects it but maybe we can use some kind of discovery instead (e.g. SPI, convention, and/or annotations). Here is a naive sketch with reflection using a convention (I'd prefer SPI though).
> >>
> >> public AsyncSinkWriter(
> >>         ElementConverter<InputT, RequestEntryT> elementConverter,
> >>         WriterInitContext context,
> >>         AsyncSinkWriterConfiguration configuration,
> >>         Collection<BufferedRequestState<RequestEntryT>> states) {
> >>     this(elementConverter, new Sink.InitContextWrapper(context), configuration, states);
> >>
> >>     try {
> >>         batchCreator =
> >>                 (BatchCreator)
> >>                         Class.forName(getClass().getName() + "BatchCreator").newInstance();
> >>     } catch (ClassNotFoundException e) {
> >>         batchCreator = BatchCreator.DEFAULT;
> >>     } catch (IllegalAccessException | InstantiationException e) {
> >>         throw new RuntimeException(e);
> >>     }
> >> }
> >>
> >> class CassandraBatchCreator implements BatchCreator { ... }
> >>
> >> Best,
> >>
> >> Arvid
> >>
> >>
> >> On Wed, Feb 19, 2025 at 11:07 PM Danny Cranmer <dannycran...@apache.org> wrote:
> >>
> >> > Hello Poorvank,
> >> >
> >> > Thanks for opening this discussion/FLIP. I am +1 for the idea, it seems like a great enhancement to the AsyncSink.
> >> >
> >> > Just one question/observation from me. We currently use a "Deque<..> bufferedRequestEntries". Now we are exposing these batch creators; it might not make sense to use a queue anymore. On one hand it does not make as much semantic sense because we could have frequent queue jumpers. On the other hand it might not be performant, since Deque lookups provide O(n) performance. What do you think? This is an internal data structure so we could consider replacing it with something else, but I might just be prematurely optimising this.
> >> >
> >> > Thanks,
> >> > Danny
> >> >
> >> > On Thu, Feb 13, 2025 at 5:31 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
> >> >
> >> > > FYI: Created FLIP-509.
> >> > >
> >> > > Best Regards
> >> > > Ahmed Hamdy
> >> > >
> >> > >
> >> > > On Thu, 13 Feb 2025 at 16:30, Ahmed Hamdy <hamdy10...@gmail.com> wrote:
> >> > >
> >> > > > Hi Poorvank,
> >> > > > Thanks for the feedback. I can see your point and I kinda agree. I would say, if you don't need the flushing trigger interface in your use case, let's keep it out of the FLIP for now. Also, let's wait for feedback about that from other members.
> >> > > > I will create a FLIP and assign it a number.
> >> > > >
> >> > > > Best Regards
> >> > > > Ahmed Hamdy
> >> > > >
> >> > > >
> >> > > > On Thu, 13 Feb 2025 at 09:15, Poorvank Bhatia <puravbhat...@gmail.com> wrote:
> >> > > >
> >> > > >> Hello Ahmed,
> >> > > >> Thank you for your insights! In fact, FLIP-284 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-284+%3A+Making+AsyncSinkWriter+Flush+triggers+adjustable>'s proposal aligns so well with this approach that I have added it as future scope.
> >> > > >> To answer your questions:
> >> > > >>
> >> > > >> I assume we will not expose batch creators to users but only to sink implementers, am I correct?
> >> > > >>
> >> > > >>    - *Yes, that is correct. The BatchCreator interface is intended to be used only by sink implementers, not end users. Since the SinkWriter is responsible for defining how records are batched and written, only those developing a custom sink would need access to this functionality.*
> >> > > >>
> >> > > >> It is nit: but I would prefer adding the creator to the `AsyncSinkWriterConfiguration` rather than overloading the constructor, there are some already implemented ways for instantiating the configs with defaults so would be simpler.
> >> > > >>
> >> > > >>
> >> > > >>    - *Yes, BatchCreator can be included in AsyncSinkWriterConfiguration. However, my initial reasoning for keeping it separate was that BatchCreator is not just a configuration parameter; it's a functional component that directly influences how records are batched. This makes it more of a behavioral dependency rather than a passive setting. That said, if incorporating it into AsyncSinkWriterConfiguration aligns better with the overall design, it is a nit change and I'm open to updating the approach accordingly. Do let me know your thoughts, as RateLimitingStrategy is also a part of it.*
> >> > > >>
> >> > > >>
> >> > > >> I want to leverage some concept from FLIP-284 that we have previously seen in DynamoDbAsyncSink, do you think we can also customize the trigger so we can also add a "readyToFlush()" method or something similar and use it in `nonBlockingFlush` methods instead of forcing the trigger on only total record count or buffer size. I see this useful in your case as well because if we have buffered 100 requests from 100 different partitions then we will keep triggering and emitting batches of 1 for 100 times (unless maxTimeInBufferMS is used ofc) so you can use the custom trigger for minimum buffered records per partition.
> >> > > >> This might be a bit tricky because in order to not affect the performance we probably will wrap the whole buffer in the batchCreator to maintain calculations like size in bytes or per partition record count in the batch creator making it more "bufferHandler" rather than a "batchCreator", let me know your thoughts about this
> >> > > >>
> >> > > >>
> >> > > >>    - *You bring up a valid concern about optimizing flush triggers to prevent inefficient batch emissions when multiple partitions exist, particularly in sinks like DynamoDB and Cassandra. IMO flushing (when to send records) and batching (how to group records) should remain separate concerns, as they serve distinct purposes.*
> >> > > >>
> >> > > >>    *To structure this properly, I’ll use FLIP-284 as the foundation:*
> >> > > >>    *🔹 BufferFlushTrigger (From FLIP-284) - Decides When to Flush*
> >> > > >>
> >> > > >>    *The BufferFlushTrigger should be responsible for determining when the buffer is ready to flush, based on configurable conditions such as:*
> >> > > >>       - *Total record count (batch size threshold).*
> >> > > >>       - *Buffer size in bytes.*
> >> > > >>       - *Time-based constraints (maxTimeInBufferMS), or any other custom logic a sink may require.*
> >> > > >>
> >> > > >>    *🔹 BatchCreator - Decides How to Form a Batch*
> >> > > >>
> >> > > >>    *Once BufferFlushTrigger determines that a flush should occur, BatchCreator is responsible for reading the buffered requests and forming a batch based on partitioning rules.*
> >> > > >>       - *BatchCreator does not decide when to flush; it only handles how records are grouped when a flush is triggered.*
> >> > > >>       - *This separation allows
> >> > > >>         partition-aware batching, ensuring that records from the same partition are grouped together before submission.*
> >> > > >> ------------------------------
> >> > > >>    *🔹 Example: BatchCreator with Two Partitioning Keys*
> >> > > >>    *Scenario*
> >> > > >>       - *batchSize = 20*
> >> > > >>       - *The buffer contains records from two partitions.*
> >> > > >>       - *Assume either (a) we flush when at least 10 records exist per partition, or*
> >> > > >>       - *(b) we apply a simple flush strategy that triggers a flush when 20 total records are buffered.*
> >> > > >>    *How BatchCreator Forms the Batch*
> >> > > >>
> >> > > >>    *Even though BufferFlushTrigger initiates the flush when 20 records are reached, BatchCreator must still decide how to structure the batch. It does so by:*
> >> > > >>       1. *Reading the buffered records.*
> >> > > >>       2. *Grouping them by partition key. (Find out the 10 batch)*
> >> > > >>       3. *Creating a valid batch from these groups before submitting them downstream.*
> >> > > >>    *What I mean is that the buffered requests should be available to both flushing and batching for optimized writes.*
> >> > > >> ------------------------------
> >> > > >>    *The key takeaway is that by keeping these two interfaces separate, Sink Writers gain flexibility: they can mix and match different batching and flushing strategies.
> >> > > >>    Since there is no tight coupling between them, different sinks can:*
> >> > > >>       - *Implement a distinct batching strategy independent of the flushing mechanism.*
> >> > > >>       - *Customize flush triggers without impacting how records are grouped into batches.*
> >> > > >>
> >> > > >> I suppose these two serve complementary but different purposes, *hence they can be implemented differently*.
> >> > > >> What do you think?
> >> > > >>
> >> > > >>
> >> > > >> On Wed, Feb 12, 2025 at 4:41 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
> >> > > >>
> >> > > >> > Hi Poorvank,
> >> > > >> > Thanks for driving this, +1 (non-binding) for the FLIP in general. While I see some common grounds with FLIP-284 that I wrote, I prefer going with a new FLIP since it is driven by a current use case and is specific for tackling it. I have a couple of clarifying questions.
> >> > > >> >
> >> > > >> > 1- I assume we will not expose batch creators to users but only to sink implementers, am I correct?
> >> > > >> >
> >> > > >> > 2- It is nit: but I would prefer adding the creator to the `AsyncSinkWriterConfiguration` rather than overloading the constructor, there are some already implemented ways for instantiating the configs with defaults so would be simpler.
> >> > > >> >
> >> > > >> > 3- I want to leverage some concept from FLIP-284 that we have previously seen in DynamoDbAsyncSink, do you think we can also customize the trigger so we can also add a "readyToFlush()" method or something similar and use it in `nonBlockingFlush` methods instead of forcing the trigger on only total record count or buffer size. I see this useful in your case as well because if we have buffered 100 requests from 100 different partitions then we will keep triggering and emitting batches of 1 for 100 times (unless maxTimeInBufferMS is used ofc) so you can use the custom trigger for minimum buffered records per partition. This might be a bit tricky because in order to not affect the performance we probably will wrap the whole buffer in the batchCreator to maintain calculations like size in bytes or per partition record count in the batch creator making it more "bufferHandler" rather than a "batchCreator", let me know your thoughts about this.
> >> > > >> >
> >> > > >> >
> >> > > >> > Best Regards
> >> > > >> > Ahmed Hamdy
> >> > > >> >
> >> > > >> >
> >> > > >> > On Tue, 11 Feb 2025 at 10:45, Poorvank Bhatia <puravbhat...@gmail.com> wrote:
> >> > > >> >
> >> > > >> > > Hey everyone,
> >> > > >> > >
> >> > > >> > > I’d like to propose adding a pluggable batching mechanism to AsyncSinkWriter <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L351> to enable custom batch formation strategies.
> >> > > >> > > Currently, batching is based on batch size and record count, but this approach is suboptimal for sinks like Cassandra, which require partition-aware batching. Specifically, batches should be formed so that all requests within a batch belong to the same partition, ensuring more efficient writes.
> >> > > >> > >
> >> > > >> > > The proposal introduces a minimal `BatchCreator` interface, enabling users to define custom batching strategies while maintaining backward compatibility with a default implementation.
> >> > > >> > >
> >> > > >> > > For full details, please refer to the proposal document <https://docs.google.com/document/d/1XI2DV-8r-kOwbMd2ZMdV4u0Q_s5m8Stojv4HSdJA8ZU/edit?tab=t.0#heading=h.n4fv4r64xk2f>.
> >> > > >> > > Associated Jira <https://issues.apache.org/jira/browse/FLINK-37298>
> >> > > >> > >
> >> > > >> > > Thanks,
> >> > > >> > > Poorvank
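
For reference, the partition-aware batching described in the original proposal can be sketched roughly as follows. This is only an illustration under assumptions: `Entry`, `partitionKey` and `nextBatch` are hypothetical names, and the sketch works on a plain `Deque` instead of the BatchCreator and BufferWrapper interfaces from the proposal document.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class PartitionAwareBatching {

    /** Hypothetical buffered request: a partition key plus a payload. */
    public static final class Entry {
        public final String partitionKey;
        public final String payload;

        public Entry(String partitionKey, String payload) {
            this.partitionKey = partitionKey;
            this.payload = payload;
        }
    }

    /**
     * Removes and returns up to maxBatchSize entries that share the partition
     * of the oldest buffered entry. All other entries keep their order.
     */
    public static List<Entry> nextBatch(Deque<Entry> buffer, int maxBatchSize) {
        List<Entry> batch = new ArrayList<>();
        if (buffer.isEmpty()) {
            return batch;
        }
        String partition = buffer.peekFirst().partitionKey;
        // Linear scan over the buffer; fine for a sketch, not tuned for large buffers.
        Iterator<Entry> it = buffer.iterator();
        while (it.hasNext() && batch.size() < maxBatchSize) {
            Entry entry = it.next();
            if (entry.partitionKey.equals(partition)) {
                batch.add(entry);
                it.remove();
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<Entry> buffer = new ArrayDeque<>();
        buffer.add(new Entry("a", "1"));
        buffer.add(new Entry("b", "2"));
        buffer.add(new Entry("a", "3"));
        List<Entry> batch = nextBatch(buffer, 10);
        // Prints "2 batched, 1 left": both "a" entries form the batch, "b" stays buffered.
        System.out.println(batch.size() + " batched, " + buffer.size() + " left");
    }
}
```

The scan over the whole buffer is the kind of cost Danny's question about the Deque points at; a buffer grouped by partition would avoid it.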