This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a0fb9b2250 [HUDI-5404] Add flink bundle validation (#7500)
a0fb9b2250 is described below
commit a0fb9b22507d74f6d678624008c1727093cf56a0
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Dec 19 15:37:48 2022 +0800
[HUDI-5404] Add flink bundle validation (#7500)
---
.github/workflows/bot.yml | 55 ++++++++++++++++++++++------
packaging/bundle-validation/Dockerfile | 2 +
packaging/bundle-validation/Dockerfile-base | 6 +++
packaging/bundle-validation/ci_run.sh | 16 +++++---
packaging/bundle-validation/flink/compact.sh | 33 +++++++++++++++++
packaging/bundle-validation/flink/insert.sql | 37 +++++++++++++++++++
packaging/bundle-validation/validate.sh | 35 ++++++++++++++++++
7 files changed, 167 insertions(+), 17 deletions(-)
diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index e05ac87bc3..7701f76eb7 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -46,21 +46,13 @@ jobs:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
- mvn clean install -Pintegration-tests -D"$SCALA_PROFILE"
-D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS
+ mvn clean install -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl
hudi-examples/hudi-examples-spark,hudi-spark-datasource/hudi-spark -am
-DskipTests=true $MVN_ARGS
- name: Quickstart Test
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
- mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-DfailIfNoTests=false -pl
hudi-examples/hudi-examples-java,hudi-examples/hudi-examples-spark $MVN_ARGS
- - name: IT - Bundle Validation
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4
as it's covered by Azure CI
- run: |
- HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q
-DforceStdout)
- ./packaging/bundle-validation/ci_run.sh $HUDI_VERSION
+ mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl
hudi-examples/hudi-examples-spark $MVN_ARGS
- name: UT - Common & Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
@@ -87,11 +79,50 @@ jobs:
architecture: x64
- name: Build Project
env:
+ SCALA_PROFILE: 'scala-2.12'
FLINK_PROFILE: ${{ matrix.flinkProfile }}
run:
- mvn clean install -Pintegration-tests -Dscala-2.12
-D"$FLINK_PROFILE" -Davro.version=1.10.0 -DskipTests=true $MVN_ARGS
+ mvn clean install -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl
hudi-examples/hudi-examples-flink -am -Davro.version=1.10.0 -DskipTests=true
$MVN_ARGS
- name: Quickstart Test
env:
+ SCALA_PROFILE: 'scala-2.12'
FLINK_PROFILE: ${{ matrix.flinkProfile }}
run:
- mvn test -Punit-tests -Dscala-2.12 -D"$FLINK_PROFILE"
-DfailIfNoTests=false -pl hudi-examples/hudi-examples-flink $MVN_ARGS
+ mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl
hudi-examples/hudi-examples-flink $MVN_ARGS
+
+ # Builds the whole project for SPARK_PROFILE (with -Pintegration-tests),
+ # rebuilds the Flink bundle for FLINK_PROFILE, then validates the bundle
+ # jars inside Docker via packaging/bundle-validation/ci_run.sh.
+ validate-bundles:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ # NOTE(review): the visible ci_run.sh chooses the Flink runtime version
+ # from SPARK_PROFILE only (spark3.1 -> 1.13.6, spark3.2 -> 1.14.6,
+ # spark3.3 -> 1.15.3); keep each flinkProfile paired with the
+ # sparkProfile that maps to the same Flink minor version, otherwise the
+ # Flink bundle is validated against a different Flink runtime.
+ include:
+ - flinkProfile: 'flink1.15'
+ sparkProfile: 'spark3.3'
+ - flinkProfile: 'flink1.14'
+ sparkProfile: 'spark3.2'
+ - flinkProfile: 'flink1.13'
+ sparkProfile: 'spark3.1'
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up JDK 8
+ uses: actions/setup-java@v2
+ with:
+ java-version: '8'
+ distribution: 'adopt'
+ architecture: x64
+ # First command: full project build for SPARK_PROFILE. Second command:
+ # rebuild only packaging/hudi-flink-bundle and its upstream modules (-am)
+ # for the selected FLINK_PROFILE.
+ - name: Build Project
+ env:
+ FLINK_PROFILE: ${{ matrix.flinkProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ SCALA_PROFILE: 'scala-2.12'
+ run: |
+ mvn clean package -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-Pintegration-tests -DskipTests=true $MVN_ARGS
+ mvn clean package -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl
packaging/hudi-flink-bundle -am -Davro.version=1.10.0 -DskipTests=true $MVN_ARGS
+ # HUDI_VERSION lets ci_run.sh locate the hudi-*-$HUDI_VERSION.jar bundles.
+ - name: IT - Bundle Validation
+ env:
+ FLINK_PROFILE: ${{ matrix.flinkProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ SCALA_PROFILE: 'scala-2.12'
+ run: |
+ HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q
-DforceStdout)
+ ./packaging/bundle-validation/ci_run.sh $HUDI_VERSION
+
diff --git a/packaging/bundle-validation/Dockerfile
b/packaging/bundle-validation/Dockerfile
index 3c5500940c..27171da662 100644
--- a/packaging/bundle-validation/Dockerfile
+++ b/packaging/bundle-validation/Dockerfile
@@ -27,3 +27,5 @@ RUN cp $DERBY_HOME/lib/derbyclient.jar $SPARK_HOME/jars/
RUN cp conf/spark-defaults.conf $SPARK_HOME/conf/
RUN if [[ $SPARK_HOME == *"spark-3.2"* ]] || [[ $SPARK_HOME == *"spark-3.3"*
]]; \
then printf "\nspark.sql.catalog.spark_catalog
org.apache.spark.sql.hudi.catalog.HoodieCatalog\n" >>
$SPARK_HOME/conf/spark-defaults.conf; fi
+RUN printf "\ntaskmanager.numberOfTaskSlots: 2\n" >>
$FLINK_HOME/conf/flink-conf.yaml
+RUN printf "\nlocalhost\n" >> $FLINK_HOME/conf/workers
diff --git a/packaging/bundle-validation/Dockerfile-base
b/packaging/bundle-validation/Dockerfile-base
index 81df6ce0c0..4e730f7229 100644
--- a/packaging/bundle-validation/Dockerfile-base
+++ b/packaging/bundle-validation/Dockerfile-base
@@ -25,6 +25,7 @@ WORKDIR $WORKDIR
ARG HADOOP_VERSION=2.7.7
ARG HIVE_VERSION=3.1.3
ARG DERBY_VERSION=10.14.1.0
+ARG FLINK_VERSION=1.13.6
ARG SPARK_VERSION=3.1.3
ARG SPARK_HADOOP_VERSION=2.7
ARG CONFLUENT_VERSION=5.5.12
@@ -45,6 +46,11 @@ RUN wget
https://archive.apache.org/dist/db/derby/db-derby-$DERBY_VERSION/db-der
&& rm $WORKDIR/db-derby-$DERBY_VERSION-bin.tar.gz
ENV DERBY_HOME=$WORKDIR/db-derby-$DERBY_VERSION-bin
+RUN wget
https://archive.apache.org/dist/flink/flink-$FLINK_VERSION/flink-$FLINK_VERSION-bin-scala_2.12.tgz
-P "$WORKDIR" \
+ && tar -xf $WORKDIR/flink-$FLINK_VERSION-bin-scala_2.12.tgz -C $WORKDIR/ \
+ && rm $WORKDIR/flink-$FLINK_VERSION-bin-scala_2.12.tgz
+ENV FLINK_HOME=$WORKDIR/flink-$FLINK_VERSION
+
RUN wget
https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz
-P "$WORKDIR" \
&& tar -xf
$WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz -C $WORKDIR/ \
&& rm $WORKDIR/spark-$SPARK_VERSION-bin-hadoop$SPARK_HADOOP_VERSION.tgz
diff --git a/packaging/bundle-validation/ci_run.sh
b/packaging/bundle-validation/ci_run.sh
index a1cd0a45ae..d03f8f2469 100755
--- a/packaging/bundle-validation/ci_run.sh
+++ b/packaging/bundle-validation/ci_run.sh
@@ -34,48 +34,53 @@ if [[ ${SPARK_PROFILE} == 'spark2.4' ]]; then
HADOOP_VERSION=2.7.7
HIVE_VERSION=2.3.9
DERBY_VERSION=10.10.2.0
+ FLINK_VERSION=1.13.6
SPARK_VERSION=2.4.8
SPARK_HADOOP_VERSION=2.7
CONFLUENT_VERSION=5.5.12
KAFKA_CONNECT_HDFS_VERSION=10.1.13
- IMAGE_TAG=spark248hive239
+ IMAGE_TAG=flink1136hive239spark248
elif [[ ${SPARK_PROFILE} == 'spark3.1' ]]; then
HADOOP_VERSION=2.7.7
HIVE_VERSION=3.1.3
DERBY_VERSION=10.14.1.0
+ FLINK_VERSION=1.13.6
SPARK_VERSION=3.1.3
SPARK_HADOOP_VERSION=2.7
CONFLUENT_VERSION=5.5.12
KAFKA_CONNECT_HDFS_VERSION=10.1.13
- IMAGE_TAG=spark313hive313
+ IMAGE_TAG=flink1136hive313spark313
elif [[ ${SPARK_PROFILE} == 'spark3.2' ]]; then
HADOOP_VERSION=2.7.7
HIVE_VERSION=3.1.3
DERBY_VERSION=10.14.1.0
+ FLINK_VERSION=1.14.6
SPARK_VERSION=3.2.3
SPARK_HADOOP_VERSION=2.7
CONFLUENT_VERSION=5.5.12
KAFKA_CONNECT_HDFS_VERSION=10.1.13
- IMAGE_TAG=spark323hive313
+ IMAGE_TAG=flink1146hive313spark323
elif [[ ${SPARK_PROFILE} == 'spark3.3' ]]; then
HADOOP_VERSION=2.7.7
HIVE_VERSION=3.1.3
DERBY_VERSION=10.14.1.0
+ FLINK_VERSION=1.15.3
SPARK_VERSION=3.3.1
SPARK_HADOOP_VERSION=2
CONFLUENT_VERSION=5.5.12
KAFKA_CONNECT_HDFS_VERSION=10.1.13
- IMAGE_TAG=spark331hive313
+ IMAGE_TAG=flink1153hive313spark331
fi
# Copy bundle jars to temp dir for mounting
TMP_JARS_DIR=/tmp/jars/$(date +%s)
mkdir -p $TMP_JARS_DIR
+cp
${GITHUB_WORKSPACE}/packaging/hudi-flink-bundle/target/hudi-*-$HUDI_VERSION.jar
$TMP_JARS_DIR/
cp
${GITHUB_WORKSPACE}/packaging/hudi-hadoop-mr-bundle/target/hudi-*-$HUDI_VERSION.jar
$TMP_JARS_DIR/
+cp
${GITHUB_WORKSPACE}/packaging/hudi-kafka-connect-bundle/target/hudi-*-$HUDI_VERSION.jar
$TMP_JARS_DIR/
cp
${GITHUB_WORKSPACE}/packaging/hudi-spark-bundle/target/hudi-*-$HUDI_VERSION.jar
$TMP_JARS_DIR/
cp
${GITHUB_WORKSPACE}/packaging/hudi-utilities-bundle/target/hudi-*-$HUDI_VERSION.jar
$TMP_JARS_DIR/
cp
${GITHUB_WORKSPACE}/packaging/hudi-utilities-slim-bundle/target/hudi-*-$HUDI_VERSION.jar
$TMP_JARS_DIR/
-cp
${GITHUB_WORKSPACE}/packaging/hudi-kafka-connect-bundle/target/hudi-*-$HUDI_VERSION.jar
$TMP_JARS_DIR/
echo 'Validating jars below:'
ls -l $TMP_JARS_DIR
@@ -91,6 +96,7 @@ docker build \
--build-arg HADOOP_VERSION=$HADOOP_VERSION \
--build-arg HIVE_VERSION=$HIVE_VERSION \
--build-arg DERBY_VERSION=$DERBY_VERSION \
+--build-arg FLINK_VERSION=$FLINK_VERSION \
--build-arg SPARK_VERSION=$SPARK_VERSION \
--build-arg SPARK_HADOOP_VERSION=$SPARK_HADOOP_VERSION \
--build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \
diff --git a/packaging/bundle-validation/flink/compact.sh
b/packaging/bundle-validation/flink/compact.sh
new file mode 100755
index 0000000000..ca24832422
--- /dev/null
+++ b/packaging/bundle-validation/flink/compact.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+HUDI_FLINK_JAR=$1
+TABLE_PATH=/tmp/hudi-flink-bundle-test
+
+# Run the offline HoodieFlinkCompactor on the table written by insert.sql
+# (--schedule: the compactor is asked to schedule the compaction itself).
+if ! "$FLINK_HOME"/bin/flink run -c \
+    org.apache.hudi.sink.compact.HoodieFlinkCompactor "$HUDI_FLINK_JAR" \
+    --path "$TABLE_PATH" \
+    --schedule; then
+  echo "::warning::compact.sh Failed to validate flink bundle: HoodieFlinkCompactor job failed."
+  exit 1
+fi
+
+# validate: compaction is expected to leave at least one .commit file
+numCommits=$(ls "$TABLE_PATH"/.hoodie/*.commit 2>/dev/null | wc -l)
+if [ "$numCommits" -gt 0 ]; then
+  exit 0
+else
+  echo "::warning::compact.sh Failed to validate flink bundle: no commit file generated after running compaction."
+  exit 1
+fi
diff --git a/packaging/bundle-validation/flink/insert.sql
b/packaging/bundle-validation/flink/insert.sql
new file mode 100644
index 0000000000..25efeeecba
--- /dev/null
+++ b/packaging/bundle-validation/flink/insert.sql
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- The SQL client submits DML jobs asynchronously by default; wait for the
+-- INSERT to finish so the table is committed before compact.sh runs.
+SET 'table.dml-sync' = 'true';
+
+CREATE TABLE t1
+(
+ uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
+ name VARCHAR(10),
+ age INT,
+ ts TIMESTAMP(3),
+ `partition` VARCHAR(20)
+) PARTITIONED BY (`partition`)
+WITH (
+ 'connector' = 'hudi',
+ 'table.type' = 'MERGE_ON_READ',
+ 'path' = '/tmp/hudi-flink-bundle-test'
+);
+
+-- insert data using values
+INSERT INTO t1
+VALUES ('id1', 'Danny', 23, TIMESTAMP '1970-01-01 00:00:01', 'par1'),
+ ('id8', 'Han', 56, TIMESTAMP '1970-01-01 00:00:08', 'par4');
diff --git a/packaging/bundle-validation/validate.sh
b/packaging/bundle-validation/validate.sh
index 7d40228651..7ad9d25942 100755
--- a/packaging/bundle-validation/validate.sh
+++ b/packaging/bundle-validation/validate.sh
@@ -29,6 +29,7 @@ WORKDIR=/opt/bundle-validation
JARS_DIR=${WORKDIR}/jars
# link the jar names to easier to use names
ln -sf $JARS_DIR/hudi-hadoop-mr*.jar $JARS_DIR/hadoop-mr.jar
+ln -sf $JARS_DIR/hudi-flink*.jar $JARS_DIR/flink.jar
ln -sf $JARS_DIR/hudi-spark*.jar $JARS_DIR/spark.jar
ln -sf $JARS_DIR/hudi-utilities-bundle*.jar $JARS_DIR/utilities.jar
ln -sf $JARS_DIR/hudi-utilities-slim*.jar $JARS_DIR/utilities-slim.jar
@@ -47,7 +48,9 @@ test_spark_hadoop_mr_bundles () {
echo "::warning::validate.sh setting up hive metastore for spark &
hadoop-mr bundles validation"
$DERBY_HOME/bin/startNetworkServer -h 0.0.0.0 &
+ local DERBY_PID=$!
$HIVE_HOME/bin/hiveserver2 --hiveconf
hive.aux.jars.path=$JARS_DIR/hadoop-mr.jar &
+ local HIVE_PID=$!
echo "::warning::validate.sh Writing sample data via Spark DataSource and
run Hive Sync..."
$SPARK_HOME/bin/spark-shell --jars $JARS_DIR/spark.jar <
$WORKDIR/spark_hadoop_mr/write.scala
@@ -73,6 +76,7 @@ test_spark_hadoop_mr_bundles () {
exit 1
fi
echo "::warning::validate.sh spark & hadoop-mr bundles validation was
successful."
+ kill $DERBY_PID $HIVE_PID
}
@@ -132,6 +136,25 @@ test_utilities_bundle () {
}
+##
+# Function to test the flink bundle.
+# Starts a local Flink cluster, writes sample rows into a MERGE_ON_READ table
+# with the SQL client (flink/insert.sql), then runs flink/compact.sh, which
+# compacts the table and checks that a commit file was produced.
+#
+# env vars (defined in container):
+#   HADOOP_HOME: path to the hadoop directory
+#   FLINK_HOME: path to the flink directory
+#
+# Returns 0 on success, otherwise the status of the failing step. Uses
+# `return` (not `exit`) so the caller can continue validating other bundles.
+##
+test_flink_bundle() {
+    export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
+    $FLINK_HOME/bin/start-cluster.sh
+    $FLINK_HOME/bin/sql-client.sh -j $JARS_DIR/flink.jar -f $WORKDIR/flink/insert.sql
+    local EXIT_CODE=$?
+    if [ $EXIT_CODE -eq 0 ]; then
+        $WORKDIR/flink/compact.sh $JARS_DIR/flink.jar
+        EXIT_CODE=$?
+    else
+        echo "::warning::validate.sh flink sql-client failed to run insert.sql"
+    fi
+    # Tear the cluster down regardless of the outcome.
+    $FLINK_HOME/bin/stop-cluster.sh
+    unset HADOOP_CLASSPATH
+    return $EXIT_CODE
+}
+
+
##
# Function to test the kafka-connect bundle.
# It runs zookeeper, kafka broker, schema registry, and connector worker.
@@ -147,13 +170,18 @@ test_kafka_connect_bundle() {
KAFKA_CONNECT_JAR=$1
cp $KAFKA_CONNECT_JAR $KAFKA_CONNECT_PLUGIN_PATH_LIB_PATH
$CONFLUENT_HOME/bin/zookeeper-server-start $CONFLUENT_HOME/etc/kafka/zookeeper.properties &
+ local ZOOKEEPER_PID=$!
$CONFLUENT_HOME/bin/kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties &
+ local KAFKA_SERVER_PID=$!
sleep 10
$CONFLUENT_HOME/bin/schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties &
+ local SCHEMA_REG_PID=$!
sleep 10
$CONFLUENT_HOME/bin/kafka-topics --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
$WORKDIR/kafka/produce.sh
$WORKDIR/kafka/consume.sh
+ local EXIT_CODE=$?
+ kill $ZOOKEEPER_PID $KAFKA_SERVER_PID $SCHEMA_REG_PID
+ # Propagate consume.sh's status; without this the function would return
+ # the status of `kill` and the caller could never detect a failure.
+ return $EXIT_CODE
}
@@ -187,6 +215,13 @@ if [ "$?" -ne 0 ]; then
fi
echo "::warning::validate.sh done validating utilities slim bundle"
+echo "::warning::validate.sh validating flink bundle"
+test_flink_bundle
+if [ "$?" -ne 0 ]; then
+ exit 1
+fi
+echo "::warning::validate.sh done validating flink bundle"
+
echo "::warning::validate.sh validating kafka connect bundle"
test_kafka_connect_bundle $JARS_DIR/kafka-connect.jar
if [ "$?" -ne 0 ]; then