[
https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16676976#comment-16676976
]
ASF GitHub Bot commented on FLINK-8985:
---------------------------------------
asfgit closed pull request #5863: [FLINK-8985][e2etest] initial support for End-to-end CLI test, excluding YARN test
URL: https://github.com/apache/flink/pull/5863
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/flink-end-to-end-tests/flink-api-test/pom.xml b/flink-end-to-end-tests/flink-api-test/pom.xml
new file mode 100644
index 00000000000..24a85e02bef
--- /dev/null
+++ b/flink-end-to-end-tests/flink-api-test/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.6-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-api-test</artifactId>
+ <name>flink-api-test</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+							<finalName>PeriodicStreamingJob</finalName>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.runtime.tests.PeriodicStreamingJob</mainClass>
+								</transformer>
+							</transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
+
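For context, a minimal sketch of how this module could be built on its own — a standard Maven invocation, not part of the PR itself:

    # Build just the flink-api-test module plus its upstream Flink modules.
    # The shade plugin's <finalName> above makes the output jar
    # flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar,
    # which test_cli_api.sh below references as PERIODIC_JOB_JAR.
    mvn clean package -pl flink-end-to-end-tests/flink-api-test -am -DskipTests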
diff --git a/flink-end-to-end-tests/flink-api-test/src/main/java/org/apache/flink/runtime/tests/PeriodicStreamingJob.java b/flink-end-to-end-tests/flink-api-test/src/main/java/org/apache/flink/runtime/tests/PeriodicStreamingJob.java
new file mode 100644
index 00000000000..ee8a49b8583
--- /dev/null
+++ b/flink-end-to-end-tests/flink-api-test/src/main/java/org/apache/flink/runtime/tests/PeriodicStreamingJob.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This is a periodic streaming job that runs for API testing purposes.
+ *
+ * <p>The stream is bounded and will complete after the specified duration.
+ *
+ * <p>Parameters:
+ * -outputPath Sets the path to where the result data is written.
+ * -recordsPerSecond Sets the output record frequency.
+ * -durationInSecond Sets the running duration of the job.
+ * -offsetInSecond Sets the startup delay before processing the first message.
+ */
+public class PeriodicStreamingJob {
+
+ public static void main(String[] args) throws Exception {
+ ParameterTool params = ParameterTool.fromArgs(args);
+ String outputPath = params.getRequired("outputPath");
+ int recordsPerSecond = params.getInt("recordsPerSecond", 10);
+ int duration = params.getInt("durationInSecond", 60);
+ int offset = params.getInt("offsetInSecond", 0);
+
+		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		sEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ sEnv.enableCheckpointing(4000);
+ sEnv.getConfig().setAutoWatermarkInterval(1000);
+
+		// execute a simple pass-through program.
+ PeriodicSourceGenerator generator = new PeriodicSourceGenerator(
+ recordsPerSecond, duration, offset);
+ DataStream rows = sEnv.addSource(generator);
+
+ DataStream result = rows
+ .keyBy(1)
+ .timeWindow(Time.seconds(5))
+ .sum(0);
+
+		result.writeAsText(outputPath + "/result.txt", FileSystem.WriteMode.OVERWRITE)
+ .setParallelism(1);
+
+ sEnv.execute();
+ }
+
+ /**
+ * Data-generating source function.
+ */
+	public static class PeriodicSourceGenerator implements SourceFunction<Tuple>, ResultTypeQueryable<Tuple>, ListCheckpointed<Long> {
+ private final int sleepMs;
+ private final int durationMs;
+ private final int offsetSeconds;
+ private long ms = 0;
+
+		public PeriodicSourceGenerator(float rowsPerSecond, int durationSeconds, int offsetSeconds) {
+ this.durationMs = durationSeconds * 1000;
+ this.sleepMs = (int) (1000 / rowsPerSecond);
+ this.offsetSeconds = offsetSeconds;
+ }
+
+ @Override
+ public void run(SourceContext<Tuple> ctx) throws Exception {
+ long offsetMs = offsetSeconds * 1000L;
+
+ while (ms < durationMs) {
+ synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(new Tuple2<>(ms + offsetMs, "key"));
+ }
+ ms += sleepMs;
+ Thread.sleep(sleepMs);
+ }
+ }
+
+ @Override
+ public void cancel() { }
+
+ @Override
+ public TypeInformation<Tuple> getProducedType() {
+ return Types.TUPLE(Types.LONG, Types.STRING);
+ }
+
+ @Override
+		public List<Long> snapshotState(long checkpointId, long timestamp) {
+ return Collections.singletonList(ms);
+ }
+
+ @Override
+ public void restoreState(List<Long> state) {
+ for (Long l : state) {
+ ms += l;
+ }
+ }
+ }
+}
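For illustration, the job above could also be submitted by hand, roughly as follows — the jar path matches the shade plugin's finalName, the parameter names match the Javadoc, and the values mirror the defaults in main(); the output path is illustrative, not taken from this PR:

    # Hypothetical manual submission of the shaded job jar.
    $FLINK_DIR/bin/flink run -d \
        flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar \
        --outputPath file:///tmp/periodic-job-out \
        --recordsPerSecond 10 \
        --durationInSecond 60 \
        --offsetInSecond 0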
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 581abc84d3c..4b1d287297b 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -35,6 +35,7 @@ under the License.
<name>flink-end-to-end-tests</name>
<modules>
+ <module>flink-api-test</module>
<module>flink-parent-child-classloading-test</module>
<module>flink-dataset-allround-test</module>
<module>flink-datastream-allround-test</module>
diff --git a/flink-end-to-end-tests/test-scripts/test_cli_api.sh b/flink-end-to-end-tests/test-scripts/test_cli_api.sh
new file mode 100755
index 00000000000..fd9d305ea63
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_cli_api.sh
@@ -0,0 +1,216 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Tests for CLI commands.
+# Verify only the return code and the content correctness of the API results.
+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR="\"pact\": \"(Data Source)\""
+JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR="\"pact\": \"(Data Sink)\""
+JOB_LIST_REGEX_EXTRACTOR_BY_STATUS="([0-9,a-f]*) :"
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+ if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+ then
+ JOB_ID="${BASH_REMATCH[1]}";
+ else
+ JOB_ID=""
+ fi
+ echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+ if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+ then
+ SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+ else
+ SAVEPOINT_PATH=""
+ fi
+ echo "$SAVEPOINT_PATH"
+}
+
+function extract_valid_pact_from_job_info_return() {
+ PACT_MATCH=0
+ if [[ $1 =~ $JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR ]];
+ then
+ PACT_MATCH=$PACT_MATCH
+ else
+ PACT_MATCH=-1
+ fi
+ if [[ $1 =~ $JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR ]];
+ then
+ PACT_MATCH=$PACT_MATCH
+ else
+ PACT_MATCH=-1
+ fi
+ echo ${PACT_MATCH}
+}
+
+function extract_valid_job_list_by_type_from_job_list_return() {
+ JOB_LIST_MATCH=0
+ JOB_LIST_REGEX_EXTRACTOR="$JOB_LIST_REGEX_EXTRACTOR_BY_STATUS $2 $3"
+ if [[ $1 =~ $JOB_LIST_REGEX_EXTRACTOR ]];
+ then
+ JOB_LIST_MATCH=$JOB_LIST_MATCH
+ else
+ JOB_LIST_MATCH=-1
+ fi
+ echo ${JOB_LIST_MATCH}
+}
+
+function extract_task_manager_slot_request_count() {
+  COUNT=`grep "Receive slot request" $FLINK_DIR/log/*taskexecutor*.log | wc -l`
+ echo $COUNT
+}
+
+function cleanup_cli_test() {
+ $FLINK_DIR/bin/taskmanager.sh stop-all
+
+ cleanup
+}
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Test default job launch in non-detached mode\n"
+  printf "==============================================================================\n"
+ eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+ EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Test job launch with a complex parameter set\n"
+  printf "==============================================================================\n"
+ eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
+ -c org.apache.flink.examples.java.wordcount.WordCount \
+ $FLINK_DIR/examples/batch/WordCount.jar \
+ --input file:///$FLINK_DIR/README.txt \
+ --output file:///${TEST_DATA_DIR}/out/result"
+ EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Validate job launch parallelism configuration\n"
+  printf "==============================================================================\n"
+ RECEIVED_TASKMGR_REQUEST=`extract_task_manager_slot_request_count`
+ # expected 1 from default launch and 4 from complex parameter set.
+ if [[ $RECEIVED_TASKMGR_REQUEST == 5 ]]; then
+ EXIT_CODE=0
+ else
+ EXIT_CODE=-1
+ fi
+fi
+
+printf "\n==============================================================================\n"
+printf "Test information APIs\n"
+printf "==============================================================================\n"
+if [ $EXIT_CODE == 0 ]; then
+ RETURN=`$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar`
+ echo "job info returns: $RETURN"
+ PACT_MATCH=`extract_valid_pact_from_job_info_return "$RETURN"`
+ echo "job info regex match: $PACT_MATCH"
+  if [[ $PACT_MATCH == -1 ]]; then # expect at least a Data Source and a Data Sink pact match
+ EXIT_CODE=-1
+ else
+ EXIT_CODE=0
+ fi
+fi
+
+printf "\n==============================================================================\n"
+printf "Test operations on a running streaming job\n"
+printf "==============================================================================\n"
+JOB_ID=""
+if [ $EXIT_CODE == 0 ]; then
+ RETURN=`$FLINK_DIR/bin/flink run -d \
+ $PERIODIC_JOB_JAR --outputPath file:///${TEST_DATA_DIR}/out/result`
+ echo "job submission returns: $RETURN"
+ JOB_ID=`extract_job_id_from_job_submission_return "$RETURN"`
+ EXIT_CODE=$? # expect matching job id extraction
+fi
+
+printf "\n==============================================================================\n"
+printf "Test list API on a streaming job\n"
+printf "==============================================================================\n"
+if [ $EXIT_CODE == 0 ]; then
+ RETURN=`$FLINK_DIR/bin/flink list -a`
+ echo "job list all returns: $RETURN"
+  JOB_LIST_MATCH=`extract_valid_job_list_by_type_from_job_list_return "$RETURN" "Flink Streaming Job" ""`
+ echo "job list all regex match: $JOB_LIST_MATCH"
+  if [[ $JOB_LIST_MATCH == -1 ]]; then # expect a match for all jobs
+ EXIT_CODE=-1
+ else
+ EXIT_CODE=0
+ fi
+fi
+if [ $EXIT_CODE == 0 ]; then
+ RETURN=`$FLINK_DIR/bin/flink list -r`
+ echo "job list running returns: $RETURN"
+  JOB_LIST_MATCH=`extract_valid_job_list_by_type_from_job_list_return "$RETURN" "Flink Streaming Job" "\(RUNNING\)"`
+ echo "job list running regex match: $JOB_LIST_MATCH"
+ if [[ $JOB_LIST_MATCH == -1 ]]; then # expect match for running job
+ EXIT_CODE=-1
+ else
+ EXIT_CODE=0
+ fi
+fi
+if [ $EXIT_CODE == 0 ]; then
+ RETURN=`$FLINK_DIR/bin/flink list -s`
+ echo "job list scheduled returns: $RETURN"
+  JOB_LIST_MATCH=`extract_valid_job_list_by_type_from_job_list_return "$RETURN" "Flink Streaming Job" "\(CREATED\)"`
+ echo "job list scheduled regex match: $JOB_LIST_MATCH"
+ if [[ $JOB_LIST_MATCH == -1 ]]; then # expect no match for scheduled job
+ EXIT_CODE=0
+ else
+ EXIT_CODE=-1
+ fi
+fi
+
+printf "\n==============================================================================\n"
+printf "Test canceling a running streaming job\n"
+printf "==============================================================================\n"
+if [ $EXIT_CODE == 0 ]; then
+ eval "$FLINK_DIR/bin/flink cancel ${JOB_ID}"
+ EXIT_CODE=$?
+fi
+
+printf "\n==============================================================================\n"
+printf "Cleaning up...\n"
+printf "==============================================================================\n"
+trap cleanup_cli_test INT
+trap cleanup_cli_test EXIT
+
+if [ $EXIT_CODE == 0 ];
+ then
+  echo "All CLI tests passed!";
+ else
+ echo "CLI test failed: $EXIT_CODE";
+ PASS=""
+ exit 1
+fi
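As a quick sanity check of the regex-based extraction, the JobID extractor can be exercised against a captured submission message — the sample line below assumes the CLI's usual wording and is not output recorded from this PR:

    # Hypothetical check: a typical submission message fed through the extractor.
    SAMPLE="Job has been submitted with JobID 8c9302bb463a25a3308820414d5b034e"
    extract_job_id_from_job_submission_return "$SAMPLE"
    # prints: 8c9302bb463a25a3308820414d5b034e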
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> End-to-end test: CLI
> --------------------
>
> Key: FLINK-8985
> URL: https://issues.apache.org/jira/browse/FLINK-8985
> Project: Flink
> Issue Type: Sub-task
> Components: Client, Tests
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Rong Rong
> Priority: Major
> Labels: pull-request-available
>
> We should add an end-to-end test which verifies that all client commands are
> working correctly.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)