This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ddaef8feddb [HUDI-5101] Adding spark-structured streaming test support
via spark-submit job (#7074)
ddaef8feddb is described below
commit ddaef8feddb222f609a39a16688532cf8821c4e1
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Mar 9 09:33:10 2024 -0800
[HUDI-5101] Adding spark-structured streaming test support via spark-submit
job (#7074)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../streaming/StructuredStreamingSinkUtil.java | 168 +++++++++++++++++++++
.../StructuredStreamingSinkTestWriter.scala | 104 +++++++++++++
2 files changed, 272 insertions(+)
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkUtil.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkUtil.java
new file mode 100644
index 00000000000..f6fec62cb3b
--- /dev/null
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkUtil.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.streaming;
+
+import org.apache.hudi.exception.HoodieException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Saprk-submit to test spark streaming
+ *
+ * Sample command.
+ * ./bin/spark-submit --master local[2] --driver-memory 1g --executor-memory
1g \
+ * --class org.apache.hudi.streaming.StructuredStreamingSinkUtil PATH TO
hudi-integ-test-bundle-0.13.0-SNAPSHOT.jar \
+ * --spark-master local[2] \
+ * --source-path /tmp/parquet_ny/ \
+ * --target-path /tmp/hudi_streaming_kafka10/MERGE_ON_READ3/ \
+ * --checkpoint-path /tmp/hudi_streaming_kafka10/checkpoint_mor3/ \
+ * --table-type COPY_ON_WRITE \
+ * --partition-field date_col \
+ * --record-key-field tpep_pickup_datetime \
+ * --pre-combine-field tpep_dropoff_datetime \
+ * --table-name test_tbl
+ *
+ * Ensure "source-path" has parquet data.
+ */
+public class StructuredStreamingSinkUtil implements Serializable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StructuredStreamingSinkUtil.class);
+
+ private transient JavaSparkContext jsc;
+ private SparkSession sparkSession;
+ private Config cfg;
+
+ public StructuredStreamingSinkUtil(JavaSparkContext jsc, Config cfg) {
+ this.jsc = jsc;
+ this.sparkSession =
SparkSession.builder().config(jsc.getConf()).getOrCreate();
+ this.cfg = cfg;
+ }
+
+ public static class Config implements Serializable {
+ @Parameter(names = {"--source-path", "-sp"}, description = "Source path to
consume data from", required = true)
+ public String sourcePath = null;
+
+ @Parameter(names = {"--target-path", "-tp"}, description = "Target path of
the table of interest.", required = true)
+ public String targetPath = null;
+
+ @Parameter(names = {"--table-type", "-ty"}, description = "Target path of
the table of interest.", required = true)
+ public String tableType = "COPY_ON_WRITE";
+
+ @Parameter(names = {"--checkpoint-path", "-cp"}, description = "Checkppint
path of the table of interest", required = true)
+ public String checkpointPath = null;
+
+ @Parameter(names = {"--partition-field", "-pp"}, description =
"Partitioning field", required = true)
+ public String partitionField = null;
+
+ @Parameter(names = {"--record-key-field", "-rk"}, description = "record
key field", required = true)
+ public String recordKeyField = null;
+
+ @Parameter(names = {"--pre-combine-field", "-pc"}, description =
"Precombine field", required = true)
+ public String preCombineField = null;
+
+ @Parameter(names = {"--table-name", "-tn"}, description = "Table name",
required = true)
+ public String tableName = null;
+
+ @Parameter(names = {"--disable-metadata", "-dmdt"}, description = "Disable
metadata while querying", required = false)
+ public Boolean disableMetadata = false;
+
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark
master", required = false)
+ public String sparkMaster = null;
+
+ @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory
to use", required = false)
+ public String sparkMemory = "1g";
+
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ }
+
+ public static void main(String[] args) {
+ final Config cfg = new Config();
+ JCommander cmd = new JCommander(cfg, null, args);
+
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+
+ SparkConf sparkConf = buildSparkConf("Spark-structured-streaming-test",
cfg.sparkMaster);
+ sparkConf.set("spark.executor.memory", cfg.sparkMemory);
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+ try {
+ StructuredStreamingSinkUtil streamingSinkUtil = new
StructuredStreamingSinkUtil(jsc, cfg);
+ streamingSinkUtil.run();
+ } catch (Throwable throwable) {
+ LOG.error("Fail to execute tpcds read benchmarks for " + cfg, throwable);
+ } finally {
+ jsc.stop();
+ }
+ }
+
+ public void run() {
+ try {
+ LOG.info(cfg.toString());
+ StructuredStreamingSinkTestWriter.triggerStreaming(sparkSession,
cfg.tableType, cfg.sourcePath, cfg.targetPath, cfg.checkpointPath,
+ cfg.tableName, cfg.partitionField, cfg.recordKeyField,
cfg.preCombineField);
+ StructuredStreamingSinkTestWriter.waitUntilCondition(1000 * 60 * 10,
1000 * 30);
+ } catch (Exception e) {
+ throw new HoodieException("Unable to test spark structured writes to
hudi " + cfg.targetPath, e);
+ } finally {
+ LOG.warn("Completing Spark Structured Streaming test");
+ }
+ }
+
+ public static SparkConf buildSparkConf(String appName, String defaultMaster)
{
+ return buildSparkConf(appName, defaultMaster, new HashMap<>());
+ }
+
+ private static SparkConf buildSparkConf(String appName, String
defaultMaster, Map<String, String> additionalConfigs) {
+ final SparkConf sparkConf = new SparkConf().setAppName(appName);
+ String master = sparkConf.get("spark.master", defaultMaster);
+ sparkConf.setMaster(master);
+ if (master.startsWith("yarn")) {
+ sparkConf.set("spark.eventLog.overwrite", "true");
+ sparkConf.set("spark.eventLog.enabled", "true");
+ }
+ sparkConf.set("spark.ui.port", "8090");
+ sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
+ sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
+ sparkConf.set("spark.kryo.registrator",
"org.apache.spark.HoodieSparkKryoRegistrar");
+ sparkConf.set("spark.sql.extensions",
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
+ sparkConf.set("spark.hadoop.mapred.output.compress", "true");
+ sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
+ sparkConf.set("spark.hadoop.mapred.output.compression.codec",
"org.apache.hadoop.io.compress.GzipCodec");
+ sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
+
+ additionalConfigs.forEach(sparkConf::set);
+ return sparkConf;
+ }
+}
diff --git
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkTestWriter.scala
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkTestWriter.scala
new file mode 100644
index 00000000000..8eb3b469e93
--- /dev/null
+++
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkTestWriter.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.streaming
+
+import org.apache.hudi.DataSourceWriteOptions._
+import
org.apache.hudi.config.HoodieWriteConfig.FAIL_ON_TIMELINE_ARCHIVING_ENABLE
+import org.apache.spark.sql.SparkSession
+import
org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent,
QueryStartedEvent, QueryTerminatedEvent}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener,
Trigger}
+import org.apache.log4j.LogManager
+
+/**
+ * Test writer that streams parquet files into a Hudi table with Spark structured streaming
+ * and, once the streaming query terminates, compares the written table with the source data.
+ */
+object StructuredStreamingSinkTestWriter {
+
+  private val log = LogManager.getLogger(getClass)
+
+  // Set by the streaming query listener once validation succeeded and polled by
+  // waitUntilCondition. Marked volatile because Spark may invoke listener callbacks on a
+  // different thread than the one that is polling.
+  @volatile var validationComplete: Boolean = false
+
+  /** Waits with the default limits: at most 5 minutes, checking every 500 ms. */
+  def waitUntilCondition(): Unit = {
+    waitUntilCondition(1000 * 60 * 5, 500)
+  }
+
+  /**
+   * Blocks until `validationComplete` is set or the accumulated sleep time reaches the limit.
+   * Returns normally in both cases; callers must inspect `validationComplete` to tell them apart.
+   *
+   * @param maxWaitTimeMs  upper bound on the accumulated sleep time, in milliseconds.
+   * @param intervalTimeMs sleep time between two checks, in milliseconds.
+   */
+  def waitUntilCondition(maxWaitTimeMs: Long, intervalTimeMs: Long): Unit = {
+    var waitSoFar: Long = 0
+    while (waitSoFar < maxWaitTimeMs && !validationComplete) {
+      log.info("Waiting for " + intervalTimeMs + ". Total wait time " + waitSoFar)
+      Thread.sleep(intervalTimeMs)
+      waitSoFar += intervalTimeMs
+    }
+  }
+
+  /**
+   * Starts a streaming query that reads parquet files from `inputPath` (one file per trigger)
+   * and writes them to the Hudi table at `hudiPath`. A listener stops the active queries once a
+   * progress event reports zero input rows, and validates the table when the query terminates.
+   *
+   * @param spark              session used for both the streaming query and the validation.
+   * @param tableType          Hudi table type passed to the writer.
+   * @param inputPath          directory holding the source parquet files.
+   * @param hudiPath           base path of the target Hudi table.
+   * @param hudiCheckpointPath checkpoint location of the streaming query.
+   * @param tableName          Hudi table name.
+   * @param partitionPathField partition path field.
+   * @param recordKeyField     record key field.
+   * @param preCombineField    precombine field.
+   */
+  def triggerStreaming(spark: SparkSession, tableType: String, inputPath: String, hudiPath: String, hudiCheckpointPath: String,
+                       tableName: String, partitionPathField: String, recordKeyField: String,
+                       preCombineField: String): Unit = {
+
+    // Number of distinct (partition path, record key) pairs in the given temp view.
+    // The aggregate yields a single row, so its value has to be read from that row:
+    // calling count on the result would always return 1.
+    def distinctKeyCount(viewName: String): Long = {
+      spark.sql("select count(distinct " + partitionPathField + ", " + recordKeyField + ") from " + viewName)
+        .first().getLong(0)
+    }
+
+    def validate(): Unit = {
+      log.info("Validation starting")
+      val inputDf = spark.read.format("parquet").load(inputPath)
+      val hudiDf = spark.read.format("hudi").load(hudiPath)
+      inputDf.createOrReplaceTempView("inputTbl")
+      hudiDf.createOrReplaceTempView("hudiTbl")
+      val inputCount = distinctKeyCount("inputTbl")
+      val hudiCount = distinctKeyCount("hudiTbl")
+      assert(inputCount == hudiCount,
+        "Distinct key count mismatch. Input: " + inputCount + ", hudi: " + hudiCount)
+      validationComplete = true
+      log.info("Validation complete")
+    }
+
+    def shutdownListener(spark: SparkSession) = new StreamingQueryListener() {
+      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+        log.info("Query started: " + queryStarted.id)
+      }
+
+      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
+        log.info("Query terminated! " + queryTerminated.id + ". Validating input and hudi")
+        try {
+          validate()
+        } catch {
+          case t: Throwable =>
+            // Make the failure visible in the job log; validationComplete stays false.
+            log.error("Data validation failed", t)
+            throw t
+        }
+        log.info("Data Validation complete")
+      }
+
+      override def onQueryProgress(queryProgressEvent: QueryProgressEvent): Unit = {
+        if (queryProgressEvent.progress.numInputRows == 0) {
+          log.info("Stopping spark structured streaming as we have reached the end")
+          spark.streams.active.foreach(_.stop())
+        }
+      }
+    }
+
+    spark.streams.addListener(shutdownListener(spark))
+    log.info("Starting to consume from source and writing to hudi ")
+
+    // The streaming reader is given the schema of the existing source files explicitly.
+    val inputDfSchema = spark.read.format("parquet").load(inputPath).schema
+    val parquetdf = spark.readStream.option("spark.sql.streaming.schemaInference", "true").option("maxFilesPerTrigger", "1")
+      .schema(inputDfSchema).parquet(inputPath)
+
+    val writer = parquetdf.writeStream.format("org.apache.hudi").
+      option(TABLE_TYPE.key, tableType).
+      option(PRECOMBINE_FIELD.key, preCombineField).
+      option(RECORDKEY_FIELD.key, recordKeyField).
+      option(PARTITIONPATH_FIELD.key, partitionPathField).
+      option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
+      option(STREAMING_IGNORE_FAILED_BATCH.key, false).
+      option(STREAMING_RETRY_CNT.key, 0).
+      option("hoodie.table.name", tableName).
+      option("hoodie.compact.inline.max.delta.commits", "2").
+      option("checkpointLocation", hudiCheckpointPath).
+      outputMode(OutputMode.Append())
+
+    writer.trigger(Trigger.ProcessingTime(30000)).start(hudiPath)
+  }
+}