This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1725b0915ad [HUDI-6041] add `options` input to Bootstrap Procedure for passing hudi properties (#8387)
1725b0915ad is described below
commit 1725b0915ad0d97de19c50ae9c06cf57951471c8
Author: Kunni <[email protected]>
AuthorDate: Fri May 26 14:14:20 2023 +0800
    [HUDI-6041] add `options` input to Bootstrap Procedure for passing hudi properties (#8387)
---
.../command/procedures/RunBootstrapProcedure.scala | 17 +++++-
.../hudi/procedure/TestBootstrapProcedure.scala | 69 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 1 deletion(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
index bb93cf1a485..458d618e92b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
@@ -52,7 +52,9 @@ class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Log
     ProcedureParameter.optional(13, "parallelism", DataTypes.IntegerType, 1500),
     ProcedureParameter.optional(14, "enable_hive_sync", DataTypes.BooleanType, false),
     ProcedureParameter.optional(15, "props_file_path", DataTypes.StringType, ""),
-    ProcedureParameter.optional(16, "bootstrap_overwrite", DataTypes.BooleanType, false)
+    ProcedureParameter.optional(16, "bootstrap_overwrite", DataTypes.BooleanType, false),
+    // params => key=value, key2=value2
+    ProcedureParameter.optional(17, "options", DataTypes.StringType)
   )
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -83,6 +85,7 @@ class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Log
     val enableHiveSync = getArgValueOrDefault(args, PARAMETERS(14)).get.asInstanceOf[Boolean]
     val propsFilePath = getArgValueOrDefault(args, PARAMETERS(15)).get.asInstanceOf[String]
     val bootstrapOverwrite = getArgValueOrDefault(args, PARAMETERS(16)).get.asInstanceOf[Boolean]
+    val options = getArgValueOrDefault(args, PARAMETERS(17))
     val (tableName, database) = HoodieCLIUtils.getTableIdentifier(table.get.asInstanceOf[String])
val configs: util.List[String] = new util.ArrayList[String]
@@ -121,6 +124,18 @@ class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Log
     // add session bootstrap conf
     TypedProperties.putAll(properties, spark.sqlContext.conf.getAllConfs.asJava)
+
+ // add conf from procedure, may overwrite session conf
+ options match {
+ case Some(p) =>
+ val paramPairs = HoodieCLIUtils.extractOptions(p.asInstanceOf[String])
+ paramPairs.foreach { pair =>
+ properties.setProperty(pair._1, pair._2)
+ }
+ case _ =>
+ logInfo("No options")
+ }
+
     new BootstrapExecutorUtils(cfg, jsc, fs, jsc.hadoopConfiguration, properties).execute()
Seq(Row(0))
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
index 31a1de80da7..a8ac9b5e317 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.hudi.procedure
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.functional.TestBootstrap
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{Dataset, Row}
@@ -93,6 +95,73 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
}
}
+ test("Test Call run_bootstrap Procedure with properties") {
+ withTempDir { tmp =>
+ val NUM_OF_RECORDS = 100
+ val PARTITION_FIELD = "datestr"
+ val RECORD_KEY_FIELD = "_row_key"
+
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}"
+
+ val srcName: String = "source"
+ val sourcePath = basePath + Path.SEPARATOR + srcName
+ val tablePath = basePath + Path.SEPARATOR + tableName
+ val jsc = new JavaSparkContext(spark.sparkContext)
+
+ // generate test data
+ val partitions = util.Arrays.asList("2018", "2019", "2020")
+ val timestamp: Long = Instant.now.toEpochMilli
+ for (i <- 0 until partitions.size) {
+      val df: Dataset[Row] = TestBootstrap.generateTestRawTripDataset(timestamp, i * NUM_OF_RECORDS, i * NUM_OF_RECORDS + NUM_OF_RECORDS, null, jsc, spark.sqlContext)
+      df.write.parquet(sourcePath + Path.SEPARATOR + PARTITION_FIELD + "=" + partitions.get(i))
+ }
+
+ spark.sql("set hoodie.bootstrap.parallelism = 20")
+ checkAnswer(
+ s"""call run_bootstrap(
+ |table => '$tableName',
+ |base_path => '$tablePath',
+ |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+ |bootstrap_path => '$sourcePath',
+ |rowKey_field => '$RECORD_KEY_FIELD',
+ |partition_path_field => '$PARTITION_FIELD',
+ |options => 'hoodie.datasource.write.hive_style_partitioning=true',
+ |bootstrap_overwrite => true)""".stripMargin) {
+ Seq(0)
+ }
+
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName using hudi
+ |location '$tablePath'
+ |tblproperties(primaryKey = '$RECORD_KEY_FIELD')
+ |""".stripMargin)
+
+ // show bootstrap's index partitions
+    var result = spark.sql(s"""call show_bootstrap_partitions(table => '$tableName')""".stripMargin).collect()
+ assertResult(3) {
+ result.length
+ }
+
+ // show bootstrap's index mapping
+    result = spark.sql(
+      s"""call show_bootstrap_mapping(table => '$tableName')""".stripMargin).collect()
+ assertResult(10) {
+ result.length
+ }
+
+ val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
+ .setConf(spark.sessionState.newHadoopConf()).build()
+
+ assertResult("true") {
+      metaClient.getTableConfig.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE)
+ };
+
+ }
+ }
+
test("Test Call run_bootstrap Procedure with no-partitioned") {
withTempDir { tmp =>
val NUM_OF_RECORDS = 100