This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 21b8075f64eb refactor: Address follow up issues for payload 
deprecation (#13950)
21b8075f64eb is described below

commit 21b8075f64ebc7f870936803853cf291a97c463e
Author: Lin Liu <[email protected]>
AuthorDate: Wed Oct 1 09:52:19 2025 -0700

    refactor: Address follow up issues for payload deprecation (#13950)
---
 .../client/bootstrap/BootstrapRecordPayload.java   |  51 ----------
 .../table/upgrade/EightToNineUpgradeHandler.java   |  21 ++--
 .../table/upgrade/NineToEightDowngradeHandler.java |  14 ++-
 .../upgrade/TestEightToNineUpgradeHandler.java     |  23 +++--
 .../bootstrap/OrcBootstrapMetadataHandler.java     |   6 +-
 .../bootstrap/ParquetBootstrapMetadataHandler.java |   6 +-
 .../hudi/common/table/HoodieTableConfig.java       | 112 ++++++++++++++-------
 .../apache/hudi/common/util/HoodieRecordUtils.java |   2 +-
 .../org/apache/hudi/util/FlinkWriteClients.java    |   6 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |   3 +-
 .../test/java/org/apache/hudi/utils/TestData.java  |   3 +-
 .../apache/hudi/utils/TestFlinkWriteClients.java   |   8 +-
 .../org/apache/hudi/utils/TestStreamerUtil.java    |  12 +--
 .../hudi/common/table/TestHoodieTableConfig.java   |  28 +++---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  12 ++-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  15 +++
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   |  51 +++++-----
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |  20 +++-
 .../TestHoodieSparkSqlWriterWithTestFormat.scala   |  11 +-
 .../functional/TestPayloadDeprecationFlow.scala    |  48 +++------
 .../hudi/functional/TestSparkDataSource.scala      |  52 ++++++----
 .../sql/hudi/dml/insert/TestInsertTable.scala      |   5 +-
 .../others/TestMergeModeCommitTimeOrdering.scala   |  29 ++++--
 .../others/TestMergeModeEventTimeOrdering.scala    |  38 ++++---
 .../sql/hudi/procedure/TestRepairsProcedure.scala  |   1 -
 .../apache/hudi/utilities/streamer/StreamSync.java |  18 ++--
 .../deltastreamer/TestHoodieDeltaStreamer.java     |   1 -
 27 files changed, 326 insertions(+), 270 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java
deleted file mode 100644
index a60a0d39f7c5..000000000000
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client.bootstrap;
-
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-
-public class BootstrapRecordPayload implements 
HoodieRecordPayload<BootstrapRecordPayload> {
-
-  private final GenericRecord record;
-
-  public BootstrapRecordPayload(GenericRecord record) {
-    this.record = record;
-  }
-
-  @Override
-  public BootstrapRecordPayload preCombine(BootstrapRecordPayload oldValue) {
-    return this;
-  }
-
-  @Override
-  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema) {
-    return Option.ofNullable(record);
-  }
-
-  @Override
-  public Option<IndexedRecord> getInsertValue(Schema schema) {
-    return Option.ofNullable(record);
-  }
-
-}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
index d5d731d51833..788a6deb7171 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
@@ -127,7 +127,7 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
           metaClient.getTableConfig().getTableVersion());
     }
     // Handle merge mode config.
-    reconcileMergeModeConfig(tablePropsToAdd, tableConfig);
+    reconcileMergeModeConfig(tablePropsToAdd, tablePropsToRemove, tableConfig);
     // Handle partial update mode config.
     reconcilePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
     // Handle merge properties config.
@@ -140,12 +140,18 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
   }
 
   private void reconcileMergeModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
