This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bc4c0fcb3d [HUDI-2673] Add kafka connect bundle to validation test 
(#7131)
bc4c0fcb3d is described below

commit bc4c0fcb3d619375a91cc7cfeee3a23d811a77b0
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Nov 8 02:25:49 2022 +0800

    [HUDI-2673] Add kafka connect bundle to validation test (#7131)
---
 hudi-kafka-connect/demo/setupKafka.sh              |  7 ++--
 packaging/bundle-validation/Dockerfile-base        | 16 ++++++-
 packaging/bundle-validation/ci_run.sh              | 11 +++++
 packaging/bundle-validation/kafka/config-sink.json | 20 +++++++++
 packaging/bundle-validation/kafka/consume.sh       | 34 +++++++++++++++
 packaging/bundle-validation/kafka/produce.sh       | 30 +++++++++++++
 packaging/bundle-validation/validate.sh            | 49 +++++++++++++++++++---
 7 files changed, 157 insertions(+), 10 deletions(-)

diff --git a/hudi-kafka-connect/demo/setupKafka.sh 
b/hudi-kafka-connect/demo/setupKafka.sh
index 5c618b2a70..e4e8d2e382 100755
--- a/hudi-kafka-connect/demo/setupKafka.sh
+++ b/hudi-kafka-connect/demo/setupKafka.sh
@@ -1,3 +1,4 @@
+#!/bin/bash
 # Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
 #  distributed with this work for additional information
@@ -14,8 +15,6 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-#!/bin/bash
-
 #########################
 # The command line help #
 #########################
@@ -79,11 +78,11 @@ while getopts ":n:b:tf:k:m:r:o:l:p:s:-:" opt; do
     recreateTopic="N"
     printf "Argument recreate-topic is N (reuse Kafka topic) \n"
     ;;
-  k)
+  f)
     rawDataFile="$OPTARG"
     printf "Argument raw-file is %s\n" "$rawDataFile"
     ;;
-  f)
+  k)
     kafkaTopicName="$OPTARG"
     printf "Argument kafka-topic is %s\n" "$kafkaTopicName"
     ;;
diff --git a/packaging/bundle-validation/Dockerfile-base 
b/packaging/bundle-validation/Dockerfile-base
index 1e782e08d5..81df6ce0c0 100644
--- a/packaging/bundle-validation/Dockerfile-base
+++ b/packaging/bundle-validation/Dockerfile-base
@@ -16,7 +16,7 @@
 #
 FROM adoptopenjdk/openjdk8:alpine
 
-RUN apk add --no-cache --upgrade bash
+RUN apk add --no-cache --upgrade bash curl jq
 
 RUN mkdir /opt/bundle-validation
 ENV WORKDIR=/opt/bundle-validation
@@ -27,6 +27,8 @@ ARG HIVE_VERSION=3.1.3
 ARG DERBY_VERSION=10.14.1.0
 ARG SPARK_VERSION=3.1.3
 ARG SPARK_HADOOP_VERSION=2.7
+ARG CONFLUENT_VERSION=5.5.12
+ARG KAFKA_CONNECT_HDFS_VERSION=10.1.13
 
 RUN wget 
https://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz
 -P "$WORKDIR" \
     && tar -xf $WORKDIR/hadoop-$HADOOP_VERSION.tar.gz -C $WORKDIR/ \
@@ -47,3 +49,15 @@ RUN wget 
https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK
     && tar -xf 
$WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz -C $WORKDIR/ \
     && rm $WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz
 ENV SPARK_HOME=$WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION
+
+RUN wget 
https://packages.confluent.io/archive/${CONFLUENT_VERSION%.*}/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz
 -P "$WORKDIR" \
+    && tar -xf $WORKDIR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz -C 
$WORKDIR/ \
+    && rm $WORKDIR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz
+ENV CONFLUENT_HOME=$WORKDIR/confluent-$CONFLUENT_VERSION
+
+RUN wget 
https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-hdfs/versions/$KAFKA_CONNECT_HDFS_VERSION/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip
 -P "$WORKDIR" \
