This is an automated email from the ASF dual-hosted git repository.

huweihua pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 534279ee388077513d4d7f3396e79fa094da3841
Author: Zhanghao Chen <[email protected]>
AuthorDate: Mon Aug 28 21:53:55 2023 +0800

    [FLINK-32821][examples] Add integrated tests for streaming examples
    
    (cherry picked from commit 3a44c1fbff1acf429f010d3f0fb06a6457727e16)
---
 flink-end-to-end-tests/run-nightly-tests.sh        |  2 +
 .../test-scripts/test_streaming_examples.sh        | 53 ++++++++++++++++++++++
 .../flink/streaming/examples/join/WindowJoin.java  | 27 ++++++++++-
 3 files changed, 80 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index f6f57b4ec77..7589d281634 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -176,6 +176,8 @@ function run_group_2 {
 
     run_test "Flink CLI end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_cli.sh"
 
+    run_test "Flink streaming examples end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_examples.sh"
+
     run_test "Queryable state (rocksdb) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_queryable_state.sh rocksdb"
     run_test "Queryable state (rocksdb) with TM restart end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" 
"skip_check_exceptions"
 
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_examples.sh 
b/flink-end-to-end-tests/test-scripts/test_streaming_examples.sh
new file mode 100755
index 00000000000..8145da4e68d
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_examples.sh
@@ -0,0 +1,53 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# End to end test for streaming examples. It only validates that the job graph 
can be successfully generated
+# and submitted to a standalone session cluster.
+# Usage:
+# FLINK_DIR=<flink dir> TEST_DATA_DIR-<test data dir> 
flink-end-to-end-tests/test-scripts/test_streaming_examples.sh
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+EXIT_CODE=0
+
+function run_example() {
+    printf 
"\n==============================================================================\n"
+    printf "Test operation on running $1 example\n"
+    printf 
"==============================================================================\n"
+    if [ $EXIT_CODE == 0 ]; then
+        TEST_PROGRAM_JAR=$FLINK_DIR/examples/streaming/$1.jar
+        RETURN=`$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR --output 
file:///${TEST_DATA_DIR}/result1`
+        EXIT_CODE=$?
+        echo "$RETURN"
+    fi
+}
+
+run_example "Iteration"
+run_example "SessionWindowing"
+run_example "StateMachineExample"
+run_example "TopSpeedWindowing"
+run_example "WindowJoin"
+run_example "WordCount"
+
+exit $EXIT_CODE
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index ef33a58ddcb..e6332090289 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -24,17 +24,24 @@ import 
org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import 
org.apache.flink.streaming.examples.join.WindowJoinSampleData.GradeSource;
 import 
org.apache.flink.streaming.examples.join.WindowJoinSampleData.SalarySource;
 
+import java.time.Duration;
+
 /**
  * Example illustrating a windowed stream join between two data streams.
  *
@@ -55,6 +62,7 @@ public class WindowJoin {
         final ParameterTool params = ParameterTool.fromArgs(args);
         final long windowSize = params.getLong("windowSize", 2000);
         final long rate = params.getLong("rate", 3L);
+        final boolean fileOutput = params.has("output");
 
         System.out.println("Using windowSize=" + windowSize + ", data rate=" + 
rate);
         System.out.println(
@@ -80,8 +88,23 @@ public class WindowJoin {
         DataStream<Tuple3<String, Integer, Integer>> joinedStream =
                 runWindowJoin(grades, salaries, windowSize);
 
-        // print the results with a single thread, rather than in parallel
-        joinedStream.print().setParallelism(1);
+        if (fileOutput) {
+            joinedStream
+                    .sinkTo(
+                            FileSink.<Tuple3<String, Integer, 
Integer>>forRowFormat(
+                                            new Path(params.get("output")),
+                                            new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    
.withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    
.withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("output");
+        } else {
+            // print the results with a single thread, rather than in parallel
+            joinedStream.print().setParallelism(1);
+        }
 
         // execute program
         env.execute("Windowed Join Example");

Reply via email to