This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bc4c0fcb3d [HUDI-2673] Add kafka connect bundle to validation test (#7131)
bc4c0fcb3d is described below
commit bc4c0fcb3d619375a91cc7cfeee3a23d811a77b0
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Nov 8 02:25:49 2022 +0800
[HUDI-2673] Add kafka connect bundle to validation test (#7131)
---
hudi-kafka-connect/demo/setupKafka.sh | 7 ++--
packaging/bundle-validation/Dockerfile-base | 16 ++++++-
packaging/bundle-validation/ci_run.sh | 11 +++++
packaging/bundle-validation/kafka/config-sink.json | 20 +++++++++
packaging/bundle-validation/kafka/consume.sh | 34 +++++++++++++++
packaging/bundle-validation/kafka/produce.sh | 30 +++++++++++++
packaging/bundle-validation/validate.sh | 49 +++++++++++++++++++---
7 files changed, 157 insertions(+), 10 deletions(-)
diff --git a/hudi-kafka-connect/demo/setupKafka.sh b/hudi-kafka-connect/demo/setupKafka.sh
index 5c618b2a70..e4e8d2e382 100755
--- a/hudi-kafka-connect/demo/setupKafka.sh
+++ b/hudi-kafka-connect/demo/setupKafka.sh
@@ -1,3 +1,4 @@
+#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,8 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-#!/bin/bash
-
#########################
# The command line help #
#########################
@@ -79,11 +78,11 @@ while getopts ":n:b:tf:k:m:r:o:l:p:s:-:" opt; do
recreateTopic="N"
printf "Argument recreate-topic is N (reuse Kafka topic) \n"
;;
- k)
+ f)
rawDataFile="$OPTARG"
printf "Argument raw-file is %s\n" "$rawDataFile"
;;
- f)
+ k)
kafkaTopicName="$OPTARG"
printf "Argument kafka-topic is %s\n" "$kafkaTopicName"
;;
diff --git a/packaging/bundle-validation/Dockerfile-base b/packaging/bundle-validation/Dockerfile-base
index 1e782e08d5..81df6ce0c0 100644
--- a/packaging/bundle-validation/Dockerfile-base
+++ b/packaging/bundle-validation/Dockerfile-base
@@ -16,7 +16,7 @@
#
FROM adoptopenjdk/openjdk8:alpine
-RUN apk add --no-cache --upgrade bash
+RUN apk add --no-cache --upgrade bash curl jq
RUN mkdir /opt/bundle-validation
ENV WORKDIR=/opt/bundle-validation
@@ -27,6 +27,8 @@ ARG HIVE_VERSION=3.1.3
ARG DERBY_VERSION=10.14.1.0
ARG SPARK_VERSION=3.1.3
ARG SPARK_HADOOP_VERSION=2.7
+ARG CONFLUENT_VERSION=5.5.12
+ARG KAFKA_CONNECT_HDFS_VERSION=10.1.13
RUN wget https://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz -P "$WORKDIR" \
&& tar -xf $WORKDIR/hadoop-$HADOOP_VERSION.tar.gz -C $WORKDIR/ \
@@ -47,3 +49,15 @@ RUN wget https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK
 && tar -xf $WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz -C $WORKDIR/ \
