This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6e1ad2d7c5dc5f2da0e18b23b21b69c1bfc5a333
Author: Timo Walther <[email protected]>
AuthorDate: Fri Jul 20 16:55:48 2018 +0200

    [FLINK-9833] [e2e] Add a SQL Client end-to-end test with unified source/sink/format

    Adds a SQL Client end-to-end test with Kafka/Filesystem and Avro/JSON/CSV components. It reads JSON from Kafka, uses a UDF for transformation, writes to Kafka Avro, reads from Kafka Avro, and writes to Filesystem CSV again. It also tests the available SQL jars for correctness.

    This closes #6422.
---
 .../flink-sql-client-test/pom.xml                  | 124 +++++++++
 .../table/toolbox/StringRegexReplaceFunction.java  |  31 +++
 flink-end-to-end-tests/pom.xml                     |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh        |   2 +
 .../test-scripts/test_sql_client.sh                | 288 +++++++++++++++++++++
 5 files changed, 446 insertions(+)

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
new file mode 100644
index 0000000..d2bfe4b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -0,0 +1,124 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.6-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-sql-client-test</artifactId> + <name>flink-sql-client-test</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Build toolbox jar. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.1.1</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>SqlToolbox</finalName> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Copy SQL jars into dedicated "sql-jars" directory. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/sql-jars</outputDirectory> + <!-- List of currently provided SQL jars. 
--> + <artifactItems> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${project.version}</version> + <classifier>sql-jar</classifier> + <type>jar</type> + </artifactItem> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>${project.version}</version> + <classifier>sql-jar</classifier> + <type>jar</type> + </artifactItem> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <classifier>sql-jar</classifier> + <type>jar</type> + </artifactItem> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <classifier>sql-jar</classifier> + <type>jar</type> + </artifactItem> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <classifier>sql-jar</classifier> + <type>jar</type> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/StringRegexReplaceFunction.java b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/StringRegexReplaceFunction.java new file mode 100644 index 0000000..c74e46d --- /dev/null +++ b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/StringRegexReplaceFunction.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.toolbox; + +import org.apache.flink.table.functions.ScalarFunction; + +/** + * Scalar function for replacing all occurrences of a regular expression with a replacement string. + */ +public class StringRegexReplaceFunction extends ScalarFunction { + + public String eval(String input, String regex, String replacement) { + return input.replaceAll(regex, replacement); + } +} diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 162ca66..6d759e6 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -52,6 +52,7 @@ under the License. 
<module>flink-quickstart-test</module> <module>flink-confluent-schema-registry</module> <module>flink-stream-state-ttl-test</module> + <module>flink-sql-client-test</module> </modules> <build> diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index b4f3789..8439bad 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -113,5 +113,7 @@ run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scrip run_test "Running Kerberized YARN on Docker test " "$END_TO_END_DIR/test-scripts/test_yarn_kerberos_docker.sh" +run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh" + printf "\n[PASS] All tests passed\n" exit 0 diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh new file mode 100755 index 0000000..934f7d4 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh @@ -0,0 +1,288 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +source "$(dirname "$0")"/common.sh +source "$(dirname "$0")"/kafka-common.sh + +SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar +SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/sql-jars + +################################################################################ +# Verify existing SQL jars +################################################################################ + +EXTRACTED_JAR=$TEST_DATA_DIR/extracted + +mkdir -p $EXTRACTED_JAR + +for SQL_JAR in $SQL_JARS_DIR/*.jar; do + echo "Checking SQL JAR: $SQL_JAR" + unzip $SQL_JAR -d $EXTRACTED_JAR > /dev/null + + # check for proper shading + for EXTRACTED_FILE in $(find $EXTRACTED_JAR -type f); do + + if ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/org/apache/flink"* ]] && \ + ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/META-INF"* ]] && \ + ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/LICENSE"* ]] && \ + ! [[ $EXTRACTED_FILE = "$EXTRACTED_JAR/NOTICE"* ]] ; then + echo "Bad file in JAR: $EXTRACTED_FILE" + exit 1 + fi + done + + # check for proper factory + if [ ! -f $EXTRACTED_JAR/META-INF/services/org.apache.flink.table.factories.TableFactory ]; then + echo "No table factory found in JAR: $SQL_JAR" + exit 1 + fi + + # clean up + rm -r $EXTRACTED_JAR/* +done + +################################################################################ +# Run a SQL statement +################################################################################ + +echo "Testing SQL statement..." + +function sql_cleanup() { + # don't call ourselves again for another signal interruption + trap "exit -1" INT + # don't call ourselves again for normal exit + trap "" EXIT + + stop_kafka_cluster +} +trap sql_cleanup INT +trap sql_cleanup EXIT + +# prepare Kafka +echo "Preparing Kafka..." 
+ +setup_kafka_dist + +start_kafka_cluster + +create_kafka_topic 1 1 test-json +create_kafka_topic 1 1 test-avro + +# put JSON data into Kafka +echo "Sending messages to Kafka..." + +send_messages_to_kafka '{"timestamp": "2018-03-12 08:00:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' test-json +# duplicate +send_messages_to_kafka '{"timestamp": "2018-03-12 08:10:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' test-json +send_messages_to_kafka '{"timestamp": "2018-03-12 09:00:00", "user": "Bob", "event": { "type": "WARNING", "message": "This is another warning."}}' test-json +send_messages_to_kafka '{"timestamp": "2018-03-12 09:10:00", "user": "Alice", "event": { "type": "INFO", "message": "This is a info."}}' test-json +send_messages_to_kafka '{"timestamp": "2018-03-12 09:20:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' test-json +# duplicate +send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' test-json +# filtered in results +send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": null, "event": { "type": "WARNING", "message": "This is a bad message because the user is missing."}}' test-json +# pending in results +send_messages_to_kafka '{"timestamp": "2018-03-12 10:40:00", "user": "Bob", "event": { "type": "ERROR", "message": "This is an error."}}' test-json + +# prepare Flink +echo "Preparing Flink..." 
+ +start_cluster +start_taskmanagers 1 + +# create session environment file +RESULT=$TEST_DATA_DIR/result +SQL_CONF=$TEST_DATA_DIR/sql-client-session.conf + +cat > $SQL_CONF << EOF +tables: + - name: JsonSourceTable + type: source + update-mode: append + schema: + - name: rowtime + type: TIMESTAMP + rowtime: + timestamps: + type: from-field + from: timestamp + watermarks: + type: periodic-bounded + delay: 2000 + - name: user + type: VARCHAR + - name: event + type: ROW(type VARCHAR, message VARCHAR) + connector: + type: kafka + version: "0.10" + topic: test-json + startup-mode: earliest-offset + properties: + - key: zookeeper.connect + value: localhost:2181 + - key: bootstrap.servers + value: localhost:9092 + format: + type: json + json-schema: > + { + "type": "object", + "properties": { + "timestamp": { + "type": "string" + }, + "user": { + "type": ["string", "null"] + }, + "event": { + "type": "object", + "properties": { + "type": { + "type": "string" + }, + "message": { + "type": "string" + } + } + } + } + } + - name: AvroBothTable + type: both + update-mode: append + schema: + - name: event_timestamp + type: VARCHAR + - name: user + type: VARCHAR + - name: message + type: VARCHAR + - name: duplicate_count + type: BIGINT + connector: + type: kafka + version: "0.10" + topic: test-avro + startup-mode: earliest-offset + properties: + - key: zookeeper.connect + value: localhost:2181 + - key: bootstrap.servers + value: localhost:9092 + format: + type: avro + avro-schema: > + { + "namespace": "org.apache.flink.table.tests", + "type": "record", + "name": "NormalizedEvent", + "fields": [ + {"name": "event_timestamp", "type": "string"}, + {"name": "user", "type": ["string", "null"]}, + {"name": "message", "type": "string"}, + {"name": "duplicate_count", "type": "long"} + ] + } + - name: CsvSinkTable + type: sink + update-mode: append + schema: + - name: event_timestamp + type: VARCHAR + - name: user + type: VARCHAR + - name: message + type: VARCHAR + - name: 
duplicate_count + type: BIGINT + connector: + type: filesystem + path: $RESULT + format: + type: csv + fields: + - name: event_timestamp + type: VARCHAR + - name: user + type: VARCHAR + - name: message + type: VARCHAR + - name: duplicate_count + type: BIGINT + +functions: + - name: RegReplace + from: class + class: org.apache.flink.table.toolbox.StringRegexReplaceFunction +EOF + +# submit SQL statements + +read -r -d '' SQL_STATEMENT_1 << EOF +INSERT INTO AvroBothTable + SELECT + CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp, + user, + RegReplace(event.message, ' is ', ' was ') AS message, + COUNT(*) AS duplicate_count + FROM JsonSourceTable + WHERE user IS NOT NULL + GROUP BY + user, + event.message, + TUMBLE(rowtime, INTERVAL '1' HOUR) +EOF + +echo "Executing SQL: Kafka JSON -> Kafka Avro" +echo "$SQL_STATEMENT_1" + +$FLINK_DIR/bin/sql-client.sh embedded \ + --library $SQL_JARS_DIR \ + --jar $SQL_TOOLBOX_JAR \ + --environment $SQL_CONF \ + --update "$SQL_STATEMENT_1" + +read -r -d '' SQL_STATEMENT_2 << EOF +INSERT INTO CsvSinkTable + SELECT * + FROM AvroBothTable +EOF + +echo "Executing SQL: Kafka Avro -> Filesystem CSV" +echo "$SQL_STATEMENT_2" + +$FLINK_DIR/bin/sql-client.sh embedded \ + --library $SQL_JARS_DIR \ + --jar $SQL_TOOLBOX_JAR \ + --environment $SQL_CONF \ + --update "$SQL_STATEMENT_2" + +echo "Waiting for CSV results..." +for i in {1..10}; do + if [ -e $RESULT ]; then + CSV_LINE_COUNT=`cat $RESULT | wc -l` + if [ $((CSV_LINE_COUNT)) -eq 4 ]; then + break + fi + fi + sleep 5 +done + +check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb"
