This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 65705096e164a67d744a9e43addbec8fa58a6ba9
Author: xiarui <xiarui0...@gmail.com>
AuthorDate: Sat Jan 18 12:48:01 2025 +0800

    [FLINK-37028][Runtime] Integrate async window operator to DataStream API
---
 .../examples/windowing/WindowWordCount.java        | 24 +++++++++++++++++--
 .../streaming/examples/wordcount/WordCount.java    | 10 +++++---
 .../streaming/test/StreamingExamplesITCase.java    | 22 +++++++++++++++++
 .../state/api/WindowedStateTransformation.java     | 21 ++++++++++------
 .../streaming/api/datastream/WindowedStream.java   | 28 +++++++++++++++++-----
 .../transformations/OneInputTransformation.java    |  5 ++++
 6 files changed, 92 insertions(+), 18 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index 6075edf69c83..9375138991d7 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -20,11 +20,14 @@ package org.apache.flink.streaming.examples.windowing;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.examples.wordcount.WordCount;
@@ -33,6 +36,8 @@ import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
 
 import java.time.Duration;
 
+import static org.apache.flink.runtime.state.StateBackendLoader.FORST_STATE_BACKEND_NAME;
+
 /**
  * Implements a windowed version of the streaming "WordCount" program.
  *
@@ -88,6 +93,14 @@ public class WindowWordCount {
         // available in the Flink UI.
         env.getConfig().setGlobalJobParameters(params);
 
+        if (params.isAsyncState()) {
+            Configuration config = Configuration.fromMap(env.getConfiguration().toMap());
+            if (!config.containsKey(StateBackendOptions.STATE_BACKEND.key())) {
+                config.set(StateBackendOptions.STATE_BACKEND, FORST_STATE_BACKEND_NAME);
+                env.configure(config);
+            }
+        }
+
         DataStream<String> text;
         if (params.getInputs().isPresent()) {
             // Create a new file source that will read files from a given set of directories.
@@ -108,7 +121,7 @@ public class WindowWordCount {
         int windowSize = params.getInt("window").orElse(250);
         int slideSize = params.getInt("slide").orElse(150);
 
-        DataStream<Tuple2<String, Integer>> counts =
+        KeyedStream<Tuple2<String, Integer>, String> keyedStream =
                 // The text lines read from the source are split into words
                 // using a user-defined function. The tokenizer, implemented below,
                 // will output each words as a (2-tuple) containing (word, 1)
@@ -118,7 +131,14 @@
                         // Using a keyBy allows performing aggregations and other
                         // stateful transformations over data on a per-key basis.
                         // This is similar to a GROUP BY clause in a SQL query.
-                        .keyBy(value -> value.f0)
+                        .keyBy(value -> value.f0);
+
+        if (params.isAsyncState()) {
+            keyedStream.enableAsyncState();
+        }
+
+        DataStream<Tuple2<String, Integer>> counts =
+                keyedStream
                         // create windows of windowSize records slided every slideSize records
                         .countWindow(windowSize, slideSize)
                         // For each key, we perform a simple sum of the "1" field, the count.
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index dae32922859e..78ba06a98898 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -82,9 +82,13 @@ public class WordCount {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         // For async state, by default we will use the forst state backend.
-        Configuration config = Configuration.fromMap(env.getConfiguration().toMap());
-        config.set(StateBackendOptions.STATE_BACKEND, FORST_STATE_BACKEND_NAME);
-        env.configure(config);
+        if (params.isAsyncState()) {
+            Configuration config = Configuration.fromMap(env.getConfiguration().toMap());
+            if (!config.containsKey(StateBackendOptions.STATE_BACKEND.key())) {
+                config.set(StateBackendOptions.STATE_BACKEND, FORST_STATE_BACKEND_NAME);
+                env.configure(config);
+            }
+        }
 
         // Apache Flink's unified approach to stream and batch processing means that a DataStream
         // application executed over bounded input will produce the same final results regardless
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index c40f8632728a..1d4a1a4d38ad 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -126,6 +126,28 @@ public class StreamingExamplesITCase extends AbstractTestBaseJUnit4 {
         checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
     }
 
+    @Test
+    public void testAsyncWindowWordCount() throws Exception {
+        final String windowSize = "25";
+        final String slideSize = "15";
+        final String textPath = createTempFile("text.txt", WordCountData.TEXT);
+        final String resultPath = getTempDirPath("result");
+
+        org.apache.flink.streaming.examples.windowing.WindowWordCount.main(
+                new String[] {
+                    "--input", textPath,
+                    "--output", resultPath,
+                    "--window", windowSize,
+                    "--slide", slideSize,
+                    "--async-state"
+                });
+
+        // since the parallel tokenizers might have different speed
+        // the exact output can not be checked just whether it is well-formed
+        // checks that the result lines look like e.g. (faust, 2)
+        checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
+    }
+
     @Test
     public void testWordCount() throws Exception {
         final String textPath = createTempFile("text.txt", WordCountData.TEXT);
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
index 6a59024d487c..d239d49a741f 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
@@ -153,7 +153,8 @@ public class WindowedStateTransformation<T, K, W extends Window> {
         function = input.getExecutionEnvironment().clean(function);
         reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
 
-        WindowOperator<K, T, ?, R, W> operator = builder.reduce(reduceFunction, function);
+        WindowOperator<K, T, ?, R, W> operator =
+                (WindowOperator<K, T, ?, R, W>) builder.reduce(reduceFunction, function);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
@@ -180,7 +181,8 @@ public class WindowedStateTransformation<T, K, W extends Window> {
         function = input.getExecutionEnvironment().clean(function);
         reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
 
-        WindowOperator<K, T, ?, R, W> operator = builder.reduce(reduceFunction, function);
+        WindowOperator<K, T, ?, R, W> operator =
+                (WindowOperator<K, T, ?, R, W>) builder.reduce(reduceFunction, function);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
@@ -316,7 +318,8 @@ public class WindowedStateTransformation<T, K, W extends Window> {
         aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
 
         WindowOperator<K, T, ?, R, W> operator =
-                builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
+                (WindowOperator<K, T, ?, R, W>)
+                        builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
@@ -394,7 +397,8 @@ public class WindowedStateTransformation<T, K, W extends Window> {
         aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
 
         WindowOperator<K, T, ?, R, W> operator =
-                builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
+                (WindowOperator<K, T, ?, R, W>)
+                        builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
@@ -419,7 +423,8 @@ public class WindowedStateTransformation<T, K, W extends Window> {
      * @return The data stream that is the result of applying the window function to the window.
      */
     public <R> StateBootstrapTransformation<T> apply(WindowFunction<T, R, K, W> function) {
-        WindowOperator<K, T, ?, R, W> operator = builder.apply(function);
+        WindowOperator<K, T, ?, R, W> operator =
+                (WindowOperator<K, T, ?, R, W>) builder.apply(function);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
@@ -444,7 +449,8 @@ public class WindowedStateTransformation<T, K, W extends Window> {
             WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
         function = input.getExecutionEnvironment().clean(function);
 
-        WindowOperator<K, T, ?, R, W> operator = builder.apply(function);
+        WindowOperator<K, T, ?, R, W> operator =
+                (WindowOperator<K, T, ?, R, W>) builder.apply(function);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
@@ -466,7 +472,8 @@ public class WindowedStateTransformation<T, K, W extends Window> {
      */
    @PublicEvolving
    public <R> StateBootstrapTransformation<T> process(ProcessWindowFunction<T, R, K, W> function) {
-        WindowOperator<K, T, ?, R, W> operator = builder.process(function);
+        WindowOperator<K, T, ?, R, W> operator =
+                (WindowOperator<K, T, ?, R, W>) builder.process(function);
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 1c694b4b81b6..a05d32930030 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -77,10 +77,13 @@ public class WindowedStream<T, K, W extends Window> {
 
     private final WindowOperatorBuilder<T, K, W> builder;
 
+    private boolean isEnableAsyncState;
+
     @PublicEvolving
     public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
         this.input = input;
+        this.isEnableAsyncState = input.isEnableAsyncState();
 
         this.builder =
                 new WindowOperatorBuilder<>(
@@ -216,7 +219,10 @@ public class WindowedStream<T, K, W extends Window> {
         final String opName = builder.generateOperatorName();
         final String opDescription = builder.generateOperatorDescription(reduceFunction, function);
 
-        OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);
+        OneInputStreamOperator<T, R> operator =
+                isEnableAsyncState
+                        ? builder.asyncReduce(reduceFunction, function)
+                        : builder.reduce(reduceFunction, function);
 
         return input.transform(opName, resultType, operator).setDescription(opDescription);
     }
@@ -263,7 +269,10 @@ public class WindowedStream<T, K, W extends Window> {
         final String opName = builder.generateOperatorName();
         final String opDescription = builder.generateOperatorDescription(reduceFunction, function);
 
-        OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);
+        OneInputStreamOperator<T, R> operator =
+                isEnableAsyncState
+                        ? builder.asyncReduce(reduceFunction, function)
+                        : builder.reduce(reduceFunction, function);
 
         return input.transform(opName, resultType, operator).setDescription(opDescription);
     }
@@ -414,7 +423,9 @@ public class WindowedStream<T, K, W extends Window> {
                 builder.generateOperatorDescription(aggregateFunction, windowFunction);
 
         OneInputStreamOperator<T, R> operator =
-                builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
+                isEnableAsyncState
+                        ? builder.asyncAggregate(aggregateFunction, windowFunction, accumulatorType)
+                        : builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
 
         return input.transform(opName, resultType, operator).setDescription(opDescription);
     }
@@ -525,7 +536,9 @@ public class WindowedStream<T, K, W extends Window> {
                 builder.generateOperatorDescription(aggregateFunction, windowFunction);
 
         OneInputStreamOperator<T, R> operator =
-                builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
+                isEnableAsyncState
+                        ? builder.asyncAggregate(aggregateFunction, windowFunction, accumulatorType)
+                        : builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
 
         return input.transform(opName, resultType, operator).setDescription(opDescription);
     }
@@ -569,7 +582,8 @@ public class WindowedStream<T, K, W extends Window> {
         final String opName = builder.generateOperatorName();
         final String opDescription = builder.generateOperatorDescription(function, null);
 
-        OneInputStreamOperator<T, R> operator = builder.apply(function);
+        OneInputStreamOperator<T, R> operator =
+                isEnableAsyncState ? builder.asyncApply(function) : builder.apply(function);
 
         return input.transform(opName, resultType, operator).setDescription(opDescription);
     }
@@ -613,7 +627,8 @@ public class WindowedStream<T, K, W extends Window> {
         final String opName = builder.generateOperatorName();
         final String opDesc = builder.generateOperatorDescription(function, null);
 
-        OneInputStreamOperator<T, R> operator = builder.process(function);
+        OneInputStreamOperator<T, R> operator =
+                isEnableAsyncState ? builder.asyncProcess(function) : builder.process(function);
 
         return input.transform(opName, resultType, operator).setDescription(opDesc);
     }
@@ -861,6 +876,7 @@ public class WindowedStream<T, K, W extends Window> {
     @Experimental
     public WindowedStream<T, K, W> enableAsyncState() {
         input.enableAsyncState();
+        this.isEnableAsyncState = true;
         return this;
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
index e8a116b3f9bb..edae579c9231 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
@@ -192,4 +192,9 @@ public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT>
     public boolean isInternalSorterSupported() {
         return operatorFactory.getOperatorAttributes().isInternalSorterSupported();
     }
+
+    @Override
+    public void enableAsyncState() {
+        // nothing to do.
+    }
 }
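---
Note (not part of the commit): for readers skimming the patch, the end-to-end usage it enables looks roughly like the sketch below. It mirrors the WindowWordCount/WordCount examples above; the names it relies on (KeyedStream#enableAsyncState, StateBackendOptions.STATE_BACKEND, StateBackendLoader.FORST_STATE_BACKEND_NAME, countWindow/sum) all appear in the diff, while the class name, input data, window sizes, tokenizer, and print sink are placeholder choices, and env.fromData is assumed to be available on this master branch.

import static org.apache.flink.runtime.state.StateBackendLoader.FORST_STATE_BACKEND_NAME;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/** Rough usage sketch for the async window operator integration (illustration only). */
public class AsyncWindowWordCountSketch {

    public static void main(String[] args) throws Exception {
        // Pick the ForSt state backend, as the examples do when --async-state is passed.
        Configuration config = new Configuration();
        config.set(StateBackendOptions.STATE_BACKEND, FORST_STATE_BACKEND_NAME);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);

        // Placeholder input; the real examples read WordCountData or files.
        DataStream<String> text = env.fromData("to be or not to be", "that is the question");

        // Opt the keyed stream into async state before building the window, so the
        // window translation picks the async operator variants added in this commit.
        KeyedStream<Tuple2<String, Integer>, String> keyed =
                text.flatMap(new Tokenizer()).keyBy(value -> value.f0);
        keyed.enableAsyncState();

        // Small count windows so the toy input actually emits results.
        DataStream<Tuple2<String, Integer>> counts = keyed.countWindow(2, 1).sum(1);

        counts.print();
        env.execute("Async Window WordCount (sketch)");
    }

    /** Minimal stand-in for WordCount.Tokenizer. */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Split lines into lowercase words and emit (word, 1) pairs.
            for (String word : value.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}

As the diff shows, the async flag is captured both when the WindowedStream is constructed from the KeyedStream and via WindowedStream#enableAsyncState, so either stream can opt in before the job graph is translated.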