This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push:
     new 8c8667e0166 [FLINK-37623][datastream] Async state support for process() in Datastream API (#26439)
8c8667e0166 is described below

commit 8c8667e0166a9385bb8a48b09e159848b52d52da
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Tue Apr 15 10:39:15 2025 +0800

    [FLINK-37623][datastream] Async state support for process() in Datastream API (#26439)
---
 .../streaming/api/datastream/KeyedStream.java      | 12 +++---
 .../apache/flink/streaming/api/DataStreamTest.java | 44 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index dd4818fe30f..92b01f65c0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.asyncprocessing.operators.AsyncIntervalJoinOperator;
+import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
 import org.apache.flink.runtime.asyncprocessing.operators.AsyncStreamFlatMap;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -358,9 +359,10 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
     @Internal
     public <R> SingleOutputStreamOperator<R> process(
             KeyedProcessFunction<KEY, T, R> keyedProcessFunction, TypeInformation<R> outputType) {
-
-        KeyedProcessOperator<KEY, T, R> operator =
-                new KeyedProcessOperator<>(clean(keyedProcessFunction));
+        OneInputStreamOperator<T, R> operator =
+                isEnableAsyncState()
+                        ? new AsyncKeyedProcessOperator<>(clean(keyedProcessFunction))
+                        : new KeyedProcessOperator<>(clean(keyedProcessFunction));
         return transform("KeyedProcess", outputType, operator);
     }
 
@@ -370,9 +372,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
     @Override
     public <R> SingleOutputStreamOperator<R> flatMap(
             FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
-        OneInputStreamOperator operator =
+        OneInputStreamOperator<T, R> operator =
                 isEnableAsyncState()
-                        ? new AsyncStreamFlatMap(clean(flatMapper))
+                        ? new AsyncStreamFlatMap<>(clean(flatMapper))
                         : new StreamFlatMap<>(clean(flatMapper));
         return transform("Flat Map", outputType, operator);
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index c02464b9557..747cec98067 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -40,6 +40,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
+import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
 import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
 import org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
@@ -934,6 +936,48 @@ class DataStreamTest {
         assertThat(getOperatorForDataStream(processed)).isInstanceOf(KeyedProcessOperator.class);
     }
 
+    /**
+     * Verify that a {@link KeyedStream#process(KeyedProcessFunction)} call is correctly translated
+     * to an async operator.
+     */
+    @Test
+    void testAsyncKeyedStreamKeyedProcessTranslation() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<Long> src = env.fromSequence(0, 0);
+
+        KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction =
+                new KeyedProcessFunction<Long, Long, Integer>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public void processElement(Long value, Context ctx, Collector<Integer> out)
+                            throws Exception {
+                        // Do nothing
+                    }
+
+                    @Override
+                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out)
+                            throws Exception {
+                        // Do nothing
+                    }
+                };
+
+        DataStream<Integer> processed =
+                src.keyBy(new IdentityKeySelector<Long>())
+                        .enableAsyncState()
+                        .process(keyedProcessFunction);
+
+        processed.sinkTo(new DiscardingSink<Integer>());
+
+        assertThat(
+                        ((AbstractAsyncStateUdfStreamOperator<?, ?>)
+                                        getOperatorForDataStream(processed))
+                                .getUserFunction())
+                .isEqualTo(keyedProcessFunction);
+        assertThat(getOperatorForDataStream(processed))
+                .isInstanceOf(AsyncKeyedProcessOperator.class);
+    }
+
     /**
      * Verify that a {@link DataStream#process(ProcessFunction)} call is correctly translated to an
      * operator.