[FLINK-8983] Integrate test_confluent_schema_registry.sh into 
run-nightly-tests.sh


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b366045
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b366045
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b366045

Branch: refs/heads/master
Commit: 9b366045697f80534b8eb2e8b559f02d1452f0cf
Parents: a36b569
Author: Till Rohrmann <[email protected]>
Authored: Thu Jun 28 18:34:24 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu Jun 28 18:34:24 2018 +0200

----------------------------------------------------------------------
 flink-end-to-end-tests/.gitignore               |   1 +
 .../flink-confluent-schema-registry/.gitignore  |   1 +
 .../flink-confluent-schema-registry/pom.xml     |   7 ++
 .../test/TestAvroConsumerConfluent.java         |  14 +--
 flink-end-to-end-tests/run-nightly-tests.sh     |   2 +-
 .../test-scripts/kafka-common.sh                |  28 +++--
 .../test-confluent-schema-registry.sh           | 106 -------------------
 .../test_confluent_schema_registry.sh           |  93 ++++++++++++++++
 8 files changed, 128 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/.gitignore
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/.gitignore 
b/flink-end-to-end-tests/.gitignore
new file mode 100644
index 0000000..3fc3a72
--- /dev/null
+++ b/flink-end-to-end-tests/.gitignore
@@ -0,0 +1 @@
+test-scripts/temp-test-directory*

http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/flink-confluent-schema-registry/.gitignore
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/.gitignore 
b/flink-end-to-end-tests/flink-confluent-schema-registry/.gitignore
new file mode 100644
index 0000000..b83a752
--- /dev/null
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/.gitignore
@@ -0,0 +1 @@
+src/main/java/example/avro

http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml 
b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
index 576fca6..2f63c22 100644
--- a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
@@ -150,6 +150,13 @@ under the License.
                                        </execution>
                                </executions>
                        </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-checkstyle-plugin</artifactId>
