This is an automated email from the ASF dual-hosted git repository.
nsivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e4ee7e3023ef test(metadata): Add test coverage for deferred RLI init
and bulk_insert (#18865)
e4ee7e3023ef is described below
commit e4ee7e3023efea1268b0b41510c82430fb6f2fcc
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Jun 25 20:03:03 2026 -0700
test(metadata): Add test coverage for deferred RLI init and bulk_insert
(#18865)
Follow-up to #18353 (defer RLI init for fresh tables) and #18836 (robust
schema resolution during RLI bootstrap). Adds test coverage for the
deferred RLI init flow, including the bulk_insert path.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../hudi/functional/TestRecordLevelIndex.scala | 116 ++++++++++++++++++++-
1 file changed, 114 insertions(+), 2 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index e7015b5ab81e..336886b44ba8 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -146,7 +146,8 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase
with SparkDatasetMix
"Metadata files partition count should be lower than data table file
count after rebootstrap")
}
- def testRecordLevelIndex(tableType: HoodieTableType, streamingWriteEnabled:
Boolean, holder: testRecordLevelIndexHolder): Unit = {
+ def testRecordLevelIndex(tableType: HoodieTableType, streamingWriteEnabled:
Boolean, holder: testRecordLevelIndexHolder,
+ rliInitDeferred: Boolean = false): Unit = {
val dataGen = new HoodieTestDataGenerator();
val inserts = dataGen.generateInserts("001", 5)
val latestBatchDf = toDataset(spark, inserts)
@@ -156,9 +157,10 @@ class TestRecordLevelIndex extends
RecordLevelIndexTestBase with SparkDatasetMix
RECORDKEY_FIELD.key -> "_row_key",
PARTITIONPATH_FIELD.key -> "data_partition_path",
HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
- HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key()->
"false",
+ HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() ->
"false",
HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key() ->
streamingWriteEnabled.toString,
+ HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key() ->
rliInitDeferred.toString,
HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name())
holder.options = options
@@ -167,11 +169,19 @@ class TestRecordLevelIndex extends
RecordLevelIndexTestBase with SparkDatasetMix
.mode(SaveMode.Overwrite)
.save(basePath)
assertEquals(10, spark.read.format("hudi").load(basePath).count())
+ if (rliInitDeferred) {
+ // With defer enabled, the first commit should NOT have initialized the
RLI partition.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath),
+ "RLI partition should not be initialized after the first commit when
defer is enabled")
+ }
val props =
TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(options).asJava)
val writeConfig = HoodieWriteConfig.newBuilder()
.withProps(props)
.withPath(basePath)
.build()
+ // Constructing the metadata writer here will initialize RLI (lazily, on
this second metadata-writer entry)
+ // when defer is enabled, since there is now 1 completed commit on the
data table.
var metadata = metadataWriter(writeConfig).getTableMetadata
val recordKeys = inserts.asScala.map(i =>
i.getRecordKey).asJava.stream().collect(Collectors.toList())
holder.recordKeys = recordKeys
@@ -301,6 +311,108 @@ class TestRecordLevelIndex extends
RecordLevelIndexTestBase with SparkDatasetMix
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testPartitionedRecordLevelIndexDefer(streamingWriteEnabled: Boolean):
Unit = {
+ val holder = new testRecordLevelIndexHolder
+ testRecordLevelIndex(HoodieTableType.MERGE_ON_READ, streamingWriteEnabled,
holder, true)
+ assertEquals("deltacommit",
metaClient.getActiveTimeline.lastInstant().get().getAction)
+ val writeConfig = getWriteConfig(holder.options)
+ var metadata = metadataWriter(writeConfig).getTableMetadata
+ doAllAssertions(holder, metadata)
+ val writeClient = new SparkRDDWriteClient(new
HoodieSparkEngineContext(jsc), writeConfig)
+ val timeOpt = writeClient.scheduleCompaction(HOption.empty())
+ assertTrue(timeOpt.isPresent)
+ writeClient.compact(timeOpt.get())
+ metaClient.reloadActiveTimeline()
+ assertEquals("compaction",
metaClient.getActiveTimeline.lastInstant().get().getAction)
+ metadata = metadataWriter(writeConfig).getTableMetadata
+ doAllAssertions(holder, metadata)
+ writeClient.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testPartitionedRecordLevelIndexDeferWithBulkInsert(streamingWriteEnabled:
Boolean): Unit = {
+ val tableType = HoodieTableType.MERGE_ON_READ
+ val dataGen = new HoodieTestDataGenerator()
+ val inserts1 = dataGen.generateInserts("001", 5)
+ val batch1Df = toDataset(spark, inserts1)
+ val insertDf1 = batch1Df.withColumn("data_partition_path",
lit("partition1"))
+ .union(batch1Df.withColumn("data_partition_path", lit("partition2")))
+
+ val options = Map(
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+ RECORDKEY_FIELD.key -> "_row_key",
+ PARTITIONPATH_FIELD.key -> "data_partition_path",
+ HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+ HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() ->
"false",
+ HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
+ HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key() ->
streamingWriteEnabled.toString,
+ HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key() -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
+ HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name())
+
+ // Commit #1: bulk_insert on a fresh table with defer enabled.
+ insertDf1.write.format("hudi")
+ .options(options)
+ .option(DataSourceWriteOptions.OPERATION.key(),
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ assertEquals(10, spark.read.format("hudi").load(basePath).count())
+
+ // Defer should have kicked in: RLI partition is not initialized after the
first bulk_insert.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath),
+ "RLI partition should not be initialized after the first bulk_insert
when defer is enabled")
+
+ // Commit #2: another bulk_insert into a new partition. New rows must use
distinct record keys.
+ val inserts2 = dataGen.generateInserts("002", 5)
+ val batch2Df = toDataset(spark, inserts2)
+ val insertDf2 = batch2Df.withColumn("data_partition_path",
lit("partition3"))
+
+ insertDf2.write.format("hudi")
+ .options(options)
+ .option(DataSourceWriteOptions.OPERATION.key(),
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ assertEquals(15, spark.read.format("hudi").load(basePath).count())
+
+ // Build metadata writer/reader; this entry will initialize RLI now that
there is a completed commit.
+ val writeConfig = getWriteConfig(options)
+ val metadata = metadataWriter(writeConfig).getTableMetadata
+
+ // RLI partition should now be present in the metadata table.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath),
+ "RLI partition should be initialized once a completed commit exists on
the data table")
+ assertTrue(HoodieRecordIndex.isPartitioned(
+
metaClient.getIndexMetadata.get().getIndexDefinitions.get(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)),
+ "RLI should be initialized as partitioned RLI")
+
+ // Validate record key -> location mapping for both batches against the
data.
+ val tableRows = spark.read.format("hudi").load(basePath).collect()
+
+ val batch1Keys =
inserts1.asScala.map(_.getRecordKey).asJava.stream().collect(Collectors.toList())
+ val partition1Locations = readRecordIndex(metadata, batch1Keys,
HOption.of("partition1"))
+ assertEquals(5, partition1Locations.size)
+ validateDFWithLocations(tableRows, partition1Locations, "partition1")
+ val partition2Locations = readRecordIndex(metadata, batch1Keys,
HOption.of("partition2"))
+ assertEquals(5, partition2Locations.size)
+ validateDFWithLocations(tableRows, partition2Locations, "partition2")
+
+ val batch2Keys =
inserts2.asScala.map(_.getRecordKey).asJava.stream().collect(Collectors.toList())
+ val partition3Locations = readRecordIndex(metadata, batch2Keys,
HOption.of("partition3"))
+ assertEquals(5, partition3Locations.size)
+ validateDFWithLocations(tableRows, partition3Locations, "partition3")
+
+ // Cross-partition lookups for batch1 keys against partition3 (and vice
versa) should be empty.
+ assertEquals(0, readRecordIndex(metadata, batch1Keys,
HOption.of("partition3")).size)
+ assertEquals(0, readRecordIndex(metadata, batch2Keys,
HOption.of("partition1")).size)
+ assertEquals(0, readRecordIndex(metadata, batch2Keys,
HOption.of("partition2")).size)
+ }
+
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testPartitionedRecordLevelIndexCompact(streamingWriteEnabled: Boolean):
Unit = {