hlteoh37 commented on code in PR #26274:
URL: https://github.com/apache/flink/pull/26274#discussion_r2025160358
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -327,7 +349,12 @@ private void flush() throws InterruptedException {
requestInfo = createRequestInfo();
}
- List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+ Batch<RequestEntryT> batchCreationResult =
+ batchCreator.createNextBatch(requestInfo, bufferedRequestEntries);
Review Comment:
We should make clear that this method does 2 things:
1/ Mutates `bufferedRequestEntries`.
2/ Returns the created batch.
This means that we need to set clear expectations and guarantees around:
1/ **Concurrent calls / thread safety.** We need to make sure that a new
instance of `bufferWrapper` is created per parallel subtask of the async sink,
AND that, for each subtask, `batchCreator.createNextBatch()` is not called
concurrently (otherwise implementors need to make the call thread-safe). We
enforce the latter at the moment because we call this as part of `flush()`,
which runs on the main Flink thread. We should make this clear on the interface.
2/ **Error handling.** Users need to ensure that the interaction between both
implementations handles errors well (e.g. never pull an entry from the buffer
without putting it in the batch).
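
To illustrate how these guarantees could be spelled out on the interface, here
is a minimal sketch. It is not the PR's code: `SimpleBatchCreator`,
`SizeLimitedBatchCreator`, the `maxBatchSize` parameter and the plain `Deque`
buffer are hypothetical, simplified stand-ins for `batchCreator`,
`requestInfo` and `bufferedRequestEntries`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BatchCreatorSketch {

    /** Hypothetical, simplified stand-in for the batch creator interface. */
    interface SimpleBatchCreator<T> {
        /**
         * Removes entries from {@code buffer} and returns them as the next batch.
         *
         * <p>Thread safety: implementations are NOT expected to be thread-safe.
         * The caller must use one buffer per parallel subtask and must not call
         * this method concurrently for the same buffer.
         *
         * <p>Error handling: every entry removed from {@code buffer} must be in
         * the returned batch. Arguments are validated before the buffer is
         * touched, so a rejected call leaves the buffer unchanged.
         */
        List<T> createNextBatch(int maxBatchSize, Deque<T> buffer);
    }

    /** Assumed example implementation: takes up to maxBatchSize entries in FIFO order. */
    static final class SizeLimitedBatchCreator<T> implements SimpleBatchCreator<T> {
        @Override
        public List<T> createNextBatch(int maxBatchSize, Deque<T> buffer) {
            // Validate first, so that a failure here cannot lose buffered entries.
            if (maxBatchSize < 0) {
                throw new IllegalArgumentException("maxBatchSize must be >= 0");
            }
            List<T> batch = new ArrayList<>(Math.min(maxBatchSize, buffer.size()));
            while (batch.size() < maxBatchSize && !buffer.isEmpty()) {
                // Add to the batch before removing from the buffer, so an entry
                // is never outside both collections.
                batch.add(buffer.peek());
                buffer.poll();
            }
            return batch;
        }
    }

    public static void main(String[] args) {
        Deque<String> buffer = new ArrayDeque<>(List.of("a", "b", "c"));
        SimpleBatchCreator<String> creator = new SizeLimitedBatchCreator<>();
        List<String> batch = creator.createNextBatch(2, buffer);
        System.out.println("batch=" + batch + ", remaining=" + buffer);
    }
}
```

Whatever shape the real interface takes, putting the single-threaded calling
assumption and the "pulled from the buffer implies present in the batch" rule
into the Javadoc would give implementors something concrete to code against.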