Repository: flink
Updated Branches:
  refs/heads/master da37daa8b -> 9b3660456
[FLINK-8983] End-to-end test: Confluent schema registry

This closes #6083.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a36b5699
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a36b5699
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a36b5699

Branch: refs/heads/master
Commit: a36b56999240d1ead0793be7acb4ad13cd0559f2
Parents: da37daa
Author: Yadan.JS <[email protected]>
Authored: Tue Jun 12 10:03:58 2018 -0400
Committer: Till Rohrmann <[email protected]>
Committed: Thu Jun 28 13:43:50 2018 +0200

----------------------------------------------------------------------
 .../flink-confluent-schema-registry/pom.xml  | 155 +++++++++++++++++++
 .../src/main/avro/user.avsc                  |  27 ++++
 .../test/TestAvroConsumerConfluent.java      |  86 ++++++++++
 flink-end-to-end-tests/pom.xml               |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh  |   2 +
 .../test-scripts/kafka-common.sh             |  29 ++++
 .../test-confluent-schema-registry.sh        | 106 +++++++++++++
 7 files changed, 406 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
new file mode 100644
index 0000000..576fca6
--- /dev/null
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
@@ -0,0 +1,155 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<confluent.version>4.1.0</confluent.version>
+	</properties>
+
+	<repositories>
+		<repository>
+			<id>confluent</id>
+			<url>http://packages.confluent.io/maven/</url>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<!-- Apache Flink dependencies -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<!-- This dependency is required to actually execute jobs. It is currently pulled in by
+				flink-streaming-java, but we explicitly depend on it to safeguard against future changes.
+			-->
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro-confluent-registry</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>TestAvroConsumerConfluent</finalName>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.schema.registry.test.TestAvroConsumerConfluent</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro-maven-plugin</artifactId>
+				<version>${avro.version}</version>
+				<executions>
+					<execution>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>schema</goal>
+						</goals>
+						<configuration>
+							<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+							<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
+							<fieldVisibility>PRIVATE</fieldVisibility>
+							<includes>
+								<include>**/*.avsc</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-enforcer-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>dependency-convergence</id>
+						<goals>
+							<goal>enforce</goal>
+						</goals>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>


http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/avro/user.avsc b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/avro/user.avsc
new file mode 100644
index 0000000..aca9b83
--- /dev/null
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/avro/user.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string", "default": ""},
+     {"name": "favoriteNumber", "type": "string", "default": ""},
+     {"name": "favoriteColor", "type": "string", "default": ""},
+     {"name": "eventType", "type": {"name": "EventType", "type": "enum", "symbols": ["meeting"]}}
+ ]
+}
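A note on the schema above: the avro-maven-plugin configured in the pom turns this file into a SpecificRecord class example.avro.User plus an example.avro.EventType enum. A minimal sketch of building a record with that generated API, assuming Avro's standard code generation rules; the class below is illustrative and not part of the commit:

    import example.avro.EventType;
    import example.avro.User;

    public class UserRecordSketch {
        public static void main(String[] args) {
            // Setters mirror the schema's field names; fields left unset fall
            // back to the defaults declared in user.avsc.
            User user = User.newBuilder()
                .setName("Alyssa")
                .setFavoriteNumber("250")
                .setFavoriteColor("green")
                .setEventType(EventType.meeting)
                .build();
            // Avro's toString() renders the record as JSON-like text, which is
            // exactly what the job below writes to the output topic.
            System.out.println(user);
        }
    }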

http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
new file mode 100644
index 0000000..9149832
--- /dev/null
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.schema.registry.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
+
+import example.avro.User;
+
+import java.util.Properties;
+
+/**
+ * A simple example that shows how to read from and write to Kafka with the Confluent Schema Registry.
+ * The job reads Avro messages from the input topic and deserializes them into a POJO type, fetching
+ * the writer schema from the Schema Registry. Each record is then converted to a string and published
+ * to the output topic.
+ *
+ * Example invocation:
+ * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092
+ * --zookeeper.connect localhost:2181 --schema-registry-url http://localhost:8081 --group.id myconsumer
+ */
+public class TestAvroConsumerConfluent {
+
+	public static void main(String[] args) throws Exception {
+		Properties config = new Properties();
+		// parse input arguments
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		if (parameterTool.getNumberOfParameters() < 6) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: Kafka --input-topic <topic> --output-topic <topic> " +
+				"--bootstrap.servers <kafka brokers> " +
+				"--zookeeper.connect <zk quorum> " +
+				"--schema-registry-url <confluent schema registry> --group.id <some id>");
+			return;
+		}
+		config.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
+		config.setProperty("group.id", parameterTool.getRequired("group.id"));
+		config.setProperty("zookeeper.connect", parameterTool.getRequired("zookeeper.connect"));
+		String schemaRegistryUrl = parameterTool.getRequired("schema-registry-url");
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+
+		DataStreamSource<User> input = env
+			.addSource(
+				new FlinkKafkaConsumer010<>(
+					parameterTool.getRequired("input-topic"),
+					ConfluentRegistryAvroDeserializationSchema.forSpecific(User.class, schemaRegistryUrl),
+					config).setStartFromEarliest());
+
+		SingleOutputStreamOperator<String> mapToString = input
+			.map(new MapFunction<User, String>() {
+				@Override
+				public String map(User value) throws Exception {
+					return value.toString();
+				}
+			});
+
+		FlinkKafkaProducer010<String> stringFlinkKafkaProducer010 = new FlinkKafkaProducer010<>(
+			parameterTool.getRequired("output-topic"),
+			new SimpleStringSchema(),
+			config);
+
+		mapToString.addSink(stringFlinkKafkaProducer010);
+		env.execute("Kafka 0.10 Confluent Schema Registry AVRO Example");
+	}
+}
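The job above relies on the generated User class. When only the schema itself is available, flink-avro-confluent-registry also offers a generic variant of the same registry-backed deserializer; a minimal sketch, taking the schema JSON (the same definition as user.avsc) as a parameter:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

    public class GenericUserDeserializer {
        public static DeserializationSchema<GenericRecord> create(String schemaRegistryUrl, String userSchemaJson) {
            // Reader schema parsed from the same JSON as user.avsc; the writer
            // schema is still fetched from the registry per message, and records
            // are decoded into GenericRecord instead of a generated POJO.
            Schema readerSchema = new Schema.Parser().parse(userSchemaJson);
            return ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, schemaRegistryUrl);
        }
    }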

http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index c169050..091dd0b 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -49,6 +49,7 @@ under the License.
 		<module>flink-elasticsearch2-test</module>
 		<module>flink-elasticsearch5-test</module>
 		<module>flink-quickstart-test</module>
+		<module>flink-confluent-schema-registry</module>
 	</modules>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 302e27f..5873349 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -98,5 +98,7 @@ run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scr
 run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
 run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
 
+run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test-confluent-schema-registry.sh"
+
 printf "\n[PASS] All tests passed\n"
 exit 0

http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/test-scripts/kafka-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh b/flink-end-to-end-tests/test-scripts/kafka-common.sh
index 49ff4fe..50fba12 100644
--- a/flink-end-to-end-tests/test-scripts/kafka-common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh
@@ -25,6 +25,7 @@ if [[ -z $TEST_DATA_DIR ]]; then
 fi
 
 KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-3.2.0
 
 function setup_kafka_dist {
   # download Kafka
@@ -40,6 +41,16 @@ function setup_kafka_dist {
   sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
 }
 
+function setup_confluent_dist {
+  # download confluent
+  mkdir -p $TEST_DATA_DIR
+  CONFLUENT_URL="http://packages.confluent.io/archive/3.2/confluent-oss-3.2.0-2.11.tar.gz"
+  echo "Downloading confluent from $CONFLUENT_URL"
+  curl "$CONFLUENT_URL" > $TEST_DATA_DIR/confluent.tgz
+
+  tar xzf $TEST_DATA_DIR/confluent.tgz -C $TEST_DATA_DIR/
+}
+
 function start_kafka_cluster {
   if [[ -z $KAFKA_DIR ]]; then
     echo "Must run 'setup_kafka_dist' before attempting to start Kafka cluster"
@@ -76,6 +87,16 @@ function read_messages_from_kafka {
     --consumer-property group.id=$3 2> /dev/null
 }
 
+function read_messages_from_kafka_avro {
+  $CONFLUENT_DIR/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning \
+    --max-messages $1 \
+    --topic $2 2> /dev/null
+}
+
+function send_messages_to_kafka_avro {
+  echo -e $1 | $CONFLUENT_DIR/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic $2 --property value.schema=$3
+}
+
 function modify_num_partitions {
   $KAFKA_DIR/bin/kafka-topics.sh --alter --topic $1 --partitions $2 --zookeeper localhost:2181
 }
@@ -97,3 +118,11 @@ function get_partition_end_offset {
     | tr -s " " \
     | cut -d " " -f 4
 }
+
+function start_confluent_schema_registry {
+  $CONFLUENT_DIR/bin/schema-registry-start -daemon $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
+}
+
+function stop_confluent_schema_registry {
+  $CONFLUENT_DIR/bin/schema-registry-stop
+}
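For context on what kafka-avro-console-producer and kafka-avro-console-consumer actually exchange: values on the wire are not plain Avro but use Confluent's framing, i.e. a magic byte 0 followed by a 4-byte schema id and then the Avro-encoded body. A small sketch of reading that framing (the framing itself is a stated assumption about the Confluent wire format, not code from this commit):

    import java.nio.ByteBuffer;

    public class WireFormatSketch {
        /** Returns the schema registry id embedded in a Confluent-framed Kafka value. */
        public static int schemaId(byte[] kafkaValue) {
            ByteBuffer buffer = ByteBuffer.wrap(kafkaValue);
            byte magic = buffer.get();
            if (magic != 0) {
                throw new IllegalArgumentException("Not Confluent wire format: magic byte " + magic);
            }
            // 4-byte big-endian schema id; the Avro-encoded body follows it.
            return buffer.getInt();
        }
    }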

http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh b/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh
new file mode 100755
index 0000000..5cebb72
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh
@@ -0,0 +1,106 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
+
+function verify_output {
+  local expected=$(printf $1)
+  local result=$(echo $2 | sed 's/ //g')
+
+  if [[ "$result" != "$expected" ]]; then
+    echo "Output from Flink program does not match expected output."
+    echo -e "EXPECTED FOR KEY: --$expected--"
+    echo -e "ACTUAL: --$result--"
+    PASS=""
+    exit 1
+  fi
+}
+
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_kafka_cluster
+  stop_confluent_schema_registry
+
+  # revert our modifications to the Flink distribution
+  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+setup_kafka_dist
+setup_confluent_dist
+
+cd flink-end-to-end-tests/flink-confluent-schema-registry
+mvn clean package -nsu
+
+start_kafka_cluster
+start_confluent_schema_registry
+sleep 5
+
+# modify configuration to use port 8082 for Flink
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e "s/web.port: 8081/web.port: 8082/" $FLINK_DIR/conf/flink-conf.yaml
+
+TEST_PROGRAM_JAR=target/TestAvroConsumerConfluent.jar
+
+INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}'
+INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}'
+INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}'
+USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}'
+
+curl -X POST \
+  http://localhost:8081/subjects/users-value/versions \
+  -H 'cache-control: no-cache' \
+  -H 'content-type: application/vnd.schemaregistry.v1+json' \
+  -d '{"schema": "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteNumber\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": \"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}'
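The curl call registers the schema under the subject users-value through the registry's REST API. The same registration can be done with the Java client shipped at confluent.version 4.1.0; a hedged sketch (the register(subject, Schema) signature matches the 4.x client, later major versions changed it):

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import org.apache.avro.Schema;

    public class RegisterUserSchema {
        public static int register(String registryUrl, Schema userSchema) throws Exception {
            // The second argument caps how many schemas the client caches per subject.
            CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 10);
            // The Kafka serializers would register under "<topic>-value" by default;
            // the subject users-value mirrors the curl call above.
            return client.register("users-value", userSchema);
        }
    }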
\"\"},{\"name\": \"favoriteNumber\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": \"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}' + +echo "Sending messages to Kafka topic [test-avro-input] ..." + +send_messages_to_kafka_avro $INPUT_MESSAGE_1 test-avro-input $USER_SCHEMA +send_messages_to_kafka_avro $INPUT_MESSAGE_2 test-avro-input $USER_SCHEMA +send_messages_to_kafka_avro $INPUT_MESSAGE_3 test-avro-input $USER_SCHEMA + +start_cluster + +# Read Avro message from [test-avro-input], check the schema and send message to [test-avro-out] +$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \ + --input-topic test-avro-input --output-topic test-avro-out \ + --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \ + --schema-registry-url http://localhost:8081 + +#echo "Reading messages from Kafka topic [test-avro-out] ..." + +KEY_1_MSGS=$(read_messages_from_kafka 3 test-avro-out Alyssa_consumer | grep Alyssa) +KEY_2_MSGS=$(read_messages_from_kafka 3 test-avro-out Charlie_consumer | grep Charlie) +KEY_3_MSGS=$(read_messages_from_kafka 3 test-avro-out Ben_consumer | grep Ben) + +## Verifying AVRO output with actual message +verify_output $INPUT_MESSAGE_1 "$KEY_1_MSGS" +verify_output $INPUT_MESSAGE_2 "$KEY_2_MSGS" +verify_output $INPUT_MESSAGE_3 "$KEY_3_MSGS" +
