This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c14ac0e0959 [HUDI-6516] Correct the use of hoodie.bootstrap.mode.selector (#9164)
c14ac0e0959 is described below

commit c14ac0e09590d11b00f2f33f7c9dcede50423493
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Jul 23 15:13:46 2023 +0800

    [HUDI-6516] Correct the use of hoodie.bootstrap.mode.selector (#9164)
---
 .../selector/BootstrapRegexModeSelector.java       |  3 ++
 .../SparkBootstrapCommitActionExecutor.java        | 20 +-------
 .../org/apache/hudi/functional/TestBootstrap.java  |  9 ++--
 .../apache/hudi/functional/TestBootstrapRead.java  | 12 ++---
 .../functional/TestDataSourceForBootstrap.scala    | 54 ++++++++++++----------
 5 files changed, 44 insertions(+), 54 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
index 99c3dbd90a8..0b6e5a77818 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
@@ -31,6 +31,9 @@ import java.util.Map;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+/**
+ * A bootstrap selector which employs bootstrap mode by specified partitions.
+ */
 public class BootstrapRegexModeSelector extends BootstrapModeSelector {
 
   private static final long serialVersionUID = 1L;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index eae577d8b9d..d93401c2247 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -26,7 +26,6 @@ import 
org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
 import org.apache.hudi.client.bootstrap.HoodieBootstrapSchemaProvider;
 import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
 import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
-import 
org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
 import 
org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -73,7 +72,6 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -115,10 +113,6 @@ public class SparkBootstrapCommitActionExecutor<T>
         "Ensure Bootstrap Source Path is set");
     checkArgument(config.getBootstrapModeSelectorClass() != null,
         "Ensure Bootstrap Partition Selector is set");
-    if (METADATA_ONLY.name().equals(config.getBootstrapModeSelectorRegex())) {
-      checkArgument(!config.getBootstrapModeSelectorClass().equals(FullRecordBootstrapModeSelector.class.getCanonicalName()),
-          "FullRecordBootstrapModeSelector cannot be used with METADATA_ONLY bootstrap mode");
-    }
   }
 
   @Override
@@ -320,18 +314,8 @@ public class SparkBootstrapCommitActionExecutor<T>
     BootstrapModeSelector selector =
         (BootstrapModeSelector) 
ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);
 
-    Map<BootstrapMode, List<String>> result = new HashMap<>();
-    // for FULL_RECORD mode, original record along with metadata fields are 
needed
-    if (FULL_RECORD.equals(config.getBootstrapModeForRegexMatch())) {
-      if (!(selector instanceof FullRecordBootstrapModeSelector)) {
-        FullRecordBootstrapModeSelector fullRecordBootstrapModeSelector = new 
FullRecordBootstrapModeSelector(config);
-        result.putAll(fullRecordBootstrapModeSelector.select(folders));
-      } else {
-        result.putAll(selector.select(folders));
-      }
-    } else {
-      result = selector.select(folders);
-    }
+    Map<BootstrapMode, List<String>> result = selector.select(folders);
+
     Map<String, List<HoodieFileStatus>> partitionToFiles = 
folders.stream().collect(
         Collectors.toMap(Pair::getKey, Pair::getValue));
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index d94f065ee0a..b398ea82aa9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -272,7 +272,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     metaClient.reloadActiveTimeline();
     assertEquals(0, metaClient.getCommitsTimeline().countInstants());
     assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, 
metaClient.getFs(), basePath, context)
-            .stream().flatMap(f -> f.getValue().stream()).count());
+        .stream().mapToLong(f -> f.getValue().size()).sum());
 
     BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
     assertFalse(index.useIndex());
@@ -295,7 +295,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
 
     // Upsert case
     long updateTimestamp = Instant.now().toEpochMilli();
-    String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2";
+    String updateSPath = tmpFolder.toAbsolutePath() + "/data2";
     generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, 
