This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 502933d2f70 [HUDI-5512] fix spark call procedure run_bootstrap missing 
conf and conf can not take effect (#7621)
502933d2f70 is described below

commit 502933d2f70acb0f0fcf930fef70c6ab9127fb3b
Author: KnightChess <[email protected]>
AuthorDate: Sun Jan 15 14:11:53 2023 +0800

    [HUDI-5512] fix spark call procedure run_bootstrap missing conf and conf 
can not take effect (#7621)
---
 .../apache/hudi/cli/BootstrapExecutorUtils.java    | 89 +++++++++++++++++++++-
 .../command/procedures/RunBootstrapProcedure.scala |  3 +
 .../hudi/procedure/TestBootstrapProcedure.scala    | 50 ++++++++++++
 3 files changed, 139 insertions(+), 3 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
index 97d3cfc4416..e398858f84e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -22,12 +22,15 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTimelineTimeZone;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieBootstrapConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -35,6 +38,12 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
+import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.SparkKeyGenUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,13 +54,23 @@ import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_TIMEZONE;
 import static org.apache.hudi.config.HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD;
 import static 
org.apache.hudi.config.HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS;
+import static org.apache.hudi.config.HoodieWriteConfig.PRECOMBINE_FIELD_NAME;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
+import static 
org.apache.hudi.keygen.constant.KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
+import static 
org.apache.hudi.keygen.constant.KeyGeneratorOptions.RECORDKEY_FIELD_NAME;
+import static 
org.apache.hudi.keygen.constant.KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 
@@ -125,12 +144,14 @@ public class BootstrapExecutorUtils implements 
Serializable {
      * Schema provider that supplies the command for reading the input and 
writing out the target table.
      */
     SchemaProvider schemaProvider = 
createSchemaProvider(cfg.schemaProviderClass, props, jssc);
+    String keyGenClass = genKeyGenClassAndPartitionColumns().getLeft();
     HoodieWriteConfig.Builder builder =
         HoodieWriteConfig.newBuilder().withPath(cfg.basePath)
             
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
             .forTable(cfg.tableName)
             
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
             .withAutoCommit(true)
+            .withKeyGenerator(keyGenClass)
             .withProps(props);
 
     if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
@@ -193,16 +214,78 @@ public class BootstrapExecutorUtils implements 
Serializable {
             + ". Cannot bootstrap data on top of an existing table");
       }
     }
-    HoodieTableMetaClient.withPropertyBuilder()
+    Pair<String, String> keyGenClassAndParCols = 
genKeyGenClassAndPartitionColumns();
+    Map<String, Object> timestampKeyGeneratorConfigs =
+        
extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenClassAndParCols.getLeft(),
 props);
+
+    HoodieTableMetaClient.PropertyBuilder builder = 
HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(props)
         .setTableType(cfg.tableType)
         .setTableName(cfg.tableName)
-        .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+        .setRecordKeyFields(props.getString(RECORDKEY_FIELD_NAME.key()))
+        .setPreCombineField(props.getString(PRECOMBINE_FIELD_NAME.key(), null))
+        .setPopulateMetaFields(props.getBoolean(
+            POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue()))
+        .setArchiveLogFolder(props.getString(
+            ARCHIVELOG_FOLDER.key(), ARCHIVELOG_FOLDER.defaultValue()))
         .setPayloadClassName(cfg.payloadClass)
         .setBaseFileFormat(cfg.baseFileFormat)
         .setBootstrapIndexClass(cfg.bootstrapIndexClass)
         .setBootstrapBasePath(bootstrapBasePath)
