This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new 8c8667e0166 [FLINK-37623][datastream] Async state support for process() in Datastream API (#26439)
8c8667e0166 is described below

commit 8c8667e0166a9385bb8a48b09e159848b52d52da
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Tue Apr 15 10:39:15 2025 +0800

    [FLINK-37623][datastream] Async state support for process() in Datastream API (#26439)
---
 .../streaming/api/datastream/KeyedStream.java      | 12 +++---
 .../apache/flink/streaming/api/DataStreamTest.java | 44 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index dd4818fe30f..92b01f65c0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.asyncprocessing.operators.AsyncIntervalJoinOperator;
+import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
 import org.apache.flink.runtime.asyncprocessing.operators.AsyncStreamFlatMap;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -358,9 +359,10 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
     @Internal
     public <R> SingleOutputStreamOperator<R> process(
             KeyedProcessFunction<KEY, T, R> keyedProcessFunction, TypeInformation<R> outputType) {
-
-        KeyedProcessOperator<KEY, T, R> operator =
-                new KeyedProcessOperator<>(clean(keyedProcessFunction));
+        OneInputStreamOperator<T, R> operator =
+                isEnableAsyncState()
+                        ? new AsyncKeyedProcessOperator<>(clean(keyedProcessFunction))
+                        : new KeyedProcessOperator<>(clean(keyedProcessFunction));
         return transform("KeyedProcess", outputType, operator);
     }
 
@@ -370,9 +372,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
     @Override
     public <R> SingleOutputStreamOperator<R> flatMap(
             FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
-        OneInputStreamOperator operator =
+        OneInputStreamOperator<T, R> operator =
                 isEnableAsyncState()
-                        ? new AsyncStreamFlatMap(clean(flatMapper))
+                        ? new AsyncStreamFlatMap<>(clean(flatMapper))
                         : new StreamFlatMap<>(clean(flatMapper));
         return transform("Flat Map", outputType, operator);
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index c02464b9557..747cec98067 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -40,6 +40,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
+import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
 import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
 import org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
@@ -934,6 +936,48 @@ class DataStreamTest {
         assertThat(getOperatorForDataStream(processed)).isInstanceOf(KeyedProcessOperator.class);
     }
 
+    /**
+     * Verify that a {@link KeyedStream#process(KeyedProcessFunction)} call is correctly translated
+     * to an async operator.
+     */
+    @Test
+    void testAsyncKeyedStreamKeyedProcessTranslation() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<Long> src = env.fromSequence(0, 0);
+
+        KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction =
+                new KeyedProcessFunction<Long, Long, Integer>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public void processElement(Long value, Context ctx, Collector<Integer> out)
+                            throws Exception {
+                        // Do nothing
+                    }
+
+                    @Override
+                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out)
+                            throws Exception {
+                        // Do nothing
+                    }
+                };
+
+        DataStream<Integer> processed =
+                src.keyBy(new IdentityKeySelector<Long>())
+                        .enableAsyncState()
+                        .process(keyedProcessFunction);
+
+        processed.sinkTo(new DiscardingSink<Integer>());
+
+        assertThat(
+                        ((AbstractAsyncStateUdfStreamOperator<?, ?>)
+                                        getOperatorForDataStream(processed))
+                                .getUserFunction())
+                .isEqualTo(keyedProcessFunction);
+        assertThat(getOperatorForDataStream(processed))
+                .isInstanceOf(AsyncKeyedProcessOperator.class);
+    }
+
     /**
      * Verify that a {@link DataStream#process(ProcessFunction)} call is correctly translated to an
      * operator.

Reply via email to