This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b3c834a242 [HUDI-3571] Spark datasource continuous ingestion tool (#5156)
b3c834a242 is described below

commit b3c834a2426197b69e9ce2120e8e404a99e4dd88
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Apr 7 11:13:46 2022 -0700

    [HUDI-3571] Spark datasource continuous ingestion tool (#5156)
---
 .../SparkDataSourceContinuousIngestTool.java       | 167 +++++++++++++++++++++
 .../SparkDataSourceContinuousIngest.scala          |  92 ++++++++++++
 2 files changed, 259 insertions(+)

diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
new file mode 100644
index 0000000000..c4f782fe40
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.integ.testsuite.SparkDataSourceContinuousIngest;
+import org.apache.hudi.utilities.HoodieRepairTool;
+import org.apache.hudi.utilities.IdentitySplitter;
+import org.apache.hudi.utilities.UtilHelpers;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Sample command
+ *
+ * ./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 --driver-memory 4g   --executor-memory 4g \
+ * --conf spark.serializer=org.apache.spark.serializer.KryoSerializer   --conf spark.sql.catalogImplementation=hive \
+ * --class org.apache.hudi.integ.testsuite.SparkDataSourceContinuousIngestTool \
+ * ${HUDI_ROOT_DIR}/packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.11.0-SNAPSHOT.jar \
+ * --source-path file:${SOURCE_DIR}/spark_ds_continuous   --checkpoint-file-path /tmp/hudi/checkpoint  \
+ * --base-path file:///tmp/hudi/tbl_path/   --props /tmp/hudi_props.out
+ *
+ * Contents of the properties file passed via --props
+ *
+ * hoodie.insert.shuffle.parallelism=4
+ * hoodie.upsert.shuffle.parallelism=4
+ * hoodie.bulkinsert.shuffle.parallelism=4
+ * hoodie.delete.shuffle.parallelism=4
+ * hoodie.datasource.write.recordkey.field=VendorID
+ * hoodie.datasource.write.partitionpath.field=date_col
+ * hoodie.datasource.write.operation=upsert
+ * hoodie.datasource.write.precombine.field=tpep_pickup_datetime
+ * hoodie.metadata.enable=false
+ * hoodie.table.name=hudi_tbl
+ */
+
+public class SparkDataSourceContinuousIngestTool {
+
+  private static final Logger LOG = LogManager.getLogger(SparkDataSourceContinuousIngestTool.class);
+
+  private final Config cfg;
+  // Properties with source, hoodie client, key generator etc.
+  private TypedProperties props;
+  private HoodieSparkEngineContext context;
+  private SparkSession sparkSession;
+
+  public SparkDataSourceContinuousIngestTool(JavaSparkContext jsc, Config cfg) {
+    if (cfg.propsFilePath != null) {
+      cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
+    }
+    this.context = new HoodieSparkEngineContext(jsc);
+    this.sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
+    this.cfg = cfg;
+    this.props = cfg.propsFilePath == null
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+  }
+
+  public static void main(String[] args) {
+    final Config cfg = new Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    final JavaSparkContext jsc = UtilHelpers.buildSparkContext("spark-datasource-continuous-ingestion-tool", cfg.sparkMaster, cfg.sparkMemory);
+    try {
+      new SparkDataSourceContinuousIngestTool(jsc, cfg).run();
+    } catch (Throwable throwable) {
+      LOG.error("Failed to run Continuous Ingestion for spark datasource " + cfg.basePath, throwable);
+    } finally {
+      jsc.stop();
+    }
+  }
+
+  public void run() {
+    try {
+      SparkDataSourceContinuousIngest sparkDataSourceContinuousIngest =
+          new SparkDataSourceContinuousIngest(sparkSession, context.getHadoopConf().get(), new Path(cfg.sourcePath), cfg.sparkFormat,
+              new Path(cfg.checkpointFilePath), new Path(cfg.basePath), getPropsAsMap(props),
+              cfg.minSyncIntervalSeconds);
+      sparkDataSourceContinuousIngest.startIngestion();
+    } finally {
+      sparkSession.stop();
+      context.getJavaSparkContext().stop();
+    }
+  }
+
+  private Map<String, String> getPropsAsMap(TypedProperties typedProperties) {
+    Map<String, String> props = new HashMap<>();
+    typedProperties.entrySet().forEach(entry -> props.put(entry.getKey().toString(), entry.getValue().toString()));
+    return props;
+  }
+
+  /**
+   * Reads config from the file system.
+   *
+   * @param jsc {@link JavaSparkContext} instance.
+   * @param cfg {@link Config} instance.
+   * @return the {@link TypedProperties} instance.
+   */
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--source-path", "-sp"}, description = "Source path 
for the parquet data to consume", required = true)
+    public String sourcePath = null;
+    @Parameter(names = {"--source-format", "-sf"}, description = "source data 
format", required = false)
+    public String sparkFormat = "parquet";
+    @Parameter(names = {"--checkpoint-file-path", "-cpf"}, description = 
"Checkpoint file path to store/fetch checkpointing info", required = true)
+    public String checkpointFilePath = null;
+    @Parameter(names = {"--base-path", "-bp"}, description = "Base path for 
the hudi table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark 
master", required = false)
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory 
to use", required = false)
+    public String sparkMemory = "1g";
+    @Parameter(names = {"--min-sync-interval-seconds"},
+        description = "the min sync interval of each sync in continuous mode")
+    public Integer minSyncIntervalSeconds = 0;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on 
localfs or dfs, with configurations for "
+        + "hoodie client for table repair")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration 
that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command 
line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+  }
+}
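
For context, the tool expects the directory given to --source-path to contain one sub-directory per batch, each named with a strictly increasing number: the ingestor sorts batches by the numeric directory name and records the last consumed name in the checkpoint file. Below is a minimal Scala sketch of preparing such a layout; the object name, local paths and columns are hypothetical and only mirror the sample properties in the javadoc above.

import org.apache.spark.sql.{SaveMode, SparkSession}

object PrepareContinuousIngestBatches {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("prepare-continuous-ingest-batches")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Root directory later passed to the tool via --source-path (assumed local path)
    val sourceRoot = "file:///tmp/hudi/spark_ds_continuous"

    (1 to 3).foreach { i =>
      // Numeric directory name; later batches get strictly larger names,
      // which is the order in which the ingestor consumes them.
      val batchDir = s"$sourceRoot/${System.currentTimeMillis()}"
      Seq((i, s"2022-04-0$i 10:00:00", s"2022-04-0$i"))
        .toDF("VendorID", "tpep_pickup_datetime", "date_col")
        .write.format("parquet").mode(SaveMode.Overwrite).save(batchDir)
      Thread.sleep(10) // ensure distinct, increasing directory names
    }
    spark.stop()
  }
}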
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala
new file mode 100644
index 0000000000..550ff9776f
--- /dev/null
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.log4j.LogManager
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+import java.io.Serializable
+
+class SparkDataSourceContinuousIngest(val spark: SparkSession, val conf: Configuration, val sourcePath: Path,
+                                      val sourceFormat: String, val checkpointFile: Path, hudiBasePath: Path, hudiOptions: java.util.Map[String, String],
+                                      minSyncIntervalSeconds: Long) extends Serializable {
+
+  private val log = LogManager.getLogger(getClass)
+
+  def startIngestion(): Unit = {
+    val fs = sourcePath.getFileSystem(conf)
+    var orderedBatch : Array[FileStatus] = null
+    if (fs.exists(checkpointFile)) {
+      log.info("Checkpoint file exists. ")
+      val checkpoint = spark.sparkContext.textFile(checkpointFile.toString).collect()(0)
+      log.warn("Checkpoint to resume from " + checkpoint)
+
+      orderedBatch = fetchListOfFilesToConsume(fs, sourcePath, new PathFilter {
+        override def accept(path: Path): Boolean = {
+          path.getName.toLong > checkpoint.toLong
+        }
+      })
+      if (log.isDebugEnabled) {
+        log.debug("List of batches to consume in order ")
+        orderedBatch.foreach(entry => log.warn(" " + entry.getPath.getName))
+      }
+
+    } else {
+      log.warn("No checkpoint file exists. Starting from scratch ")
+      orderedBatch = fetchListOfFilesToConsume(fs, sourcePath, new PathFilter {
+        override def accept(path: Path): Boolean = {
+          true
+        }
+      })
+      if (log.isDebugEnabled) {
+        log.debug("List of batches to consume in order ")
+        orderedBatch.foreach(entry => log.warn(" " + entry.getPath.getName))
+      }
+    }
+
+    orderedBatch.foreach(entry => {
+      log.info("Consuming from batch " + entry)
+      val pathToConsume = new Path(sourcePath.toString + "/" + entry.getPath.getName)
+      val df = spark.read.format(sourceFormat).load(pathToConsume.toString)
+
+      df.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiBasePath.toString)
+      writeToFile(checkpointFile, entry.getPath.getName, fs)
+      log.info("Completed batch " + entry + ". Moving to next batch. Sleeping 
for " + minSyncIntervalSeconds + " secs before next batch")
+      Thread.sleep(minSyncIntervalSeconds * 1000)
+    })
+  }
+
+  def fetchListOfFilesToConsume(fs: FileSystem, basePath: Path, pathFilter: PathFilter): Array[FileStatus] = {
+    val nextBatches = fs.listStatus(basePath, pathFilter)
+    nextBatches.sortBy(fileStatus => fileStatus.getPath.getName.toLong)
+  }
+
+  def writeToFile(checkpointFilePath: Path, str: String, fs: FileSystem): Unit = {
+    // fs.create with overwrite=true creates the checkpoint file if it is absent
+    // and truncates it otherwise, so a separate fs.exists check and pre-create
+    // (which would leak an un-closed output stream) is not needed here
+    val fsOutStream = fs.create(checkpointFilePath, true)
+    fsOutStream.writeBytes(str)
+    fsOutStream.flush()
+    fsOutStream.close()
+  }
+}
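
For reference, a minimal sketch of wiring the Scala ingestor above directly, roughly what SparkDataSourceContinuousIngestTool does after parsing its CLI options. The object name, local paths and option values are hypothetical and mirror the sample properties in the Java tool's javadoc.

package org.apache.hudi.integ.testsuite

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._

object SparkDataSourceContinuousIngestSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark-ds-continuous-ingest-sketch")
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Hudi write options, equivalent to the properties file shown in the Java tool's javadoc
    val hudiOptions = Map(
      "hoodie.table.name" -> "hudi_tbl",
      "hoodie.datasource.write.recordkey.field" -> "VendorID",
      "hoodie.datasource.write.partitionpath.field" -> "date_col",
      "hoodie.datasource.write.precombine.field" -> "tpep_pickup_datetime",
      "hoodie.datasource.write.operation" -> "upsert"
    ).asJava

    val ingest = new SparkDataSourceContinuousIngest(
      spark,
      spark.sparkContext.hadoopConfiguration,
      new Path("file:///tmp/hudi/spark_ds_continuous"), // batch sub-directories with numeric names
      "parquet",
      new Path("file:///tmp/hudi/checkpoint"),          // file storing the last consumed batch name
      new Path("file:///tmp/hudi/tbl_path"),            // Hudi table base path
      hudiOptions,
      0L)                                               // minSyncIntervalSeconds

    ingest.startIngestion()
    spark.stop()
  }
}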