-        .initTable(new Configuration(jssc.hadoopConfiguration()), 
cfg.basePath);
+        .setCDCEnabled(props.getBoolean(HoodieTableConfig.CDC_ENABLED.key(),
+            HoodieTableConfig.CDC_ENABLED.defaultValue()))
+        
.setCDCSupplementalLoggingMode(props.getString(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key(),
+            HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.defaultValue()))
+        .setHiveStylePartitioningEnable(props.getBoolean(
+            HIVE_STYLE_PARTITIONING_ENABLE.key(),
+            Boolean.parseBoolean(HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())
+        ))
+        .setUrlEncodePartitioning(props.getBoolean(
+            URL_ENCODE_PARTITIONING.key(),
+            Boolean.parseBoolean(URL_ENCODE_PARTITIONING.defaultValue())))
+        .setCommitTimezone(HoodieTimelineTimeZone.valueOf(props.getString(
+                TIMELINE_TIMEZONE.key(),
+                String.valueOf(TIMELINE_TIMEZONE.defaultValue()))))
+        .setPartitionMetafileUseBaseFormat(props.getBoolean(
+            PARTITION_METAFILE_USE_BASE_FORMAT.key(),
+            PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
+        .set(timestampKeyGeneratorConfigs)
+        .setKeyGeneratorClassProp(keyGenClassAndParCols.getLeft())
+        .setPartitionFields(keyGenClassAndParCols.getRight());
+
+    builder.initTable(new Configuration(jssc.hadoopConfiguration()), 
cfg.basePath);
+  }
+
+  private Pair<String, String> genKeyGenClassAndPartitionColumns() {
+    String keyGenClass;
+    if 
(StringUtils.nonEmpty(props.getString(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key(),
 null))) {
+      keyGenClass = 
props.getString(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key());
+    } else if 
(StringUtils.nonEmpty(props.getString(HoodieBootstrapConfig.KEYGEN_TYPE.key(), 
null))) {
+      props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), 
props.getString(HoodieBootstrapConfig.KEYGEN_TYPE.key()));
+      keyGenClass = 
HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props);
+    } else {
+      keyGenClass = 
props.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), 
SimpleKeyGenerator.class.getName());
+    }
+    props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), keyGenClass);
+    String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
+
+    if (StringUtils.isNullOrEmpty(partitionColumns)) {
+      partitionColumns = null;
+      if (keyGenClass.equals(SimpleKeyGenerator.class.getName())) {
+        keyGenClass = NonpartitionedKeyGenerator.class.getName();
+      }
+    }
+    return Pair.of(keyGenClass, partitionColumns);
+  }
+
+  private Map<String, Object> 
extractConfigsRelatedToTimestampBasedKeyGenerator(String keyGenerator, 
TypedProperties params) {
+    if 
(TimestampBasedKeyGenerator.class.getCanonicalName().equals(keyGenerator)
+        || 
TimestampBasedAvroKeyGenerator.class.getCanonicalName().equals(keyGenerator)) {
+      return params.entrySet().stream()
+          .filter(p -> 
HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(String.valueOf(p.getKey())))
+          .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> 
String.valueOf(e.getValue())));
+    }
+    return Collections.emptyMap();
   }
 
   public static class Config {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
index 2cd50148329..946b687fcc0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
@@ -34,6 +34,7 @@ import java.util
 import java.util.Locale
 import java.util.function.Supplier
 
+import scala.collection.JavaConverters._
 class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with 
Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
@@ -117,6 +118,8 @@ class RunBootstrapProcedure extends BaseProcedure with 
ProcedureBuilder with Log
     cfg.setEnableHiveSync(enableHiveSync)
     cfg.setBootstrapOverwrite(bootstrapOverwrite)
 
+    // add session bootstrap conf
+    properties.putAll(spark.sqlContext.conf.getAllConfs.asJava)
     try {
       new BootstrapExecutorUtils(cfg, jsc, fs, jsc.hadoopConfiguration, 
properties).execute()
     } catch {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
index bc02c0402bb..9ac6d83f4ea 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
@@ -50,6 +50,7 @@ class TestBootstrapProcedure extends 
HoodieSparkProcedureTestBase {
         df.write.parquet(sourcePath + Path.SEPARATOR + PARTITION_FIELD + "=" + 
partitions.get(i))
       }
 
+      spark.sql("set hoodie.bootstrap.parallelism = 20")
       // run bootstrap
       checkAnswer(
         s"""call run_bootstrap(
@@ -85,4 +86,53 @@ class TestBootstrapProcedure extends 
HoodieSparkProcedureTestBase {
       }
     }
   }
+
+  test("Test Call run_bootstrap Procedure with no-partitioned") {
+    withTempDir { tmp =>
+      val NUM_OF_RECORDS = 100
+      val PARTITION_FIELD = "datestr"
+      val RECORD_KEY_FIELD = "_row_key"
+
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}"
+
+      val srcName: String = "source"
+      val sourcePath = basePath + Path.SEPARATOR + srcName
+      val tablePath = basePath + Path.SEPARATOR + tableName
+      val jsc = new JavaSparkContext(spark.sparkContext)
+
+      // generate test data
+      val timestamp: Long = Instant.now.toEpochMilli
+      val df: Dataset[Row] = 
TestBootstrap.generateTestRawTripDataset(timestamp, 0, NUM_OF_RECORDS, null, 
jsc, spark.sqlContext)
+      df.write.parquet(sourcePath)
+
+      spark.sql("set hoodie.bootstrap.parallelism = 20")
+      // run bootstrap
+      checkAnswer(
+        s"""call run_bootstrap(
+           |table => '$tableName',
+           |base_path => '$tablePath',
+           |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+           |bootstrap_path => '$sourcePath',
+           |rowKey_field => '$RECORD_KEY_FIELD',
+           |key_generator_class => 'NON_PARTITION',
+           |bootstrap_overwrite => true)""".stripMargin) {
+        Seq(0)
+      }
+
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName using hudi
+           |location '$tablePath'
+           |tblproperties(primaryKey = '$RECORD_KEY_FIELD')
+           |""".stripMargin)
+
+      // use new hudi table
+      val originCount = spark.sql(s"select count(*) from 
$tableName").collect()(0).getLong(0)
+      spark.sql(s"delete from $tableName where _row_key = 'trip_0'")
+      val afterDeleteCount = spark.sql(s"select count(*) from 
$tableName").collect()(0).getLong(0)
+      assert(originCount != afterDeleteCount)
+    }
+  }
 }

Reply via email to the commits mailing list.