CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813056834
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -290,17 +290,33 @@ private void registerCallback() {
@Override
public void write(InputT element, Context context) throws IOException,
InterruptedException {
while (bufferedRequestEntries.size() >= maxBufferedRequests) {
- mailboxExecutor.tryYield();
+ flush();
Review comment:
8. Note, this is the only time in the write method that we call flush
without checking whether it will block or not. This is precisely when our
buffer is full. In this case, we have no choice but to do the only action that
has a chance of reducing the number of elements in the buffer - which is to
flush. If there are free in flight requests then flush will be non blocking and
complete after firing off that in flight request. If there are NO free in
flight requests, then this call will block until the number of in flight
requests is less than the allowed number.
Therefore an important corollary appears: "calls to write will block if and
only if the buffer is full and the number of in flight requests is equal to or
greater than the maximum allowed".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]