+    && mkdir $WORKDIR/kafka-connectors \
+    && unzip 
$WORKDIR/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip -d 
$WORKDIR/kafka-connectors/ \
+    && rm 
$WORKDIR/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip \
+    && printf "\nplugin.path=$WORKDIR/kafka-connectors\n" >> 
$CONFLUENT_HOME/etc/kafka/connect-distributed.properties
+ENV 
KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH=$WORKDIR/kafka-connectors/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION/lib
diff --git a/packaging/bundle-validation/ci_run.sh 
b/packaging/bundle-validation/ci_run.sh
index 4cc795f405..46ef80964b 100755
--- a/packaging/bundle-validation/ci_run.sh
+++ b/packaging/bundle-validation/ci_run.sh
@@ -36,6 +36,8 @@ if [[ ${SPARK_PROFILE} == 'spark2.4' ]]; then
   DERBY_VERSION=10.10.2.0
   SPARK_VERSION=2.4.8
   SPARK_HADOOP_VERSION=2.7
+  CONFLUENT_VERSION=5.5.12
+  KAFKA_CONNECT_HDFS_VERSION=10.1.13
   IMAGE_TAG=spark248hive239
 elif [[ ${SPARK_PROFILE} == 'spark3.1' ]]; then
   HADOOP_VERSION=2.7.7
@@ -43,6 +45,8 @@ elif [[ ${SPARK_PROFILE} == 'spark3.1' ]]; then
   DERBY_VERSION=10.14.1.0
   SPARK_VERSION=3.1.3
   SPARK_HADOOP_VERSION=2.7
+  CONFLUENT_VERSION=5.5.12
+  KAFKA_CONNECT_HDFS_VERSION=10.1.13
   IMAGE_TAG=spark313hive313
 elif [[ ${SPARK_PROFILE} == 'spark3.2' ]]; then
   HADOOP_VERSION=2.7.7
@@ -50,6 +54,8 @@ elif [[ ${SPARK_PROFILE} == 'spark3.2' ]]; then
   DERBY_VERSION=10.14.1.0
   SPARK_VERSION=3.2.2
   SPARK_HADOOP_VERSION=2.7
+  CONFLUENT_VERSION=5.5.12
+  KAFKA_CONNECT_HDFS_VERSION=10.1.13
   IMAGE_TAG=spark322hive313
 elif [[ ${SPARK_PROFILE} == 'spark3.3' ]]; then
   HADOOP_VERSION=2.7.7
@@ -57,6 +63,8 @@ elif [[ ${SPARK_PROFILE} == 'spark3.3' ]]; then
   DERBY_VERSION=10.14.1.0
   SPARK_VERSION=3.3.0
   SPARK_HADOOP_VERSION=2
+  CONFLUENT_VERSION=5.5.12
+  KAFKA_CONNECT_HDFS_VERSION=10.1.13
   IMAGE_TAG=spark330hive313
 fi
 
@@ -67,6 +75,7 @@ cp 
${GITHUB_WORKSPACE}/packaging/hudi-hadoop-mr-bundle/target/hudi-*-$HUDI_VERSI
 cp 
${GITHUB_WORKSPACE}/packaging/hudi-spark-bundle/target/hudi-*-$HUDI_VERSION.jar 
$TMP_JARS_DIR/
 cp 
${GITHUB_WORKSPACE}/packaging/hudi-utilities-bundle/target/hudi-*-$HUDI_VERSION.jar
 $TMP_JARS_DIR/
 cp 
${GITHUB_WORKSPACE}/packaging/hudi-utilities-slim-bundle/target/hudi-*-$HUDI_VERSION.jar
 $TMP_JARS_DIR/
+cp 
${GITHUB_WORKSPACE}/packaging/hudi-kafka-connect-bundle/target/hudi-*-$HUDI_VERSION.jar
 $TMP_JARS_DIR/
 echo 'Validating jars below:'
 ls -l $TMP_JARS_DIR
 
