This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc57f0daa4227fe4f7a5136208d00cb36e2af802 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Tue Jan 14 18:32:45 2025 +0800 [FLINK-37017][Runtime] Drain in best efforts when allowing overdraft --- .../asyncprocessing/AsyncExecutionController.java | 69 ++++++++++++++++------ 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index e5414695545..9afe066433a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -379,19 +379,20 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab * * @param force whether to trigger requests in force. */ - public void triggerIfNeeded(boolean force) { + public boolean triggerIfNeeded(boolean force) { if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) { - return; + return false; } Optional<StateRequestContainer> toRun = stateRequestsBuffer.popActive( batchSize, () -> stateExecutor.createStateRequestContainer()); if (!toRun.isPresent() || toRun.get().isEmpty()) { - return; + return false; } stateExecutor.executeBatchRequests(toRun.get()); stateRequestsBuffer.advanceSeq(); + return true; } /** @@ -411,17 +412,19 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab // 2. If the state request is for a newly entered record, the in-flight record number should // be less than the max in-flight record number. // Note: the currentContext may be updated by {@code StateFutureFactory#build}. 
- if (!allowOverdraft) { - // We allow a derived request by another request (by initializing a process directly via - // #asyncProcessWithKey, or timer triggering right after a record processing), meaning - // that we are in middle of another processing and creating a new one here. If we block - // here, there might be a deadlock (current processing waiting here to drain the current - // processing, this is a rare case when all the records share the same key). - // This probably cause the number of records actually run to be greater than the limit. - // But overall it is under-control since there should not be many derived requests - // within each request. - drainInflightRecords(maxInFlightRecordNum); - } + + // We allow a derived request by another request (by initializing a process directly via + // #asyncProcessWithKey, or timer triggering right after a record processing), meaning + // that we are in middle of another processing and creating a new one here. If we block + // here, there might be a deadlock (current processing waiting here to drain the current + // processing, this is a rare case when all the records share the same key). + // This probably cause the number of records actually run to be greater than the limit. + // But overall it is under-control since there should not be many derived requests + // within each request. + // We perform a drain to keep the buffer limit. But when allowing overdraft, we won't wait + // here. + drainInflightRecords(maxInFlightRecordNum, !allowOverdraft); + // 3. Ensure the currentContext is restored. setCurrentContext(storedContext); inFlightRecordNum.incrementAndGet(); @@ -447,15 +450,47 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab * @param targetNum the target {@link #inFlightRecordNum} to achieve. 
*/ public void drainInflightRecords(int targetNum) { + drainInflightRecords(targetNum, true); + } + + /** + * A helper function to drain in-flight records until {@link #inFlightRecordNum} within the limit + * of given {@code targetNum}. + * + * @param targetNum the target {@link #inFlightRecordNum} to achieve. + * @param forceToWait whether to force to wait until the target number is reached. If false, + * only drain in best efforts and return when no progress is made. + */ + private void drainInflightRecords(int targetNum, boolean forceToWait) { try { - while (inFlightRecordNum.get() > targetNum) { + boolean shouldWait = true; + while (shouldWait && inFlightRecordNum.get() > targetNum) { if (!mailboxExecutor.tryYield()) { + boolean triggered = false; // We force trigger the buffer if targetNum == 0 (for draining) or the state // executor is not fully loaded. if (targetNum == 0 || !stateExecutor.fullyLoaded()) { - triggerIfNeeded(true); + triggered = triggerIfNeeded(true); + } + if (!forceToWait + && !triggered + && !stateExecutor.fullyLoaded() + && !callbackRunner.isHasMail()) { + // Decision of waiting is based on whether we are making progress of state + // accessing (or if there is a deadlock). Based on the following factors: + // 1. We failed to trigger any state requests. AND + // 2. The state executor is not fully loaded, meaning that state is not + // being accessed. AND + // 3. There is no new mail, meaning that the mailbox has no callbacks ready. + // + // We cannot make progress anywhere, then there probably is a deadlock. We'd + // better give up waiting. + shouldWait = false; + // What if we force to wait here but cannot make any progress? + // There must be a bug. TODO: Print necessary debug info in this case. + } else { + waitForNewMails(); } - waitForNewMails(); } } } catch (InterruptedException ignored) {