This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cb44650c9491 [HUDI-9728] Set record merge properties properly for 
write and read paths (#13738)
cb44650c9491 is described below

commit cb44650c9491029d5c7eace9b938eef254b4ea99
Author: Lin Liu <[email protected]>
AuthorDate: Fri Aug 22 12:21:36 2025 -0700

    [HUDI-9728] Set record merge properties properly for write and read paths 
(#13738)
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../java/org/apache/hudi/table/HoodieTable.java    |   8 +-
 .../apache/hudi/table/TestHoodieSparkTable.java    |   2 +
 .../hudi/common/engine/HoodieReaderContext.java    |  23 +-
 .../hudi/common/model/HoodieRecordPayload.java     |   3 -
 .../hudi/common/table/HoodieTableConfig.java       |   8 +-
 .../table/read/BufferedRecordMergerFactory.java    |   2 +
 .../org/apache/hudi/common/util/ConfigUtils.java   |  14 ++
 .../table/read/TestPartialUpdateHandler.java       |  13 ++
 .../apache/hudi/common/util/TestConfigUtils.java   |  39 ++++
 .../apache/hudi/utils/TestFlinkWriteClients.java   |   5 +-
 .../functional/TestPayloadDeprecationFlow.scala    | 260 +++++++++++++++++----
 .../deltastreamer/TestHoodieDeltaStreamer.java     |   2 +-
 12 files changed, 312 insertions(+), 67 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index fc87ac6fb5b8..5c8e2d1cd680 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -810,10 +810,10 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    * @throws HoodieIOException
    */
   void reconcileAgainstMarkers(HoodieEngineContext context,
-                                         String instantTs,
-                                         List<HoodieWriteStat> stats,
-                                         boolean consistencyCheckEnabled,
-                                         boolean 
shouldFailOnDuplicateDataFileDetection,
+                               String instantTs,
+                               List<HoodieWriteStat> stats,
+                               boolean consistencyCheckEnabled,
+                               boolean shouldFailOnDuplicateDataFileDetection,
                                WriteMarkers markers) throws HoodieIOException {
     try {
       // Reconcile marker and data files with WriteStats so that partially 
written data-files due to failed
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java
index 6cc1d42a2a66..e50bb9e5ecb7 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
@@ -84,6 +85,7 @@ public class TestHoodieSparkTable extends 
HoodieCommonTestHarness {
     when(metaClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
     HoodieStorage storage = mock(HoodieStorage.class);
     when(metaClient.getStorage()).thenReturn(storage);
+    when(metaClient.getTableConfig()).thenReturn(new HoodieTableConfig());
 
     additionalFiles.forEach(fileName -> {
       try {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index fa9c0658e7c0..11ff79eacff7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.table.read.IteratorMode;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
@@ -52,10 +53,13 @@ import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY;
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.inferMergingConfigsForPreV9Table;
 
 /**
@@ -189,6 +193,10 @@ public abstract class HoodieReaderContext<T> {
     return storageConfiguration;
   }
 
+  public TypedProperties getMergeProps(TypedProperties props) {
+    return ConfigUtils.getMergeProps(props, this.tableConfig.getProps());
+  }
+
   public Option<Predicate> getKeyFilterOpt() {
     return keyFilterOpt;
   }
@@ -276,10 +284,17 @@ public abstract class HoodieReaderContext<T> {
     HoodieTableVersion tableVersion = tableConfig.getTableVersion();
     // If the provided payload class differs from the table's payload class, 
we need to infer the correct merging behavior.
     if (isIngestion && writerPayloadClass.map(className -> 
!className.equals(tableConfig.getPayloadClass())).orElse(false)) {
-      Triple<RecordMergeMode, String, String> triple = 
HoodieTableConfig.inferMergingConfigsForWrites(null, writerPayloadClass.get(), 
null,
-          tableConfig.getOrderingFieldsStr().orElse(null), tableVersion);
-      recordMergeMode = triple.getLeft();
-      mergeStrategyId = triple.getRight();
+      if (tableVersion.greaterThanOrEquals(HoodieTableVersion.NINE)) {
+        Map<String, String> mergeProperties = 
HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+            null, writerPayloadClass.get(), null, 
tableConfig.getOrderingFieldsStr().orElse(null), tableVersion);
+        recordMergeMode = 
RecordMergeMode.valueOf(mergeProperties.get(RECORD_MERGE_MODE.key()));
+        mergeStrategyId = mergeProperties.get(RECORD_MERGE_STRATEGY_ID.key());
+      } else {
+        Triple<RecordMergeMode, String, String> triple = 
HoodieTableConfig.inferMergingConfigsForWrites(
+            null, writerPayloadClass.get(), null, 
tableConfig.getOrderingFieldsStr().orElse(null), tableVersion);
+        recordMergeMode = triple.getLeft();
+        mergeStrategyId = triple.getRight();
+      }
     } else if (tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
       Triple<RecordMergeMode, String, String> triple = 
inferMergingConfigsForPreV9Table(
           recordMergeMode, tableConfig.getPayloadClass(),
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index 4cadae787020..1d4db1a17c00 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -36,7 +36,6 @@ import java.util.Map;
 import java.util.Properties;
 
 import static 
org.apache.hudi.common.table.HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME;
-import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 
@@ -206,8 +205,6 @@ public interface HoodieRecordPayload<T extends 
HoodieRecordPayload> extends Seri
     String payloadClassName = null;
     if (ConfigUtils.containsConfigProperty(props, PAYLOAD_CLASS_NAME)) {
       payloadClassName = ConfigUtils.getStringWithAltKeys(props, 
PAYLOAD_CLASS_NAME);
-    } else if (props.containsKey(LEGACY_PAYLOAD_CLASS_NAME.key())) {
-      payloadClassName = ConfigUtils.getStringWithAltKeys(props, 
LEGACY_PAYLOAD_CLASS_NAME);
     } else if (props.containsKey("hoodie.datasource.write.payload.class")) {
       payloadClassName = 
props.getProperty("hoodie.datasource.write.payload.class");
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index a815f6d88814..2cd47294d87e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -830,10 +830,10 @@ public class HoodieTableConfig extends HoodieConfig {
    * This approach fits the same behavior of upgrade from 8 to 9.
    */
   public static Map<String, String> 
inferMergingConfigsForV9TableCreation(RecordMergeMode recordMergeMode,
-                                                                   String 
payloadClassName,
-                                                                   String 
recordMergeStrategyId,
-                                                                   String 
orderingFieldName,
-                                                                   
HoodieTableVersion tableVersion) {
+                                                                          
String payloadClassName,
+                                                                          
String recordMergeStrategyId,
+                                                                          
String orderingFieldName,
+                                                                          
HoodieTableVersion tableVersion) {
     Map<String, String> reconciledConfigs = new HashMap<>();
     if (tableVersion.lesserThan(HoodieTableVersion.NINE)) {
       throw new HoodieIOException("Unsupported flow for table versions less 
than 9");
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
index a08886a23b9a..5fd50ffcfd00 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
@@ -82,6 +82,8 @@ public class BufferedRecordMergerFactory {
       return new 
PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(), 
recordMerger, deleteRecordMerger, orderingFieldNames, readerSchema, props);
     }
 
+    // might need to introduce a merge config for the factory in the future to 
get rid of this.
+    props = readerContext.getMergeProps(props);
     switch (recordMergeMode) {
       case COMMIT_TIME_ORDERING:
         if (partialUpdateModeOpt.isEmpty()) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 38b7637184bf..4e8ce7f274a9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -52,6 +52,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
 import static 
org.apache.hudi.keygen.constant.KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED;
 
@@ -139,6 +140,19 @@ public class ConfigUtils {
     return props;
   }
 
+  /**
+   * Ensures that the prefixed merge properties are populated for mergers.
+   */
+  public static TypedProperties getMergeProps(TypedProperties props, 
TypedProperties tableProps) {
+    Map<String, String> mergeProps = extractWithPrefix(tableProps, 
RECORD_MERGE_PROPERTY_PREFIX);
+    if (mergeProps.isEmpty()) {
+      return props;
+    }
+    TypedProperties copied = TypedProperties.copy(props);
+    mergeProps.forEach(copied::setProperty);
+    return copied;
+  }
+
   /**
    * Get payload class.
    */
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
index 008c70916fae..1b1bc3b369a1 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
@@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Map;
 
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -37,6 +41,15 @@ class TestPartialUpdateHandler {
     assertTrue(result.isEmpty());
   }
 
+  @Test
+  void testNonEmptyProperties() {
+    TypedProperties props = new TypedProperties();
+    props.put(RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE, 
DEBEZIUM_UNAVAILABLE_VALUE);
+    Map<String, String> result = 
PartialUpdateHandler.parseMergeProperties(props);
+    assertTrue(result.containsKey(PARTIAL_UPDATE_UNAVAILABLE_VALUE));
+    assertEquals(DEBEZIUM_UNAVAILABLE_VALUE, 
result.get(PARTIAL_UPDATE_UNAVAILABLE_VALUE));
+  }
+
   @Test
   void testDirectMatch() {
     Schema stringSchema = Schema.create(Schema.Type.STRING);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index 9075228c7802..4781aff2f8d7 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -383,4 +383,43 @@ public class TestConfigUtils {
     Map<String, String> result = ConfigUtils.extractWithPrefix(null, 
RECORD_MERGE_PROPERTY_PREFIX);
     assertTrue(result.isEmpty());
   }
+
+  @Test
+  void testParseRecordMergePropertiesWithPrefixedProperties() {
+    TypedProperties tableProps = new TypedProperties();
+    tableProps.put(RECORD_MERGE_PROPERTY_PREFIX + "strategy", "overwrite");
+    tableProps.put(RECORD_MERGE_PROPERTY_PREFIX + "field", "col1");
+
+    TypedProperties props = ConfigUtils.getMergeProps(new TypedProperties(), 
tableProps);
+    props.put("unrelated.key", "value");
+
+    assertEquals("overwrite", props.get("strategy"));
+    assertEquals("col1", props.get("field"));
+    assertEquals("value", props.get("unrelated.key"));
+  }
+
+  @Test
+  void testParseRecordMergePropertiesWithNoPrefixedProperties() {
+    TypedProperties tableProps = new TypedProperties();
+
+    TypedProperties props = new TypedProperties();
+    props.put("normal.key", "val");
+    props = ConfigUtils.getMergeProps(props, tableProps);
+
+    assertEquals(1, props.size());
+    assertEquals("val", props.get("normal.key"));
+  }
+
+  @Test
+  void testParseRecordMergePropertiesWithOverwrite() {
+    TypedProperties tableProps = new TypedProperties();
+    tableProps.put(RECORD_MERGE_PROPERTY_PREFIX + "strategy", "overwrite");
+
+    TypedProperties props = new TypedProperties();
+    props.put("strategy", "keep");
+    props = ConfigUtils.getMergeProps(props, tableProps);
+
+    assertEquals(1, props.size());
+    assertEquals("overwrite", props.get("strategy"));
+  }
 }
\ No newline at end of file
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index 042b217710ef..27bbf96a38df 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -26,6 +26,7 @@ import 
org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
 import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -109,7 +110,7 @@ public class TestFlinkWriteClients {
 
     assertThat(tableConfig.getRecordMergeMode(), 
is(RecordMergeMode.EVENT_TIME_ORDERING));
     assertThat(tableConfig.getRecordMergeStrategyId(), 
is(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID));
-    assertThat(tableConfig.getPayloadClass(), 
is(EventTimeAvroPayload.class.getName()));
+    assertThat(tableConfig.getPayloadClass(), 
is(DefaultHoodieRecordPayload.class.getName()));
 
     HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, false, false);
     String mergerClasses = 
writeConfig.getString(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES);
@@ -154,7 +155,7 @@ public class TestFlinkWriteClients {
 
     assertThat(tableConfig.getRecordMergeMode(), 
is(RecordMergeMode.EVENT_TIME_ORDERING));
     assertThat(tableConfig.getRecordMergeStrategyId(), 
is(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID));
-    assertThat(tableConfig.getPayloadClass(), 
is(PartialUpdateAvroPayload.class.getName()));
+    assertThat(tableConfig.getPayloadClass(), 
is(DefaultHoodieRecordPayload.class.getName()));
 
     HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, false, false);
     String mergerClasses = 
writeConfig.getString(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
index b3de7692c65e..36961ead2d66 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
@@ -21,8 +21,8 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions.{OPERATION, ORDERING_FIELDS, 
RECORDKEY_FIELD, TABLE_TYPE}
-import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.model.{AWSDmsAvroPayload, 
DefaultHoodieRecordPayload, EventTimeAvroPayload, HoodieRecordMerger, 
OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload, 
PartialUpdateAvroPayload}
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.{AWSDmsAvroPayload, 
DefaultHoodieRecordPayload, EventTimeAvroPayload, HoodieRecordMerger, 
HoodieTableType, OverwriteNonDefaultsWithLatestAvroPayload, 
OverwriteWithLatestAvroPayload, PartialUpdateAvroPayload}
 import org.apache.hudi.common.model.debezium.{DebeziumConstants, 
MySqlDebeziumAvroPayload, PostgresDebeziumAvroPayload}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
@@ -43,12 +43,14 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
    */
   @ParameterizedTest
   @MethodSource(Array("providePayloadClassTestCases"))
-  def testMergerBuiltinPayload(tableType: String,
-                               payloadClazz: String,
-                               expectedConfigs: Map[String, String]): Unit = {
+  def testMergerBuiltinPayloadUpgradePath(tableType: String,
+                                          payloadClazz: String,
+                                          expectedConfigs: Map[String, 
String]): Unit = {
     val opts: Map[String, String] = Map(
-      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz)
-    val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", 
"_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME)
+      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
+      HoodieMetadataConfig.ENABLE.key() -> "false")
+    val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", 
"_event_seq",
+      DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME)
     // 1. Add an insert.
     val data = Seq(
       (10, 1L, "rider-A", "driver-A", 19.10, "i", "10.1", 10, 1),
@@ -83,7 +85,8 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
     assertTrue(metaClient.getActiveTimeline.firstInstant().isPresent)
     // 2. Add an update.
     val firstUpdateData = Seq(
-      (11, 1L, "rider-X", "driver-X", 19.10, "D", "11.1", 11, 1),
+      (11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1),
+      (12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1),
       (11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1))
     val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
     firstUpdate.write.format("hudi").
@@ -133,7 +136,19 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
       }
     }
 
-    // 5. Validate.
+    // 5. Add a delete.
+    val fourthUpdateData = Seq(
+      (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
+      (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+    val fourthUpdate = spark.createDataFrame(fourthUpdateData).toDF(columns: 
_*)
+    fourthUpdate.write.format("hudi").
+      option(OPERATION.key(), "delete").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 
"1").
+      mode(SaveMode.Append).
+      save(basePath)
+
+    // 6. Validate.
     // Validate table configs.
     tableConfig = metaClient.getTableConfig
     expectedConfigs.foreach { case (key, expectedValue) =>
@@ -168,6 +183,162 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
       && timeTravelDf.except(expectedTimeTravelDf).isEmpty)
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("providePayloadClassTestCases"))
+  def testMergerBuiltinPayloadFromTableCreationPath(tableType: String,
+                                                    payloadClazz: String,
+                                                    expectedConfigs: 
Map[String, String]): Unit = {
+    val opts: Map[String, String] = Map(
+      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
+      HoodieMetadataConfig.ENABLE.key() -> "false")
+    val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", 
"_event_seq",
+      DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME)
+    // 1. Add an insert.
+    val data = Seq(
+      (10, 1L, "rider-A", "driver-A", 19.10, "i", "10.1", 10, 1),
+      (10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1),
+      (10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1),
+      (10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1),
+      (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1))
+    val inserts = spark.createDataFrame(data).toDF(columns: _*)
+    val orderingFields = if 
(payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+      "_event_bin_file,_event_pos"
+    } else {
+      "ts"
+    }
+    inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "_event_lsn").
+      option(ORDERING_FIELDS.key(), orderingFields).
+      option(TABLE_TYPE.key(), tableType).
+      option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      options(opts).
+      mode(SaveMode.Overwrite).
+      save(basePath)
+    // Verify table was created successfully
+    var metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
+    var tableConfig = metaClient.getTableConfig
+    // Verify table version is 9
+    assertEquals(9, tableConfig.getTableVersion.versionCode())
+    assertTrue(metaClient.getActiveTimeline.firstInstant().isPresent)
+    // Verify table properties
+    expectedConfigs.foreach { case (key, expectedValue) =>
+      if (expectedValue != null) {
+        assertEquals(expectedValue, tableConfig.getString(key), s"Config $key 
should be $expectedValue")
+      } else {
+        assertFalse(tableConfig.contains(key), s"Config $key should not be 
present")
+      }
+    }
+
+    // 2. Add an update.
+    val firstUpdateData = Seq(
+      (11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1),
+      (12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1),
+      (11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1))
+    val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
+    firstUpdate.write.format("hudi").
+      option(OPERATION.key(), "upsert").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      mode(SaveMode.Append).
+      save(basePath)
+    // Validate table version.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
+    val firstUpdateInstantTime = 
metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
+
+
+    // 3. Add an update. This is expected to trigger the upgrade
+    val compactionEnabled = if 
(tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+      "true"
+    } else {
+      "false"
+    }
+    val secondUpdateData = Seq(
+      (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
+      (9, 4L, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1),
+      (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+    val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: 
_*)
+    secondUpdate.write.format("hudi").
+      option(OPERATION.key(), "upsert").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), compactionEnabled).
+      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 
"1").
+      mode(SaveMode.Append).
+      save(basePath)
+    // Validate table version as 9.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
+    assertEquals(payloadClazz, metaClient.getTableConfig.getLegacyPayloadClass)
+    val compactionInstants = 
metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.getInstants
+    val foundCompaction = compactionInstants.stream().anyMatch(i => 
i.getAction.equals("commit"))
+    assertTrue(foundCompaction)
+
+    // 4. Add a trivial update to trigger payload class mismatch.
+    val thirdUpdateData = Seq(
+      (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1))
+    val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+    if (!payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+      assertThrows[HoodieException] {
+        thirdUpdate.write.format("hudi").
+          option(OPERATION.key(), "upsert").
+          option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+          
option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+          option(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
+            classOf[MySqlDebeziumAvroPayload].getName).
+          mode(SaveMode.Append).
+          save(basePath)
+      }
+    }
+
+    // 5. Add a delete.
+    val fourthUpdateData = Seq(
+      (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
+      (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+    val fourthUpdate = spark.createDataFrame(fourthUpdateData).toDF(columns: 
_*)
+    fourthUpdate.write.format("hudi").
+      option(OPERATION.key(), "delete").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 
"1").
+      mode(SaveMode.Append).
+      save(basePath)
+
+    // 6. Validate.
+    // Validate table configs again.
+    tableConfig = metaClient.getTableConfig
+    expectedConfigs.foreach { case (key, expectedValue) =>
+      if (expectedValue != null) {
+        assertEquals(expectedValue, tableConfig.getString(key), s"Config $key 
should be $expectedValue")
+      } else {
+        assertFalse(tableConfig.contains(key), s"Config $key should not be 
present")
+      }
+    }
+    // Validate snapshot query.
+    val df = spark.read.format("hudi").load(basePath)
+    val finalDf = df.select("ts", "_event_lsn", "rider", "driver", "fare", 
"Op", "_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME)
+      .sort("_event_lsn")
+    val expectedData = getExpectedResultForSnapshotQuery(payloadClazz)
+    val expectedDf = 
spark.createDataFrame(spark.sparkContext.parallelize(expectedData)).toDF(columns:
 _*).sort("_event_lsn")
+    expectedDf.show(false)
+    finalDf.show(false)
+    assertTrue(expectedDf.except(finalDf).isEmpty && 
finalDf.except(expectedDf).isEmpty)
+    // Validate time travel query.
+    val timeTravelDf = spark.read.format("hudi")
+      .option("as.of.instant", firstUpdateInstantTime).load(basePath)
+      .select("ts", "_event_lsn", "rider", "driver", "fare", "Op", 
"_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME)
+      .sort("_event_lsn")
+    timeTravelDf.show(false)
+    val expectedTimeTravelData = 
getExpectedResultForTimeTravelQuery(payloadClazz)
+    val expectedTimeTravelDf = spark.createDataFrame(
+      spark.sparkContext.parallelize(expectedTimeTravelData)).toDF(columns: 
_*).sort("_event_lsn")
+    expectedTimeTravelDf.show(false)
+    timeTravelDf.show(false)
+    assertTrue(
+      expectedTimeTravelDf.except(timeTravelDf).isEmpty
+        && timeTravelDf.except(expectedTimeTravelDf).isEmpty)
+  }
+
   def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
     val props = TypedProperties.fromMap(hudiOpts.asJava)
     HoodieWriteConfig.newBuilder()
@@ -184,25 +355,19 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
         || payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)
         || payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
         Seq(
-          (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1", 11, 1),
+          (12, 1, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1),
           (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1),
-          (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
-          (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1),
-          (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+          (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1))
       } else {
         Seq(
-          (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1", 11, 1),
+          (12, 1, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1),
           (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1),
-          (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
-          (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1),
-          (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+          (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1))
       }
     } else {
       Seq(
         (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1),
-        (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
-        (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1),
-        (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+        (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1))
     }
   }
 
@@ -210,7 +375,7 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
   Seq[(Int, Long, String, String, Double, String, String, Int, Int)] = {
     if (!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
       Seq(
-        (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1", 11, 1),
+        (12, 1, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1),
         (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1),
         (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1),
         (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1),
@@ -234,34 +399,27 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[DefaultHoodieRecordPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)),
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+      ),
       Arguments.of(
         "COPY_ON_WRITE",
         classOf[OverwriteWithLatestAvroPayload].getName,
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[OverwriteWithLatestAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
-        )
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
       ),
-      Arguments.of(
-        "COPY_ON_WRITE",
-        classOf[PartialUpdateAvroPayload].getName,
-        Map(
-          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[PartialUpdateAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
-        HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"),
       Arguments.of(
         "COPY_ON_WRITE",
         classOf[PostgresDebeziumAvroPayload].getName,
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[PostgresDebeziumAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
-        HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_MARKERS",
-        HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
-          -> "__debezium_unavailable_value"),
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "FILL_UNAVAILABLE",
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
+            -> "__debezium_unavailable_value")
+      ),
       Arguments.of(
         "COPY_ON_WRITE",
         classOf[MySqlDebeziumAvroPayload].getName,
@@ -276,9 +434,10 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[AWSDmsAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
-        HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
DefaultHoodieRecordPayload.DELETE_MARKER -> "D"),
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
+      ),
       Arguments.of(
         "COPY_ON_WRITE",
         classOf[EventTimeAvroPayload].getName,
@@ -304,15 +463,15 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[DefaultHoodieRecordPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)),
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+      ),
       Arguments.of(
         "MERGE_ON_READ",
         classOf[OverwriteWithLatestAvroPayload].getName,
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[OverwriteWithLatestAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
-        )
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -320,18 +479,20 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[PartialUpdateAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
-          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"),
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
+      ),
       Arguments.of(
         "MERGE_ON_READ",
         classOf[PostgresDebeziumAvroPayload].getName,
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[PostgresDebeziumAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
-          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_MARKERS",
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "FILL_UNAVAILABLE",
           HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
-            -> "__debezium_unavailable_value"),
+            -> "__debezium_unavailable_value")
+      ),
       Arguments.of(
         "MERGE_ON_READ",
         classOf[MySqlDebeziumAvroPayload].getName,
@@ -346,9 +507,10 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[AWSDmsAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
           HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
-          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
DefaultHoodieRecordPayload.DELETE_MARKER -> "D"),
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
+      ),
       Arguments.of(
         "MERGE_ON_READ",
         classOf[EventTimeAvroPayload].getName,
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 7948e0f8610f..4cbe5de74d73 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1883,7 +1883,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     //now assert that hoodie.properties file now has updated payload class name
     HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, 
dataSetBasePath, false);
-    assertEquals(metaClient.getTableConfig().getPayloadClass(), 
PartialUpdateAvroPayload.class.getName());
+    assertEquals(metaClient.getTableConfig().getPayloadClass(), 
DefaultHoodieRecordPayload.class.getName());
   }
 
   @Disabled("To be fixed with HUDI-9714")


Reply via email to