@@ -84,6 +93,8 @@ docker build \
 --build-arg DERBY_VERSION=$DERBY_VERSION \
 --build-arg SPARK_VERSION=$SPARK_VERSION \
 --build-arg SPARK_HADOOP_VERSION=$SPARK_HADOOP_VERSION \
+--build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \
+--build-arg KAFKA_CONNECT_HDFS_VERSION=$KAFKA_CONNECT_HDFS_VERSION \
 --build-arg IMAGE_TAG=$IMAGE_TAG \
 -t hudi-ci-bundle-validation:$IMAGE_TAG \
 .
diff --git a/packaging/bundle-validation/kafka/config-sink.json 
b/packaging/bundle-validation/kafka/config-sink.json
new file mode 100644
index 0000000000..0318ec961a
--- /dev/null
+++ b/packaging/bundle-validation/kafka/config-sink.json
@@ -0,0 +1,20 @@
+{
+  "name": "hudi-sink",
+  "config": {
+    "bootstrap.servers": "localhost:9092",
+    "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
+    "tasks.max": "2",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter.schemas.enable": "false",
+    "topics": "hudi-test-topic",
+    "hoodie.table.name": "hudi-test-topic",
+    "hoodie.table.type": "COPY_ON_WRITE",
+    "hoodie.base.path": "file:///tmp/hudi-kafka-test",
+    "hoodie.datasource.write.recordkey.field": "volume",
+    "hoodie.datasource.write.partitionpath.field": "date",
+    "hoodie.schemaprovider.class": 
"org.apache.hudi.schema.SchemaRegistryProvider",
+    "hoodie.deltastreamer.schemaprovider.registry.url": 
"http://localhost:8081/subjects/hudi-test-topic/versions/latest",
+    "hoodie.kafka.commit.interval.secs": 10
+  }
+}
diff --git a/packaging/bundle-validation/kafka/consume.sh 
b/packaging/bundle-validation/kafka/consume.sh
new file mode 100755
index 0000000000..5ae67c3b61
--- /dev/null
+++ b/packaging/bundle-validation/kafka/consume.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+$CONFLUENT_HOME/bin/connect-distributed 
$CONFLUENT_HOME/etc/kafka/connect-distributed.properties &
+sleep 30
+curl -X POST -H "Content-Type:application/json" -d 
@/opt/bundle-validation/kafka/config-sink.json http://localhost:8083/connectors 
&
+sleep 30
+curl -X DELETE http://localhost:8083/connectors/hudi-sink &
+sleep 10
+
+# validate
+numCommits=$(ls /tmp/hudi-kafka-test/.hoodie/*.commit | wc -l)
+if [ $numCommits -gt 0 ]; then
+  exit 0
+else
+  exit 1
+fi
diff --git a/packaging/bundle-validation/kafka/produce.sh 
b/packaging/bundle-validation/kafka/produce.sh
new file mode 100755
index 0000000000..7f828b5d3b
--- /dev/null
+++ b/packaging/bundle-validation/kafka/produce.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+kafkaTopicName=hudi-test-topic
+
+# Setup the schema registry
+SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' 
/opt/bundle-validation/data/stocks/schema.avsc | sed '/\/\*/,/*\//d' | jq 
tostring)
+curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data 
"{\"schema\": $SCHEMA}" 
http://localhost:8081/subjects/${kafkaTopicName}/versions
+
+# produce data
+cat /opt/bundle-validation/data/stocks/data/batch_1.json 
/opt/bundle-validation/data/stocks/data/batch_2.json | 
$CONFLUENT_HOME/bin/kafka-console-producer \
+--bootstrap-server http://localhost:9092 \
+--topic ${kafkaTopicName}
diff --git a/packaging/bundle-validation/validate.sh 
b/packaging/bundle-validation/validate.sh
index 67abdd5d65..7d40228651 100755
--- a/packaging/bundle-validation/validate.sh
+++ b/packaging/bundle-validation/validate.sh
@@ -32,6 +32,7 @@ ln -sf $JARS_DIR/hudi-hadoop-mr*.jar $JARS_DIR/hadoop-mr.jar
 ln -sf $JARS_DIR/hudi-spark*.jar $JARS_DIR/spark.jar
 ln -sf $JARS_DIR/hudi-utilities-bundle*.jar $JARS_DIR/utilities.jar
 ln -sf $JARS_DIR/hudi-utilities-slim*.jar $JARS_DIR/utilities-slim.jar
+ln -sf $JARS_DIR/hudi-kafka-connect-bundle*.jar $JARS_DIR/kafka-connect.jar
 
 
 ##
@@ -131,26 +132,64 @@ test_utilities_bundle () {
 }
 
 
+##
+# Function to test the kafka-connect bundle.
+# It runs zookeeper, kafka broker, schema registry, and connector worker.
+# After producing and consuming data, it checks successful commit under 
`.hoodie/`
+#
+# 1st arg: path to the hudi-kafka-connect-bundle.jar (for writing data)
+#
+# env vars (defined in container):
+#   CONFLUENT_HOME: path to the confluent community directory
+#   KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH: path to install 
hudi-kafka-connect-bundle.jar
+##
+test_kafka_connect_bundle() {
+    KAFKA_CONNECT_JAR=$1
+    cp $KAFKA_CONNECT_JAR $KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH
+    $CONFLUENT_HOME/bin/zookeeper-server-start 
$CONFLUENT_HOME/etc/kafka/zookeeper.properties &
+    $CONFLUENT_HOME/bin/kafka-server-start 
$CONFLUENT_HOME/etc/kafka/server.properties &
+    sleep 10
+    $CONFLUENT_HOME/bin/schema-registry-start 
$CONFLUENT_HOME/etc/schema-registry/schema-registry.properties &
+    sleep 10
+    $CONFLUENT_HOME/bin/kafka-topics --create --topic hudi-control-topic 
--partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
+    $WORKDIR/kafka/produce.sh
+    $WORKDIR/kafka/consume.sh
+}
+
+
+############################
+# Execute tests
+############################
+
+echo "::warning::validate.sh validating spark & hadoop-mr bundle"
 test_spark_hadoop_mr_bundles
 if [ "$?" -ne 0 ]; then
     exit 1
 fi
+echo "::warning::validate.sh done validating spark & hadoop-mr bundle"
 
 if [[ $SPARK_HOME == *"spark-2.4"* ]] || [[  $SPARK_HOME == *"spark-3.1"* ]]
 then
-  echo "::warning::validate.sh testing utilities bundle"
+  echo "::warning::validate.sh validating utilities bundle"
   test_utilities_bundle $JARS_DIR/utilities.jar
   if [ "$?" -ne 0 ]; then
       exit 1
   fi
-  echo "::warning::validate.sh done testing utilities bundle"
+  echo "::warning::validate.sh done validating utilities bundle"
 else
-  echo "::warning::validate.sh skip testing utilities bundle for non-spark2.4 
& non-spark3.1 build"
+  echo "::warning::validate.sh skip validating utilities bundle for 
non-spark2.4 & non-spark3.1 build"
 fi
 
-echo "::warning::validate.sh testing utilities slim bundle"
+echo "::warning::validate.sh validating utilities slim bundle"
 test_utilities_bundle $JARS_DIR/utilities-slim.jar $JARS_DIR/spark.jar
 if [ "$?" -ne 0 ]; then
     exit 1
 fi
-echo "::warning::validate.sh done testing utilities slim bundle"
+echo "::warning::validate.sh done validating utilities slim bundle"
+
+echo "::warning::validate.sh validating kafka connect bundle"
+test_kafka_connect_bundle $JARS_DIR/kafka-connect.jar
+if [ "$?" -ne 0 ]; then
+    exit 1
+fi
+echo "::warning::validate.sh done validating kafka connect bundle"

Reply via email to