This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ddaef8feddb [HUDI-5101] Adding spark-structured streaming test support 
via spark-submit job (#7074)
ddaef8feddb is described below

commit ddaef8feddb222f609a39a16688532cf8821c4e1
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Mar 9 09:33:10 2024 -0800

    [HUDI-5101] Adding spark-structured streaming test support via spark-submit 
job (#7074)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../streaming/StructuredStreamingSinkUtil.java     | 168 +++++++++++++++++++++
 .../StructuredStreamingSinkTestWriter.scala        | 104 +++++++++++++
 2 files changed, 272 insertions(+)

diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkUtil.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkUtil.java
new file mode 100644
index 00000000000..f6fec62cb3b
--- /dev/null
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkUtil.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.streaming;
+
+import org.apache.hudi.exception.HoodieException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Spark-submit job to test Spark structured streaming.
+ * 
+ * Sample command.
+ * ./bin/spark-submit --master local[2] --driver-memory 1g --executor-memory 1g \
+ * --class org.apache.hudi.integ.testsuite.streaming.StructuredStreamingSinkUtil \
+ * PATH_TO/hudi-integ-test-bundle-0.13.0-SNAPSHOT.jar \
+ * --spark-master local[2] \
+ * --source-path /tmp/parquet_ny/ \
+ * --target-path /tmp/hudi_streaming_kafka10/MERGE_ON_READ3/ \
+ * --checkpoint-path /tmp/hudi_streaming_kafka10/checkpoint_mor3/ \
+ * --table-type COPY_ON_WRITE \
+ * --partition-field date_col \
+ * --record-key-field tpep_pickup_datetime \
+ * --pre-combine-field tpep_dropoff_datetime \
+ * --table-name test_tbl
+ *
+ * Ensure "source-path" has parquet data.
+ */
+public class StructuredStreamingSinkUtil implements Serializable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StructuredStreamingSinkUtil.class);
+
+  // Spark context supplied by the caller; transient, so it is left out when
+  // this object is serialized.
+  private transient JavaSparkContext jsc;
+  // Session obtained with getOrCreate() from the context's configuration.
+  private SparkSession sparkSession;
+  // Parsed command line options driving the test.
+  private Config cfg;
+
+  /**
+   * Creates the test driver.
+   *
+   * @param jsc Spark context whose configuration is used to get or create the session.
+   * @param cfg parsed command line options.
+   */
+  public StructuredStreamingSinkUtil(JavaSparkContext jsc, Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    SparkConf contextConf = jsc.getConf();
+    this.sparkSession = SparkSession.builder().config(contextConf).getOrCreate();
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--source-path", "-sp"}, description = "Source path to 
consume data from", required = true)
+    public String sourcePath = null;
+
+    @Parameter(names = {"--target-path", "-tp"}, description = "Target path of 
the table of interest.", required = true)
+    public String targetPath = null;
+
+    @Parameter(names = {"--table-type", "-ty"}, description = "Target path of 
the table of interest.", required = true)
+    public String tableType = "COPY_ON_WRITE";
+
+    @Parameter(names = {"--checkpoint-path", "-cp"}, description = "Checkppint 
path of the table of interest", required = true)
+    public String checkpointPath = null;
+
+    @Parameter(names = {"--partition-field", "-pp"}, description = 
"Partitioning field", required = true)
+    public String partitionField = null;
+
+    @Parameter(names = {"--record-key-field", "-rk"}, description = "record 
key field", required = true)
+    public String recordKeyField = null;
+
+    @Parameter(names = {"--pre-combine-field", "-pc"}, description = 
"Precombine field", required = true)
+    public String preCombineField = null;
+
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", 
required = true)
+    public String tableName = null;
+
+    @Parameter(names = {"--disable-metadata", "-dmdt"}, description = "Disable 
metadata while querying", required = false)
+    public Boolean disableMetadata = false;
+
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark 
master", required = false)
+    public String sparkMaster = null;
+
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory 
to use", required = false)
+    public String sparkMemory = "1g";
+
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+  }
+
+  public static void main(String[] args) {
+    final Config cfg = new Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    SparkConf sparkConf = buildSparkConf("Spark-structured-streaming-test", 
cfg.sparkMaster);
+    sparkConf.set("spark.executor.memory", cfg.sparkMemory);
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+    try {
+      StructuredStreamingSinkUtil streamingSinkUtil = new 
StructuredStreamingSinkUtil(jsc, cfg);
+      streamingSinkUtil.run();
+    } catch (Throwable throwable) {
+      LOG.error("Fail to execute tpcds read benchmarks for " + cfg, throwable);
+    } finally {
+      jsc.stop();
+    }
+  }
+
+  /**
+   * Starts the streaming write and waits for the writer's validation to finish.
+   *
+   * <p>Waits up to ten minutes, polling every thirty seconds.
+   *
+   * @throws HoodieException if starting or waiting fails, or if validation has not
+   *                         completed when the wait ends.
+   */
+  public void run() {
+    final long maxWaitTimeMs = 10L * 60 * 1000;
+    final long pollIntervalMs = 30L * 1000;
+    try {
+      LOG.info(cfg.toString());
+      StructuredStreamingSinkTestWriter.triggerStreaming(sparkSession, cfg.tableType,
+          cfg.sourcePath, cfg.targetPath, cfg.checkpointPath,
+          cfg.tableName, cfg.partitionField, cfg.recordKeyField, cfg.preCombineField);
+      StructuredStreamingSinkTestWriter.waitUntilCondition(maxWaitTimeMs, pollIntervalMs);
+      // waitUntilCondition also returns when the time budget is used up, so check the
+      // flag explicitly instead of treating a timeout as success.
+      if (!StructuredStreamingSinkTestWriter.validationComplete()) {
+        throw new HoodieException("Validation did not complete within " + maxWaitTimeMs
+            + " ms for " + cfg.targetPath);
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Unable to test spark structured writes to hudi "
+          + cfg.targetPath, e);
+    } finally {
+      LOG.warn("Completing Spark Structured Streaming test");
+    }
+  }
+
+  /**
+   * Builds the Spark configuration for the test without any additional settings.
+   *
+   * @param appName       Spark application name.
+   * @param defaultMaster master to use when "spark.master" is not already configured.
+   * @return the populated configuration.
+   */
+  public static SparkConf buildSparkConf(String appName, String defaultMaster) {
+    final Map<String, String> noExtraConfigs = new HashMap<>();
+    return buildSparkConf(appName, defaultMaster, noExtraConfigs);
+  }
+
+  /**
+   * Builds the Spark configuration for the test.
+   *
+   * @param appName           Spark application name.
+   * @param defaultMaster     master to use when "spark.master" is not already configured;
+   *                          may be null.
+   * @param additionalConfigs extra settings applied last, overriding the ones set here.
+   * @return the populated configuration.
+   */
+  private static SparkConf buildSparkConf(String appName, String defaultMaster,
+                                          Map<String, String> additionalConfigs) {
+    final SparkConf sparkConf = new SparkConf().setAppName(appName);
+    // A master already present in the configuration wins over the supplied default.
+    String master = sparkConf.get("spark.master", defaultMaster);
+    // With neither a configured master nor a default, leave the key unset rather than
+    // failing here with a NullPointerException.
+    if (master != null) {
+      sparkConf.setMaster(master);
+      if (master.startsWith("yarn")) {
+        sparkConf.set("spark.eventLog.overwrite", "true");
+        sparkConf.set("spark.eventLog.enabled", "true");
+      }
+    }
+    sparkConf.set("spark.ui.port", "8090");
+    sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
+    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
+    sparkConf.set("spark.sql.extensions",
+        "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
+    sparkConf.set("spark.hadoop.mapred.output.compress", "true");
+    // The codec key was previously first set to "true" and then immediately
+    // overwritten; only the effective value is kept.
+    sparkConf.set("spark.hadoop.mapred.output.compression.codec",
+        "org.apache.hadoop.io.compress.GzipCodec");
+    sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
+
+    additionalConfigs.forEach(sparkConf::set);
+    return sparkConf;
+  }
+}
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkTestWriter.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkTestWriter.scala
new file mode 100644
index 00000000000..8eb3b469e93
--- /dev/null
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/streaming/StructuredStreamingSinkTestWriter.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.streaming
+
+import org.apache.hudi.DataSourceWriteOptions._
+import 
org.apache.hudi.config.HoodieWriteConfig.FAIL_ON_TIMELINE_ARCHIVING_ENABLE
+import org.apache.spark.sql.SparkSession
+import 
org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, 
QueryStartedEvent, QueryTerminatedEvent}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener, 
Trigger}
+import org.apache.log4j.LogManager
+
+object StructuredStreamingSinkTestWriter {
+
+  private val log = LogManager.getLogger(getClass)
+  // Set to true at the end of validation, which is invoked from the streaming query
+  // listener callback, and polled by waitUntilCondition. Those may run on different
+  // threads, so the flag is volatile to make the update visible to the poller.
+  @volatile var validationComplete: Boolean = false
+
+  /** Waits with the default limits: at most five minutes, polling every 500 ms. */
+  def waitUntilCondition(): Unit = {
+    val defaultMaxWaitMs: Long = 1000L * 60 * 5
+    val defaultIntervalMs: Long = 500L
+    waitUntilCondition(defaultMaxWaitMs, defaultIntervalMs)
+  }
+
+  /**
+   * Sleeps in steps of `intervalTimeMs` until `validationComplete` is set or the
+   * accumulated sleep time reaches `maxWaitTimeMs`, whichever happens first.
+   *
+   * @param maxWaitTimeMs  upper bound on the accumulated sleep time, in milliseconds.
+   * @param intervalTimeMs length of each sleep, in milliseconds.
+   */
+  def waitUntilCondition(maxWaitTimeMs: Long, intervalTimeMs: Long): Unit = {
+    var elapsedMs = 0L
+    while (!validationComplete && elapsedMs < maxWaitTimeMs) {
+      log.info(s"Waiting for $intervalTimeMs. Total wait time $elapsedMs")
+      Thread.sleep(intervalTimeMs)
+      elapsedMs = elapsedMs + intervalTimeMs
+    }
+  }
+
+  /**
+   * Starts a structured streaming query that reads parquet files from `inputPath` and
+   * writes them to a Hudi table at `hudiPath`.
+   *
+   * A listener stops the active queries once a progress event reports zero input rows,
+   * and on query termination compares the number of distinct
+   * (partition path, record key) pairs in the input and in the Hudi table. When they
+   * match, `validationComplete` is set to true.
+   *
+   * @param spark              session used for both the streaming query and validation.
+   * @param tableType          Hudi table type passed to the writer.
+   * @param inputPath          location of the source parquet data.
+   * @param hudiPath           base path of the target Hudi table.
+   * @param hudiCheckpointPath streaming checkpoint location.
+   * @param tableName          Hudi table name.
+   * @param partitionPathField partition path field.
+   * @param recordKeyField     record key field.
+   * @param preCombineField    precombine field.
+   */
+  def triggerStreaming(spark: SparkSession, tableType: String, inputPath: String,
+                       hudiPath: String, hudiCheckpointPath: String,
+                       tableName: String, partitionPathField: String,
+                       recordKeyField: String, preCombineField: String): Unit = {
+
+    // Number of distinct (partition path, record key) pairs in the given temp view.
+    // The aggregate yields a single row, so its value has to be read out; calling
+    // count on the result would only count that one row.
+    def distinctKeyCount(viewName: String): Long = {
+      spark.sql("select count(distinct " + partitionPathField + ", " + recordKeyField +
+        ") from " + viewName).first().getLong(0)
+    }
+
+    def validate(): Unit = {
+      log.info("Validation starting")
+      val inputDf = spark.read.format("parquet").load(inputPath)
+      val hudiDf = spark.read.format("hudi").load(hudiPath)
+      inputDf.createOrReplaceTempView("inputTbl")
+      hudiDf.createOrReplaceTempView("hudiTbl")
+      val inputKeys = distinctKeyCount("inputTbl")
+      val hudiKeys = distinctKeyCount("hudiTbl")
+      assert(inputKeys == hudiKeys,
+        s"Distinct key count mismatch: input has $inputKeys, hudi table has $hudiKeys")
+      validationComplete = true
+      log.info("Validation complete")
+    }
+
+    def shutdownListener(spark: SparkSession) = new StreamingQueryListener() {
+      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+        log.info("Query started: " + queryStarted.id)
+      }
+
+      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
+        log.info("Query terminated! " + queryTerminated.id + ". Validating input and hudi")
+        validate()
+        log.info("Data Validation complete")
+      }
+
+      override def onQueryProgress(queryProgressEvent: QueryProgressEvent): Unit = {
+        // A progress event without input rows is taken as the end of the source data.
+        if (queryProgressEvent.progress.numInputRows == 0) {
+          log.info("Stopping spark structured streaming as we have reached the end")
+          spark.streams.active.foreach(_.stop())
+        }
+      }
+    }
+
+    spark.streams.addListener(shutdownListener(spark))
+    log.info("Starting to consume from source and writing to hudi ")
+
+    // The streaming reader is given the schema of a batch read of the same path.
+    val inputDfSchema = spark.read.format("parquet").load(inputPath).schema
+    val parquetdf = spark.readStream
+      .option("spark.sql.streaming.schemaInference", "true")
+      .option("maxFilesPerTrigger", "1")
+      .schema(inputDfSchema).parquet(inputPath)
+
+    val writer = parquetdf.writeStream.format("org.apache.hudi").
+      option(TABLE_TYPE.key, tableType).
+      option(PRECOMBINE_FIELD.key, preCombineField).
+      option(RECORDKEY_FIELD.key, recordKeyField).
+      option(PARTITIONPATH_FIELD.key, partitionPathField).
+      option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
+      option(STREAMING_IGNORE_FAILED_BATCH.key, false).
+      option(STREAMING_RETRY_CNT.key, 0).
+      option("hoodie.table.name", tableName).
+      option("hoodie.compact.inline.max.delta.commits", "2").
+      option("checkpointLocation", hudiCheckpointPath).
+      outputMode(OutputMode.Append())
+
+    writer.trigger(Trigger.ProcessingTime(30000)).start(hudiPath)
+  }
+}

Reply via email to