This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c465ce1fe4d4dd7188d3cc77d183ac38a8af7e2c Author: TsReaper <[email protected]> AuthorDate: Thu Aug 1 15:55:18 2019 +0800 [FLINK-13436][e2e] Add TPC-H queries as E2E tests This closes #9312 --- flink-end-to-end-tests/flink-tpch-test/pom.xml | 60 ++++ .../apache/flink/table/tpch/TpchDataGenerator.java | 127 ++++++++ .../flink/table/tpch/TpchResultComparator.java | 123 ++++++++ flink-end-to-end-tests/pom.xml | 1 + flink-end-to-end-tests/run-nightly-tests.sh | 2 + .../test-data/tpch/modified-query/q11.sql | 30 ++ .../test-data/tpch/modified-query/q15.sql | 69 ++++ .../test-data/tpch/modified-query/q20.sql | 36 +++ .../test-data/tpch/modified-query/q6.sql | 11 + .../test-scripts/test-data/tpch/sink/q1.yaml | 51 +++ .../test-scripts/test-data/tpch/sink/q10.yaml | 43 +++ .../test-scripts/test-data/tpch/sink/q11.yaml | 19 ++ .../test-scripts/test-data/tpch/sink/q12.yaml | 23 ++ .../test-scripts/test-data/tpch/sink/q13.yaml | 19 ++ .../test-scripts/test-data/tpch/sink/q14.yaml | 15 + .../test-scripts/test-data/tpch/sink/q15.yaml | 31 ++ .../test-scripts/test-data/tpch/sink/q16.yaml | 27 ++ .../test-scripts/test-data/tpch/sink/q17.yaml | 15 + .../test-scripts/test-data/tpch/sink/q18.yaml | 35 +++ .../test-scripts/test-data/tpch/sink/q19.yaml | 15 + .../test-scripts/test-data/tpch/sink/q2.yaml | 43 +++ .../test-scripts/test-data/tpch/sink/q20.yaml | 19 ++ .../test-scripts/test-data/tpch/sink/q21.yaml | 19 ++ .../test-scripts/test-data/tpch/sink/q22.yaml | 23 ++ .../test-scripts/test-data/tpch/sink/q3.yaml | 27 ++ .../test-scripts/test-data/tpch/sink/q4.yaml | 19 ++ .../test-scripts/test-data/tpch/sink/q5.yaml | 19 ++ .../test-scripts/test-data/tpch/sink/q6.yaml | 15 + .../test-scripts/test-data/tpch/sink/q7.yaml | 27 ++ .../test-scripts/test-data/tpch/sink/q8.yaml | 19 ++ .../test-scripts/test-data/tpch/sink/q9.yaml | 23 ++ .../test-scripts/test-data/tpch/source.yaml | 349 +++++++++++++++++++++ flink-end-to-end-tests/test-scripts/test_tpch.sh | 91 ++++++ pom.xml | 2 +- 
34 files changed, 1446 insertions(+), 1 deletion(-) diff --git a/flink-end-to-end-tests/flink-tpch-test/pom.xml b/flink-end-to-end-tests/flink-tpch-test/pom.xml new file mode 100644 index 0000000..e317f31 --- /dev/null +++ b/flink-end-to-end-tests/flink-tpch-test/pom.xml @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.10-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-tpch-test</artifactId> + + <dependencies> + <dependency> + <groupId>io.airlift.tpch</groupId> + <artifactId>tpch</artifactId> + <version>0.10</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.10</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchDataGenerator.java b/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchDataGenerator.java new file mode 100644 index 0000000..4646a02 --- /dev/null +++ b/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchDataGenerator.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.tpch; + +import io.airlift.tpch.TpchEntity; +import io.airlift.tpch.TpchTable; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; + +/** + * TPC-H test data generator. + */ +public class TpchDataGenerator { + + public static final int QUERY_NUM = 22; + + public static void main(String[] args) throws IOException { + if (args.length != 2) { + System.out.println("Exactly 1 double value and 1 path should be provided as argument"); + return; + } + + double scale = Double.valueOf(args[0]); + String path = args[1]; + generateTable(scale, path); + generateQuery(path); + generateExpected(path); + } + + private static void generateTable(double scale, String path) throws IOException { + File dir = new File(path + "/table"); + dir.mkdir(); + + for (TpchTable table : TpchTable.getTables()) { + Iterable generator = table.createGenerator(scale, 1, 1); + + StringBuilder builder = new StringBuilder(); + generator.forEach(s -> { + String line = ((TpchEntity) s).toLine().trim(); + if (line.endsWith("|")) { + line = line.substring(0, line.length() - 1); + } + builder.append(line).append('\n'); + }); + + try (BufferedWriter writer = new BufferedWriter( + new OutputStreamWriter( + new FileOutputStream(path + "/table/" + table.getTableName() + ".csv"))) + ) { + writer.write(builder.toString()); + } + } + } + + 
private static void generateQuery(String path) throws IOException { + File dir = new File(path + "/query"); + dir.mkdir(); + + for (int i = 0; i < QUERY_NUM; i++) { + try ( + InputStream in = TpchDataGenerator.class.getResourceAsStream( + "/io/airlift/tpch/queries/q" + (i + 1) + ".sql"); + OutputStream out = new FileOutputStream(path + "/query/q" + (i + 1) + ".sql") + ) { + byte[] buffer = new byte[4096]; + int bytesRead = 0; + while ((bytesRead = in.read(buffer)) > 0) { + out.write(buffer, 0, bytesRead); + } + } + } + } + + private static void generateExpected(String path) throws IOException { + File dir = new File(path + "/expected"); + dir.mkdir(); + + for (int i = 0; i < QUERY_NUM; i++) { + try ( + BufferedReader reader = new BufferedReader( + new InputStreamReader(TpchDataGenerator.class.getResourceAsStream( + "/io/airlift/tpch/queries/q" + (i + 1) + ".result"))); + BufferedWriter writer = new BufferedWriter( + new OutputStreamWriter( + new FileOutputStream(path + "/expected/q" + (i + 1) + ".csv"))) + ) { + int lineNumber = 0; + String line; + while ((line = reader.readLine()) != null) { + line = line.trim().replace("null", ""); + lineNumber++; + if (lineNumber == 1) { + continue; + } + if (line.length() > 0 && line.endsWith("|")) { + line = line.substring(0, line.length() - 1); + } + writer.write(line + "\n"); + } + } + } + } +} diff --git a/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchResultComparator.java b/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchResultComparator.java new file mode 100644 index 0000000..e9bbea3 --- /dev/null +++ b/flink-end-to-end-tests/flink-tpch-test/src/main/java/org/apache/flink/table/tpch/TpchResultComparator.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Result comparator for TPC-H test, according to the TPC-H standard specification v2.18.0.
 *
 * <p>Both inputs are text files with {@code '|'}-separated columns. Lines are compared pairwise,
 * column by column:
 * <ul>
 *   <li>if both values parse as {@code long}, they must be equal;</li>
 *   <li>otherwise, if both parse as {@code double}, they must have compatible signs and the
 *       actual value, rounded to two decimals, must lie within 1% of the expected value;</li>
 *   <li>otherwise the trimmed strings must be equal.</li>
 * </ul>
 * On the first mismatch a message is printed to standard output and the JVM exits with status 1.
 */