+                                        Set<ConfigProperty> tablePropsToRemove,
                                         HoodieTableConfig tableConfig) {
     String payloadClass = tableConfig.getPayloadClass();
-    String mergeStrategy = tableConfig.getRecordMergeStrategyId();
-    if (!BUILTIN_MERGE_STRATEGIES.contains(mergeStrategy) || 
StringUtils.isNullOrEmpty(payloadClass)) {
+    RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
+    if (mergeMode != RecordMergeMode.CUSTOM) {
+      // For commit time or event time based table, remove merge strategy id.
+      tablePropsToRemove.add(RECORD_MERGE_STRATEGY_ID);
+    } else if (StringUtils.isNullOrEmpty(payloadClass)) {
+      // For tables using a custom merger without a payload class, keep their configs.
       return;
     }
+
     if (PAYLOADS_MAPPED_TO_COMMIT_TIME_MERGE_MODE.contains(payloadClass)) {
       tablePropsToAdd.put(RECORD_MERGE_MODE, 
RecordMergeMode.COMMIT_TIME_ORDERING.name());
       tablePropsToAdd.put(RECORD_MERGE_STRATEGY_ID, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
@@ -160,8 +166,7 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
                                            Set<ConfigProperty> 
tablePropsToRemove,
                                            HoodieTableConfig tableConfig) {
     String payloadClass = tableConfig.getPayloadClass();
-    String mergeStrategy = tableConfig.getRecordMergeStrategyId();
-    if (!BUILTIN_MERGE_STRATEGIES.contains(mergeStrategy) || 
StringUtils.isNullOrEmpty(payloadClass)) {
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
       return;
     }
     if (PAYLOAD_CLASSES_TO_HANDLE.contains(payloadClass)) {
@@ -173,8 +178,7 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
   private void reconcilePartialUpdateModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
                                                 HoodieTableConfig tableConfig) 
{
     String payloadClass = tableConfig.getPayloadClass();
-    String mergeStrategy = tableConfig.getRecordMergeStrategyId();
-    if (!BUILTIN_MERGE_STRATEGIES.contains(mergeStrategy) || 
StringUtils.isNullOrEmpty(payloadClass)) {
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
       return;
     }
     if 
(payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())
@@ -187,8 +191,7 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
 
   private void reconcileMergePropertiesConfig(Map<ConfigProperty, String> 
tablePropsToAdd, HoodieTableConfig tableConfig, HoodieWriteConfig writeConfig) {
     String payloadClass = tableConfig.getPayloadClass();
-    String mergeStrategy = tableConfig.getRecordMergeStrategyId();
-    if (!BUILTIN_MERGE_STRATEGIES.contains(mergeStrategy) || 
StringUtils.isNullOrEmpty(payloadClass)) {
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
       return;
     }
     if (payloadClass.equals(AWSDmsAvroPayload.class.getName())) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
index aa62e1a20a16..5d547f16e7dd 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
@@ -44,6 +44,8 @@ import java.util.Set;
 
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
@@ -100,9 +102,17 @@ public class NineToEightDowngradeHandler implements 
DowngradeHandler {
                                      HoodieTableConfig tableConfig) {
     // Update table properties.
     propertiesToRemove.add(PARTIAL_UPDATE_MODE);
-    // For specified payload classes, add strategy id and custom merge mode.
     String legacyPayloadClass = tableConfig.getLegacyPayloadClass();
-    if (!StringUtils.isNullOrEmpty(legacyPayloadClass) && 
(PAYLOAD_CLASSES_TO_HANDLE.contains(legacyPayloadClass))) {
+    // For tables with commit time or event time mode, add strategy id.
+    RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
+    if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
+      propertiesToAdd.put(RECORD_MERGE_STRATEGY_ID, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+    } else if (mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING) {
+      propertiesToAdd.put(RECORD_MERGE_STRATEGY_ID, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+    }
+    // For specified payload classes, add strategy id and custom merge mode.
+    if (!StringUtils.isNullOrEmpty(legacyPayloadClass)
+        && (PAYLOAD_CLASSES_TO_HANDLE.contains(legacyPayloadClass))) {
       propertiesToRemove.add(LEGACY_PAYLOAD_CLASS_NAME);
       propertiesToAdd.put(PAYLOAD_CLASS_NAME, legacyPayloadClass);
       if 
(!legacyPayloadClass.equals(OverwriteWithLatestAvroPayload.class.getName())
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
index ef867d376f8f..eb885f8be5ad 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
@@ -77,6 +77,7 @@ import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAV
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
 import static org.apache.hudi.common.table.PartialUpdateMode.FILL_UNAVAILABLE;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -277,7 +278,8 @@ class TestEightToNineUpgradeHandler {
           handler.upgrade(config, context, INSTANT_TIME, 
upgradeDowngradeHelper);
       // Verify
       assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToUpdate(), 
result.propertiesToUpdate());
-      assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToDelete(), 
result.propertiesToDelete());
+      assertEquals(
+          Collections.singleton(RECORD_MERGE_STRATEGY_ID), 
result.propertiesToDelete());
     }
   }
 
@@ -285,12 +287,13 @@ class TestEightToNineUpgradeHandler {
                                         Set<ConfigProperty> propertiesToRemove,
                                         String payloadClass) {
     if (payloadClass.equals(MySqlDebeziumAvroPayload.class.getName()) || 
payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
-      assertEquals(2, propertiesToRemove.size());
+      assertEquals(3, propertiesToRemove.size());
       
assertTrue(propertiesToRemove.contains(HoodieTableConfig.PRECOMBINE_FIELD));
     } else {
-      assertEquals(1, propertiesToRemove.size());
+      assertEquals(2, propertiesToRemove.size());
     }
     assertTrue(propertiesToRemove.contains(PAYLOAD_CLASS_NAME));
+    assertTrue(propertiesToRemove.contains(RECORD_MERGE_STRATEGY_ID));
     assertTrue(propertiesToAdd.containsKey(LEGACY_PAYLOAD_CLASS_NAME));
     assertEquals(
         payloadClass,
@@ -342,7 +345,9 @@ class TestEightToNineUpgradeHandler {
 
       // Verify
       assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToUpdate(), 
result.propertiesToUpdate());
-      assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToDelete(), 
result.propertiesToDelete());
+      assertEquals(
+          Collections.singleton(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID),
+          result.propertiesToDelete());
 
       // Verify storage methods were called correctly
       // Note: createFileInPath directly calls create() when contentWriter is 
present
@@ -426,7 +431,8 @@ class TestEightToNineUpgradeHandler {
           handler.upgrade(config, context, INSTANT_TIME, 
upgradeDowngradeHelper);
       // Verify
       assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToUpdate(), 
result.propertiesToUpdate());
-      assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToDelete(), 
result.propertiesToDelete());
+      assertEquals(
+          Collections.singleton(RECORD_MERGE_STRATEGY_ID), 
result.propertiesToDelete());
     }
   }
 
@@ -451,7 +457,8 @@ class TestEightToNineUpgradeHandler {
           handler.upgrade(config, context, INSTANT_TIME, 
upgradeDowngradeHelper);
       // Verify
       assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToUpdate(), 
result.propertiesToUpdate());
-      assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToDelete(), 
result.propertiesToDelete());
+      assertEquals(
+          Collections.singleton(RECORD_MERGE_STRATEGY_ID), 
result.propertiesToDelete());
     }
   }
 
@@ -492,7 +499,9 @@ class TestEightToNineUpgradeHandler {
 
       // Verify
       assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToUpdate(), 
result.propertiesToUpdate());
-      assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToDelete(), 
result.propertiesToDelete());
+      assertEquals(
+          Collections.singleton(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID),
+          result.propertiesToDelete());
 
       // Verify storage methods were called correctly
       // Note: createFileInPath directly calls create() when contentWriter is 
present
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
index 86944ae3f5bf..c0981bdaeb62 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
@@ -19,8 +19,7 @@
 package org.apache.hudi.table.action.bootstrap;
 
 import org.apache.hudi.avro.model.HoodieFileStatus;
-import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
-import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -85,8 +84,7 @@ class OrcBootstrapMetadataHandler extends 
BaseBootstrapMetadataHandler {
             String recKey = keyGenerator.getKey(inp).getRecordKey();
             GenericRecord gr = new 
GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA);
             gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
-            BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
-            HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, 
partitionPath), payload);
+            HoodieRecord rec = new HoodieAvroIndexedRecord(new 
HoodieKey(recKey, partitionPath), gr);
             return rec;
           }, table.getPreExecuteRunnable());
       executor.execute();
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 7b1f48cd1aff..44d4a18b781e 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -19,8 +19,7 @@
 package org.apache.hudi.table.action.bootstrap;
 
 import org.apache.hudi.avro.model.HoodieFileStatus;
-import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
-import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieSparkRecord;
@@ -120,8 +119,7 @@ class ParquetBootstrapMetadataHandler extends 
BaseBootstrapMetadataHandler {
       case AVRO:
         GenericRecord avroRecord = new 
GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA);
         avroRecord.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey);
-        BootstrapRecordPayload payload = new 
BootstrapRecordPayload(avroRecord);
-        return new HoodieAvroRecord<>(hoodieKey, payload);
+        return new HoodieAvroIndexedRecord(hoodieKey, avroRecord);
 
       case SPARK:
         StructType schema = 
HoodieInternalRowUtils$.MODULE$.getCachedSchema(METADATA_BOOTSTRAP_RECORD_SCHEMA);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index cdace761e6bc..9b24f7d0c989 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -833,15 +833,24 @@ public class HoodieTableConfig extends HoodieConfig {
     recordMergeStrategyId = inferredConfigs.getRight();
 
     // Step 2: Handle Version 9 specific logic.
-    // CASE 0: For tables with special merger properties, e.g., with 
non-builtin mergers.
-    // CASE 1: For tables using MERGE MODE, or CUSTOM builtin mergers.
-    //   NOTE: Payload class should NOT be set for these cases.
-    if (!BUILTIN_MERGE_STRATEGIES.contains(recordMergeStrategyId)
+    if (recordMergeMode != CUSTOM && 
StringUtils.isNullOrEmpty(payloadClassName)) {
+      // CASE 0: For commit time and event time based table, only merge mode 
is set.
+      //   `StringUtils.isNullOrEmpty(payloadClassName)` is added since
+      //   `DefaultHoodieRecordPayload`, `OverwriteWithLatestAvroPayload` and 
`EventTimeAvroPayload`
+      //   have the commit/event time merge mode through 
`inferMergingConfigsForPreV9Table`.
+      //   NOTE: Payload class / strategy id should NOT be set for this case.
+      reconciledConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+    } else if (!BUILTIN_MERGE_STRATEGIES.contains(recordMergeStrategyId)
         || StringUtils.isNullOrEmpty(payloadClassName)) {
+      // CASE 1: For tables using non-builtin custom merger
+      //           (payload class is inferred in 
`inferMergingConfigsForPreV9Table`),
+      //         or for tables using builtin custom merger without using a 
payload class,
+      //         in this case, merge mode and strategy id should be set.
+      //   NOTE: Payload class should NOT be set for these cases.
       reconciledConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
       reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
recordMergeStrategyId);
     } else {
-      // For tables using payload classes.
+      // For tables using payload classes in custom merge mode.
       //   CASE 2: Custom payload class. We set these properties explicitly.
       if 
(!PayloadGroupings.getPayloadsUnderDeprecation().contains(payloadClassName)) {
         reconciledConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
@@ -849,46 +858,58 @@ public class HoodieTableConfig extends HoodieConfig {
         reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
PAYLOAD_BASED_MERGE_STRATEGY_UUID);
       } else { // CASE 3: Payload classes are under deprecation.
         // Standard merging configs.
-        // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of 
PAYLOAD_CLASS_NAME here.
-        if 
(PayloadGroupings.getEventTimeOrderingPayloads().contains(payloadClassName)) {
-          reconciledConfigs.put(RECORD_MERGE_MODE.key(), 
EVENT_TIME_ORDERING.name());
-          reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), 
payloadClassName);
-          reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
-        } else {
-          reconciledConfigs.put(RECORD_MERGE_MODE.key(), 
COMMIT_TIME_ORDERING.name());
-          reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), 
payloadClassName);
-          reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
-        }
+        handleMergeModeConfigs(payloadClassName, reconciledConfigs);
         // Partial update mode config.
