This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0bfbef664fcd4aee3599db1297cb9796a3d19bee
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Fri Jan 10 13:59:43 2025 +0800

    [FLINK-37089][Runtime] Avoid deadlock draining for derived async processing
---
 .../asyncprocessing/AsyncExecutionController.java | 13 +++-
 .../runtime/asyncprocessing/RecordContext.java    |  2 +-
 .../AbstractAsyncStateStreamOperatorTest.java     | 71 ++++++++++++++++++++++
 3 files changed, 84 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index d351757a533..e9959c89e58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -385,7 +385,18 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab
         // 2. If the state request is for a newly entered record, the in-flight record number should
         // be less than the max in-flight record number.
         // Note: the currentContext may be updated by {@code StateFutureFactory#build}.
-        drainInflightRecords(maxInFlightRecordNum);
+        if (currentContext.getRecord() != RecordContext.EMPTY_RECORD) {
+            // We only drain the records when there is a real record or timer assigned.
+            // Typically, if the record is empty, this is a derived request issued by another
+            // request (e.g. a process initialized directly via #asyncProcessWithKey), meaning
+            // that we are in the middle of another processing and creating a new one here. If
+            // we blocked here, there might be a deadlock (the current processing would wait
+            // here to drain, while the drain waits for the current processing to finish).
+            // This may cause the number of records actually running to exceed the limit, but
+            // it stays under control overall, since there should not be many derived requests
+            // within each request.
+            drainInflightRecords(maxInFlightRecordNum);
+        }
         // 3. Ensure the currentContext is restored.
         setCurrentContext(storedContext);
         inFlightRecordNum.incrementAndGet();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
index 1341c8a6f33..40b18a060a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java
@@ -38,7 +38,7 @@ import java.util.function.Consumer;
  * @param <K> The type of the key inside the record.
  */
 public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRunner> {
-    /** The empty record for timer and non-record input usage. */
+    /** The empty record for non-record input usage. */
     static final Object EMPTY_RECORD = new Object();
 
     /** The record to be processed. */
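[Editorial note: the sketch below is not part of the patch. It is a minimal toy model of the
cycle described in the comment added to AsyncExecutionController: a "derived" request issued
from inside an in-flight record's processing cannot usefully wait for the in-flight count to
drop, because the record holding the capacity is the one whose processing is currently on the
stack. All class, field, and method names here are invented; only the shape of the logic is
meant to resemble the controller.]

// Toy model of the deadlock; names are invented and only mimic the shape of the logic.
final class DerivedRequestDeadlockSketch {

    static final int MAX_IN_FLIGHT = 1;
    static int inFlight = 0;

    // Waits until the number of in-flight records is below the limit. In the real
    // controller this runs completed callbacks while waiting; in this toy nothing can
    // lower the counter, because the only in-flight record is the one whose processing
    // is currently on the stack, so we throw instead of spinning forever.
    static void drainUntilBelow(int limit) {
        while (inFlight >= limit) {
            throw new IllegalStateException("would wait forever: inFlight=" + inFlight);
        }
    }

    // A request issued from inside the processing of another record ("derived" request).
    static void derivedRequest(boolean skipDrainForDerived) {
        if (!skipDrainForDerived) {
            // Old behavior: every new request drains first, so it waits on itself.
            drainUntilBelow(MAX_IN_FLIGHT);
        }
        inFlight++; // admitted without draining: a small, bounded overshoot of the limit
        // ... the actual asynchronous work would be enqueued here ...
        inFlight--; // in reality this happens later, when the derived callback completes
    }

    public static void main(String[] args) {
        // A record enters processing and takes the only in-flight slot.
        drainUntilBelow(MAX_IN_FLIGHT);
        inFlight++;
        try {
            derivedRequest(true);  // with the guard: proceeds
            derivedRequest(false); // without it: the wait can never be satisfied
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        } finally {
            inFlight--; // the outer record only finishes after its processing returns
        }
    }
}

The overshoot noted in the comment is bounded because each record issues only a few derived
requests, which is why skipping the drain for them is acceptable.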
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
index 6ee19242011..c59038ab8d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.asyncprocessing.operators;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -49,6 +50,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.runtime.state.StateBackendTestUtils.buildAsyncStateBackend;
@@ -191,6 +193,39 @@ public class AbstractAsyncStateStreamOperatorTest {
         }
     }
 
+    @Test
+    void testManyAsyncProcessWithKey() throws Exception {
+        // This test verifies that the AsyncExecutionController avoids deadlock for derived
+        // processing requests.
+        int requests = ExecutionOptions.ASYNC_INFLIGHT_RECORDS_LIMIT.defaultValue() + 1;
+        TestOperatorWithMultipleDirectAsyncProcess testOperator =
+                new TestOperatorWithMultipleDirectAsyncProcess(ElementOrder.RECORD_ORDER, requests);
+        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
+                testHarness =
+                        AsyncKeyedOneInputStreamOperatorTestHarness.create(
+                                testOperator,
+                                new TestKeySelector(),
+                                BasicTypeInfo.INT_TYPE_INFO,
+                                128,
+                                1,
+                                0);
+        testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
+        try {
+            testHarness.open();
+            CompletableFuture<Void> future =
+                    testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
+
+            testHarness.drainStateRequests();
+            future.get(10000, TimeUnit.MILLISECONDS);
+            // If the AEC avoids the deadlock, these futures complete without a timeout exception.
+            testHarness.drainStateRequests();
+            testOperator.getLastProcessedFuture().get(10000, TimeUnit.MILLISECONDS);
+            assertThat(testOperator.getProcessed()).isEqualTo(requests);
+        } finally {
+            testHarness.close();
+        }
+    }
+
     @Test
     void testCheckpointDrain() throws Exception {
         try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
@@ -534,6 +569,42 @@ public class AbstractAsyncStateStreamOperatorTest {
         }
     }
 
+    private static class TestOperatorWithMultipleDirectAsyncProcess extends TestOperator {
+
+        private final int numAsyncProcesses;
+        private final CompletableFuture<Void> lastProcessedFuture = new CompletableFuture<>();
+
+        TestOperatorWithMultipleDirectAsyncProcess(
+                ElementOrder elementOrder, int numAsyncProcesses) {
+            super(elementOrder);
+            this.numAsyncProcesses = numAsyncProcesses;
+        }
+
+        @Override
+        public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
+            for (int i = 0; i < numAsyncProcesses; i++) {
+                if (i < numAsyncProcesses - 1) {
+                    asyncProcessWithKey(
+                            element.getValue().f0,
+                            () -> {
+                                processed.incrementAndGet();
+                            });
+                } else {
+                    asyncProcessWithKey(
+                            element.getValue().f0,
+                            () -> {
+                                processed.incrementAndGet();
+                                lastProcessedFuture.complete(null);
+                            });
+                }
+            }
+        }
+
+        public CompletableFuture<Void> getLastProcessedFuture() {
+            return lastProcessedFuture;
+        }
+    }
+
     private static class TestOperatorWithAsyncProcessTimer extends TestOperator {
 
         TestOperatorWithAsyncProcessTimer(ElementOrder elementOrder) {