This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.2.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c07937a5e666ea0fd5eeaa4d658270ed435d9aca Author: Y Ethan Guo <[email protected]> AuthorDate: Tue May 19 16:30:51 2026 -0700 fix: Disable column stats and partition stats indices for Lance base files (#18588) --- .../hudi/metadata/MetadataPartitionType.java | 13 +++- .../hudi/functional/TestLanceDataSource.scala | 75 +++++++++++++++------- 2 files changed, 63 insertions(+), 25 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java index 7944f0018127..04bd9bdab264 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java @@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieRecordIndexInfo; import org.apache.hudi.avro.model.HoodieSecondaryIndexInfo; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.function.SerializableBiFunction; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieIndexDefinition; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -108,7 +109,11 @@ public enum MetadataPartitionType { COLUMN_STATS(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, "col-stats-", 3) { @Override public boolean isMetadataPartitionEnabled(HoodieMetadataConfig metadataConfig, HoodieTableConfig tableConfig) { - return metadataConfig.isColumnStatsIndexEnabled(); + // Lance base files do not yet emit column-range metadata, so per-file column stats + // aggregate as empty entries and silently prune everything on read. Disable until + // HoodieTableMetadataUtil#readColumnRangeMetadataFrom has a LANCE branch. 
+ return tableConfig.getBaseFileFormat() != HoodieFileFormat.LANCE + && metadataConfig.isColumnStatsIndexEnabled(); } @Override @@ -240,7 +245,11 @@ public enum MetadataPartitionType { PARTITION_STATS(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, "partition-stats-", 6) { @Override public boolean isMetadataPartitionEnabled(HoodieMetadataConfig metadataConfig, HoodieTableConfig tableConfig) { - return tableConfig.isTablePartitioned() && metadataConfig.isPartitionStatsIndexEnabled(); + // Partition stats aggregate per-file column ranges. Lance base files contribute none + // (see HoodieTableMetadataUtil#readColumnRangeMetadataFrom), so partition records end + // up with empty ranges and the partition stats index prunes everything on read. + return tableConfig.getBaseFileFormat() != HoodieFileFormat.LANCE + && tableConfig.isTablePartitioned() && metadataConfig.isPartitionStatsIndexEnabled(); } @Override diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index 1cd5647d9949..093b6ee30873 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -29,6 +29,7 @@ import org.apache.hudi.common.table.view.{FileSystemViewManager, FileSystemViewS import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.io.storage.HoodieSparkLanceReader +import org.apache.hudi.metadata.MetadataPartitionType import org.apache.hudi.storage.StoragePath import org.apache.hudi.testutils.HoodieSparkClientTestBase @@ -88,6 +89,23 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { } } + // For MOR tables, a compaction is recorded on the active timeline with action == "commit" + // (vs. 
"deltacommit" for regular writes). Builds a fresh meta client so callers see the + // current on-disk timeline state. + private def assertCompactionCommitPresence(tablePath: String, expectPresent: Boolean, message: String): Unit = { + val compactionCommits = HoodieTableMetaClient.builder() + .setConf(HoodieTestUtils.getDefaultStorageConf) + .setBasePath(tablePath) + .build() + .getActiveTimeline.filterCompletedInstants().getInstants.asScala + .filter(instant => instant.getAction == "commit") + if (expectPresent) { + assertTrue(compactionCommits.nonEmpty, message) + } else { + assertTrue(compactionCommits.isEmpty, message) + } + } + @ParameterizedTest @EnumSource(value = classOf[HoodieTableType]) def testBasicWriteAndRead(tableType: HoodieTableType): Unit = { @@ -1149,6 +1167,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { // Disable small file handling so the next insert creates a new file group // and updates in MOR generate log file(s) spark.sql(s"alter table $tableName set tblproperties ('hoodie.merge.small.file.group.candidates.limit' = '0')") + spark.sql(s"alter table $tableName set tblproperties ('hoodie.compact.inline.max.delta.commits' = '6')") // Test 3: INSERT with subset of columns (null handling) spark.sql(s""" @@ -1174,15 +1193,18 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { ) // Test 5: DELETE a row - // TODO(#18558): test DELETE with MOR table type once the bug is fixed - if (tableType == HoodieTableType.COPY_ON_WRITE) { - spark.sql(s"delete from $tableName where id = 3") + spark.sql(s"delete from $tableName where id = 3") + checkAnswer(s"select id, name, age, score, dt from $tableName order by id")( + Seq(1, "Alice", 31, 99.9, "2025-01-01"), + Seq(2, "Bob", 25, 87.3, "2025-01-02"), + Seq(4, "Diana", 40, null, "2025-01-01") + ) - checkAnswer(s"select id, name, age, score, dt from $tableName order by id")( - Seq(1, "Alice", 31, 99.9, "2025-01-01"), - Seq(2, "Bob", 25, 87.3, "2025-01-02"), - Seq(4, "Diana", 40, 
null, "2025-01-01") - ) + // For MOR: 5 deltacommits so far (insert x3, update, delete) — below the + // max.delta.commits=6 threshold, so no inline compaction should have run yet. + if (tableType == HoodieTableType.MERGE_ON_READ) { + assertCompactionCommitPresence(tablePath, expectPresent = false, + "No compaction commit should be present before max.delta.commits=6 threshold is reached") } // Test 6: INSERT with static partition (only for partitioned tables) @@ -1192,21 +1214,18 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { values (28, 5, 'Eve') """.stripMargin) - if (tableType == HoodieTableType.COPY_ON_WRITE) { - checkAnswer(s"select id, name, age, score, dt from $tableName order by id")( - Seq(1, "Alice", 31, 99.9, "2025-01-01"), - Seq(2, "Bob", 25, 87.3, "2025-01-02"), - Seq(4, "Diana", 40, null, "2025-01-01"), - Seq(5, "Eve", 28, null, "2025-01-05") - ) - } else { - checkAnswer(s"select id, name, age, score, dt from $tableName order by id")( - Seq(1, "Alice", 31, 99.9, "2025-01-01"), - Seq(2, "Bob", 25, 87.3, "2025-01-02"), - Seq(3, "Charlie", 35, 92.1, "2025-01-02"), - Seq(4, "Diana", 40, null, "2025-01-01"), - Seq(5, "Eve", 28, null, "2025-01-05") - ) + checkAnswer(s"select id, name, age, score, dt from $tableName order by id")( + Seq(1, "Alice", 31, 99.9, "2025-01-01"), + Seq(2, "Bob", 25, 87.3, "2025-01-02"), + Seq(4, "Diana", 40, null, "2025-01-01"), + Seq(5, "Eve", 28, null, "2025-01-05") + ) + + // For MOR: this is the 6th deltacommit, which should trigger inline compaction + // (HoodieSparkSqlWriter auto-enables hoodie.compact.inline for MOR batch writes). 
+ if (tableType == HoodieTableType.MERGE_ON_READ) { + assertCompactionCommitPresence(tablePath, expectPresent = true, + "Inline compaction commit should be present after 6th deltacommit") } } @@ -1219,6 +1238,16 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val baseFileFormat = metaClient.getTableConfig.getBaseFileFormat assertEquals(HoodieFileFormat.LANCE, baseFileFormat, "Table should use Lance base file format") + + // Column stats and partition stats indices are gated off for Lance base files in + // MetadataPartitionType — per-file column ranges aren't emitted for Lance yet, and + // empty ranges would silently prune everything on read. Confirm the metadata table + // never initialized these partitions. + val tableConfig = metaClient.getTableConfig + assertFalse(tableConfig.isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS), + "Column stats metadata partition must not be initialized for Lance tables") + assertFalse(tableConfig.isMetadataPartitionAvailable(MetadataPartitionType.PARTITION_STATS), + "Partition stats metadata partition must not be initialized for Lance tables") } /**
