This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git.
commit 9da368d6cb163e0b90c5be680282885e47ac3647 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Wed Jan 15 23:23:31 2025 +0800 [FLINK-37017][Runtime] Prevent stack overflow when draining in-flight records Closed #25968 --- .../asyncprocessing/AsyncExecutionController.java | 47 ++++++++++++++-------- .../asyncprocessing/StateRequestBuffer.java | 5 +-- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 9afe066433a..e4b0f6f9c8e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -146,6 +146,9 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab /** Flag indicating if this AEC is under waiting status. */ private volatile boolean waitingMail = false; + /** The recursive depth of the drain process. */ + private int drainDepth = 0; + public AsyncExecutionController( MailboxExecutor mailboxExecutor, AsyncFrameworkExceptionHandler exceptionHandler, @@ -244,10 +247,12 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab * @param switchingContext the context to switch. 
*/ public void setCurrentContext(RecordContext<K> switchingContext) { - currentContext = switchingContext; - declarationManager.setCurrentContext(switchingContext); - if (switchContextListener != null) { - switchContextListener.switchContext(switchingContext); + if (currentContext != switchingContext) { + currentContext = switchingContext; + declarationManager.setCurrentContext(switchingContext); + if (switchContextListener != null) { + switchContextListener.switchContext(switchingContext); + } } } @@ -264,10 +269,11 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab epochManager.completeOneRecord(toDispose.getEpoch()); keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey()); inFlightRecordNum.decrementAndGet(); - RecordContext<K> nextRecordCtx = - stateRequestsBuffer.tryActivateOneByKey(toDispose.getKey()); - if (nextRecordCtx != null) { - Preconditions.checkState(tryOccupyKey(nextRecordCtx)); + StateRequest<K, ?, ?, ?> nextRequest = + stateRequestsBuffer.unblockOneByKey(toDispose.getKey()); + if (nextRequest != null) { + Preconditions.checkState(tryOccupyKey(nextRequest.getRecordContext())); + insertActiveBuffer(nextRequest); } } @@ -408,7 +414,6 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab if (currentContext.isKeyOccupied()) { return; } - RecordContext<K> storedContext = currentContext; // 2. If the state request is for a newly entered record, the in-flight record number should // be less than the max in-flight record number. // Note: the currentContext may be updated by {@code StateFutureFactory#build}. @@ -424,9 +429,6 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab // We perform a drain to keep the buffer limit. But when allowing overdraft, we won't wait // here. drainInflightRecords(maxInFlightRecordNum, !allowOverdraft); - - // 3. Ensure the currentContext is restored. 
- setCurrentContext(storedContext); inFlightRecordNum.incrementAndGet(); } @@ -462,6 +464,15 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab * only drain in best efforts and return when no progress is made. */ private void drainInflightRecords(int targetNum, boolean forceToWait) { + if (!forceToWait && drainDepth > 0) { + // We don't allow recursive call of drain if we are not forced to wait here. + // This is to avoid stack overflow, since the yield will pick up another processing, + // which may cause another drain. + return; + } + // Store the current context, which might be switched below. + RecordContext<K> storedContext = currentContext; + drainDepth++; try { boolean shouldWait = true; while (shouldWait && inFlightRecordNum.get() > targetNum) { @@ -496,6 +507,10 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab } catch (InterruptedException ignored) { // ignore the interrupted exception to avoid throwing fatal error when the task cancel // or exit. + } finally { + drainDepth--; + // Restore the previously stored context. + setCurrentContext(storedContext); } } @@ -533,9 +548,9 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab try { // We clear the current context since this is a non-record context. 
RecordContext<K> previousContext = currentContext; - currentContext = null; + setCurrentContext(null); triggerAction.run(); - currentContext = previousContext; + setCurrentContext(previousContext); } catch (Exception e) { exceptionHandler.handleException( "Failed to process non-record.", e); @@ -546,9 +561,9 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab : () -> { try { RecordContext<K> previousContext = currentContext; - currentContext = null; + setCurrentContext(null); finalAction.run(); - currentContext = previousContext; + setCurrentContext(previousContext); } catch (Exception e) { exceptionHandler.handleException( "Failed to process non-record.", e); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java index 2496aa7a74b..bcaf1f24501 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java @@ -201,18 +201,17 @@ public class StateRequestBuffer<K> implements Closeable { * @return The first record context with the same key in blocking queue, null if no such record. */ @Nullable - RecordContext<K> tryActivateOneByKey(K key) { + StateRequest<K, ?, ?, ?> unblockOneByKey(K key) { if (!blockingQueue.containsKey(key)) { return null; } StateRequest<K, ?, ?, ?> stateRequest = blockingQueue.get(key).removeFirst(); - enqueueToActive(stateRequest); if (blockingQueue.get(key).isEmpty()) { blockingQueue.remove(key); } blockingQueueSize--; - return stateRequest.getRecordContext(); + return stateRequest; } /**