CrynetLogistics commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r813875078



##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -290,17 +290,33 @@ private void registerCallback() {
     @Override
     public void write(InputT element, Context context) throws IOException, InterruptedException {
         while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-            mailboxExecutor.tryYield();
+            flush();
         }
 
         addEntryToBuffer(elementConverter.apply(element, context), false);
 
-        flushIfAble();
+        nonBlockingFlush();
     }
 
-    private void flushIfAble() {
-        while (bufferedRequestEntries.size() >= maxBatchSize
-                || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
+    /**
+     * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is
+     * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following
+     * requirements to be met:
+     *
+     * <ul>
+     *   <li>The number of elements buffered is greater than or equal to the {@code maxBatchSize}
+     *   <li>The sum of the size in bytes of all records in the buffer is greater than or equal to
+     *       {@code maxBatchSizeInBytes}
+     * </ul>
+     */
+    private void nonBlockingFlush() {
+        boolean uncompletedInFlightResponses = true;
+        while (uncompletedInFlightResponses) {
+            uncompletedInFlightResponses = mailboxExecutor.tryYield();
+        }

Review comment:
       I really like this approach. With it, I believe `completeRequest()` should then become:
   
   ```java
       private void completeRequest(List<RequestEntryT> failedRequestEntries, long requestStartTime) {
           // Existing completeRequest() logic, e.g. decrementing inFlightRequestsCount, etc.
           mailboxExecutor.tryYield();
           nonBlockingFlush();
       }
   ```
   
   This would give us all of the benefits discussed earlier.
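To sanity-check that the proposed flow drains the buffer the way the `nonBlockingFlush()` Javadoc describes, here is a minimal, hypothetical Java model. Only the field names (`maxBatchSize`, `maxBatchSizeInBytes`, `maxInFlightRequests`, `inFlightRequestsCount`, `bufferedRequestEntries`, `bufferedRequestEntriesTotalSizeInBytes`) come from the diff; the class `FlushConditionSketch` and the helpers `add()`, `canFlushWithoutBlocking()` and `submitBatch()` are illustrative assumptions. Entries are modelled only by their byte size, batches are capped by count only, and the mailbox yield is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical model of the flush condition in the nonBlockingFlush() Javadoc.
 * Field names mirror the diff; the class and helper methods are illustrative.
 * Each buffered entry is represented only by its size in bytes.
 */
public class FlushConditionSketch {
    final int maxBatchSize;
    final long maxBatchSizeInBytes;
    final int maxInFlightRequests;

    final Deque<Long> bufferedRequestEntries = new ArrayDeque<>();
    long bufferedRequestEntriesTotalSizeInBytes = 0;
    int inFlightRequestsCount = 0;

    FlushConditionSketch(int maxBatchSize, long maxBatchSizeInBytes, int maxInFlightRequests) {
        this.maxBatchSize = maxBatchSize;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxInFlightRequests = maxInFlightRequests;
    }

    void add(long sizeInBytes) {
        bufferedRequestEntries.addLast(sizeInBytes);
        bufferedRequestEntriesTotalSizeInBytes += sizeInBytes;
    }

    /** True if a flush would not block AND at least one batch threshold is reached. */
    boolean canFlushWithoutBlocking() {
        boolean belowInFlightLimit = inFlightRequestsCount < maxInFlightRequests;
        boolean batchReady = bufferedRequestEntries.size() >= maxBatchSize
                || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes;
        return belowInFlightLimit && batchReady;
    }

    /** Stand-in for sending a request: removes up to maxBatchSize entries (no byte cap here). */
    void submitBatch() {
        int n = Math.min(maxBatchSize, bufferedRequestEntries.size());
        for (int i = 0; i < n; i++) {
            bufferedRequestEntriesTotalSizeInBytes -= bufferedRequestEntries.pollFirst();
        }
        inFlightRequestsCount++;
    }

    /** Sends batches until a threshold is no longer met or the in-flight limit is reached. */
    void nonBlockingFlush() {
        while (canFlushWithoutBlocking()) {
            submitBatch();
        }
    }

    /** Proposed completeRequest() shape, without the mailbox yield. */
    void completeRequest() {
        inFlightRequestsCount--;
        nonBlockingFlush();
    }

    public static void main(String[] args) {
        FlushConditionSketch w = new FlushConditionSketch(2, 1_000L, 1);
        for (int i = 0; i < 4; i++) {
            w.add(10L);
        }
        w.nonBlockingFlush(); // sends one batch, then stops at the in-flight limit
        System.out.println("in flight=" + w.inFlightRequestsCount
                + ", buffered=" + w.bufferedRequestEntries.size());
        w.completeRequest(); // frees the slot, so the remaining entries go out
        System.out.println("in flight=" + w.inFlightRequestsCount
                + ", buffered=" + w.bufferedRequestEntries.size());
    }
}
```

With `maxBatchSize = 2` and `maxInFlightRequests = 1`, the first flush sends one batch and stops at the in-flight limit; the `completeRequest()` call frees that slot and the remaining two entries go out immediately, without waiting for the next `write()`.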
   
   My only question (and possible worry) is what happens when a very large number of in-flight requests have all completed at once (say 100+, since `maxInFlightRequests` is configurable by the user). Once one `completeRequest()` is triggered, its `mailboxExecutor.tryYield()` would yield to the next completed request, whose `completeRequest()` would in turn yield to the one after that, forming a daisy chain of nested calls as long as the number of completed in-flight requests.
   
   I imagine the mailbox thread would have to keep the state of every call in that chain alive at the same time, until the chain unwinds. Would that risk using more stack/heap than otherwise, and thereby risk an overflow?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.