hlteoh37 commented on code in PR #26274:
URL: https://github.com/apache/flink/pull/26274#discussion_r2025160358


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -327,7 +349,12 @@ private void flush() throws InterruptedException {
             requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+        Batch<RequestEntryT> batchCreationResult =
+                batchCreator.createNextBatch(requestInfo, bufferedRequestEntries);

Review Comment:
   We should make it clear that this method does two things:
   1/ Mutates `bufferedRequestEntries`
   2/ Returns the created batch.
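   To illustrate the two effects, here is a minimal sketch under simplifying assumptions. `Batch`, `BatchCreator` and `CountBasedBatchCreator` below are hypothetical stand-ins rather than the types introduced in this PR, a plain `int maxBatchSize` replaces `requestInfo`, and a plain `Deque` replaces the buffer. A single call both drains entries from the buffer and returns them as the batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified stand-ins; not the actual types introduced in this PR.
final class Batch<T> {
    private final List<T> entries;

    Batch(List<T> entries) {
        this.entries = entries;
    }

    List<T> getBatchEntries() {
        return entries;
    }
}

interface BatchCreator<T> {
    /**
     * Does two things: (1) removes the selected entries from {@code buffer},
     * and (2) returns those same entries as the next batch.
     */
    Batch<T> createNextBatch(int maxBatchSize, Deque<T> buffer);
}

final class CountBasedBatchCreator<T> implements BatchCreator<T> {
    @Override
    public Batch<T> createNextBatch(int maxBatchSize, Deque<T> buffer) {
        List<T> entries = new ArrayList<>();
        while (entries.size() < maxBatchSize && !buffer.isEmpty()) {
            // Side effect: the entry leaves the buffer as it joins the batch.
            entries.add(buffer.poll());
        }
        return new Batch<>(entries);
    }
}
```

With a buffer holding `a, b, c` and `maxBatchSize = 2`, the call returns `[a, b]` and leaves only `c` in the buffer, so callers have to treat it as a mutating operation rather than a pure query.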
   
   This means we need to state clear expectations and guarantees around:
   1/ **Concurrent calls / thread safety.** We need to make sure that a new instance of `bufferWrapper` is created for each parallel subtask of the async sink, AND that within each subtask `batchCreator.createNextBatch()` is not called concurrently (otherwise implementors need to ensure thread-safe calls). We currently enforce the latter because we call this method as part of `flush()`, which runs on the main Flink thread. We should make this clear on the interface.
   2/ Error handling. Implementors need to ensure that the interaction between the `bufferWrapper` and `batchCreator` implementations handles errors correctly, e.g. entries must never be removed from the buffer without also being put in the batch.
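   The two guarantees could be spelled out on the interface roughly as follows. This is a minimal sketch, not the PR's API: `SizeLimitedBatchCreator`, `sizeOf` and `maxBatchCount` are hypothetical names, and a plain `Deque` stands in for the buffer. To avoid pulling entries from the buffer without putting them in the batch, it first decides how many entries fit while only reading the buffer, and only then moves exactly those entries, so an exception in the first phase leaves the buffer untouched.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.ToLongFunction;

/**
 * Hypothetical batch creator documenting the expected contract.
 *
 * <p>Thread safety: not thread-safe. Callers are expected to use one instance (and one
 * buffer) per parallel subtask and never to call createNextBatch concurrently, e.g. by
 * only calling it from flush() on the task's main thread.
 *
 * <p>Error handling: an entry is removed from the buffer only once it is certain to be
 * part of the returned batch, so a failure never drops buffered entries.
 */
final class SizeLimitedBatchCreator<T> {
    private final long maxBatchSizeInBytes;
    private final ToLongFunction<T> sizeOf;

    SizeLimitedBatchCreator(long maxBatchSizeInBytes, ToLongFunction<T> sizeOf) {
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.sizeOf = sizeOf;
    }

    List<T> createNextBatch(int maxBatchCount, Deque<T> buffer) {
        // Phase 1: decide how many entries fit, reading the buffer without mutating it.
        // Any exception thrown here leaves the buffer exactly as it was.
        int count = 0;
        long batchBytes = 0;
        for (T entry : buffer) {
            if (count == maxBatchCount) {
                break;
            }
            long size = sizeOf.applyAsLong(entry);
            if (size > maxBatchSizeInBytes) {
                throw new IllegalArgumentException(
                        "Entry of " + size + " bytes can never fit in a batch");
            }
            if (batchBytes + size > maxBatchSizeInBytes) {
                break; // leave this entry for the next batch
            }
            batchBytes += size;
            count++;
        }
        // Phase 2: commit by moving exactly `count` entries from the buffer into the batch.
        List<T> batch = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            batch.add(buffer.poll());
        }
        return batch;
    }
}
```

Whether an oversized entry should fail the call or be handled some other way is a separate design decision for the interface; the point of the sketch is only that any failure happens before an entry leaves the buffer.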



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