+                               <configuration>
+                                       <excludes>**/example/avro/*</excludes>
+                               </configuration>
+                       </plugin>
                </plugins>
        </build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
 
b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
index 9149832..fbbe3c3 100644
--- 
a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
+++ 
b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
 
 import example.avro.User;
+import org.apache.avro.specific.SpecificRecordBase;
 
 import java.util.Properties;
 
@@ -40,7 +41,6 @@ import java.util.Properties;
 public class TestAvroConsumerConfluent {
 
        public static void main(String[] args) throws Exception {
-               Properties config = new Properties();
                // parse input arguments
                final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
 
@@ -52,6 +52,7 @@ public class TestAvroConsumerConfluent {
                                "--schema-registry-url <confluent schema 
registry> --group.id <some id>");
                        return;
                }
+               Properties config = new Properties();
                config.setProperty("bootstrap.servers", 
parameterTool.getRequired("bootstrap.servers"));
                config.setProperty("group.id", 
parameterTool.getRequired("group.id"));
                config.setProperty("zookeeper.connect", 
parameterTool.getRequired("zookeeper.connect"));
@@ -62,20 +63,15 @@ public class TestAvroConsumerConfluent {
 
                DataStreamSource<User> input = env
                        .addSource(
-                               new FlinkKafkaConsumer010(
+                               new FlinkKafkaConsumer010<>(
                                        
parameterTool.getRequired("input-topic"),
                                        
ConfluentRegistryAvroDeserializationSchema.forSpecific(User.class, 
schemaRegistryUrl),
                                        config).setStartFromEarliest());
 
                SingleOutputStreamOperator<String> mapToString = input
-                       .map(new MapFunction<User, String>() {
-                               @Override
-                               public String map(User value) throws Exception {
-                                       return value.toString();
-                               }
-                       });
+                       .map((MapFunction<User, String>) 
SpecificRecordBase::toString);
 
-               FlinkKafkaProducer010<String> stringFlinkKafkaProducer010 = new 
FlinkKafkaProducer010(
+               FlinkKafkaProducer010<String> stringFlinkKafkaProducer010 = new 
FlinkKafkaProducer010<>(
                        parameterTool.getRequired("output-topic"),
                        new SimpleStringSchema(),
                        config);

http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 5873349..11eeafe 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -98,7 +98,7 @@ run_test "Elasticsearch (v5.1.2) sink end-to-end test" 
"$END_TO_END_DIR/test-scr
 run_test "Quickstarts Java nightly end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
 run_test "Quickstarts Scala nightly end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
 
-run_test "Avro Confluent Schema Registry nightly end-to-end test" 
"$END_TO_END_DIR/test-scripts/test-confluent-schema-registry.sh"
+run_test "Avro Confluent Schema Registry nightly end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
 
 printf "\n[PASS] All tests passed\n"
 exit 0

http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/test-scripts/kafka-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh 
b/flink-end-to-end-tests/test-scripts/kafka-common.sh
index 50fba12..f0a37b8 100644
--- a/flink-end-to-end-tests/test-scripts/kafka-common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh
@@ -26,6 +26,8 @@ fi
 
 KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
 CONFLUENT_DIR=$TEST_DATA_DIR/confluent-3.2.0
+SCHEMA_REGISTRY_PORT=8082
+SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}
 
 function setup_kafka_dist {
   # download Kafka
@@ -49,6 +51,9 @@ function setup_confluent_dist {
   curl "$CONFLUENT_URL" > $TEST_DATA_DIR/confluent.tgz
 
   tar xzf $TEST_DATA_DIR/confluent.tgz -C $TEST_DATA_DIR/
+
+  # fix confluent config
+  sed -i -e 
"s#listeners=http://0.0.0.0:8081#listeners=http://0.0.0.0:${SCHEMA_REGISTRY_PORT}#";
 $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
 }
 
 function start_kafka_cluster {
@@ -87,14 +92,8 @@ function read_messages_from_kafka {
     --consumer-property group.id=$3 2> /dev/null
 }
 
-function read_messages_from_kafka_avro {
-  $CONFLUENT_DIR/bin/kafka-avro-console-consumer --bootstrap-server 
localhost:9092 --from-beginning \
-    --max-messages $1 \
-    --topic $2 2> /dev/null
-}
-
 function send_messages_to_kafka_avro {
-echo -e $1 | $CONFLUENT_DIR/bin/kafka-avro-console-producer --broker-list 
localhost:9092 --topic $2 --property value.schema=$3
+echo -e $1 | $CONFLUENT_DIR/bin/kafka-avro-console-producer --broker-list 
localhost:9092 --topic $2 --property value.schema=$3 --property 
schema.registry.url=${SCHEMA_REGISTRY_URL}
 }
 
 function modify_num_partitions {
@@ -120,7 +119,20 @@ function get_partition_end_offset {
 }
 
 function start_confluent_schema_registry {
-    $CONFLUENT_DIR/bin/schema-registry-start -daemon 
$CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
+  $CONFLUENT_DIR/bin/schema-registry-start -daemon 
$CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
+
+  # wait until the schema registry REST endpoint is up
+  for i in {1..30}; do
+    QUERY_RESULT=$(curl "${SCHEMA_REGISTRY_URL}/subjects" 2> /dev/null || true)
+
+    if [[ ${QUERY_RESULT} =~ \[.*\] ]]; then
+        echo "Schema registry is up."
+        break
+    fi
+
+    echo "Waiting for schema registry..."
+    sleep 1
+  done
 }
 
 function stop_confluent_schema_registry {

http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh 
b/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh
deleted file mode 100755
index 5cebb72..0000000
--- a/flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh
+++ /dev/null
@@ -1,106 +0,0 @@
-#!/usr/bin/env bash
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka-common.sh
-
-function verify_output {
-  local expected=$(printf $1)
-  local result=$(echo $2 | sed 's/ //g')
-
-  if [[ "$result" != "$expected" ]]; then
-    echo "Output from Flink program does not match expected output."
-    echo -e "EXPECTED FOR KEY: --$expected--"
-    echo -e "ACTUAL: --$result--"
-    PASS=""
-    exit 1
-  fi
-}
-
-function test_cleanup {
-  # don't call ourselves again for another signal interruption
-  trap "exit -1" INT
-  # don't call ourselves again for normal exit
-  trap "" EXIT
-
-  stop_kafka_cluster
-  stop_confluent_schema_registry
-
-  # revert our modifications to the Flink distribution
-  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-
-  # make sure to run regular cleanup as well
-  cleanup
-}
-
-trap test_cleanup INT
-trap test_cleanup EXIT
-
-setup_kafka_dist
-setup_confluent_dist
-
-cd flink-end-to-end-tests/flink-confluent-schema-registry
-mvn clean package -nsu
-
-start_kafka_cluster
-start_confluent_schema_registry
-sleep 5
-
-# modify configuration to use port 8082 for Flink
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
-sed -i -e "s/web.port: 8081/web.port: 8082/" $FLINK_DIR/conf/flink-conf.yaml
-
-TEST_PROGRAM_JAR=target/TestAvroConsumerConfluent.jar
-
-INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}'
-INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}'
-INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}'
-USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}'
-
-curl -X POST \
-  http://localhost:8081/subjects/users-value/versions \
-  -H 'cache-control: no-cache' \
-  -H 'content-type: application/vnd.schemaregistry.v1+json' \
-  -d '{"schema": "{\"namespace\": \"example.avro\",\"type\": 
\"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"favoriteNumber\",  \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": 
\"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}'
-
-echo "Sending messages to Kafka topic [test-avro-input] ..."
-
-send_messages_to_kafka_avro $INPUT_MESSAGE_1 test-avro-input $USER_SCHEMA
-send_messages_to_kafka_avro $INPUT_MESSAGE_2 test-avro-input $USER_SCHEMA
-send_messages_to_kafka_avro $INPUT_MESSAGE_3 test-avro-input $USER_SCHEMA
-
-start_cluster
-
-# Read Avro message from [test-avro-input], check the schema and send message 
to [test-avro-out]
-$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
-  --input-topic test-avro-input --output-topic test-avro-out \
-  --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 
--group.id myconsumer --auto.offset.reset earliest \
-  --schema-registry-url http://localhost:8081
-
-#echo "Reading messages from Kafka topic [test-avro-out] ..."
-
-KEY_1_MSGS=$(read_messages_from_kafka 3 test-avro-out  Alyssa_consumer | grep 
Alyssa)
-KEY_2_MSGS=$(read_messages_from_kafka 3 test-avro-out Charlie_consumer | grep 
Charlie)
-KEY_3_MSGS=$(read_messages_from_kafka 3 test-avro-out Ben_consumer | grep Ben)
-
-## Verifying AVRO output with actual message
-verify_output $INPUT_MESSAGE_1 "$KEY_1_MSGS"
-verify_output $INPUT_MESSAGE_2 "$KEY_2_MSGS"
-verify_output $INPUT_MESSAGE_3 "$KEY_3_MSGS"
-

http://git-wip-us.apache.org/repos/asf/flink/blob/9b366045/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh 
b/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh
new file mode 100755
index 0000000..be0b014
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh
@@ -0,0 +1,93 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
+
+function verify_output {
+  local expected=$(printf $1)
+  local result=$(echo $2 | sed 's/ //g')
+
+  if [[ "$result" != "$expected" ]]; then
+    echo "Output from Flink program does not match expected output."
+    echo -e "EXPECTED FOR KEY: --$expected--"
+    echo -e "ACTUAL: --$result--"
+    exit 1
+  fi
+}
+
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_kafka_cluster
+  stop_confluent_schema_registry
+}
+
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+setup_kafka_dist
+setup_confluent_dist
+
+start_kafka_cluster
+start_confluent_schema_registry
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-confluent-schema-registry/target/TestAvroConsumerConfluent.jar
+
+INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}'
+INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}'
+INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}'
+USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}'
+
+curl -X POST \
+  ${SCHEMA_REGISTRY_URL}/subjects/users-value/versions \
+  -H 'cache-control: no-cache' \
+  -H 'content-type: application/vnd.schemaregistry.v1+json' \
+  -d '{"schema": "{\"namespace\": \"example.avro\",\"type\": 
\"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"favoriteNumber\",  \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": 
\"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}'
+
+echo "Sending messages to Kafka topic [test-avro-input] ..."
+
+send_messages_to_kafka_avro $INPUT_MESSAGE_1 test-avro-input $USER_SCHEMA
+send_messages_to_kafka_avro $INPUT_MESSAGE_2 test-avro-input $USER_SCHEMA
+send_messages_to_kafka_avro $INPUT_MESSAGE_3 test-avro-input $USER_SCHEMA
+
+start_cluster
+
+create_kafka_topic 1 1 test-avro-out
+
+# Read Avro message from [test-avro-input], check the schema and send message 
to [test-avro-out]
+$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
+  --input-topic test-avro-input --output-topic test-avro-out \
+  --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 
--group.id myconsumer --auto.offset.reset earliest \
+  --schema-registry-url ${SCHEMA_REGISTRY_URL}
+
+#echo "Reading messages from Kafka topic [test-avro-out] ..."
+
+KEY_1_MSGS=$(read_messages_from_kafka 3 test-avro-out  Alyssa_consumer | grep 
Alyssa)
+KEY_2_MSGS=$(read_messages_from_kafka 3 test-avro-out Charlie_consumer | grep 
Charlie)
+KEY_3_MSGS=$(read_messages_from_kafka 3 test-avro-out Ben_consumer | grep Ben)
+
+## Verifying AVRO output with actual message
+verify_output $INPUT_MESSAGE_1 "$KEY_1_MSGS"
+verify_output $INPUT_MESSAGE_2 "$KEY_2_MSGS"
+verify_output $INPUT_MESSAGE_3 "$KEY_3_MSGS"
+

Reply via email to