This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d74e642b5be [HUDI-7712] Fixing RLI initialization to account for file 
slices instead of just base files while initializing (#11153)
d74e642b5be is described below

commit d74e642b5be121dc76f448f6943db90f99863709
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue May 14 10:03:12 2024 -0700

    [HUDI-7712] Fixing RLI initialization to account for file slices instead of 
just base files while initializing (#11153)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../java/org/apache/hudi/io/HoodieIOHandle.java    |   4 +-
 .../org/apache/hudi/io/HoodieMergedReadHandle.java |  14 ++-
 .../metadata/HoodieBackedTableMetadataWriter.java  | 104 +++++++++++++++++----
 .../FlinkHoodieBackedTableMetadataWriter.java      |   7 ++
 .../SparkHoodieBackedTableMetadataWriter.java      |   7 ++
 .../common/testutils/HoodieTestDataGenerator.java  |  12 +++
 .../hudi/functional/RecordLevelIndexTestBase.scala |  21 ++++-
 .../hudi/functional/TestRecordLevelIndex.scala     |  76 ++++++++++++++-
 8 files changed, 219 insertions(+), 26 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
index 39400394048..6865a6ac653 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
@@ -30,9 +30,9 @@ public abstract class HoodieIOHandle<T, I, K, O> {
 
   protected final String instantTime;
   protected final HoodieWriteConfig config;
-  protected final HoodieStorage storage;
-  protected final FileSystem fs;
   protected final HoodieTable<T, I, K, O> hoodieTable;
+  protected FileSystem fs;
+  protected HoodieStorage storage;
 
   HoodieIOHandle(HoodieWriteConfig config, Option<String> instantTime, 
HoodieTable<T, I, K, O> hoodieTable) {
     this.instantTime = instantTime.orElse(StringUtils.EMPTY_STRING);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
index 47acc9a17ae..ffeeea326ee 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
@@ -52,23 +52,33 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
 public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, 
K, O> {
 
   protected final Schema readerSchema;
+  private final Option<FileSlice> fileSliceOpt;
 
   public HoodieMergedReadHandle(HoodieWriteConfig config,
                                 Option<String> instantTime,
                                 HoodieTable<T, I, K, O> hoodieTable,
                                 Pair<String, String> partitionPathFileIDPair) {
+    this(config, instantTime, hoodieTable, partitionPathFileIDPair, 
Option.empty());
+  }
+
+  public HoodieMergedReadHandle(HoodieWriteConfig config,
+                                Option<String> instantTime,
+                                HoodieTable<T, I, K, O> hoodieTable,
+                                Pair<String, String> partitionPathFileIDPair,
+                                Option<FileSlice> fileSliceOption) {
     super(config, instantTime, hoodieTable, partitionPathFileIDPair);
     readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
+    fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : 
getLatestFileSlice();
   }
 
   public List<HoodieRecord<T>> getMergedRecords() {
-    Option<FileSlice> fileSliceOpt = getLatestFileSlice();
     if (!fileSliceOpt.isPresent()) {
       return Collections.emptyList();
     }
     checkState(nonEmpty(instantTime), String.format("Expected a valid instant 
time but got `%s`", instantTime));
     final FileSlice fileSlice = fileSliceOpt.get();
-    final HoodieRecordLocation currentLocation = new 
HoodieRecordLocation(instantTime, fileSlice.getFileId());
+    String baseFileInstantTime = fileSlice.getBaseFile().get().getCommitTime();
+    final HoodieRecordLocation currentLocation = new 
HoodieRecordLocation(baseFileInstantTime, fileSlice.getFileId());
     Option<HoodieFileReader> baseFileReader = Option.empty();
     HoodieMergedLogRecordScanner logRecordScanner = null;
     try {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4fc3271e8a0..532167c4281 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -73,7 +73,9 @@ import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.io.HoodieMergedReadHandle;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
@@ -181,6 +183,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT 
Reader should have been opened post initialization");
   }
 
+  protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, 
HoodieTableMetaClient metaClient) {
+    return null;
+  }
+
   private void initMetadataReader() {
     if (this.metadata != null) {
       this.metadata.close();
@@ -537,28 +543,50 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition() throws IOException {
     final HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(dataMetaClient,
         dataMetaClient.getActiveTimeline(), metadata);
+    final HoodieTable hoodieTable = getHoodieTable(dataWriteConfig, 
dataMetaClient);
 
     // Collect the list of latest base files present in each partition
     List<String> partitions = metadata.getAllPartitionPaths();
     fsView.loadAllPartitions();
-    final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new 
ArrayList<>();
-    for (String partition : partitions) {
-      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
-          .map(basefile -> Pair.of(partition, 
basefile)).collect(Collectors.toList()));
-    }
+    HoodieData<HoodieRecord> records = null;
+    if (dataMetaClient.getTableConfig().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
+      // For COW, it is sufficient to consider only the base files to initialize.
+      final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new 
ArrayList<>();
+      for (String partition : partitions) {
+        partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+            .map(basefile -> Pair.of(partition, 
basefile)).collect(Collectors.toList()));
+      }
 
-    LOG.info("Initializing record index from {} base files in {} partitions", 
partitionBaseFilePairs.size(), partitions.size());
+      LOG.info("Initializing record index from " + 
partitionBaseFilePairs.size() + " base files in "
+          + partitions.size() + " partitions");
+
+      // Collect record keys from the files in parallel
+      records = readRecordKeysFromBaseFiles(
+          engineContext,
+          dataWriteConfig,
+          partitionBaseFilePairs,
+          false,
+          dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
+          dataWriteConfig.getBasePath(),
+          storageConf,
+          this.getClass().getSimpleName());
+    } else {
+      final List<Pair<String, FileSlice>> partitionFileSlicePairs = new 
ArrayList<>();
+      for (String partition : partitions) {
+        fsView.getLatestFileSlices(partition).forEach(fs -> 
partitionFileSlicePairs.add(Pair.of(partition, fs)));
+      }
 
-    // Collect record keys from the files in parallel
-    HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(
-        engineContext,
-        dataWriteConfig,
-        partitionBaseFilePairs,
-        false,
-        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
-        dataWriteConfig.getBasePath(),
-        storageConf,
-        this.getClass().getSimpleName());
+      LOG.info("Initializing record index from " + 
partitionFileSlicePairs.size() + " file slices in "
+          + partitions.size() + " partitions");
+      records = readRecordKeysFromFileSliceSnapshot(
+          engineContext,
+          partitionFileSlicePairs,
+          dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
+          this.getClass().getSimpleName(),
+          dataMetaClient,
+          dataWriteConfig,
+          hoodieTable);
+    }
     records.persist("MEMORY_AND_DISK_SER");
     final long recordCount = records.count();
 
@@ -572,6 +600,50 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     return Pair.of(fileGroupCount, records);
   }
 
+  /**
+   * Fetch record locations from FileSlice snapshot.
+   * @param engineContext context to use.
+   * @param partitionFileSlicePairs list of pairs of partition and file slice.
+   * @param recordIndexMaxParallelism parallelism to use.
+   * @param activeModule active module of interest.
+   * @param metaClient metaclient instance to use.
+   * @param dataWriteConfig write config to use.
+   * @param hoodieTable hoodie table instance of interest.
+   * @return record index update records, one per merged record read from the given file slices.
+   */
+  private static HoodieData<HoodieRecord> 
readRecordKeysFromFileSliceSnapshot(HoodieEngineContext engineContext,
+                                                                              
List<Pair<String, FileSlice>> partitionFileSlicePairs,
+                                                                              
int recordIndexMaxParallelism,
+                                                                              
String activeModule,
+                                                                              
HoodieTableMetaClient metaClient,
+                                                                              
HoodieWriteConfig dataWriteConfig,
+                                                                              
HoodieTable hoodieTable) {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    Option<String> instantTime = 
metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .lastInstant()
+        .map(HoodieInstant::getTimestamp);
+
+    engineContext.setJobStatus(activeModule, "Record Index: reading record 
keys from " + partitionFileSlicePairs.size() + " file slices");
+    final int parallelism = Math.min(partitionFileSlicePairs.size(), 
recordIndexMaxParallelism);
+
+    return engineContext.parallelize(partitionFileSlicePairs, 
parallelism).flatMap(partitionAndFileSlice -> {
+
+      final String partition = partitionAndFileSlice.getKey();
+      final FileSlice fileSlice = partitionAndFileSlice.getValue();
+      final String fileId = fileSlice.getFileId();
+      return new HoodieMergedReadHandle(dataWriteConfig, instantTime, 
hoodieTable, Pair.of(partition, fileSlice.getFileId()),
+          Option.of(fileSlice)).getMergedRecords().stream().map(record -> {
+            HoodieRecord record1 = (HoodieRecord) record;
+            return 
HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(), 
partition, fileId,
+            record1.getCurrentLocation().getInstantTime(), 0);
+          }).iterator();
+    });
+  }
+
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializeFilesPartition(List<DirectoryInfo> partitionInfoList) {
     // FILES partition uses a single file group
     final int fileGroupCount = 1;
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 5bf73306efb..a61d0e566e5 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -38,6 +38,8 @@ import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
@@ -189,4 +191,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
                                                                int 
parallelism, Schema readerSchema, StorageConfiguration<?> storageConf) {
     throw new HoodieNotSupportedException("Flink metadata table does not 
support functional index yet.");
   }
+
+  @Override
+  protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, 
HoodieTableMetaClient metaClient) {
+    return HoodieFlinkTable.create(writeConfig, engineContext, metaClient);
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index f5e7165a872..84d282ef51c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -41,6 +41,8 @@ import org.apache.hudi.index.functional.HoodieFunctionalIndex;
 import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
@@ -211,6 +213,11 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     return HoodieJavaRDD.of(Collections.emptyList(), sparkEngineContext, 
parallelism);
   }
 
+  @Override
+  protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, 
HoodieTableMetaClient metaClient) {
+    return HoodieSparkTable.create(writeConfig, engineContext, metaClient);
+  }
+
   @Override
   public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?> 
initializeWriteClient() {
     return new SparkRDDWriteClient(engineContext, metadataWriteConfig, true);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 31f6b1c562d..80d22a279ec 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -211,6 +211,18 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     this(DEFAULT_PARTITION_PATHS);
   }
 
+  public static HoodieTestDataGenerator createTestGeneratorFirstPartition() {
+    return new HoodieTestDataGenerator(new 
String[]{DEFAULT_FIRST_PARTITION_PATH});
+  }
+
+  public static HoodieTestDataGenerator createTestGeneratorSecondPartition() {
+    return new HoodieTestDataGenerator(new 
String[]{DEFAULT_SECOND_PARTITION_PATH});
+  }
+
+  public static HoodieTestDataGenerator createTestGeneratorThirdPartition() {
+    return new HoodieTestDataGenerator(new 
String[]{DEFAULT_THIRD_PARTITION_PATH});
+  }
+
   public HoodieTestDataGenerator(boolean makeDatesAmbiguous) {
     this();
     this.makeDatesAmbiguous = makeDatesAmbiguous;
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 1df776cc771..e4158b0e17b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -33,6 +33,7 @@ import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JavaConversions
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.{DataFrame, _}
 import org.apache.spark.sql.functions.{col, not}
 import org.junit.jupiter.api._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
@@ -190,10 +191,14 @@ class RecordLevelIndexTestBase extends 
HoodieSparkClientTestBase {
     latestBatchDf
   }
 
+  protected def calculateMergedDf(latestBatchDf: DataFrame, operation: 
String): DataFrame = {
+    calculateMergedDf(latestBatchDf, operation, false)
+  }
+
   /**
    * @return [[DataFrame]] that should not exist as of the latest instant; 
used for non-existence validation.
    */
-  protected def calculateMergedDf(latestBatchDf: DataFrame, operation: 
String): DataFrame = {
+  protected def calculateMergedDf(latestBatchDf: DataFrame, operation: String, 
globalIndexEnableUpdatePartitions: Boolean): DataFrame = {
     val prevDfOpt = mergedDfList.lastOption
     if (prevDfOpt.isEmpty) {
       mergedDfList = mergedDfList :+ latestBatchDf
@@ -216,10 +221,16 @@ class RecordLevelIndexTestBase extends 
HoodieSparkClientTestBase {
         prevDf.filter(col("partition").isInCollection(overwrittenPartitions))
       } else {
         val prevDf = prevDfOpt.get
-        val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === 
latestBatchDf("_row_key")
-          && prevDf("partition") === latestBatchDf("partition"), "leftanti")
-        val latestSnapshot = prevDfOld.union(latestBatchDf)
-        mergedDfList = mergedDfList :+ latestSnapshot
+        if (globalIndexEnableUpdatePartitions) {
+          val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === 
latestBatchDf("_row_key"), "leftanti")
+          val latestSnapshot = prevDfOld.union(latestBatchDf)
+          mergedDfList = mergedDfList :+ latestSnapshot
+        } else {
+          val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === 
latestBatchDf("_row_key")
+            && prevDf("partition") === latestBatchDf("partition"), "leftanti")
+          val latestSnapshot = prevDfOld.union(latestBatchDf)
+          mergedDfList = mergedDfList :+ latestSnapshot
+        }
         sparkSession.emptyDataFrame
       }
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index b5304cd2e23..31d2288276a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -24,12 +24,15 @@ import 
org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
InProcessTimeGenerator}
 import org.apache.hudi.config._
 import org.apache.hudi.exception.HoodieWriteConflictException
-import 
org.apache.hudi.functional.TestCOWDataSourceStorage.{SQL_DRIVER_IS_NOT_NULL, 
SQL_DRIVER_IS_NULL, SQL_QUERY_EQUALITY_VALIDATOR_CLASS_NAME, 
SQL_QUERY_INEQUALITY_VALIDATOR_CLASS_NAME, SQL_RIDER_IS_NOT_NULL, 
SQL_RIDER_IS_NULL}
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
MetadataPartitionType}
 import org.apache.hudi.util.JavaConversions
+
 import org.apache.spark.sql._
+import org.apache.spark.sql.functions.lit
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api._
 import org.junit.jupiter.params.ParameterizedTest
@@ -38,6 +41,7 @@ import org.junit.jupiter.params.provider.{Arguments, 
CsvSource, EnumSource, Meth
 
 import java.util.Collections
 import java.util.concurrent.Executors
+
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
@@ -55,6 +59,76 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
       saveMode = SaveMode.Overwrite)
   }
 
+  @Test
+  def testRLIInitializationForMorGlobalIndex(): Unit = {
+    val tableType = HoodieTableType.MERGE_ON_READ
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType.name()) +
+      (HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key -> "1") 
+
+      (HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key -> "1") 
+
+      (HoodieIndexConfig.INDEX_TYPE.key -> "RECORD_INDEX") +
+      (HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key -> 
"true") -
+      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key
+
+    val dataGen1 = HoodieTestDataGenerator.createTestGeneratorFirstPartition()
+    val dataGen2 = HoodieTestDataGenerator.createTestGeneratorSecondPartition()
+
+    // batch1 inserts
+    val instantTime1 = getNewInstantTime()
+    val latestBatch = recordsToStrings(dataGen1.generateInserts(instantTime1, 
5)).asScala.toSeq
+    var operation = INSERT_OPERATION_OPT_VAL
+    val latestBatchDf = 
spark.read.json(spark.sparkContext.parallelize(latestBatch, 1))
+    latestBatchDf.cache()
+    latestBatchDf.write.format("org.apache.hudi")
+      .options(hudiOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val deletedDf1 = calculateMergedDf(latestBatchDf, operation, true)
+    deletedDf1.cache()
+
+    // batch2: upsert. Move a few records from partition1 to partition2 and 
insert a few new records into partition2.
+    val instantTime2 = getNewInstantTime()
+
+    val latestBatch2_1 = 
recordsToStrings(dataGen1.generateUniqueUpdates(instantTime2, 3)).asScala.toSeq
+    val latestBatchDf2_1 = 
spark.read.json(spark.sparkContext.parallelize(latestBatch2_1, 1))
+    val latestBatchDf2_2 = latestBatchDf2_1.withColumn("partition", 
lit(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))
+      .withColumn("partition_path", 
lit(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))
+    val latestBatch2_3 = 
recordsToStrings(dataGen2.generateInserts(instantTime2, 2)).asScala.toSeq
+    val latestBatchDf2_3 = 
spark.read.json(spark.sparkContext.parallelize(latestBatch2_3, 1))
+    val latestBatchDf2Final = latestBatchDf2_3.union(latestBatchDf2_2)
+    latestBatchDf2Final.cache()
+    latestBatchDf2Final.write.format("org.apache.hudi")
+      .options(hudiOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    operation = UPSERT_OPERATION_OPT_VAL
+    val deletedDf2 = calculateMergedDf(latestBatchDf2Final, operation, true)
+    deletedDf2.cache()
+
+    val hudiOpts2 = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType.name()) +
+      (HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key -> "1") 
+
+      (HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key -> "1") 
+
+      (HoodieIndexConfig.INDEX_TYPE.key -> "RECORD_INDEX") +
+      (HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key -> 
"true") +
+      (HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true")
+
+    val instantTime3 = getNewInstantTime()
+    // batch3. updates to partition2
+    val latestBatch3 = 
recordsToStrings(dataGen2.generateUniqueUpdates(instantTime3, 2)).asScala.toSeq
+    val latestBatchDf3 = 
spark.read.json(spark.sparkContext.parallelize(latestBatch3, 1))
+    latestBatchDf3.cache()
+    latestBatchDf.write.format("org.apache.hudi")
+      .options(hudiOpts2)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val deletedDf3 = calculateMergedDf(latestBatchDf, operation, true)
+    deletedDf3.cache()
+    validateDataAndRecordIndices(hudiOpts, deletedDf3)
+  }
+
+  private def getNewInstantTime(): String = {
+    InProcessTimeGenerator.createNewInstantTime();
+  }
+
   @ParameterizedTest
   @EnumSource(classOf[HoodieTableType])
   def testRLIUpsert(tableType: HoodieTableType): Unit = {

Reply via email to