This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new 14e85eced10 [FLINK-37458][datastream] Forbid enableAsyncState() for synchronous operators (#26395)
14e85eced10 is described below

commit 14e85eced10e98bc75870ac0360bad67d0722697
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Tue Apr 8 12:13:58 2025 +0800

    [FLINK-37458][datastream] Forbid enableAsyncState() for synchronous operators (#26395)
---
 .../transformations/OneInputTransformation.java    |  8 +++++-
 .../api/graph/JobGraphGeneratorTestBase.java       | 29 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
index edae579c923..13a8ead51ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
 
 import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
 
@@ -195,6 +196,11 @@ public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT>
 
     @Override
     public void enableAsyncState() {
-        // nothing to do.
+        OneInputStreamOperator<IN, OUT> operator =
+                (OneInputStreamOperator<IN, OUT>)
+                        ((SimpleOperatorFactory<OUT>) operatorFactory).getOperator();
+        if (!(operator instanceof AsyncStateProcessingOperator)) {
+            super.enableAsyncState();
+        }
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
index a29164c51b6..48bf37c409c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.io.InputFormat;
@@ -119,6 +120,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
@@ -155,6 +157,7 @@ import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.ar
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * Tests for {@link StreamingJobGraphGenerator} and {@link AdaptiveGraphManager}.
@@ -2163,6 +2166,32 @@ abstract class JobGraphGeneratorTestBase {
                 new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), true);
     }
 
+    @Test
+    void testEnableAsyncStateForSyncOperatorThrowException() throws Exception {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+        try {
+            env.fromData(1, 2, 3, 4, 5)
+                    .keyBy(k -> k)
+                    .flatMap(
+                            new FlatMapFunction<Integer, Integer>() {
+                                @Override
+                                public void flatMap(Integer value, Collector<Integer> out)
+                                        throws Exception {
+                                    out.collect(value);
+                                }
+                            })
+                    .enableAsyncState()
+                    .print();
+            fail("Enabling async state for synchronous operators is forbidden.");
+        } catch (UnsupportedOperationException e) {
+            assertThat(e.getMessage())
+                    .isEqualTo(
+                            "The transformation does not support "
+                                    + "async state, or you are enabling the async state without a keyed context (not behind a keyBy()).");
+        }
+    }
+
     private void testWhetherOutputFormatSupportsConcurrentExecutionAttempts(
             OutputFormat<Integer> outputFormat, boolean isSupported) {
         final StreamExecutionEnvironment env =

Reply via email to