This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fb1b34a6c38 fix: Fail metadata bootstrap early in presence of 0 byte 
file (#18209)
5fb1b34a6c38 is described below

commit 5fb1b34a6c389ec1a4a3b332b22eb94851676da0
Author: Surya Prasanna <[email protected]>
AuthorDate: Wed Feb 25 13:53:16 2026 -0800

    fix: Fail metadata bootstrap early in presence of 0 byte file (#18209)
---
 .../hudi/metadata/HoodieMetadataPayload.java       |  8 ++-
 .../TestCopyOnWriteRollbackActionExecutor.java     |  3 +-
 .../hudi/functional/TestRecordLevelIndex.scala     | 79 ++++++++++++++++++++--
 3 files changed, 82 insertions(+), 8 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 651fa61eedc5..ac509fce962e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -317,7 +317,13 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
 
     int size = filesAdded.size() + filesDeleted.size();
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(size, 1);
-    filesAdded.forEach((fileName, fileSize) -> fileInfo.put(fileName, new 
HoodieMetadataFileInfo(fileSize, false)));
+    filesAdded.forEach((fileName, fileSize) -> {
+      // Assert that the file-size of the file being added is positive, since 
Hudi
+      // should not be creating empty files
+      checkState(fileSize > 0, "File name " + fileName
+          + ", is a 0 byte file. It does not have any contents");
+      fileInfo.put(fileName, new HoodieMetadataFileInfo(fileSize, false));
+    });
 
     filesDeleted.forEach(fileName -> fileInfo.put(fileName, 
DELETE_FILE_METADATA));
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index 4a890bc2e1b7..343e406ddacd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -380,7 +380,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     // we are using test table infra. So, col stats are not populated.
     HoodieTable table =
         this.getHoodieTable(metaClient, 
getConfigBuilder().withRollbackUsingMarkers(true).withRollbackBackupEnabled(true)
-            
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build())
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false)
+                .withMetadataIndexColumnStats(false).build())
             .build());
     HoodieInstant needRollBackInstant = HoodieTestUtils.getCompleteInstant(
         metaClient.getStorage(), metaClient.getTimelinePath(),
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 367312fd6735..cf63ea915630 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -32,21 +32,22 @@ import 
org.apache.hudi.common.testutils.{HoodieTestDataGenerator, InProcessTimeG
 import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieWriteConfig}
+import org.apache.hudi.exception.{HoodieException, HoodieMetadataException}
 import 
org.apache.hudi.functional.TestRecordLevelIndex.TestPartitionedRecordLevelIndexTestCase
 import org.apache.hudi.index.HoodieIndex.IndexType.RECORD_LEVEL_INDEX
 import org.apache.hudi.index.record.HoodieRecordIndex
-import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadataUtil, MetadataPartitionType}
-import org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
 import 
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy
 
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.functions.lit
 import org.junit.jupiter.api.{Tag, Test}
-import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, 
assertFalse, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, 
assertFalse, assertThrows, assertTrue, fail}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, 
ValueSource}
 
+import java.io.{PrintWriter, StringWriter}
 import java.util
 import java.util.stream.Collectors
 
@@ -105,7 +106,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
with SparkDatasetMix
     assertEquals(recordKeys.size(), 
getRecordIndexEntries(metadataBeforeRebootstrap, recordKeys, 
localDataGen.getPartitionPaths.toSeq).size,
       "Record index entries should match inserted records after first batch")
 
-    assertTrue(storage.exists(new 
StoragePath(getMetadataTableBasePath(basePath))),
+    assertTrue(storage.exists(new 
StoragePath(HoodieTableMetadata.getMetadataTableBasePath(basePath))),
       "Metadata table should exist before deletion")
 
     // Remove _hoodie_partition_metadata from one data partition.
@@ -114,7 +115,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
with SparkDatasetMix
     // Delete metadata table and force a full metadata rebootstrap.
     metaClient = HoodieTableMetaClient.reload(metaClient)
     HoodieTableMetadataUtil.deleteMetadataTable(metaClient, context, false)
-    assertFalse(storage.exists(new 
StoragePath(getMetadataTableBasePath(basePath))),
+    assertFalse(storage.exists(new 
StoragePath(HoodieTableMetadata.getMetadataTableBasePath(basePath))),
       "Metadata table should be removed before rebootstrap")
 
     // Rebootstrap should succeed even when one partition metadata file is 
missing.
@@ -123,7 +124,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
with SparkDatasetMix
     val metadataAfterRebootstrap = 
metadataWriter(writeConfig).getTableMetadata.asInstanceOf[HoodieBackedTableMetadata]
 
     // Verify the record_index partition is created after rebootstrap.
-    val recordIndexPath = new StoragePath(getMetadataTableBasePath(basePath), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath)
+    val recordIndexPath = new 
StoragePath(HoodieTableMetadata.getMetadataTableBasePath(basePath), 
MetadataPartitionType.RECORD_INDEX.getPartitionPath)
     assertTrue(storage.exists(recordIndexPath),
       "Record index partition should exist after metadata rebootstrap")
 
@@ -609,6 +610,72 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase with SparkDatasetMix
     validateDataAndRecordIndices(hudiOpts, 
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(deletedRecords).asScala.toSeq,
 1)))
     deleteDf.unpersist()
   }
