This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch partition-bucket-index in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 9e3202256ada593b5fc2ca17d722422af28e1a30 Author: Lin Liu <[email protected]> AuthorDate: Wed Mar 19 20:16:23 2025 -0700 [HUDI-9120] Fix merge mode inference for table version 6 in file group reader (#12991) Co-authored-by: Y Ethan Guo <[email protected]> --- .../table/read/HoodieFileGroupReaderSchemaHandler.java | 15 ++++++++++++--- .../table/read/TestHoodieFileGroupRecordBuffer.java | 6 +++++- .../org/apache/spark/sql/hudi/common/TestSqlConf.scala | 5 +++-- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java index f007f9ef400..15f7b161518 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java @@ -27,10 +27,12 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.LocalAvroSchemaCache; +import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.util.collection.Triple; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; @@ -183,8 +185,15 @@ public class HoodieFileGroupReaderSchemaHandler<T> { } private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg, TypedProperties props, - Schema dataSchema, Option<HoodieRecordMerger> recordMerger) { - if (cfg.getRecordMergeMode() == RecordMergeMode.CUSTOM) { + Schema dataSchema, 
Option<HoodieRecordMerger> recordMerger) { + Triple<RecordMergeMode, String, String> mergingConfigs = HoodieTableConfig.inferCorrectMergingBehavior( + cfg.getRecordMergeMode(), + cfg.getPayloadClass(), + cfg.getRecordMergeStrategyId(), + cfg.getPreCombineField(), + HoodieTableVersion.current()); + + if (mergingConfigs.getLeft() == RecordMergeMode.CUSTOM) { return recordMerger.get().getMandatoryFieldsForMerging(dataSchema, cfg, props); } @@ -199,7 +208,7 @@ public class HoodieFileGroupReaderSchemaHandler<T> { } } - if (cfg.getRecordMergeMode() == RecordMergeMode.EVENT_TIME_ORDERING) { + if (mergingConfigs.getLeft() == RecordMergeMode.EVENT_TIME_ORDERING) { String preCombine = cfg.getPreCombineField(); if (!StringUtils.isNullOrEmpty(preCombine)) { requiredFields.add(preCombine); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java index ca3a09d96df..ee369410c8e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java @@ -77,7 +77,11 @@ public class TestHoodieFileGroupRecordBuffer { "true, true, CUSTOM", "true, false, CUSTOM", "false, true, CUSTOM", - "false, false, CUSTOM" + "false, false, CUSTOM", + "true, true,", + "true, false,", + "false, true,", + "false, false," }) public void testSchemaForMandatoryFields(boolean setPrecombine, boolean addHoodieIsDeleted, RecordMergeMode mergeMode) { HoodieReaderContext readerContext = mock(HoodieReaderContext.class); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala index e02a9b30c63..b4031935406 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hudi.common import org.apache.hudi.DataSourceReadOptions._ -import org.apache.hudi.common.config.{DFSPropertiesConfiguration, RecordMergeMode} +import org.apache.hudi.common.config.DFSPropertiesConfiguration import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.testutils.HoodieTestUtils @@ -82,7 +82,8 @@ class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter { assertResult(true)(Files.exists(Paths.get(s"$tablePath/$partitionVal"))) assertResult(HoodieTableType.MERGE_ON_READ)(new HoodieTableConfig( HoodieStorageUtils.getStorage(tablePath, HoodieTestUtils.getDefaultStorageConf), - new StoragePath(tablePath, HoodieTableMetaClient.METAFOLDER_NAME), RecordMergeMode.COMMIT_TIME_ORDERING, null, null).getTableType) + new StoragePath(tablePath, HoodieTableMetaClient.METAFOLDER_NAME), + null, null, null, false).getTableType) // Manually pass incremental configs to global configs to make sure Hudi query is able to load the // global configs
