This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc57f0daa4227fe4f7a5136208d00cb36e2af802
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Tue Jan 14 18:32:45 2025 +0800

    [FLINK-37017][Runtime] Drain in best efforts when allowing overdraft
---
 .../asyncprocessing/AsyncExecutionController.java  | 69 ++++++++++++++++------
 1 file changed, 52 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index e5414695545..9afe066433a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -379,19 +379,20 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab
      *
      * @param force whether to trigger requests in force.
      */
-    public void triggerIfNeeded(boolean force) {
+    public boolean triggerIfNeeded(boolean force) {
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
-            return;
+            return false;
         }
 
         Optional<StateRequestContainer> toRun =
                 stateRequestsBuffer.popActive(
                         batchSize, () -> 
stateExecutor.createStateRequestContainer());
         if (!toRun.isPresent() || toRun.get().isEmpty()) {
-            return;
+            return false;
         }
         stateExecutor.executeBatchRequests(toRun.get());
         stateRequestsBuffer.advanceSeq();
+        return true;
     }
 
     /**
@@ -411,17 +412,19 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab
         // 2. If the state request is for a newly entered record, the 
in-flight record number should
         // be less than the max in-flight record number.
         // Note: the currentContext may be updated by {@code 
StateFutureFactory#build}.
-        if (!allowOverdraft) {
-            // We allow a derived request by another request (by initializing 
a process directly via
-            // #asyncProcessWithKey, or timer triggering right after a record 
processing), meaning
-            // that we are in middle of another processing and creating a new 
one here. If we block
-            // here, there might be a deadlock (current processing waiting 
here to drain the current
-            // processing, this is a rare case when all the records share the 
same key).
-            // This probably cause the number of records actually run to be 
greater than the limit.
-            // But overall it is under-control since there should not be many 
derived requests
-            // within each request.
-            drainInflightRecords(maxInFlightRecordNum);
-        }
+
+        // We allow a derived request by another request (by initializing a process directly via
+        // #asyncProcessWithKey, or timer triggering right after a record processing), meaning
+        // that we are in the middle of another processing and creating a new one here. If we
+        // block here, there might be a deadlock (the current processing waiting here to drain
+        // itself; this is a rare case when all the records share the same key).
+        // This may cause the number of records actually run to be greater than the limit.
+        // But overall it is under control since there should not be many derived requests
+        // within each request.
+        // We perform a drain to keep the buffer limit. But when allowing overdraft, we won't
+        // wait here.
+        drainInflightRecords(maxInFlightRecordNum, !allowOverdraft);
+
         // 3. Ensure the currentContext is restored.
         setCurrentContext(storedContext);
         inFlightRecordNum.incrementAndGet();
@@ -447,15 +450,47 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab
      * @param targetNum the target {@link #inFlightRecordNum} to achieve.
      */
     public void drainInflightRecords(int targetNum) {
+        drainInflightRecords(targetNum, true);
+    }
+
+    /**
+     * A helper function to drain in-flight records until {@link #inFlightRecordNum} is within
+     * the limit of the given {@code targetNum}.
+     *
+     * @param targetNum the target {@link #inFlightRecordNum} to achieve.
+     * @param forceToWait whether to wait until the target number is reached. If false, only
+     *     drain on a best-effort basis and return once no progress can be made.
+     */
+    private void drainInflightRecords(int targetNum, boolean forceToWait) {
         try {
-            while (inFlightRecordNum.get() > targetNum) {
+            boolean shouldWait = true;
+            while (shouldWait && inFlightRecordNum.get() > targetNum) {
                 if (!mailboxExecutor.tryYield()) {
+                    boolean triggered = false;
                     // We force trigger the buffer if targetNum == 0 (for 
draining) or the state
                     // executor is not fully loaded.
                     if (targetNum == 0 || !stateExecutor.fullyLoaded()) {
-                        triggerIfNeeded(true);
+                        triggered = triggerIfNeeded(true);
+                    }
+                    if (!forceToWait
+                            && !triggered
+                            && !stateExecutor.fullyLoaded()
+                            && !callbackRunner.isHasMail()) {
+                        // Whether to keep waiting depends on whether state access is making
+                        // progress (or whether we may be deadlocked), judged by these factors:
+                        // 1. We failed to trigger any state requests. AND
+                        // 2. The state executor is not fully loaded, meaning that state is not
+                        // being accessed. AND
+                        // 3. There is no new mail, meaning that the mailbox has no callbacks ready.
+                        //
+                        // If no progress can be made anywhere, there is probably a deadlock, so
+                        // we'd better give up waiting.
+                        shouldWait = false;
+                        // What if we are forced to wait here but cannot make any progress?
+                        // There must be a bug. TODO: Print necessary debug 
info in this case.
+                    } else {
+                        waitForNewMails();
                     }
-                    waitForNewMails();
                 }
             }
         } catch (InterruptedException ignored) {

Reply via email to