public class TpchResultComparator {

	/**
	 * Entry point.
	 *
	 * @param args {@code args[0]} is the expected result file, {@code args[1]} the actual result file
	 * @throws IOException if either file cannot be read
	 */
	public static void main(String[] args) throws IOException {
		if (args.length != 2) {
			System.out.println(
				"Exactly 2 paths must be provided, the expected result path and the actual result path");
			System.exit(1);
		}

		String expectedPath = args[0];
		String actualPath = args[1];

		try (
			BufferedReader expectedReader = new BufferedReader(new FileReader(expectedPath));
			BufferedReader actualReader = new BufferedReader(new FileReader(actualPath))
		) {
			String error = compare(expectedReader, actualReader);
			if (error != null) {
				System.out.println(error);
				System.exit(1);
			}
		}
	}

	/**
	 * Compares the two inputs line by line.
	 *
	 * @return {@code null} if the results match, otherwise a description of the first mismatch
	 */
	private static String compare(
			BufferedReader expectedReader,
			BufferedReader actualReader) throws IOException {
		int lineNum = 0;

		while (true) {
			// Always read from both inputs so that neither side silently loses a line
			// when the other one is exhausted first.
			String expectedLine = expectedReader.readLine();
			String actualLine = actualReader.readLine();

			if (expectedLine == null || actualLine == null) {
				if (expectedLine == null && actualLine == null) {
					return null;
				}
				int expectedLineNum =
					lineNum + (expectedLine != null ? 1 : 0) + countRemainingLines(expectedReader);
				int actualLineNum =
					lineNum + (actualLine != null ? 1 : 0) + countRemainingLines(actualReader);
				return "Incorrect number of lines! Expecting " + expectedLineNum +
					" lines, but found " + actualLineNum + " lines.";
			}
			lineNum++;

			String[] expected = expectedLine.split("\\|");
			String[] actual = actualLine.split("\\|");
			if (expected.length != actual.length) {
				return "Incorrect number of columns on line " + lineNum +
					"! Expecting " + expected.length + " columns, but found " + actual.length + " columns.";
			}

			for (int i = 0; i < expected.length; i++) {
				if (!matches(expected[i], actual[i])) {
					return "Incorrect result on line " + lineNum + " column " + (i + 1) +
						"! Expecting " + expected[i] + ", but found " + actual[i] + ".";
				}
			}
		}
	}

	/** Counts the lines that are still unread in the given reader. */
	private static int countRemainingLines(BufferedReader reader) throws IOException {
		int count = 0;
		while (reader.readLine() != null) {
			count++;
		}
		return count;
	}

	/**
	 * Decides whether a single actual column value is acceptable for the expected one, trying an
	 * integral comparison first, then a tolerant floating-point comparison, then a string one.
	 */
	private static boolean matches(String expected, String actual) {
		try {
			return Long.parseLong(expected) == Long.parseLong(actual);
		} catch (NumberFormatException notIntegral) {
			// at least one side is not an integer; try the floating-point comparison below
		}

		try {
			return withinTolerance(Double.parseDouble(expected), Double.parseDouble(actual));
		} catch (NumberFormatException notNumeric) {
			return expected.trim().equals(actual.trim());
		}
	}

