This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b80239a1d61 fix: Disable column stats and partition stats indices for 
Lance base files (#18588)
0b80239a1d61 is described below

commit 0b80239a1d61b7de0d1346c16560a61eef527271
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue May 19 16:30:51 2026 -0700

    fix: Disable column stats and partition stats indices for Lance base files 
(#18588)
---
 .../hudi/metadata/MetadataPartitionType.java       | 13 +++-
 .../hudi/functional/TestLanceDataSource.scala      | 75 +++++++++++++++-------
 2 files changed, 63 insertions(+), 25 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 7944f0018127..04bd9bdab264 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
 import org.apache.hudi.avro.model.HoodieSecondaryIndexInfo;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -108,7 +109,11 @@ public enum MetadataPartitionType {
   COLUMN_STATS(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, 
"col-stats-", 3) {
     @Override
     public boolean isMetadataPartitionEnabled(HoodieMetadataConfig 
metadataConfig, HoodieTableConfig tableConfig) {
-      return metadataConfig.isColumnStatsIndexEnabled();
+      // Lance base files do not yet emit column-range metadata, so per-file 
column stats
+      // aggregate as empty entries and silently prune everything on read. 
Disable until
+      // HoodieTableMetadataUtil#readColumnRangeMetadataFrom has a LANCE 
branch.
+      return tableConfig.getBaseFileFormat() != HoodieFileFormat.LANCE
+          && metadataConfig.isColumnStatsIndexEnabled();
     }
 
     @Override
@@ -240,7 +245,11 @@ public enum MetadataPartitionType {
   PARTITION_STATS(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, 
"partition-stats-", 6) {
     @Override
     public boolean isMetadataPartitionEnabled(HoodieMetadataConfig 
metadataConfig, HoodieTableConfig tableConfig) {
-      return tableConfig.isTablePartitioned() && 
metadataConfig.isPartitionStatsIndexEnabled();
+      // Partition stats aggregate per-file column ranges. Lance base files 
contribute none
+      // (see HoodieTableMetadataUtil#readColumnRangeMetadataFrom), so 
partition records end
+      // up with empty ranges and the partition stats index prunes everything 
on read.
+      return tableConfig.getBaseFileFormat() != HoodieFileFormat.LANCE
+          && tableConfig.isTablePartitioned() && 
metadataConfig.isPartitionStatsIndexEnabled();
     }
 
     @Override
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
index 1cd5647d9949..093b6ee30873 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -29,6 +29,7 @@ import 
org.apache.hudi.common.table.view.{FileSystemViewManager, FileSystemViewS
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.io.storage.HoodieSparkLanceReader
+import org.apache.hudi.metadata.MetadataPartitionType
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 
@@ -88,6 +89,23 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
     }
   }
 
+  // For MOR tables, a compaction is recorded on the active timeline with 
action == "commit"
+  // (vs. "deltacommit" for regular writes). Builds a fresh meta client so 
callers see the
+  // current on-disk timeline state.
+  private def assertCompactionCommitPresence(tablePath: String, expectPresent: 
Boolean, message: String): Unit = {
+    val compactionCommits = HoodieTableMetaClient.builder()
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .setBasePath(tablePath)
+      .build()
+      .getActiveTimeline.filterCompletedInstants().getInstants.asScala
+      .filter(instant => instant.getAction == "commit")
+    if (expectPresent) {
+      assertTrue(compactionCommits.nonEmpty, message)
+    } else {
+      assertTrue(compactionCommits.isEmpty, message)
+    }
+  }
+
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieTableType])
   def testBasicWriteAndRead(tableType: HoodieTableType): Unit = {
@@ -1149,6 +1167,7 @@ class TestLanceDataSource extends 
HoodieSparkClientTestBase {
     // Disable small file handling so the next insert creates a new file group
     // and updates in MOR generate log file(s)
     spark.sql(s"alter table $tableName set tblproperties 
('hoodie.merge.small.file.group.candidates.limit' = '0')")
+    spark.sql(s"alter table $tableName set tblproperties 
('hoodie.compact.inline.max.delta.commits' = '6')")
 
     // Test 3: INSERT with subset of columns (null handling)
     spark.sql(s"""
@@ -1174,15 +1193,18 @@ class TestLanceDataSource extends 
HoodieSparkClientTestBase {
     )
 
     // Test 5: DELETE a row
-    // TODO(#18558): test DELETE with MOR table type once the bug is fixed
-    if (tableType == HoodieTableType.COPY_ON_WRITE) {
-      spark.sql(s"delete from $tableName where id = 3")
+    spark.sql(s"delete from $tableName where id = 3")
+    checkAnswer(s"select id, name, age, score, dt from $tableName order by 
id")(
+      Seq(1, "Alice", 31, 99.9, "2025-01-01"),
+      Seq(2, "Bob", 25, 87.3, "2025-01-02"),
+      Seq(4, "Diana", 40, null, "2025-01-01")
+    )
 
-      checkAnswer(s"select id, name, age, score, dt from $tableName order by 
id")(
-        Seq(1, "Alice", 31, 99.9, "2025-01-01"),
-        Seq(2, "Bob", 25, 87.3, "2025-01-02"),
-        Seq(4, "Diana", 40, null, "2025-01-01")
-      )
+    // For MOR: 5 deltacommits so far (insert x3, update, delete) — below the
+    // max.delta.commits=6 threshold, so no inline compaction should have run 
yet.
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      assertCompactionCommitPresence(tablePath, expectPresent = false,
+        "No compaction commit should be present before max.delta.commits=6 
threshold is reached")
     }
 
     // Test 6: INSERT with static partition (only for partitioned tables)
@@ -1192,21 +1214,18 @@ class TestLanceDataSource extends 
HoodieSparkClientTestBase {
         values (28, 5, 'Eve')
       """.stripMargin)
 
-      if (tableType == HoodieTableType.COPY_ON_WRITE) {
-        checkAnswer(s"select id, name, age, score, dt from $tableName order by 
id")(
-          Seq(1, "Alice", 31, 99.9, "2025-01-01"),
-          Seq(2, "Bob", 25, 87.3, "2025-01-02"),
-          Seq(4, "Diana", 40, null, "2025-01-01"),
-          Seq(5, "Eve", 28, null, "2025-01-05")
-        )
-      } else {
-        checkAnswer(s"select id, name, age, score, dt from $tableName order by 
id")(
-          Seq(1, "Alice", 31, 99.9, "2025-01-01"),
-          Seq(2, "Bob", 25, 87.3, "2025-01-02"),
-          Seq(3, "Charlie", 35, 92.1, "2025-01-02"),
-          Seq(4, "Diana", 40, null, "2025-01-01"),
-          Seq(5, "Eve", 28, null, "2025-01-05")
-        )
+      checkAnswer(s"select id, name, age, score, dt from $tableName order by 
id")(
+        Seq(1, "Alice", 31, 99.9, "2025-01-01"),
+        Seq(2, "Bob", 25, 87.3, "2025-01-02"),
+        Seq(4, "Diana", 40, null, "2025-01-01"),
+        Seq(5, "Eve", 28, null, "2025-01-05")
+      )
+
+      // For MOR: this is the 6th deltacommit, which should trigger inline 
compaction
+      // (HoodieSparkSqlWriter auto-enables hoodie.compact.inline for MOR 
batch writes).
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        assertCompactionCommitPresence(tablePath, expectPresent = true,
+          "Inline compaction commit should be present after 6th deltacommit")
       }
     }
 
@@ -1219,6 +1238,16 @@ class TestLanceDataSource extends 
HoodieSparkClientTestBase {
     val baseFileFormat = metaClient.getTableConfig.getBaseFileFormat
     assertEquals(HoodieFileFormat.LANCE, baseFileFormat,
                  "Table should use Lance base file format")
+
+    // Column stats and partition stats indices are gated off for Lance base 
files in
+    // MetadataPartitionType — per-file column ranges aren't emitted for Lance 
yet, and
+    // empty ranges would silently prune everything on read. Confirm the 
metadata table
+    // never initialized these partitions.
+    val tableConfig = metaClient.getTableConfig
+    
assertFalse(tableConfig.isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS),
+      "Column stats metadata partition must not be initialized for Lance 
tables")
+    
assertFalse(tableConfig.isMetadataPartitionAvailable(MetadataPartitionType.PARTITION_STATS),
+      "Partition stats metadata partition must not be initialized for Lance 
tables")
   }
 
   /**

Reply via email to