CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813056834



##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -290,17 +290,33 @@ private void registerCallback() {
     @Override
     public void write(InputT element, Context context) throws IOException, 
InterruptedException {
         while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-            mailboxExecutor.tryYield();
+            flush();

Review comment:
       8. Note, this is the only time in the write method that we call flush 
without checking whether it will block or not. This is precisely when our 
buffer is full. In this case, we have no choice but to do the only action that 
has a chance of reducing the number of elements in the buffer - which is to 
flush. If there are free in flight requests then flush will be non blocking and 
complete after firing off that in flight request. If there are NO free in 
flight requests, then this call will block until the number of in flight 
requests is less than the allowed number.
   
   Therefore an important corollary appears: "calls to write will block if and 
only if the buffer is full and the number of in flight requests is equal to or 
greater than the maximum allowed".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to