This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f4dfdab3222 test: Add Scala test for record index rebootstrap on non-Hoodie partitions (#18208)
7f4dfdab3222 is described below

commit 7f4dfdab3222126bf780471dc1596d8e869530e7
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Feb 24 11:11:19 2026 -0800

    test: Add Scala test for record index rebootstrap on non-Hoodie partitions (#18208)
---
 .../hudi/functional/TestRecordLevelIndex.scala     | 134 ++++++++++++++++++++-
 1 file changed, 130 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index c3888baf7113..367312fd6735 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -35,14 +35,15 @@ import org.apache.hudi.config.{HoodieCompactionConfig, 
HoodieIndexConfig, Hoodie
 import 
org.apache.hudi.functional.TestRecordLevelIndex.TestPartitionedRecordLevelIndexTestCase
 import org.apache.hudi.index.HoodieIndex.IndexType.RECORD_LEVEL_INDEX
 import org.apache.hudi.index.record.HoodieRecordIndex
-import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadataUtil}
-import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath
+import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
 import 
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy
 
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.functions.lit
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertTrue, fail}
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, 
assertFalse, assertTrue, fail}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, 
ValueSource}
 
@@ -61,6 +62,89 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
with SparkDatasetMix
     var newRecordKeys: java.util.List[String] = null
   }
 
