This is an automated email from the ASF dual-hosted git repository. huweihua pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 534279ee388077513d4d7f3396e79fa094da3841 Author: Zhanghao Chen <[email protected]> AuthorDate: Mon Aug 28 21:53:55 2023 +0800 [FLINK-32821][examples] Add integrated tests for streaming examples (cherry picked from commit 3a44c1fbff1acf429f010d3f0fb06a6457727e16) --- flink-end-to-end-tests/run-nightly-tests.sh | 2 + .../test-scripts/test_streaming_examples.sh | 53 ++++++++++++++++++++++ .../flink/streaming/examples/join/WindowJoin.java | 27 ++++++++++- 3 files changed, 80 insertions(+), 2 deletions(-) diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index f6f57b4ec77..7589d281634 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -176,6 +176,8 @@ function run_group_2 { run_test "Flink CLI end-to-end test" "$END_TO_END_DIR/test-scripts/test_cli.sh" + run_test "Flink streaming examples end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_examples.sh" + run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state.sh rocksdb" run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions" diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_examples.sh b/flink-end-to-end-tests/test-scripts/test_streaming_examples.sh new file mode 100755 index 00000000000..8145da4e68d --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_streaming_examples.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# End to end test for streaming examples. It only validates that the job graph can be successfully generated +# and submitted to a standalone session cluster. +# Usage: +# FLINK_DIR=<flink dir> TEST_DATA_DIR=<test data dir> flink-end-to-end-tests/test-scripts/test_streaming_examples.sh + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +EXIT_CODE=0 + +function run_example() { + printf "\n==============================================================================\n" + printf "Test operation on running $1 example\n" + printf "==============================================================================\n" + if [ $EXIT_CODE == 0 ]; then + TEST_PROGRAM_JAR=$FLINK_DIR/examples/streaming/$1.jar + RETURN=`$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR --output file:///${TEST_DATA_DIR}/result1` + EXIT_CODE=$? 
+ echo "$RETURN" + fi +} + +run_example "Iteration" +run_example "SessionWindowing" +run_example "StateMachineExample" +run_example "TopSpeedWindowing" +run_example "WindowJoin" +run_example "WordCount" + +exit $EXIT_CODE diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index ef33a58ddcb..e6332090289 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -24,17 +24,24 @@ import org.apache.flink.api.common.eventtime.WatermarkGenerator; import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.examples.join.WindowJoinSampleData.GradeSource; import org.apache.flink.streaming.examples.join.WindowJoinSampleData.SalarySource; +import 
java.time.Duration; + /** * Example illustrating a windowed stream join between two data streams. * @@ -55,6 +62,7 @@ public class WindowJoin { final ParameterTool params = ParameterTool.fromArgs(args); final long windowSize = params.getLong("windowSize", 2000); final long rate = params.getLong("rate", 3L); + final boolean fileOutput = params.has("output"); System.out.println("Using windowSize=" + windowSize + ", data rate=" + rate); System.out.println( @@ -80,8 +88,23 @@ public class WindowJoin { DataStream<Tuple3<String, Integer, Integer>> joinedStream = runWindowJoin(grades, salaries, windowSize); - // print the results with a single thread, rather than in parallel - joinedStream.print().setParallelism(1); + if (fileOutput) { + joinedStream + .sinkTo( + FileSink.<Tuple3<String, Integer, Integer>>forRowFormat( + new Path(params.get("output")), + new SimpleStringEncoder<>()) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withMaxPartSize(MemorySize.ofMebiBytes(1)) + .withRolloverInterval(Duration.ofSeconds(10)) + .build()) + .build()) + .name("output"); + } else { + // print the results with a single thread, rather than in parallel + joinedStream.print().setParallelism(1); + } // execute program env.execute("Windowed Join Example");