-        // Certain payloads are migrated to non payload way from 1.1 Hudi 
binary.
-        // Hence we need to set the right value for partial update mode for 
some of the cases.
-        if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
-            || 
payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()))
 {
-          reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.IGNORE_DEFAULTS.name());
-        } else if 
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
-          reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.FILL_UNAVAILABLE.name());
-        }
-        // Additional custom merge properties.
+        handlePartialUpdateModeConfigs(payloadClassName, reconciledConfigs);
+        // Additional custom merge properties.
         // Certain payloads are migrated to non payload way from 1.1 Hudi 
binary and the reader might need certain properties for the
         // merge to function as expected. Handing such special cases here.
-        if 
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + 
PARTIAL_UPDATE_UNAVAILABLE_VALUE, DEBEZIUM_UNAVAILABLE_VALUE);
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, 
DebeziumConstants.FLATTENED_OP_COL_NAME);
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, 
DebeziumConstants.DELETE_OP);
-          reconciledConfigs.put(ORDERING_FIELDS.key(), 
DebeziumConstants.FLATTENED_LSN_COL_NAME);
-        } else if 
(payloadClassName.equals(MySqlDebeziumAvroPayload.class.getName())) {
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, 
DebeziumConstants.FLATTENED_OP_COL_NAME);
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, 
DebeziumConstants.DELETE_OP);
-          reconciledConfigs.put(ORDERING_FIELDS.key(), 
DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + 
DebeziumConstants.FLATTENED_POS_COL_NAME);
-        } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName())) 
{
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, 
OP_FIELD);
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, 
DELETE_OPERATION_VALUE);
-        }
+        handlePayloadAdhocConfigs(payloadClassName, reconciledConfigs);
       }
     }
     return reconciledConfigs;
   }
 
