Repository: flink
Updated Branches:
  refs/heads/master da37daa8b -> 9b3660456


[FLINK-8983] End-to-end test: Confluent schema registry

This closes #6083.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a36b5699
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a36b5699
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a36b5699

Branch: refs/heads/master
Commit: a36b56999240d1ead0793be7acb4ad13cd0559f2
Parents: da37daa
Author: Yadan.JS <[email protected]>
Authored: Tue Jun 12 10:03:58 2018 -0400
Committer: Till Rohrmann <[email protected]>
Committed: Thu Jun 28 13:43:50 2018 +0200

----------------------------------------------------------------------
 .../flink-confluent-schema-registry/pom.xml     | 155 +++++++++++++++++++
 .../src/main/avro/user.avsc                     |  27 ++++
 .../test/TestAvroConsumerConfluent.java         |  86 ++++++++++
 flink-end-to-end-tests/pom.xml                  |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh     |   2 +
 .../test-scripts/kafka-common.sh                |  29 ++++
 .../test-confluent-schema-registry.sh           | 106 +++++++++++++
 7 files changed, 406 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
new file mode 100644
index 0000000..576fca6
--- /dev/null
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
@@ -0,0 +1,155 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.6-SNAPSHOT</version>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-confluent-schema-registry</artifactId>
+       <properties>
+               <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+               <confluent.version>4.1.0</confluent.version>
+       </properties>
+
+       <repositories>
+               <repository>
+                       <id>confluent</id>
+                       <url>http://packages.confluent.io/maven/</url>
+               </repository>
+       </repositories>
+
+       <dependencies>
+               <!-- Apache Flink dependencies -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <!-- This dependency is required to actually execute jobs. It is currently pulled in by
+                       flink-streaming-java, but we explicitly depend on it to safeguard against future changes. -->
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro-confluent-registry</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <version>3.0.0</version>
+                               <executions>
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <finalName>TestAvroConsumerConfluent</finalName>
+                                                       <artifactSet>
+                                                               <excludes>
+                                                                       <exclude>com.google.code.findbugs:jsr305</exclude>
+                                                               </excludes>
+                                                       </artifactSet>
+                                                       <filters>
+                                                               <filter>
+                                                                       <artifact>*:*</artifact>
+                                                                       <excludes>
+                                                                               <exclude>META-INF/*.SF</exclude>
+                                                                               <exclude>META-INF/*.DSA</exclude>
+                                                                               <exclude>META-INF/*.RSA</exclude>
+                                                                       </excludes>
+                                                               </filter>
+                                                       </filters>
+                                                       <transformers>
+                                                               <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       <mainClass>org.apache.flink.schema.registry.test.TestAvroConsumerConfluent</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.avro</groupId>
+                               <artifactId>avro-maven-plugin</artifactId>
+                               <version>${avro.version}</version>
+                               <executions>
+                                       <execution>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>schema</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+                                                       <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
+                                                       <fieldVisibility>PRIVATE</fieldVisibility>
+                                                       <includes>
+                                                               <include>**/*.avsc</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-enforcer-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>dependency-convergence</id>
+                                               <goals>
+                                                       <goal>enforce</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <skip>true</skip>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/avro/user.avsc b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/avro/user.avsc
new file mode 100644
index 0000000..aca9b83
--- /dev/null
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/avro/user.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ {"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string", "default": ""},
+     {"name": "favoriteNumber",  "type": "string", "default": ""},
+     {"name": "favoriteColor", "type": "string", "default": ""},
+     {"name": "eventType","type": {"name": "EventType","type": "enum", 
"symbols": ["meeting"] }}
+ ]
+}
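
For reference, a record matching this schema can be built with the class that the avro-maven-plugin (configured in the pom above) generates from user.avsc. A minimal sketch, assuming Avro's standard code generation; the builder methods and the EventType enum follow from the field and enum names declared in the schema:

    // Hypothetical illustration, not part of the commit.
    import example.avro.EventType;
    import example.avro.User;

    public class UserRecordSketch {
        public static void main(String[] args) {
            // Build a record equivalent to the first test input message used below.
            User user = User.newBuilder()
                .setName("Alyssa")
                .setFavoriteNumber("250")
                .setFavoriteColor("green")
                .setEventType(EventType.meeting)
                .build();
            System.out.println(user); // Avro renders the record as JSON-like text
        }
    }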

http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
new file mode 100644
index 0000000..9149832
--- /dev/null
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.schema.registry.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
+
+import example.avro.User;
+
+import java.util.Properties;
+
+/**
+ * A simple example that shows how to read from and write to Kafka with Confluent Schema Registry.
+ * It reads Avro messages from the input topic and deserializes them into a POJO type, checking the schema against the Confluent Schema Registry.
+ * Each record is then mapped to a string and published to the output topic.
+ * Example arguments:
+ * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --schema-registry-url http://localhost:8081 --group.id myconsumer
+ */
+public class TestAvroConsumerConfluent {
+
+       public static void main(String[] args) throws Exception {
+               Properties config = new Properties();
+               // parse input arguments
+               final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+               if (parameterTool.getNumberOfParameters() < 6) {
+                       System.out.println("Missing parameters!\n" +
+                               "Usage: Kafka --input-topic <topic> --output-topic <topic> " +
+                               "--bootstrap.servers <kafka brokers> " +
+                               "--zookeeper.connect <zk quorum> " +
+                               "--schema-registry-url <confluent schema registry> --group.id <some id>");
+                       return;
+               }
+               config.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
+               config.setProperty("group.id", parameterTool.getRequired("group.id"));
+               config.setProperty("zookeeper.connect", parameterTool.getRequired("zookeeper.connect"));
+               String schemaRegistryUrl = parameterTool.getRequired("schema-registry-url");
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().disableSysoutLogging();
+
+               DataStreamSource<User> input = env
+                       .addSource(
+                               new FlinkKafkaConsumer010<>(
+                                       parameterTool.getRequired("input-topic"),
+                                       ConfluentRegistryAvroDeserializationSchema.forSpecific(User.class, schemaRegistryUrl),
+                                       config).setStartFromEarliest());
+
+               SingleOutputStreamOperator<String> mapToString = input
+                       .map(new MapFunction<User, String>() {
+                               @Override
+                               public String map(User value) throws Exception {
+                                       return value.toString();
+                               }
+                       });
+
+               FlinkKafkaProducer010<String> stringFlinkKafkaProducer010 = new FlinkKafkaProducer010<>(
+                       parameterTool.getRequired("output-topic"),
+                       new SimpleStringSchema(),
+                       config);
+
+               mapToString.addSink(stringFlinkKafkaProducer010);
+               env.execute("Kafka 0.10 Confluent Schema Registry AVRO Example");
+       }
+}
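
The forSpecific(...) call above requires the generated User class. flink-avro-confluent-registry also provides a generic variant for jobs that run without generated classes; a minimal sketch, assuming the reader schema is available as the user.avsc JSON text:

    // Hypothetical illustration, not part of the commit.
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

    public class GenericDeserializerSketch {
        // avscJson: schema text from user.avsc; schemaRegistryUrl: e.g. http://localhost:8081
        public static DeserializationSchema<GenericRecord> forUserTopic(String avscJson, String schemaRegistryUrl) {
            Schema readerSchema = new Schema.Parser().parse(avscJson);
            return ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, schemaRegistryUrl);
        }
    }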

http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index c169050..091dd0b 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -49,6 +49,7 @@ under the License.
                <module>flink-elasticsearch2-test</module>
                <module>flink-elasticsearch5-test</module>
                <module>flink-quickstart-test</module>
+               <module>flink-confluent-schema-registry</module>
        </modules>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 302e27f..5873349 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -98,5 +98,7 @@ run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scr
 run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
 run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
 
+run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test-confluent-schema-registry.sh"
+
 printf "\n[PASS] All tests passed\n"
 exit 0

http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/test-scripts/kafka-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh b/flink-end-to-end-tests/test-scripts/kafka-common.sh
index 49ff4fe..50fba12 100644
--- a/flink-end-to-end-tests/test-scripts/kafka-common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh
@@ -25,6 +25,7 @@ if [[ -z $TEST_DATA_DIR ]]; then
 fi
 
 KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-3.2.0
 
 function setup_kafka_dist {
   # download Kafka
@@ -40,6 +41,16 @@ function setup_kafka_dist {
   sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
 }
 
+function setup_confluent_dist {
+  # download confluent
+  mkdir -p $TEST_DATA_DIR
+  CONFLUENT_URL="http://packages.confluent.io/archive/3.2/confluent-oss-3.2.0-2.11.tar.gz"
+  echo "Downloading confluent from $CONFLUENT_URL"
+  curl "$CONFLUENT_URL" > $TEST_DATA_DIR/confluent.tgz
+
+  tar xzf $TEST_DATA_DIR/confluent.tgz -C $TEST_DATA_DIR/
+}
+
 function start_kafka_cluster {
   if [[ -z $KAFKA_DIR ]]; then
     echo "Must run 'setup_kafka_dist' before attempting to start Kafka cluster"
@@ -76,6 +87,16 @@ function read_messages_from_kafka {
     --consumer-property group.id=$3 2> /dev/null
 }
 
+function read_messages_from_kafka_avro {
+  $CONFLUENT_DIR/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning \
+    --max-messages $1 \
+    --topic $2 2> /dev/null
+}
+
+function send_messages_to_kafka_avro {
+  echo -e $1 | $CONFLUENT_DIR/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic $2 --property value.schema=$3
+}
+
 function modify_num_partitions {
   $KAFKA_DIR/bin/kafka-topics.sh --alter --topic $1 --partitions $2 --zookeeper localhost:2181
 }
@@ -97,3 +118,11 @@ function get_partition_end_offset {
     | tr -s " " \
     | cut -d " " -f 4
 }
+
+function start_confluent_schema_registry {
+    $CONFLUENT_DIR/bin/schema-registry-start -daemon $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
+}
+
+function stop_confluent_schema_registry {
+    $CONFLUENT_DIR/bin/schema-registry-stop
+}
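
The kafka-avro-console-consumer invoked by read_messages_from_kafka_avro is backed by Confluent's KafkaAvroDeserializer, which resolves the writer schema from the registry via the schema id embedded in each message. A rough sketch of the same read done programmatically, assuming the io.confluent:kafka-avro-serializer dependency and the endpoints these scripts use:

    // Hypothetical illustration, not part of the commit.
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class AvroConsoleConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "avro-sketch");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Confluent deserializer: fetches the writer schema from the registry by id
            props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            props.put("schema.registry.url", "http://localhost:8081");

            try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test-avro-input"));
                ConsumerRecords<String, Object> records = consumer.poll(10_000);
                for (ConsumerRecord<String, Object> record : records) {
                    System.out.println(record.value()); // GenericRecord printed as text
                }
            }
        }
    }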

http://git-wip-us.apache.org/repos/asf/flink/blob/a36b5699/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh b/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh
new file mode 100755
index 0000000..5cebb72
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh
@@ -0,0 +1,106 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
+
+function verify_output {
+  local expected=$(printf $1)
+  local result=$(echo $2 | sed 's/ //g')
+
+  if [[ "$result" != "$expected" ]]; then
+    echo "Output from Flink program does not match expected output."
+    echo -e "EXPECTED FOR KEY: --$expected--"
+    echo -e "ACTUAL: --$result--"
+    PASS=""
+    exit 1
+  fi
+}
+
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_kafka_cluster
+  stop_confluent_schema_registry
+
+  # revert our modifications to the Flink distribution
+  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+setup_kafka_dist
+setup_confluent_dist
+
+cd flink-end-to-end-tests/flink-confluent-schema-registry
+mvn clean package -nsu
+
+start_kafka_cluster
+start_confluent_schema_registry
+sleep 5
+
+# modify configuration to use port 8082 for Flink
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e "s/web.port: 8081/web.port: 8082/" $FLINK_DIR/conf/flink-conf.yaml
+
+TEST_PROGRAM_JAR=target/TestAvroConsumerConfluent.jar
+
+INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}'
+INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}'
+INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}'
+USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}'
+
+curl -X POST \
+  http://localhost:8081/subjects/users-value/versions \
+  -H 'cache-control: no-cache' \
+  -H 'content-type: application/vnd.schemaregistry.v1+json' \
+  -d '{"schema": "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteNumber\",  \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": \"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}'
+
+echo "Sending messages to Kafka topic [test-avro-input] ..."
+
+send_messages_to_kafka_avro $INPUT_MESSAGE_1 test-avro-input $USER_SCHEMA
+send_messages_to_kafka_avro $INPUT_MESSAGE_2 test-avro-input $USER_SCHEMA
+send_messages_to_kafka_avro $INPUT_MESSAGE_3 test-avro-input $USER_SCHEMA
+
+start_cluster
+
+# Read Avro messages from [test-avro-input], check the schema, and send them to [test-avro-out]
+$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
+  --input-topic test-avro-input --output-topic test-avro-out \
+  --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \
+  --schema-registry-url http://localhost:8081
+
+echo "Reading messages from Kafka topic [test-avro-out] ..."
+
+KEY_1_MSGS=$(read_messages_from_kafka 3 test-avro-out Alyssa_consumer | grep Alyssa)
+KEY_2_MSGS=$(read_messages_from_kafka 3 test-avro-out Charlie_consumer | grep Charlie)
+KEY_3_MSGS=$(read_messages_from_kafka 3 test-avro-out Ben_consumer | grep Ben)
+
+# Verify the Avro output against the original input messages
+verify_output $INPUT_MESSAGE_1 "$KEY_1_MSGS"
+verify_output $INPUT_MESSAGE_2 "$KEY_2_MSGS"
+verify_output $INPUT_MESSAGE_3 "$KEY_3_MSGS"
+
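
As a sanity check, the registration performed by the curl call above can be confirmed through the registry's REST API (GET /subjects/<subject>/versions/latest). A minimal sketch, mirroring the subject name and port used in this test:

    // Hypothetical illustration, not part of the commit.
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class SchemaRegistryCheckSketch {
        public static void main(String[] args) throws Exception {
            URL url = new URL("http://localhost:8081/subjects/users-value/versions/latest");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestProperty("Accept", "application/vnd.schemaregistry.v1+json");
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Response includes subject, version, id, and the registered schema
                    System.out.println(line);
                }
            }
        }
    }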
