hlteoh37 commented on code in PR #26274:
URL: https://github.com/apache/flink/pull/26274#discussion_r2025060911


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -101,19 +101,25 @@ public abstract class AsyncSinkWriter<InputT, 
RequestEntryT extends Serializable
 
     /**
      * Buffer to hold request entries that should be persisted into the 
destination, along with its
-     * size in bytes.
+     * total size in bytes.
      *
-     * <p>A request entry contain all relevant details to make a call to the 
destination. Eg, for
-     * Kinesis Data Streams a request entry contains the payload and partition 
key.
+     * <p>This buffer is managed using {@link BufferWrapper}, allowing sink 
implementations to
+     * define their own optimized buffering strategies. By default, {@link 
DequeBufferWrapper} is
+     * used.
      *
-     * <p>It seems more natural to buffer InputT, ie, the events that should 
be persisted, rather
-     * than RequestEntryT. However, in practice, the response of a failed 
request call can make it
-     * very hard, if not impossible, to reconstruct the original event. It is 
much easier, to just
-     * construct a new (retry) request entry from the response and add that 
back to the queue for
-     * later retry.
+     * <p>The buffer stores {@link RequestEntryWrapper} objects rather than 
raw {@link

Review Comment:
   Let's keep the original explanation around why we chose `RequestEntryT` 
instead of `InputT` as the generic buffer type. 
   
   nit: The explanation around using `RequestEntryWrapper` in the interface of 
the `BufferWrapper` seems a bit out of place here, since we only pass in the 
generic type `RequestEntryWrapper` here. Should we move it to the 
`BufferWrapper` class instead?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -245,10 +260,17 @@ public AsyncSinkWriter(
         this.rateLimitingStrategy = configuration.getRateLimitingStrategy();
         this.requestTimeoutMS = configuration.getRequestTimeoutMS();
         this.failOnTimeout = configuration.isFailOnTimeout();
-
+        this.bufferedRequestEntries =
+                bufferedRequestEntries == null
+                        ? new 
DequeBufferWrapper.Builder<RequestEntryT>().build()
+                        : bufferedRequestEntries;

Review Comment:
   nit: Should we just remove the `Wrapper` bit? We could simply name it 
`Buffer` instead, or `RequestBuffer`?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -303,7 +325,7 @@ public void write(InputT element, Context context) throws 
IOException, Interrupt
     private void nonBlockingFlush() throws InterruptedException {
         while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
-                        || bufferedRequestEntriesTotalSizeInBytes >= 
maxBatchSizeInBytes)) {
+                        || bufferedRequestEntries.totalSizeInBytes() >= 
maxBatchSizeInBytes)) {

Review Comment:
   Would be better to rename `maxBatchSizeInBytes` here as something more 
meaningful like `flushThresholdBytes`.
   
   Previously, `maxBatchSizeInBytes` was used in two places
   1/ Constructing batch (prevent the constructed batch from exceeding X 
bytes). This logic is in `createNextAvailableBatch()`.
   2/ Determine whether to flush. If the buffer has records of size greater 
than `maxBatchSizeInBytes`, flush the records. This logic is in 
`nonBlockingFlush()`.
   
   Since we are delegating responsibility for (1) to the `batchCreator`, it 
would be clearer to rename this to `flushThresholdBytes`. Happy to rename it 
internally first, and follow up with a separate PR to clean up the 
configuration interfaces in a backwards-compatible manner.
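
A minimal sketch of the two roles described above, kept apart under separate names. This is an illustration only: the class `FlushThresholdSketch` and its methods are hypothetical and are not part of the PR or of `AsyncSinkWriter`.

```java
import java.util.List;

/** Hypothetical illustration; not taken from the PR or from Flink. */
final class FlushThresholdSketch {
    private final long flushThresholdBytes; // role 2: when to trigger a flush
    private final long maxBatchSizeInBytes; // role 1: cap on a single batch

    FlushThresholdSketch(long flushThresholdBytes, long maxBatchSizeInBytes) {
        this.flushThresholdBytes = flushThresholdBytes;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
    }

    /** Role 2: the writer only asks whether enough bytes are buffered to flush. */
    boolean shouldFlush(long bufferedBytes) {
        return bufferedBytes >= flushThresholdBytes;
    }

    /**
     * Role 1: how many leading entries fit into one batch. In the PR this
     * decision is delegated to the batch creator. The first entry is always
     * taken, so an oversized entry cannot block the buffer forever.
     */
    int entriesInNextBatch(List<Long> entrySizesInBytes) {
        int count = 0;
        long batchBytes = 0;
        for (long size : entrySizesInBytes) {
            if (count > 0 && batchBytes + size > maxBatchSizeInBytes) {
                break;
            }
            batchBytes += size;
            count++;
        }
        return count;
    }
}
```

With separate names, the flush trigger and the batch cap can take different values without either name being misleading.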



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -213,11 +213,26 @@ protected void submitRequestEntries(
      */
     protected abstract long getSizeInBytes(RequestEntryT requestEntry);
 
+    /**
+     * This constructor is deprecated. Users should use {@link 
#AsyncSinkWriter(ElementConverter,
+     * WriterInitContext, AsyncSinkWriterConfiguration, Collection, 
BatchCreator, BufferWrapper)}.
+     */
+    @Deprecated
     public AsyncSinkWriter(
             ElementConverter<InputT, RequestEntryT> elementConverter,
             WriterInitContext context,
             AsyncSinkWriterConfiguration configuration,
             Collection<BufferedRequestState<RequestEntryT>> states) {
+        this(elementConverter, context, configuration, states, null, null);
+    }
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            WriterInitContext context,
+            AsyncSinkWriterConfiguration configuration,
+            Collection<BufferedRequestState<RequestEntryT>> states,
+            @Nullable BatchCreator<RequestEntryT> batchCreator,
+            @Nullable BufferWrapper<RequestEntryT> bufferedRequestEntries) {

Review Comment:
   Hmm, it would be cleaner (and easier to maintain backwards compatibility) if 
we set up the configuration for `batchCreator` / `bufferedRequestEntries` in 
the `AsyncSinkWriterConfiguration` object.
   
   This also removes the need for sink implementers to make any changes to 
their code signatures at all! :) 
   
   ### Reasoning
   With the current proposal, sink implementers using the default 
implementation would need to pass null to the new constructor. If implementers 
want to override just one, they would need to also ensure the right argument is 
`null`.
   ```
   super(elementConverter, context, configuration, states, null, null);
   // Only override batchCreator
   super(elementConverter, context, configuration, states, batchCreator, null);
   // Only override bufferWrapper
   super(elementConverter, context, configuration, states, null, bufferWrapper);
   ```
   
   Compare this instead to setting this in the `AsyncSinkWriterConfiguration` 
object.
   
   ```
   // specify both options
   AsyncSinkWriterConfiguration.builder()
     .setBatchCreator(batchCreator)
     .setBufferWrapper(bufferWrapper)
     .build()
   
   // specify single option
   AsyncSinkWriterConfiguration.builder()
     .setBatchCreator(batchCreator)
     .build()
   
   // use defaults
   AsyncSinkWriterConfiguration.builder()
     .build()
   ```
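
A rough sketch of how such a builder could fall back to defaults when an option is not set. All names below (`WriterConfigSketch`, `BatchCreatorSketch`, `RequestBufferSketch`) are hypothetical stand-ins; the real `AsyncSinkWriterConfiguration` builder may look different.

```java
/** Hypothetical stand-in for a batch creator; not the PR's interface. */
interface BatchCreatorSketch {
    String name();
}

/** Hypothetical stand-in for a request buffer; not the PR's interface. */
interface RequestBufferSketch {
    String name();
}

/** Hypothetical configuration object that resolves defaults in build(). */
final class WriterConfigSketch {
    private final BatchCreatorSketch batchCreator;
    private final RequestBufferSketch buffer;

    private WriterConfigSketch(BatchCreatorSketch batchCreator, RequestBufferSketch buffer) {
        this.batchCreator = batchCreator;
        this.buffer = buffer;
    }

    BatchCreatorSketch batchCreator() {
        return batchCreator;
    }

    RequestBufferSketch buffer() {
        return buffer;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private BatchCreatorSketch batchCreator; // null means "use the default"
        private RequestBufferSketch buffer;      // null means "use the default"

        Builder setBatchCreator(BatchCreatorSketch batchCreator) {
            this.batchCreator = batchCreator;
            return this;
        }

        Builder setBufferWrapper(RequestBufferSketch buffer) {
            this.buffer = buffer;
            return this;
        }

        WriterConfigSketch build() {
            // Defaults are resolved in one place, so callers never pass null.
            return new WriterConfigSketch(
                    batchCreator != null ? batchCreator : () -> "default-batch-creator",
                    buffer != null ? buffer : () -> "default-buffer");
        }
    }
}
```

Because the defaults live inside `build()`, existing sinks that never call the new setters keep compiling and behave as before.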
   



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -327,7 +349,12 @@ private void flush() throws InterruptedException {
             requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+        Batch<RequestEntryT> batchCreationResult =

Review Comment:
   nit: rename as `batch` instead of `batchCreationResult`



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -327,7 +349,12 @@ private void flush() throws InterruptedException {
             requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+        Batch<RequestEntryT> batchCreationResult =
+                batchCreator.createNextBatch(requestInfo, 
bufferedRequestEntries);

Review Comment:
   We should make clear that this method does 2 things
   1/ Mutate `bufferedRequestEntries` data structure.
   2/ Return the created batch. 
   
   This means that we need to set clear expectations and guarantees around
   1/ **Concurrent calls / thread safety.** We need to make sure that a new 
instance of `bufferWrapper` is created per parallel instance of the async sink, 
AND that for each instance, `batchCreator.createNextBatch()` is not called 
concurrently (otherwise implementers need to ensure thread-safe calling). We 
enforce the latter at the moment because we call this as part of `flush()`, 
which runs on the main Flink thread. We should make this clear on the interface.
   2/ **Error handling.** Users need to ensure that the interaction between 
both implementations handles errors well (e.g. entries must not be pulled from 
the buffer without being put in the batch).
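
One way the contract above could be honoured, as a sketch only. `DrainingBatchCreatorSketch` and its signature are hypothetical and use a plain `Deque` rather than the PR's `BufferWrapper`; the point is that entries removed from the buffer either end up in the returned batch or are put back if batch creation fails.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.ToLongFunction;

/**
 * Hypothetical illustration; not the PR's BatchCreator.
 * Not thread-safe: it assumes one buffer per writer instance and calls from a
 * single thread, which is the expectation the interface docs would state.
 */
final class DrainingBatchCreatorSketch {

    /** Mutates {@code buffer} (removes the batched entries) and returns the batch. */
    static <T> List<T> createNextBatch(
            Deque<T> buffer, long maxBatchSizeInBytes, ToLongFunction<T> sizeOf) {
        List<T> batch = new ArrayList<>();
        long batchBytes = 0;
        try {
            while (!buffer.isEmpty()) {
                // Size is computed before removal; sizeOf may throw.
                long next = sizeOf.applyAsLong(buffer.peekFirst());
                if (!batch.isEmpty() && batchBytes + next > maxBatchSizeInBytes) {
                    break;
                }
                batch.add(buffer.pollFirst());
                batchBytes += next;
            }
            return batch;
        } catch (RuntimeException e) {
            // Put already-removed entries back in their original order,
            // so nothing is lost between the buffer and the batch.
            for (int i = batch.size() - 1; i >= 0; i--) {
                buffer.addFirst(batch.get(i));
            }
            throw e;
        }
    }
}
```

Whether a real implementation should restore entries, fail the job, or do something else on error is a design decision for the interface; the sketch only shows one option.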