+  private static void handleMergeModeConfigs(String payloadClassName, 
Map<String, String> reconciledConfigs) {
+    // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of PAYLOAD_CLASS_NAME 
here.
+    if 
(PayloadGroupings.getEventTimeOrderingPayloads().contains(payloadClassName)) {
+      reconciledConfigs.put(RECORD_MERGE_MODE.key(), 
EVENT_TIME_ORDERING.name());
+      reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+    } else {
+      reconciledConfigs.put(RECORD_MERGE_MODE.key(), 
COMMIT_TIME_ORDERING.name());
+      reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+    }
+  }
+
+  private static void handlePartialUpdateModeConfigs(String payloadClassName, 
Map<String, String> reconciledConfigs) {
+    // Certain payloads are migrated to non payload way from 1.1 Hudi binary.
+    // Hence we need to set the right value for partial update mode for some 
of the cases.
+    if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+        || 
payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()))
 {
+      reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.IGNORE_DEFAULTS.name());
+    } else if 
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+      reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.FILL_UNAVAILABLE.name());
+    }
+  }
+
+  private static void handlePayloadAdhocConfigs(String payloadClassName, 
Map<String, String> reconciledConfigs) {
+    // Certain payloads are migrated to non payload way from 1.1 Hudi binary 
and the reader might need certain properties for the
+    // merge to function as expected. Handling such special cases here.
+    if (payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + 
PARTIAL_UPDATE_UNAVAILABLE_VALUE, DEBEZIUM_UNAVAILABLE_VALUE);
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, 
DebeziumConstants.FLATTENED_OP_COL_NAME);
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, 
DebeziumConstants.DELETE_OP);
+      reconciledConfigs.put(ORDERING_FIELDS.key(), 
DebeziumConstants.FLATTENED_LSN_COL_NAME);
+    } else if 
(payloadClassName.equals(MySqlDebeziumAvroPayload.class.getName())) {
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, 
DebeziumConstants.FLATTENED_OP_COL_NAME);
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, 
DebeziumConstants.DELETE_OP);
+      reconciledConfigs.put(ORDERING_FIELDS.key(), 
DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + 
DebeziumConstants.FLATTENED_POS_COL_NAME);
+    } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName())) {
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, 
OP_FIELD);
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, 
DELETE_OPERATION_VALUE);
+    }
+  }
+
   /**
    * To be invoked for table creation flows or writer flows.
    * @return the merging configs to use.
@@ -898,7 +919,18 @@ public class HoodieTableConfig extends HoodieConfig {
                                                                                
      String recordMergeStrategyId,
                                                                                
      String orderingFieldNamesAsString,
                                                                                
      HoodieTableVersion tableVersion) {
-    return inferMergingConfigsForPreV9Table(recordMergeMode, payloadClassName, 
recordMergeStrategyId, orderingFieldNamesAsString, tableVersion);
+    if (tableVersion.greaterThanOrEquals(HoodieTableVersion.NINE)) {
+      Map<String, String> inferredMergingConfigs = 
inferMergingConfigsForV9TableCreation(
+          recordMergeMode, payloadClassName, recordMergeStrategyId, 
orderingFieldNamesAsString, tableVersion);
+      
checkArgument(inferredMergingConfigs.containsKey(HoodieTableConfig.RECORD_MERGE_MODE.key()));
+      return Triple.of(
+          
RecordMergeMode.valueOf(inferredMergingConfigs.get(RECORD_MERGE_MODE.key())),
+          inferredMergingConfigs.getOrDefault(PAYLOAD_CLASS_NAME.key(), null),
+          inferredMergingConfigs.getOrDefault(RECORD_MERGE_STRATEGY_ID.key(), 
null));
+    }
+    // For table version <= 8.
+    return inferMergingConfigsForPreV9Table(
+        recordMergeMode, payloadClassName, recordMergeStrategyId, 
orderingFieldNamesAsString, tableVersion);
   }
 
   /**
@@ -944,7 +976,9 @@ public class HoodieTableConfig extends HoodieConfig {
         inferredRecordMergeMode = modeBasedOnPayload != null ? 
modeBasedOnPayload : modeBasedOnStrategyId;
       }
     }
-    if (recordMergeMode != null) {
+
+    if (recordMergeMode != null && 
(tableVersion.lesserThan(HoodieTableVersion.NINE)
+        || 
!PayloadGroupings.getPayloadsUnderDeprecation().contains(payloadClassName))) {
       checkArgument(inferredRecordMergeMode == recordMergeMode,
           String.format("Configured record merge mode (%s) is inconsistent 
with payload class (%s) "
                   + "or record merge strategy ID (%s) configured. Please 
revisit the configs.",
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index ae86f95448fc..14b5ad8efd45 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -110,7 +110,7 @@ public class HoodieRecordUtils {
     return Option.fromJavaOptional(mergeImplClassList.stream()
         .map(clazz -> loadRecordMerger(clazz))
         .filter(Objects::nonNull)
-        .filter(merger -> 
merger.getMergingStrategy().equals(recordMergeStrategyId))
+        .filter(merger -> StringUtils.isNullOrEmpty(recordMergeStrategyId) || 
merger.getMergingStrategy().equals(recordMergeStrategyId))
         .filter(merger -> recordTypeCompatibleEngine(merger.getRecordType(), 
engineType))
         .findFirst());
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 03ef1a46a588..3fef94c8e60b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -245,9 +245,11 @@ public class FlinkWriteClients {
 
     // <merge_mode, payload_class, merge_strategy_id>
     Triple<RecordMergeMode, String, String> mergingBehavior = 
StreamerUtil.inferMergingBehavior(conf);
-    builder.withRecordMergeStrategyId(mergingBehavior.getRight())
-        .withRecordMergeMode(mergingBehavior.getLeft())
+    builder.withRecordMergeMode(mergingBehavior.getLeft())
         .withRecordMergeImplClasses(StreamerUtil.getMergerClasses(conf, 
mergingBehavior.getLeft(), mergingBehavior.getMiddle()));
+    if (mergingBehavior.getRight() != null) {
+      builder.withRecordMergeStrategyId(mergingBehavior.getRight());
+    }
 
     Option<HoodieLockConfig> lockConfig = getLockConfig(conf);
     if (lockConfig.isPresent()) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 11d3817e2dd1..a5901fc69387 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -398,8 +398,7 @@ public class StreamerUtil {
         getMergeMode(conf), payloadClassName, getMergeStrategyId(conf), 
OptionsResolver.getOrderingFieldsStr(conf), HoodieTableVersion.current());
     String mergeMode = 
mergeConf.get(HoodieTableConfig.RECORD_MERGE_MODE.key());
     String mergeStrategyId = 
mergeConf.get(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key());
-    ValidationUtils.checkArgument(mergeMode != null && mergeStrategyId != null,
-        "Both merge mode and merge strategy id should not be null");
+    ValidationUtils.checkArgument(mergeMode != null, "Merge mode should not be 
null");
     return Triple.of(RecordMergeMode.valueOf(mergeMode), payloadClassName, 
mergeStrategyId);
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 21b8d5a2b0be..6b5f79c6872b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -961,7 +961,8 @@ public class TestData {
         .build();
     // deal with partial update merger
     if 
(config.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME).contains(PartialUpdateAvroPayload.class.getSimpleName())
-        || 
config.getString(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID).equalsIgnoreCase(HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID))
 {
+        || (config.getString(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID) != 
null
+        && 
config.getString(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID).equalsIgnoreCase(HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID)))
 {
       config.setValue(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
PartialUpdateFlinkRecordMerger.class.getName());
     }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index 27bbf96a38df..d823a01f5be1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -28,7 +28,6 @@ import 
org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
-import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.PartialUpdateAvroPayload;
@@ -54,6 +53,7 @@ import java.io.File;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -109,7 +109,7 @@ public class TestFlinkWriteClients {
     HoodieTableConfig tableConfig = metaClient.getTableConfig();
 
     assertThat(tableConfig.getRecordMergeMode(), 
is(RecordMergeMode.EVENT_TIME_ORDERING));
-    assertThat(tableConfig.getRecordMergeStrategyId(), 
is(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID));
+    assertNull(tableConfig.getRecordMergeStrategyId());
     assertThat(tableConfig.getPayloadClass(), 
is(DefaultHoodieRecordPayload.class.getName()));
 
     HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, false, false);
@@ -129,7 +129,7 @@ public class TestFlinkWriteClients {
     HoodieTableConfig tableConfig = metaClient.getTableConfig();
 
     assertThat(tableConfig.getRecordMergeMode(), 
is(RecordMergeMode.COMMIT_TIME_ORDERING));
-    assertThat(tableConfig.getRecordMergeStrategyId(), 
is(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID));
+    assertNull(tableConfig.getRecordMergeStrategyId());
     assertThat(tableConfig.getPayloadClass(), 
is(OverwriteWithLatestAvroPayload.class.getName()));
 
 
@@ -154,7 +154,7 @@ public class TestFlinkWriteClients {
     HoodieTableConfig tableConfig = metaClient.getTableConfig();
 
     assertThat(tableConfig.getRecordMergeMode(), 
is(RecordMergeMode.EVENT_TIME_ORDERING));
-    assertThat(tableConfig.getRecordMergeStrategyId(), 
is(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID));
+    assertNull(tableConfig.getRecordMergeStrategyId());
     assertThat(tableConfig.getPayloadClass(), 
is(DefaultHoodieRecordPayload.class.getName()));
 
     HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, false, false);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index f8d9fd55f98d..8e19c8c2e8d4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utils;
 import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
-import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.PartialUpdateAvroPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -52,6 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -70,7 +70,7 @@ class TestStreamerUtil {
     Triple<RecordMergeMode, String, String> mergeBehavior = 
StreamerUtil.inferMergingBehavior(conf);
     assertEquals(RecordMergeMode.EVENT_TIME_ORDERING, mergeBehavior.getLeft());
     assertEquals(EventTimeAvroPayload.class.getName(), 
mergeBehavior.getMiddle());
-    assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
mergeBehavior.getRight());
+    assertNull(mergeBehavior.getRight());
 
     // set commit time merge mode
     conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
@@ -78,7 +78,7 @@ class TestStreamerUtil {
     mergeBehavior = StreamerUtil.inferMergingBehavior(conf);
     assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING, 
mergeBehavior.getLeft());
     assertEquals(OverwriteWithLatestAvroPayload.class.getName(), 
mergeBehavior.getMiddle());
-    assertEquals(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
mergeBehavior.getRight());
+    assertNull(mergeBehavior.getRight());
 
     // set partial update merger.
     conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
@@ -86,7 +86,7 @@ class TestStreamerUtil {
     mergeBehavior = StreamerUtil.inferMergingBehavior(conf);
     assertEquals(RecordMergeMode.EVENT_TIME_ORDERING, mergeBehavior.getLeft());
     assertEquals(PartialUpdateAvroPayload.class.getName(), 
mergeBehavior.getMiddle());
-    assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
mergeBehavior.getRight());
+    assertNull(mergeBehavior.getRight());
 
     // set partial update payload
     conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
@@ -94,7 +94,7 @@ class TestStreamerUtil {
     mergeBehavior = StreamerUtil.inferMergingBehavior(conf);
     assertEquals(RecordMergeMode.EVENT_TIME_ORDERING, mergeBehavior.getLeft());
     assertEquals(PartialUpdateAvroPayload.class.getName(), 
mergeBehavior.getMiddle());
-    assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
mergeBehavior.getRight());
+    assertNull(mergeBehavior.getRight());
 
     // set partial update payload
     conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
@@ -103,7 +103,7 @@ class TestStreamerUtil {
     mergeBehavior = StreamerUtil.inferMergingBehavior(conf);
     assertEquals(RecordMergeMode.EVENT_TIME_ORDERING, mergeBehavior.getLeft());
     assertEquals(PartialUpdateAvroPayload.class.getName(), 
mergeBehavior.getMiddle());
-    assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
mergeBehavior.getRight());
+    assertNull(mergeBehavior.getRight());
   }
 
   @Test
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 25042d59783e..a59ae8c5c3f2 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -599,11 +599,11 @@ class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
 
         // Test case: Version 9 table with null payload class and event time 
ordering
         arguments("Version 9 with event time ordering", EVENT_TIME_ORDERING, 
null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.NINE,
-            2, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+            1, EVENT_TIME_ORDERING.name(), null, null, null, null, null, null, 
null),
 
         // Test case: Version 9 table with null payload class and commit time 
ordering
         arguments("Version 9 with commit time ordering", COMMIT_TIME_ORDERING, 
null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE,
-            2, COMMIT_TIME_ORDERING.name(), null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+            1, COMMIT_TIME_ORDERING.name(), null, null, null, null, null, 
null, null),
 
         // Test case: Version 9 table with null payload class and custom merge 
mode
         arguments("Version 9 with custom merge mode", CUSTOM, null, 
CUSTOM_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE,
@@ -615,38 +615,38 @@ class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
 
         // Test case: Version 9 table with event time based payload 
(DefaultHoodieRecordPayload)
         arguments("Version 9 with DefaultHoodieRecordPayload", null, 
DefaultHoodieRecordPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
-            3, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
DefaultHoodieRecordPayload.class.getName(), null, null, null, null),
+            2, EVENT_TIME_ORDERING.name(), null, null, 
DefaultHoodieRecordPayload.class.getName(), null, null, null, null),
 
         // Test case: Version 9 table with commit time based payload 
(OverwriteWithLatestAvroPayload)
         arguments("Version 9 with OverwriteWithLatestAvroPayload", null, 
OverwriteWithLatestAvroPayload.class.getName(), null, null, 
HoodieTableVersion.NINE,
-            3, COMMIT_TIME_ORDERING.name(), null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
OverwriteWithLatestAvroPayload.class.getName(), null, null, null, null),
+            2, COMMIT_TIME_ORDERING.name(), null, null, 
OverwriteWithLatestAvroPayload.class.getName(), null, null, null, null),
 
         // Test case: Version 9 table with PartialUpdateAvroPayload (should 
set partial update mode)
         arguments("Version 9 with PartialUpdateAvroPayload", null, 
PartialUpdateAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
-            4, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, PartialUpdateAvroPayload.class.getName(), 
PartialUpdateMode.IGNORE_DEFAULTS.name(),
+            3, EVENT_TIME_ORDERING.name(), null, null, 
PartialUpdateAvroPayload.class.getName(), 
PartialUpdateMode.IGNORE_DEFAULTS.name(),
             null, null, null),
 
         // Test case: Version 9 table with 
OverwriteNonDefaultsWithLatestAvroPayload (should set partial update mode)
         arguments("Version 9 with OverwriteNonDefaultsWithLatestAvroPayload", 
null, OverwriteNonDefaultsWithLatestAvroPayload.class.getName(), null, null, 
HoodieTableVersion.NINE,
-            4, COMMIT_TIME_ORDERING.name(), null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
+            3, COMMIT_TIME_ORDERING.name(), null, null, 
OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
             PartialUpdateMode.IGNORE_DEFAULTS.name(), null, null, null),
 
         // Test case: Version 9 table with PostgresDebeziumAvroPayload (should 
set partial update mode and custom properties)
         arguments("Version 9 with PostgresDebeziumAvroPayload", null, 
PostgresDebeziumAvroPayload.class.getName(), null, "ts", 
HoodieTableVersion.NINE,
-            8, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
PostgresDebeziumAvroPayload.class.getName(),
+            7, EVENT_TIME_ORDERING.name(), null, null, 
PostgresDebeziumAvroPayload.class.getName(),
             PartialUpdateMode.FILL_UNAVAILABLE.name(), 
HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE, "_change_operation_type", "d"),
 
         // Test case: Version 9 table with AWSDmsAvroPayload (should set 
custom delete properties)
         arguments("Version 9 with AWSDmsAvroPayload", null, 
AWSDmsAvroPayload.class.getName(), null, null, HoodieTableVersion.NINE,
-            5, COMMIT_TIME_ORDERING.name(), null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, AWSDmsAvroPayload.class.getName(), null, 
null, "Op", "D"),
+            4, COMMIT_TIME_ORDERING.name(), null, null, 
AWSDmsAvroPayload.class.getName(), null, null, "Op", "D"),
 
         // Test case: Version 9 table with EventTimeAvroPayload (event time 
based payload)
         arguments("Version 9 with EventTimeAvroPayload", null, 
EventTimeAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
-            3, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, EventTimeAvroPayload.class.getName(), 
null, null, null, null),
+            2, EVENT_TIME_ORDERING.name(), null, null, 
EventTimeAvroPayload.class.getName(), null, null, null, null),
 
         // Test case: Version 9 table with MySqlDebeziumAvroPayload (event 
time based payload)
         arguments("Version 9 with MySqlDebeziumAvroPayload", null, 
MySqlDebeziumAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
-            6, EVENT_TIME_ORDERING.name(), null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, MySqlDebeziumAvroPayload.class.getName(), 
null, null, "_change_operation_type", "d")
+            5, EVENT_TIME_ORDERING.name(), null, null, 
MySqlDebeziumAvroPayload.class.getName(), null, null, "_change_operation_type", 
"d")
     );
   }
 
@@ -747,9 +747,8 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness 
{
     // Test case: Empty string payload class should be treated as null
     Map<String, String> configs = 
HoodieTableConfig.inferMergingConfigsForV9TableCreation(
         EVENT_TIME_ORDERING, "", EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", 
HoodieTableVersion.NINE);
-    assertEquals(2, configs.size());
+    assertEquals(1, configs.size());
     assertEquals(EVENT_TIME_ORDERING.name(), 
configs.get(RECORD_MERGE_MODE.key()));
-    assertEquals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
configs.get(RECORD_MERGE_STRATEGY_ID.key()));
 
     // Test case: Non-version 9 table with all parameters should throw
     
assertExceptionWithInferMergingConfigsForV9TableCreation(EVENT_TIME_ORDERING, 
DefaultHoodieRecordPayload.class.getName(),
@@ -758,9 +757,8 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness 
{
     // Test case: Version 9 table with null ordering field for event time 
ordering should still work
     configs = HoodieTableConfig.inferMergingConfigsForV9TableCreation(
         EVENT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, 
HoodieTableVersion.NINE);
-    assertEquals(2, configs.size());
+    assertEquals(1, configs.size());
     assertEquals(EVENT_TIME_ORDERING.name(), 
configs.get(RECORD_MERGE_MODE.key()));
-    assertEquals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
configs.get(RECORD_MERGE_STRATEGY_ID.key()));
   }
 
   @Test
@@ -774,7 +772,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness 
{
         Map<String, String> configs = 
HoodieTableConfig.inferMergingConfigsForV9TableCreation(
             EVENT_TIME_ORDERING, DefaultHoodieRecordPayload.class.getName(),
             EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", version);
-        assertEquals(3, configs.size(), "Table version 9 and above should 
return 3 configs");
+        assertEquals(2, configs.size(), "Table version 9 and above should 
return 2 configs");
       }
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c8e70d5db833..009c2c4f8d8e 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -1137,10 +1137,14 @@ class HoodieSparkSqlWriterInternal {
         tableVersion)
       mergedParams.put(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), 
inferredMergeConfigs.getLeft.name())
       mergedParams.put(HoodieTableConfig.RECORD_MERGE_MODE.key(), 
inferredMergeConfigs.getLeft.name())
-      mergedParams.put(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), 
inferredMergeConfigs.getMiddle)
-      mergedParams.put(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
inferredMergeConfigs.getMiddle)
-      mergedParams.put(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(), 
inferredMergeConfigs.getRight)
-      mergedParams.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), 
inferredMergeConfigs.getRight)
+      if (inferredMergeConfigs.getMiddle != null) {
+        mergedParams.put(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), 
inferredMergeConfigs.getMiddle)
+        mergedParams.put(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
inferredMergeConfigs.getMiddle)
+      }
+      if (inferredMergeConfigs.getRight != null) {
+        
mergedParams.put(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(), 
inferredMergeConfigs.getRight)
+        mergedParams.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), 
inferredMergeConfigs.getRight)
+      }
     } else {
       mergedParams.put(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
mergedParams(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()))
       mergedParams.put(HoodieTableConfig.RECORD_MERGE_MODE.key(), 
mergedParams(HoodieWriteConfig.RECORD_MERGE_MODE.key()))
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index c9807da5a434..520fd996b74f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -314,6 +314,21 @@ object HoodieWriterUtils {
           && currentPartitionFields != tableConfigPartitionFields) {
           
diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
         }
+        // The value of `HoodieTableConfig.RECORD_MERGE_STRATEGY_ID` can be 
NULL or non-NULL.
+        // The non-NULL value has been validated above in the regular code 
path.
+        // Here we check the NULL case since if the value is NULL, the check 
is skipped above.
+        // So here we check if the write config contains non-null merge 
strategy id. If so, throw.
+        // Here are two exclusions:
+        // CASE 1: For < v9 tables, we skip check completely for backward 
compatibility.
+        // CASE 2: For >= v9 tables, merge-into queries.
+        if (tableConfig.getInt(HoodieTableConfig.VERSION) >= 
HoodieTableVersion.NINE.versionCode()
+          && !params.getOrElse(PAYLOAD_CLASS_NAME.key(), 
"").equals(EXPRESSION_PAYLOAD_CLASS_NAME)
+          && 
StringUtils.isNullOrEmpty(tableConfig.getStringOrDefault(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key,
 null))) {
+          val mergeStrategyId = 
params.getOrElse(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), null)
+          if (!StringUtils.isNullOrEmpty(mergeStrategyId)) {
+            
diffConfigs.append(s"${HoodieTableConfig.RECORD_MERGE_STRATEGY_ID}:\t$mergeStrategyId\tnull\n")
+          }
+        }
       }
 
       if (diffConfigs.nonEmpty) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 13f5ba7ba60d..d3452220d7b0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -726,47 +726,46 @@ public class TestUpgradeDowngrade extends 
SparkClientFunctionalTestHarness {
   }
 
   private void validateVersion8Properties(HoodieTableConfig tableConfig) {
+    validatePropertiesForV8Plus(tableConfig);
+    
assertTrue(tableConfig.contains(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID),
+        "Record merge strategy ID should be set for V8");
+  }
+
+  private void validateVersion9Properties(HoodieTableMetaClient metaClient, 
HoodieTableConfig tableConfig) {
+    validatePropertiesForV8Plus(tableConfig);
+
+    // Check if index metadata exists and has proper version information
+    Option<HoodieIndexMetadata> indexMetadata = metaClient.getIndexMetadata();
+    if (indexMetadata.isPresent()) {
+      indexMetadata.get().getIndexDefinitions().forEach((indexName, indexDef) 
-> {
+        assertNotNull(indexDef.getVersion(), 
+            "Index " + indexName + " should have version information in V9");
+      });
+    }
+  }
+
+  private void validatePropertiesForV8Plus(HoodieTableConfig tableConfig) {
     Option<TimelineLayoutVersion> layoutVersion = 
tableConfig.getTimelineLayoutVersion();
     assertTrue(layoutVersion.isPresent(), "Timeline layout version should be 
present for V8+");
     assertEquals(TimelineLayoutVersion.LAYOUT_VERSION_2, layoutVersion.get(),
         "Timeline layout should be V2 for V8+");
-
     assertTrue(tableConfig.contains(HoodieTableConfig.TIMELINE_PATH),
-        "Timeline path should be set for V8");
+        "Timeline path should be set for V8+");
     assertEquals(HoodieTableConfig.TIMELINE_PATH.defaultValue(),
         tableConfig.getString(HoodieTableConfig.TIMELINE_PATH),
         "Timeline path should have default value");
-    
     assertTrue(tableConfig.contains(HoodieTableConfig.RECORD_MERGE_MODE),
-        "Record merge mode should be set for V8");
+        "Record merge mode should be set for V8+");
     RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
     assertNotNull(mergeMode, "Merge mode should not be null");
-    
-    
assertTrue(tableConfig.contains(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID),
-        "Record merge strategy ID should be set for V8");
-    
     assertTrue(tableConfig.contains(HoodieTableConfig.INITIAL_VERSION),
-        "Initial version should be set for V8");
-    
+        "Initial version should be set for V8+");
     if (tableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)) {
       assertTrue(tableConfig.contains(HoodieTableConfig.KEY_GENERATOR_TYPE),
           "Key generator type should be set when key generator class is 
present");
     }
   }
 
-  private void validateVersion9Properties(HoodieTableMetaClient metaClient, 
HoodieTableConfig tableConfig) {
-    validateVersion8Properties(tableConfig);
-
-    // Check if index metadata exists and has proper version information
-    Option<HoodieIndexMetadata> indexMetadata = metaClient.getIndexMetadata();
-    if (indexMetadata.isPresent()) {
-      indexMetadata.get().getIndexDefinitions().forEach((indexName, indexDef) 
-> {
-        assertNotNull(indexDef.getVersion(), 
-            "Index " + indexName + " should have version information in V9");
-      });
-    }
-  }
-
   /**
    * Read table data for validation purposes.
    */
