This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e9d9cb2278e65c1e0291c4460ba156beca3f1b8e Author: Zakelly <zakelly....@gmail.com> AuthorDate: Mon Jan 13 20:07:16 2025 +0800 [FLINK-37017][Runtime] Allow buffer overdraft in async state processing --- .../asyncprocessing/AsyncExecutionController.java | 58 ++++++++++++++-------- .../AbstractAsyncStateStreamOperator.java | 4 +- .../AbstractAsyncStateStreamOperatorV2.java | 4 +- .../operators/InternalTimerServiceAsyncImpl.java | 9 ++-- .../AsyncExecutionControllerTest.java | 41 ++++++++++++++- 5 files changed, 85 insertions(+), 31 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index b253c236dca..e5414695545 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -40,7 +40,6 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; /** @@ -300,6 +299,24 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab @Override public <IN, OUT> InternalStateFuture<OUT> handleRequest( @Nullable State state, StateRequestType type, @Nullable IN payload) { + return handleRequest(state, type, payload, false); + } + + /** + * Submit a {@link StateRequest} to this AsyncExecutionController and trigger it if needed. + * + * @param state the state to request. Could be {@code null} if the type is {@link + * StateRequestType#SYNC_POINT}. + * @param type the type of this request. + * @param payload the payload input for this request. + * @param allowOverdraft whether to allow overdraft. + * @return the state future. 
+ */ + public <IN, OUT> InternalStateFuture<OUT> handleRequest( + @Nullable State state, + StateRequestType type, + @Nullable IN payload, + boolean allowOverdraft) { // Step 1: build state future & assign context. InternalStateFuture<OUT> stateFuture = stateFutureFactory.create(currentContext); StateRequest<K, ?, IN, OUT> request = @@ -307,7 +324,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab // Step 2: try to seize the capacity, if the current in-flight records exceeds the limit, // block the current state request from entering until some buffered requests are processed. - seizeCapacity(); + seizeCapacity(allowOverdraft); // Step 3: try to occupy the key and place it into right buffer. if (tryOccupyKey(currentContext)) { @@ -377,7 +394,13 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab stateRequestsBuffer.advanceSeq(); } - private void seizeCapacity() { + /** + * Seize capacity from the in-flight request limit. Will drain if the limit is reached. + * + * @param allowOverdraft whether to allow overdraft. If true, it won't drain the in-flight + * requests even though it reaches the limit. + */ + private void seizeCapacity(boolean allowOverdraft) { // 1. Check if the record is already in buffer. If yes, this indicates that it is a state // request resulting from a callback statement, otherwise, it signifies the initial state // request for a newly entered record. @@ -388,14 +411,13 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab // 2. If the state request is for a newly entered record, the in-flight record number should // be less than the max in-flight record number. // Note: the currentContext may be updated by {@code StateFutureFactory#build}. - if (currentContext.getRecord() != RecordContext.EMPTY_RECORD) { - // We only drain the records when there is a real record or timer assigned. 
- // Typically, if it is an empty record, this is a derived request by another request (by - // initializing a process directly via #asyncProcessWithKey), meaning that we are in - // middle of another processing and creating a new one here. If we block here, there - // might be a deadlock (current processing waiting here to drain while draining the - // current processing). - // There probably cause the number of records actually run to be greater than the limit. + if (!allowOverdraft) { + // We allow a derived request by another request (by initializing a process directly via + // #asyncProcessWithKey, or timer triggering right after a record processing), meaning + // that we are in the middle of another processing and creating a new one here. If we block + // here, there might be a deadlock (current processing waiting here to drain the current + // processing, this is a rare case when all the records share the same key). + // This probably causes the number of records actually run to be greater than the limit. // But overall it is under-control since there should not be many derived requests // within each request. drainInflightRecords(maxInFlightRecordNum); @@ -410,9 +432,11 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab * (once the record is not blocked). * * @param callback the callback to run if it finishes (once the record is not blocked). + * @param allowOverdraft whether to overdraft the in-flight buffer. 
*/ - public StateFuture<Void> syncPointRequestWithCallback(ThrowingRunnable<Exception> callback) { - return handleRequest(null, StateRequestType.SYNC_POINT, null) + public StateFuture<Void> syncPointRequestWithCallback( + ThrowingRunnable<Exception> callback, boolean allowOverdraft) { + return handleRequest(null, StateRequestType.SYNC_POINT, null, allowOverdraft) .thenAccept(v -> callback.run()); } @@ -440,14 +464,6 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab } } - /** A helper function to drain in-flight requests emitted by timer. */ - public void drainWithTimerIfNeeded(CompletableFuture<Void> timerFuture) { - if (epochParallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) { - drainInflightRecords(0); - Preconditions.checkState(timerFuture.isDone()); - } - } - /** Wait for new mails if there is no more mail. */ private void waitForNewMails() throws InterruptedException { if (!callbackRunner.isHasMail()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index fa311f8f67f..797cfbad483 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -182,7 +182,7 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre @Override @SuppressWarnings("unchecked") public final void preserveRecordOrderAndProcess(ThrowingRunnable<Exception> processing) { - asyncExecutionController.syncPointRequestWithCallback(processing); + asyncExecutionController.syncPointRequestWithCallback(processing, false); } @Override @@ -195,7 +195,7 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre 
asyncExecutionController.setCurrentContext(newContext); // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call's key // pass the same key in. - preserveRecordOrderAndProcess(processing); + asyncExecutionController.syncPointRequestWithCallback(processing, true); newContext.release(); // switch to original context diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 76973808d03..983d9f7794a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -190,7 +190,7 @@ public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt @Override @SuppressWarnings("unchecked") public final void preserveRecordOrderAndProcess(ThrowingRunnable<Exception> processing) { - asyncExecutionController.syncPointRequestWithCallback(processing); + asyncExecutionController.syncPointRequestWithCallback(processing, false); } @Override @@ -203,7 +203,7 @@ public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt asyncExecutionController.setCurrentContext(newContext); // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call's key // pass the same key in. 
- preserveRecordOrderAndProcess(processing); + asyncExecutionController.syncPointRequestWithCallback(processing, true); newContext.release(); // switch to original context diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java index 3ccfa8f2eda..f756d2d40eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java @@ -32,9 +32,9 @@ import org.apache.flink.util.function.ThrowingRunnable; /** * An implementation of {@link InternalTimerService} that is used by {@link - * org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}. - * The timer service will set {@link RecordContext} for the timers before invoking action to - * preserve the execution order between timer firing and records processing. + * org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator}. The timer + * service will set {@link RecordContext} for the timers before invoking action to preserve the + * execution order between timer firing and records processing. 
* * @see <a * href=https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Timers>FLIP-425 @@ -130,7 +130,8 @@ public class InternalTimerServiceAsyncImpl<K, N> extends InternalTimerServiceImp recordCtx.retain(); asyncExecutionController.setCurrentContext(recordCtx); keyContext.setCurrentKey(timer.getKey()); - StateFuture<Void> future = asyncExecutionController.syncPointRequestWithCallback(runnable); + StateFuture<Void> future = + asyncExecutionController.syncPointRequestWithCallback(runnable, true); recordCtx.release(); return future; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index 0afe85ec5c1..64e78bd853e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -488,7 +488,7 @@ class AsyncExecutionControllerTest { RecordContext<String> recordContext = aec.buildContext("record", "key"); aec.setCurrentContext(recordContext); recordContext.retain(); - aec.syncPointRequestWithCallback(counter::incrementAndGet); + aec.syncPointRequestWithCallback(counter::incrementAndGet, false); assertThat(counter.get()).isEqualTo(1); assertThat(recordContext.getReferenceCount()).isEqualTo(1); assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0); @@ -505,7 +505,7 @@ class AsyncExecutionControllerTest { RecordContext<String> recordContext2 = aec.buildContext("record2", "occupied"); aec.setCurrentContext(recordContext2); - aec.syncPointRequestWithCallback(counter::incrementAndGet); + aec.syncPointRequestWithCallback(counter::incrementAndGet, false); recordContext2.retain(); assertThat(counter.get()).isEqualTo(0); 
assertThat(recordContext2.getReferenceCount()).isGreaterThan(1); @@ -526,6 +526,43 @@ class AsyncExecutionControllerTest { resourceRegistry.close(); } + @Test + public void testSyncPointWithOverdraft() throws IOException { + CloseableRegistry resourceRegistry = new CloseableRegistry(); + setup( + 1, + 10000L, + 1, + new SyncMailboxExecutor(), + new TestAsyncFrameworkExceptionHandler(), + resourceRegistry); + AtomicInteger counter = new AtomicInteger(0); + + // Test the sync point processing with a key occupied. + RecordContext<String> recordContext1 = aec.buildContext("record1", "occupied"); + aec.setCurrentContext(recordContext1); + // retain this to avoid the recordContext1 being released before the sync point + recordContext1.retain(); + userCode.run(); + + RecordContext<String> recordContext2 = aec.buildContext("record2", "occupied"); + aec.setCurrentContext(recordContext2); + aec.syncPointRequestWithCallback(counter::incrementAndGet, true); + recordContext2.retain(); + assertThat(counter.get()).isEqualTo(0); + assertThat(recordContext2.getReferenceCount()).isGreaterThan(1); + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0); + assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1); + recordContext1.release(); + assertThat(counter.get()).isEqualTo(1); + assertThat(recordContext2.getReferenceCount()).isEqualTo(1); + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0); + assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0); + recordContext2.release(); + + resourceRegistry.close(); + } + @Test void testBufferTimeout() throws Exception { int batchSize = 5;