This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b3c834a242 [HUDI-3571] Spark datasource continuous ingestion tool (#5156)
b3c834a242 is described below
commit b3c834a2426197b69e9ce2120e8e404a99e4dd88
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Apr 7 11:13:46 2022 -0700
[HUDI-3571] Spark datasource continuous ingestion tool (#5156)
---
.../SparkDataSourceContinuousIngestTool.java | 167 +++++++++++++++++++++
.../SparkDataSourceContinuousIngest.scala | 92 ++++++++++++
2 files changed, 259 insertions(+)
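As the code below shows, the tool expects --source-path to contain sub-folders whose names parse as longs; they are ingested in ascending order, and the name of the last folder written is stored in the checkpoint file so a subsequent run resumes after it. A hypothetical layout (folder names are illustrative, not defined by the patch) matching the sample command in the javadoc:

    ${SOURCE_DIR}/spark_ds_continuous/1/   <- parquet files for batch 1
    ${SOURCE_DIR}/spark_ds_continuous/2/   <- parquet files for batch 2
    ${SOURCE_DIR}/spark_ds_continuous/3/   <- parquet files for batch 3
    /tmp/hudi/checkpoint                   <- plain-text checkpoint; e.g. "2" means resume from batch 3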
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
new file mode 100644
index 0000000000..c4f782fe40
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.integ.testsuite.SparkDataSourceContinuousIngest;
+import org.apache.hudi.utilities.HoodieRepairTool;
+import org.apache.hudi.utilities.IdentitySplitter;
+import org.apache.hudi.utilities.UtilHelpers;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Sample command
+ *
+ * ./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 --driver-memory 4g --executor-memory 4g \
+ * --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.catalogImplementation=hive \
+ * --class org.apache.hudi.integ.testsuite.SparkDataSourceContinuousIngestTool \
+ * ${HUDI_ROOT_DIR}/packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.11.0-SNAPSHOT.jar \
+ * --source-path file:${SOURCE_DIR}/spark_ds_continuous --checkpoint-file-path /tmp/hudi/checkpoint \
+ * --base-path file:///tmp/hudi/tbl_path/ --props /tmp/hudi_props.out
+ *
+ * Contents of the properties file passed via --props (/tmp/hudi_props.out above):
+ *
+ * hoodie.insert.shuffle.parallelism=4
+ * hoodie.upsert.shuffle.parallelism=4
+ * hoodie.bulkinsert.shuffle.parallelism=4
+ * hoodie.delete.shuffle.parallelism=4
+ * hoodie.datasource.write.recordkey.field=VendorID
+ * hoodie.datasource.write.partitionpath.field=date_col
+ * hoodie.datasource.write.operation=upsert
+ * hoodie.datasource.write.precombine.field=tpep_pickup_datetime
+ * hoodie.metadata.enable=false
+ * hoodie.table.name=hudi_tbl
+ */
+
+public class SparkDataSourceContinuousIngestTool {
+
+  private static final Logger LOG = LogManager.getLogger(SparkDataSourceContinuousIngestTool.class);
+
+  private final Config cfg;
+  // Properties with source, hoodie client, key generator etc.
+  private TypedProperties props;
+  private HoodieSparkEngineContext context;
+  private SparkSession sparkSession;
+
+  public SparkDataSourceContinuousIngestTool(JavaSparkContext jsc, Config cfg) {
+    if (cfg.propsFilePath != null) {
+      cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
+    }
+    this.context = new HoodieSparkEngineContext(jsc);
+    this.sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
+    this.cfg = cfg;
+    this.props = cfg.propsFilePath == null
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+  }
+
+  public static void main(String[] args) {
+    final Config cfg = new Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    final JavaSparkContext jsc = UtilHelpers.buildSparkContext("spark-datasource-continuous-ingestion-tool",
+        cfg.sparkMaster, cfg.sparkMemory);
+    try {
+      new SparkDataSourceContinuousIngestTool(jsc, cfg).run();
+    } catch (Throwable throwable) {
+      LOG.error("Failed to run continuous ingestion for spark datasource " + cfg.basePath, throwable);
+    } finally {
+      jsc.stop();
+    }
+  }
+
+  public void run() {
+    try {
+      SparkDataSourceContinuousIngest sparkDataSourceContinuousIngest =
+          new SparkDataSourceContinuousIngest(sparkSession, context.getHadoopConf().get(), new Path(cfg.sourcePath), cfg.sparkFormat,
+              new Path(cfg.checkpointFilePath), new Path(cfg.basePath), getPropsAsMap(props), cfg.minSyncIntervalSeconds);
+      sparkDataSourceContinuousIngest.startIngestion();
+    } finally {
+      sparkSession.stop();
+      context.getJavaSparkContext().stop();
+    }
+  }
+
+  private Map<String, String> getPropsAsMap(TypedProperties typedProperties) {
+    Map<String, String> props = new HashMap<>();
+    typedProperties.entrySet().forEach(entry -> props.put(entry.getKey().toString(), entry.getValue().toString()));
+    return props;
+  }
+
+  /**
+   * Reads config from the file system.
+   *
+   * @param jsc {@link JavaSparkContext} instance.
+   * @param cfg {@link Config} instance.
+   * @return the {@link TypedProperties} instance.
+   */
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--source-path", "-sp"}, description = "Source path for the parquet data to consume", required = true)
+    public String sourcePath = null;
+    @Parameter(names = {"--source-format", "-sf"}, description = "Source data format", required = false)
+    public String sparkFormat = "parquet";
+    @Parameter(names = {"--checkpoint-file-path", "-cpf"}, description = "Checkpoint file path to store/fetch checkpointing info", required = true)
+    public String checkpointFilePath = null;
+    @Parameter(names = {"--base-path", "-bp"}, description = "Base path for the hudi table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "Spark memory to use", required = false)
+    public String sparkMemory = "1g";
+    @Parameter(names = {"--min-sync-interval-seconds"},
+        description = "The min sync interval of each sync in continuous mode")
+    public Integer minSyncIntervalSeconds = 0;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "Path to a properties file on localfs or dfs, with configurations for "
+        + "the hoodie client used by this ingestion tool")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed on the command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+  }
+}
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala
new file mode 100644
index 0000000000..550ff9776f
--- /dev/null
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.log4j.LogManager
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+import java.io.Serializable
+
+class SparkDataSourceContinuousIngest(val spark: SparkSession, val conf: Configuration, val sourcePath: Path,
+                                      val sourceFormat: String, val checkpointFile: Path, hudiBasePath: Path,
+                                      hudiOptions: java.util.Map[String, String], minSyncIntervalSeconds: Long) extends Serializable {
+
+  private val log = LogManager.getLogger(getClass)
+
+  def startIngestion(): Unit = {
+    val fs = sourcePath.getFileSystem(conf)
+    var orderedBatch: Array[FileStatus] = null
+    if (fs.exists(checkpointFile)) {
+      log.info("Checkpoint file exists.")
+      val checkpoint = spark.sparkContext.textFile(checkpointFile.toString).collect()(0)
+      log.warn("Checkpoint to resume from " + checkpoint)
+
+      orderedBatch = fetchListOfFilesToConsume(fs, sourcePath, new PathFilter {
+        override def accept(path: Path): Boolean = {
+          path.getName.toLong > checkpoint.toLong
+        }
+      })
+      if (log.isDebugEnabled) {
+        log.debug("List of batches to consume in order ")
+        orderedBatch.foreach(entry => log.debug(" " + entry.getPath.getName))
+      }
+
+    } else {
+      log.warn("No checkpoint file exists. Starting from scratch ")
+      orderedBatch = fetchListOfFilesToConsume(fs, sourcePath, new PathFilter {
+        override def accept(path: Path): Boolean = {
+          true
+        }
+      })
+      if (log.isDebugEnabled) {
+        log.debug("List of batches to consume in order ")
+        orderedBatch.foreach(entry => log.debug(" " + entry.getPath.getName))
+      }
+    }
+
+    orderedBatch.foreach(entry => {
+      log.info("Consuming from batch " + entry)
+      val pathToConsume = new Path(sourcePath.toString + "/" + entry.getPath.getName)
+      val df = spark.read.format(sourceFormat).load(pathToConsume.toString)
+
+      df.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiBasePath.toString)
+      writeToFile(checkpointFile, entry.getPath.getName, fs)
+      log.info("Completed batch " + entry + ". Moving to next batch. Sleeping for " + minSyncIntervalSeconds + " secs before next batch")
+      Thread.sleep(minSyncIntervalSeconds * 1000)
+    })
+  }
+
+  def fetchListOfFilesToConsume(fs: FileSystem, basePath: Path, pathFilter: PathFilter): Array[FileStatus] = {
+    val nextBatches = fs.listStatus(basePath, pathFilter)
+    nextBatches.sortBy(fileStatus => fileStatus.getPath.getName.toLong)
+  }
+
+  def writeToFile(checkpointFilePath: Path, str: String, fs: FileSystem): Unit = {
+    // create with overwrite=true creates the file if it does not exist and truncates it otherwise
+    val fsOutStream = fs.create(checkpointFilePath, true)
+    fsOutStream.writeBytes(str)
+    fsOutStream.flush()
+    fsOutStream.close()
+  }
+}
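
For reference, a minimal sketch (not part of this commit) of how the Scala class above could be driven directly from an existing SparkSession. The object name, paths, and table options are illustrative assumptions; the bundled SparkDataSourceContinuousIngestTool above is the intended spark-submit entry point, this only illustrates the constructor arguments.

    import org.apache.hadoop.fs.Path
    import org.apache.hudi.integ.testsuite.SparkDataSourceContinuousIngest
    import org.apache.spark.sql.SparkSession

    import scala.collection.JavaConverters._

    object SparkDataSourceContinuousIngestExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("spark-ds-continuous-ingest-example")
          .master("local[2]") // illustrative; pass a real master in a cluster
          .getOrCreate()

        // Same write configs as the sample properties file in the Java tool's javadoc.
        val hudiOptions = Map(
          "hoodie.table.name" -> "hudi_tbl",
          "hoodie.datasource.write.recordkey.field" -> "VendorID",
          "hoodie.datasource.write.partitionpath.field" -> "date_col",
          "hoodie.datasource.write.precombine.field" -> "tpep_pickup_datetime",
          "hoodie.datasource.write.operation" -> "upsert"
        ).asJava

        // Consumes numbered batch folders under the source path in ascending order,
        // appending each batch to the Hudi table and checkpointing the folder name.
        new SparkDataSourceContinuousIngest(
          spark,
          spark.sparkContext.hadoopConfiguration,
          new Path("file:///tmp/source/spark_ds_continuous"), // batch folders with numeric names
          "parquet",
          new Path("file:///tmp/hudi/checkpoint"),
          new Path("file:///tmp/hudi/tbl_path"),
          hudiOptions,
          0L // min sync interval in seconds between batches
        ).startIngestion()

        spark.stop()
      }
    }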