This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7f4dfdab3222 test: Add Scala test for record index rebootstrap on
non-Hoodie partitions (#18208)
7f4dfdab3222 is described below
commit 7f4dfdab3222126bf780471dc1596d8e869530e7
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Feb 24 11:11:19 2026 -0800
test: Add Scala test for record index rebootstrap on non-Hoodie partitions
(#18208)
---
.../hudi/functional/TestRecordLevelIndex.scala | 134 ++++++++++++++++++++-
1 file changed, 130 insertions(+), 4 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index c3888baf7113..367312fd6735 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -35,14 +35,15 @@ import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, Hoodie
import
org.apache.hudi.functional.TestRecordLevelIndex.TestPartitionedRecordLevelIndexTestCase
import org.apache.hudi.index.HoodieIndex.IndexType.RECORD_LEVEL_INDEX
import org.apache.hudi.index.record.HoodieRecordIndex
-import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieTableMetadataUtil}
-import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath
+import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
import
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.functions.lit
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertTrue, fail}
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals,
assertFalse, assertTrue, fail}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource,
ValueSource}
@@ -61,6 +62,89 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase with SparkDatasetMix
var newRecordKeys: java.util.List[String] = null
}
+  /**
+   * Writes one COPY_ON_WRITE batch with the (non-global) record-level index enabled, deletes the
+   * `.hoodie_partition_metadata` file of one data partition, drops the metadata table, and checks
+   * that re-creating the metadata writer rebootstraps successfully. The test then asserts that the
+   * rebootstrapped metadata table omits the partition that lost its metadata file, so both the
+   * record index and the FILES listing report fewer entries than before.
+   */
+  @Test
+  def testRecordIndexRebootstrapWhenHoodiePartitionMetadataIsMissingScala(): Unit = {
+    val partitionToCorrupt = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH
+    val insertedRecords = 30
+    val localDataGen = new HoodieTestDataGenerator()
+    val inserts = localDataGen.generateInserts("001", insertedRecords)
+    val insertDf = toDataset(spark, inserts)
+    // Computed once and reused for the partition-count and record-index checks below.
+    val dataPartitionPaths = localDataGen.getPartitionPaths.toSeq
+    val options = Map(HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.name(),
+      RECORDKEY_FIELD.key -> "_row_key",
+      PARTITIONPATH_FIELD.key -> "partition_path",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+      HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "false",
+      HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
+      HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
+      HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name())
+    insertDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertEquals(insertedRecords, spark.read.format("hudi").load(basePath).count())
+
+    // Build a write config carrying the table's latest schema so the metadata writer can be
+    // created directly (outside of a datasource write).
+    metaClient.reloadActiveTimeline()
+    val latestTableSchema = new TableSchemaResolver(metaClient)
+      .getTableSchemaFromLatestCommit(false).get().toString
+    val props = TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(
+      options ++ Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key() -> latestTableSchema)).asJava)
+    val writeConfig = HoodieWriteConfig.newBuilder()
+      .withProps(props)
+      .withPath(basePath)
+      .build()
+    val metadataTableBasePath = getMetadataTableBasePath(basePath)
+
+    val metadataBeforeRebootstrap = metadataWriter(writeConfig).getTableMetadata
+    // Verify that the metadata table contains all the data partitions.
+    val partitionPaths = metadataBeforeRebootstrap.getAllPartitionPaths()
+    assertEquals(dataPartitionPaths.size, partitionPaths.size,
+      "Metadata table should contain all the data partitions")
+    assertTrue(partitionPaths.contains(partitionToCorrupt),
+      "Metadata table should contain the partition to corrupt")
+
+    // Snapshot the FILES listing and record index before corrupting anything, for comparison later.
+    val filesInAllPartitionsBeforeRebootstrap = getFilesInAllPartitions(metadataBeforeRebootstrap)
+    val recordKeys = getRecordKeys()
+    assertEquals(recordKeys.size(),
+      getRecordIndexEntries(metadataBeforeRebootstrap, recordKeys, dataPartitionPaths).size,
+      "Record index entries should match inserted records after first batch")
+
+    assertTrue(storage.exists(new StoragePath(metadataTableBasePath)),
+      "Metadata table should exist before deletion")
+
+    // Remove .hoodie_partition_metadata from one data partition.
+    removeOnePartitionMetadataFile(partitionToCorrupt)
+
+    // Delete the metadata table so the next metadata writer has to rebootstrap it from scratch.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    HoodieTableMetadataUtil.deleteMetadataTable(metaClient, context, false)
+    assertFalse(storage.exists(new StoragePath(metadataTableBasePath)),
+      "Metadata table should be removed before rebootstrap")
+
+    // Rebootstrap should succeed even when one partition metadata file is missing.
+    assertDoesNotThrow(() => metadataWriter(writeConfig).getTableMetadata,
+      "Metadata rebootstrap with record index enabled should succeed")
+    // getTableMetadata already yields HoodieBackedTableMetadata (it is passed uncast to the
+    // helpers above), so no cast is needed here.
+    val metadataAfterRebootstrap = metadataWriter(writeConfig).getTableMetadata
+
+    // Verify the record_index partition is created after rebootstrap.
+    val recordIndexPath = new StoragePath(metadataTableBasePath,
+      MetadataPartitionType.RECORD_INDEX.getPartitionPath)
+    assertTrue(storage.exists(recordIndexPath),
+      "Record index partition should exist after metadata rebootstrap")
+
+    // Verify that the metadata table does not contain the partition that was missing metadata.
+    val partitionPathsAfterBootstrap = metadataAfterRebootstrap.getAllPartitionPaths()
+    assertFalse(partitionPathsAfterBootstrap.contains(partitionToCorrupt),
+      "Metadata table should not contain the partition that was missing metadata")
+    assertEquals(dataPartitionPaths.size - 1, partitionPathsAfterBootstrap.size,
+      "Metadata table should contain all the data partitions except the one that was missing metadata")
+
+    // Missing partition metadata should lead to fewer indexed records than initially inserted.
+    val recordIndexEntriesCount =
+      getRecordIndexEntries(metadataAfterRebootstrap, recordKeys, dataPartitionPaths).size
+    assertTrue(insertedRecords > recordIndexEntriesCount,
+      "Record index entries should not match inserted records after metadata rebootstrap")
+
+    // Files metadata should also undercount when that partition is skipped during bootstrap.
+    val filesInAllPartitionsAfterRebootstrap = getFilesInAllPartitions(metadataAfterRebootstrap)
+    assertTrue(filesInAllPartitionsBeforeRebootstrap.size > filesInAllPartitionsAfterRebootstrap.size,
+      "Metadata files listing should contain fewer files after rebootstrap than before")
+  }
+
def testRecordLevelIndex(tableType: HoodieTableType, streamingWriteEnabled:
Boolean, holder: testRecordLevelIndexHolder): Unit = {
val dataGen = new HoodieTestDataGenerator();
val inserts = dataGen.generateInserts("001", 5)
@@ -453,6 +537,48 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase with SparkDatasetMix
.collectAsList().asScala.map(p => p.getKey -> p.getValue).toMap
}
+  /**
+   * Collects the `_hoodie_record_key` value of every row read back from the Hudi table at
+   * `basePath`.
+   *
+   * @return the record keys as a Java list, in the order Spark returned the rows
+   */
+  private def getRecordKeys(): java.util.List[String] = {
+    val keyColumn = "_hoodie_record_key"
+    val keyRows = spark.read.format("hudi").load(basePath).select(keyColumn).collectAsList()
+    keyRows.asScala.map(row => row.getAs[String](keyColumn)).asJava
+  }
+
+  /**
+   * Looks up `recordKeys` in the record index once per partition in `partitionPaths` and merges
+   * the per-partition results into one key -> location map.
+   *
+   * A partition whose lookup throws `ArithmeticException` contributes no entries.
+   *
+   * @param metadata       metadata table to query
+   * @param recordKeys     keys to look up
+   * @param partitionPaths data partitions to query, one lookup each
+   * @return merged map of record key to its global location
+   */
+  private def getRecordIndexEntries(metadata: HoodieBackedTableMetadata,
+                                    recordKeys: java.util.List[String],
+                                    partitionPaths: Seq[String]): Map[String, HoodieRecordGlobalLocation] = {
+    // Partitioned RLI can throw when a partition is skipped from metadata bootstrap.
+    def lookupIn(partition: String) =
+      try readRecordIndex(metadata, recordKeys, HOption.of(partition))
+      catch {
+        case _: ArithmeticException => Map.empty[String, HoodieRecordGlobalLocation]
+      }
+
+    partitionPaths.flatMap(lookupIn).toMap
+  }
+
+  /**
+   * Deletes the first entry directly under the given data partition whose name starts with
+   * `.hoodie_partition_metadata`, and fails the test if the deletion reports failure.
+   *
+   * @param partition partition path relative to `basePath`
+   * @throws IllegalStateException if no such entry exists under the partition
+   */
+  private def removeOnePartitionMetadataFile(partition: String): Unit = {
+    val partitionPath = new StoragePath(basePath, partition)
+    val metadataFileOpt = storage.listDirectEntries(partitionPath).asScala.collectFirst {
+      case info if info.getPath.getName.startsWith(".hoodie_partition_metadata") => info.getPath
+    }
+    val partitionMetadataFile = metadataFileOpt.getOrElse(
+      throw new IllegalStateException(s"No partition metadata file found under $partitionPath"))
+    assertTrue(storage.deleteFile(partitionMetadataFile),
+      s"Failed to delete partition metadata file $partitionMetadataFile")
+  }
+
+  /**
+   * Asks `metadata` for the files of every partition it reports. Each partition path is first
+   * made absolute against `basePath`, and the per-partition file lists are flattened into one
+   * sequence.
+   *
+   * @param metadata metadata table to query
+   * @return all file infos returned by the metadata table, across partitions
+   */
+  private def getFilesInAllPartitions(metadata: HoodieBackedTableMetadata): Seq[StoragePathInfo] = {
+    val tableBasePath = new StoragePath(basePath)
+    val absolutePartitionPaths = metadata.getAllPartitionPaths.asScala
+      .map(relativePath => FSUtils.getAbsolutePartitionPath(tableBasePath, relativePath).toString)
+      .asJava
+    val filesByPartition = metadata.getAllFilesInPartitions(absolutePartitionPaths)
+    filesByPartition.values().asScala.flatMap(files => files.asScala).toSeq
+  }
+
@ParameterizedTest
@EnumSource(classOf[HoodieTableType])
def testRLIForDeletesWithHoodieIsDeletedColumn(tableType: HoodieTableType):
Unit = {