@@ -780,15 +779,15 @@ public class TestUpgradeDowngrade extends 
SparkClientFunctionalTestHarness {
           .load(basePath);
 
       assertNotNull(tableData, "Table read should not return null " + stage);
-      
+
       // Force execution to ensure data is read immediately (not lazily)
       List<Row> rows = tableData.collectAsList();
       long rowCount = rows.size();
       assertTrue(rowCount >= 0, "Row count should be non-negative " + stage);
-      
+
       // Convert collected rows back to Dataset for use in validation
       Dataset<Row> materializedData = sqlContext().createDataFrame(rows, 
tableData.schema());
-      
+
       LOG.info("Successfully read and materialized table data {} ({} rows)", 
stage, rowCount);
       return materializedData;
     } catch (Exception e) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 37fdd03be341..496c7b1790f2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
 
 import org.apache.hudi.DataSourceWriteOptions.{DROP_INSERT_DUP_POLICY, 
FAIL_INSERT_DUP_POLICY, INSERT_DROP_DUPS, INSERT_DUP_POLICY}
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig}
+import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, 
RecordMergeMode}
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
HoodieFileFormat, HoodieRecord, HoodieRecordPayload, 
HoodieReplaceCommitMetadata, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.TimelineUtils
@@ -618,10 +618,20 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
       