+  @Test
+  def testRecordIndexRebootstrapWhenHoodiePartitionMetadataIsMissingScala(): 
Unit = {
+    val partitionToCorrupt = 
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH
+    val insertedRecords = 30
+    val localDataGen = new HoodieTestDataGenerator()
+    val inserts = localDataGen.generateInserts("001", insertedRecords)
+    val insertDf = toDataset(spark, inserts)
+    val options = Map(HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> 
HoodieTableType.COPY_ON_WRITE.name(),
+      RECORDKEY_FIELD.key -> "_row_key",
+      PARTITIONPATH_FIELD.key -> "partition_path",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+      HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> 
"false",
+      HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
+      HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
+      HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name())
+    insertDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertEquals(insertedRecords, 
spark.read.format("hudi").load(basePath).count())
+
+    metaClient.reloadActiveTimeline()
+    val latestTableSchema = new 
TableSchemaResolver(metaClient).getTableSchemaFromLatestCommit(false).get().toString
+    val props = TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(
+      options ++ Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key() -> 
latestTableSchema)).asJava)
+    val writeConfig = HoodieWriteConfig.newBuilder()
+      .withProps(props)
+      .withPath(basePath)
+      .build()
+    val metadataBeforeRebootstrap = 
metadataWriter(writeConfig).getTableMetadata
+    // Verify that the metadata table contains all the data partitions.
+    val partitionPaths = metadataBeforeRebootstrap.getAllPartitionPaths()
+    assertEquals(localDataGen.getPartitionPaths().size, partitionPaths.size,
+      "Metadata table should contain all the data partitions")
+    assertTrue(partitionPaths.contains(partitionToCorrupt),
+      "Metadata table should contain the partition to corrupt")
+
+    val filesInAllPartitionsBeforeRebootstrap = 
getFilesInAllPartitions(metadataBeforeRebootstrap)
+    val recordKeys = getRecordKeys()
+    assertEquals(recordKeys.size(), 
getRecordIndexEntries(metadataBeforeRebootstrap, recordKeys, 
localDataGen.getPartitionPaths.toSeq).size,
+      "Record index entries should match inserted records after first batch")
+
+    assertTrue(storage.exists(new 
StoragePath(getMetadataTableBasePath(basePath))),
+      "Metadata table should exist before deletion")
+
+    // Remove _hoodie_partition_metadata from one data partition.
+    removeOnePartitionMetadataFile(partitionToCorrupt)
+
+    // Delete metadata table and force a full metadata rebootstrap.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    HoodieTableMetadataUtil.deleteMetadataTable(metaClient, context, false)
+    assertFalse(storage.exists(new 
StoragePath(getMetadataTableBasePath(basePath))),
+      "Metadata table should be removed before rebootstrap")
+
+    // Rebootstrap should succeed even when one partition metadata file is 
missing.
+    assertDoesNotThrow(() => metadataWriter(writeConfig).getTableMetadata,
+      "Metadata rebootstrap with record index enabled should succeed")
+    val metadataAfterRebootstrap = 
metadataWriter(writeConfig).getTableMetadata.asInstanceOf[HoodieBackedTableMetadata]
+
+    // Verify the record_index partition is created after rebootstrap.
+    val recordIndexPath = new StoragePath(getMetadataTableBasePath(basePath), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath)
+    assertTrue(storage.exists(recordIndexPath),
+      "Record index partition should exist after metadata rebootstrap")
+
+    // Verify that the metadata table does not contain the partition that was 
missing metadata.
+    val partitionPathsAfterBootstrap = 
metadataAfterRebootstrap.getAllPartitionPaths()
+    assertFalse(partitionPathsAfterBootstrap.contains(partitionToCorrupt),
+      "Metadata table should not contain the partition that was missing 
metadata")
+    assertEquals(localDataGen.getPartitionPaths().size - 1, 
partitionPathsAfterBootstrap.size,
+      "Metadata table should contain all the data partitions except the one 
that was missing metadata")
+
+    // Missing partition metadata should lead to fewer indexed records than 
initially inserted.
+    val recordIndexEntriesCount = 
getRecordIndexEntries(metadataAfterRebootstrap, recordKeys, 
localDataGen.getPartitionPaths.toSeq).size
+    assertTrue(insertedRecords > recordIndexEntriesCount,
+      "Record index entries should not match inserted records after metadata 
rebootstrap")
+
+    // Files metadata should also undercount when that partition is skipped 
during bootstrap.
+    val filesInAllPartitionsAfterRebootstrap = 
getFilesInAllPartitions(metadataAfterRebootstrap)
+    assertTrue(filesInAllPartitionsBeforeRebootstrap.size > 
filesInAllPartitionsAfterRebootstrap.size,
+      "Metadata files partition count should be lower than data table file 
count after rebootstrap")
+  }
+
   def testRecordLevelIndex(tableType: HoodieTableType, streamingWriteEnabled: 
Boolean, holder: testRecordLevelIndexHolder): Unit = {
     val dataGen = new HoodieTestDataGenerator();
     val inserts = dataGen.generateInserts("001", 5)
@@ -453,6 +537,48 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase with SparkDatasetMix
       .collectAsList().asScala.map(p => p.getKey -> p.getValue).toMap
   }
 
+  private def getRecordKeys(): java.util.List[String] = {
+    spark.read.format("hudi").load(basePath)
+      .select("_hoodie_record_key")
+      .collectAsList()
+      .asScala
+      .map(row => row.getAs[String]("_hoodie_record_key"))
+      .asJava
+  }
+
+  private def getRecordIndexEntries(metadata: HoodieBackedTableMetadata,
+                                    recordKeys: java.util.List[String],
+                                    partitionPaths: Seq[String]): Map[String, 
HoodieRecordGlobalLocation] = {
+    partitionPaths
+      .flatMap(partition => {
+        try {
+          readRecordIndex(metadata, recordKeys, HOption.of(partition))
+        } catch {
+          // Partitioned RLI can throw when a partition is skipped from 
metadata bootstrap.
+          case _: ArithmeticException => Map.empty[String, 
HoodieRecordGlobalLocation]
+        }
+      })
+      .toMap
+  }
+
+  private def removeOnePartitionMetadataFile(partition: String): Unit = {
+    val partitionPath = new StoragePath(basePath, partition)
+    val entries = storage.listDirectEntries(partitionPath).asScala
+    val partitionMetadataFile = entries
+      .map(_.getPath)
+      .find(path => path.getName.startsWith(".hoodie_partition_metadata"))
+      .getOrElse(throw new IllegalStateException(s"No partition metadata file 
found under $partitionPath"))
+    assertTrue(storage.deleteFile(partitionMetadataFile),
+      s"Failed to delete partition metadata file $partitionMetadataFile")
+  }
+
+  private def getFilesInAllPartitions(metadata: HoodieBackedTableMetadata): 
Seq[StoragePathInfo] = {
+    val partitionPaths = metadata.getAllPartitionPaths.asScala
+      .map(partitionPath => FSUtils.getAbsolutePartitionPath(new 
StoragePath(basePath), partitionPath).toString)
+      .asJava
+    
metadata.getAllFilesInPartitions(partitionPaths).values().asScala.flatMap(_.asScala).toSeq
+  }
+
   @ParameterizedTest
   @EnumSource(classOf[HoodieTableType])
   def testRLIForDeletesWithHoodieIsDeletedColumn(tableType: HoodieTableType): 
Unit = {

Reply via email to