This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit eeb07995b3ace8e0a0631433bc79c73dd20b6367
Author: Lin Liu <[email protected]>
AuthorDate: Sun Oct 19 07:15:05 2025 -0700

    fix: Skip payload class validation when merge mode is not custom with v9 tables (#14116)
---
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  | 17 ++++++--
 .../org/apache/hudi/TestHoodieWriterUtils.java     | 45 +++++++++++++++++++++-
 .../hudi/functional/TestSparkDataSource.scala      | 22 +++++++++--
 3 files changed, 76 insertions(+), 8 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 520fd996b74f..fdc41bcfbd51 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties}
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
+import org.apache.hudi.common.config.RecordMergeMode.CUSTOM
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, OverwriteWithLatestAvroPayload, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
 import org.apache.hudi.common.util.StringUtils
@@ -173,15 +174,15 @@ object HoodieWriterUtils {
       || key.equals(RECORD_MERGE_MODE.key())
       || key.equals(RECORD_MERGE_STRATEGY_ID.key())))
-    ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && shouldIgnorePayloadValidation(value, params, tableConfig))
+    ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && shouldIgnorePayloadValidation(value, tableConfig))
     // If hoodie.database.name is empty, ignore validation.
     ignoreConfig = ignoreConfig || (key.equals(HoodieTableConfig.DATABASE_NAME.key())
       && isNullOrEmpty(getStringFromTableConfigWithAlternatives(tableConfig, key)))
     ignoreConfig
   }
 
-  def shouldIgnorePayloadValidation(value: String, params: Map[String, String], tableConfig: HoodieConfig): Boolean = {
+  def shouldIgnorePayloadValidation(incomingPayloadClass: String, tableConfig: HoodieConfig): Boolean = {
     //don't validate the payload only in the case that insert into is using fallback to some legacy configs
-    val ignoreConfig = value.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME)
+    val ignoreConfig = incomingPayloadClass.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME)
     if (ignoreConfig) {
       ignoreConfig
     } else {
@@ -201,10 +202,18 @@ object HoodieWriterUtils {
         HoodieTableVersion.current()
       }
 
+      val recordMergeMode = tableConfig.getStringOrDefault(HoodieTableConfig.RECORD_MERGE_MODE.key(), "")
       if (tableVersion == HoodieTableVersion.EIGHT && initTableVersion.lesserThan(HoodieTableVersion.EIGHT)
-        && value.equals(classOf[OverwriteWithLatestAvroPayload].getName)
+        && incomingPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getName)
         && tableConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()).equals(classOf[DefaultHoodieRecordPayload].getName)) {
         true
+      } else if (tableVersion.greaterThanOrEquals(HoodieTableVersion.NINE) && !recordMergeMode.equals(CUSTOM.name)) {
+        // When table version >= v9, if the merge mode is not CUSTOM, we can safely skip payload class check
+        // since the payload class is ignored during these writes. Meanwhile, we should give a warning about this behavior.
+        if (!StringUtils.isNullOrEmpty(incomingPayloadClass)) {
+          log.warn(s"Payload class '$incomingPayloadClass' is ignored since merge behavior is determined by merge mode: $recordMergeMode")
+        }
+        true
       } else {
         ignoreConfig
       }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
index ee9ebef4cee7..8456074de934 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
@@ -18,6 +18,7 @@
 package org.apache.hudi;
 
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -35,6 +36,8 @@ import java.util.Properties;
 
 import static org.apache.hudi.common.testutils.HoodieTestUtils.getMetaClientBuilder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class TestHoodieWriterUtils extends HoodieClientTestBase {
 
@@ -116,4 +119,44 @@ class TestHoodieWriterUtils extends HoodieClientTestBase {
     String result = HoodieWriterUtils.getKeyInTableConfig("my.custom.key", config);
     assertEquals("my.custom.key", result);
   }
-}
\ No newline at end of file
+
+  @Test
+  void testShouldIgnorePayloadValidationVersion9WithCustomMergeMode() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+    config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.CUSTOM.name());
+
+    String payloadClass = "com.example.CustomPayload";
+    assertFalse(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, config));
+  }
+
+  @Test
+  void testShouldIgnorePayloadValidationVersion9WithEmptyPayload() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+    config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.COMMIT_TIME_ORDERING.name());
+
+    String payloadClass = "";
+    assertTrue(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, config));
+  }
+
+  @Test
+  void testShouldIgnorePayloadValidationVersion9WithCommitTimeOrdering() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+    config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.COMMIT_TIME_ORDERING.name());
+
+    String payloadClass = "com.example.CustomPayload";
+    assertTrue(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, config));
+  }
+
+  @Test
+  void testShouldIgnorePayloadValidationVersion9WithEventTimeOrdering() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+    config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.EVENT_TIME_ORDERING.name());
+
+    String payloadClass = "com.example.CustomPayload";
+    assertTrue(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, config));
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 5ccde850c5b7..f5ee90920f5f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types.StructType
 
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
+import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
@@ -475,14 +476,29 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness {
         .mode(SaveMode.Append)
         .save(basePath)
     })
-    Assertions.assertThrows(classOf[HoodieException], () => {
-      df1.write.format("hudi")
+    if (mergeMode != RecordMergeMode.CUSTOM) {
+      Assertions.assertDoesNotThrow(
+        new Executable {
+          override def execute(): Unit = {
+            df1.write.format("hudi")
+              .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, classOf[EventTimeAvroPayload].getName)
+              .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+              .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
+              .mode(SaveMode.Append)
+              .save(basePath)
+          }
+        }
+      )
+    } else {
+      Assertions.assertThrows(classOf[HoodieException], () => {
+        df1.write.format("hudi")
         .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, classOf[EventTimeAvroPayload].getName)
         .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
         .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
         .mode(SaveMode.Append)
         .save(basePath)
-    })
+      })
+    }
     Assertions.assertThrows(classOf[HoodieException], () => {
       df1.write.format("hudi")
         .option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID)
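Note: the sketch below is not part of the commit. It is a minimal illustration of the new behavior, modeled directly on the unit tests added above; the payload class name "com.example.CustomPayload" is a hypothetical placeholder used only for illustration.

import org.apache.hudi.HoodieWriterUtils
import org.apache.hudi.common.config.{HoodieConfig, RecordMergeMode}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}

object PayloadValidationSkipSketch {
  def main(args: Array[String]): Unit = {
    // A table config describing a v9 table that uses EVENT_TIME_ORDERING merge mode.
    val tableConfig = new HoodieConfig()
    tableConfig.setValue(HoodieTableConfig.VERSION, HoodieTableVersion.NINE.versionCode().toString)
    tableConfig.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.EVENT_TIME_ORDERING.name())

    // The incoming payload class differs from the table's, but with this change the
    // validation is skipped (a warning is logged instead) because the merge mode is not CUSTOM.
    println(HoodieWriterUtils.shouldIgnorePayloadValidation("com.example.CustomPayload", tableConfig)) // true

    // With CUSTOM merge mode the payload class is still validated as before.
    tableConfig.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.CUSTOM.name())
    println(HoodieWriterUtils.shouldIgnorePayloadValidation("com.example.CustomPayload", tableConfig)) // false
  }
}

In end-to-end terms (see the TestSparkDataSource change above), a delete write that sets a different WRITE_PAYLOAD_CLASS_NAME on a v9 table no longer fails with HoodieException unless the table's record merge mode is CUSTOM.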