.setPartitionFields(fooTableParams(DataSourceWriteOptions.PARTITIONPATH_FIELD.key))
       
.setKeyGeneratorClassProp(fooTableParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key,
         DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue()))
-      if(addBootstrapPath) {
-        tableMetaClientBuilder
-          
.setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
-      }
+    if 
(fooTableParams.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key())) {
+      
tableMetaClientBuilder.setPayloadClassName(fooTableParams(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key))
+    }
+    if (fooTableParams.contains(HoodieWriteConfig.RECORD_MERGE_MODE.key)) {
+      tableMetaClientBuilder.setRecordMergeMode(RecordMergeMode.valueOf(
+        fooTableParams(HoodieWriteConfig.RECORD_MERGE_MODE.key)))
+    }
+    if 
(fooTableParams.contains(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key)) {
+      
tableMetaClientBuilder.setRecordMergeStrategyId(fooTableParams(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key))
+    }
+    if(addBootstrapPath) {
+      tableMetaClientBuilder
+        
.setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
+    }
     if (initBasePath) {
       
tableMetaClientBuilder.initTable(HadoopFSUtils.getStorageConfWithCopy(sc.hadoopConfiguration),
 tempBasePath)
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
index d5cafe06d5cf..0282b7375552 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
 
 
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -489,6 +489,15 @@ class TestHoodieSparkSqlWriterWithTestFormat extends 
HoodieSparkWriterTestBase {
       
.setPartitionFields(fooTableParams(DataSourceWriteOptions.PARTITIONPATH_FIELD.key))
       
.setKeyGeneratorClassProp(fooTableParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key,
         DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue()))
+    if 
(fooTableParams.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key)) {
+      
tableMetaClientBuilder.setPayloadClassName(fooTableParams(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key))
+    }
+    if (fooTableParams.contains(HoodieWriteConfig.RECORD_MERGE_MODE.key)) {
+      
tableMetaClientBuilder.setRecordMergeMode(RecordMergeMode.valueOf(fooTableParams(HoodieWriteConfig.RECORD_MERGE_MODE.key)))
+    }
+    if 
(fooTableParams.contains(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key)) {
+      
tableMetaClientBuilder.setRecordMergeStrategyId(fooTableParams(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key))
+    }
     if (addBootstrapPath) {
       tableMetaClientBuilder
         
.setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
index 1146371be515..0db232308ca2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
@@ -450,8 +450,7 @@ object TestPayloadDeprecationFlow {
         "false",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[DefaultHoodieRecordPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[DefaultHoodieRecordPayload].getName)
       ),
       Arguments.of(
         "COPY_ON_WRITE",
@@ -459,8 +458,7 @@ object TestPayloadDeprecationFlow {
         "true",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[DefaultHoodieRecordPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[DefaultHoodieRecordPayload].getName)
       ),
       Arguments.of(
         "COPY_ON_WRITE",
@@ -468,8 +466,7 @@ object TestPayloadDeprecationFlow {
         "false",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[OverwriteWithLatestAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[OverwriteWithLatestAvroPayload].getName)
       ),
       Arguments.of(
         "COPY_ON_WRITE",
@@ -478,7 +475,6 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[PartialUpdateAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
           HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
       ),
       Arguments.of(
@@ -488,7 +484,6 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[PostgresDebeziumAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
           HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "FILL_UNAVAILABLE",
           HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
             -> "__debezium_unavailable_value")
@@ -500,8 +495,8 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[MySqlDebeziumAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
-          HoodieTableConfig.ORDERING_FIELDS.key() -> 
(DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + 
DebeziumConstants.FLATTENED_POS_COL_NAME))),
+          HoodieTableConfig.ORDERING_FIELDS.key() -> 
(DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + 
DebeziumConstants.FLATTENED_POS_COL_NAME))
+      ),
       Arguments.of(
         "COPY_ON_WRITE",
         classOf[AWSDmsAvroPayload].getName,
@@ -509,7 +504,6 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[AWSDmsAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
           HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
           HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
       ),
@@ -519,9 +513,7 @@ object TestPayloadDeprecationFlow {
         "false",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[EventTimeAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
-        )
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[EventTimeAvroPayload].getName)
       ),
       Arguments.of(
         "COPY_ON_WRITE",
@@ -530,9 +522,7 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
-          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"
-        )
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -540,8 +530,7 @@ object TestPayloadDeprecationFlow {
         "false",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[DefaultHoodieRecordPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[DefaultHoodieRecordPayload].getName)
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -549,8 +538,7 @@ object TestPayloadDeprecationFlow {
         "true",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[DefaultHoodieRecordPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[DefaultHoodieRecordPayload].getName)
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -558,8 +546,7 @@ object TestPayloadDeprecationFlow {
         "false",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[OverwriteWithLatestAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[OverwriteWithLatestAvroPayload].getName)
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -568,7 +555,6 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[PartialUpdateAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
           HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
       ),
       Arguments.of(
@@ -578,7 +564,6 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[PostgresDebeziumAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
           HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "FILL_UNAVAILABLE",
           HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
             -> "__debezium_unavailable_value")
@@ -590,8 +575,8 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[MySqlDebeziumAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
-          HoodieTableConfig.ORDERING_FIELDS.key() -> 
(DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + 
DebeziumConstants.FLATTENED_POS_COL_NAME))),
+          HoodieTableConfig.ORDERING_FIELDS.key() -> 
(DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + 
DebeziumConstants.FLATTENED_POS_COL_NAME))
+      ),
       Arguments.of(
         "MERGE_ON_READ",
         classOf[AWSDmsAvroPayload].getName,
@@ -599,7 +584,6 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[AWSDmsAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
           HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
           HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + 
DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
       ),
@@ -609,9 +593,7 @@ object TestPayloadDeprecationFlow {
         "false",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[EventTimeAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
-        )
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[EventTimeAvroPayload].getName)
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -620,9 +602,7 @@ object TestPayloadDeprecationFlow {
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
-          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
-          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"
-        )
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
       )
     )
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 123d23954efa..5ccde850c5b7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -21,7 +21,7 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, 
HoodieRecordMerger, HoodieTableType, OverwriteWithLatestAvroPayload, 
PartialUpdateAvroPayload}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
EventTimeAvroPayload, HoodieRecord, HoodieRecordMerger, HoodieTableType, 
OverwriteWithLatestAvroPayload, PartialUpdateAvroPayload}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.read.CustomPayloadForTesting
@@ -433,7 +433,9 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
     val metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
     assertEquals(tableVersion, 
metaClient.getTableConfig.getTableVersion.versionCode())
     assertEquals(mergeMode, metaClient.getTableConfig.getRecordMergeMode)
