pnowojski commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r814570782



##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -290,17 +290,33 @@ private void registerCallback() {
     @Override
     public void write(InputT element, Context context) throws IOException, 
InterruptedException {
         while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-            mailboxExecutor.tryYield();
+            flush();
         }
 
         addEntryToBuffer(elementConverter.apply(element, context), false);
 
-        flushIfAble();
+        nonBlockingFlush();
     }
 
-    private void flushIfAble() {
-        while (bufferedRequestEntries.size() >= maxBatchSize
-                || bufferedRequestEntriesTotalSizeInBytes >= 
maxBatchSizeInBytes) {
+    /**
+     * Determines if a call to flush will be non-blocking (i.e. {@code 
inFlightRequestsCount} is
+     * strictly smaller than {@code maxInFlightRequests}). Also requires one 
of the following
+     * requirements to be met:
+     *
+     * <ul>
+     *   <li>The number of elements buffered is greater than or equal to the 
{@code maxBatchSize}
+     *   <li>The sum of the size in bytes of all records in the buffer is 
greater than or equal to
+     *       {@code maxBatchSizeInBytes}
+     * </ul>
+     */
+    private void nonBlockingFlush() {
+        boolean uncompletedInFlightResponses = true;
+        while (uncompletedInFlightResponses) {
+            uncompletedInFlightResponses = mailboxExecutor.tryYield();
+        }

Review comment:
       There is something like that, but not on the operator level. It's on the 
task level. Task thread is (apart of initialisation and closing sequences) 
executing always `StreamTask#runMailboxLoop`. Which is effectively:
   ```
           while (isNextLoopPossible()) {
               // The blocking `processMail` call will not return until default 
action is available.
               processMail(localMailbox, false);
               if (isNextLoopPossible()) {
                   mailboxDefaultAction.runDefaultAction(
                           defaultActionContext); // lock is acquired inside 
default action as needed
               }
           }
   ```
   And `mailboxDefaultAction.runDefaultAction(...)` equates to 
`StreamTask#processInput`. StreamTask#processInput` polls a single record from 
the input (source/network) and pushes it through the `OperatorChain`, which at 
the end means `AsyncSinkWriter#write()`. So in other words, all known/existing 
mails are always executed before any record is processed.
   
   This is more or less standard single threaded async execution model.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to