This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0bfbef664fcd4aee3599db1297cb9796a3d19bee
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Fri Jan 10 13:59:43 2025 +0800

    [FLINK-37089][Runtime] Avoid deadlock draining for derived async processing
---
 .../asyncprocessing/AsyncExecutionController.java  | 13 +++-
 .../runtime/asyncprocessing/RecordContext.java     |  2 +-
 .../AbstractAsyncStateStreamOperatorTest.java      | 71 ++++++++++++++++++++++
 3 files changed, 84 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index d351757a533..e9959c89e58 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -385,7 +385,18 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
         // 2. If the state request is for a newly entered record, the 
in-flight record number should
         // be less than the max in-flight record number.
         // Note: the currentContext may be updated by {@code 
StateFutureFactory#build}.
-        drainInflightRecords(maxInFlightRecordNum);
+        if (currentContext.getRecord() != RecordContext.EMPTY_RECORD) {
+            // We only drain the records when there is a real record or timer 
assigned.
+            // Typically, if it is an empty record, this is a derived request 
by another request (by
+            // initializing a process directly via #asyncProcessWithKey), 
meaning that we are in the
+            // middle of processing another request and are creating a new one here. If we 
block here, there
+            // might be a deadlock (current processing waiting here to drain 
while draining the
+            // current processing).
+            // This may cause the number of records actually running to be 
greater than the limit.
+            // But overall it is under-control since there should not be many 
derived requests
+            // within each request.
+            drainInflightRecords(maxInFlightRecordNum);
+        }
         // 3. Ensure the currentContext is restored.
         setCurrentContext(storedContext);
         inFlightRecordNum.incrementAndGet();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
index 1341c8a6f33..40b18a060a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
@@ -38,7 +38,7 @@ import java.util.function.Consumer;
  * @param <K> The type of the key inside the record.
  */
 public class RecordContext<K> extends 
ReferenceCounted<RecordContext.DisposerRunner> {
-    /** The empty record for timer and non-record input usage. */
+    /** The empty record for non-record input usage. */
     static final Object EMPTY_RECORD = new Object();
 
     /** The record to be processed. */
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
index 6ee19242011..c59038ab8d4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.asyncprocessing.operators;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -49,6 +50,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.runtime.state.StateBackendTestUtils.buildAsyncStateBackend;
@@ -191,6 +193,39 @@ public class AbstractAsyncStateStreamOperatorTest {
         }
     }
 
+    @Test
+    void testManyAsyncProcessWithKey() throws Exception {
+        // This test verifies that the AsyncExecutionController avoids a 
deadlock for derived
+        // processing requests.
+        int requests = 
ExecutionOptions.ASYNC_INFLIGHT_RECORDS_LIMIT.defaultValue() + 1;
+        TestOperatorWithMultipleDirectAsyncProcess testOperator =
+                new 
TestOperatorWithMultipleDirectAsyncProcess(ElementOrder.RECORD_ORDER, requests);
+        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String>
+                testHarness =
+                        AsyncKeyedOneInputStreamOperatorTestHarness.create(
+                                testOperator,
+                                new TestKeySelector(),
+                                BasicTypeInfo.INT_TYPE_INFO,
+                                128,
+                                1,
+                                0);
+        testHarness.setStateBackend(buildAsyncStateBackend(new 
HashMapStateBackend()));
+        try {
+            testHarness.open();
+            CompletableFuture<Void> future =
+                    testHarness.processElementInternal(new 
StreamRecord<>(Tuple2.of(5, "5")));
+
+            testHarness.drainStateRequests();
+            future.get(10000, TimeUnit.MILLISECONDS);
+            // If the AEC avoids the deadlock, there should not be any 
timeout exception.
+            testHarness.drainStateRequests();
+            testOperator.getLastProcessedFuture().get(10000, 
TimeUnit.MILLISECONDS);
+            assertThat(testOperator.getProcessed()).isEqualTo(requests);
+        } finally {
+            testHarness.close();
+        }
+    }
+
     @Test
     void testCheckpointDrain() throws Exception {
         try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, 
Tuple2<Integer, String>, String>
@@ -534,6 +569,42 @@ public class AbstractAsyncStateStreamOperatorTest {
         }
     }
 
+    private static class TestOperatorWithMultipleDirectAsyncProcess extends 
TestOperator {
+
+        private final int numAsyncProcesses;
+        private final CompletableFuture<Void> lastProcessedFuture = new 
CompletableFuture<>();
+
+        TestOperatorWithMultipleDirectAsyncProcess(
+                ElementOrder elementOrder, int numAsyncProcesses) {
+            super(elementOrder);
+            this.numAsyncProcesses = numAsyncProcesses;
+        }
+
+        @Override
+        public void processElement(StreamRecord<Tuple2<Integer, String>> 
element) throws Exception {
+            for (int i = 0; i < numAsyncProcesses; i++) {
+                if (i < numAsyncProcesses - 1) {
+                    asyncProcessWithKey(
+                            element.getValue().f0,
+                            () -> {
+                                processed.incrementAndGet();
+                            });
+                } else {
+                    asyncProcessWithKey(
+                            element.getValue().f0,
+                            () -> {
+                                processed.incrementAndGet();
+                                lastProcessedFuture.complete(null);
+                            });
+                }
+            }
+        }
+
+        public CompletableFuture<Void> getLastProcessedFuture() {
+            return lastProcessedFuture;
+        }
+    }
+
     private static class TestOperatorWithAsyncProcessTimer extends 
TestOperator {
 
         TestOperatorWithAsyncProcessTimer(ElementOrder elementOrder) {

Reply via email to