This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new becee2f [FLINK-13441][e2e] Add e2e test for SQL batch job
becee2f is described below
commit becee2f5db078231d32d207a2a2e708ecaaaedc1
Author: Aleksey Pak <[email protected]>
AuthorDate: Tue Jul 30 19:33:04 2019 +0200
[FLINK-13441][e2e] Add e2e test for SQL batch job
This closes #9359.
---
.../flink-batch-sql-test/pom.xml | 73 ++++++++++
.../flink/sql/tests/BatchSQLTestProgram.java | 160 +++++++++++++++++++++
flink-end-to-end-tests/pom.xml | 1 +
flink-end-to-end-tests/run-nightly-tests.sh | 1 +
.../test-scripts/test_batch_sql.sh | 88 ++++++++++++
tools/travis/splits/split_misc.sh | 1 +
tools/travis/splits/split_misc_hadoopfree.sh | 1 +
7 files changed, 325 insertions(+)
diff --git a/flink-end-to-end-tests/flink-batch-sql-test/pom.xml b/flink-end-to-end-tests/flink-batch-sql-test/pom.xml
new file mode 100644
index 0000000..fe5eadd
--- /dev/null
+++ b/flink-end-to-end-tests/flink-batch-sql-test/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-batch-sql-test_${scala.binary.version}</artifactId>
+ <name>flink-batch-sql-test</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>BatchSQLTestProgram</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>BatchSQLTestProgram</finalName>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.sql.tests.BatchSQLTestProgram</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
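Note: the shade plugin's <finalName> above makes this module build a self-contained jar at flink-end-to-end-tests/flink-batch-sql-test/target/BatchSQLTestProgram.jar, which test_batch_sql.sh picks up below. A sketch of building just this module from the repository root (the Maven flags here are illustrative, not part of this commit):

    # build this module plus its reactor dependencies; the shaded jar lands in target/
    mvn package -pl flink-end-to-end-tests/flink-batch-sql-test -am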
diff --git a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
new file mode 100644
index 0000000..615b6d6
--- /dev/null
+++ b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.io.IteratorInputFormat;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.CsvTableSink;
+import org.apache.flink.table.sources.InputFormatTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * End-to-end test for batch SQL queries.
+ *
+ * <p>The sources are generated and bounded. The result is always constant.
+ *
+ * <p>Parameters:
+ * -outputPath output file path for CsvTableSink;
+ * -sqlStatement SQL statement that will be executed as sqlUpdate
+ */
+public class BatchSQLTestProgram {
+
+ public static void main(String[] args) throws Exception {
+ ParameterTool params = ParameterTool.fromArgs(args);
+ String outputPath = params.getRequired("outputPath");
+ String sqlStatement = params.getRequired("sqlStatement");
+
+ TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inBatchMode()
+ .build());
+
+ tEnv.registerTableSource("table1", new GeneratorTableSource(10,
100, 60, 0));
+ tEnv.registerTableSource("table2", new GeneratorTableSource(5,
0.2f, 60, 5));
+ tEnv.registerTableSink("sinkTable",
+ new CsvTableSink(outputPath)
+ .configure(new String[]{"f0", "f1"}, new TypeInformation[]{Types.INT, Types.SQL_TIMESTAMP}));
+
+ tEnv.sqlUpdate(sqlStatement);
+ tEnv.execute("TestSqlJob");
+ }
+
+ /**
+ * TableSource for generated data.
+ */
+ public static class GeneratorTableSource extends InputFormatTableSource<Row> {
+
+ private final int numKeys;
+ private final float recordsPerKeyAndSecond;
+ private final int durationSeconds;
+ private final int offsetSeconds;
+
+ GeneratorTableSource(int numKeys, float recordsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+ this.numKeys = numKeys;
+ this.recordsPerKeyAndSecond = recordsPerKeyAndSecond;
+ this.durationSeconds = durationSeconds;
+ this.offsetSeconds = offsetSeconds;
+ }
+
+ @Override
+ public InputFormat<Row, ?> getInputFormat() {
+ return new IteratorInputFormat<>(
+ DataGenerator.create(numKeys, recordsPerKeyAndSecond, durationSeconds, offsetSeconds));
+ }
+
+ @Override
+ public DataType getProducedDataType() {
+ return getTableSchema().toRowDataType();
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return TableSchema.builder()
+ .field("key", DataTypes.INT())
+ .field("rowtime", DataTypes.TIMESTAMP(3))
+ .field("payload", DataTypes.STRING())
+ .build();
+ }
+ }
+
+ /**
+ * Iterator for generated data.
+ */
+ public static class DataGenerator implements Iterator<Row>, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ final int numKeys;
+
+ private int keyIndex = 0;
+
+ private final long durationMs;
+ private final long stepMs;
+ private final long offsetMs;
+ private long ms = 0;
+
+ static DataGenerator create(int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+ int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+ return new DataGenerator(numKeys, durationSeconds * 1000, sleepMs, offsetSeconds * 2000L);
+ }
+
+ DataGenerator(int numKeys, long durationMs, long stepMs, long offsetMs) {
+ this.numKeys = numKeys;
+ this.durationMs = durationMs;
+ this.stepMs = stepMs;
+ this.offsetMs = offsetMs;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return ms < durationMs;
+ }
+
+ @Override
+ public Row next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ Row row = Row.of(
+ keyIndex,
+ LocalDateTime.ofInstant(Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC),
+ "Some payload...");
+ ++keyIndex;
+ if (keyIndex >= numKeys) {
+ keyIndex = 0;
+ ms += stepMs;
+ }
+ return row;
+ }
+ }
+}
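Note: both generated sources are bounded, which is what makes the job's result constant. Working through DataGenerator.create above (the arithmetic is a reading aid, not part of the commit): stepMs = (int) (1000 / rowsPerKeyAndSecond), and all keys emit one row per step until ms reaches durationMs, so table1 (10 keys, 100 rows/key/s, 60 s) produces 10 * (60000 / 10) = 60,000 rows, while table2 (5 keys, 0.2 rows/key/s, 60 s) produces 5 * (60000 / 5000) = 60 rows, its rowtimes shifted by offsetSeconds * 2000 ms = 10 s.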
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 85af090..4ae2c2d 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -41,6 +41,7 @@ under the License.
<module>flink-dataset-allround-test</module>
<module>flink-dataset-fine-grained-recovery-test</module>
<module>flink-datastream-allround-test</module>
+ <module>flink-batch-sql-test</module>
<module>flink-stream-sql-test</module>
<module>flink-bucketing-sink-test</module>
<module>flink-distributed-cache-via-blob-test</module>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 1f3b445..3b34b560 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -123,6 +123,7 @@ run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scrip
run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
+run_test "Batch SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh"
run_test "Streaming SQL end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
run_test "Streaming SQL end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" "skip_check_exceptions"
run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" "skip_check_exceptions"
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_sql.sh b/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
new file mode 100755
index 0000000..dc9b931
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
@@ -0,0 +1,88 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Test for a SQL (batch mode) job that runs successfully on a Flink cluster with fewer slots (1) than the job's total number of slots (9).
+set -Eeuo pipefail
+
+source "$(dirname "$0")"/common.sh
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-batch-sql-test/target/BatchSQLTestProgram.jar
+
+OUTPUT_FILE_PATH="${TEST_DATA_DIR}/out/result/results.csv"
+
+function sqlJobQuery() {
+ local tumbleWindowSizeSeconds=10
+
+ overQuery=$(cat <<SQL
+SELECT key, rowtime, 42 AS cnt FROM table1
+SQL
+)
+
+ tumbleQuery=$(cat <<SQL
+SELECT
+ key,
+ CASE SUM(cnt) / COUNT(*) WHEN 101 THEN 1 WHEN -1 THEN NULL ELSE 99 END AS correct,
+ TUMBLE_START(rowtime, INTERVAL '${tumbleWindowSizeSeconds}' SECOND) AS wStart,
+ TUMBLE_ROWTIME(rowtime, INTERVAL '${tumbleWindowSizeSeconds}' SECOND) AS rowtime
+FROM (${overQuery})
+WHERE rowtime > TIMESTAMP '1970-01-01 00:00:01'
+GROUP BY key, TUMBLE(rowtime, INTERVAL '${tumbleWindowSizeSeconds}' SECOND)
+SQL
+)
+
+ joinQuery=$(cat <<SQL
+SELECT
+ t1.key,
+ t2.rowtime AS rowtime,
+ t2.correct,
+ t2.wStart
+FROM table2 t1, (${tumbleQuery}) t2
+WHERE
+ t1.key = t2.key AND
+ t1.rowtime BETWEEN t2.rowtime AND t2.rowtime + INTERVAL '${tumbleWindowSizeSeconds}' SECOND
+SQL
+)
+
+ echo "
+SELECT
+ SUM(correct) AS correct,
+ TUMBLE_START(rowtime, INTERVAL '20' SECOND) AS rowtime
+FROM (${joinQuery})
+GROUP BY TUMBLE(rowtime, INTERVAL '20' SECOND)"
+}
+
+set_config_key "taskmanager.numberOfTaskSlots" "1"
+
+function sql_cleanup() {
+ stop_cluster
+ $FLINK_DIR/bin/taskmanager.sh stop-all
+}
+on_exit sql_cleanup
+
+start_cluster
+
+# The job requires a total of 2 x (1 + 1 + 1 + 1) + 1 = 9 slots.
+$FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath "file://${OUTPUT_FILE_PATH}" -sqlStatement \
+ "INSERT INTO sinkTable $(sqlJobQuery)"
+
+# Expected result:
+#1980,1970-01-01 00:00:00.0
+#1980,1970-01-01 00:00:20.0
+#1980,1970-01-01 00:00:40.0
+check_result_hash "BatchSQL" "${OUTPUT_FILE_PATH}"
"c7ccd2c3a25c3e06616806cf6aecaa66"
diff --git a/tools/travis/splits/split_misc.sh b/tools/travis/splits/split_misc.sh
index 9bc5a27..97f811d 100755
--- a/tools/travis/splits/split_misc.sh
+++ b/tools/travis/splits/split_misc.sh
@@ -49,6 +49,7 @@ run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scrip
run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
+run_test "Batch SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh"
run_test "Streaming SQL end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
run_test "Streaming SQL end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" "skip_check_exceptions"
run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" "skip_check_exceptions"
diff --git a/tools/travis/splits/split_misc_hadoopfree.sh b/tools/travis/splits/split_misc_hadoopfree.sh
index a2914ab..193ad0b 100755
--- a/tools/travis/splits/split_misc_hadoopfree.sh
+++ b/tools/travis/splits/split_misc_hadoopfree.sh
@@ -49,6 +49,7 @@ run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scrip
run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
+run_test "Batch SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh"
run_test "Streaming SQL end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
run_test "Streaming SQL end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" "skip_check_exceptions"
run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" "skip_check_exceptions"