This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9da368d6cb163e0b90c5be680282885e47ac3647
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Jan 15 23:23:31 2025 +0800

    [FLINK-37017][Runtime] Prevent stack overflow when draining in-flight records
    
    Closed #25968
---
 .../asyncprocessing/AsyncExecutionController.java  | 47 ++++++++++++++--------
 .../asyncprocessing/StateRequestBuffer.java        |  5 +--
 2 files changed, 33 insertions(+), 19 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index 9afe066433a..e4b0f6f9c8e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -146,6 +146,9 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
     /** Flag indicating if this AEC is under waiting status. */
     private volatile boolean waitingMail = false;
 
+    /** The recursive depth of the drain process. */
+    private int drainDepth = 0;
+
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             AsyncFrameworkExceptionHandler exceptionHandler,
@@ -244,10 +247,12 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
      * @param switchingContext the context to switch.
      */
     public void setCurrentContext(RecordContext<K> switchingContext) {
-        currentContext = switchingContext;
-        declarationManager.setCurrentContext(switchingContext);
-        if (switchContextListener != null) {
-            switchContextListener.switchContext(switchingContext);
+        if (currentContext != switchingContext) {
+            currentContext = switchingContext;
+            declarationManager.setCurrentContext(switchingContext);
+            if (switchContextListener != null) {
+                switchContextListener.switchContext(switchingContext);
+            }
         }
     }
 
@@ -264,10 +269,11 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
         epochManager.completeOneRecord(toDispose.getEpoch());
         keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
         inFlightRecordNum.decrementAndGet();
-        RecordContext<K> nextRecordCtx =
-                stateRequestsBuffer.tryActivateOneByKey(toDispose.getKey());
-        if (nextRecordCtx != null) {
-            Preconditions.checkState(tryOccupyKey(nextRecordCtx));
+        StateRequest<K, ?, ?, ?> nextRequest =
+                stateRequestsBuffer.unblockOneByKey(toDispose.getKey());
+        if (nextRequest != null) {
+            
Preconditions.checkState(tryOccupyKey(nextRequest.getRecordContext()));
+            insertActiveBuffer(nextRequest);
         }
     }
 
@@ -408,7 +414,6 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
         if (currentContext.isKeyOccupied()) {
             return;
         }
-        RecordContext<K> storedContext = currentContext;
         // 2. If the state request is for a newly entered record, the 
in-flight record number should
         // be less than the max in-flight record number.
         // Note: the currentContext may be updated by {@code 
StateFutureFactory#build}.
@@ -424,9 +429,6 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
         // We perform a drain to keep the buffer limit. But when allowing 
overdraft, we won't wait
         // here.
         drainInflightRecords(maxInFlightRecordNum, !allowOverdraft);
-
-        // 3. Ensure the currentContext is restored.
-        setCurrentContext(storedContext);
         inFlightRecordNum.incrementAndGet();
     }
 
@@ -462,6 +464,15 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
      *     only drain in best efforts and return when no progress is made.
      */
     private void drainInflightRecords(int targetNum, boolean forceToWait) {
+        if (!forceToWait && drainDepth > 0) {
+            // We don't allow recursive call of drain if we are not forced to 
wait here.
+            // This is to avoid stack overflow, since the yield will pick up 
another processing,
+            // which may cause another drain.
+            return;
+        }
+        // Store the current context, which might be switched below.
+        RecordContext<K> storedContext = currentContext;
+        drainDepth++;
         try {
             boolean shouldWait = true;
             while (shouldWait && inFlightRecordNum.get() > targetNum) {
@@ -496,6 +507,10 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
         } catch (InterruptedException ignored) {
             // ignore the interrupted exception to avoid throwing fatal error 
when the task cancel
             // or exit.
+        } finally {
+            drainDepth--;
+            // Restore the previously stored context.
+            setCurrentContext(storedContext);
         }
     }
 
@@ -533,9 +548,9 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
                             try {
                                 // We clear the current context since this is 
a non-record context.
                                 RecordContext<K> previousContext = 
currentContext;
-                                currentContext = null;
+                                setCurrentContext(null);
                                 triggerAction.run();
-                                currentContext = previousContext;
+                                setCurrentContext(previousContext);
                             } catch (Exception e) {
                                 exceptionHandler.handleException(
                                         "Failed to process non-record.", e);
@@ -546,9 +561,9 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
                         : () -> {
                             try {
                                 RecordContext<K> previousContext = 
currentContext;
-                                currentContext = null;
+                                setCurrentContext(null);
                                 finalAction.run();
-                                currentContext = previousContext;
+                                setCurrentContext(previousContext);
                             } catch (Exception e) {
                                 exceptionHandler.handleException(
                                         "Failed to process non-record.", e);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
index 2496aa7a74b..bcaf1f24501 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
@@ -201,18 +201,17 @@ public class StateRequestBuffer<K> implements Closeable {
      * @return The first record context with the same key in blocking queue, 
null if no such record.
      */
     @Nullable
-    RecordContext<K> tryActivateOneByKey(K key) {
+    StateRequest<K, ?, ?, ?> unblockOneByKey(K key) {
         if (!blockingQueue.containsKey(key)) {
             return null;
         }
 
         StateRequest<K, ?, ?, ?> stateRequest = 
blockingQueue.get(key).removeFirst();
-        enqueueToActive(stateRequest);
         if (blockingQueue.get(key).isEmpty()) {
             blockingQueue.remove(key);
         }
         blockingQueueSize--;
-        return stateRequest.getRecordContext();
+        return stateRequest;
     }
 
     /**

Reply via email to