This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a18bcd5bb034 fix: Skip payload class validation when merge mode is not custom with v9 tables (#14116)
a18bcd5bb034 is described below

commit a18bcd5bb034efc36d2df8ef6e184cc5ea13e885
Author: Lin Liu <[email protected]>
AuthorDate: Sun Oct 19 07:15:05 2025 -0700

    fix: Skip payload class validation when merge mode is not custom with v9 tables (#14116)
---
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  | 17 ++++++--
 .../org/apache/hudi/TestHoodieWriterUtils.java     | 45 +++++++++++++++++++++-
 .../hudi/functional/TestSparkDataSource.scala      | 22 +++++++++--
 3 files changed, 76 insertions(+), 8 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 520fd996b74f..fdc41bcfbd51 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
HoodieCommonConfig, HoodieConfig, TypedProperties}
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
+import org.apache.hudi.common.config.RecordMergeMode.CUSTOM
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, 
OverwriteWithLatestAvroPayload, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
 import org.apache.hudi.common.util.StringUtils
@@ -173,15 +174,15 @@ object HoodieWriterUtils {
       || key.equals(RECORD_MERGE_MODE.key())
       || key.equals(RECORD_MERGE_STRATEGY_ID.key())))
 
-    ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && 
shouldIgnorePayloadValidation(value, params, tableConfig))
+    ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && 
shouldIgnorePayloadValidation(value, tableConfig))
     // If hoodie.database.name is empty, ignore validation.
     ignoreConfig = ignoreConfig || 
(key.equals(HoodieTableConfig.DATABASE_NAME.key()) && 
isNullOrEmpty(getStringFromTableConfigWithAlternatives(tableConfig, key)))
     ignoreConfig
   }
 
-  def shouldIgnorePayloadValidation(value: String, params: Map[String, 
String], tableConfig: HoodieConfig): Boolean = {
+  def shouldIgnorePayloadValidation(incomingPayloadClass: String, tableConfig: 
HoodieConfig): Boolean = {
     //don't validate the payload only in the case that insert into is using 
fallback to some legacy configs
-    val ignoreConfig = value.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME)
+    val ignoreConfig = 
incomingPayloadClass.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME)
     if (ignoreConfig) {
        ignoreConfig
     } else {
@@ -201,10 +202,18 @@ object HoodieWriterUtils {
           HoodieTableVersion.current()
         }
 
+        val recordMergeMode = 
tableConfig.getStringOrDefault(HoodieTableConfig.RECORD_MERGE_MODE.key(), "")
         if (tableVersion == HoodieTableVersion.EIGHT && 
initTableVersion.lesserThan(HoodieTableVersion.EIGHT)
-          && value.equals(classOf[OverwriteWithLatestAvroPayload].getName)
+          && 
incomingPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getName)
           && 
tableConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()).equals(classOf[DefaultHoodieRecordPayload].getName))
 {
           true
+        } else if (tableVersion.greaterThanOrEquals(HoodieTableVersion.NINE) 
&& !recordMergeMode.equals(CUSTOM.name)) {
+          // When table version >= v9, if the merge mode is not CUSTOM, we can 
safely skip payload class check
+          // since the payload class is ignored during these writes. 
Meanwhile, we should give a warning about this behavior.
+          if (!StringUtils.isNullOrEmpty(incomingPayloadClass)) {
+            log.warn(s"Payload class '$incomingPayloadClass' is ignored since 
merge behavior is determined by merge mode: $recordMergeMode")
+          }
+          true
         } else {
           ignoreConfig
         }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
index ee9ebef4cee7..8456074de934 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
@@ -18,6 +18,7 @@
 package org.apache.hudi;
 
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -35,6 +36,8 @@ import java.util.Properties;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getMetaClientBuilder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class TestHoodieWriterUtils extends HoodieClientTestBase {
 
@@ -116,4 +119,44 @@ class TestHoodieWriterUtils extends HoodieClientTestBase {
     String result = HoodieWriterUtils.getKeyInTableConfig("my.custom.key", 
config);
     assertEquals("my.custom.key", result);
   }
-}
\ No newline at end of file
+
+  @Test
+  void testShouldIgnorePayloadValidationVersion9WithCustomMergeMode() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION, 
String.valueOf(HoodieTableVersion.NINE.versionCode()));
+    config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, 
RecordMergeMode.CUSTOM.name());
+
+    String payloadClass = "com.example.CustomPayload";
+    assertFalse(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, 
config));
+  }
+
+  @Test
+  void testShouldIgnorePayloadValidationVersion9WithEmptyPayload() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION, 
String.valueOf(HoodieTableVersion.NINE.versionCode()));
+    config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, 
RecordMergeMode.COMMIT_TIME_ORDERING.name());
+
+    String payloadClass = "";
+    assertTrue(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, 
config));
+  }
+
+  @Test
+  void testShouldIgnorePayloadValidationVersion9WithCommitTimeOrdering() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION, 
String.valueOf(HoodieTableVersion.NINE.versionCode()));
+    config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, 
RecordMergeMode.COMMIT_TIME_ORDERING.name());
+
+    String payloadClass = "com.example.CustomPayload";
+    assertTrue(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, 
config));
+  }
+
+  @Test
+  void testShouldIgnorePayloadValidationVersion9WithEventTimeOrdering() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION, 
String.valueOf(HoodieTableVersion.NINE.versionCode()));
+    config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, 
RecordMergeMode.EVENT_TIME_ORDERING.name());
+
+    String payloadClass = "com.example.CustomPayload";
+    assertTrue(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, 
config));
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 5ccde850c5b7..f5ee90920f5f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types.StructType
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
+import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
@@ -475,14 +476,29 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
           .mode(SaveMode.Append)
           .save(basePath)
       })
-      Assertions.assertThrows(classOf[HoodieException], () => {
-        df1.write.format("hudi")
+      if (mergeMode != RecordMergeMode.CUSTOM) {
+        Assertions.assertDoesNotThrow(
+          new Executable {
+            override def execute(): Unit = {
+              df1.write.format("hudi")
+                .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, 
classOf[EventTimeAvroPayload].getName)
+                .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+                .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
+                .mode(SaveMode.Append)
+                .save(basePath)
+            }
+          }
+        )
+      } else {
+        Assertions.assertThrows(classOf[HoodieException], () => {
+          df1.write.format("hudi")
           .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, 
classOf[EventTimeAvroPayload].getName)
           .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
           .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
           .mode(SaveMode.Append)
           .save(basePath)
-      })
+        })
+      }
       Assertions.assertThrows(classOf[HoodieException], () => {
         df1.write.format("hudi")
           .option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, 
HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID)

Reply via email to