This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 65705096e164a67d744a9e43addbec8fa58a6ba9
Author: xiarui <xiarui0...@gmail.com>
AuthorDate: Sat Jan 18 12:48:01 2025 +0800

    [FLINK-37028][Runtime] Integrate async window operator into the DataStream API
---
 .../examples/windowing/WindowWordCount.java        | 24 +++++++++++++++++--
 .../streaming/examples/wordcount/WordCount.java    | 10 +++++---
 .../streaming/test/StreamingExamplesITCase.java    | 22 +++++++++++++++++
 .../state/api/WindowedStateTransformation.java     | 21 ++++++++++------
 .../streaming/api/datastream/WindowedStream.java   | 28 +++++++++++++++++-----
 .../transformations/OneInputTransformation.java    |  5 ++++
 6 files changed, 92 insertions(+), 18 deletions(-)

diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index 6075edf69c83..9375138991d7 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -20,11 +20,14 @@ package org.apache.flink.streaming.examples.windowing;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.examples.wordcount.WordCount;
@@ -33,6 +36,8 @@ import 
org.apache.flink.streaming.examples.wordcount.util.WordCountData;
 
 import java.time.Duration;
 
+import static 
org.apache.flink.runtime.state.StateBackendLoader.FORST_STATE_BACKEND_NAME;
+
 /**
  * Implements a windowed version of the streaming "WordCount" program.
  *
@@ -88,6 +93,14 @@ public class WindowWordCount {
         // available in the Flink UI.
         env.getConfig().setGlobalJobParameters(params);
 
+        if (params.isAsyncState()) {
+            Configuration config = 
Configuration.fromMap(env.getConfiguration().toMap());
+            if (!config.containsKey(StateBackendOptions.STATE_BACKEND.key())) {
+                config.set(StateBackendOptions.STATE_BACKEND, 
FORST_STATE_BACKEND_NAME);
+                env.configure(config);
+            }
+        }
+
         DataStream<String> text;
         if (params.getInputs().isPresent()) {
             // Create a new file source that will read files from a given set 
of directories.
@@ -108,7 +121,7 @@ public class WindowWordCount {
         int windowSize = params.getInt("window").orElse(250);
         int slideSize = params.getInt("slide").orElse(150);
 
-        DataStream<Tuple2<String, Integer>> counts =
+        KeyedStream<Tuple2<String, Integer>, String> keyedStream =
                 // The text lines read from the source are split into words
                 // using a user-defined function. The tokenizer, implemented 
below,
                 // will output each words as a (2-tuple) containing (word, 1)
@@ -118,7 +131,14 @@ public class WindowWordCount {
                         // Using a keyBy allows performing aggregations and 
other
                         // stateful transformations over data on a per-key 
basis.
                         // This is similar to a GROUP BY clause in a SQL query.
-                        .keyBy(value -> value.f0)
+                        .keyBy(value -> value.f0);
+
+        if (params.isAsyncState()) {
+            keyedStream.enableAsyncState();
+        }
+
+        DataStream<Tuple2<String, Integer>> counts =
+                keyedStream
                         // create windows of windowSize records slided every 
slideSize records
                         .countWindow(windowSize, slideSize)
                         // For each key, we perform a simple sum of the "1" 
field, the count.
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index dae32922859e..78ba06a98898 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -82,9 +82,13 @@ public class WordCount {
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         // For async state, by default we will use the forst state backend.
-        Configuration config = 
Configuration.fromMap(env.getConfiguration().toMap());
-        config.set(StateBackendOptions.STATE_BACKEND, 
FORST_STATE_BACKEND_NAME);
-        env.configure(config);
+        if (params.isAsyncState()) {
+            Configuration config = 
Configuration.fromMap(env.getConfiguration().toMap());
+            if (!config.containsKey(StateBackendOptions.STATE_BACKEND.key())) {
+                config.set(StateBackendOptions.STATE_BACKEND, 
FORST_STATE_BACKEND_NAME);
+                env.configure(config);
+            }
+        }
 
         // Apache Flink’s unified approach to stream and batch processing 
means that a DataStream
         // application executed over bounded input will produce the same final 
results regardless
diff --git 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index c40f8632728a..1d4a1a4d38ad 100644
--- 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -126,6 +126,28 @@ public class StreamingExamplesITCase extends 
AbstractTestBaseJUnit4 {
         checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
     }
 
+    @Test
+    public void testAsyncWindowWordCount() throws Exception {
+        final String windowSize = "25";
+        final String slideSize = "15";
+        final String textPath = createTempFile("text.txt", WordCountData.TEXT);
+        final String resultPath = getTempDirPath("result");
+
+        org.apache.flink.streaming.examples.windowing.WindowWordCount.main(
+                new String[] {
+                    "--input", textPath,
+                    "--output", resultPath,
+                    "--window", windowSize,
+                    "--slide", slideSize,
+                    "--async-state"
+                });
+
+        // The parallel tokenizers may run at different speeds, so the exact output cannot
+        // be checked, only that it is well-formed: every result line must look like
+        // "(faust,2)" -- a lowercase word, a comma with no space, then the count.
+        checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
+    }
+
     @Test
     public void testWordCount() throws Exception {
         final String textPath = createTempFile("text.txt", WordCountData.TEXT);
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
index 6a59024d487c..d239d49a741f 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
@@ -153,7 +153,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
         function = input.getExecutionEnvironment().clean(function);
         reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
 
-        WindowOperator<K, T, ?, R, W> operator = 
builder.reduce(reduceFunction, function);
+        WindowOperator<K, T, ?, R, W> operator =
+                (WindowOperator<K, T, ?, R, W>) builder.reduce(reduceFunction, 
function);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
@@ -180,7 +181,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
         function = input.getExecutionEnvironment().clean(function);
         reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
 
-        WindowOperator<K, T, ?, R, W> operator = 
builder.reduce(reduceFunction, function);
+        WindowOperator<K, T, ?, R, W> operator =
+                (WindowOperator<K, T, ?, R, W>) builder.reduce(reduceFunction, 
function);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
@@ -316,7 +318,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
         aggregateFunction = 
input.getExecutionEnvironment().clean(aggregateFunction);
 
         WindowOperator<K, T, ?, R, W> operator =
-                builder.aggregate(aggregateFunction, windowFunction, 
accumulatorType);
+                (WindowOperator<K, T, ?, R, W>)
+                        builder.aggregate(aggregateFunction, windowFunction, 
accumulatorType);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
@@ -394,7 +397,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
         aggregateFunction = 
input.getExecutionEnvironment().clean(aggregateFunction);
 
         WindowOperator<K, T, ?, R, W> operator =
-                builder.aggregate(aggregateFunction, windowFunction, 
accumulatorType);
+                (WindowOperator<K, T, ?, R, W>)
+                        builder.aggregate(aggregateFunction, windowFunction, 
accumulatorType);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
@@ -419,7 +423,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
      * @return The data stream that is the result of applying the window 
function to the window.
      */
     public <R> StateBootstrapTransformation<T> apply(WindowFunction<T, R, K, 
W> function) {
-        WindowOperator<K, T, ?, R, W> operator = builder.apply(function);
+        WindowOperator<K, T, ?, R, W> operator =
+                (WindowOperator<K, T, ?, R, W>) builder.apply(function);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
@@ -444,7 +449,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
             WindowFunction<T, R, K, W> function, TypeInformation<R> 
resultType) {
         function = input.getExecutionEnvironment().clean(function);
 
-        WindowOperator<K, T, ?, R, W> operator = builder.apply(function);
+        WindowOperator<K, T, ?, R, W> operator =
+                (WindowOperator<K, T, ?, R, W>) builder.apply(function);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
@@ -466,7 +472,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
      */
     @PublicEvolving
     public <R> StateBootstrapTransformation<T> 
process(ProcessWindowFunction<T, R, K, W> function) {
-        WindowOperator<K, T, ?, R, W> operator = builder.process(function);
+        WindowOperator<K, T, ?, R, W> operator =
+                (WindowOperator<K, T, ?, R, W>) builder.process(function);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 1c694b4b81b6..a05d32930030 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -77,10 +77,13 @@ public class WindowedStream<T, K, W extends Window> {
 
     private final WindowOperatorBuilder<T, K, W> builder;
 
+    private boolean isEnableAsyncState;
+
     @PublicEvolving
     public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, 
W> windowAssigner) {
 
         this.input = input;
+        this.isEnableAsyncState = input.isEnableAsyncState();
 
         this.builder =
                 new WindowOperatorBuilder<>(
@@ -216,7 +219,10 @@ public class WindowedStream<T, K, W extends Window> {
         final String opName = builder.generateOperatorName();
         final String opDescription = 
builder.generateOperatorDescription(reduceFunction, function);
 
-        OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, 
function);
+        OneInputStreamOperator<T, R> operator =
+                isEnableAsyncState
+                        ? builder.asyncReduce(reduceFunction, function)
+                        : builder.reduce(reduceFunction, function);
         return input.transform(opName, resultType, 
operator).setDescription(opDescription);
     }
 
@@ -263,7 +269,10 @@ public class WindowedStream<T, K, W extends Window> {
 
         final String opName = builder.generateOperatorName();
         final String opDescription = 
builder.generateOperatorDescription(reduceFunction, function);
-        OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, 
function);
+        OneInputStreamOperator<T, R> operator =
+                isEnableAsyncState
+                        ? builder.asyncReduce(reduceFunction, function)
+                        : builder.reduce(reduceFunction, function);
 
         return input.transform(opName, resultType, 
operator).setDescription(opDescription);
     }
@@ -414,7 +423,9 @@ public class WindowedStream<T, K, W extends Window> {
                 builder.generateOperatorDescription(aggregateFunction, 
windowFunction);
 
         OneInputStreamOperator<T, R> operator =
-                builder.aggregate(aggregateFunction, windowFunction, 
accumulatorType);
+                isEnableAsyncState
+                        ? builder.asyncAggregate(aggregateFunction, 
windowFunction, accumulatorType)
+                        : builder.aggregate(aggregateFunction, windowFunction, 
accumulatorType);
 
         return input.transform(opName, resultType, 
operator).setDescription(opDescription);
     }
@@ -525,7 +536,9 @@ public class WindowedStream<T, K, W extends Window> {
                 builder.generateOperatorDescription(aggregateFunction, 
windowFunction);
 
         OneInputStreamOperator<T, R> operator =
-                builder.aggregate(aggregateFunction, windowFunction, 
accumulatorType);
+                isEnableAsyncState
+                        ? builder.asyncAggregate(aggregateFunction, 
windowFunction, accumulatorType)
+                        : builder.aggregate(aggregateFunction, windowFunction, 
accumulatorType);
 
         return input.transform(opName, resultType, 
operator).setDescription(opDescription);
     }
@@ -569,7 +582,8 @@ public class WindowedStream<T, K, W extends Window> {
 
         final String opName = builder.generateOperatorName();
         final String opDescription = 
builder.generateOperatorDescription(function, null);
-        OneInputStreamOperator<T, R> operator = builder.apply(function);
+        OneInputStreamOperator<T, R> operator =
+                isEnableAsyncState ? builder.asyncApply(function) : 
builder.apply(function);
 
         return input.transform(opName, resultType, 
operator).setDescription(opDescription);
     }
@@ -613,7 +627,8 @@ public class WindowedStream<T, K, W extends Window> {
         final String opName = builder.generateOperatorName();
         final String opDesc = builder.generateOperatorDescription(function, 
null);
 
-        OneInputStreamOperator<T, R> operator = builder.process(function);
+        OneInputStreamOperator<T, R> operator =
+                isEnableAsyncState ? builder.asyncProcess(function) : 
builder.process(function);
 
         return input.transform(opName, resultType, 
operator).setDescription(opDesc);
     }
@@ -861,6 +876,7 @@ public class WindowedStream<T, K, W extends Window> {
     @Experimental
     public WindowedStream<T, K, W> enableAsyncState() {
         input.enableAsyncState();
+        this.isEnableAsyncState = true; // later window functions build async operators
         return this;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
index e8a116b3f9bb..edae579c9231 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
@@ -192,4 +192,9 @@ public class OneInputTransformation<IN, OUT> extends 
PhysicalTransformation<OUT>
     public boolean isInternalSorterSupported() {
         return 
operatorFactory.getOperatorAttributes().isInternalSorterSupported();
     }
+
+    @Override
+    public void enableAsyncState() {
+        // Intentional no-op. NOTE(review): presumably the operator itself decides async use.
+    }
 }

Reply via email to