+
+  @Test
+  def testRecordIndexRebootstrapWithZeroByteBaseFile(): Unit = {
+    val insertedRecords = 30
+    val localDataGen = new HoodieTestDataGenerator()
+    val inserts = localDataGen.generateInserts("001", insertedRecords)
+    val insertDf = toDataset(spark, inserts)
+    val optionsWithoutRecordIndex = Map(HoodieWriteConfig.TBL_NAME.key -> 
"hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> 
HoodieTableType.COPY_ON_WRITE.name(),
+      RECORDKEY_FIELD.key -> "_row_key",
+      PARTITIONPATH_FIELD.key -> "partition_path",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+      HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> 
"false",
+      HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "false",
+      HoodieCompactionConfig.INLINE_COMPACT.key() -> "false")
+
+    // Create first commit with record_index disabled.
+    insertDf.write.format("hudi")
+      .options(optionsWithoutRecordIndex)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertEquals(insertedRecords, 
spark.read.format("hudi").load(basePath).count())
+
+    // Corrupt one base parquet file by replacing it with an empty file.
+    val corruptedBaseFileName = 
replaceOneBaseFileWithEmpty(localDataGen.getPartitionPaths.toSeq)
+
+    // Delete metadata table to force rebootstrap.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    HoodieTableMetadataUtil.deleteMetadataTable(metaClient, context, false)
+    assertFalse(storage.exists(new 
StoragePath(HoodieTableMetadata.getMetadataTableBasePath(basePath))),
+      "Metadata table should be removed before rebootstrap")
+
+    // Rebootstrap metadata with record_index enabled is expected to fail on the 0 byte base file.
+    metaClient.reloadActiveTimeline()
+    val latestSchema = new 
TableSchemaResolver(metaClient).getTableSchemaFromLatestCommit(false).get().toString
+    val optionsWithRecordIndex = optionsWithoutRecordIndex ++ Map(
+      HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
+      HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name(),
+      HoodieWriteConfig.AVRO_SCHEMA_STRING.key() -> latestSchema)
+    val writeConfig = getWriteConfig(optionsWithRecordIndex)
+    try {
+      metadataWriter(writeConfig).getTableMetadata
+    } catch {
+      case e: HoodieMetadataException =>
+        val stackTraceWriter = new StringWriter()
+        e.printStackTrace(new PrintWriter(stackTraceWriter))
+        val stackTraceText = stackTraceWriter.toString
+        assertTrue(stackTraceText.contains(corruptedBaseFileName),
+          s"Expected HoodieMetadataException stack trace to contain corrupted 
file name: $corruptedBaseFileName")
+      case t: Throwable =>
+        fail(s"Expected HoodieMetadataException but got ${t.getClass.getName}: 
${t.getMessage}")
+    }
+  }
+
+  private def replaceOneBaseFileWithEmpty(partitionPaths: Seq[String]): String 
= {
+    val candidateBaseFile = partitionPaths.view.flatMap { partition =>
+      storage.listDirectEntries(new StoragePath(basePath, partition)).asScala
+        .map(_.getPath)
+        .find(path => path.getName.endsWith(".parquet"))
+    }.headOption.getOrElse(throw new IllegalStateException("No base file found 
to replace with empty file"))
+    assertTrue(storage.deleteFile(candidateBaseFile),
+      s"Failed to delete base file $candidateBaseFile")
+    assertTrue(storage.createNewFile(candidateBaseFile),
+      s"Failed to create empty replacement file $candidateBaseFile")
+    candidateBaseFile.getName
+  }
 }
 
 object TestRecordLevelIndex {

Reply via email to