pnowojski commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813807419



##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -290,17 +290,33 @@ private void registerCallback() {
     @Override
     public void write(InputT element, Context context) throws IOException, InterruptedException {
         while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-            mailboxExecutor.tryYield();
+            flush();
         }
 
         addEntryToBuffer(elementConverter.apply(element, context), false);
 
-        flushIfAble();
+        nonBlockingFlush();
     }
 
-    private void flushIfAble() {
-        while (bufferedRequestEntries.size() >= maxBatchSize
-                || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
+    /**
+     * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is
+     * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following
+     * requirements to be met:
+     *
+     * <ul>
+     *   <li>The number of elements buffered is greater than or equal to the {@code maxBatchSize}
+     *   <li>The sum of the size in bytes of all records in the buffer is greater than or equal to
+     *       {@code maxBatchSizeInBytes}
+     * </ul>
+     */
+    private void nonBlockingFlush() {
+        boolean uncompletedInFlightResponses = true;
+        while (uncompletedInFlightResponses) {
+            uncompletedInFlightResponses = mailboxExecutor.tryYield();
+        }
+        while (inFlightRequestsCount < maxInFlightRequests
+                && (bufferedRequestEntries.size() >= maxBatchSize
+                        || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {

Review comment:
       3., 4. and 5. make sense to me 👍
   
   However, can you extract the conditions into named methods and reuse those methods in all of the places where they are checked?
   
   ```
   // Each helper returns true once the corresponding limit has been reached.
   private boolean hasReachedMaxInFlightRequests() {
     return inFlightRequestsCount >= maxInFlightRequests;
   }
   
   private boolean hasReachedMaxBufferedRequestsCount() {
     return bufferedRequestEntries.size() >= maxBufferedRequests;
   }
   
   private boolean hasReachedMaxBatchSize() {
     return bufferedRequestEntries.size() >= maxBatchSize
         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes;
   }
   ```
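   A minimal sketch of how such helpers could be reused, assuming the in-flight helper returns `true` once the limit is reached (as its name suggests), so that the `nonBlockingFlush()` loop condition reads `!hasReachedMaxInFlightRequests() && hasReachedMaxBatchSize()`. `FlushConditions`, `addEntry` and `canFlushWithoutBlocking` are hypothetical names; the class only models the counters from the diff, not the real `AsyncSinkWriter`.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Hypothetical model of the buffer/in-flight state in AsyncSinkWriter.
   // Field names mirror the diff; this is not the Flink class itself.
   class FlushConditions {
       final int maxBatchSize;
       final long maxBatchSizeInBytes;
       final int maxInFlightRequests;
       final int maxBufferedRequests;

       // Each buffered entry is represented only by its size in bytes.
       final Deque<Long> bufferedRequestEntries = new ArrayDeque<>();
       long bufferedRequestEntriesTotalSizeInBytes = 0;
       int inFlightRequestsCount = 0;

       FlushConditions(int maxBatchSize, long maxBatchSizeInBytes,
                       int maxInFlightRequests, int maxBufferedRequests) {
           this.maxBatchSize = maxBatchSize;
           this.maxBatchSizeInBytes = maxBatchSizeInBytes;
           this.maxInFlightRequests = maxInFlightRequests;
           this.maxBufferedRequests = maxBufferedRequests;
       }

       void addEntry(long sizeInBytes) {
           bufferedRequestEntries.add(sizeInBytes);
           bufferedRequestEntriesTotalSizeInBytes += sizeInBytes;
       }

       // True once no further request can be sent without waiting for a completion.
       boolean hasReachedMaxInFlightRequests() {
           return inFlightRequestsCount >= maxInFlightRequests;
       }

       // True when write() has to flush before it may buffer another entry.
       boolean hasReachedMaxBufferedRequestsCount() {
           return bufferedRequestEntries.size() >= maxBufferedRequests;
       }

       // True when either the entry-count or the byte-size limit of a batch is hit.
       boolean hasReachedMaxBatchSize() {
           return bufferedRequestEntries.size() >= maxBatchSize
                   || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes;
       }

       // The loop condition of nonBlockingFlush(), expressed through the helpers.
       boolean canFlushWithoutBlocking() {
           return !hasReachedMaxInFlightRequests() && hasReachedMaxBatchSize();
       }
   }
   ```

   Naming the combined check once keeps `write()` and `nonBlockingFlush()` from drifting apart if one of the limits changes later.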
   

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -290,17 +290,33 @@ private void registerCallback() {
     @Override
     public void write(InputT element, Context context) throws IOException, InterruptedException {
         while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-            mailboxExecutor.tryYield();
+            flush();
         }
 
         addEntryToBuffer(elementConverter.apply(element, context), false);
 
-        flushIfAble();
+        nonBlockingFlush();
     }
 
-    private void flushIfAble() {
-        while (bufferedRequestEntries.size() >= maxBatchSize
-                || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
+    /**
+     * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is
+     * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following
+     * requirements to be met:
+     *
+     * <ul>
+     *   <li>The number of elements buffered is greater than or equal to the {@code maxBatchSize}
+     *   <li>The sum of the size in bytes of all records in the buffer is greater than or equal to
+     *       {@code maxBatchSizeInBytes}
+     * </ul>
+     */
+    private void nonBlockingFlush() {
+        boolean uncompletedInFlightResponses = true;
+        while (uncompletedInFlightResponses) {
+            uncompletedInFlightResponses = mailboxExecutor.tryYield();
+        }

Review comment:
       I would remove this loop. It adds complexity and the overhead of checking the mailbox queue, while it doesn't achieve much. Any pending mails should already have been executed before `write()` is called. There is usually only a very small time window for a race condition: the task thread has decided to process a new record, but before it reaches this point, some batch has completed. If this happens, we can always call the `nonBlockingFlush()` method from the code that is executed at the end of completing a batch (`AsyncSinkWriter#completeRequest`?), which we should be doing regardless of anything else.
   
   For example, what if we have just written the last record before a long pause? We haven't flushed that record because the call would have been blocking, but it should have been flushed because the max batch size in bytes (`maxBatchSizeInBytes`) had been reached. In this scenario, after completing a batch, we should check whether a new one should be flushed.
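   A minimal, single-threaded sketch of the flow suggested above, assuming a completion hook in the spirit of `completeRequest`: `write()` calls `nonBlockingFlush()` directly without the `tryYield()` loop, and `completeRequest()` re-runs `nonBlockingFlush()` so that a record buffered while every in-flight slot was busy is sent once a slot frees up. `SketchWriter`, `sendBatch`, `sentBatchSizes` and `bufferedCount` are hypothetical; the model omits the `maxBufferedRequests` back-pressure loop and is not Flink's `AsyncSinkWriter`.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;

   // Hypothetical single-threaded model of the suggested flow, not Flink's
   // AsyncSinkWriter. A request "completes" only when completeRequest() is called.
   // Assumes all limits are positive.
   class SketchWriter {
       private final int maxBatchSize;
       private final long maxBatchSizeInBytes;
       private final int maxInFlightRequests;

       private final Deque<Long> bufferedRequestEntries = new ArrayDeque<>();
       private long bufferedRequestEntriesTotalSizeInBytes = 0;
       private int inFlightRequestsCount = 0;

       // Entry count of every batch handed to the destination, in order.
       final List<Integer> sentBatchSizes = new ArrayList<>();

       SketchWriter(int maxBatchSize, long maxBatchSizeInBytes, int maxInFlightRequests) {
           this.maxBatchSize = maxBatchSize;
           this.maxBatchSizeInBytes = maxBatchSizeInBytes;
           this.maxInFlightRequests = maxInFlightRequests;
       }

       void write(long entrySizeInBytes) {
           bufferedRequestEntries.add(entrySizeInBytes);
           bufferedRequestEntriesTotalSizeInBytes += entrySizeInBytes;
           nonBlockingFlush(); // no tryYield() loop in front of this call
       }

       // Invoked when an in-flight request finishes: free the slot, then re-check
       // whether a full batch has been waiting for it.
       void completeRequest() {
           inFlightRequestsCount--;
           nonBlockingFlush();
       }

       int bufferedCount() {
           return bufferedRequestEntries.size();
       }

       private void nonBlockingFlush() {
           while (inFlightRequestsCount < maxInFlightRequests
                   && (bufferedRequestEntries.size() >= maxBatchSize
                           || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
               sendBatch();
           }
       }

       // Takes up to maxBatchSize entries off the buffer and marks one request in flight.
       private void sendBatch() {
           int batchSize = Math.min(maxBatchSize, bufferedRequestEntries.size());
           for (int i = 0; i < batchSize; i++) {
               bufferedRequestEntriesTotalSizeInBytes -= bufferedRequestEntries.poll();
           }
           inFlightRequestsCount++;
           sentBatchSizes.add(batchSize);
       }
   }
   ```

   With `maxBatchSize = 10`, `maxBatchSizeInBytes = 100` and `maxInFlightRequests = 1`, writing entries of 60 and 50 bytes sends one batch of 2 entries; a following 120-byte entry stays buffered because the only in-flight slot is taken, and it is sent only when `completeRequest()` runs.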
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.