CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813051820



##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -290,17 +290,33 @@ private void registerCallback() {
     @Override
     public void write(InputT element, Context context) throws IOException, 
InterruptedException {
         while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-            mailboxExecutor.tryYield();
+            flush();
         }
 
         addEntryToBuffer(elementConverter.apply(element, context), false);
 
-        flushIfAble();
+        nonBlockingFlush();
     }
 
-    private void flushIfAble() {
-        while (bufferedRequestEntries.size() >= maxBatchSize
-                || bufferedRequestEntriesTotalSizeInBytes >= 
maxBatchSizeInBytes) {
+    /**
+     * Determines if a call to flush will be non-blocking (i.e. {@code 
inFlightRequestsCount} is
+     * strictly smaller than {@code maxInFlightRequests}). Also requires one 
of the following
+     * requirements to be met:
+     *
+     * <ul>
+     *   <li>The number of elements buffered is greater than or equal to the 
{@code maxBatchSize}
+     *   <li>The sum of the size in bytes of all records in the buffer is 
greater than or equal to
+     *       {@code maxBatchSizeInBytes}
+     * </ul>
+     */
+    private void nonBlockingFlush() {
+        boolean uncompletedInFlightResponses = true;
+        while (uncompletedInFlightResponses) {
+            uncompletedInFlightResponses = mailboxExecutor.tryYield();
+        }

Review comment:
       6. To that end, these lines simply completes any in flight requests that 
have completed asynchronously and called the `completeRequest` method, thereby 
reducing the in flight request count. This loop will not block and will return 
immediately if there are none to be completed. 
   
   7. This then allows the next method to accurately determine whether the call 
to flush will be blocking. If it will not be blocking, we benefit from:
    * eagerly flushing a batch of records that have reached our desired size 
(by number or by bytes), and,
    * failed elements will be retried sooner than otherwise due to eager 
completion of in flight requests.
   If it will be blocking, we benefit from:
    * returning immediately and not calling flush - and therefore not 
unnecessarily blocking because the element has been already added to the buffer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to