-    assertEquals(strategyId, 
metaClient.getTableConfig.getRecordMergeStrategyId)
+    if (tableVersion < 9) {
+      assertEquals(strategyId, 
metaClient.getTableConfig.getRecordMergeStrategyId)
+    }
 
     val df1 = df.limit(1)
     val diffMergeMode = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
@@ -445,35 +447,51 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
       df1.write.format("hudi")
         .option(HoodieWriteConfig.RECORD_MERGE_MODE.key, diffMergeMode.name)
         .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+        .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
         .mode(SaveMode.Append)
         .save(basePath)
       val finalDf = spark.read.format("hudi")
         .options(readOpts)
         .load(basePath)
+      df1.write.format("hudi")
+        .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, 
classOf[EventTimeAvroPayload].getName)
+        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+        .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
+        .mode(SaveMode.Append)
+        .save(basePath)
+      df1.write.format("hudi")
+        .option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, 
classOf[EventTimeAvroPayload].getName)
+        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+        .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
+        .mode(SaveMode.Append)
+        .save(basePath)
       assertEquals(399, finalDf.count())
     } else {
       Assertions.assertThrows(classOf[HoodieException], () => {
         df1.write.format("hudi")
-          .option(HoodieWriteConfig.RECORD_MERGE_MODE.key, "any_other_payload")
+          .option(HoodieWriteConfig.RECORD_MERGE_MODE.key, diffMergeMode.name)
+          .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+          .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
+          .mode(SaveMode.Append)
+          .save(basePath)
+      })
+      Assertions.assertThrows(classOf[HoodieException], () => {
+        df1.write.format("hudi")
+          .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, 
classOf[EventTimeAvroPayload].getName)
           .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+          .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
+          .mode(SaveMode.Append)
+          .save(basePath)
+      })
+      Assertions.assertThrows(classOf[HoodieException], () => {
+        df1.write.format("hudi")
+          .option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, 
HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID)
+          .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+          .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
           .mode(SaveMode.Append)
           .save(basePath)
       })
     }
-    Assertions.assertThrows(classOf[HoodieException], () => {
-      df1.write.format("hudi")
-        .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, 
"any_other_payload")
-        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
-        .mode(SaveMode.Append)
-        .save(basePath)
-    })
-    Assertions.assertThrows(classOf[HoodieException], () => {
-      df1.write.format("hudi")
-        .option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, 
"any_other_strategy_id")
-        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
-        .mode(SaveMode.Append)
-        .save(basePath)
-    })
   }
 
   def ingestNewBatch(tableType: HoodieTableType, recordsToUpdate: Integer, 
structType: StructType, inserts: java.util.List[Row],
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
index 1bb82673acdf..627c6a34e606 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
@@ -545,6 +545,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
 
       // disable this config to avoid affect other test in this class.
       spark.sql(s"set hoodie.sql.insert.mode=upsert")
+      spark.sql("set hoodie.merge.allow.duplicate.on.inserts=true")
     }
   }
 
