This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 502933d2f70 [HUDI-5512] fix spark call procedure run_bootstrap missing conf and conf can not take effect (#7621)
502933d2f70 is described below
commit 502933d2f70acb0f0fcf930fef70c6ab9127fb3b
Author: KnightChess <[email protected]>
AuthorDate: Sun Jan 15 14:11:53 2023 +0800
    [HUDI-5512] fix spark call procedure run_bootstrap missing conf and conf can not take effect (#7621)
---
.../apache/hudi/cli/BootstrapExecutorUtils.java | 89 +++++++++++++++++++++-
.../command/procedures/RunBootstrapProcedure.scala | 3 +
.../hudi/procedure/TestBootstrapProcedure.scala | 50 ++++++++++++
3 files changed, 139 insertions(+), 3 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
index 97d3cfc4416..e398858f84e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -22,12 +22,15 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -35,6 +38,12 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
+import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.SparkKeyGenUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -45,13 +54,23 @@ import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT;
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_TIMEZONE;
import static org.apache.hudi.config.HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD;
import static org.apache.hudi.config.HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS;
+import static org.apache.hudi.config.HoodieWriteConfig.PRECOMBINE_FIELD_NAME;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
+import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
+import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.RECORDKEY_FIELD_NAME;
+import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
@@ -125,12 +144,14 @@ public class BootstrapExecutorUtils implements Serializable {
* Schema provider that supplies the command for reading the input and writing out the target table.
*/
SchemaProvider schemaProvider = createSchemaProvider(cfg.schemaProviderClass, props, jssc);
+ String keyGenClass = genKeyGenClassAndPartitionColumns().getLeft();
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().withPath(cfg.basePath)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
.forTable(cfg.tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withAutoCommit(true)
+ .withKeyGenerator(keyGenClass)
.withProps(props);
if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
@@ -193,16 +214,78 @@ public class BootstrapExecutorUtils implements Serializable {
+ ". Cannot bootstrap data on top of an existing table");
}
}
- HoodieTableMetaClient.withPropertyBuilder()
+ Pair<String, String> keyGenClassAndParCols = genKeyGenClassAndPartitionColumns();
+ Map<String, Object> timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenClassAndParCols.getLeft(), props);
+
+ HoodieTableMetaClient.PropertyBuilder builder = HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(props)
.setTableType(cfg.tableType)
.setTableName(cfg.tableName)
- .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+ .setRecordKeyFields(props.getString(RECORDKEY_FIELD_NAME.key()))
+ .setPreCombineField(props.getString(PRECOMBINE_FIELD_NAME.key(), null))
+ .setPopulateMetaFields(props.getBoolean(
+ POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue()))
+ .setArchiveLogFolder(props.getString(
+ ARCHIVELOG_FOLDER.key(), ARCHIVELOG_FOLDER.defaultValue()))
.setPayloadClassName(cfg.payloadClass)
.setBaseFileFormat(cfg.baseFileFormat)
.setBootstrapIndexClass(cfg.bootstrapIndexClass)
.setBootstrapBasePath(bootstrapBasePath)
- .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.basePath);
+ .setCDCEnabled(props.getBoolean(HoodieTableConfig.CDC_ENABLED.key(),
+ HoodieTableConfig.CDC_ENABLED.defaultValue()))
+ .setCDCSupplementalLoggingMode(props.getString(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key(),
+ HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.defaultValue()))
+ .setHiveStylePartitioningEnable(props.getBoolean(
+ HIVE_STYLE_PARTITIONING_ENABLE.key(),
+ Boolean.parseBoolean(HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())
+ ))
+ .setUrlEncodePartitioning(props.getBoolean(
+ URL_ENCODE_PARTITIONING.key(),
+ Boolean.parseBoolean(URL_ENCODE_PARTITIONING.defaultValue())))
+ .setCommitTimezone(HoodieTimelineTimeZone.valueOf(props.getString(
+ TIMELINE_TIMEZONE.key(),
+ String.valueOf(TIMELINE_TIMEZONE.defaultValue()))))
+ .setPartitionMetafileUseBaseFormat(props.getBoolean(
+ PARTITION_METAFILE_USE_BASE_FORMAT.key(),
+ PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
+ .set(timestampKeyGeneratorConfigs)
+ .setKeyGeneratorClassProp(keyGenClassAndParCols.getLeft())
+ .setPartitionFields(keyGenClassAndParCols.getRight());
+
+ builder.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.basePath);
+ }
+
+ private Pair<String, String> genKeyGenClassAndPartitionColumns() {
+ String keyGenClass;
+ if (StringUtils.nonEmpty(props.getString(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key(), null))) {
+ keyGenClass = props.getString(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key());
+ } else if (StringUtils.nonEmpty(props.getString(HoodieBootstrapConfig.KEYGEN_TYPE.key(), null))) {
+ props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), props.getString(HoodieBootstrapConfig.KEYGEN_TYPE.key()));
+ keyGenClass = HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props);
+ } else {
+ keyGenClass = props.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
+ }
+ props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), keyGenClass);
+ String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
+
+ if (StringUtils.isNullOrEmpty(partitionColumns)) {
+ partitionColumns = null;
+ if (keyGenClass.equals(SimpleKeyGenerator.class.getName())) {
+ keyGenClass = NonpartitionedKeyGenerator.class.getName();
+ }
+ }
+ return Pair.of(keyGenClass, partitionColumns);
+ }
+
+ private Map<String, Object> extractConfigsRelatedToTimestampBasedKeyGenerator(String keyGenerator, TypedProperties params) {
+ if (TimestampBasedKeyGenerator.class.getCanonicalName().equals(keyGenerator)
+ || TimestampBasedAvroKeyGenerator.class.getCanonicalName().equals(keyGenerator)) {
+ return params.entrySet().stream()
+ .filter(p -> HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(String.valueOf(p.getKey())))
+ .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
+ }
+ return Collections.emptyMap();
}
public static class Config {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
index 2cd50148329..946b687fcc0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
@@ -34,6 +34,7 @@ import java.util
import java.util.Locale
import java.util.function.Supplier
+import scala.collection.JavaConverters._
class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
@@ -117,6 +118,8 @@ class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Log
cfg.setEnableHiveSync(enableHiveSync)
cfg.setBootstrapOverwrite(bootstrapOverwrite)
+ // add session bootstrap conf
+ properties.putAll(spark.sqlContext.conf.getAllConfs.asJava)
try {
new BootstrapExecutorUtils(cfg, jsc, fs, jsc.hadoopConfiguration, properties).execute()
} catch {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
index bc02c0402bb..9ac6d83f4ea 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
@@ -50,6 +50,7 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
df.write.parquet(sourcePath + Path.SEPARATOR + PARTITION_FIELD + "=" + partitions.get(i))
}
+ spark.sql("set hoodie.bootstrap.parallelism = 20")
// run bootstrap
checkAnswer(
s"""call run_bootstrap(
@@ -85,4 +86,53 @@ class TestBootstrapProcedure extends HoodieSparkProcedureTestBase {
}
}
}
+
+ test("Test Call run_bootstrap Procedure with no-partitioned") {
+ withTempDir { tmp =>
+ val NUM_OF_RECORDS = 100
+ val PARTITION_FIELD = "datestr"
+ val RECORD_KEY_FIELD = "_row_key"
+
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}"
+
+ val srcName: String = "source"
+ val sourcePath = basePath + Path.SEPARATOR + srcName
+ val tablePath = basePath + Path.SEPARATOR + tableName
+ val jsc = new JavaSparkContext(spark.sparkContext)
+
+ // generate test data
+ val timestamp: Long = Instant.now.toEpochMilli
+ val df: Dataset[Row] = TestBootstrap.generateTestRawTripDataset(timestamp, 0, NUM_OF_RECORDS, null, jsc, spark.sqlContext)
+ df.write.parquet(sourcePath)
+
+ spark.sql("set hoodie.bootstrap.parallelism = 20")
+ // run bootstrap
+ checkAnswer(
+ s"""call run_bootstrap(
+ |table => '$tableName',
+ |base_path => '$tablePath',
+ |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+ |bootstrap_path => '$sourcePath',
+ |rowKey_field => '$RECORD_KEY_FIELD',
+ |key_generator_class => 'NON_PARTITION',
+ |bootstrap_overwrite => true)""".stripMargin) {
+ Seq(0)
+ }
+
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName using hudi
+ |location '$tablePath'
+ |tblproperties(primaryKey = '$RECORD_KEY_FIELD')
+ |""".stripMargin)
+
+ // use new hudi table
+ val originCount = spark.sql(s"select count(*) from $tableName").collect()(0).getLong(0)
+ spark.sql(s"delete from $tableName where _row_key = 'trip_0'")
+ val afterDeleteCount = spark.sql(s"select count(*) from $tableName").collect()(0).getLong(0)
+ assert(originCount != afterDeleteCount)
+ }
+ }
}