	/**
	 * Floating-point comparison: values of strictly opposite sign never match; otherwise the
	 * magnitude of the actual value, rounded to two decimals, must be within 1% of the magnitude
	 * of the expected value.
	 */
	private static boolean withinTolerance(double e, double a) {
		if (e < 0 && a > 0 || e > 0 && a < 0) {
			return false;
		}
		// Work on magnitudes. This also covers an expected value of zero paired with a
		// negative actual value, which round() would otherwise reject with an exception.
		double magnitude = Math.abs(e);
		double t = round(Math.abs(a), 2);
		// defined in TPC-H standard specification v2.18.0 section 2.1.3.5
		return !(magnitude * 0.99 > t || magnitude * 1.01 < t);
	}

	/**
	 * Rounding function defined in TPC-H standard specification v2.18.0 chapter 10.
	 *
	 * @param x the non-negative value to round
	 * @param m the number of decimal places to keep
	 * @return {@code x} rounded half-up to {@code m} decimal places
	 * @throws IllegalArgumentException if {@code x} is negative
	 */
	private static double round(double x, int m) {
		if (x < 0) {
			throw new IllegalArgumentException("x must be non-negative");
		}
		double y = x + 5 * Math.pow(10, -m - 1);
		double z = y * Math.pow(10, m);
		double q = Math.floor(z);
		return q / Math.pow(10, m);
	}
}
<module>flink-streaming-kafka011-test</module> <module>flink-streaming-kafka010-test</module> <module>flink-plugins-test</module> + <module>flink-tpch-test</module> </modules> <!-- See main pom.xml for explanation of profiles --> diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 96da7b8..afbd98f 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -149,6 +149,8 @@ run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scrip run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh" run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh" +run_test "TPC-H end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_tpch.sh" + run_test "Heavy deployment end-to-end test" "$END_TO_END_DIR/test-scripts/test_heavy_deployment.sh" "skip_check_exceptions" run_test "ConnectedComponents iterations with high parallelism end-to-end test" "$END_TO_END_DIR/test-scripts/test_high_parallelism_iterations.sh 25" diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q11.sql b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q11.sql new file mode 100644 index 0000000..be2cd09 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q11.sql @@ -0,0 +1,30 @@ +-- database: presto; groups: tpch; tables: partsupp,supplier,nation +SELECT + ps_partkey, +-- sum(ps_supplycost * ps_availqty) AS value + sum(ps_supplycost * ps_availqty) AS `value` +FROM + partsupp, + supplier, + nation +WHERE + ps_suppkey = s_suppkey + AND s_nationkey = n_nationkey + AND n_name = 'GERMANY' +GROUP BY + ps_partkey +HAVING + sum(ps_supplycost * ps_availqty) > ( + SELECT sum(ps_supplycost * ps_availqty) * 0.0001 + FROM + partsupp, + supplier, + nation + WHERE + ps_suppkey = s_suppkey + AND s_nationkey = 
n_nationkey + AND n_name = 'GERMANY' + ) +ORDER BY +-- value DESC + `value` DESC diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q15.sql b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q15.sql new file mode 100644 index 0000000..8182b3e --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q15.sql @@ -0,0 +1,69 @@ +-- database: presto; groups: tpch; tables: lineitem,supplier +-- CREATE OR REPLACE VIEW revenue AS +-- SELECT +-- l_suppkey AS supplier_no, +-- sum(l_extendedprice * (1 - l_discount)) AS total_revenue +-- FROM +-- lineitem +-- WHERE +-- l_shipdate >= DATE '1996-01-01' +-- AND l_shipdate < DATE '1996-01-01' + INTERVAL '3' MONTH +-- GROUP BY +-- l_suppkey; +-- +-- SELECT +-- s_suppkey, +-- s_name, +-- s_address, +-- s_phone, +-- total_revenue +-- FROM +-- supplier, +-- revenue +-- WHERE +-- s_suppkey = supplier_no +-- AND total_revenue = ( +-- SELECT max(total_revenue) +-- FROM +-- revenue +-- ) +-- ORDER BY +-- s_suppkey; +-- Blink does not support view + +SELECT + s_suppkey, + s_name, + s_address, + s_phone, + total_revenue +FROM + supplier, ( + SELECT + l_suppkey AS supplier_no, + sum(l_extendedprice * (1 - l_discount)) AS total_revenue + FROM + lineitem + WHERE + l_shipdate >= DATE '1996-01-01' + AND l_shipdate < DATE '1996-01-01' + INTERVAL '3' MONTH + GROUP BY + l_suppkey) AS revenue +WHERE + s_suppkey = supplier_no + AND total_revenue = ( + SELECT max(total_revenue) + FROM ( + SELECT + l_suppkey AS supplier_no, + sum(l_extendedprice * (1 - l_discount)) AS total_revenue + FROM + lineitem + WHERE + l_shipdate >= DATE '1996-01-01' + AND l_shipdate < DATE '1996-01-01' + INTERVAL '3' MONTH + GROUP BY + l_suppkey) AS revenue + ) +ORDER BY + s_suppkey; diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q20.sql b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q20.sql new file mode 100644 index 0000000..7445398 --- /dev/null +++ 
b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q20.sql @@ -0,0 +1,36 @@ +-- database: presto; groups: tpch; tables: supplier,nation,partsupp,lineitem,part +SELECT + s_name, + s_address +FROM + supplier, nation +WHERE + s_suppkey IN ( + SELECT ps_suppkey + FROM + partsupp + WHERE + ps_partkey IN ( + SELECT p_partkey + FROM + part + WHERE + p_name LIKE 'forest%' + ) + AND ps_availqty > ( + SELECT 0.5 * sum(l_quantity) + FROM + lineitem + WHERE + l_partkey = ps_partkey + AND l_suppkey = ps_suppkey + -- AND l_shipdate >= date('1994-01-01') + -- AND l_shipdate < date('1994-01-01') + interval '1' YEAR + -- Blink does not support the above format + AND l_shipdate >= date '1994-01-01' + AND l_shipdate < date '1994-01-01' + interval '1' YEAR +) +) +AND s_nationkey = n_nationkey +AND n_name = 'CANADA' +ORDER BY s_name diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q6.sql b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q6.sql new file mode 100644 index 0000000..28fb52e --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query/q6.sql @@ -0,0 +1,11 @@ +-- database: presto; groups: tpch; tables: lineitem +SELECT sum(l_extendedprice * l_discount) AS revenue +FROM + lineitem +WHERE + l_shipdate >= DATE '1994-01-01' + AND l_shipdate < DATE '1994-01-01' + INTERVAL '1' YEAR +-- AND l_discount BETWEEN decimal '0.06' - decimal '0.01' AND decimal '0.06' + decimal '0.01' +-- Blink currently does not support the above feature +AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01 +AND l_quantity < 24 diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q1.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q1.yaml new file mode 100644 index 0000000..be89601 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q1.yaml @@ -0,0 +1,51 @@ +tables: + - name: q1 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q1.csv" + format: 
+ type: csv + fields: + - name: l_returnflag + type: VARCHAR + - name: l_linestatus + type: VARCHAR + - name: sum_qty + type: DOUBLE + - name: sum_base_price + type: DOUBLE + - name: sum_disc_price + type: DOUBLE + - name: sum_charge + type: DOUBLE + - name: avg_qty + type: DOUBLE + - name: avg_price + type: DOUBLE + - name: avg_disc + type: DOUBLE + - name: count_order + type: BIGINT + field-delimiter: "|" + schema: + - name: l_returnflag + type: VARCHAR + - name: l_linestatus + type: VARCHAR + - name: sum_qty + type: DOUBLE + - name: sum_base_price + type: DOUBLE + - name: sum_disc_price + type: DOUBLE + - name: sum_charge + type: DOUBLE + - name: avg_qty + type: DOUBLE + - name: avg_price + type: DOUBLE + - name: avg_disc + type: DOUBLE + - name: count_order + type: BIGINT diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q10.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q10.yaml new file mode 100644 index 0000000..3fcbb7d --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q10.yaml @@ -0,0 +1,43 @@ +tables: + - name: q10 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q10.csv" + format: + type: csv + fields: + - name: c_custkey + type: BIGINT + - name: c_name + type: VARCHAR + - name: revenue + type: DOUBLE + - name: c_acctbal + type: DOUBLE + - name: n_name + type: VARCHAR + - name: c_address + type: VARCHAR + - name: c_phone + type: VARCHAR + - name: c_comment + type: VARCHAR + field-delimiter: "|" + schema: + - name: c_custkey + type: BIGINT + - name: c_name + type: VARCHAR + - name: revenue + type: DOUBLE + - name: c_acctbal + type: DOUBLE + - name: n_name + type: VARCHAR + - name: c_address + type: VARCHAR + - name: c_phone + type: VARCHAR + - name: c_comment + type: VARCHAR diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q11.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q11.yaml new file mode 100644 index 0000000..c729305 --- /dev/null 
+++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q11.yaml @@ -0,0 +1,19 @@ +tables: + - name: q11 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q11.csv" + format: + type: csv + fields: + - name: ps_partkey + type: BIGINT + - name: value + type: DOUBLE + field-delimiter: "|" + schema: + - name: ps_partkey + type: BIGINT + - name: value + type: DOUBLE diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q12.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q12.yaml new file mode 100644 index 0000000..d990cf0 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q12.yaml @@ -0,0 +1,23 @@ +tables: + - name: q12 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q12.csv" + format: + type: csv + fields: + - name: l_shipmode + type: VARCHAR + - name: high_line_count + type: INT + - name: low_line_count + type: INT + field-delimiter: "|" + schema: + - name: l_shipmode + type: VARCHAR + - name: high_line_count + type: INT + - name: low_line_count + type: INT diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q13.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q13.yaml new file mode 100644 index 0000000..d3c30f9 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q13.yaml @@ -0,0 +1,19 @@ +tables: + - name: q13 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q13.csv" + format: + type: csv + fields: + - name: c_count + type: BIGINT + - name: custdist + type: BIGINT + field-delimiter: "|" + schema: + - name: c_count + type: BIGINT + - name: custdist + type: BIGINT diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q14.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q14.yaml new file mode 100644 index 0000000..551339b --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q14.yaml @@ -0,0 +1,15 @@ +tables: + - name: q14 + type: 
sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q14.csv" + format: + type: csv + fields: + - name: promo_revenue + type: DOUBLE + field-delimiter: "|" + schema: + - name: promo_revenue + type: DOUBLE diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q15.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q15.yaml new file mode 100644 index 0000000..069757a --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q15.yaml @@ -0,0 +1,31 @@ +tables: + - name: q15 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q15.csv" + format: + type: csv + fields: + - name: s_suppkey + type: BIGINT + - name: s_name + type: VARCHAR + - name: s_address + type: VARCHAR + - name: s_phone + type: VARCHAR + - name: total_revenue + type: DOUBLE + field-delimiter: "|" + schema: + - name: s_suppkey + type: BIGINT + - name: s_name + type: VARCHAR + - name: s_address + type: VARCHAR + - name: s_phone + type: VARCHAR + - name: total_revenue + type: DOUBLE diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q16.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q16.yaml new file mode 100644 index 0000000..95b2293 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q16.yaml @@ -0,0 +1,27 @@ +tables: + - name: q16 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q16.csv" + format: + type: csv + fields: + - name: p_brand + type: VARCHAR + - name: p_type + type: VARCHAR + - name: p_size + type: INT + - name: supplier_cnt + type: BIGINT + field-delimiter: "|" + schema: + - name: p_brand + type: VARCHAR + - name: p_type + type: VARCHAR + - name: p_size + type: INT + - name: supplier_cnt + type: BIGINT diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q17.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q17.yaml new file mode 100644 index 0000000..bf618a7 --- /dev/null +++ 
b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q17.yaml @@ -0,0 +1,15 @@ +tables: + - name: q17 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q17.csv" + format: + type: csv + fields: + - name: avg_yearly + type: DOUBLE + field-delimiter: "|" + schema: + - name: avg_yearly + type: DOUBLE diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q18.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q18.yaml new file mode 100644 index 0000000..303c8b4 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q18.yaml @@ -0,0 +1,35 @@ +tables: + - name: q18 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q18.csv" + format: + type: csv + fields: + - name: c_name + type: VARCHAR + - name: c_custkey + type: BIGINT + - name: o_orderkey + type: BIGINT + - name: o_orderdate + type: DATE + - name: o_totalprice + type: DOUBLE + - name: sum(l_quantity) + type: DOUBLE + field-delimiter: "|" + schema: + - name: c_name + type: VARCHAR + - name: c_custkey + type: BIGINT + - name: o_orderkey + type: BIGINT + - name: o_orderdate + type: DATE + - name: o_totalprice + type: DOUBLE + - name: sum(l_quantity) + type: DOUBLE diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q19.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q19.yaml new file mode 100644 index 0000000..f84d177 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q19.yaml @@ -0,0 +1,15 @@ +tables: + - name: q19 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q19.csv" + format: + type: csv + fields: + - name: revenue + type: DOUBLE + field-delimiter: "|" + schema: + - name: revenue + type: DOUBLE diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q2.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q2.yaml new file mode 100644 index 0000000..17f58b1 --- /dev/null +++ 
b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q2.yaml @@ -0,0 +1,43 @@ +tables: + - name: q2 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q2.csv" + format: + type: csv + fields: + - name: s_acctbal + type: DOUBLE + - name: s_name + type: VARCHAR + - name: n_name + type: VARCHAR + - name: p_partkey + type: BIGINT + - name: p_mfgr + type: VARCHAR + - name: s_addres + type: VARCHAR + - name: s_phone + type: VARCHAR + - name: s_comment + type: VARCHAR + field-delimiter: "|" + schema: + - name: s_acctbal + type: DOUBLE + - name: s_name + type: VARCHAR + - name: n_name + type: VARCHAR + - name: p_partkey + type: BIGINT + - name: p_mfgr + type: VARCHAR + - name: s_addres + type: VARCHAR + - name: s_phone + type: VARCHAR + - name: s_comment + type: VARCHAR diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q20.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q20.yaml new file mode 100644 index 0000000..ca58a53 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q20.yaml @@ -0,0 +1,19 @@ +tables: + - name: q20 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q20.csv" + format: + type: csv + fields: + - name: s_name + type: VARCHAR + - name: s_address + type: VARCHAR + field-delimiter: "|" + schema: + - name: s_name + type: VARCHAR + - name: s_address + type: VARCHAR diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q21.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q21.yaml new file mode 100644 index 0000000..760573f --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q21.yaml @@ -0,0 +1,19 @@ +tables: + - name: q21 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q21.csv" + format: + type: csv + fields: + - name: s_name + type: VARCHAR + - name: numwait + type: BIGINT + field-delimiter: "|" + schema: + - name: s_name + type: VARCHAR + - name: numwait + type: BIGINT diff --git 
a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q22.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q22.yaml new file mode 100644 index 0000000..f39035e --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q22.yaml @@ -0,0 +1,23 @@ +tables: + - name: q22 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q22.csv" + format: + type: csv + fields: + - name: cntrycode + type: VARCHAR + - name: numcust + type: BIGINT + - name: totacctbal + type: DOUBLE + field-delimiter: "|" + schema: + - name: cntrycode + type: VARCHAR + - name: numcust + type: BIGINT + - name: totacctbal + type: DOUBLE diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q3.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q3.yaml new file mode 100644 index 0000000..8617e50 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q3.yaml @@ -0,0 +1,27 @@ +tables: + - name: q3 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q3.csv" + format: + type: csv + fields: + - name: l_orderkey + type: BIGINT + - name: revenue + type: DOUBLE + - name: o_orderdate + type: DATE + - name: o_shippriority + type: INT + field-delimiter: "|" + schema: + - name: l_orderkey + type: BIGINT + - name: revenue + type: DOUBLE + - name: o_orderdate + type: DATE + - name: o_shippriority + type: INT diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q4.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q4.yaml new file mode 100644 index 0000000..1a05c98 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q4.yaml @@ -0,0 +1,19 @@ +tables: + - name: q4 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q4.csv" + format: + type: csv + fields: + - name: o_orderpriority + type: VARCHAR + - name: order_count + type: BIGINT + field-delimiter: "|" + schema: + - name: o_orderpriority + type: VARCHAR + - name: order_count + 
type: BIGINT diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q5.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q5.yaml new file mode 100644 index 0000000..845b131 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q5.yaml @@ -0,0 +1,19 @@ +tables: + - name: q5 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q5.csv" + format: + type: csv + fields: + - name: n_name + type: VARCHAR + - name: revenue + type: DOUBLE + field-delimiter: "|" + schema: + - name: n_name + type: VARCHAR + - name: revenue + type: DOUBLE diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q6.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q6.yaml new file mode 100644 index 0000000..3c0612f --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q6.yaml @@ -0,0 +1,15 @@ +tables: + - name: q6 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q6.csv" + format: + type: csv + fields: + - name: revenue + type: DOUBLE + field-delimiter: "|" + schema: + - name: revenue + type: DOUBLE diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q7.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q7.yaml new file mode 100644 index 0000000..d11da65 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q7.yaml @@ -0,0 +1,27 @@ +tables: + - name: q7 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q7.csv" + format: + type: csv + fields: + - name: supp_nation + type: VARCHAR + - name: cust_nation + type: VARCHAR + - name: l_year + type: BIGINT + - name: revenue + type: DOUBLE + field-delimiter: "|" + schema: + - name: supp_nation + type: VARCHAR + - name: cust_nation + type: VARCHAR + - name: l_year + type: BIGINT + - name: revenue + type: DOUBLE diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q8.yaml 
b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q8.yaml new file mode 100644 index 0000000..88b48df --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q8.yaml @@ -0,0 +1,19 @@ +tables: + - name: q8 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q8.csv" + format: + type: csv + fields: + - name: o_year + type: BIGINT + - name: mkt_share + type: DOUBLE + field-delimiter: "|" + schema: + - name: o_year + type: BIGINT + - name: mkt_share + type: DOUBLE diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q9.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q9.yaml new file mode 100644 index 0000000..b2030c7 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/sink/q9.yaml @@ -0,0 +1,23 @@ +tables: + - name: q9 + type: sink-table + connector: + type: filesystem + path: "$RESULT_DIR/q9.csv" + format: + type: csv + fields: + - name: nation + type: VARCHAR + - name: o_year + type: BIGINT + - name: sum_profit + type: DOUBLE + field-delimiter: "|" + schema: + - name: nation + type: VARCHAR + - name: o_year + type: BIGINT + - name: sum_profit + type: DOUBLE diff --git a/flink-end-to-end-tests/test-scripts/test-data/tpch/source.yaml b/flink-end-to-end-tests/test-scripts/test-data/tpch/source.yaml new file mode 100644 index 0000000..83c1558 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test-data/tpch/source.yaml @@ -0,0 +1,349 @@ +tables: + - name: customer + type: source-table + update-mode: append + connector: + type: filesystem + path: "$TABLE_DIR/customer.csv" + format: + type: csv + fields: + - name: c_custkey + type: BIGINT + - name: c_name + type: VARCHAR + - name: c_address + type: VARCHAR + - name: c_nationkey + type: BIGINT + - name: c_phone + type: VARCHAR + - name: c_acctbal + type: DOUBLE + - name: c_mktsegment + type: VARCHAR + - name: c_comment + type: VARCHAR + field-delimiter: "|" + line-delimiter: "\n" + comment-prefix: "--" + schema: + - name: 
c_custkey + type: BIGINT + - name: c_name + type: VARCHAR + - name: c_address + type: VARCHAR + - name: c_nationkey + type: BIGINT + - name: c_phone + type: VARCHAR + - name: c_acctbal + type: DOUBLE + - name: c_mktsegment + type: VARCHAR + - name: c_comment + type: VARCHAR + - name: lineitem + type: source-table + update-mode: append + connector: + type: filesystem + path: "$TABLE_DIR/lineitem.csv" + format: + type: csv + fields: + - name: l_orderkey + type: BIGINT + - name: l_partkey + type: BIGINT + - name: l_suppkey + type: BIGINT + - name: l_linenumber + type: INT + - name: l_quantity + type: DOUBLE + - name: l_extendedprice + type: DOUBLE + - name: l_discount + type: DOUBLE + - name: l_tax + type: DOUBLE + - name: l_returnflag + type: VARCHAR + - name: l_linestatus + type: VARCHAR + - name: l_shipdate + type: DATE + - name: l_commitdate + type: DATE + - name: l_receiptdate + type: DATE + - name: l_shipinstruct + type: VARCHAR + - name: l_shipmode + type: VARCHAR + - name: l_comment + type: VARCHAR + field-delimiter: "|" + line-delimiter: "\n" + comment-prefix: "--" + schema: + - name: l_orderkey + type: BIGINT + - name: l_partkey + type: BIGINT + - name: l_suppkey + type: BIGINT + - name: l_linenumber + type: INT + - name: l_quantity + type: DOUBLE + - name: l_extendedprice + type: DOUBLE + - name: l_discount + type: DOUBLE + - name: l_tax + type: DOUBLE + - name: l_returnflag + type: VARCHAR + - name: l_linestatus + type: VARCHAR + - name: l_shipdate + type: DATE + - name: l_commitdate + type: DATE + - name: l_receiptdate + type: DATE + - name: l_shipinstruct + type: VARCHAR + - name: l_shipmode + type: VARCHAR + - name: l_comment + type: VARCHAR + - name: nation + type: source-table + update-mode: append + connector: + type: filesystem + path: "$TABLE_DIR/nation.csv" + format: + type: csv + fields: + - name: n_nationkey + type: BIGINT + - name: n_name + type: VARCHAR + - name: n_regionkey + type: BIGINT + - name: n_comment + type: VARCHAR + field-delimiter: 
"|" + line-delimiter: "\n" + comment-prefix: "--" + schema: + - name: n_nationkey + type: BIGINT + - name: n_name + type: VARCHAR + - name: n_regionkey + type: BIGINT + - name: n_comment + type: VARCHAR + - name: orders + type: source-table + update-mode: append + connector: + type: filesystem + path: "$TABLE_DIR/orders.csv" + format: + type: csv + fields: + - name: o_orderkey + type: BIGINT + - name: o_custkey + type: BIGINT + - name: o_orderstatus + type: VARCHAR + - name: o_totalprice + type: DOUBLE + - name: o_orderdate + type: DATE + - name: o_orderpriority + type: VARCHAR + - name: o_clerk + type: VARCHAR + - name: o_shippriority + type: INT + - name: o_comment + type: VARCHAR + field-delimiter: "|" + line-delimiter: "\n" + comment-prefix: "--" + schema: + - name: o_orderkey + type: BIGINT + - name: o_custkey + type: BIGINT + - name: o_orderstatus + type: VARCHAR + - name: o_totalprice + type: DOUBLE + - name: o_orderdate + type: DATE + - name: o_orderpriority + type: VARCHAR + - name: o_clerk + type: VARCHAR + - name: o_shippriority + type: INT + - name: o_comment + type: VARCHAR + - name: part + type: source-table + update-mode: append + connector: + type: filesystem + path: "$TABLE_DIR/part.csv" + format: + type: csv + fields: + - name: p_partkey + type: BIGINT + - name: p_name + type: VARCHAR + - name: p_mfgr + type: VARCHAR + - name: p_brand + type: VARCHAR + - name: p_type + type: VARCHAR + - name: p_size + type: INT + - name: p_container + type: VARCHAR + - name: p_retailprice + type: DOUBLE + - name: p_comment + type: VARCHAR + field-delimiter: "|" + line-delimiter: "\n" + comment-prefix: "--" + schema: + - name: p_partkey + type: BIGINT + - name: p_name + type: VARCHAR + - name: p_mfgr + type: VARCHAR + - name: p_brand + type: VARCHAR + - name: p_type + type: VARCHAR + - name: p_size + type: INT + - name: p_container + type: VARCHAR + - name: p_retailprice + type: DOUBLE + - name: p_comment + type: VARCHAR + - name: partsupp + type: source-table + 
update-mode: append + connector: + type: filesystem + path: "$TABLE_DIR/partsupp.csv" + format: + type: csv + fields: + - name: ps_partkey + type: BIGINT + - name: ps_suppkey + type: BIGINT + - name: ps_availqty + type: INT + - name: ps_supplycost + type: DOUBLE + - name: ps_comment + type: VARCHAR + field-delimiter: "|" + line-delimiter: "\n" + comment-prefix: "--" + schema: + - name: ps_partkey + type: BIGINT + - name: ps_suppkey + type: BIGINT + - name: ps_availqty + type: INT + - name: ps_supplycost + type: DOUBLE + - name: ps_comment + type: VARCHAR + - name: region + type: source-table + update-mode: append + connector: + type: filesystem + path: "$TABLE_DIR/region.csv" + format: + type: csv + fields: + - name: r_regionkey + type: BIGINT + - name: r_name + type: VARCHAR + - name: r_comment + type: VARCHAR + field-delimiter: "|" + line-delimiter: "\n" + comment-prefix: "--" + schema: + - name: r_regionkey + type: BIGINT + - name: r_name + type: VARCHAR + - name: r_comment + type: VARCHAR + - name: supplier + type: source-table + update-mode: append + connector: + type: filesystem + path: "$TABLE_DIR/supplier.csv" + format: + type: csv + fields: + - name: s_suppkey + type: BIGINT + - name: s_name + type: VARCHAR + - name: s_address + type: VARCHAR + - name: s_nationkey + type: BIGINT + - name: s_phone + type: VARCHAR + - name: s_acctbal + type: DOUBLE + - name: s_comment + type: VARCHAR + field-delimiter: "|" + line-delimiter: "\n" + comment-prefix: "--" + schema: + - name: s_suppkey + type: BIGINT + - name: s_name + type: VARCHAR + - name: s_address + type: VARCHAR + - name: s_nationkey + type: BIGINT + - name: s_phone + type: VARCHAR + - name: s_acctbal + type: DOUBLE + - name: s_comment + type: VARCHAR diff --git a/flink-end-to-end-tests/test-scripts/test_tpch.sh b/flink-end-to-end-tests/test-scripts/test_tpch.sh new file mode 100755 index 0000000..ab4c61c --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_tpch.sh @@ -0,0 +1,91 @@ +#!/usr/bin/env 
bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +set -Eeuo pipefail + +SCALE="0.01" + +source "$(dirname "$0")"/common.sh + +################################################################################ +# Generate test data +################################################################################ + +echo "Generating test data..." + +TARGET_DIR="$END_TO_END_DIR/flink-tpch-test/target" +TPCH_DATA_DIR="$END_TO_END_DIR/test-scripts/test-data/tpch" +java -cp "$TARGET_DIR/flink-tpch-test-1.10-SNAPSHOT.jar:$TARGET_DIR/lib/*" org.apache.flink.table.tpch.TpchDataGenerator "$SCALE" "$TARGET_DIR" + +################################################################################ +# Prepare Flink +################################################################################ + +echo "Preparing Flink..." 
+ +start_cluster + +################################################################################ +# Run SQL statements +################################################################################ + +TABLE_DIR="$TARGET_DIR/table" +ORIGIN_QUERY_DIR="$TARGET_DIR/query" +MODIFIED_QUERY_DIR="$TPCH_DATA_DIR/modified-query" +EXPECTED_DIR="$TARGET_DIR/expected" +RESULT_DIR="$TEST_DATA_DIR/result" +SQL_CONF="$TEST_DATA_DIR/sql-client-session.conf" + +mkdir "$RESULT_DIR" + +SOURCES_YAML=$(cat "$TPCH_DATA_DIR/source.yaml") +SOURCES_YAML=${SOURCES_YAML//\$TABLE_DIR/"$TABLE_DIR"} + +for i in {1..22} +do + echo "Running query #$i..." + + # First line in sink yaml is ignored + SINK_YAML=$(tail -n +2 "$TPCH_DATA_DIR/sink/q${i}.yaml") + SINK_YAML=${SINK_YAML//\$RESULT_DIR/"$RESULT_DIR"} + + cat > "$SQL_CONF" << EOF +${SOURCES_YAML} +${SINK_YAML} +execution: + planner: blink + type: batch + result-mode: table +EOF + + if [[ -e "$MODIFIED_QUERY_DIR/q$i.sql" ]] + then + SQL_STATEMENT="INSERT INTO q$i $(cat "$MODIFIED_QUERY_DIR/q$i.sql")" + else + SQL_STATEMENT="INSERT INTO q$i $(cat "$ORIGIN_QUERY_DIR/q$i.sql")" + fi + + JOB_ID=$("$FLINK_DIR/bin/sql-client.sh" embedded \ + --environment "$SQL_CONF" \ + --update "$SQL_STATEMENT" | grep "Job ID:" | sed 's/.* //g') + + wait_job_terminal_state "$JOB_ID" "FINISHED" + + java -cp "$TARGET_DIR/flink-tpch-test-1.10-SNAPSHOT.jar" org.apache.flink.table.tpch.TpchResultComparator "$EXPECTED_DIR/q$i.csv" "$RESULT_DIR/q$i.csv" +done diff --git a/pom.xml b/pom.xml index 65af8f3..b1fc076 100644 --- a/pom.xml +++ b/pom.xml @@ -1337,7 +1337,7 @@ under the License. 
<exclude>flink-table/flink-table-planner-blink/src/test/resources/digest/*.out</exclude> <exclude>flink-table/flink-table-planner-blink/src/test/resources/explain/*.out</exclude> <exclude>flink-yarn/src/test/resources/krb5.keytab</exclude> - <exclude>flink-end-to-end-tests/test-scripts/test-data/*</exclude> + <exclude>flink-end-to-end-tests/test-scripts/test-data/**</exclude> <exclude>flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/keystore.jks</exclude> <exclude>flink-connectors/flink-connector-kafka/src/test/resources/**</exclude> <exclude>flink-connectors/flink-connector-kafka-0.11/src/test/resources/**</exclude>
