This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a18bcd5bb034 fix: Skip payload class validation when merge mode is not custom with v9 tables (#14116)
a18bcd5bb034 is described below
commit a18bcd5bb034efc36d2df8ef6e184cc5ea13e885
Author: Lin Liu <[email protected]>
AuthorDate: Sun Oct 19 07:15:05 2025 -0700
fix: Skip payload class validation when merge mode is not custom with v9 tables (#14116)
---
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 17 ++++++--
.../org/apache/hudi/TestHoodieWriterUtils.java | 45 +++++++++++++++++++++-
.../hudi/functional/TestSparkDataSource.scala | 22 +++++++++--
3 files changed, 76 insertions(+), 8 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 520fd996b74f..fdc41bcfbd51 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties}
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
+import org.apache.hudi.common.config.RecordMergeMode.CUSTOM
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, OverwriteWithLatestAvroPayload, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
import org.apache.hudi.common.util.StringUtils
@@ -173,15 +174,15 @@ object HoodieWriterUtils {
|| key.equals(RECORD_MERGE_MODE.key())
|| key.equals(RECORD_MERGE_STRATEGY_ID.key())))
- ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && shouldIgnorePayloadValidation(value, params, tableConfig))
+ ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && shouldIgnorePayloadValidation(value, tableConfig))
// If hoodie.database.name is empty, ignore validation.
ignoreConfig = ignoreConfig || (key.equals(HoodieTableConfig.DATABASE_NAME.key()) && isNullOrEmpty(getStringFromTableConfigWithAlternatives(tableConfig, key)))
ignoreConfig
}
- def shouldIgnorePayloadValidation(value: String, params: Map[String, String], tableConfig: HoodieConfig): Boolean = {
+ def shouldIgnorePayloadValidation(incomingPayloadClass: String, tableConfig: HoodieConfig): Boolean = {
//don't validate the payload only in the case that insert into is using fallback to some legacy configs
- val ignoreConfig = value.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME)
+ val ignoreConfig = incomingPayloadClass.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME)
if (ignoreConfig) {
ignoreConfig
} else {
@@ -201,10 +202,18 @@ object HoodieWriterUtils {
HoodieTableVersion.current()
}
+ val recordMergeMode = tableConfig.getStringOrDefault(HoodieTableConfig.RECORD_MERGE_MODE.key(), "")
if (tableVersion == HoodieTableVersion.EIGHT && initTableVersion.lesserThan(HoodieTableVersion.EIGHT)
- && value.equals(classOf[OverwriteWithLatestAvroPayload].getName)
+ && incomingPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getName)
&& tableConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()).equals(classOf[DefaultHoodieRecordPayload].getName)) {
true
+ } else if (tableVersion.greaterThanOrEquals(HoodieTableVersion.NINE) && !recordMergeMode.equals(CUSTOM.name)) {
+ // When table version >= v9, if the merge mode is not CUSTOM, we can safely skip payload class check
+ // since the payload class is ignored during these writes. Meanwhile, we should give a warning about this behavior.
+ if (!StringUtils.isNullOrEmpty(incomingPayloadClass)) {
+ log.warn(s"Payload class '$incomingPayloadClass' is ignored since merge behavior is determined by merge mode: $recordMergeMode")
+ }
+ true
} else {
ignoreConfig
}
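In short, for table version 9 and above with a non-CUSTOM merge mode, the writer now skips payload class validation and only logs a warning that the incoming payload class is ignored. A minimal sketch of the new call, mirroring the unit tests below (the payload class name "com.example.CustomPayload" is just a placeholder):

    import org.apache.hudi.HoodieWriterUtils
    import org.apache.hudi.common.config.{HoodieConfig, RecordMergeMode}
    import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}

    // A v9 table config whose merge mode is EVENT_TIME_ORDERING (i.e. not CUSTOM).
    val tableConfig = new HoodieConfig()
    tableConfig.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()))
    tableConfig.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.EVENT_TIME_ORDERING.name())

    // Returns true: the payload class check is skipped and a warning is logged instead.
    val skipped = HoodieWriterUtils.shouldIgnorePayloadValidation("com.example.CustomPayload", tableConfig)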
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
index ee9ebef4cee7..8456074de934 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -35,6 +36,8 @@ import java.util.Properties;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getMetaClientBuilder;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class TestHoodieWriterUtils extends HoodieClientTestBase {
@@ -116,4 +119,44 @@ class TestHoodieWriterUtils extends HoodieClientTestBase {
String result = HoodieWriterUtils.getKeyInTableConfig("my.custom.key", config);
assertEquals("my.custom.key", result);
}
-}
\ No newline at end of file
+
+ @Test
+ void testShouldIgnorePayloadValidationVersion9WithCustomMergeMode() {
+ HoodieConfig config = new HoodieConfig();
+ config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+ config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.CUSTOM.name());
+
+ String payloadClass = "com.example.CustomPayload";
+ assertFalse(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, config));
+ }
+
+ @Test
+ void testShouldIgnorePayloadValidationVersion9WithEmptyPayload() {
+ HoodieConfig config = new HoodieConfig();
+ config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+ config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.COMMIT_TIME_ORDERING.name());
+
+ String payloadClass = "";
+ assertTrue(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, config));
+ }
+
+ @Test
+ void testShouldIgnorePayloadValidationVersion9WithCommitTimeOrdering() {
+ HoodieConfig config = new HoodieConfig();
+ config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+ config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.COMMIT_TIME_ORDERING.name());
+
+ String payloadClass = "com.example.CustomPayload";
+ assertTrue(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, config));
+ }
+
+ @Test
+ void testShouldIgnorePayloadValidationVersion9WithEventTimeOrdering() {
+ HoodieConfig config = new HoodieConfig();
+ config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+ config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.EVENT_TIME_ORDERING.name());
+
+ String payloadClass = "com.example.CustomPayload";
+ assertTrue(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, config));
+ }
+}
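From the datasource path, the functional test change below exercises the same behavior: on a table at version 9 whose merge mode is not CUSTOM, a write that passes an explicit payload class no longer fails config validation. A rough sketch of such a write, assuming `df` is an existing DataFrame and `basePath` points at such a table:

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.common.model.EventTimeAvroPayload
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.spark.sql.SaveMode

    // The explicit payload class is ignored (with a warning) instead of triggering a HoodieException.
    df.write.format("hudi")
      .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, classOf[EventTimeAvroPayload].getName)
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
      .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
      .mode(SaveMode.Append)
      .save(basePath)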
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 5ccde850c5b7..f5ee90920f5f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
+import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
@@ -475,14 +476,29 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness {
.mode(SaveMode.Append)
.save(basePath)
})
- Assertions.assertThrows(classOf[HoodieException], () => {
- df1.write.format("hudi")
+ if (mergeMode != RecordMergeMode.CUSTOM) {
+ Assertions.assertDoesNotThrow(
+ new Executable {
+ override def execute(): Unit = {
+ df1.write.format("hudi")
+ .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, classOf[EventTimeAvroPayload].getName)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+ .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
+ .mode(SaveMode.Append)
+ .save(basePath)
+ }
+ }
+ )
+ } else {
+ Assertions.assertThrows(classOf[HoodieException], () => {
+ df1.write.format("hudi")
.option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, classOf[EventTimeAvroPayload].getName)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
.option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
.mode(SaveMode.Append)
.save(basePath)
- })
+ })
+ }
Assertions.assertThrows(classOf[HoodieException], () => {
df1.write.format("hudi")
.option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID)