This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c14ac0e0959 [HUDI-6516] Correct the use of hoodie.bootstrap.mode.selector (#9164)
c14ac0e0959 is described below
commit c14ac0e09590d11b00f2f33f7c9dcede50423493
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Jul 23 15:13:46 2023 +0800
[HUDI-6516] Correct the use of hoodie.bootstrap.mode.selector (#9164)
---
.../selector/BootstrapRegexModeSelector.java | 3 ++
.../SparkBootstrapCommitActionExecutor.java | 20 +-------
.../org/apache/hudi/functional/TestBootstrap.java | 9 ++--
.../apache/hudi/functional/TestBootstrapRead.java | 12 ++---
.../functional/TestDataSourceForBootstrap.scala | 54 ++++++++++++----------
5 files changed, 44 insertions(+), 54 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
index 99c3dbd90a8..0b6e5a77818 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
@@ -31,6 +31,9 @@ import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+/**
+ * A bootstrap selector which employs bootstrap mode by specified partitions.
+ */
public class BootstrapRegexModeSelector extends BootstrapModeSelector {
private static final long serialVersionUID = 1L;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index eae577d8b9d..d93401c2247 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -26,7 +26,6 @@ import
org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.bootstrap.HoodieBootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
-import
org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
import
org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -73,7 +72,6 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -115,10 +113,6 @@ public class SparkBootstrapCommitActionExecutor<T>
"Ensure Bootstrap Source Path is set");
checkArgument(config.getBootstrapModeSelectorClass() != null,
"Ensure Bootstrap Partition Selector is set");
- if (METADATA_ONLY.name().equals(config.getBootstrapModeSelectorRegex())) {
-
checkArgument(!config.getBootstrapModeSelectorClass().equals(FullRecordBootstrapModeSelector.class.getCanonicalName()),
- "FullRecordBootstrapModeSelector cannot be used with METADATA_ONLY
bootstrap mode");
- }
}
@Override
@@ -320,18 +314,8 @@ public class SparkBootstrapCommitActionExecutor<T>
BootstrapModeSelector selector =
(BootstrapModeSelector)
ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);
- Map<BootstrapMode, List<String>> result = new HashMap<>();
- // for FULL_RECORD mode, original record along with metadata fields are
needed
- if (FULL_RECORD.equals(config.getBootstrapModeForRegexMatch())) {
- if (!(selector instanceof FullRecordBootstrapModeSelector)) {
- FullRecordBootstrapModeSelector fullRecordBootstrapModeSelector = new
FullRecordBootstrapModeSelector(config);
- result.putAll(fullRecordBootstrapModeSelector.select(folders));
- } else {
- result.putAll(selector.select(folders));
- }
- } else {
- result = selector.select(folders);
- }
+ Map<BootstrapMode, List<String>> result = selector.select(folders);
+
Map<String, List<HoodieFileStatus>> partitionToFiles =
folders.stream().collect(
Collectors.toMap(Pair::getKey, Pair::getValue));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index d94f065ee0a..b398ea82aa9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -272,7 +272,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
metaClient.reloadActiveTimeline();
assertEquals(0, metaClient.getCommitsTimeline().countInstants());
assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient,
metaClient.getFs(), basePath, context)
- .stream().flatMap(f -> f.getValue().stream()).count());
+ .stream().mapToLong(f -> f.getValue().size()).sum());
BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
assertFalse(index.useIndex());
@@ -295,7 +295,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
// Upsert case
long updateTimestamp = Instant.now().toEpochMilli();
- String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2";
+ String updateSPath = tmpFolder.toAbsolutePath() + "/data2";
generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords,
partitions, updateSPath);
JavaRDD<HoodieRecord> updateBatch =
generateInputBatch(jsc,
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(),
updateSPath, context),
@@ -390,7 +390,6 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
Dataset<Row> missingBootstrapped = sqlContext.sql("select
a._hoodie_record_key from bootstrapped a "
+ "where a._hoodie_record_key not in (select _row_key from
original)");
assertEquals(0, missingBootstrapped.count());
- //sqlContext.sql("select * from bootstrapped").show(10, false);
}
// RO Input Format Read
@@ -410,7 +409,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
}
assertEquals(totalRecords, seenKeys.size());
- //RT Input Format Read
+ // RT Input Format Read
reloadInputFormats();
seenKeys = new HashSet<>();
records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
@@ -475,7 +474,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
}
assertEquals(totalRecords, seenKeys.size());
- //RT Input Format Read - Project only non-hoodie column
+ // RT Input Format Read - Project only non-hoodie column
reloadInputFormats();
seenKeys = new HashSet<>();
records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
index 80fc792ad54..bbce1c61f0f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
@@ -108,8 +108,8 @@ public class TestBootstrapRead extends
HoodieSparkClientTestBase {
for (Boolean dash : dashPartitions) {
for (String bt : bootstrapType) {
for (Integer n : nPartitions) {
- //can't be mixed bootstrap if it's nonpartitioned
- //don't need to test slash partitions if it's nonpartitioned
+ // can't be mixed bootstrap if it's nonpartitioned
+ // don't need to test slash partitions if it's nonpartitioned
if ((!bt.equals("mixed") && dash) || n > 0) {
b.add(Arguments.of(bt, dash, tt, n));
}
@@ -129,7 +129,7 @@ public class TestBootstrapRead extends
HoodieSparkClientTestBase {
this.nPartitions = nPartitions;
setupDirs();
- //do bootstrap
+ // do bootstrap
Map<String, String> options = setBootstrapOptions();
Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
bootstrapDf.write().format("hudi")
@@ -139,7 +139,7 @@ public class TestBootstrapRead extends
HoodieSparkClientTestBase {
compareTables();
verifyMetaColOnlyRead(0);
- //do upserts
+ // do upserts
options = basicOptions();
doUpdate(options, "001");
compareTables();
@@ -224,8 +224,8 @@ public class TestBootstrapRead extends
HoodieSparkClientTestBase {
.mode(SaveMode.Append)
.save(hudiBasePath);
if (bootstrapType.equals("mixed")) {
- //mixed tables have a commit for each of the metadata and full bootstrap
modes
- //so to align with the regular hudi table, we need to compact after 4
commits instead of 3
+ // mixed tables have a commit for each of the metadata and full
bootstrap modes
+ // so to align with the regular hudi table, we need to compact after 4
commits instead of 3
nCompactCommits = "4";
}
df.write().format("hudi")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 12974d133a8..9949b396abf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -17,9 +17,8 @@
package org.apache.hudi.functional
-import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
-import
org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
+import
org.apache.hudi.client.bootstrap.selector.{FullRecordBootstrapModeSelector,
MetadataOnlyBootstrapModeSelector}
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
@@ -30,9 +29,11 @@ import
org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols, sort
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.HoodieClientTestUtils
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieDataSourceHelpers, HoodieSparkRecordMerger}
+
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions.{col, lit}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.io.TempDir
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -75,12 +76,14 @@ class TestDataSourceForBootstrap {
val verificationCol: String = "driver"
val originalVerificationVal: String = "driver_0"
val updatedVerificationVal: String = "driver_update"
+ val metadataOnlySelector: String =
classOf[MetadataOnlyBootstrapModeSelector].getCanonicalName
+ val fullRecordSelector: String =
classOf[FullRecordBootstrapModeSelector].getCanonicalName
/**
* TODO rebase onto existing test base-class to avoid duplication
*/
@BeforeEach
- def initialize(@TempDir tempDir: java.nio.file.Path) {
+ def initialize(@TempDir tempDir: java.nio.file.Path): Unit = {
val sparkConf =
HoodieClientTestUtils.getSparkConfForTest(getClass.getSimpleName)
spark = SparkSession.builder.config(sparkConf).getOrCreate
@@ -119,7 +122,7 @@ class TestDataSourceForBootstrap {
// Perform bootstrap
val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName
val options = commonOpts.-(DataSourceWriteOptions.PARTITIONPATH_FIELD.key)
- val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+ val commitInstantTime1 = runBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
extraOpts = options ++
Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
bootstrapKeygenClass = bootstrapKeygenClass
@@ -166,13 +169,13 @@ class TestDataSourceForBootstrap {
@ParameterizedTest
@CsvSource(value = Array(
- "METADATA_ONLY,AVRO",
+
"org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector,AVRO",
// TODO(HUDI-5807) enable for spark native records
- /* "METADATA_ONLY,SPARK", */
- "FULL_RECORD,AVRO",
- "FULL_RECORD,SPARK"
+ /*
"org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,SPARK",
*/
+
"org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,AVRO",
+
"org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,SPARK"
))
- def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String,
recordType: HoodieRecordType): Unit = {
+ def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapSelector: String,
recordType: HoodieRecordType): Unit = {
val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
@@ -189,11 +192,11 @@ class TestDataSourceForBootstrap {
val readOpts = commonOpts ++ Map(
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr",
DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
- HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key ->
bootstrapMode
+ HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key -> bootstrapSelector
)
// Perform bootstrap
- val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+ val commitInstantTime1 = runBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
readOpts ++ getRecordTypeOpts(recordType),
classOf[SimpleKeyGenerator].getName)
@@ -201,10 +204,10 @@ class TestDataSourceForBootstrap {
// check marked directory clean up
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
- val expectedDF = bootstrapMode match {
- case "METADATA_ONLY" =>
+ val expectedDF = bootstrapSelector match {
+ case `metadataOnlySelector` =>
sort(sourceDF)
- case "FULL_RECORD" =>
+ case `fullRecordSelector` =>
sort(sourceDF)
}
@@ -271,7 +274,7 @@ class TestDataSourceForBootstrap {
)
// Perform bootstrap
- val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+ val commitInstantTime1 = runBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
writeOpts,
classOf[SimpleKeyGenerator].getName)
@@ -346,7 +349,7 @@ class TestDataSourceForBootstrap {
)
// Perform bootstrap
- val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+ val commitInstantTime1 = runBootstrapAndVerifyCommit(
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
writeOpts,
classOf[SimpleKeyGenerator].getName)
@@ -414,7 +417,7 @@ class TestDataSourceForBootstrap {
)
// Perform bootstrap
- val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+ val commitInstantTime1 = runBootstrapAndVerifyCommit(
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
writeOpts,
classOf[SimpleKeyGenerator].getName)
@@ -481,7 +484,7 @@ class TestDataSourceForBootstrap {
)
// Perform bootstrap
- val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+ val commitInstantTime1 = runBootstrapAndVerifyCommit(
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
writeOpts,
classOf[SimpleKeyGenerator].getName)
@@ -616,9 +619,9 @@ class TestDataSourceForBootstrap {
verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2,
isPartitioned = true, isHiveStylePartitioned = true)
}
- def runMetadataBootstrapAndVerifyCommit(tableType: String,
- extraOpts: Map[String, String] =
Map.empty,
- bootstrapKeygenClass: String):
String = {
+ def runBootstrapAndVerifyCommit(tableType: String,
+ extraOpts: Map[String, String] = Map.empty,
+ bootstrapKeygenClass: String): String = {
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
.format("hudi")
@@ -632,7 +635,8 @@ class TestDataSourceForBootstrap {
val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs,
basePath)
val expectedBootstrapInstant =
- if
("FULL_RECORD".equals(extraOpts.getOrElse(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key,
HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.defaultValue)))
+ if
(fullRecordSelector.equals(extraOpts.getOrElse(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key,
+ HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.defaultValue)))
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS
else HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS
assertEquals(expectedBootstrapInstant, commitInstantTime1)
@@ -689,9 +693,9 @@ class TestDataSourceForBootstrap {
object TestDataSourceForBootstrap {
- def sort(df: DataFrame) = df.sort("_row_key")
+ def sort(df: DataFrame): Dataset[Row] = df.sort("_row_key")
- def dropMetaCols(df: DataFrame) =
+ def dropMetaCols(df: DataFrame): DataFrame =
df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
}