Poorvankbhatia commented on code in PR #26274:
URL: https://github.com/apache/flink/pull/26274#discussion_r2025421752


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -101,19 +101,25 @@ public abstract class AsyncSinkWriter<InputT, 
RequestEntryT extends Serializable
 
     /**
      * Buffer to hold request entries that should be persisted into the 
destination, along with its
-     * size in bytes.
+     * total size in bytes.
      *
-     * <p>A request entry contain all relevant details to make a call to the 
destination. Eg, for
-     * Kinesis Data Streams a request entry contains the payload and partition 
key.
+     * <p>This buffer is managed using {@link BufferWrapper}, allowing sink 
implementations to
+     * define their own optimized buffering strategies. By default, {@link 
DequeBufferWrapper} is
+     * used.
      *
-     * <p>It seems more natural to buffer InputT, ie, the events that should 
be persisted, rather
-     * than RequestEntryT. However, in practice, the response of a failed 
request call can make it
-     * very hard, if not impossible, to reconstruct the original event. It is 
much easier, to just
-     * construct a new (retry) request entry from the response and add that 
back to the queue for
-     * later retry.
+     * <p>The buffer stores {@link RequestEntryWrapper} objects rather than 
raw {@link

Review Comment:
   Done. Please check.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -213,11 +213,26 @@ protected void submitRequestEntries(
      */
     protected abstract long getSizeInBytes(RequestEntryT requestEntry);
 
+    /**
+     * This constructor is deprecated. Users should use {@link 
#AsyncSinkWriter(ElementConverter,
+     * WriterInitContext, AsyncSinkWriterConfiguration, Collection, 
BatchCreator, BufferWrapper)}.
+     */
+    @Deprecated
     public AsyncSinkWriter(
             ElementConverter<InputT, RequestEntryT> elementConverter,
             WriterInitContext context,
             AsyncSinkWriterConfiguration configuration,
             Collection<BufferedRequestState<RequestEntryT>> states) {
+        this(elementConverter, context, configuration, states, null, null);
+    }
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            WriterInitContext context,
+            AsyncSinkWriterConfiguration configuration,
+            Collection<BufferedRequestState<RequestEntryT>> states,
+            @Nullable BatchCreator<RequestEntryT> batchCreator,
+            @Nullable BufferWrapper<RequestEntryT> bufferedRequestEntries) {

Review Comment:
   Yeah, I thought about it. Moving `batchCreator` and `bufferWrapper` into `AsyncSinkWriterConfiguration` does help with backward compatibility and simplifies usage for sink implementors.
   
   The only hesitation I had was around type safety, since `AsyncSinkWriterConfiguration` would store them as:
   
   ```java
   private BatchCreator<?> batchCreator;
   private BufferWrapper<?> bufferWrapper;
   ```
   
   and I would need to cast them when retrieving (the cast is needed because `AsyncSinkWriterConfiguration` isn't generic and doesn't know about `RequestEntryT`):
   
   ```java
   this.batchCreator = (BatchCreator<RequestEntryT>) configuration.getBatchCreator();
   this.bufferWrapper = (BufferWrapper<RequestEntryT>) configuration.getBufferWrapper();
   ```
   
   This means we lose compile-time type guarantees at the point of use inside `AsyncSinkWriter`. If someone accidentally wires a mismatched generic type, we wouldn't catch it at compile time; it would fail at runtime. Let me know if you think there is a better approach, as I was under the impression that `AsyncSinkWriterConfiguration` is meant to be a non-generic configuration class.
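   To make the trade-off concrete, here is a minimal, self-contained sketch of that unchecked-cast pattern. All types here (`BufferWrapper`, `DequeBuffer`, `Config`, `CastDemo`) are simplified hypothetical stand-ins for illustration, not the actual Flink classes:
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   // Hypothetical stand-in types, simplified for illustration only.
   interface BufferWrapper<T> { void add(T entry); int size(); }
   
   class DequeBuffer<T> implements BufferWrapper<T> {
       private final Deque<T> deque = new ArrayDeque<>();
       public void add(T entry) { deque.add(entry); }
       public int size() { return deque.size(); }
   }
   
   // Non-generic holder mirroring the shape of AsyncSinkWriterConfiguration.
   class Config {
       private final BufferWrapper<?> bufferWrapper;
       Config(BufferWrapper<?> bufferWrapper) { this.bufferWrapper = bufferWrapper; }
       BufferWrapper<?> getBufferWrapper() { return bufferWrapper; }
   }
   
   class CastDemo {
       @SuppressWarnings("unchecked")
       static <T> BufferWrapper<T> retrieve(Config config) {
           // Unchecked: the compiler cannot verify T here, so a mismatched
           // wiring only surfaces at runtime, not at compile time.
           return (BufferWrapper<T>) config.getBufferWrapper();
       }
   }
   ```
   
   The `@SuppressWarnings("unchecked")` makes the loss of compile-time checking explicit at the single place the cast happens, which is about the best a non-generic configuration class can offer.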



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -245,10 +260,17 @@ public AsyncSinkWriter(
         this.rateLimitingStrategy = configuration.getRateLimitingStrategy();
         this.requestTimeoutMS = configuration.getRequestTimeoutMS();
         this.failOnTimeout = configuration.isFailOnTimeout();
-
+        this.bufferedRequestEntries =
+                bufferedRequestEntries == null
+                        ? new 
DequeBufferWrapper.Builder<RequestEntryT>().build()
+                        : bufferedRequestEntries;

Review Comment:
   Done. Renamed `BufferWrapper` to `RequestBuffer`, and `DequeBufferWrapper` to `DequeRequestBuffer`.
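   For context, here is a minimal sketch of what the renamed pair might look like. The method set is an assumption inferred from the usages visible in this diff (`size()`, `totalSizeInBytes()`), not the actual interface:
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   // Sketch of the renamed buffer abstraction; methods are assumed from the
   // call sites in this PR, not copied from the real Flink interface.
   interface RequestBuffer<RequestEntryT> {
       void add(RequestEntryT entry, long sizeInBytes);
       int size();
       long totalSizeInBytes();
   }
   
   // Default deque-backed implementation, tracking total buffered bytes.
   class DequeRequestBuffer<RequestEntryT> implements RequestBuffer<RequestEntryT> {
       private final Deque<RequestEntryT> entries = new ArrayDeque<>();
       private long totalSizeInBytes;
   
       @Override
       public void add(RequestEntryT entry, long sizeInBytes) {
           entries.add(entry);
           totalSizeInBytes += sizeInBytes;
       }
   
       @Override
       public int size() { return entries.size(); }
   
       @Override
       public long totalSizeInBytes() { return totalSizeInBytes; }
   }
   ```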



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -303,7 +325,7 @@ public void write(InputT element, Context context) throws 
IOException, Interrupt
     private void nonBlockingFlush() throws InterruptedException {
         while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
-                        || bufferedRequestEntriesTotalSizeInBytes >= 
maxBatchSizeInBytes)) {
+                        || bufferedRequestEntries.totalSizeInBytes() >= 
maxBatchSizeInBytes)) {

Review Comment:
   Thanks for the suggestion. I've renamed the internal field in `AsyncSinkWriter` to `flushThresholdBytes` to reflect that it's now only used for triggering flushes, while the actual batch size limit is enforced by `BatchCreator`.
   
   The config key in `AsyncSinkWriterConfiguration` is still called `maxBatchSizeInBytes`, and I'm continuing to pass that into `SimpleBatchCreator`, which still uses it to cap batch sizes. So semantically, both use cases are still covered, just with a clearer separation at the usage site. Let me know if this makes sense.
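   As a sanity check on that separation, here is a toy sketch where the flush threshold only decides *when* to flush and the batch cap only bounds *how much* goes into one batch. The constants, the plain queue of entry sizes, and `FlushDemo` itself are hypothetical stand-ins, not the real writer:
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;
   
   // Toy model of the split: flushThresholdBytes triggers a flush; the batch
   // creator separately caps each emitted batch at maxBatchSizeInBytes.
   class FlushDemo {
       static final long FLUSH_THRESHOLD_BYTES = 100;
       static final long MAX_BATCH_SIZE_BYTES = 64;
   
       // Trigger condition: total buffered bytes reach the flush threshold.
       static boolean shouldFlush(long bufferedBytes) {
           return bufferedBytes >= FLUSH_THRESHOLD_BYTES;
       }
   
       // Cap condition: drain entry sizes until adding the next entry would
       // push the batch past the byte cap.
       static List<Long> createNextBatch(Deque<Long> bufferedEntrySizes) {
           List<Long> batch = new ArrayList<>();
           long batchBytes = 0;
           while (!bufferedEntrySizes.isEmpty()
                   && batchBytes + bufferedEntrySizes.peek() <= MAX_BATCH_SIZE_BYTES) {
               long size = bufferedEntrySizes.poll();
               batchBytes += size;
               batch.add(size);
           }
           return batch;
       }
   }
   ```
   
   With 120 buffered bytes, a flush is triggered (120 >= 100), but each batch still stays within the 64-byte cap, which matches the "trigger vs. cap" distinction described above.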



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -327,7 +349,12 @@ private void flush() throws InterruptedException {
             requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+        Batch<RequestEntryT> batchCreationResult =

Review Comment:
   Oh, I create a `batch` variable from `batchCreationResult` on the next line, so I can't reuse that name. Would you suggest another one?
   
   ```java
   Batch<RequestEntryT> batchCreationResult =
           batchCreator.createNextBatch(requestInfo, bufferedRequestEntries);
   List<RequestEntryT> batch = batchCreationResult.getBatchEntries();
   ```



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -327,7 +349,12 @@ private void flush() throws InterruptedException {
             requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+        Batch<RequestEntryT> batchCreationResult =
+                batchCreator.createNextBatch(requestInfo, 
bufferedRequestEntries);

Review Comment:
   Great callout, thank you. Updated the Javadoc for `createNextBatch()` in the `BatchCreator` interface. Please check and let me know if it makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