partitions, updateSPath);
     JavaRDD<HoodieRecord> updateBatch =
         generateInputBatch(jsc, 
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), 
updateSPath, context),
@@ -390,7 +390,6 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
       Dataset<Row> missingBootstrapped = sqlContext.sql("select 
a._hoodie_record_key from bootstrapped a "
           + "where a._hoodie_record_key not in (select _row_key from 
original)");
       assertEquals(0, missingBootstrapped.count());
-      //sqlContext.sql("select * from bootstrapped").show(10, false);
     }
 
     // RO Input Format Read
@@ -410,7 +409,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     }
     assertEquals(totalRecords, seenKeys.size());
 
-    //RT Input Format Read
+    // RT Input Format Read
     reloadInputFormats();
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
@@ -475,7 +474,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     }
     assertEquals(totalRecords, seenKeys.size());
 
-    //RT Input Format Read - Project only non-hoodie column
+    // RT Input Format Read - Project only non-hoodie column
     reloadInputFormats();
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
index 80fc792ad54..bbce1c61f0f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
@@ -108,8 +108,8 @@ public class TestBootstrapRead extends 
HoodieSparkClientTestBase {
       for (Boolean dash : dashPartitions) {
         for (String bt : bootstrapType) {
           for (Integer n : nPartitions) {
-            //can't be mixed bootstrap if it's nonpartitioned
-            //don't need to test slash partitions if it's nonpartitioned
+            // can't be mixed bootstrap if it's nonpartitioned
+            // don't need to test slash partitions if it's nonpartitioned
             if ((!bt.equals("mixed") && dash) || n > 0) {
               b.add(Arguments.of(bt, dash, tt, n));
             }
@@ -129,7 +129,7 @@ public class TestBootstrapRead extends 
HoodieSparkClientTestBase {
     this.nPartitions = nPartitions;
     setupDirs();
 
-    //do bootstrap
+    // do bootstrap
     Map<String, String> options = setBootstrapOptions();
     Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
     bootstrapDf.write().format("hudi")
@@ -139,7 +139,7 @@ public class TestBootstrapRead extends 
HoodieSparkClientTestBase {
     compareTables();
     verifyMetaColOnlyRead(0);
 
-    //do upserts
+    // do upserts
     options = basicOptions();
     doUpdate(options, "001");
     compareTables();
@@ -224,8 +224,8 @@ public class TestBootstrapRead extends 
HoodieSparkClientTestBase {
         .mode(SaveMode.Append)
         .save(hudiBasePath);
     if (bootstrapType.equals("mixed")) {
-      //mixed tables have a commit for each of the metadata and full bootstrap 
modes
-      //so to align with the regular hudi table, we need to compact after 4 
commits instead of 3
+      // mixed tables have a commit for each of the metadata and full 
bootstrap modes
+      // so to align with the regular hudi table, we need to compact after 4 
commits instead of 3
       nCompactCommits = "4";
     }
     df.write().format("hudi")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 12974d133a8..9949b396abf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -17,9 +17,8 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
-import 
org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
+import 
org.apache.hudi.client.bootstrap.selector.{FullRecordBootstrapModeSelector, 
MetadataOnlyBootstrapModeSelector}
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
@@ -30,9 +29,11 @@ import 
org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols, sort
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.HoodieClientTestUtils
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers, HoodieSparkRecordMerger}
+
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{col, lit}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.io.TempDir
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -75,12 +76,14 @@ class TestDataSourceForBootstrap {
   val verificationCol: String = "driver"
   val originalVerificationVal: String = "driver_0"
   val updatedVerificationVal: String = "driver_update"
+  val metadataOnlySelector: String = 
classOf[MetadataOnlyBootstrapModeSelector].getCanonicalName
+  val fullRecordSelector: String = 
classOf[FullRecordBootstrapModeSelector].getCanonicalName
 
   /**
    * TODO rebase onto existing test base-class to avoid duplication
    */
   @BeforeEach
-  def initialize(@TempDir tempDir: java.nio.file.Path) {
+  def initialize(@TempDir tempDir: java.nio.file.Path): Unit = {
     val sparkConf = 
HoodieClientTestUtils.getSparkConfForTest(getClass.getSimpleName)
 
     spark = SparkSession.builder.config(sparkConf).getOrCreate
@@ -119,7 +122,7 @@ class TestDataSourceForBootstrap {
     // Perform bootstrap
     val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName
     val options = commonOpts.-(DataSourceWriteOptions.PARTITIONPATH_FIELD.key)
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       extraOpts = options ++ 
Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
       bootstrapKeygenClass = bootstrapKeygenClass
@@ -166,13 +169,13 @@ class TestDataSourceForBootstrap {
 
   @ParameterizedTest
   @CsvSource(value = Array(
-    "METADATA_ONLY,AVRO",
+    "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector,AVRO",
     // TODO(HUDI-5807) enable for spark native records
-    /* "METADATA_ONLY,SPARK", */
-    "FULL_RECORD,AVRO",
-    "FULL_RECORD,SPARK"
+    /* "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,SPARK", */
+    "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,AVRO",
+    "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,SPARK"
   ))
-  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String, 
recordType: HoodieRecordType): Unit = {
+  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapSelector: String, 
recordType: HoodieRecordType): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
@@ -189,11 +192,11 @@ class TestDataSourceForBootstrap {
     val readOpts = commonOpts ++ Map(
         DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr",
         DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
-        HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key -> 
bootstrapMode
+        HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key -> bootstrapSelector
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       readOpts ++ getRecordTypeOpts(recordType),
       classOf[SimpleKeyGenerator].getName)
@@ -201,10 +204,10 @@ class TestDataSourceForBootstrap {
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
 
-    val expectedDF = bootstrapMode match {
-      case "METADATA_ONLY" =>
+    val expectedDF = bootstrapSelector match {
+      case `metadataOnlySelector` =>
         sort(sourceDF)
-      case "FULL_RECORD" =>
+      case `fullRecordSelector` =>
         sort(sourceDF)
     }
 
@@ -271,7 +274,7 @@ class TestDataSourceForBootstrap {
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
@@ -346,7 +349,7 @@ class TestDataSourceForBootstrap {
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
@@ -414,7 +417,7 @@ class TestDataSourceForBootstrap {
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
@@ -481,7 +484,7 @@ class TestDataSourceForBootstrap {
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
@@ -616,9 +619,9 @@ class TestDataSourceForBootstrap {
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, 
isPartitioned = true, isHiveStylePartitioned = true)
   }
 
-  def runMetadataBootstrapAndVerifyCommit(tableType: String,
-                                          extraOpts: Map[String, String] = 
Map.empty,
-                                          bootstrapKeygenClass: String): 
String = {
+  def runBootstrapAndVerifyCommit(tableType: String,
+                                  extraOpts: Map[String, String] = Map.empty,
+                                  bootstrapKeygenClass: String): String = {
     val bootstrapDF = spark.emptyDataFrame
     bootstrapDF.write
       .format("hudi")
@@ -632,7 +635,8 @@ class TestDataSourceForBootstrap {
 
     val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
     val expectedBootstrapInstant =
-      if ("FULL_RECORD".equals(extraOpts.getOrElse(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key, HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.defaultValue)))
+      if (fullRecordSelector.equals(extraOpts.getOrElse(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key,
+        HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.defaultValue)))
         HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS
       else HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS
     assertEquals(expectedBootstrapInstant, commitInstantTime1)
@@ -689,9 +693,9 @@ class TestDataSourceForBootstrap {
 
 object TestDataSourceForBootstrap {
 
-  def sort(df: DataFrame) = df.sort("_row_key")
+  def sort(df: DataFrame): Dataset[Row] = df.sort("_row_key")
 
-  def dropMetaCols(df: DataFrame) =
+  def dropMetaCols(df: DataFrame): DataFrame =
     df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
 
 }

Reply via email to