This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e9d9cb2278e65c1e0291c4460ba156beca3f1b8e
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Mon Jan 13 20:07:16 2025 +0800

    [FLINK-37017][Runtime] Allow buffer overdraft in async state processing
---
 .../asyncprocessing/AsyncExecutionController.java  | 58 ++++++++++++++--------
 .../AbstractAsyncStateStreamOperator.java          |  4 +-
 .../AbstractAsyncStateStreamOperatorV2.java        |  4 +-
 .../operators/InternalTimerServiceAsyncImpl.java   |  9 ++--
 .../AsyncExecutionControllerTest.java              | 41 ++++++++++++++-
 5 files changed, 85 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index b253c236dca..e5414695545 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -40,7 +40,6 @@ import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -300,6 +299,24 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
     @Override
     public <IN, OUT> InternalStateFuture<OUT> handleRequest(
             @Nullable State state, StateRequestType type, @Nullable IN 
payload) {
+        return handleRequest(state, type, payload, false);
+    }
+
+    /**
+     * Submit a {@link StateRequest} to this AsyncExecutionController and 
trigger it if needed.
+     *
+     * @param state the state to request. Could be {@code null} if the type is 
{@link
+     *     StateRequestType#SYNC_POINT}.
+     * @param type the type of this request.
+     * @param payload the payload input for this request.
+     * @param allowOverdraft whether to allow overdraft.
+     * @return the state future.
+     */
+    public <IN, OUT> InternalStateFuture<OUT> handleRequest(
+            @Nullable State state,
+            StateRequestType type,
+            @Nullable IN payload,
+            boolean allowOverdraft) {
         // Step 1: build state future & assign context.
         InternalStateFuture<OUT> stateFuture = 
stateFutureFactory.create(currentContext);
         StateRequest<K, ?, IN, OUT> request =
@@ -307,7 +324,7 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
 
         // Step 2: try to seize the capacity, if the current in-flight records 
exceeds the limit,
         // block the current state request from entering until some buffered 
requests are processed.
-        seizeCapacity();
+        seizeCapacity(allowOverdraft);
 
         // Step 3: try to occupy the key and place it into right buffer.
         if (tryOccupyKey(currentContext)) {
@@ -377,7 +394,13 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
         stateRequestsBuffer.advanceSeq();
     }
 
-    private void seizeCapacity() {
+    /**
+     * Seize capacity from the in-flight request limit. Will drain if the limit is reached.
+     *
+     * @param allowOverdraft whether to allow overdraft. If true, it won't drain the in-flight
+     *     requests even if the limit is reached.
+     */
+    private void seizeCapacity(boolean allowOverdraft) {
         // 1. Check if the record is already in buffer. If yes, this indicates 
that it is a state
         // request resulting from a callback statement, otherwise, it 
signifies the initial state
         // request for a newly entered record.
@@ -388,14 +411,13 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
         // 2. If the state request is for a newly entered record, the 
in-flight record number should
         // be less than the max in-flight record number.
         // Note: the currentContext may be updated by {@code 
StateFutureFactory#build}.
-        if (currentContext.getRecord() != RecordContext.EMPTY_RECORD) {
-            // We only drain the records when there is a real record or timer 
assigned.
-            // Typically, if it is an empty record, this is a derived request 
by another request (by
-            // initializing a process directly via #asyncProcessWithKey), 
meaning that we are in
-            // middle of another processing and creating a new one here. If we 
block here, there
-            // might be a deadlock (current processing waiting here to drain 
while draining the
-            // current processing).
-            // There probably cause the number of records actually run to be 
greater than the limit.
+        if (!allowOverdraft) {
+            // We allow a derived request by another request (by initializing a process directly via
+            // #asyncProcessWithKey, or timer triggering right after a record processing), meaning
+            // that we are in the middle of another processing and creating a new one here. If we
+            // block here, there might be a deadlock (current processing waiting here to drain the
+            // current processing; this is a rare case when all the records share the same key).
+            // This may cause the number of records actually running to exceed the limit.
             // But overall it is under-control since there should not be many 
derived requests
             // within each request.
             drainInflightRecords(maxInFlightRecordNum);
@@ -410,9 +432,11 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
      * (once the record is not blocked).
      *
      * @param callback the callback to run if it finishes (once the record is 
not blocked).
+     * @param allowOverdraft whether to overdraft the in-flight buffer.
      */
-    public StateFuture<Void> 
syncPointRequestWithCallback(ThrowingRunnable<Exception> callback) {
-        return handleRequest(null, StateRequestType.SYNC_POINT, null)
+    public StateFuture<Void> syncPointRequestWithCallback(
+            ThrowingRunnable<Exception> callback, boolean allowOverdraft) {
+        return handleRequest(null, StateRequestType.SYNC_POINT, null, 
allowOverdraft)
                 .thenAccept(v -> callback.run());
     }
 
@@ -440,14 +464,6 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
         }
     }
 
-    /** A helper function to drain in-flight requests emitted by timer. */
-    public void drainWithTimerIfNeeded(CompletableFuture<Void> timerFuture) {
-        if (epochParallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
-            drainInflightRecords(0);
-            Preconditions.checkState(timerFuture.isDone());
-        }
-    }
-
     /** Wait for new mails if there is no more mail. */
     private void waitForNewMails() throws InterruptedException {
         if (!callbackRunner.isHasMail()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
index fa311f8f67f..797cfbad483 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
@@ -182,7 +182,7 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> 
extends AbstractStre
     @Override
     @SuppressWarnings("unchecked")
     public final void 
preserveRecordOrderAndProcess(ThrowingRunnable<Exception> processing) {
-        asyncExecutionController.syncPointRequestWithCallback(processing);
+        asyncExecutionController.syncPointRequestWithCallback(processing, 
false);
     }
 
     @Override
@@ -195,7 +195,7 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> 
extends AbstractStre
         asyncExecutionController.setCurrentContext(newContext);
         // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic 
when the call's key
         // pass the same key in.
-        preserveRecordOrderAndProcess(processing);
+        asyncExecutionController.syncPointRequestWithCallback(processing, 
true);
         newContext.release();
 
         // switch to original context
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
index 76973808d03..983d9f7794a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
@@ -190,7 +190,7 @@ public abstract class 
AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt
     @Override
     @SuppressWarnings("unchecked")
     public final void 
preserveRecordOrderAndProcess(ThrowingRunnable<Exception> processing) {
-        asyncExecutionController.syncPointRequestWithCallback(processing);
+        asyncExecutionController.syncPointRequestWithCallback(processing, 
false);
     }
 
     @Override
@@ -203,7 +203,7 @@ public abstract class 
AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt
         asyncExecutionController.setCurrentContext(newContext);
         // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic 
when the call's key
         // pass the same key in.
-        preserveRecordOrderAndProcess(processing);
+        asyncExecutionController.syncPointRequestWithCallback(processing, 
true);
         newContext.release();
 
         // switch to original context
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java
index 3ccfa8f2eda..f756d2d40eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java
@@ -32,9 +32,9 @@ import org.apache.flink.util.function.ThrowingRunnable;
 
 /**
  * An implementation of {@link InternalTimerService} that is used by {@link
- * 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}.
- * The timer service will set {@link RecordContext} for the timers before 
invoking action to
- * preserve the execution order between timer firing and records processing.
+ * 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator}.
 The timer
+ * service will set {@link RecordContext} for the timers before invoking 
action to preserve the
+ * execution order between timer firing and records processing.
  *
  * @see <a
  *     
href=https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Timers>FLIP-425
@@ -130,7 +130,8 @@ public class InternalTimerServiceAsyncImpl<K, N> extends 
InternalTimerServiceImp
         recordCtx.retain();
         asyncExecutionController.setCurrentContext(recordCtx);
         keyContext.setCurrentKey(timer.getKey());
-        StateFuture<Void> future = 
asyncExecutionController.syncPointRequestWithCallback(runnable);
+        StateFuture<Void> future =
+                
asyncExecutionController.syncPointRequestWithCallback(runnable, true);
         recordCtx.release();
         return future;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index 0afe85ec5c1..64e78bd853e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -488,7 +488,7 @@ class AsyncExecutionControllerTest {
         RecordContext<String> recordContext = aec.buildContext("record", 
"key");
         aec.setCurrentContext(recordContext);
         recordContext.retain();
-        aec.syncPointRequestWithCallback(counter::incrementAndGet);
+        aec.syncPointRequestWithCallback(counter::incrementAndGet, false);
         assertThat(counter.get()).isEqualTo(1);
         assertThat(recordContext.getReferenceCount()).isEqualTo(1);
         assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
@@ -505,7 +505,7 @@ class AsyncExecutionControllerTest {
 
         RecordContext<String> recordContext2 = aec.buildContext("record2", 
"occupied");
         aec.setCurrentContext(recordContext2);
-        aec.syncPointRequestWithCallback(counter::incrementAndGet);
+        aec.syncPointRequestWithCallback(counter::incrementAndGet, false);
         recordContext2.retain();
         assertThat(counter.get()).isEqualTo(0);
         assertThat(recordContext2.getReferenceCount()).isGreaterThan(1);
@@ -526,6 +526,43 @@ class AsyncExecutionControllerTest {
         resourceRegistry.close();
     }
 
+    @Test
+    public void testSyncPointWithOverdraft() throws IOException {
+        CloseableRegistry resourceRegistry = new CloseableRegistry();
+        setup(
+                1,
+                10000L,
+                1,
+                new SyncMailboxExecutor(),
+                new TestAsyncFrameworkExceptionHandler(),
+                resourceRegistry);
+        AtomicInteger counter = new AtomicInteger(0);
+
+        // Test the sync point processing with a key occupied.
+        RecordContext<String> recordContext1 = aec.buildContext("record1", 
"occupied");
+        aec.setCurrentContext(recordContext1);
+        // retain this to avoid the recordContext1 being released before the 
sync point
+        recordContext1.retain();
+        userCode.run();
+
+        RecordContext<String> recordContext2 = aec.buildContext("record2", 
"occupied");
+        aec.setCurrentContext(recordContext2);
+        aec.syncPointRequestWithCallback(counter::incrementAndGet, true);
+        recordContext2.retain();
+        assertThat(counter.get()).isEqualTo(0);
+        assertThat(recordContext2.getReferenceCount()).isGreaterThan(1);
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
+        assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
+        recordContext1.release();
+        assertThat(counter.get()).isEqualTo(1);
+        assertThat(recordContext2.getReferenceCount()).isEqualTo(1);
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
+        assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
+        recordContext2.release();
+
+        resourceRegistry.close();
+    }
+
     @Test
     void testBufferTimeout() throws Exception {
         int batchSize = 5;

Reply via email to