This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ba94e96dbf125d000feb70b415979d8ac373f906
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed May 15 07:04:20 2024 -0700

    [HUDI-7712] Fixing RLI initialization to account for file slices instead of just base files while initializing (#11153)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
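
    In short: when the record level index (RLI) is initialized for a MERGE_ON_READ
    table, record keys have to be read from complete file slices (base file plus
    log files) rather than from the latest base files alone; otherwise keys that so
    far exist only in log files are never indexed. A condensed sketch of the new
    branching in initializeRecordIndexPartition(), simplified from the diff below
    (recordIndexMaxParallelism, basePath and activeModule stand in for the longer
    expressions used in the patch):

        if (dataMetaClient.getTableConfig().getTableType() == HoodieTableType.COPY_ON_WRITE) {
          // COW: the latest base file of each file group already holds every live key.
          records = readRecordKeysFromBaseFiles(engineContext, dataWriteConfig, partitionBaseFilePairs,
              false, recordIndexMaxParallelism, basePath, storageConf, activeModule);
        } else {
          // MOR: merge base + log files per file slice so keys present only in logs are indexed too.
          records = readRecordKeysFromFileSliceSnapshot(engineContext, partitionFileSlicePairs,
              recordIndexMaxParallelism, activeModule, dataMetaClient, dataWriteConfig, hoodieTable);
        }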
---
 .../java/org/apache/hudi/io/HoodieIOHandle.java    |   4 +-
 .../org/apache/hudi/io/HoodieMergedReadHandle.java |  14 ++-
 .../metadata/HoodieBackedTableMetadataWriter.java  | 104 +++++++++++++++++----
 .../FlinkHoodieBackedTableMetadataWriter.java      |   7 ++
 .../SparkHoodieBackedTableMetadataWriter.java      |   8 ++
 .../common/testutils/HoodieTestDataGenerator.java  |  12 +++
 .../hudi/functional/RecordLevelIndexTestBase.scala |  21 ++++-
 .../hudi/functional/TestRecordLevelIndex.scala     |  78 +++++++++++++++-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |   2 +-
 9 files changed, 222 insertions(+), 28 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
index 39400394048..6865a6ac653 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
@@ -30,9 +30,9 @@ public abstract class HoodieIOHandle<T, I, K, O> {
 
   protected final String instantTime;
   protected final HoodieWriteConfig config;
-  protected final HoodieStorage storage;
-  protected final FileSystem fs;
   protected final HoodieTable<T, I, K, O> hoodieTable;
+  protected FileSystem fs;
+  protected HoodieStorage storage;
 
   HoodieIOHandle(HoodieWriteConfig config, Option<String> instantTime, 
HoodieTable<T, I, K, O> hoodieTable) {
     this.instantTime = instantTime.orElse(StringUtils.EMPTY_STRING);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
index bb64edbb0b0..4d5ace58274 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
@@ -53,25 +53,35 @@ public class HoodieMergedReadHandle<T, I, K, O> extends 
HoodieReadHandle<T, I, K
 
   protected final Schema readerSchema;
   protected final Schema baseFileReaderSchema;
+  private final Option<FileSlice> fileSliceOpt;
 
   public HoodieMergedReadHandle(HoodieWriteConfig config,
                                 Option<String> instantTime,
                                 HoodieTable<T, I, K, O> hoodieTable,
                                 Pair<String, String> partitionPathFileIDPair) {
+    this(config, instantTime, hoodieTable, partitionPathFileIDPair, 
Option.empty());
+  }
+
+  public HoodieMergedReadHandle(HoodieWriteConfig config,
+                                Option<String> instantTime,
+                                HoodieTable<T, I, K, O> hoodieTable,
+                                Pair<String, String> partitionPathFileIDPair,
+                                Option<FileSlice> fileSliceOption) {
     super(config, instantTime, hoodieTable, partitionPathFileIDPair);
     readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
     // config.getSchema is not canonicalized, while config.getWriteSchema is 
canonicalized. So, we have to use the canonicalized schema to read the existing 
data.
     baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getWriteSchema()), 
config.allowOperationMetadataField());
+    fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : 
getLatestFileSlice();
   }
 
   public List<HoodieRecord<T>> getMergedRecords() {
-    Option<FileSlice> fileSliceOpt = getLatestFileSlice();
     if (!fileSliceOpt.isPresent()) {
       return Collections.emptyList();
     }
     checkState(nonEmpty(instantTime), String.format("Expected a valid instant 
time but got `%s`", instantTime));
     final FileSlice fileSlice = fileSliceOpt.get();
-    final HoodieRecordLocation currentLocation = new 
HoodieRecordLocation(instantTime, fileSlice.getFileId());
+    String baseFileInstantTime = fileSlice.getBaseFile().get().getCommitTime();
+    final HoodieRecordLocation currentLocation = new 
HoodieRecordLocation(baseFileInstantTime, fileSlice.getFileId());
     Option<HoodieFileReader> baseFileReader = Option.empty();
     HoodieMergedLogRecordScanner logRecordScanner = null;
     try {
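
The HoodieMergedReadHandle change above does two things: a new constructor accepts a
pre-resolved FileSlice (falling back to the latest slice when none is supplied), and
the record location now carries the commit time of the slice's base file rather than
the handle's own instant time. A minimal caller sketch, assuming a fileSlice already
obtained from a file-system view and latestCompletedInstant as an Option<String>:

    // Hypothetical caller: read the merged (base + log) view of one known file slice.
    List<HoodieRecord> mergedRecords = new HoodieMergedReadHandle<>(
        writeConfig,                                    // HoodieWriteConfig
        latestCompletedInstant,                         // Option<String> of a completed commit
        hoodieTable,                                    // engine-specific HoodieTable
        Pair.of(partitionPath, fileSlice.getFileId()),
        Option.of(fileSlice))                           // skip the latest-file-slice lookup
        .getMergedRecords();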
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 445c7b74fff..dd292830a85 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -71,7 +71,9 @@ import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.io.HoodieMergedReadHandle;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -179,6 +181,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT 
Reader should have been opened post initialization");
   }
 
+  protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, 
HoodieTableMetaClient metaClient) {
+    return null;
+  }
+
   private void initMetadataReader() {
     if (this.metadata != null) {
       this.metadata.close();
@@ -487,28 +493,50 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition() throws IOException {
     final HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(dataMetaClient,
         dataMetaClient.getActiveTimeline(), metadata);
+    final HoodieTable hoodieTable = getHoodieTable(dataWriteConfig, 
dataMetaClient);
 
     // Collect the list of latest base files present in each partition
     List<String> partitions = metadata.getAllPartitionPaths();
     fsView.loadAllPartitions();
-    final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new 
ArrayList<>();
-    for (String partition : partitions) {
-      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
-          .map(basefile -> Pair.of(partition, 
basefile)).collect(Collectors.toList()));
-    }
+    HoodieData<HoodieRecord> records = null;
+    if (dataMetaClient.getTableConfig().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
+      // for COW, we can only consider base files to initialize.
+      final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new 
ArrayList<>();
+      for (String partition : partitions) {
+        partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+            .map(basefile -> Pair.of(partition, 
basefile)).collect(Collectors.toList()));
+      }
 
-    LOG.info("Initializing record index from {} base files in {} partitions", 
partitionBaseFilePairs.size(), partitions.size());
+      LOG.info("Initializing record index from " + 
partitionBaseFilePairs.size() + " base files in "
+          + partitions.size() + " partitions");
+
+      // Collect record keys from the files in parallel
+      records = readRecordKeysFromBaseFiles(
+          engineContext,
+          dataWriteConfig,
+          partitionBaseFilePairs,
+          false,
+          dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
+          dataWriteConfig.getBasePath(),
+          storageConf,
+          this.getClass().getSimpleName());
+    } else {
+      final List<Pair<String, FileSlice>> partitionFileSlicePairs = new 
ArrayList<>();
+      for (String partition : partitions) {
+        fsView.getLatestFileSlices(partition).forEach(fs -> 
partitionFileSlicePairs.add(Pair.of(partition, fs)));
+      }
 
-    // Collect record keys from the files in parallel
-    HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(
-        engineContext,
-        dataWriteConfig,
-        partitionBaseFilePairs,
-        false,
-        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
-        dataWriteConfig.getBasePath(),
-        storageConf,
-        this.getClass().getSimpleName());
+      LOG.info("Initializing record index from " + 
partitionFileSlicePairs.size() + " file slices in "
+          + partitions.size() + " partitions");
+      records = readRecordKeysFromFileSliceSnapshot(
+          engineContext,
+          partitionFileSlicePairs,
+          dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
+          this.getClass().getSimpleName(),
+          dataMetaClient,
+          dataWriteConfig,
+          hoodieTable);
+    }
     records.persist("MEMORY_AND_DISK_SER");
     final long recordCount = records.count();
 
@@ -522,6 +550,50 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     return Pair.of(fileGroupCount, records);
   }
 
+  /**
+   * Fetch record locations from FileSlice snapshot.
+   * @param engineContext context to use.
+   * @param partitionFileSlicePairs list of pairs of partition and file slice.
+   * @param recordIndexMaxParallelism parallelism to use.
+   * @param activeModule active module of interest.
+   * @param metaClient metaclient instance to use.
+   * @param dataWriteConfig write config to use.
+   * @param hoodieTable hoodie table instance of interest.
+   * @return {@link HoodieData} of {@link HoodieRecord}s carrying the record index updates.
+   */
+  private static HoodieData<HoodieRecord> 
readRecordKeysFromFileSliceSnapshot(HoodieEngineContext engineContext,
+                                                                              
List<Pair<String, FileSlice>> partitionFileSlicePairs,
+                                                                              
int recordIndexMaxParallelism,
+                                                                              
String activeModule,
+                                                                              
HoodieTableMetaClient metaClient,
+                                                                              
HoodieWriteConfig dataWriteConfig,
+                                                                              
HoodieTable hoodieTable) {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    Option<String> instantTime = 
metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .lastInstant()
+        .map(HoodieInstant::getTimestamp);
+
+    engineContext.setJobStatus(activeModule, "Record Index: reading record 
keys from " + partitionFileSlicePairs.size() + " file slices");
+    final int parallelism = Math.min(partitionFileSlicePairs.size(), 
recordIndexMaxParallelism);
+
+    return engineContext.parallelize(partitionFileSlicePairs, 
parallelism).flatMap(partitionAndFileSlice -> {
+
+      final String partition = partitionAndFileSlice.getKey();
+      final FileSlice fileSlice = partitionAndFileSlice.getValue();
+      final String fileId = fileSlice.getFileId();
+      return new HoodieMergedReadHandle(dataWriteConfig, instantTime, 
hoodieTable, Pair.of(partition, fileSlice.getFileId()),
+          Option.of(fileSlice)).getMergedRecords().stream().map(record -> {
+            HoodieRecord record1 = (HoodieRecord) record;
+            return 
HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(), 
partition, fileId,
+            record1.getCurrentLocation().getInstantTime(), 0);
+          }).iterator();
+    });
+  }
+
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializeFilesPartition(List<DirectoryInfo> partitionInfoList) {
     // FILES partition uses a single file group
     final int fileGroupCount = 1;
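
Per file slice, readRecordKeysFromFileSliceSnapshot() above reduces to the mapping
below (written sequentially here for readability; in the patch it runs inside
engineContext.parallelize(...).flatMap(...)):

    // For one (partition, fileSlice) pair: merged read, then one RLI update per surviving key.
    List<HoodieRecord> updates = new ArrayList<>();
    for (HoodieRecord merged : new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable,
        Pair.of(partition, fileSlice.getFileId()), Option.of(fileSlice)).getMergedRecords()) {
      updates.add(HoodieMetadataPayload.createRecordIndexUpdate(
          merged.getRecordKey(), partition, fileSlice.getFileId(),
          merged.getCurrentLocation().getInstantTime(),  // commit time of the slice's base file
          0));
    }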
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 2ae017b85b4..77f1439c982 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -36,6 +36,8 @@ import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -197,4 +199,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
   protected void preWrite(String instantTime) {
     
metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION,
 instantTime);
   }
+
+  @Override
+  protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, 
HoodieTableMetaClient metaClient) {
+    return HoodieFlinkTable.create(writeConfig, engineContext, metaClient);
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 8e73a52ab4c..34b1c91e07b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -28,6 +28,7 @@ import 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -35,6 +36,8 @@ import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.slf4j.Logger;
@@ -141,6 +144,11 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     writeClient.deletePartitions(partitionsToDrop, instantTime);
   }
 
+  @Override
+  protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, 
HoodieTableMetaClient metaClient) {
+    return HoodieSparkTable.create(writeConfig, engineContext, metaClient);
+  }
+
   @Override
   public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?> 
initializeWriteClient() {
     return new SparkRDDWriteClient(engineContext, metadataWriteConfig, true);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index ca463cbf0e2..544d8bc787b 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -211,6 +211,18 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     this(DEFAULT_PARTITION_PATHS);
   }
 
+  public static HoodieTestDataGenerator createTestGeneratorFirstPartition() {
+    return new HoodieTestDataGenerator(new 
String[]{DEFAULT_FIRST_PARTITION_PATH});
+  }
+
+  public static HoodieTestDataGenerator createTestGeneratorSecondPartition() {
+    return new HoodieTestDataGenerator(new 
String[]{DEFAULT_SECOND_PARTITION_PATH});
+  }
+
+  public static HoodieTestDataGenerator createTestGeneratorThirdPartition() {
+    return new HoodieTestDataGenerator(new 
String[]{DEFAULT_THIRD_PARTITION_PATH});
+  }
+
   public HoodieTestDataGenerator(boolean makeDatesAmbiguous) {
     this();
     this.makeDatesAmbiguous = makeDatesAmbiguous;
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index b4130ac189b..96853950d50 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -34,6 +34,7 @@ import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JavaConversions
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.{DataFrame, _}
 import org.apache.spark.sql.functions.{col, not}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api._
@@ -191,10 +192,14 @@ class RecordLevelIndexTestBase extends 
HoodieSparkClientTestBase {
     latestBatchDf
   }
 
+  protected def calculateMergedDf(latestBatchDf: DataFrame, operation: 
String): DataFrame = {
+    calculateMergedDf(latestBatchDf, operation, false)
+  }
+
   /**
    * @return [[DataFrame]] that should not exist as of the latest instant; 
used for non-existence validation.
    */
-  protected def calculateMergedDf(latestBatchDf: DataFrame, operation: 
String): DataFrame = {
+  protected def calculateMergedDf(latestBatchDf: DataFrame, operation: String, 
globalIndexEnableUpdatePartitions: Boolean): DataFrame = {
     val prevDfOpt = mergedDfList.lastOption
     if (prevDfOpt.isEmpty) {
       mergedDfList = mergedDfList :+ latestBatchDf
@@ -217,10 +222,16 @@ class RecordLevelIndexTestBase extends 
HoodieSparkClientTestBase {
         prevDf.filter(col("partition").isInCollection(overwrittenPartitions))
       } else {
         val prevDf = prevDfOpt.get
-        val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === 
latestBatchDf("_row_key")
-          && prevDf("partition") === latestBatchDf("partition"), "leftanti")
-        val latestSnapshot = prevDfOld.union(latestBatchDf)
-        mergedDfList = mergedDfList :+ latestSnapshot
+        if (globalIndexEnableUpdatePartitions) {
+          val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === 
latestBatchDf("_row_key"), "leftanti")
+          val latestSnapshot = prevDfOld.union(latestBatchDf)
+          mergedDfList = mergedDfList :+ latestSnapshot
+        } else {
+          val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === 
latestBatchDf("_row_key")
+            && prevDf("partition") === latestBatchDf("partition"), "leftanti")
+          val latestSnapshot = prevDfOld.union(latestBatchDf)
+          mergedDfList = mergedDfList :+ latestSnapshot
+        }
         sparkSession.emptyDataFrame
       }
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 393587f34ac..a2ae2b27445 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -23,13 +23,16 @@ import org.apache.hudi.DataSourceWriteOptions._
 import 
org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model._
-import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, 
HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config._
 import org.apache.hudi.exception.HoodieWriteConflictException
-import 
org.apache.hudi.functional.TestCOWDataSourceStorage.{SQL_DRIVER_IS_NOT_NULL, 
SQL_DRIVER_IS_NULL, SQL_QUERY_EQUALITY_VALIDATOR_CLASS_NAME, 
SQL_QUERY_INEQUALITY_VALIDATOR_CLASS_NAME, SQL_RIDER_IS_NOT_NULL, 
SQL_RIDER_IS_NULL}
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
MetadataPartitionType}
 import org.apache.hudi.util.JavaConversions
+
 import org.apache.spark.sql._
+import org.apache.spark.sql.functions.lit
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api._
 import org.junit.jupiter.params.ParameterizedTest
@@ -38,6 +41,7 @@ import org.junit.jupiter.params.provider.{Arguments, 
CsvSource, EnumSource, Meth
 
 import java.util.Collections
 import java.util.concurrent.Executors
+
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
@@ -55,6 +59,76 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
       saveMode = SaveMode.Overwrite)
   }
 
+  @Test
+  def testRLIInitializationForMorGlobalIndex(): Unit = {
+    val tableType = HoodieTableType.MERGE_ON_READ
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType.name()) +
+      (HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key -> "1") 
+
+      (HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key -> "1") 
+
+      (HoodieIndexConfig.INDEX_TYPE.key -> "RECORD_INDEX") +
+      (HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key -> 
"true") -
+      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key
+
+    val dataGen1 = HoodieTestDataGenerator.createTestGeneratorFirstPartition()
+    val dataGen2 = HoodieTestDataGenerator.createTestGeneratorSecondPartition()
+
+    // batch1 inserts
+    val instantTime1 = getNewInstantTime()
+    val latestBatch = recordsToStrings(dataGen1.generateInserts(instantTime1, 
5)).asScala.toSeq
+    var operation = INSERT_OPERATION_OPT_VAL
+    val latestBatchDf = 
spark.read.json(spark.sparkContext.parallelize(latestBatch, 1))
+    latestBatchDf.cache()
+    latestBatchDf.write.format("org.apache.hudi")
+      .options(hudiOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val deletedDf1 = calculateMergedDf(latestBatchDf, operation, true)
+    deletedDf1.cache()
+
+    // batch2: upsert. Move a few records from partition1 to partition2 and insert a few new records into partition2.
+    val instantTime2 = getNewInstantTime()
+
+    val latestBatch2_1 = 
recordsToStrings(dataGen1.generateUniqueUpdates(instantTime2, 3)).asScala.toSeq
+    val latestBatchDf2_1 = 
spark.read.json(spark.sparkContext.parallelize(latestBatch2_1, 1))
+    val latestBatchDf2_2 = latestBatchDf2_1.withColumn("partition", 
lit(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))
+      .withColumn("partition_path", 
lit(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))
+    val latestBatch2_3 = 
recordsToStrings(dataGen2.generateInserts(instantTime2, 2)).asScala.toSeq
+    val latestBatchDf2_3 = 
spark.read.json(spark.sparkContext.parallelize(latestBatch2_3, 1))
+    val latestBatchDf2Final = latestBatchDf2_3.union(latestBatchDf2_2)
+    latestBatchDf2Final.cache()
+    latestBatchDf2Final.write.format("org.apache.hudi")
+      .options(hudiOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    operation = UPSERT_OPERATION_OPT_VAL
+    val deletedDf2 = calculateMergedDf(latestBatchDf2Final, operation, true)
+    deletedDf2.cache()
+
+    val hudiOpts2 = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType.name()) +
+      (HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key -> "1") 
+
+      (HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key -> "1") 
+
+      (HoodieIndexConfig.INDEX_TYPE.key -> "RECORD_INDEX") +
+      (HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key -> 
"true") +
+      (HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true")
+
+    val instantTime3 = getNewInstantTime()
+    // batch3. updates to partition2
+    val latestBatch3 = 
recordsToStrings(dataGen2.generateUniqueUpdates(instantTime3, 2)).asScala.toSeq
+    val latestBatchDf3 = 
spark.read.json(spark.sparkContext.parallelize(latestBatch3, 1))
+    latestBatchDf3.cache()
+    latestBatchDf.write.format("org.apache.hudi")
+      .options(hudiOpts2)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val deletedDf3 = calculateMergedDf(latestBatchDf, operation, true)
+    deletedDf3.cache()
+    validateDataAndRecordIndices(hudiOpts, deletedDf3)
+  }
+
+  private def getNewInstantTime(): String = {
+    HoodieActiveTimeline.createNewInstantTime();
+  }
+
   @ParameterizedTest
   @EnumSource(classOf[HoodieTableType])
   def testRLIUpsert(tableType: HoodieTableType): Unit = {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 9831ec060a8..cb30d3dc0be 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -2910,7 +2910,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
 
     HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(HoodieTestUtils.getDefaultStorageConf()).build();
-    List<String> partitions = FSUtils.getAllPartitionPaths(new 
HoodieLocalEngineContext(metaClient.getStorageConf()), 
metaClient.getBasePath(), false);
+    List<String> partitions = FSUtils.getAllPartitionPaths(new 
HoodieLocalEngineContext(metaClient.getStorageConf()), 
metaClient.getBasePath(), false, false);
     StorageConfiguration hadoopConf = metaClient.getStorageConf();
     HoodieLocalEngineContext engContext = new 
HoodieLocalEngineContext(hadoopConf);
     HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(engContext, metaClient,