@@ -818,7 +819,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         ("date", "DATE'2021-05-20'")
       )
       typeAndValue.foreach { case (partitionType, partitionValue) =>
-        val tableName = generateTableName
+        val tableName = s"${generateTableName}_timestamp_type"
         spark.sql(s"set 
hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true")
         validateDifferentTypesOfPartitionColumn(tmp, partitionType, 
partitionValue, tableName)
       }
@@ -1010,7 +1011,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     withSQLConf("hoodie.sql.insert.mode" -> "non-strict") {
       withRecordType()(withTempDir { tmp =>
         Seq("cow", "mor").foreach { tableType =>
-          withTable(generateTableName) { tableMultiPartition =>
+          withTable(s"${generateTableName}_multi_partition") { 
tableMultiPartition =>
             spark.sql(
               s"""
                  |create table $tableMultiPartition (
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
index 7f7e3a9f4986..629c61af6c3a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
@@ -33,7 +33,8 @@ class TestMergeModeCommitTimeOrdering extends 
HoodieSparkSqlTestBase {
   Seq(
     "cow,current,false,false", "cow,current,false,true", 
"cow,current,true,false",
     "mor,current,false,false", "mor,current,false,true", 
"mor,current,true,false",
-    "cow,6,true,false", "cow,6,true,true", "mor,6,true,true").foreach { args =>
+    "cow,6,true,false", "cow,6,true,true", "mor,6,true,true",
+    "cow,8,true,false", "cow,8,true,true", "mor,8,true,true").foreach { args =>
     val argList = args.split(',')
     val tableType = argList(0)
     val tableVersion = if (argList(1).equals("current")) {
@@ -63,15 +64,23 @@ class TestMergeModeCommitTimeOrdering extends 
HoodieSparkSqlTestBase {
     } else {
       ""
     }
-    val expectedMergeConfigs = if (tableVersion.toInt == 6) {
-      Map(
-        HoodieTableConfig.VERSION.key -> "6",
-        HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> 
classOf[OverwriteWithLatestAvroPayload].getName)
-    } else {
-      Map(
-        HoodieTableConfig.VERSION.key -> tableVersion,
-        HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
-        HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
+    val expectedMergeConfigs: Map[String, String] = tableVersion.toInt match {
+      case 6 =>
+        Map(
+          HoodieTableConfig.VERSION.key -> "6",
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> 
classOf[OverwriteWithLatestAvroPayload].getName
+        )
+      case 8 =>
+        Map(
+          HoodieTableConfig.VERSION.key -> tableVersion,
+          HoodieTableConfig.RECORD_MERGE_MODE.key -> 
COMMIT_TIME_ORDERING.name(),
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
+        )
+      case _ =>
+        Map(
+          HoodieTableConfig.VERSION.key -> tableVersion,
+          HoodieTableConfig.RECORD_MERGE_MODE.key -> 
COMMIT_TIME_ORDERING.name()
+        )
     }
     val nonExistentConfigs = if (tableVersion.toInt == 6) {
       Seq(HoodieTableConfig.RECORD_MERGE_MODE.key, 
HoodieTableConfig.ORDERING_FIELDS.key)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
index a3bc2af09106..cbcc9c5989fd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
@@ -62,23 +62,29 @@ class TestMergeModeEventTimeOrdering extends 
HoodieSparkSqlTestBase {
     } else {
       ""
     }
-    val writeTableVersionClause = if (tableVersion.toInt == 6) {
-      s"hoodie.write.table.version = $tableVersion,"
-    } else {
-      ""
+    val writeTableVersionClause = tableVersion.toInt match {
+      case 6 => s"hoodie.write.table.version = $tableVersion,"
+      case 8 => s"hoodie.write.table.version = $tableVersion,"
+      case _ => ""
     }
-    val expectedMergeConfigs = if (tableVersion.toInt == 6) {
-      Map(
-        HoodieTableConfig.VERSION.key -> "6",
-        HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> 
classOf[DefaultHoodieRecordPayload].getName,
-        HoodieTableConfig.ORDERING_FIELDS.key -> "ts"
-      )
-    } else {
-      Map(
-        HoodieTableConfig.VERSION.key -> 
String.valueOf(HoodieTableVersion.current().versionCode()),
-        HoodieTableConfig.ORDERING_FIELDS.key -> "ts",
-        HoodieTableConfig.RECORD_MERGE_MODE.key -> EVENT_TIME_ORDERING.name(),
-        HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+    val expectedMergeConfigs: Map[String, String] = tableVersion.toInt match {
+      case 6 =>
+        Map(
+          HoodieTableConfig.VERSION.key -> "6",
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> 
classOf[DefaultHoodieRecordPayload].getName,
+          HoodieTableConfig.ORDERING_FIELDS.key -> "ts"
+        )
+      case 8 =>
+        Map(
+          HoodieTableConfig.VERSION.key -> "8",
+          HoodieTableConfig.RECORD_MERGE_MODE.key -> 
EVENT_TIME_ORDERING.name(),
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID
+        )
+      case _ =>
+        Map(
+          HoodieTableConfig.VERSION.key -> "9",
+          HoodieTableConfig.RECORD_MERGE_MODE.key -> EVENT_TIME_ORDERING.name()
+        )
     }
     val nonExistentConfigs = if (tableVersion.toInt == 6) {
       Seq(HoodieTableConfig.RECORD_MERGE_MODE.key)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index 726bbd5070b8..b0a33e203127 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -141,7 +141,6 @@ class TestRepairsProcedure extends 
HoodieSparkProcedureTestBase {
           |[hoodie.datasource.write.hive_style_partitioning,true,null]
           |[hoodie.datasource.write.partitionpath.urlencode,false,null]
           |[hoodie.record.merge.mode,EVENT_TIME_ORDERING,null]
-          
|[hoodie.record.merge.strategy.id,eeb8d96f-b1e4-49fd-bbf8-28ac514178e5,null]
           |[hoodie.table.checksum,,]
           |[hoodie.table.create.schema,,]
           |[hoodie.table.format,native,null]
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 5b5d4d8b83ef..afaf728d3fe0 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -59,6 +59,7 @@ import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
@@ -1136,16 +1137,21 @@ public class StreamSync implements Serializable, 
Closeable {
                     .withInlineCompaction(cfg.isInlineCompactionEnabled())
                     .build()
             )
-            .withPayloadConfig(
-                HoodiePayloadConfig.newBuilder()
-                    .withPayloadClass(cfg.payloadClassName)
-                    .withPayloadOrderingFields(cfg.sourceOrderingFields)
-                    .build())
             .withRecordMergeMode(cfg.recordMergeMode)
-            .withRecordMergeStrategyId(cfg.recordMergeStrategyId)
             .withRecordMergeImplClasses(cfg.recordMergeImplClasses)
             .forTable(cfg.targetTableName)
             .withProps(props);
+    // Merge strategy id can be NULL.
+    if (!StringUtils.isNullOrEmpty(cfg.recordMergeStrategyId)) {
+      builder.withRecordMergeStrategyId(cfg.recordMergeStrategyId);
+    }
+    HoodiePayloadConfig.Builder payloadConfigBuilder =
+        
HoodiePayloadConfig.newBuilder().withPayloadOrderingFields(cfg.sourceOrderingFields);
+    // Payload class can be NULL.
+    if (!StringUtils.isNullOrEmpty(cfg.payloadClassName)) {
+      payloadConfigBuilder.withPayloadClass(cfg.payloadClassName);
+    }
+    builder.withPayloadConfig(payloadConfigBuilder.build());
 
     // If schema is required in the config, we need to handle the case where 
the target schema is null and should be fetched from previous commits
     final Schema returnSchema;
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index c9614a765a8a..89781741e39f 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -212,7 +212,6 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       opts.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
DefaultSparkRecordMerger.class.getName());
       opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
       opts.put(HoodieWriteConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.EVENT_TIME_ORDERING.name());
-      opts.put(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
       for (Map.Entry<String, String> entry : opts.entrySet()) {
         hoodieConfig.add(String.format("%s=%s", entry.getKey(), 
entry.getValue()));
       }


Reply via email to