This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ba94e96dbf125d000feb70b415979d8ac373f906
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed May 15 07:04:20 2024 -0700

    [HUDI-7712] Fixing RLI initialization to account for file slices instead of just base files while initializing (#11153)

    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../java/org/apache/hudi/io/HoodieIOHandle.java    |   4 +-
 .../org/apache/hudi/io/HoodieMergedReadHandle.java |  14 ++-
 .../metadata/HoodieBackedTableMetadataWriter.java  | 104 +++++++++++++++++----
 .../FlinkHoodieBackedTableMetadataWriter.java      |   7 ++
 .../SparkHoodieBackedTableMetadataWriter.java      |   8 ++
 .../common/testutils/HoodieTestDataGenerator.java  |  12 +++
 .../hudi/functional/RecordLevelIndexTestBase.scala |  21 ++++-
 .../hudi/functional/TestRecordLevelIndex.scala     |  78 +++++++++++++++-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |   2 +-
 9 files changed, 222 insertions(+), 28 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
index 39400394048..6865a6ac653 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
@@ -30,9 +30,9 @@ public abstract class HoodieIOHandle<T, I, K, O> {
 
   protected final String instantTime;
   protected final HoodieWriteConfig config;
-  protected final HoodieStorage storage;
-  protected final FileSystem fs;
   protected final HoodieTable<T, I, K, O> hoodieTable;
+  protected FileSystem fs;
+  protected HoodieStorage storage;
 
   HoodieIOHandle(HoodieWriteConfig config, Option<String> instantTime,
                  HoodieTable<T, I, K, O> hoodieTable) {
     this.instantTime = instantTime.orElse(StringUtils.EMPTY_STRING);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
index bb64edbb0b0..4d5ace58274 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
@@ -53,25 +53,35 @@ public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, K
 
   protected final Schema readerSchema;
   protected final Schema baseFileReaderSchema;
+  private final Option<FileSlice> fileSliceOpt;
 
   public HoodieMergedReadHandle(HoodieWriteConfig config,
                                 Option<String> instantTime,
                                 HoodieTable<T, I, K, O> hoodieTable,
                                 Pair<String, String> partitionPathFileIDPair) {
+    this(config, instantTime, hoodieTable, partitionPathFileIDPair, Option.empty());
+  }
+
+  public HoodieMergedReadHandle(HoodieWriteConfig config,
+                                Option<String> instantTime,
+                                HoodieTable<T, I, K, O> hoodieTable,
+                                Pair<String, String> partitionPathFileIDPair,
+                                Option<FileSlice> fileSliceOption) {
     super(config, instantTime, hoodieTable, partitionPathFileIDPair);
     readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
     // config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data.
     baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
+    fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : getLatestFileSlice();
   }
 
   public List<HoodieRecord<T>> getMergedRecords() {
-    Option<FileSlice> fileSliceOpt = getLatestFileSlice();
     if (!fileSliceOpt.isPresent()) {
       return Collections.emptyList();
     }
     checkState(nonEmpty(instantTime), String.format("Expected a valid instant time but got `%s`", instantTime));
     final FileSlice fileSlice = fileSliceOpt.get();
-    final HoodieRecordLocation currentLocation = new HoodieRecordLocation(instantTime, fileSlice.getFileId());
+    String baseFileInstantTime = fileSlice.getBaseFile().get().getCommitTime();
+    final HoodieRecordLocation currentLocation = new HoodieRecordLocation(baseFileInstantTime, fileSlice.getFileId());
     Option<HoodieFileReader> baseFileReader = Option.empty();
     HoodieMergedLogRecordScanner logRecordScanner = null;
     try {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 445c7b74fff..dd292830a85 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -71,7 +71,9 @@ import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.io.HoodieMergedReadHandle;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -179,6 +181,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT Reader should have been opened post initialization");
   }
 
+  protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
+    return null;
+  }
+
   private void initMetadataReader() {
     if (this.metadata != null) {
       this.metadata.close();
@@ -487,28 +493,50 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
 
   private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException {
     final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata);
+    final HoodieTable hoodieTable = getHoodieTable(dataWriteConfig, dataMetaClient);
 
     // Collect the list of latest base files present in each partition
     List<String> partitions = metadata.getAllPartitionPaths();
     fsView.loadAllPartitions();
-    final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new ArrayList<>();
-    for (String partition : partitions) {
-      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
-          .map(basefile -> Pair.of(partition, basefile)).collect(Collectors.toList()));
-    }
+    HoodieData<HoodieRecord> records = null;
+    if (dataMetaClient.getTableConfig().getTableType() == HoodieTableType.COPY_ON_WRITE) {
+      // for COW, we only need to consider the latest base files to initialize.
+      final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new ArrayList<>();
+      for (String partition : partitions) {
+        partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+            .map(basefile -> Pair.of(partition, basefile)).collect(Collectors.toList()));
+      }
 
-    LOG.info("Initializing record index from {} base files in {} partitions", partitionBaseFilePairs.size(), partitions.size());
+      LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in "
+          + partitions.size() + " partitions");
+
+      // Collect record keys from the files in parallel
+      records = readRecordKeysFromBaseFiles(
+          engineContext,
+          dataWriteConfig,
+          partitionBaseFilePairs,
+          false,
+          dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
+          dataWriteConfig.getBasePath(),
+          storageConf,
+          this.getClass().getSimpleName());
+    } else {
+      final List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
+      for (String partition : partitions) {
+        fsView.getLatestFileSlices(partition).forEach(fs -> partitionFileSlicePairs.add(Pair.of(partition, fs)));
+      }
 
-    // Collect record keys from the files in parallel
-    HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(
-        engineContext,
-        dataWriteConfig,
-        partitionBaseFilePairs,
-        false,
-        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
-        dataWriteConfig.getBasePath(),
-        storageConf,
-        this.getClass().getSimpleName());
+      LOG.info("Initializing record index from " + partitionFileSlicePairs.size() + " file slices in "
+          + partitions.size() + " partitions");
+      records = readRecordKeysFromFileSliceSnapshot(
+          engineContext,
+          partitionFileSlicePairs,
+          dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
+          this.getClass().getSimpleName(),
+          dataMetaClient,
+          dataWriteConfig,
+          hoodieTable);
+    }
     records.persist("MEMORY_AND_DISK_SER");
     final long recordCount = records.count();
@@ -522,6 +550,50 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     return Pair.of(fileGroupCount, records);
   }
 
+  /**
+   * Fetch record locations from FileSlice snapshot.
+   * @param engineContext context to use.
+   * @param partitionFileSlicePairs list of pairs of partition and file slice.
+   * @param recordIndexMaxParallelism parallelism to use.
+   * @param activeModule active module of interest.
+   * @param metaClient meta client instance to use.
+   * @param dataWriteConfig write config to use.
+   * @param hoodieTable hoodie table instance of interest.
+   * @return {@link HoodieData} of {@link HoodieRecord}s holding the record index updates.
+   */
+  private static HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(HoodieEngineContext engineContext,
+                                                                              List<Pair<String, FileSlice>> partitionFileSlicePairs,
+                                                                              int recordIndexMaxParallelism,
+                                                                              String activeModule,
+                                                                              HoodieTableMetaClient metaClient,
+                                                                              HoodieWriteConfig dataWriteConfig,
+                                                                              HoodieTable hoodieTable) {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    Option<String> instantTime = metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .lastInstant()
+        .map(HoodieInstant::getTimestamp);
+
+    engineContext.setJobStatus(activeModule, "Record Index: reading record keys from " + partitionFileSlicePairs.size() + " file slices");
+    final int parallelism = Math.min(partitionFileSlicePairs.size(), recordIndexMaxParallelism);
+
+    return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndFileSlice -> {
+
+      final String partition = partitionAndFileSlice.getKey();
+      final FileSlice fileSlice = partitionAndFileSlice.getValue();
+      final String fileId = fileSlice.getFileId();
+      return new HoodieMergedReadHandle(dataWriteConfig, instantTime, hoodieTable, Pair.of(partition, fileSlice.getFileId()),
+          Option.of(fileSlice)).getMergedRecords().stream().map(record -> {
+        HoodieRecord record1 = (HoodieRecord) record;
+        return HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(), partition, fileId,
+            record1.getCurrentLocation().getInstantTime(), 0);
+      }).iterator();
+    });
+  }
+
   private Pair<Integer, HoodieData<HoodieRecord>> initializeFilesPartition(List<DirectoryInfo> partitionInfoList) {
     // FILES partition uses a single file group
     final int fileGroupCount = 1;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 2ae017b85b4..77f1439c982 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -36,6 +36,8 @@ import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -197,4 +199,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
   protected void preWrite(String instantTime) {
     metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
   }
+
+  @Override
+  protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
+    return HoodieFlinkTable.create(writeConfig, engineContext, metaClient);
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 8e73a52ab4c..34b1c91e07b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -35,6 +36,8 @@ import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.spark.api.java.JavaRDD;
 import org.slf4j.Logger;
 
@@ -141,6 +144,11 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     writeClient.deletePartitions(partitionsToDrop, instantTime);
   }
 
+  @Override
+  protected HoodieTable getHoodieTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
+    return HoodieSparkTable.create(writeConfig, engineContext, metaClient);
+  }
+
   @Override
   public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?> initializeWriteClient() {
     return new SparkRDDWriteClient(engineContext, metadataWriteConfig, true);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index ca463cbf0e2..544d8bc787b 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -211,6 +211,18 @@ public class HoodieTestDataGenerator implements AutoCloseable {
     this(DEFAULT_PARTITION_PATHS);
   }
 
+  public static HoodieTestDataGenerator createTestGeneratorFirstPartition() {
+    return new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH});
+  }
+
+  public static HoodieTestDataGenerator createTestGeneratorSecondPartition() {
+    return new HoodieTestDataGenerator(new String[]{DEFAULT_SECOND_PARTITION_PATH});
+  }
+
+  public static HoodieTestDataGenerator createTestGeneratorThirdPartition() {
+    return new HoodieTestDataGenerator(new String[]{DEFAULT_THIRD_PARTITION_PATH});
+  }
+
   public HoodieTestDataGenerator(boolean makeDatesAmbiguous) {
     this();
     this.makeDatesAmbiguous = makeDatesAmbiguous;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index b4130ac189b..96853950d50 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -34,6 +34,7 @@ import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JavaConversions
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.{DataFrame, _}
 import org.apache.spark.sql.functions.{col, not}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api._
@@ -191,10 +192,14 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase {
     latestBatchDf
   }
 
+  protected def calculateMergedDf(latestBatchDf: DataFrame, operation: String): DataFrame = {
+    calculateMergedDf(latestBatchDf, operation, false)
+  }
+
   /**
    * @return [[DataFrame]] that should not exist as of the latest instant; used for non-existence validation.
    */
-  protected def calculateMergedDf(latestBatchDf: DataFrame, operation: String): DataFrame = {
+  protected def calculateMergedDf(latestBatchDf: DataFrame, operation: String, globalIndexEnableUpdatePartitions: Boolean): DataFrame = {
     val prevDfOpt = mergedDfList.lastOption
     if (prevDfOpt.isEmpty) {
       mergedDfList = mergedDfList :+ latestBatchDf
@@ -217,10 +222,16 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase {
       prevDf.filter(col("partition").isInCollection(overwrittenPartitions))
     } else {
       val prevDf = prevDfOpt.get
-      val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === latestBatchDf("_row_key")
-        && prevDf("partition") === latestBatchDf("partition"), "leftanti")
-      val latestSnapshot = prevDfOld.union(latestBatchDf)
-      mergedDfList = mergedDfList :+ latestSnapshot
+      if (globalIndexEnableUpdatePartitions) {
+        val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === latestBatchDf("_row_key"), "leftanti")
+        val latestSnapshot = prevDfOld.union(latestBatchDf)
+        mergedDfList = mergedDfList :+ latestSnapshot
+      } else {
+        val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === latestBatchDf("_row_key")
+          && prevDf("partition") === latestBatchDf("partition"), "leftanti")
+        val latestSnapshot = prevDfOld.union(latestBatchDf)
+        mergedDfList = mergedDfList :+ latestSnapshot
+      }
       sparkSession.emptyDataFrame
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 393587f34ac..a2ae2b27445 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -23,13 +23,16 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model._
-import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config._
 import org.apache.hudi.exception.HoodieWriteConflictException
-import org.apache.hudi.functional.TestCOWDataSourceStorage.{SQL_DRIVER_IS_NOT_NULL, SQL_DRIVER_IS_NULL, SQL_QUERY_EQUALITY_VALIDATOR_CLASS_NAME, SQL_QUERY_INEQUALITY_VALIDATOR_CLASS_NAME, SQL_RIDER_IS_NOT_NULL, SQL_RIDER_IS_NULL}
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, MetadataPartitionType}
 import org.apache.hudi.util.JavaConversions
+
 import org.apache.spark.sql._
+import org.apache.spark.sql.functions.lit
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api._
 import org.junit.jupiter.params.ParameterizedTest
@@ -38,6 +41,7 @@ import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, Meth
 
 import java.util.Collections
 import java.util.concurrent.Executors
+
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
@@ -55,6 +59,76 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
       saveMode = SaveMode.Overwrite)
   }
 
+  @Test
+  def testRLIInitializationForMorGlobalIndex(): Unit = {
+    val tableType = HoodieTableType.MERGE_ON_READ
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name()) +
+      (HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key -> "1") +
+      (HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key -> "1") +
+      (HoodieIndexConfig.INDEX_TYPE.key -> "RECORD_INDEX") +
+      (HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key -> "true") -
+      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key
+
+    val dataGen1 = HoodieTestDataGenerator.createTestGeneratorFirstPartition()
+    val dataGen2 = HoodieTestDataGenerator.createTestGeneratorSecondPartition()
+
+    // batch1: inserts
+    val instantTime1 = getNewInstantTime()
+    val latestBatch = recordsToStrings(dataGen1.generateInserts(instantTime1, 5)).asScala.toSeq
+    var operation = INSERT_OPERATION_OPT_VAL
+    val latestBatchDf = spark.read.json(spark.sparkContext.parallelize(latestBatch, 1))
+    latestBatchDf.cache()
+    latestBatchDf.write.format("org.apache.hudi")
+      .options(hudiOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val deletedDf1 = calculateMergedDf(latestBatchDf, operation, true)
+    deletedDf1.cache()
+
+    // batch2: upsert. Update a few records from partition1 to partition2 and insert a few new records into partition2.
+    val instantTime2 = getNewInstantTime()
+
+    val latestBatch2_1 = recordsToStrings(dataGen1.generateUniqueUpdates(instantTime2, 3)).asScala.toSeq
+    val latestBatchDf2_1 = spark.read.json(spark.sparkContext.parallelize(latestBatch2_1, 1))
+    val latestBatchDf2_2 = latestBatchDf2_1.withColumn("partition", lit(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))
+      .withColumn("partition_path", lit(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))
+    val latestBatch2_3 = recordsToStrings(dataGen2.generateInserts(instantTime2, 2)).asScala.toSeq
+    val latestBatchDf2_3 = spark.read.json(spark.sparkContext.parallelize(latestBatch2_3, 1))
+    val latestBatchDf2Final = latestBatchDf2_3.union(latestBatchDf2_2)
+    latestBatchDf2Final.cache()
+    latestBatchDf2Final.write.format("org.apache.hudi")
+      .options(hudiOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    operation = UPSERT_OPERATION_OPT_VAL
+    val deletedDf2 = calculateMergedDf(latestBatchDf2Final, operation, true)
+    deletedDf2.cache()
+
+    val hudiOpts2 = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name()) +
+      (HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key -> "1") +
+      (HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key -> "1") +
+      (HoodieIndexConfig.INDEX_TYPE.key -> "RECORD_INDEX") +
+      (HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key -> "true") +
+      (HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true")
+
+    val instantTime3 = getNewInstantTime()
+    // batch3: updates to partition2
+    val latestBatch3 = recordsToStrings(dataGen2.generateUniqueUpdates(instantTime3, 2)).asScala.toSeq
+    val latestBatchDf3 = spark.read.json(spark.sparkContext.parallelize(latestBatch3, 1))
+    latestBatchDf3.cache()
+    latestBatchDf3.write.format("org.apache.hudi")
+      .options(hudiOpts2)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val deletedDf3 = calculateMergedDf(latestBatchDf3, operation, true)
+    deletedDf3.cache()
+    validateDataAndRecordIndices(hudiOpts, deletedDf3)
+  }
+
+  private def getNewInstantTime(): String = {
+    HoodieActiveTimeline.createNewInstantTime()
+  }
+
   @ParameterizedTest
   @EnumSource(classOf[HoodieTableType])
   def testRLIUpsert(tableType: HoodieTableType): Unit = {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 9831ec060a8..cb30d3dc0be 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -2910,7 +2910,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(HoodieTestUtils.getDefaultStorageConf()).build();
-    List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getStorageConf()), metaClient.getBasePath(), false);
+    List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getStorageConf()), metaClient.getBasePath(), false, false);
     StorageConfiguration hadoopConf = metaClient.getStorageConf();
     HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf);
     HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient,
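
Why the table-type branch in initializeRecordIndexPartition matters: in a merge-on-read table, a record upserted after the last compaction exists only in a log file, so seeding the record index from the latest base files alone (the previous behavior) silently drops those keys; a copy-on-write table rewrites its base file on every commit, so there the latest base files already form a complete snapshot. The following self-contained sketch models that difference; BaseFile, LogFile, FileSlice, and recordKeysForIndex here are simplified stand-ins, not the Hudi classes.

import java.util.ArrayList;
import java.util.List;

public class RliInitSketch {

  enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

  // Stand-ins for Hudi's file-layout classes: a file slice groups one base
  // file with the log files stacked on top of it.
  record BaseFile(String fileId, String commitTime, List<String> recordKeys) {}
  record LogFile(List<String> recordKeys) {}
  record FileSlice(BaseFile baseFile, List<LogFile> logFiles) {}

  static List<String> recordKeysForIndex(TableType type, FileSlice slice) {
    List<String> keys = new ArrayList<>(slice.baseFile().recordKeys());
    if (type == TableType.COPY_ON_WRITE) {
      // COW rewrites the whole base file on each commit, so the latest base
      // file alone holds every live key of the file group.
      return keys;
    }
    // MOR appends updates and inserts to log files; a key written after the
    // last compaction exists only in a log file, so scanning base files
    // alone would leave it out of the record index.
    for (LogFile log : slice.logFiles()) {
      for (String key : log.recordKeys()) {
        if (!keys.contains(key)) {
          keys.add(key);
        }
      }
    }
    return keys;
  }

  public static void main(String[] args) {
    FileSlice slice = new FileSlice(
        new BaseFile("file-1", "001", List.of("k1", "k2")),
        List.of(new LogFile(List.of("k2", "k3")))); // k3 exists only in the log
    System.out.println(recordKeysForIndex(TableType.COPY_ON_WRITE, slice)); // [k1, k2]
    System.out.println(recordKeysForIndex(TableType.MERGE_ON_READ, slice)); // [k1, k2, k3]
  }
}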
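A second subtle change sits in HoodieMergedReadHandle: the HoodieRecordLocation stamped on each merged record now carries the commit time of the slice's base file (fileSlice.getBaseFile().get().getCommitTime()) rather than the latest completed instant at read time. A record-index entry asserts "this key lives in file group X as of instant T", and T should be the instant that actually produced the file group's on-disk state; stamping the read-time instant would point lookups at an instant that never wrote the record. A minimal model of that tagging follows; RecordLocation and mergedRecordLocations are illustrative stand-ins, not Hudi APIs.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergedLocationSketch {

  record RecordLocation(String instantTime, String fileId) {}

  // Merge base-file keys with log-file keys; every surviving record is
  // located at (baseFileCommitTime, fileId), mirroring the switch from the
  // read-time instant to the base file's own commit time.
  static Map<String, RecordLocation> mergedRecordLocations(String baseFileCommitTime,
                                                           String fileId,
                                                           List<String> baseFileKeys,
                                                           List<String> logFileKeys) {
    RecordLocation location = new RecordLocation(baseFileCommitTime, fileId);
    Map<String, RecordLocation> merged = new LinkedHashMap<>();
    baseFileKeys.forEach(k -> merged.put(k, location));
    logFileKeys.forEach(k -> merged.put(k, location)); // updates and log-only inserts
    return merged;
  }

  public static void main(String[] args) {
    // Base file written at instant 001; a log file was appended later.
    Map<String, RecordLocation> merged = mergedRecordLocations(
        "001", "file-1", List.of("k1", "k2"), List.of("k2", "k3"));
    // All three keys resolve to (001, file-1): the base file's commit time,
    // not whatever instant the initialization job happens to read at.
    merged.forEach((key, loc) -> System.out.println(key + " -> " + loc));
  }
}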
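The calculateMergedDf change in RecordLevelIndexTestBase encodes the semantics the new test exercises: with a global record index and RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE set to true, an upsert that moves a key to a new partition replaces the old row outright, so the expected snapshot is deduplicated by record key alone; with the flag off, identity is the (record key, partition) pair and the same key may legitimately appear in two partitions. A small sketch of the key-only merge, using hypothetical Row and mergeGlobal names:

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SnapshotMergeSketch {

  // Hypothetical row model for illustration only.
  record Row(String key, String partition, String value) {}

  // With partition-path updates enabled, record identity is the key alone:
  // an incoming row replaces any previous row with the same key, even if
  // that row lived in a different partition.
  static Map<String, Row> mergeGlobal(Map<String, Row> prev, List<Row> batch) {
    Map<String, Row> merged = new LinkedHashMap<>(prev);
    for (Row row : batch) {
      merged.put(row.key(), row);
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Row> prev = new LinkedHashMap<>();
    // 2015/03/16 and 2015/03/17 are HoodieTestDataGenerator's default
    // first and second partition paths.
    prev.put("k1", new Row("k1", "2015/03/16", "a"));
    Map<String, Row> merged = mergeGlobal(prev, List.of(new Row("k1", "2015/03/17", "b")));
    // One row survives: k1 has moved to partition 2015/03/17.
    System.out.println(merged.values());
  }
}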
