This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bea55a6441 [HUDI-8413] Change the default merge mode to COMMIT_TIME_ORDERING (#12600)
4bea55a6441 is described below

commit 4bea55a644154618b3a25dad2618b13a400dfcce
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Jan 28 08:19:24 2025 -0800

    [HUDI-8413] Change the default merge mode to COMMIT_TIME_ORDERING (#12600)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +-
 .../upgrade/EightToSevenDowngradeHandler.java      |   4 +-
 .../org/apache/hudi/DefaultSparkRecordMerger.java  |   2 +-
 .../apache/hudi/common/config/ConfigProperty.java  |   1 -
 .../hudi/common/model/HoodieRecordMerger.java      |  24 ++-
 .../hudi/common/model/HoodieRecordPayload.java     |   6 +-
 .../hudi/common/table/HoodieTableConfig.java       | 184 ++++++++++---------
 .../hudi/common/table/HoodieTableMetaClient.java   |   3 +-
 .../apache/hudi/configuration/FlinkOptions.java    |   2 +-
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |   2 +-
 .../hudi/common/table/TestHoodieTableConfig.java   | 201 ++++++++++++++-------
 .../hudi/common/table/read/TestCustomMerger.java   |   4 +-
 .../common/table/read/TestEventTimeMerging.java    |   4 -
 .../table/read/TestOverwriteWithLatestMerger.java  |   2 -
 .../hudi/hadoop/DefaultHiveRecordMerger.java       |   2 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   3 -
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  39 ++--
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |   6 +-
 .../functional/TestHoodieBackedMetadata.java       |  10 +-
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |   8 +-
 .../sql/hudi/common/TestHoodieOptionConfig.scala   |   4 +-
 .../apache/spark/sql/hudi/common/TestSqlConf.scala |   4 +-
 .../MaxwellJsonKafkaSourcePostProcessor.java       |   3 +-
 .../hudi/utilities/streamer/BootstrapExecutor.java |   3 +-
 .../hudi/utilities/streamer/HoodieStreamer.java    |   7 +-
 .../utilities/streamer/HoodieStreamerUtils.java    |   4 +-
 .../deltastreamer/HoodieDeltaStreamerTestBase.java |   3 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |   4 +-
 28 files changed, 329 insertions(+), 214 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index a3f903f1c86..99d46ad08e4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -166,7 +166,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = 
ConfigProperty
       .key("hoodie.datasource.write.precombine.field")
-      .defaultValue("ts")
+      .noDefaultValue()
       .withDocumentation("Field used in preCombining before actual write. When 
two records have the same key value, "
           + "we will pick the one with the largest value for the precombine 
field, determined by Object.compareTo(..)");
 
@@ -181,7 +181,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   // This ConfigProperty is also used in SQL options which expect String type
   public static final ConfigProperty<String> RECORD_MERGE_MODE = ConfigProperty
       .key("hoodie.write.record.merge.mode")
-      .defaultValue(RecordMergeMode.EVENT_TIME_ORDERING.name())
+      .noDefaultValue("COMMIT_TIME_ORDERING if ordering field is not set; 
EVENT_TIME_ORDERING if ordering field is set")
       .sinceVersion("1.0.0")
       .withDocumentation(RecordMergeMode.class);
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
index 6193067eb84..4980fcec4f7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
@@ -166,7 +166,9 @@ public class EightToSevenDowngradeHandler implements 
DowngradeHandler {
 
   static void unsetRecordMergeMode(HoodieTableConfig tableConfig, 
Map<ConfigProperty, String> tablePropsToAdd) {
     Triple<RecordMergeMode, String, String> mergingConfigs =
-        
HoodieTableConfig.inferCorrectMergingBehavior(tableConfig.getRecordMergeMode(), 
tableConfig.getPayloadClass(), tableConfig.getRecordMergeStrategyId());
+        HoodieTableConfig.inferCorrectMergingBehavior(
+            tableConfig.getRecordMergeMode(), tableConfig.getPayloadClass(),
+            tableConfig.getRecordMergeStrategyId(), 
tableConfig.getPreCombineField());
     if (StringUtils.nonEmpty(mergingConfigs.getMiddle())) {
       tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME, 
mergingConfigs.getMiddle());
     }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
index 33bd74f2bf6..652b016cf50 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
@@ -40,7 +40,7 @@ public class DefaultSparkRecordMerger extends 
HoodieSparkRecordMerger {
 
   @Override
   public String getMergingStrategy() {
-    return HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
+    return HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java 
b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
index aa2cf642309..af8fae69d41 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
@@ -254,7 +254,6 @@ public class ConfigProperty<T> implements Serializable {
     }
 
     public <T> ConfigProperty<T> defaultValue(T value, String 
docOnDefaultValue) {
-      Objects.requireNonNull(value);
       Objects.requireNonNull(docOnDefaultValue);
       ConfigProperty<T> configProperty = new ConfigProperty<>(key, value, 
docOnDefaultValue, "", Option.empty(), Option.empty(), Option.empty(), 
Collections.emptySet(), false);
       return configProperty;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 9628e9ced24..f6d1cb1db3a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.model;
 
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+
 /**
  * HoodieMerge defines how to merge two records. It is a stateless component.
  * It can implement the merging logic of HoodieRecord of different engines
@@ -44,7 +47,7 @@ import java.util.List;
 public interface HoodieRecordMerger extends Serializable {
 
   // Uses event time ordering to determine which record is chosen
-  String DEFAULT_MERGE_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
+  String EVENT_TIME_BASED_MERGE_STRATEGY_UUID = 
"eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
 
   // Always chooses the most recently written record
   String COMMIT_TIME_BASED_MERGE_STRATEGY_UUID = 
"ce9acb64-bde0-424c-9b91-f6ebba25356d";
@@ -183,4 +186,23 @@ public interface HoodieRecordMerger extends Serializable {
    */
   String getMergingStrategy();
 
+  static String getRecordMergeStrategyId(RecordMergeMode mergeMode,
+                                         String payloadClassName,
+                                         String recordMergeStrategyId) {
+    switch (mergeMode) {
+      case COMMIT_TIME_ORDERING:
+        return COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+      case EVENT_TIME_ORDERING:
+        return EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+      case CUSTOM:
+      default:
+        if (nonEmpty(recordMergeStrategyId)) {
+          return recordMergeStrategyId;
+        }
+        if (nonEmpty(payloadClassName)) {
+          return PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+        }
+        return null;
+    }
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index 39ff6ceb369..543562da375 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -146,14 +146,16 @@ public interface HoodieRecordPayload<T extends 
HoodieRecordPayload> extends Seri
     return 0;
   }
 
-  static String getAvroPayloadForMergeMode(RecordMergeMode mergeMode) {
+  static String getAvroPayloadForMergeMode(RecordMergeMode mergeMode, String 
payloadClassName) {
     switch (mergeMode) {
       //TODO: After we have merge mode working for writing, we should have a 
dummy payload that will throw exception when used
-      default:
       case EVENT_TIME_ORDERING:
         return DefaultHoodieRecordPayload.class.getName();
       case COMMIT_TIME_ORDERING:
         return OverwriteWithLatestAvroPayload.class.getName();
+      case CUSTOM:
+      default:
+        return payloadClassName;
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index b225bacce0c..1b85a1707ae 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.BootstrapIndexType;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
@@ -76,9 +77,9 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
+import static 
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
 import static org.apache.hudi.common.config.RecordMergeMode.CUSTOM;
 import static 
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
-import static 
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.DATE_TIME_PARSER;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.INPUT_TIME_UNIT;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_INPUT_DATE_FORMAT;
@@ -88,13 +89,14 @@ import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAM
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TIMEZONE_FORMAT;
 import static 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD;
-import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.util.ConfigUtils.fetchConfigs;
 import static org.apache.hudi.common.util.ConfigUtils.recoverIfNeeded;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
 @Immutable
@@ -198,7 +200,8 @@ public class HoodieTableConfig extends HoodieConfig {
   
   public static final ConfigProperty<RecordMergeMode> RECORD_MERGE_MODE = 
ConfigProperty
       .key("hoodie.record.merge.mode")
-      .defaultValue(RecordMergeMode.EVENT_TIME_ORDERING)
+      .defaultValue((RecordMergeMode) null,
+          "COMMIT_TIME_ORDERING if precombine is not set; EVENT_TIME_ORDERING 
if precombine is set")
       .sinceVersion("1.0.0")
       .withDocumentation(RecordMergeMode.class);
 
@@ -736,7 +739,7 @@ public class HoodieTableConfig extends HoodieConfig {
   }
 
   public RecordMergeMode getRecordMergeMode() {
-    return RecordMergeMode.getValue(getStringOrDefault(RECORD_MERGE_MODE));
+    return RecordMergeMode.getValue(getString(RECORD_MERGE_MODE));
   }
 
   /**
@@ -754,94 +757,113 @@ public class HoodieTableConfig extends HoodieConfig {
    * Infers the merging behavior based on what the user sets (or doesn't set).
    * Validates that the user has not set an illegal combination of configs
    */
-  public static Triple<RecordMergeMode, String, String> 
inferCorrectMergingBehavior(RecordMergeMode recordMergeMode, String 
payloadClassName,
-                                                                               
     String recordMergeStrategyId) {
+  public static Triple<RecordMergeMode, String, String> 
inferCorrectMergingBehavior(RecordMergeMode recordMergeMode,
+                                                                               
     String payloadClassName,
+                                                                               
     String recordMergeStrategyId,
+                                                                               
     String orderingFieldName) {
     RecordMergeMode inferredRecordMergeMode;
     String inferredPayloadClassName;
     String inferredRecordMergeStrategyId;
 
-    if (isNullOrEmpty(payloadClassName)) {
-      if (isNullOrEmpty(recordMergeStrategyId)) {
-        // no payload class name or merge strategy ID. If nothing is set then 
we default. User cannot set custom because no payload or strategy is set
-        checkArgument(recordMergeMode != RecordMergeMode.CUSTOM,
-            "Custom merge mode should only be used if you set a payload class 
or merge strategy ID");
-        if (recordMergeMode == null) {
-          inferredRecordMergeMode = RECORD_MERGE_MODE.defaultValue();
-        } else {
-          inferredRecordMergeMode = recordMergeMode;
-        }
+    // Inferring record merge mode
+    if (isNullOrEmpty(payloadClassName) && 
isNullOrEmpty(recordMergeStrategyId)) {
+      // If nothing is set on record merge mode, payload class, or record 
merge strategy ID,
+      // use the default merge mode determined by whether the ordering field 
name is set.
+      inferredRecordMergeMode = recordMergeMode != null
+          ? recordMergeMode
+          : (isNullOrEmpty(orderingFieldName) ? COMMIT_TIME_ORDERING : 
EVENT_TIME_ORDERING);
+    } else {
+      // Infer the merge mode from either the payload class or record merge 
strategy ID
+      RecordMergeMode modeBasedOnPayload = 
inferRecordMergeModeFromPayloadClass(payloadClassName);
+      RecordMergeMode modeBasedOnStrategyId = 
inferRecordMergeModeFromMergeStrategyId(recordMergeStrategyId);
+      checkArgument(modeBasedOnPayload != null || modeBasedOnStrategyId != 
null,
+          String.format("Cannot infer record merge mode from payload class 
(%s) or record merge "
+              + "strategy ID (%s).", payloadClassName, recordMergeStrategyId));
+      // TODO(HUDI-8925): once payload class name is not required, remove the 
check on
+      //  modeBasedOnStrategyId
+      if (modeBasedOnStrategyId != CUSTOM && modeBasedOnPayload != null && 
modeBasedOnStrategyId != null) {
+        checkArgument(modeBasedOnPayload.equals(modeBasedOnStrategyId),
+            String.format("Configured payload class (%s) and record merge 
strategy ID (%s) conflict "
+                    + "with each other. Please only set one of them in the 
write config.",
+                payloadClassName, recordMergeStrategyId));
+      }
+      inferredRecordMergeMode = modeBasedOnStrategyId != null ? 
modeBasedOnStrategyId : modeBasedOnPayload;
+    }
+    if (recordMergeMode != null) {
+      checkArgument(inferredRecordMergeMode == recordMergeMode,
+          String.format("Configured record merge mode (%s) is inconsistent 
with payload class (%s) "
+                  + "or record merge strategy ID (%s) configured. Please 
revisit the configs.",
+              recordMergeMode, payloadClassName, recordMergeStrategyId));
+    }
 
-        // set merger strategy based on merge mode
-        if (inferredRecordMergeMode == COMMIT_TIME_ORDERING) {
-          inferredRecordMergeStrategyId = 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
-        } else if (inferredRecordMergeMode == EVENT_TIME_ORDERING) {
-          inferredRecordMergeStrategyId = DEFAULT_MERGE_STRATEGY_UUID;
-        } else {
-          throw new IllegalStateException("Merge Mode: '" + 
inferredRecordMergeMode + "' has not been fully implemented.");
-        }
-      } else {
-        // no payload class but merge strategy ID is set. Need to validate 
that strategy and merge mode align if both are set
-        inferredRecordMergeStrategyId = recordMergeStrategyId;
-        if (recordMergeStrategyId.equals(DEFAULT_MERGE_STRATEGY_UUID)) {
-          checkArgument(recordMergeMode == null || recordMergeMode == 
EVENT_TIME_ORDERING,
-              "Default merge strategy ID can only be used with the merge mode 
of EVENT_TIME_ORDERING");
-          inferredRecordMergeMode = EVENT_TIME_ORDERING;
-        } else if 
(recordMergeStrategyId.equals(COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)) {
-          checkArgument(recordMergeMode == null || recordMergeMode == 
COMMIT_TIME_ORDERING,
-              "Commit time ordering merger strategy ID can only be used with 
the merge mode of COMMIT_TIME_ORDERING");
-          inferredRecordMergeMode = COMMIT_TIME_ORDERING;
-        } else {
-          
checkArgument(!recordMergeStrategyId.equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-              "Payload based strategy should only be used if you have a custom 
payload class set");
-          checkArgument(recordMergeMode == null || recordMergeMode == CUSTOM,
-              "Record merge mode must be set to custom when using a custom 
merge strategy ID");
-          inferredRecordMergeMode = CUSTOM;
-        }
+    // Check ordering field name based on record merge mode
+    if (inferredRecordMergeMode == COMMIT_TIME_ORDERING) {
+      if (nonEmpty(orderingFieldName)) {
+        LOG.warn("The precombine or ordering field ({}) is specified. 
COMMIT_TIME_ORDERING "
+            + "merge mode does not use precombine or ordering field anymore.", 
orderingFieldName);
       }
-      inferredPayloadClassName = 
HoodieRecordPayload.getAvroPayloadForMergeMode(inferredRecordMergeMode);
-    } else {
-      // payload class name is set
-      inferredPayloadClassName = payloadClassName;
-      if (payloadClassName.equals(DefaultHoodieRecordPayload.class.getName())) 
{
-        // Default payload matches with EVENT_TIME_ORDERING. However, Custom 
merge modes still have some gaps (tracked by [HUDI-8317]) so
-        // we will use default merger for now on the write path if the user 
has a custom merger. After all gaps have been closed, we will set
-        // a dummy payload by default for custom merge mode. Then we can get 
rid of this if else, and add the validation:
-        // checkArgument(isNullOrEmpty(recordMergeStrategyId) || 
recordMergeStrategyId.equals(DEFAULT_MERGER_STRATEGY_UUID),
-        //   "Record merge strategy cannot be set if a merge payload is used");
-        if (isNullOrEmpty(recordMergeStrategyId) || 
recordMergeStrategyId.equals(DEFAULT_MERGE_STRATEGY_UUID)) {
-          // Default case, everything should be null or event time ordering / 
default strategy
-          checkArgument(recordMergeMode == null || recordMergeMode == 
EVENT_TIME_ORDERING,
-              "Only the record merge mode of EVENT_TIME_ORDERING can be used 
with default payload");
-          inferredRecordMergeMode = EVENT_TIME_ORDERING;
-          inferredRecordMergeStrategyId = DEFAULT_MERGE_STRATEGY_UUID;
-        } else {
-          // currently for the custom case. This block will be moved below and 
check if the payload class name is dummy
-          checkArgument(recordMergeMode == null || recordMergeMode == CUSTOM, 
"Record merge mode, payload class, and merge strategy are in an illegal 
configuration");
-          checkArgument(
-              
!recordMergeStrategyId.equals(COMMIT_TIME_BASED_MERGE_STRATEGY_UUID) && 
!recordMergeStrategyId.equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-              "Record merger strategy is incompatible with payload class");
-          inferredRecordMergeMode = CUSTOM;
-          inferredRecordMergeStrategyId = recordMergeStrategyId;
-        }
-      } else if 
(payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) {
-        // strategy and merge mode must be unset or align with overwrite
-        checkArgument(isNullOrEmpty(recordMergeStrategyId) || 
recordMergeStrategyId.equals(COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-            "Record merge strategy cannot be set if a merge payload is used");
-        checkArgument(recordMergeMode == null || recordMergeMode == 
COMMIT_TIME_ORDERING, "Only commit time ordering merge mode can be used with 
overwrite payload");
-        inferredRecordMergeMode = COMMIT_TIME_ORDERING;
-        inferredRecordMergeStrategyId = COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
-      } else {
-        // using custom avro payload
-        checkArgument(isNullOrEmpty(recordMergeStrategyId) || 
recordMergeStrategyId.equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-            "Record merge strategy cannot be set if a merge payload is used");
-        checkArgument(recordMergeMode == null || recordMergeMode == CUSTOM, 
"Record merge mode must be custom if payload is defined");
-        inferredRecordMergeMode = CUSTOM;
-        inferredRecordMergeStrategyId = PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+    } else if (inferredRecordMergeMode == EVENT_TIME_ORDERING) {
+      if (isNullOrEmpty(orderingFieldName)) {
+        LOG.warn("The precombine or ordering field is not specified. 
EVENT_TIME_ORDERING "
+            + "merge mode requires precombine or ordering field to be set for 
getting the "
+            + "event time. Using commit time-based ordering now.");
       }
     }
+
+    // Inferring payload class name
+    inferredPayloadClassName = HoodieRecordPayload.getAvroPayloadForMergeMode(
+        inferredRecordMergeMode, payloadClassName);
+    // Inferring record merge strategy ID
+    inferredRecordMergeStrategyId = 
HoodieRecordMerger.getRecordMergeStrategyId(
+        inferredRecordMergeMode, inferredPayloadClassName, 
recordMergeStrategyId);
+
+    // For custom merge mode, either payload class name or record merge 
strategy ID must be configured
+    if (inferredRecordMergeMode == CUSTOM) {
+      checkArgument(nonEmpty(inferredPayloadClassName) || 
nonEmpty(inferredRecordMergeStrategyId),
+          "Either payload class name or record merge strategy ID must be 
configured "
+              + "in CUSTOM merge mode.");
+      if 
(PAYLOAD_BASED_MERGE_STRATEGY_UUID.equals(inferredRecordMergeStrategyId)) {
+        checkArgument(nonEmpty(inferredPayloadClassName),
+            "For payload class based merge strategy as a fallback, payload 
class name is "
+                + "required to be set.");
+      }
+      // TODO(HUDI-8925): remove this once the payload class name is no longer 
required
+      if (isNullOrEmpty(inferredPayloadClassName)) {
+        inferredPayloadClassName = DEFAULT_PAYLOAD_CLASS_NAME;
+      }
+    }
+
     return Triple.of(inferredRecordMergeMode, inferredPayloadClassName, 
inferredRecordMergeStrategyId);
   }
 
+  static RecordMergeMode inferRecordMergeModeFromPayloadClass(String 
payloadClassName) {
+    if (isNullOrEmpty(payloadClassName)) {
+      return null;
+    }
+    if (DefaultHoodieRecordPayload.class.getName().equals(payloadClassName)) {
+      // DefaultHoodieRecordPayload matches with EVENT_TIME_ORDERING.
+      return EVENT_TIME_ORDERING;
+    } else if 
(payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) {
+      // OverwriteWithLatestAvroPayload matches with COMMIT_TIME_ORDERING.
+      return COMMIT_TIME_ORDERING;
+    } else {
+      return CUSTOM;
+    }
+  }
+
+  static RecordMergeMode inferRecordMergeModeFromMergeStrategyId(String 
recordMergeStrategyId) {
+    if (isNullOrEmpty(recordMergeStrategyId)) {
+      return null;
+    }
+    if (recordMergeStrategyId.equals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID)) {
+      return EVENT_TIME_ORDERING;
+    } else if 
(recordMergeStrategyId.equals(COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)) {
+      return COMMIT_TIME_ORDERING;
+    } else {
+      return CUSTOM;
+    }
+  }
+
   public String getPreCombineField() {
     return getString(PRECOMBINE_FIELD);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 199fc770096..25ad815f600 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -1393,7 +1393,8 @@ public class HoodieTableMetaClient implements 
Serializable {
       }
 
       Triple<RecordMergeMode, String, String> mergeConfigs =
-          HoodieTableConfig.inferCorrectMergingBehavior(recordMergeMode, 
payloadClassName, recordMergerStrategyId);
+          HoodieTableConfig.inferCorrectMergingBehavior(
+              recordMergeMode, payloadClassName, recordMergerStrategyId, 
preCombineField);
       tableConfig.setValue(RECORD_MERGE_MODE, mergeConfigs.getLeft().name());
       tableConfig.setValue(PAYLOAD_CLASS_NAME.key(), mergeConfigs.getMiddle());
       tableConfig.setValue(RECORD_MERGE_STRATEGY_ID, mergeConfigs.getRight());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 36af35435fe..d65c80f89e9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -142,7 +142,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> RECORD_MERGER_STRATEGY_ID = 
ConfigOptions
       .key("record.merger.strategy")
       .stringType()
-      .defaultValue(HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID)
+      .defaultValue(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
       .withFallbackKeys(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key())
       .withDescription("Id of merger strategy. Hudi will pick 
HoodieRecordMerger implementations in record.merger.impls which has the same 
merger strategy id");
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 66f81221c34..a58140635c9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -132,7 +132,7 @@ public class FlinkStreamerConfig extends Configuration {
 
   @Parameter(names = {"--record-merger-strategy"}, description = "Id of record 
merger strategy. Hudi will pick HoodieRecordMerger implementations in 
record-merger-impls "
       + "which has the same record merger strategy id")
-  public String recordMergerStrategy = 
HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
+  public String recordMergerStrategy = 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
 
   @Parameter(names = {"--op"}, description = "Takes one of these values : 
UPSERT (default), INSERT (use when input "
       + "is purely new data/inserts to gain speed).", converter = 
OperationConverter.class)
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 5e23391e0ff..3dacf596258 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -54,11 +54,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
 import static org.apache.hudi.common.config.RecordMergeMode.CUSTOM;
 import static 
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
-import static 
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
-import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
@@ -270,7 +270,7 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
     HoodieConfig config = new HoodieConfig();
     config.setValue(HoodieTableConfig.VERSION, 
String.valueOf(HoodieTableVersion.SIX.versionCode()));
     config.setValue(HoodieTableConfig.INITIAL_VERSION, 
String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
-    config.setValue(RECORD_MERGE_MODE, 
RECORD_MERGE_MODE.defaultValue().name());
+    config.setValue(RECORD_MERGE_MODE, COMMIT_TIME_ORDERING.name());
 
     HoodieTableConfig.dropInvalidConfigs(config);
     assertTrue(config.contains(HoodieTableConfig.VERSION));
@@ -280,7 +280,7 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
     // test valid ones are not dropped
     config = new HoodieConfig();
     config.setValue(HoodieTableConfig.VERSION, 
String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
-    config.setValue(RECORD_MERGE_MODE, 
RECORD_MERGE_MODE.defaultValue().name());
+    config.setValue(RECORD_MERGE_MODE, COMMIT_TIME_ORDERING.name());
     HoodieTableConfig.dropInvalidConfigs(config);
     assertTrue(config.contains(RECORD_MERGE_MODE));
   }
@@ -300,84 +300,155 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
     String overwritePayload = OverwriteWithLatestAvroPayload.class.getName();
     String customPayload = "custom_payload";
     String customStrategy = "custom_strategy";
+    String orderingFieldName = "timestamp";
 
     Stream<Arguments> arguments = Stream.of(
         //test empty args with both null and ""
-        arguments(null, null, null, false, EVENT_TIME_ORDERING, 
defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(null, "", "", false, EVENT_TIME_ORDERING, defaultPayload, 
DEFAULT_MERGE_STRATEGY_UUID),
+        arguments(null, null, null, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, null, null, "",
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, null, null, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, "", "", null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, "", "", orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
 
         //test legal event time ordering combos
-        arguments(EVENT_TIME_ORDERING, null, null, false, EVENT_TIME_ORDERING, 
defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(EVENT_TIME_ORDERING, defaultPayload, null, false, 
EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(EVENT_TIME_ORDERING, defaultPayload, 
DEFAULT_MERGE_STRATEGY_UUID, false, EVENT_TIME_ORDERING, defaultPayload,
-            DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(EVENT_TIME_ORDERING, null, DEFAULT_MERGE_STRATEGY_UUID, 
false, EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(null, defaultPayload, null, false, EVENT_TIME_ORDERING, 
defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(null, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID, false, 
EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-        arguments(null, null, DEFAULT_MERGE_STRATEGY_UUID, false, 
EVENT_TIME_ORDERING, defaultPayload, DEFAULT_MERGE_STRATEGY_UUID),
-
-        //test legal overwrite combos
-        arguments(COMMIT_TIME_ORDERING, null, null, false, 
COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        arguments(COMMIT_TIME_ORDERING, overwritePayload, null, false, 
COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        arguments(COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, false, COMMIT_TIME_ORDERING, 
overwritePayload,
-            COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        arguments(COMMIT_TIME_ORDERING, null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, false, COMMIT_TIME_ORDERING, 
overwritePayload,
-            COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        arguments(null, overwritePayload, null, false, COMMIT_TIME_ORDERING, 
overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        arguments(null, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, false, COMMIT_TIME_ORDERING, 
overwritePayload,
-            COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-        arguments(null, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, false, 
COMMIT_TIME_ORDERING, overwritePayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
-
-        //test legal custom payload combos
-        arguments(CUSTOM, customPayload, null, false, CUSTOM, customPayload, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-        arguments(CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-        arguments(null, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-        arguments(null, customPayload, null, false, CUSTOM, customPayload, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
-
-        //test legal custom merger combos
-        arguments(CUSTOM, null, customStrategy, false, CUSTOM, defaultPayload, 
customStrategy),
-        //for now this case is ok but will need to be changed when we add 
dummy payload for [HUDI-8317]
-        arguments(CUSTOM, defaultPayload, customStrategy, false, CUSTOM, 
defaultPayload, customStrategy),
+        arguments(EVENT_TIME_ORDERING, null, null, null,
+            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(EVENT_TIME_ORDERING, null, null, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(EVENT_TIME_ORDERING, defaultPayload, null, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(EVENT_TIME_ORDERING, null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, defaultPayload, null, null,
+            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, defaultPayload, null, orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
orderingFieldName,
+            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+
+        //test legal commit time ordering combos
+        arguments(COMMIT_TIME_ORDERING, null, null, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(COMMIT_TIME_ORDERING, null, null, "",
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(COMMIT_TIME_ORDERING, null, null, orderingFieldName,
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(COMMIT_TIME_ORDERING, overwritePayload, null, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(COMMIT_TIME_ORDERING, null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, overwritePayload, null, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, overwritePayload, null, "",
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, overwritePayload, null, orderingFieldName,
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
orderingFieldName,
+            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+
+        //test legal custom merge mode combos
+        arguments(CUSTOM, customPayload, null, null,
+            false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
null,
+            false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
+            false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, customPayload, null, null,
+            false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(CUSTOM, null, customStrategy, null,
+            false, CUSTOM, defaultPayload, customStrategy),
+        arguments(CUSTOM, customPayload, customStrategy, null,
+            false, CUSTOM, customPayload, customStrategy),
+
+        //test legal configs that work but should not be used usually
+        arguments(CUSTOM, defaultPayload, customStrategy, null,
+            false, CUSTOM, defaultPayload, customStrategy),
+        arguments(CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
null,
+            false, CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(CUSTOM, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
null,
+            false, CUSTOM, overwritePayload, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
null,
+            false, CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+        arguments(null, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
null,
+            false, CUSTOM, overwritePayload, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
 
         //test illegal combos due to missing info
-        arguments(CUSTOM, null, null, true, null, null, null),
-        arguments(CUSTOM, null, PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, 
null, null),
+        arguments(CUSTOM, null, null, null,
+            true, null, null, null),
+        arguments(CUSTOM, null, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
 
         //test illegal combos
-        arguments(EVENT_TIME_ORDERING, overwritePayload, null, true, null, 
null, null),
-        arguments(EVENT_TIME_ORDERING, customPayload, null, true, null, null, 
null),
-        arguments(EVENT_TIME_ORDERING, null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(EVENT_TIME_ORDERING, null, customStrategy, true, null, null, 
null),
-        arguments(EVENT_TIME_ORDERING, null, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(COMMIT_TIME_ORDERING, defaultPayload, null, true, null, 
null, null),
-        arguments(COMMIT_TIME_ORDERING, customPayload, null, true, null, null, 
null),
-        arguments(COMMIT_TIME_ORDERING, null, DEFAULT_MERGE_STRATEGY_UUID, 
true, null, null, null),
-        arguments(COMMIT_TIME_ORDERING, null, customStrategy, true, null, 
null, null),
-        arguments(COMMIT_TIME_ORDERING, null, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(CUSTOM, defaultPayload, null, true, null, null, null),
-        arguments(CUSTOM, overwritePayload, null, true, null, null, null),
-        arguments(CUSTOM, null, DEFAULT_MERGE_STRATEGY_UUID, true, null, null, 
null),
-        arguments(CUSTOM, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, true, 
null, null, null),
-        arguments(CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
true, null, null, null),
-        arguments(CUSTOM, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
true, null, null, null),
-        arguments(CUSTOM, defaultPayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, true, null, null, null),
-        arguments(CUSTOM, overwritePayload, DEFAULT_MERGE_STRATEGY_UUID, true, 
null, null, null),
-        arguments(null, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
true, null, null, null),
-        arguments(null, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
true, null, null, null),
-        arguments(null, defaultPayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
true, null, null, null),
-        arguments(null, overwritePayload, DEFAULT_MERGE_STRATEGY_UUID, true, 
null, null, null));
+        arguments(EVENT_TIME_ORDERING, overwritePayload, null, 
orderingFieldName,
+            true, null, null, null),
+        arguments(EVENT_TIME_ORDERING, customPayload, null, orderingFieldName,
+            true, null, null, null),
+        arguments(EVENT_TIME_ORDERING, null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
+            true, null, null, null),
+        arguments(EVENT_TIME_ORDERING, null, customStrategy, orderingFieldName,
+            true, null, null, null),
+        arguments(EVENT_TIME_ORDERING, null, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
+            true, null, null, null),
+        arguments(COMMIT_TIME_ORDERING, defaultPayload, null, null,
+            true, null, null, null),
+        arguments(COMMIT_TIME_ORDERING, customPayload, null, null,
+            true, null, null, null),
+        arguments(COMMIT_TIME_ORDERING, null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(COMMIT_TIME_ORDERING, null, customStrategy, null,
+            true, null, null, null),
+        arguments(COMMIT_TIME_ORDERING, null, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(CUSTOM, defaultPayload, null, null,
+            true, null, null, null),
+        arguments(CUSTOM, overwritePayload, null, null,
+            true, null, null, null),
+        arguments(CUSTOM, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(CUSTOM, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(CUSTOM, defaultPayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(CUSTOM, overwritePayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null),
+        arguments(null, defaultPayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
null,
+            true, null, null, null),
+        arguments(null, overwritePayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
+            true, null, null, null));
     return arguments;
   }
 
   @ParameterizedTest
   @MethodSource("argumentsForInferringRecordMergeMode")
-  public void testInferMergeMode(RecordMergeMode inputMergeMode, String 
inputPayloadClass, String inputMergeStrategy, boolean shouldThrow,
-                                 RecordMergeMode outputMergeMode, String 
outputPayloadClass, String outputMergeStrategy) {
+  public void testInferMergeMode(RecordMergeMode inputMergeMode, String 
inputPayloadClass,
+                                 String inputMergeStrategy, String 
orderingFieldName,
+                                 boolean shouldThrow, RecordMergeMode 
outputMergeMode,
+                                 String outputPayloadClass, String 
outputMergeStrategy) {
     if (shouldThrow) {
       assertThrows(IllegalArgumentException.class,
-          () -> HoodieTableConfig.inferCorrectMergingBehavior(inputMergeMode, 
inputPayloadClass, inputMergeStrategy));
+          () -> HoodieTableConfig.inferCorrectMergingBehavior(
+              inputMergeMode, inputPayloadClass, inputMergeStrategy, 
orderingFieldName));
     } else {
-      Triple<RecordMergeMode, String, String> inferredConfigs = 
HoodieTableConfig.inferCorrectMergingBehavior(inputMergeMode, 
inputPayloadClass, inputMergeStrategy);
+      Triple<RecordMergeMode, String, String> inferredConfigs =
+          HoodieTableConfig.inferCorrectMergingBehavior(
+              inputMergeMode, inputPayloadClass, inputMergeStrategy, 
orderingFieldName);
       assertEquals(outputMergeMode, inferredConfigs.getLeft());
       assertEquals(outputPayloadClass, inferredConfigs.getMiddle());
       assertEquals(outputMergeStrategy, inferredConfigs.getRight());
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
index 43cd4f7c4f0..31d0700935b 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.table.read;
 
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -64,10 +63,9 @@ public class TestCustomMerger extends 
HoodieFileGroupReaderTestHarness {
 
   @Override
   protected Properties getMetaProps() {
-    Properties metaProps =  super.getMetaProps();
+    Properties metaProps = super.getMetaProps();
     metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.CUSTOM.name());
     metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), 
CustomAvroMerger.KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY);
-    metaProps.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
DefaultHoodieRecordPayload.class.getName());
     return metaProps;
   }
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
index 222c3e2d1bd..178b6a43113 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
@@ -20,9 +20,7 @@
 package org.apache.hudi.common.table.read;
 
 import org.apache.hudi.common.config.RecordMergeMode;
-import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
-import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.reader.HoodieAvroRecordTestMerger;
@@ -62,8 +60,6 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
   protected Properties getMetaProps() {
     Properties metaProps =  super.getMetaProps();
     metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.EVENT_TIME_ORDERING.name());
-    metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), 
HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID);
-    metaProps.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
DefaultHoodieRecordPayload.class.getName());
     return metaProps;
   }
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
index a39809ced14..4453c439535 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
@@ -60,8 +60,6 @@ public class TestOverwriteWithLatestMerger extends 
HoodieFileGroupReaderTestHarn
   protected Properties getMetaProps() {
     Properties metaProps =  super.getMetaProps();
     metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.COMMIT_TIME_ORDERING.name());
-    metaProps.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), 
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
-    metaProps.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), 
OverwriteWithLatestAvroPayload.class.getName());
     return metaProps;
   }
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
index bc7a0871917..dc03c755ee7 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
@@ -64,6 +64,6 @@ public class DefaultHiveRecordMerger extends 
HoodieHiveRecordMerger {
 
   @Override
   public String getMergingStrategy() {
-    return HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
+    return HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 39a8f815768..4ad8cbb25fb 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -857,9 +857,6 @@ object DataSourceWriteOptions {
   /** @deprecated Use {@link PRECOMBINE_FIELD} and its methods instead */
   @Deprecated
   val PRECOMBINE_FIELD_OPT_KEY = HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key()
-  /** @deprecated Use {@link PRECOMBINE_FIELD} and its methods instead */
-  @Deprecated
-  val DEFAULT_PRECOMBINE_FIELD_OPT_VAL = PRECOMBINE_FIELD.defaultValue()
 
   /** @deprecated Use {@link HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME} and 
its methods instead */
   @Deprecated
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c6a9559b2c8..4a64319752f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -17,12 +17,6 @@
 
 package org.apache.hudi
 
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.shims.ShimLoader
 import 
org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
 import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, 
convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
 import 
org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
@@ -34,33 +28,34 @@ import org.apache.hudi.HoodieSparkUtils.sparkAdapter
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema
 import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.commit.{DatasetBulkInsertCommitActionExecutor, 
DatasetBulkInsertOverwriteCommitActionExecutor, 
DatasetBulkInsertOverwriteTableCommitActionExecutor}
 import org.apache.hudi.common.config._
 import org.apache.hudi.common.engine.HoodieEngineContext
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model._
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
-import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
-import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
 import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => 
HOption}
+import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, 
HoodieWriteConfig}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, 
INDEX_CLASS_NAME}
-import 
org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
+import 
org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, 
WRITE_TABLE_VERSION}
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, 
HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, 
HoodieRecordCreationException, HoodieWriteConflictException}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
+import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.SerDeHelper
+import org.apache.hudi.keygen.{BaseKeyGenerator, 
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.keygen.{BaseKeyGenerator, 
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.storage.HoodieStorage
 import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -68,18 +63,26 @@ import org.apache.hudi.sync.common.util.SyncUtilHelpers
 import 
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
 import org.apache.hudi.util.SparkConfigUtils.getStringWithAltKeys
 import org.apache.hudi.util.SparkKeyGenUtils
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.spark.{SPARK_VERSION, SparkContext}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import 
org.apache.spark.sql.HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty
 import org.apache.spark.sql._
+import 
org.apache.spark.sql.HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.{SPARK_VERSION, SparkContext}
 import org.slf4j.LoggerFactory
 
 import java.util.function.BiConsumer
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
@@ -1127,10 +1130,10 @@ class HoodieSparkSqlWriterInternal {
       || 
!mergedParams.contains(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key())
       || 
!mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key())) {
       val inferredMergeConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
-        
RecordMergeMode.getValue(mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_MODE.key(),
-          DataSourceWriteOptions.RECORD_MERGE_MODE.defaultValue())),
+        
RecordMergeMode.getValue(mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_MODE.key(),
 null)),
         
mergedParams.getOrElse(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), ""),
-        
mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(), 
""))
+        
mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(), 
""),
+        optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
       mergedParams.put(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), 
inferredMergeConfigs.getLeft.name())
       mergedParams.put(HoodieTableConfig.RECORD_MERGE_MODE.key(), 
inferredMergeConfigs.getLeft.name())
       mergedParams.put(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), 
inferredMergeConfigs.getMiddle)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index a6008cbf523..7de0617b23b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -24,9 +24,9 @@ import org.apache.hudi.HoodieConversionUtils.toProperties
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
HoodieCommonConfig, RecordMergeMode, TypedProperties}
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
HoodieRecordMerger, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
 import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig, 
HoodieWriteConfig}
-import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, 
MultiPartKeysValueExtractor}
 import org.apache.hudi.hive.ddl.HiveSyncMode
@@ -259,10 +259,10 @@ trait ProvidesHoodieConfig extends Logging {
 
     val deducedPayloadClassName = 
classOf[DefaultHoodieRecordPayload].getCanonicalName
     val recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING.name
-    val recordMergeStrategy = HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID
+    val recordMergeStrategy = 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
 
     if 
(tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName)
 &&
-        
tableConfig.getRecordMergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING)) {
+      
RecordMergeMode.EVENT_TIME_ORDERING.equals(tableConfig.getRecordMergeMode)) {
       tableConfig.clearValue(HoodieTableConfig.PAYLOAD_CLASS_NAME)
       tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_MODE)
       tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 240bf1c8a36..7ba05a9948c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -409,8 +409,9 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
     assertTrue(metadataWriter.isPresent());
     Triple<RecordMergeMode, String, String> inferredMergeConfs =
-        
HoodieTableConfig.inferCorrectMergingBehavior(writeConfig.getRecordMergeMode(), 
writeConfig.getPayloadClass(),
-            writeConfig.getRecordMergeStrategyId());
+        HoodieTableConfig.inferCorrectMergingBehavior(
+            writeConfig.getRecordMergeMode(), writeConfig.getPayloadClass(),
+            writeConfig.getRecordMergeStrategyId(), 
writeConfig.getPreCombineField());
     HoodieTableConfig hoodieTableConfig =
         new HoodieTableConfig(this.storage, metaClient.getMetaPath(), 
inferredMergeConfs.getLeft(), inferredMergeConfs.getMiddle(), 
inferredMergeConfs.getRight());
     assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());
@@ -430,8 +431,9 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     assertFalse(metadataWriter2.isPresent());
 
     Triple<RecordMergeMode, String, String> inferredMergeConfs2 =
-        
HoodieTableConfig.inferCorrectMergingBehavior(writeConfig2.getRecordMergeMode(),
 writeConfig2.getPayloadClass(),
-            writeConfig2.getRecordMergeStrategyId());
+        HoodieTableConfig.inferCorrectMergingBehavior(
+            writeConfig2.getRecordMergeMode(), writeConfig2.getPayloadClass(),
+            writeConfig2.getRecordMergeStrategyId(), 
writeConfig2.getPreCombineField());
     HoodieTableConfig hoodieTableConfig2 =
         new HoodieTableConfig(this.storage, metaClient.getMetaPath(),
             inferredMergeConfs2.getLeft(),
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index b020598aa24..c4db65a1bba 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -30,10 +30,11 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.keygen.{ComplexKeyGenerator, 
NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestUtils}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-import org.apache.avro.Schema
-import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions.{DROP_INSERT_DUP_POLICY, 
FAIL_INSERT_DUP_POLICY, INSERT_DROP_DUPS, INSERT_DUP_POLICY}
 import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig}
+
+import org.apache.avro.Schema
+import org.apache.commons.io.FileUtils
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.functions.{expr, lit}
@@ -49,6 +50,7 @@ import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, 
intercept}
 
 import java.time.Instant
 import java.util.{Collections, Date, UUID}
+
 import scala.collection.JavaConverters._
 
 /**
@@ -612,7 +614,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
       
.setBaseFileFormat(fooTableParams.getOrElse(HoodieWriteConfig.BASE_FILE_FORMAT.key,
         HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name))
       
.setArchiveLogFolder(HoodieTableConfig.TIMELINE_HISTORY_PATH.defaultValue())
-      
.setPreCombineField(fooTableParams.getOrElse(DataSourceWriteOptions.PRECOMBINE_FIELD.key,
 DataSourceWriteOptions.PRECOMBINE_FIELD.defaultValue()))
+      
.setPreCombineField(fooTableParams.getOrElse(DataSourceWriteOptions.PRECOMBINE_FIELD.key,
 null))
       
.setPartitionFields(fooTableParams(DataSourceWriteOptions.PARTITIONPATH_FIELD.key))
       
.setKeyGeneratorClassProp(fooTableParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key,
         DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue()))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
index 1f1e99b14ad..c11f5cc5140 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hudi.common
 
 import org.apache.hudi.common.model.{HoodieRecordMerger, 
OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 
 import org.apache.spark.sql.hudi.HoodieOptionConfig
@@ -42,8 +41,7 @@ class TestHoodieOptionConfig extends 
SparkClientFunctionalTestHarness {
       "preCombineField" -> "timestamp",
       "type" -> "mor",
       "payloadClass" -> classOf[OverwriteWithLatestAvroPayload].getName,
-      "recordMergeStrategyId" -> 
HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID,
-      "recordMergeMode" -> HoodieWriteConfig.RECORD_MERGE_MODE.defaultValue
+      "recordMergeStrategyId" -> 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
     )
     val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2)
     assertTrue(ops2 == with2)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
index 620746abe4d..e02a9b30c63 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hudi.common
 
 import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.common.config.DFSPropertiesConfiguration
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
RecordMergeMode}
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestUtils
@@ -82,7 +82,7 @@ class TestSqlConf extends HoodieSparkSqlTestBase with 
BeforeAndAfter {
       assertResult(true)(Files.exists(Paths.get(s"$tablePath/$partitionVal")))
       assertResult(HoodieTableType.MERGE_ON_READ)(new HoodieTableConfig(
         HoodieStorageUtils.getStorage(tablePath, 
HoodieTestUtils.getDefaultStorageConf),
-        new StoragePath(tablePath, HoodieTableMetaClient.METAFOLDER_NAME), 
HoodieTableConfig.RECORD_MERGE_MODE.defaultValue(), null, null).getTableType)
+        new StoragePath(tablePath, HoodieTableMetaClient.METAFOLDER_NAME), 
RecordMergeMode.COMMIT_TIME_ORDERING, null, null).getTableType)
 
       // Manually pass incremental configs to global configs to make sure Hudi 
query is able to load the
       // global configs
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
index c4699db3bed..50fd71c949f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
@@ -124,8 +124,7 @@ public class MaxwellJsonKafkaSourcePostProcessor extends 
JsonKafkaSourcePostProc
 
     // we can update the `update_time`(delete time) only when it is in 
timestamp format.
     if (!preCombineFieldType.equals(NON_TIMESTAMP)) {
-      String preCombineField = 
this.props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(),
-          HoodieWriteConfig.PRECOMBINE_FIELD_NAME.defaultValue());
+      String preCombineField = 
this.props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), null);
 
       // ts from maxwell
       long ts = inputJson.get(TS).longValue();
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
index cc261545554..2493451ec1a 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
@@ -207,8 +207,7 @@ public class BootstrapExecutor implements Serializable {
         .setTableType(cfg.tableType)
         .setTableName(cfg.targetTableName)
         .setRecordKeyFields(props.getString(RECORDKEY_FIELD_NAME.key()))
-        .setPreCombineField(props.getString(
-            PRECOMBINE_FIELD_NAME.key(), PRECOMBINE_FIELD_NAME.defaultValue()))
+        .setPreCombineField(props.getString(PRECOMBINE_FIELD_NAME.key(), null))
         .setTableVersion(ConfigUtils.getIntWithAltKeys(props, 
WRITE_TABLE_VERSION))
         .setPopulateMetaFields(props.getBoolean(
             POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue()))
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index f784a7dba6d..074ad8743b5 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -153,7 +153,8 @@ public class HoodieStreamer implements Serializable {
   public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, 
Configuration conf,
                         Option<TypedProperties> propsOverride, 
Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
     Triple<RecordMergeMode, String, String> mergingConfigs =
-        HoodieTableConfig.inferCorrectMergingBehavior(cfg.recordMergeMode, 
cfg.payloadClassName, cfg.recordMergeStrategyId);
+        HoodieTableConfig.inferCorrectMergingBehavior(
+            cfg.recordMergeMode, cfg.payloadClassName, 
cfg.recordMergeStrategyId, cfg.sourceOrderingField);
     cfg.recordMergeMode = mergingConfigs.getLeft();
     cfg.payloadClassName = mergingConfigs.getMiddle();
     cfg.recordMergeStrategyId = mergingConfigs.getRight();
@@ -267,8 +268,8 @@ public class HoodieStreamer implements Serializable {
     public String sourceClassName = JsonDFSSource.class.getName();
 
     @Parameter(names = {"--source-ordering-field"}, description = "Field 
within source record to decide how"
-        + " to break ties between records with same key in input data. 
Default: 'ts' holding unix timestamp of record")
-    public String sourceOrderingField = "ts";
+        + " to break ties between records with same key in input data.")
+    public String sourceOrderingField = null;
 
     @Parameter(names = {"--payload-class"}, description = "Deprecated. "
         + "Use --merge-mode for commit time or event time merging. "
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index ff726d7db23..2d9d2cdc9c7 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -83,7 +83,9 @@ public class HoodieStreamerUtils {
     boolean useConsistentLogicalTimestamp = ConfigUtils.getBooleanWithAltKeys(
         props, 
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
     Set<String> partitionColumns = getPartitionColumns(props);
-    String payloadClassName = StringUtils.isNullOrEmpty(cfg.payloadClassName) 
? HoodieRecordPayload.getAvroPayloadForMergeMode(cfg.recordMergeMode) : 
cfg.payloadClassName;
+    String payloadClassName = StringUtils.isNullOrEmpty(cfg.payloadClassName)
+        ? HoodieRecordPayload.getAvroPayloadForMergeMode(cfg.recordMergeMode, 
cfg.payloadClassName)
+        : cfg.payloadClassName;
     return avroRDDOptional.map(avroRDD -> {
       SerializableSchema avroSchema = new 
SerializableSchema(schemaProvider.getTargetSchema());
       SerializableSchema processedAvroSchema = new 
SerializableSchema(isDropPartitionColumns(props) ? 
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 7a9372c5ac7..989be986b40 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -622,7 +622,8 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       }
       cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;
       Triple<RecordMergeMode, String, String> mergeCfgs =
-          HoodieTableConfig.inferCorrectMergingBehavior(cfg.recordMergeMode, 
cfg.payloadClassName, cfg.recordMergeStrategyId);
+          HoodieTableConfig.inferCorrectMergingBehavior(
+              cfg.recordMergeMode, cfg.payloadClassName, 
cfg.recordMergeStrategyId, cfg.sourceOrderingField);
       cfg.recordMergeMode = mergeCfgs.getLeft();
       cfg.payloadClassName = mergeCfgs.getMiddle();
       cfg.recordMergeStrategyId = mergeCfgs.getRight();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 55c03716d53..cf504ef5fd6 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -192,7 +192,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       opts.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
DefaultSparkRecordMerger.class.getName());
       opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
       opts.put(HoodieWriteConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.CUSTOM.name());
-      opts.put(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), 
HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID);
+      opts.put(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
       for (Map.Entry<String, String> entry : opts.entrySet()) {
         hoodieConfig.add(String.format("%s=%s", entry.getKey(), 
entry.getValue()));
       }
@@ -561,7 +561,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
     addRecordMerger(recordType, cfg.configs);
     cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
-    cfg.recordMergeStrategyId = HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID;
+    cfg.recordMergeStrategyId = 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
     cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
     cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + 
basePath + "/source.avsc");
     cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + 
basePath + "/source.avsc");

Reply via email to