This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push: new 14e85eced10 [FLINK-37458][datastream] Forbid enableAsyncState() for synchronous operators (#26395) 14e85eced10 is described below commit 14e85eced10e98bc75870ac0360bad67d0722697 Author: Yanfei Lei <fredia...@gmail.com> AuthorDate: Tue Apr 8 12:13:58 2025 +0800 [FLINK-37458][datastream] Forbid enableAsyncState() for synchronous operators (#26395) --- .../transformations/OneInputTransformation.java | 8 +++++- .../api/graph/JobGraphGeneratorTestBase.java | 29 ++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java index edae579c923..13a8ead51ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator; import org.apache.flink.shaded.guava32.com.google.common.collect.Lists; @@ -195,6 +196,11 @@ public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT> @Override public void enableAsyncState() { - // nothing to do. + OneInputStreamOperator<IN, OUT> operator = + (OneInputStreamOperator<IN, OUT>) + ((SimpleOperatorFactory<OUT>) operatorFactory).getOperator(); + if (!(operator instanceof AsyncStateProcessingOperator)) { + super.enableAsyncState(); + } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java index a29164c51b6..48bf37c409c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.io.InputFormat; @@ -119,6 +120,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator; import org.apache.flink.util.AbstractID; +import org.apache.flink.util.Collector; import org.apache.flink.util.SerializedValue; import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables; @@ -155,6 +157,7 @@ import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.ar import static org.apache.flink.util.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; /** * Tests for {@link StreamingJobGraphGenerator} and {@link AdaptiveGraphManager}. @@ -2163,6 +2166,32 @@ abstract class JobGraphGeneratorTestBase { new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), true); } + @Test + void testEnableAsyncStateForSyncOperatorThrowException() throws Exception { + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + try { + env.fromData(1, 2, 3, 4, 5) + .keyBy(k -> k) + .flatMap( + new FlatMapFunction<Integer, Integer>() { + @Override + public void flatMap(Integer value, Collector<Integer> out) + throws Exception { + out.collect(value); + } + }) + .enableAsyncState() + .print(); + fail("Enabling async state for synchronous operators is forbidden."); + } catch (UnsupportedOperationException e) { + assertThat(e.getMessage()) + .isEqualTo( + "The transformation does not support " + + "async state, or you are enabling the async state without a keyed context (not behind a keyBy())."); + } + } + private void testWhetherOutputFormatSupportsConcurrentExecutionAttempts( OutputFormat<Integer> outputFormat, boolean isSupported) { final StreamExecutionEnvironment env =