[FLINK-8983] Integrate test_confluent_schema_registry.sh into run-nightly-tests.sh
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b366045 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b366045 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b366045 Branch: refs/heads/master Commit: 9b366045697f80534b8eb2e8b559f02d1452f0cf Parents: a36b569 Author: Till Rohrmann <[email protected]> Authored: Thu Jun 28 18:34:24 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Jun 28 18:34:24 2018 +0200 ---------------------------------------------------------------------- flink-end-to-end-tests/.gitignore | 1 + .../flink-confluent-schema-registry/.gitignore | 1 + .../flink-confluent-schema-registry/pom.xml | 7 ++ .../test/TestAvroConsumerConfluent.java | 14 +-- flink-end-to-end-tests/run-nightly-tests.sh | 2 +- .../test-scripts/kafka-common.sh | 28 +++-- .../test-confluent-schema-registry.sh | 106 ------------------- .../test_confluent_schema_registry.sh | 93 ++++++++++++++++ 8 files changed, 128 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/.gitignore ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/.gitignore b/flink-end-to-end-tests/.gitignore new file mode 100644 index 0000000..3fc3a72 --- /dev/null +++ b/flink-end-to-end-tests/.gitignore @@ -0,0 +1 @@ +test-scripts/temp-test-directory* http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/flink-confluent-schema-registry/.gitignore ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/.gitignore b/flink-end-to-end-tests/flink-confluent-schema-registry/.gitignore new file mode 100644 index 0000000..b83a752 --- /dev/null +++ b/flink-end-to-end-tests/flink-confluent-schema-registry/.gitignore @@ -0,0 +1 @@ +src/main/java/example/avro http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml index 576fca6..2f63c22 100644 --- a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml +++ b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml @@ -150,6 +150,13 @@ under the License. </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <excludes>**/example/avro/*</excludes> + </configuration> + </plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java index 9149832..fbbe3c3 100644 --- a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java +++ b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; import example.avro.User; +import org.apache.avro.specific.SpecificRecordBase; import java.util.Properties; @@ -40,7 +41,6 @@ import java.util.Properties; public class TestAvroConsumerConfluent { public static void main(String[] args) throws Exception { - Properties config = new Properties(); // parse input arguments final ParameterTool parameterTool = ParameterTool.fromArgs(args); @@ -52,6 +52,7 @@ public class TestAvroConsumerConfluent { "--schema-registry-url <confluent schema registry> --group.id <some id>"); return; } + Properties config = new Properties(); config.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers")); config.setProperty("group.id", parameterTool.getRequired("group.id")); config.setProperty("zookeeper.connect", parameterTool.getRequired("zookeeper.connect")); @@ -62,20 +63,15 @@ public class TestAvroConsumerConfluent { DataStreamSource<User> input = env .addSource( - new FlinkKafkaConsumer010( + new FlinkKafkaConsumer010<>( parameterTool.getRequired("input-topic"), ConfluentRegistryAvroDeserializationSchema.forSpecific(User.class, schemaRegistryUrl), config).setStartFromEarliest()); SingleOutputStreamOperator<String> mapToString = input - .map(new MapFunction<User, String>() { - @Override - public String map(User value) throws Exception { - return value.toString(); - } - }); + .map((MapFunction<User, String>) SpecificRecordBase::toString); - FlinkKafkaProducer010<String> stringFlinkKafkaProducer010 = new FlinkKafkaProducer010( + FlinkKafkaProducer010<String> stringFlinkKafkaProducer010 = new FlinkKafkaProducer010<>( parameterTool.getRequired("output-topic"), new SimpleStringSchema(), config); http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/run-nightly-tests.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 5873349..11eeafe 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -98,7 +98,7 @@ run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scr run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java" run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala" -run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test-confluent-schema-registry.sh" +run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh" printf "\n[PASS] All tests passed\n" exit 0 http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/test-scripts/kafka-common.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh b/flink-end-to-end-tests/test-scripts/kafka-common.sh index 50fba12..f0a37b8 100644 --- a/flink-end-to-end-tests/test-scripts/kafka-common.sh +++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh @@ -26,6 +26,8 @@ fi KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 CONFLUENT_DIR=$TEST_DATA_DIR/confluent-3.2.0 +SCHEMA_REGISTRY_PORT=8082 +SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT} function setup_kafka_dist { # download Kafka @@ -49,6 +51,9 @@ function setup_confluent_dist { curl "$CONFLUENT_URL" > $TEST_DATA_DIR/confluent.tgz tar xzf $TEST_DATA_DIR/confluent.tgz -C $TEST_DATA_DIR/ + + # fix confluent config + sed -i -e "s#listeners=http://0.0.0.0:8081#listeners=http://0.0.0.0:${SCHEMA_REGISTRY_PORT}#" $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties } function start_kafka_cluster { @@ -87,14 +92,8 @@ function read_messages_from_kafka { --consumer-property group.id=$3 2> /dev/null } -function read_messages_from_kafka_avro { - $CONFLUENT_DIR/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning \ - --max-messages $1 \ - --topic $2 2> /dev/null -} - function send_messages_to_kafka_avro { -echo -e $1 | $CONFLUENT_DIR/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic $2 --property value.schema=$3 +echo -e $1 | $CONFLUENT_DIR/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic $2 --property value.schema=$3 --property schema.registry.url=${SCHEMA_REGISTRY_URL} } function modify_num_partitions { @@ -120,7 +119,20 @@ function get_partition_end_offset { } function start_confluent_schema_registry { - $CONFLUENT_DIR/bin/schema-registry-start -daemon $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties + $CONFLUENT_DIR/bin/schema-registry-start -daemon $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties + + # wait until the schema registry REST endpoint is up + for i in {1..30}; do + QUERY_RESULT=$(curl "${SCHEMA_REGISTRY_URL}/subjects" 2> /dev/null || true) + + if [[ ${QUERY_RESULT} =~ \[.*\] ]]; then + echo "Schema registry is up." + break + fi + + echo "Waiting for schema registry..." + sleep 1 + done } function stop_confluent_schema_registry { http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh b/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh deleted file mode 100755 index 5cebb72..0000000 --- a/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh +++ /dev/null @@ -1,106 +0,0 @@ -#!/usr/bin/env bash -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -source "$(dirname "$0")"/common.sh -source "$(dirname "$0")"/kafka-common.sh - -function verify_output { - local expected=$(printf $1) - local result=$(echo $2 | sed 's/ //g') - - if [[ "$result" != "$expected" ]]; then - echo "Output from Flink program does not match expected output." - echo -e "EXPECTED FOR KEY: --$expected--" - echo -e "ACTUAL: --$result--" - PASS="" - exit 1 - fi -} - -function test_cleanup { - # don't call ourselves again for another signal interruption - trap "exit -1" INT - # don't call ourselves again for normal exit - trap "" EXIT - - stop_kafka_cluster - stop_confluent_schema_registry - - # revert our modifications to the Flink distribution - mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml - - # make sure to run regular cleanup as well - cleanup -} - -trap test_cleanup INT -trap test_cleanup EXIT - -setup_kafka_dist -setup_confluent_dist - -cd flink-end-to-end-tests/flink-confluent-schema-registry -mvn clean package -nsu - -start_kafka_cluster -start_confluent_schema_registry -sleep 5 - -# modify configuration to use port 8082 for Flink -cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak -sed -i -e "s/web.port: 8081/web.port: 8082/" $FLINK_DIR/conf/flink-conf.yaml - -TEST_PROGRAM_JAR=target/TestAvroConsumerConfluent.jar - -INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}' -INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}' -INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}' -USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}' - -curl -X POST \ - http://localhost:8081/subjects/users-value/versions \ - -H 'cache-control: no-cache' \ - -H 'content-type: application/vnd.schemaregistry.v1+json' \ - -d '{"schema": "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteNumber\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": \"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}' - -echo "Sending messages to Kafka topic [test-avro-input] ..." - -send_messages_to_kafka_avro $INPUT_MESSAGE_1 test-avro-input $USER_SCHEMA -send_messages_to_kafka_avro $INPUT_MESSAGE_2 test-avro-input $USER_SCHEMA -send_messages_to_kafka_avro $INPUT_MESSAGE_3 test-avro-input $USER_SCHEMA - -start_cluster - -# Read Avro message from [test-avro-input], check the schema and send message to [test-avro-out] -$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \ - --input-topic test-avro-input --output-topic test-avro-out \ - --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \ - --schema-registry-url http://localhost:8081 - -#echo "Reading messages from Kafka topic [test-avro-out] ..." - -KEY_1_MSGS=$(read_messages_from_kafka 3 test-avro-out Alyssa_consumer | grep Alyssa) -KEY_2_MSGS=$(read_messages_from_kafka 3 test-avro-out Charlie_consumer | grep Charlie) -KEY_3_MSGS=$(read_messages_from_kafka 3 test-avro-out Ben_consumer | grep Ben) - -## Verifying AVRO output with actual message -verify_output $INPUT_MESSAGE_1 "$KEY_1_MSGS" -verify_output $INPUT_MESSAGE_2 "$KEY_2_MSGS" -verify_output $INPUT_MESSAGE_3 "$KEY_3_MSGS" - http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh b/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh new file mode 100755 index 0000000..be0b014 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh @@ -0,0 +1,93 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +source "$(dirname "$0")"/common.sh +source "$(dirname "$0")"/kafka-common.sh + +function verify_output { + local expected=$(printf $1) + local result=$(echo $2 | sed 's/ //g') + + if [[ "$result" != "$expected" ]]; then + echo "Output from Flink program does not match expected output." + echo -e "EXPECTED FOR KEY: --$expected--" + echo -e "ACTUAL: --$result--" + exit 1 + fi +} + +function test_cleanup { + # don't call ourselves again for another signal interruption + trap "exit -1" INT + # don't call ourselves again for normal exit + trap "" EXIT + + stop_kafka_cluster + stop_confluent_schema_registry +} + +trap test_cleanup INT +trap test_cleanup EXIT + +setup_kafka_dist +setup_confluent_dist + +start_kafka_cluster +start_confluent_schema_registry + +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-confluent-schema-registry/target/TestAvroConsumerConfluent.jar + +INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}' +INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}' +INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}' +USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}' + +curl -X POST \ + ${SCHEMA_REGISTRY_URL}/subjects/users-value/versions \ + -H 'cache-control: no-cache' \ + -H 'content-type: application/vnd.schemaregistry.v1+json' \ + -d '{"schema": "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteNumber\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": \"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}' + +echo "Sending messages to Kafka topic [test-avro-input] ..." + +send_messages_to_kafka_avro $INPUT_MESSAGE_1 test-avro-input $USER_SCHEMA +send_messages_to_kafka_avro $INPUT_MESSAGE_2 test-avro-input $USER_SCHEMA +send_messages_to_kafka_avro $INPUT_MESSAGE_3 test-avro-input $USER_SCHEMA + +start_cluster + +create_kafka_topic 1 1 test-avro-out + +# Read Avro message from [test-avro-input], check the schema and send message to [test-avro-out] +$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \ + --input-topic test-avro-input --output-topic test-avro-out \ + --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \ + --schema-registry-url ${SCHEMA_REGISTRY_URL} + +#echo "Reading messages from Kafka topic [test-avro-out] ..." + +KEY_1_MSGS=$(read_messages_from_kafka 3 test-avro-out Alyssa_consumer | grep Alyssa) +KEY_2_MSGS=$(read_messages_from_kafka 3 test-avro-out Charlie_consumer | grep Charlie) +KEY_3_MSGS=$(read_messages_from_kafka 3 test-avro-out Ben_consumer | grep Ben) + +## Verifying AVRO output with actual message +verify_output $INPUT_MESSAGE_1 "$KEY_1_MSGS" +verify_output $INPUT_MESSAGE_2 "$KEY_2_MSGS" +verify_output $INPUT_MESSAGE_3 "$KEY_3_MSGS" +
