This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d74e642b5be [HUDI-7712] Fixing RLI initialization to account for file
slices instead of just base files while initializing (#11153)
d74e642b5be is described below
commit d74e642b5be121dc76f448f6943db90f99863709
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue May 14 10:03:12 2024 -0700
[HUDI-7712] Fixing RLI initialization to account for file slices instead of
just base files while initializing (#11153)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../java/org/apache/hudi/io/HoodieIOHandle.java | 4 +-
.../org/apache/hudi/io/HoodieMergedReadHandle.java | 14 ++-
.../metadata/HoodieBackedTableMetadataWriter.java | 104 +++++++++++++++++----
.../FlinkHoodieBackedTableMetadataWriter.java | 7 ++
.../SparkHoodieBackedTableMetadataWriter.java | 7 ++
.../common/testutils/HoodieTestDataGenerator.java | 12 +++
.../hudi/functional/RecordLevelIndexTestBase.scala | 21 ++++-
.../hudi/functional/TestRecordLevelIndex.scala | 76 ++++++++++++++-
8 files changed, 219 insertions(+), 26 deletions(-)
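At a high level, the patch changes how the record-level index (RLI) is seeded: for MERGE_ON_READ tables, initialization now walks complete file slices (base file plus log files) instead of only base files, so keys that so far exist only in log files are not dropped from the index. A minimal, self-contained sketch of that idea, using hypothetical simplified types rather than the Hudi classes touched below:

import java.util.*;

// Hypothetical model: a file slice is an optional base file plus zero or more log files.
// Seeding the index from the base file alone would miss key3 below.
class RecordIndexSeedSketch {
  record FileSliceModel(String fileId, Optional<List<String>> baseFileKeys, List<List<String>> logFileKeys) {}

  // All record keys visible in the latest file slice = base-file keys + log-only keys.
  static SortedSet<String> keysInSlice(FileSliceModel slice) {
    SortedSet<String> keys = new TreeSet<>(slice.baseFileKeys().orElse(List.of()));
    slice.logFileKeys().forEach(keys::addAll);
    return keys;
  }

  public static void main(String[] args) {
    FileSliceModel slice = new FileSliceModel(
        "fg-1",
        Optional.of(List.of("key1", "key2")),   // already compacted into the base file
        List.of(List.of("key3")));              // key3 only exists in a log file so far
    System.out.println(keysInSlice(slice));     // [key1, key2, key3] -- all must land in the RLI
  }
}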
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
index 39400394048..6865a6ac653 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
@@ -30,9 +30,9 @@ public abstract class HoodieIOHandle<T, I, K, O> {
protected final String instantTime;
protected final HoodieWriteConfig config;
- protected final HoodieStorage storage;
- protected final FileSystem fs;
protected final HoodieTable<T, I, K, O> hoodieTable;
+ protected FileSystem fs;
+ protected HoodieStorage storage;
HoodieIOHandle(HoodieWriteConfig config, Option<String> instantTime,
HoodieTable<T, I, K, O> hoodieTable) {
this.instantTime = instantTime.orElse(StringUtils.EMPTY_STRING);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
index 47acc9a17ae..ffeeea326ee 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
@@ -52,23 +52,33 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, K, O> {
protected final Schema readerSchema;
+ private final Option<FileSlice> fileSliceOpt;
public HoodieMergedReadHandle(HoodieWriteConfig config,
Option<String> instantTime,
HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFileIDPair) {
+ this(config, instantTime, hoodieTable, partitionPathFileIDPair, Option.empty());
+ }
+
+ public HoodieMergedReadHandle(HoodieWriteConfig config,
+ Option<String> instantTime,
+ HoodieTable<T, I, K, O> hoodieTable,
+ Pair<String, String> partitionPathFileIDPair,
+ Option<FileSlice> fileSliceOption) {
super(config, instantTime, hoodieTable, partitionPathFileIDPair);
readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
+ fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : getLatestFileSlice();
}
public List<HoodieRecord<T>> getMergedRecords() {
- Option<FileSlice> fileSliceOpt = getLatestFileSlice();
if (!fileSliceOpt.isPresent()) {
return Collections.emptyList();
}
checkState(nonEmpty(instantTime), String.format("Expected a valid instant time but got `%s`", instantTime));
final FileSlice fileSlice = fileSliceOpt.get();
- final HoodieRecordLocation currentLocation = new HoodieRecordLocation(instantTime, fileSlice.getFileId());
+ String baseFileInstantTime = fileSlice.getBaseFile().get().getCommitTime();
+ final HoodieRecordLocation currentLocation = new HoodieRecordLocation(baseFileInstantTime, fileSlice.getFileId());
Option<HoodieFileReader> baseFileReader = Option.empty();
HoodieMergedLogRecordScanner logRecordScanner = null;
try {
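A hedged usage sketch of the new HoodieMergedReadHandle overload introduced above (the helper name and surrounding setup are hypothetical; only the constructor and getMergedRecords() come from this diff): a caller that has already resolved a FileSlice from a file-system view can hand it to the handle instead of letting the handle look up the latest slice again.

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieMergedReadHandle;
import org.apache.hudi.table.HoodieTable;

import java.util.List;

class MergedReadHandleUsageSketch {
  // Hypothetical helper: read the merged (base + log) records of a known file slice.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static List<HoodieRecord> mergedRecordsOfSlice(HoodieWriteConfig config,
                                                 Option<String> instantTime,
                                                 HoodieTable table,
                                                 String partition,
                                                 FileSlice fileSlice) {
    return new HoodieMergedReadHandle(config, instantTime, table,
        Pair.of(partition, fileSlice.getFileId()),
        Option.of(fileSlice))   // new overload from this commit: read this exact slice
        .getMergedRecords();
  }
}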
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4fc3271e8a0..532167c4281 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -73,7 +73,9 @@ import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.io.HoodieMergedReadHandle;
import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.slf4j.Logger;
@@ -181,6 +183,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT Reader should have been opened post initialization");
}
+ protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
+ return null;
+ }
+
private void initMetadataReader() {
if (this.metadata != null) {
this.metadata.close();
@@ -537,28 +543,50 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException {
final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata);
+ final HoodieTable hoodieTable = getHoodieTable(dataWriteConfig, dataMetaClient);
// Collect the list of latest base files present in each partition
List<String> partitions = metadata.getAllPartitionPaths();
fsView.loadAllPartitions();
- final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new ArrayList<>();
- for (String partition : partitions) {
- partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
- .map(basefile -> Pair.of(partition, basefile)).collect(Collectors.toList()));
- }
+ HoodieData<HoodieRecord> records = null;
+ if (dataMetaClient.getTableConfig().getTableType() == HoodieTableType.COPY_ON_WRITE) {
+ // for COW, we can only consider base files to initialize.
+ final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new ArrayList<>();
+ for (String partition : partitions) {
+ partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+ .map(basefile -> Pair.of(partition, basefile)).collect(Collectors.toList()));
+ }
- LOG.info("Initializing record index from {} base files in {} partitions", partitionBaseFilePairs.size(), partitions.size());
+ LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in "
+ + partitions.size() + " partitions");
+
+ // Collect record keys from the files in parallel
+ records = readRecordKeysFromBaseFiles(
+ engineContext,
+ dataWriteConfig,
+ partitionBaseFilePairs,
+ false,
+ dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
+ dataWriteConfig.getBasePath(),
+ storageConf,
+ this.getClass().getSimpleName());
+ } else {
+ final List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
+ for (String partition : partitions) {
+ fsView.getLatestFileSlices(partition).forEach(fs -> partitionFileSlicePairs.add(Pair.of(partition, fs)));
+ }
- // Collect record keys from the files in parallel
- HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(
- engineContext,
- dataWriteConfig,
- partitionBaseFilePairs,
- false,
- dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
- dataWriteConfig.getBasePath(),
- storageConf,
- this.getClass().getSimpleName());
+ LOG.info("Initializing record index from " +
partitionFileSlicePairs.size() + " file slices in "
+ + partitions.size() + " partitions");
+ records = readRecordKeysFromFileSliceSnapshot(
+ engineContext,
+ partitionFileSlicePairs,
+ dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
+ this.getClass().getSimpleName(),
+ dataMetaClient,
+ dataWriteConfig,
+ hoodieTable);
+ }
records.persist("MEMORY_AND_DISK_SER");
final long recordCount = records.count();
@@ -572,6 +600,50 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
return Pair.of(fileGroupCount, records);
}
+ /**
+ * Fetch record locations from FileSlice snapshot.
+ * @param engineContext context to use.
+ * @param partitionFileSlicePairs list of pairs of partition and file slice.
+ * @param recordIndexMaxParallelism parallelism to use.
+ * @param activeModule active module of interest.
+ * @param metaClient metaclient instance to use.
+ * @param dataWriteConfig write config to use.
+ * @param hoodieTable hoodie table instance of interest.
+ * @return
+ */
+ private static HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(HoodieEngineContext engineContext,
+ List<Pair<String, FileSlice>> partitionFileSlicePairs,
+ int recordIndexMaxParallelism,
+ String activeModule,
+ HoodieTableMetaClient metaClient,
+ HoodieWriteConfig dataWriteConfig,
+ HoodieTable hoodieTable) {
+ if (partitionFileSlicePairs.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ Option<String> instantTime = metaClient.getActiveTimeline().getCommitsTimeline()
+ .filterCompletedInstants()
+ .lastInstant()
+ .map(HoodieInstant::getTimestamp);
+
+ engineContext.setJobStatus(activeModule, "Record Index: reading record keys from " + partitionFileSlicePairs.size() + " file slices");
+ final int parallelism = Math.min(partitionFileSlicePairs.size(), recordIndexMaxParallelism);
+
+ return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndFileSlice -> {
+
+ final String partition = partitionAndFileSlice.getKey();
+ final FileSlice fileSlice = partitionAndFileSlice.getValue();
+ final String fileId = fileSlice.getFileId();
+ return new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable, Pair.of(partition, fileSlice.getFileId()),
+ Option.of(fileSlice)).getMergedRecords().stream().map(record -> {
+ HoodieRecord record1 = (HoodieRecord) record;
+ return HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(), partition, fileId,
+ record1.getCurrentLocation().getInstantTime(), 0);
+ }).iterator();
+ });
+ }
+
private Pair<Integer, HoodieData<HoodieRecord>> initializeFilesPartition(List<DirectoryInfo> partitionInfoList) {
// FILES partition uses a single file group
final int fileGroupCount = 1;
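The new control flow in initializeRecordIndexPartition, restated as a hedged, engine-agnostic sketch (hypothetical simplified types, not the Hudi code above): COPY_ON_WRITE tables keep seeding the index from the latest base files, while other table types iterate the latest file slices and read merged keys per slice.

import java.util.*;
import java.util.stream.*;

// Hypothetical restatement of the branching added in initializeRecordIndexPartition:
// COW -> scan latest base files only; MOR -> merge each latest file slice (base + logs).
class RliInitSketch {
  enum TableType { COPY_ON_WRITE, MERGE_ON_READ }
  record KeyLocation(String recordKey, String partition, String fileId, String instantTime) {}

  interface BaseFileReader {
    // For COW: keys of the latest base file in a partition.
    List<KeyLocation> baseFileKeys(String partition, String baseFileId);
  }
  interface SliceReader {
    // For MOR: merged keys of one file slice, located at the slice's base-file commit time.
    List<KeyLocation> mergedKeys(String partition, String fileSliceId);
  }

  static List<KeyLocation> seedRecordIndex(TableType type,
                                           Map<String, List<String>> latestFilesPerPartition,
                                           BaseFileReader baseReader,
                                           SliceReader sliceReader) {
    return latestFilesPerPartition.entrySet().stream()
        .flatMap(e -> e.getValue().stream()
            .flatMap(fileId -> (type == TableType.COPY_ON_WRITE
                ? baseReader.baseFileKeys(e.getKey(), fileId)
                : sliceReader.mergedKeys(e.getKey(), fileId)).stream()))
        .collect(Collectors.toList());
  }
}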
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 5bf73306efb..a61d0e566e5 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -38,6 +38,8 @@ import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.slf4j.Logger;
@@ -189,4 +191,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
int parallelism, Schema readerSchema, StorageConfiguration<?> storageConf) {
throw new HoodieNotSupportedException("Flink metadata table does not support functional index yet.");
}
+
+ @Override
+ protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
+ return HoodieFlinkTable.create(writeConfig, engineContext, metaClient);
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index f5e7165a872..84d282ef51c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -41,6 +41,8 @@ import org.apache.hudi.index.functional.HoodieFunctionalIndex;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaRDD;
@@ -211,6 +213,11 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
return HoodieJavaRDD.of(Collections.emptyList(), sparkEngineContext, parallelism);
}
+ @Override
+ protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
+ return HoodieSparkTable.create(writeConfig, engineContext, metaClient);
+ }
+
@Override
public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?> initializeWriteClient() {
return new SparkRDDWriteClient(engineContext, metadataWriteConfig, true);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 31f6b1c562d..80d22a279ec 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -211,6 +211,18 @@ public class HoodieTestDataGenerator implements AutoCloseable {
this(DEFAULT_PARTITION_PATHS);
}
+ public static HoodieTestDataGenerator createTestGeneratorFirstPartition() {
+ return new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH});
+ }
+
+ public static HoodieTestDataGenerator createTestGeneratorSecondPartition() {
+ return new HoodieTestDataGenerator(new String[]{DEFAULT_SECOND_PARTITION_PATH});
+ }
+
+ public static HoodieTestDataGenerator createTestGeneratorThirdPartition() {
+ return new HoodieTestDataGenerator(new String[]{DEFAULT_THIRD_PARTITION_PATH});
+ }
+
public HoodieTestDataGenerator(boolean makeDatesAmbiguous) {
this();
this.makeDatesAmbiguous = makeDatesAmbiguous;
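A hedged usage sketch for the new single-partition generator factories (assuming the test-utils class above is on the classpath; the snippet itself is hypothetical): they confine generated records to one known partition, which the new MOR test below relies on to first insert into partition1 and later move those keys to partition2.

import org.apache.hudi.common.testutils.HoodieTestDataGenerator;

class SinglePartitionGeneratorSketch {
  // Hypothetical snippet: five inserts, all confined to DEFAULT_FIRST_PARTITION_PATH.
  static void example(String instantTime) throws Exception {
    try (HoodieTestDataGenerator dataGen = HoodieTestDataGenerator.createTestGeneratorFirstPartition()) {
      System.out.println(dataGen.generateInserts(instantTime, 5).size());  // prints 5
    }
  }
}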
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 1df776cc771..e4158b0e17b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -33,6 +33,7 @@ import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.util.JavaConversions
import org.apache.spark.sql._
+import org.apache.spark.sql.{DataFrame, _}
import org.apache.spark.sql.functions.{col, not}
import org.junit.jupiter.api._
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
@@ -190,10 +191,14 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase {
latestBatchDf
}
+ protected def calculateMergedDf(latestBatchDf: DataFrame, operation: String): DataFrame = {
+ calculateMergedDf(latestBatchDf, operation, false)
+ }
+
/**
* @return [[DataFrame]] that should not exist as of the latest instant; used for non-existence validation.
*/
- protected def calculateMergedDf(latestBatchDf: DataFrame, operation: String): DataFrame = {
+ protected def calculateMergedDf(latestBatchDf: DataFrame, operation: String, globalIndexEnableUpdatePartitions: Boolean): DataFrame = {
val prevDfOpt = mergedDfList.lastOption
if (prevDfOpt.isEmpty) {
mergedDfList = mergedDfList :+ latestBatchDf
@@ -216,10 +221,16 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase {
prevDf.filter(col("partition").isInCollection(overwrittenPartitions))
} else {
val prevDf = prevDfOpt.get
- val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === latestBatchDf("_row_key")
- && prevDf("partition") === latestBatchDf("partition"), "leftanti")
- val latestSnapshot = prevDfOld.union(latestBatchDf)
- mergedDfList = mergedDfList :+ latestSnapshot
+ if (globalIndexEnableUpdatePartitions) {
+ val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === latestBatchDf("_row_key"), "leftanti")
+ val latestSnapshot = prevDfOld.union(latestBatchDf)
+ mergedDfList = mergedDfList :+ latestSnapshot
+ } else {
+ val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === latestBatchDf("_row_key")
+ && prevDf("partition") === latestBatchDf("partition"), "leftanti")
+ val latestSnapshot = prevDfOld.union(latestBatchDf)
+ mergedDfList = mergedDfList :+ latestSnapshot
+ }
sparkSession.emptyDataFrame
}
}
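The test-base change above only alters which columns drive the leftanti join. A plain-Java restatement of that merge semantics (hypothetical types, not the Spark code in the diff): when global-index partition-path updates are enabled, an incoming record supersedes any prior version with the same key even if it lives in another partition; otherwise only a prior version in the same partition is replaced.

import java.util.*;
import java.util.stream.*;

class MergedSnapshotSketch {
  record Row(String key, String partition) {}

  // Keep prior rows that are NOT overwritten by the latest batch (the "leftanti" join),
  // then append the latest batch.
  static List<Row> mergedSnapshot(List<Row> prev, List<Row> latest, boolean globalIndexUpdatesPartition) {
    Set<String> latestKeys = latest.stream().map(Row::key).collect(Collectors.toSet());
    Set<Row> latestKeyAndPartition = new HashSet<>(latest);
    List<Row> merged = new ArrayList<>(prev.stream()
        .filter(r -> globalIndexUpdatesPartition
            ? !latestKeys.contains(r.key())                                      // match on key only
            : !latestKeyAndPartition.contains(new Row(r.key(), r.partition())))  // match on key + partition
        .collect(Collectors.toList()));
    merged.addAll(latest);
    return merged;
  }
}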
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index b5304cd2e23..31d2288276a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -24,12 +24,15 @@ import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, InProcessTimeGenerator}
import org.apache.hudi.config._
import org.apache.hudi.exception.HoodieWriteConflictException
-import org.apache.hudi.functional.TestCOWDataSourceStorage.{SQL_DRIVER_IS_NOT_NULL, SQL_DRIVER_IS_NULL, SQL_QUERY_EQUALITY_VALIDATOR_CLASS_NAME, SQL_QUERY_INEQUALITY_VALIDATOR_CLASS_NAME, SQL_RIDER_IS_NOT_NULL, SQL_RIDER_IS_NULL}
import org.apache.hudi.metadata.{HoodieBackedTableMetadata, MetadataPartitionType}
import org.apache.hudi.util.JavaConversions
+
import org.apache.spark.sql._
+import org.apache.spark.sql.functions.lit
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
@@ -38,6 +41,7 @@ import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, Meth
import java.util.Collections
import java.util.concurrent.Executors
+
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -55,6 +59,76 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
saveMode = SaveMode.Overwrite)
}
+ @Test
+ def testRLIInitializationForMorGlobalIndex(): Unit = {
+ val tableType = HoodieTableType.MERGE_ON_READ
+ val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name()) +
+ (HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key -> "1") +
+ (HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key -> "1") +
+ (HoodieIndexConfig.INDEX_TYPE.key -> "RECORD_INDEX") +
+ (HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key -> "true") -
+ HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key
+
+ val dataGen1 = HoodieTestDataGenerator.createTestGeneratorFirstPartition()
+ val dataGen2 = HoodieTestDataGenerator.createTestGeneratorSecondPartition()
+
+ // batch1 inserts
+ val instantTime1 = getNewInstantTime()
+ val latestBatch = recordsToStrings(dataGen1.generateInserts(instantTime1, 5)).asScala.toSeq
+ var operation = INSERT_OPERATION_OPT_VAL
+ val latestBatchDf = spark.read.json(spark.sparkContext.parallelize(latestBatch, 1))
+ latestBatchDf.cache()
+ latestBatchDf.write.format("org.apache.hudi")
+ .options(hudiOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ val deletedDf1 = calculateMergedDf(latestBatchDf, operation, true)
+ deletedDf1.cache()
+
+ // batch2. upsert. update a few records, moving them from partition1 to partition2, and insert a few new records into partition2.
+ val instantTime2 = getNewInstantTime()
+
+ val latestBatch2_1 = recordsToStrings(dataGen1.generateUniqueUpdates(instantTime2, 3)).asScala.toSeq
+ val latestBatchDf2_1 = spark.read.json(spark.sparkContext.parallelize(latestBatch2_1, 1))
+ val latestBatchDf2_2 = latestBatchDf2_1.withColumn("partition", lit(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))
+ .withColumn("partition_path", lit(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))
+ val latestBatch2_3 = recordsToStrings(dataGen2.generateInserts(instantTime2, 2)).asScala.toSeq
+ val latestBatchDf2_3 = spark.read.json(spark.sparkContext.parallelize(latestBatch2_3, 1))
+ val latestBatchDf2Final = latestBatchDf2_3.union(latestBatchDf2_2)
+ latestBatchDf2Final.cache()
+ latestBatchDf2Final.write.format("org.apache.hudi")
+ .options(hudiOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ operation = UPSERT_OPERATION_OPT_VAL
+ val deletedDf2 = calculateMergedDf(latestBatchDf2Final, operation, true)
+ deletedDf2.cache()
+
+ val hudiOpts2 = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name()) +
+ (HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key -> "1") +
+ (HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key -> "1") +
+ (HoodieIndexConfig.INDEX_TYPE.key -> "RECORD_INDEX") +
+ (HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key -> "true") +
+ (HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true")
+
+ val instantTime3 = getNewInstantTime()
+ // batch3. updates to partition2
+ val latestBatch3 = recordsToStrings(dataGen2.generateUniqueUpdates(instantTime3, 2)).asScala.toSeq
+ val latestBatchDf3 = spark.read.json(spark.sparkContext.parallelize(latestBatch3, 1))
+ latestBatchDf3.cache()
+ latestBatchDf.write.format("org.apache.hudi")
+ .options(hudiOpts2)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val deletedDf3 = calculateMergedDf(latestBatchDf, operation, true)
+ deletedDf3.cache()
+ validateDataAndRecordIndices(hudiOpts, deletedDf3)
+ }
+
+ private def getNewInstantTime(): String = {
+ InProcessTimeGenerator.createNewInstantTime();
+ }
+
@ParameterizedTest
@EnumSource(classOf[HoodieTableType])
def testRLIUpsert(tableType: HoodieTableType): Unit = {