This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 488f58d770 [HUDI-4785] Fix partition discovery in bootstrap operation (#6673)
488f58d770 is described below
commit 488f58d770137057532196065f2f69eea1a15db8
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Sep 16 06:36:44 2022 +0530
[HUDI-4785] Fix partition discovery in bootstrap operation (#6673)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../apache/hudi/config/HoodieBootstrapConfig.java | 20 +++++----
.../SparkBootstrapCommitActionExecutor.java | 47 ++++++++++++++++------
.../SparkBootstrapDeltaCommitActionExecutor.java | 12 ++++--
.../hudi/common/table/TableSchemaResolver.java | 34 ++++++++--------
.../hudi/common/table/TestTableSchemaResolver.java | 12 +++---
.../org/apache/hudi/HoodieBootstrapRelation.scala | 7 ++--
.../SparkFullBootstrapDataProviderBase.java | 4 +-
.../functional/TestDataSourceForBootstrap.scala | 18 +++++++--
8 files changed, 99 insertions(+), 55 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
index 94bb7830cc..0b9116b01c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
@@ -34,6 +34,9 @@ import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
+import static org.apache.hudi.client.bootstrap.BootstrapMode.FULL_RECORD;
+import static org.apache.hudi.client.bootstrap.BootstrapMode.METADATA_ONLY;
+
/**
* Bootstrap specific configs.
*/
@@ -50,6 +53,15 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Base path of the dataset that needs to be
bootstrapped as a Hudi table");
+ public static final ConfigProperty<String> PARTITION_SELECTOR_REGEX_MODE = ConfigProperty
+ .key("hoodie.bootstrap.mode.selector.regex.mode")
+ .defaultValue(METADATA_ONLY.name())
+ .sinceVersion("0.6.0")
+ .withValidValues(METADATA_ONLY.name(), FULL_RECORD.name())
+ .withDocumentation("Bootstrap mode to apply for partition paths, that match regex above. "
+ + "METADATA_ONLY will generate just skeleton base files with keys/footers, avoiding full cost of rewriting the dataset. "
+ + "FULL_RECORD will perform a full copy/rewrite of the data as a Hudi table.");
+
public static final ConfigProperty<String> MODE_SELECTOR_CLASS_NAME = ConfigProperty
.key("hoodie.bootstrap.mode.selector")
.defaultValue(MetadataOnlyBootstrapModeSelector.class.getCanonicalName())
@@ -92,14 +104,6 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Matches each bootstrap dataset partition against
this regex and applies the mode below to it.");
- public static final ConfigProperty<String> PARTITION_SELECTOR_REGEX_MODE = ConfigProperty
- .key("hoodie.bootstrap.mode.selector.regex.mode")
- .defaultValue(BootstrapMode.METADATA_ONLY.name())
- .sinceVersion("0.6.0")
- .withDocumentation("Bootstrap mode to apply for partition paths, that match regex above. "
- + "METADATA_ONLY will generate just skeleton base files with keys/footers, avoiding full cost of rewriting the dataset. "
- + "FULL_RECORD will perform a full copy/rewrite of the data as a Hudi table.");
-
public static final ConfigProperty<String> INDEX_CLASS_NAME = ConfigProperty
.key("hoodie.bootstrap.index.class")
.defaultValue(HFileBootstrapIndex.class.getName())
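For orientation, a minimal Scala sketch of how the relocated property is meant to be supplied on the write path (mirroring the parameterized test further below; the option map and the FULL_RECORD value are illustrative, only the config key itself comes from this change):

    import org.apache.hudi.config.HoodieBootstrapConfig

    // Illustrative only: route partitions matched by the selector regex through
    // FULL_RECORD (rewrite the original records), instead of the default
    // METADATA_ONLY (skeleton base files with just keys/footers).
    val bootstrapModeOpts: Map[String, String] = Map(
      HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key -> "FULL_RECORD"
    )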
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index a2ee384940..88f6a54e0d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -26,6 +26,7 @@ import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.bootstrap.HoodieBootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -47,7 +48,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
@@ -74,11 +74,15 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.hudi.client.bootstrap.BootstrapMode.FULL_RECORD;
+import static org.apache.hudi.client.bootstrap.BootstrapMode.METADATA_ONLY;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
import static org.apache.hudi.table.action.bootstrap.MetadataBootstrapHandlerFactory.getMetadataHandler;
@@ -93,19 +97,29 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
HoodieWriteConfig config,
HoodieTable table,
Option<Map<String, String>> extraMetadata) {
- super(context, new HoodieWriteConfig.Builder().withProps(config.getProps())
- .withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class)
- .withBulkInsertParallelism(config.getBootstrapParallelism())
- .build(), table, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, WriteOperationType.BOOTSTRAP,
+ super(
+ context,
+ new HoodieWriteConfig.Builder()
+ .withProps(config.getProps())
+ .withAutoCommit(true)
+ .withWriteStatusClass(BootstrapWriteStatus.class)
+ .withBulkInsertParallelism(config.getBootstrapParallelism()).build(),
+ table,
+ HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
+ WriteOperationType.BOOTSTRAP,
extraMetadata);
bootstrapSourceFileSystem = FSUtils.getFs(config.getBootstrapSourceBasePath(), hadoopConf);
}
private void validate() {
- ValidationUtils.checkArgument(config.getBootstrapSourceBasePath() != null,
+ checkArgument(config.getBootstrapSourceBasePath() != null,
"Ensure Bootstrap Source Path is set");
- ValidationUtils.checkArgument(config.getBootstrapModeSelectorClass() != null,
+ checkArgument(config.getBootstrapModeSelectorClass() != null,
"Ensure Bootstrap Partition Selector is set");
+ if (METADATA_ONLY.name().equals(config.getBootstrapModeSelectorRegex())) {
+ checkArgument(!config.getBootstrapModeSelectorClass().equals(FullRecordBootstrapModeSelector.class.getCanonicalName()),
+ "FullRecordBootstrapModeSelector cannot be used with METADATA_ONLY bootstrap mode");
+ }
}
@Override
@@ -115,15 +129,15 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
HoodieTableMetaClient metaClient = table.getMetaClient();
Option<HoodieInstant> completedInstant =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
- ValidationUtils.checkArgument(!completedInstant.isPresent(),
+ checkArgument(!completedInstant.isPresent(),
"Active Timeline is expected to be empty for bootstrap to be
performed. "
+ "If you want to re-bootstrap, please rollback bootstrap first
!!");
Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();
// First run metadata bootstrap which will auto commit
- Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
+ Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(METADATA_ONLY));
// if there are full bootstrap to be performed, perform that too
- Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
+ Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(FULL_RECORD));
return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
} catch (IOException ioe) {
@@ -307,12 +321,21 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
BootstrapModeSelector selector =
(BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);
- Map<BootstrapMode, List<String>> result = selector.select(folders);
+ Map<BootstrapMode, List<String>> result = new HashMap<>();
+ // for FULL_RECORD mode, original record along with metadata fields are needed
+ if (FULL_RECORD.equals(config.getBootstrapModeForRegexMatch())) {
+ if (!(selector instanceof FullRecordBootstrapModeSelector)) {
+ FullRecordBootstrapModeSelector fullRecordBootstrapModeSelector = new FullRecordBootstrapModeSelector(config);
+ result.putAll(fullRecordBootstrapModeSelector.select(folders));
+ }
+ } else {
+ result = selector.select(folders);
+ }
Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(
Collectors.toMap(Pair::getKey, Pair::getValue));
// Ensure all partitions are accounted for
- ValidationUtils.checkArgument(partitionToFiles.keySet().equals(
+ checkArgument(partitionToFiles.keySet().equals(
result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())));
return result.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream()
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
index d712ca430b..0d2ac6ceef 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
@@ -42,8 +42,14 @@ public class SparkBootstrapDeltaCommitActionExecutor<T extends HoodieRecordPaylo
@Override
protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
- return new SparkBulkInsertDeltaCommitActionExecutor((HoodieSparkEngineContext) context, new HoodieWriteConfig.Builder().withProps(config.getProps())
- .withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
- inputRecordsRDD, extraMetadata);
+ return new SparkBulkInsertDeltaCommitActionExecutor(
+ (HoodieSparkEngineContext) context,
+ new HoodieWriteConfig.Builder()
+ .withProps(config.getProps())
+ .withSchema(bootstrapSchema).build(),
+ table,
+ HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+ inputRecordsRDD,
+ extraMetadata);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index f825fd6b99..657ac57c63 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -18,17 +18,6 @@
package org.apache.hudi.common.table;
-import org.apache.avro.JsonProperties;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.SchemaCompatibility;
-import org.apache.avro.generic.IndexedRecord;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -56,9 +45,17 @@ import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.hudi.io.storage.HoodieOrcReader;
import org.apache.hudi.util.Lazy;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -66,6 +63,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import javax.annotation.concurrent.ThreadSafe;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -208,7 +206,7 @@ public class TableSchemaResolver {
// TODO partition columns have to be appended in all read-paths
if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
return metaClient.getTableConfig().getPartitionFields()
- .map(partitionFields -> appendPartitionColumns(schema, partitionFields))
+ .map(partitionFields -> appendPartitionColumns(schema, Option.ofNullable(partitionFields)))
.orElse(schema);
}
@@ -650,18 +648,18 @@ public class TableSchemaResolver {
}
}
- static Schema appendPartitionColumns(Schema dataSchema, String[] partitionFields) {
+ public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]> partitionFields) {
// In cases when {@link DROP_PARTITION_COLUMNS} config is set true, partition columns
// won't be persisted w/in the data files, and therefore we need to append such columns
// when schema is parsed from data files
//
// Here we append partition columns with {@code StringType} as the data type
- if (partitionFields.length == 0) {
+ if (!partitionFields.isPresent() || partitionFields.get().length == 0) {
return dataSchema;
}
- boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf));
- boolean hasPartitionColInSchema = Arrays.stream(partitionFields).anyMatch(pf -> containsFieldInSchema(dataSchema, pf));
+ boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf));
+ boolean hasPartitionColInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> containsFieldInSchema(dataSchema, pf));
if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
throw new HoodieIncompatibleSchemaException("Partition columns could not
be partially contained w/in the data schema");
}
@@ -670,7 +668,7 @@ public class TableSchemaResolver {
// when hasPartitionColNotInSchema is true and hasPartitionColInSchema is false, all partition columns
// are not in originSchema. So we create and add them.
List<Field> newFields = new ArrayList<>();
- for (String partitionField: partitionFields) {
+ for (String partitionField: partitionFields.get()) {
newFields.add(new Schema.Field(
partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
}
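To illustrate the signature change, a minimal Scala sketch (the record and field names are made-up examples, not taken from this commit) of calling the now-public appendPartitionColumns with the partition fields wrapped in Hudi's Option:

    import org.apache.avro.SchemaBuilder
    import org.apache.hudi.common.table.TableSchemaResolver
    import org.apache.hudi.common.util.Option

    // Data schema as parsed from the data files; the partition column "datestr"
    // was dropped on write (the DROP_PARTITION_COLUMNS setting described above).
    val dataSchema = SchemaBuilder.record("trip").fields()
      .requiredString("_row_key")
      .requiredDouble("fare")
      .endRecord()

    // The partition fields now travel as a Hudi Option; an absent or empty value
    // returns the schema unchanged, otherwise missing partition columns are
    // appended as nullable string fields.
    val withPartitionCol = TableSchemaResolver.appendPartitionColumns(dataSchema, Option.of(Array("datestr")))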
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 51d5c5212f..5d949431e4 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -18,10 +18,12 @@
package org.apache.hudi.common.table;
-import org.apache.avro.Schema;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIncompatibleSchemaException;
+
+import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -36,17 +38,17 @@ public class TestTableSchemaResolver {
// case2
String[] pts1 = new String[0];
- Schema s2 = TableSchemaResolver.appendPartitionColumns(originSchema, pts1);
+ Schema s2 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts1));
assertEquals(originSchema, s2);
// case3: partition_path is in originSchema
String[] pts2 = {"partition_path"};
- Schema s3 = TableSchemaResolver.appendPartitionColumns(originSchema, pts2);
+ Schema s3 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts2));
assertEquals(originSchema, s3);
// case4: user_partition is not in originSchema
String[] pts3 = {"user_partition"};
- Schema s4 = TableSchemaResolver.appendPartitionColumns(originSchema, pts3);
+ Schema s4 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts3));
assertNotEquals(originSchema, s4);
assertTrue(s4.getFields().stream().anyMatch(f -> f.name().equals("user_partition")));
Schema.Field f = s4.getField("user_partition");
@@ -55,7 +57,7 @@ public class TestTableSchemaResolver {
// case5: user_partition is in originSchema, but partition_path is in originSchema
String[] pts4 = {"user_partition", "partition_path"};
try {
- TableSchemaResolver.appendPartitionColumns(originSchema, pts3);
+ TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts3));
} catch (HoodieIncompatibleSchemaException e) {
assertTrue(e.getMessage().contains("Partial partition fields are still
in the schema"));
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 00de2f756e..4ec7f65913 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -20,18 +20,17 @@ package org.apache.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieBaseFile
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SQLContext}
import scala.collection.JavaConverters._
@@ -147,7 +146,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
if (fullSchema == null) {
logInfo("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
- val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
+ val tableSchema = TableSchemaResolver.appendPartitionColumns(schemaResolver.getTableAvroSchemaWithoutMetadataFields, metaClient.getTableConfig.getPartitionFields)
dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index a0204b256e..bc732a1401 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -60,7 +60,9 @@ public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBoots
.flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
.toArray(String[]::new);
- Dataset inputDataset = sparkSession.read().format(getFormat()).load(filePaths);
+ // NOTE: "basePath" option is required for spark to discover the partition column
+ // More details at https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
+ Dataset inputDataset = sparkSession.read().format(getFormat()).option("basePath", sourceBasePath).load(filePaths);
try {
KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
String structName = tableName + "_record";
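The effect of the added "basePath" option can be reproduced with a plain Spark read; a small Scala sketch assuming an active SparkSession named spark and a made-up Hive-style partitioned source layout:

    // Hypothetical source layout:
    //   /data/source/datestr=2022-09-15/part-00000.parquet
    //   /data/source/datestr=2022-09-16/part-00000.parquet
    val filePaths = Seq(
      "/data/source/datestr=2022-09-15/part-00000.parquet",
      "/data/source/datestr=2022-09-16/part-00000.parquet")

    // Reading the leaf files directly gives Spark no root to walk, so the inferred
    // schema carries no "datestr" column and FULL_RECORD bootstrap cannot use it.
    val withoutPartitionCol = spark.read.format("parquet").load(filePaths: _*)

    // Supplying the source root as "basePath" re-enables partition discovery and
    // the loaded Dataset exposes "datestr" as a regular column.
    val withPartitionCol = spark.read.format("parquet")
      .option("basePath", "/data/source")
      .load(filePaths: _*)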
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 23b21b315f..a2b9d29009 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.io.TempDir
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import java.time.Instant
import java.util.Collections
@@ -148,8 +150,9 @@ class TestDataSourceForBootstrap {
verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false)
}
- @Test
- def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("METADATA_ONLY", "FULL_RECORD"))
+ def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String): Unit = {
val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
@@ -166,7 +169,10 @@ class TestDataSourceForBootstrap {
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
- commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++ Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"),
+ commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++
+ Map(
+ DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+ HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key -> bootstrapMode),
classOf[SimpleKeyGenerator].getName)
// check marked directory clean up
@@ -520,7 +526,11 @@ class TestDataSourceForBootstrap {
.save(basePath)
val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
- assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
+ val expectedBootstrapInstant =
+ if ("FULL_RECORD".equals(extraOpts.getOrElse(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key, HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.defaultValue)))
+ HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS
+ else HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS
+ assertEquals(expectedBootstrapInstant, commitInstantTime1)
commitInstantTime1
}