This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1725b0915ad [HUDI-6041] add `options` input to Bootstrap Procedure for 
passing hudi properties (#8387)
1725b0915ad is described below

commit 1725b0915ad0d97de19c50ae9c06cf57951471c8
Author: Kunni <[email protected]>
AuthorDate: Fri May 26 14:14:20 2023 +0800

    [HUDI-6041] add `options` input to Bootstrap Procedure for passing hudi 
properties (#8387)
---
 .../command/procedures/RunBootstrapProcedure.scala | 17 +++++-
 .../hudi/procedure/TestBootstrapProcedure.scala    | 69 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 1 deletion(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
index bb93cf1a485..458d618e92b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
@@ -52,7 +52,9 @@ class RunBootstrapProcedure extends BaseProcedure with 
ProcedureBuilder with Log
     ProcedureParameter.optional(13, "parallelism", DataTypes.IntegerType, 
1500),
     ProcedureParameter.optional(14, "enable_hive_sync", DataTypes.BooleanType, 
false),
     ProcedureParameter.optional(15, "props_file_path", DataTypes.StringType, 
""),
-    ProcedureParameter.optional(16, "bootstrap_overwrite", 
DataTypes.BooleanType, false)
+    ProcedureParameter.optional(16, "bootstrap_overwrite", 
DataTypes.BooleanType, false),
+    // params => key=value, key2=value2
+    ProcedureParameter.optional(17, "options", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -83,6 +85,7 @@ class RunBootstrapProcedure extends BaseProcedure with 
ProcedureBuilder with Log
     val enableHiveSync = getArgValueOrDefault(args, 
PARAMETERS(14)).get.asInstanceOf[Boolean]
     val propsFilePath = getArgValueOrDefault(args, 
PARAMETERS(15)).get.asInstanceOf[String]
     val bootstrapOverwrite = getArgValueOrDefault(args, 
PARAMETERS(16)).get.asInstanceOf[Boolean]
+    val options = getArgValueOrDefault(args, PARAMETERS(17))
 
     val (tableName, database) = 
HoodieCLIUtils.getTableIdentifier(table.get.asInstanceOf[String])
     val configs: util.List[String] = new util.ArrayList[String]
@@ -121,6 +124,18 @@ class RunBootstrapProcedure extends BaseProcedure with 
ProcedureBuilder with Log
 
     // add session bootstrap conf
     TypedProperties.putAll(properties, 
spark.sqlContext.conf.getAllConfs.asJava)
+
+    // add conf from procedure, may overwrite session conf
+    options match {
+      case Some(p) =>
+        val paramPairs = HoodieCLIUtils.extractOptions(p.asInstanceOf[String])
+        paramPairs.foreach { pair =>
+          properties.setProperty(pair._1, pair._2)
+        }
+      case _ =>
+        logInfo("No options")
+    }
+
     new BootstrapExecutorUtils(cfg, jsc, fs, jsc.hadoopConfiguration, 
properties).execute()
     Seq(Row(0))
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
index 31a1de80da7..a8ac9b5e317 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.hudi.procedure
 
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.functional.TestBootstrap
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.{Dataset, Row}
 
@@ -93,6 +95,73 @@ class TestBootstrapProcedure extends 
HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call run_bootstrap Procedure with properties") {
+    withTempDir { tmp =>
+      val NUM_OF_RECORDS = 100
+      val PARTITION_FIELD = "datestr"
+      val RECORD_KEY_FIELD = "_row_key"
+
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}"
+
+      val srcName: String = "source"
+      val sourcePath = basePath + Path.SEPARATOR + srcName
+      val tablePath = basePath + Path.SEPARATOR + tableName
+      val jsc = new JavaSparkContext(spark.sparkContext)
+
+      // generate test data
+      val partitions = util.Arrays.asList("2018", "2019", "2020")
+      val timestamp: Long = Instant.now.toEpochMilli
+      for (i <- 0 until partitions.size) {
+        val df: Dataset[Row] = 
TestBootstrap.generateTestRawTripDataset(timestamp, i * NUM_OF_RECORDS, i * 
NUM_OF_RECORDS + NUM_OF_RECORDS, null, jsc, spark.sqlContext)
+        df.write.parquet(sourcePath + Path.SEPARATOR + PARTITION_FIELD + "=" + 
partitions.get(i))
+      }
+
+      spark.sql("set hoodie.bootstrap.parallelism = 20")
+      checkAnswer(
+        s"""call run_bootstrap(
+           |table => '$tableName',
+           |base_path => '$tablePath',
+           |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+           |bootstrap_path => '$sourcePath',
+           |rowKey_field => '$RECORD_KEY_FIELD',
+           |partition_path_field => '$PARTITION_FIELD',
+           |options => 'hoodie.datasource.write.hive_style_partitioning=true',
+           |bootstrap_overwrite => true)""".stripMargin) {
+        Seq(0)
+      }
+
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName using hudi
+           |location '$tablePath'
+           |tblproperties(primaryKey = '$RECORD_KEY_FIELD')
+           |""".stripMargin)
+
+      // show bootstrap's index partitions
+      var result = spark.sql(s"""call show_bootstrap_partitions(table => 
'$tableName')""".stripMargin).collect()
+      assertResult(3) {
+        result.length
+      }
+
+      // show bootstrap's index mapping
+      result = spark.sql(
+        s"""call show_bootstrap_mapping(table => 
'$tableName')""".stripMargin).collect()
+      assertResult(10) {
+        result.length
+      }
+
+      val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
+        .setConf(spark.sessionState.newHadoopConf()).build()
+
+      assertResult("true") {
+        
metaClient.getTableConfig.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE)
+      };
+
+    }
+  }
+
   test("Test Call run_bootstrap Procedure with no-partitioned") {
     withTempDir { tmp =>
       val NUM_OF_RECORDS = 100

Reply via email to the mailing list.