&& rm $WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz
ENV SPARK_HOME=$WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION
+
+RUN wget https://packages.confluent.io/archive/${CONFLUENT_VERSION%.*}/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz -P "$WORKDIR" \
+ && tar -xf $WORKDIR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz -C $WORKDIR/ \
+ && rm $WORKDIR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz
+ENV CONFLUENT_HOME=$WORKDIR/confluent-$CONFLUENT_VERSION
+
+RUN wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-hdfs/versions/$KAFKA_CONNECT_HDFS_VERSION/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip -P "$WORKDIR" \
+ && mkdir $WORKDIR/kafka-connectors \
+ && unzip $WORKDIR/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip -d $WORKDIR/kafka-connectors/ \
+ && rm $WORKDIR/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION.zip \
+ && printf "\nplugin.path=$WORKDIR/kafka-connectors\n" >> $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
+ENV KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH=$WORKDIR/kafka-connectors/confluentinc-kafka-connect-hdfs-$KAFKA_CONNECT_HDFS_VERSION/lib
diff --git a/packaging/bundle-validation/ci_run.sh b/packaging/bundle-validation/ci_run.sh
index 4cc795f405..46ef80964b 100755
--- a/packaging/bundle-validation/ci_run.sh
+++ b/packaging/bundle-validation/ci_run.sh
@@ -36,6 +36,8 @@ if [[ ${SPARK_PROFILE} == 'spark2.4' ]]; then
DERBY_VERSION=10.10.2.0
SPARK_VERSION=2.4.8
SPARK_HADOOP_VERSION=2.7
+ CONFLUENT_VERSION=5.5.12
+ KAFKA_CONNECT_HDFS_VERSION=10.1.13
IMAGE_TAG=spark248hive239
elif [[ ${SPARK_PROFILE} == 'spark3.1' ]]; then
HADOOP_VERSION=2.7.7
@@ -43,6 +45,8 @@ elif [[ ${SPARK_PROFILE} == 'spark3.1' ]]; then
DERBY_VERSION=10.14.1.0
SPARK_VERSION=3.1.3
SPARK_HADOOP_VERSION=2.7
+ CONFLUENT_VERSION=5.5.12
+ KAFKA_CONNECT_HDFS_VERSION=10.1.13
IMAGE_TAG=spark313hive313
elif [[ ${SPARK_PROFILE} == 'spark3.2' ]]; then
HADOOP_VERSION=2.7.7
@@ -50,6 +54,8 @@ elif [[ ${SPARK_PROFILE} == 'spark3.2' ]]; then
DERBY_VERSION=10.14.1.0
SPARK_VERSION=3.2.2
SPARK_HADOOP_VERSION=2.7
+ CONFLUENT_VERSION=5.5.12
+ KAFKA_CONNECT_HDFS_VERSION=10.1.13
IMAGE_TAG=spark322hive313
elif [[ ${SPARK_PROFILE} == 'spark3.3' ]]; then
HADOOP_VERSION=2.7.7
@@ -57,6 +63,8 @@ elif [[ ${SPARK_PROFILE} == 'spark3.3' ]]; then
DERBY_VERSION=10.14.1.0
SPARK_VERSION=3.3.0
SPARK_HADOOP_VERSION=2
+ CONFLUENT_VERSION=5.5.12
+ KAFKA_CONNECT_HDFS_VERSION=10.1.13
IMAGE_TAG=spark330hive313
fi
@@ -67,6 +75,7 @@ cp ${GITHUB_WORKSPACE}/packaging/hudi-hadoop-mr-bundle/target/hudi-*-$HUDI_VERSI
cp ${GITHUB_WORKSPACE}/packaging/hudi-spark-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
cp ${GITHUB_WORKSPACE}/packaging/hudi-utilities-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
cp ${GITHUB_WORKSPACE}/packaging/hudi-utilities-slim-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
+cp ${GITHUB_WORKSPACE}/packaging/hudi-kafka-connect-bundle/target/hudi-*-$HUDI_VERSION.jar $TMP_JARS_DIR/
echo 'Validating jars below:'
ls -l $TMP_JARS_DIR
@@ -84,6 +93,8 @@ docker build \
--build-arg DERBY_VERSION=$DERBY_VERSION \
--build-arg SPARK_VERSION=$SPARK_VERSION \
--build-arg SPARK_HADOOP_VERSION=$SPARK_HADOOP_VERSION \
+--build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \
+--build-arg KAFKA_CONNECT_HDFS_VERSION=$KAFKA_CONNECT_HDFS_VERSION \
--build-arg IMAGE_TAG=$IMAGE_TAG \
-t hudi-ci-bundle-validation:$IMAGE_TAG \
.
diff --git a/packaging/bundle-validation/kafka/config-sink.json b/packaging/bundle-validation/kafka/config-sink.json
new file mode 100644
index 0000000000..0318ec961a
--- /dev/null
+++ b/packaging/bundle-validation/kafka/config-sink.json
@@ -0,0 +1,20 @@
+{
+ "name": "hudi-sink",
+ "config": {
+ "bootstrap.servers": "localhost:9092",
+ "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
+ "tasks.max": "2",
+ "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+ "value.converter": "org.apache.kafka.connect.storage.StringConverter",
+ "value.converter.schemas.enable": "false",
+ "topics": "hudi-test-topic",
+ "hoodie.table.name": "hudi-test-topic",
+ "hoodie.table.type": "COPY_ON_WRITE",
+ "hoodie.base.path": "file:///tmp/hudi-kafka-test",
+ "hoodie.datasource.write.recordkey.field": "volume",
+ "hoodie.datasource.write.partitionpath.field": "date",
+ "hoodie.schemaprovider.class":
"org.apache.hudi.schema.SchemaRegistryProvider",
+ "hoodie.deltastreamer.schemaprovider.registry.url":
"http://localhost:8081/subjects/hudi-test-topic/versions/latest",
+ "hoodie.kafka.commit.interval.secs": 10
+ }
+}
diff --git a/packaging/bundle-validation/kafka/consume.sh b/packaging/bundle-validation/kafka/consume.sh
new file mode 100755
index 0000000000..5ae67c3b61
--- /dev/null
+++ b/packaging/bundle-validation/kafka/consume.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties &
+sleep 30
+curl -X POST -H "Content-Type:application/json" -d @/opt/bundle-validation/kafka/config-sink.json http://localhost:8083/connectors &
+sleep 30
+curl -X DELETE http://localhost:8083/connectors/hudi-sink &
+sleep 10
+
+# validate
+numCommits=$(ls /tmp/hudi-kafka-test/.hoodie/*.commit | wc -l)
+if [ $numCommits -gt 0 ]; then
+ exit 0
+else
+ exit 1
+fi
diff --git a/packaging/bundle-validation/kafka/produce.sh b/packaging/bundle-validation/kafka/produce.sh
new file mode 100755
index 0000000000..7f828b5d3b
--- /dev/null
+++ b/packaging/bundle-validation/kafka/produce.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+kafkaTopicName=hudi-test-topic
+
+# Setup the schema registry
+SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' /opt/bundle-validation/data/stocks/schema.avsc | sed '/\/\*/,/*\//d' | jq tostring)
+curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $SCHEMA}" http://localhost:8081/subjects/${kafkaTopicName}/versions
+
+# produce data
+cat /opt/bundle-validation/data/stocks/data/batch_1.json /opt/bundle-validation/data/stocks/data/batch_2.json | $CONFLUENT_HOME/bin/kafka-console-producer \
+--bootstrap-server http://localhost:9092 \
+--topic ${kafkaTopicName}
diff --git a/packaging/bundle-validation/validate.sh b/packaging/bundle-validation/validate.sh
index 67abdd5d65..7d40228651 100755
--- a/packaging/bundle-validation/validate.sh
+++ b/packaging/bundle-validation/validate.sh
@@ -32,6 +32,7 @@ ln -sf $JARS_DIR/hudi-hadoop-mr*.jar $JARS_DIR/hadoop-mr.jar
ln -sf $JARS_DIR/hudi-spark*.jar $JARS_DIR/spark.jar
ln -sf $JARS_DIR/hudi-utilities-bundle*.jar $JARS_DIR/utilities.jar
ln -sf $JARS_DIR/hudi-utilities-slim*.jar $JARS_DIR/utilities-slim.jar
+ln -sf $JARS_DIR/hudi-kafka-connect-bundle*.jar $JARS_DIR/kafka-connect.jar
##
@@ -131,26 +132,64 @@ test_utilities_bundle () {
}
+##
+# Function to test the kafka-connect bundle.
+# It runs zookeeper, kafka broker, schema registry, and connector worker.
+# After producing and consuming data, it checks for a successful commit under `.hoodie/`
+#
+# 1st arg: path to the hudi-kafka-connect-bundle.jar (for writing data)
+#
+# env vars (defined in container):
+# CONFLUENT_HOME: path to the confluent community directory
+# KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH: path to install hudi-kafka-connect-bundle.jar
+##
+test_kafka_connect_bundle() {
+ KAFKA_CONNECT_JAR=$1
+ cp $KAFKA_CONNECT_JAR $KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH
+ $CONFLUENT_HOME/bin/zookeeper-server-start $CONFLUENT_HOME/etc/kafka/zookeeper.properties &
+ $CONFLUENT_HOME/bin/kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties &
+ sleep 10
+ $CONFLUENT_HOME/bin/schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties &
+ sleep 10
+ $CONFLUENT_HOME/bin/kafka-topics --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
+ $WORKDIR/kafka/produce.sh
+ $WORKDIR/kafka/consume.sh
+}
+
+
+############################
+# Execute tests
+############################
+
+echo "::warning::validate.sh validating spark & hadoop-mr bundle"
test_spark_hadoop_mr_bundles
if [ "$?" -ne 0 ]; then
exit 1
fi
+echo "::warning::validate.sh done validating spark & hadoop-mr bundle"
if [[ $SPARK_HOME == *"spark-2.4"* ]] || [[ $SPARK_HOME == *"spark-3.1"* ]]
then
- echo "::warning::validate.sh testing utilities bundle"
+ echo "::warning::validate.sh validating utilities bundle"
test_utilities_bundle $JARS_DIR/utilities.jar
if [ "$?" -ne 0 ]; then
exit 1
fi
- echo "::warning::validate.sh done testing utilities bundle"
+ echo "::warning::validate.sh done validating utilities bundle"
else
- echo "::warning::validate.sh skip testing utilities bundle for non-spark2.4
& non-spark3.1 build"
+ echo "::warning::validate.sh skip validating utilities bundle for
non-spark2.4 & non-spark3.1 build"
fi
-echo "::warning::validate.sh testing utilities slim bundle"
+echo "::warning::validate.sh validating utilities slim bundle"
test_utilities_bundle $JARS_DIR/utilities-slim.jar $JARS_DIR/spark.jar
if [ "$?" -ne 0 ]; then
exit 1
fi
-echo "::warning::validate.sh done testing utilities slim bundle"
+echo "::warning::validate.sh done validating utilities slim bundle"
+
+echo "::warning::validate.sh validating kafka connect bundle"
+test_kafka_connect_bundle $JARS_DIR/kafka-connect.jar
+if [ "$?" -ne 0 ]; then
+ exit 1
+fi
+echo "::warning::validate.sh done validating kafka connect bundle"