This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new db8eb8c16ac [HUDI-8942] Validate merge mode based on payload class and 
merger strategy only for table versions 8 or higher (#12743)
db8eb8c16ac is described below

commit db8eb8c16ac2cf62e7bf0a78dd8a00ea0ebe6030
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Jan 31 08:34:21 2025 +0530

    [HUDI-8942] Validate merge mode based on payload class and merger strategy 
only for table versions 8 or higher (#12743)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../upgrade/EightToSevenDowngradeHandler.java      |   3 +-
 .../hudi/common/table/HoodieTableConfig.java       |  12 +-
 .../hudi/common/table/HoodieTableMetaClient.java   |  14 +-
 .../common/table/read/HoodieFileGroupReader.java   |   2 +-
 .../hudi/common/table/TestHoodieTableConfig.java   | 168 ++++++++++++---------
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  14 +-
 .../org/apache/hudi/util/SparkConfigUtils.scala    |   2 +-
 .../functional/TestHoodieBackedMetadata.java       |   6 +-
 .../hudi/utilities/streamer/HoodieStreamer.java    |   4 +-
 .../deltastreamer/HoodieDeltaStreamerTestBase.java |   3 +-
 10 files changed, 135 insertions(+), 93 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
index 4980fcec4f7..52bbc830238 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
@@ -168,7 +168,8 @@ public class EightToSevenDowngradeHandler implements 
DowngradeHandler {
     Triple<RecordMergeMode, String, String> mergingConfigs =
         HoodieTableConfig.inferCorrectMergingBehavior(
             tableConfig.getRecordMergeMode(), tableConfig.getPayloadClass(),
-            tableConfig.getRecordMergeStrategyId(), 
tableConfig.getPreCombineField());
+            tableConfig.getRecordMergeStrategyId(), 
tableConfig.getPreCombineField(),
+            tableConfig.getTableVersion());
     if (StringUtils.nonEmpty(mergingConfigs.getMiddle())) {
       tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME, 
mergingConfigs.getMiddle());
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 1b85a1707ae..c7f948e5d24 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -760,7 +760,8 @@ public class HoodieTableConfig extends HoodieConfig {
   public static Triple<RecordMergeMode, String, String> 
inferCorrectMergingBehavior(RecordMergeMode recordMergeMode,
                                                                                
     String payloadClassName,
                                                                                
     String recordMergeStrategyId,
-                                                                               
     String orderingFieldName) {
+                                                                               
     String orderingFieldName,
+                                                                               
     HoodieTableVersion tableVersion) {
     RecordMergeMode inferredRecordMergeMode;
     String inferredPayloadClassName;
     String inferredRecordMergeStrategyId;
@@ -781,13 +782,18 @@ public class HoodieTableConfig extends HoodieConfig {
               + "strategy ID (%s).", payloadClassName, recordMergeStrategyId));
       // TODO(HUDI-8925): once payload class name is not required, remove the 
check on
       //  modeBasedOnStrategyId
-      if (modeBasedOnStrategyId != CUSTOM && modeBasedOnPayload != null && 
modeBasedOnStrategyId != null) {
+      if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+          && modeBasedOnStrategyId != CUSTOM && modeBasedOnPayload != null && 
modeBasedOnStrategyId != null) {
         checkArgument(modeBasedOnPayload.equals(modeBasedOnStrategyId),
             String.format("Configured payload class (%s) and record merge 
strategy ID (%s) conflict "
                     + "with each other. Please only set one of them in the 
write config.",
                 payloadClassName, recordMergeStrategyId));
       }
-      inferredRecordMergeMode = modeBasedOnStrategyId != null ? 
modeBasedOnStrategyId : modeBasedOnPayload;
+      if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+        inferredRecordMergeMode = modeBasedOnStrategyId != null ? 
modeBasedOnStrategyId : modeBasedOnPayload;
+      } else {
+        inferredRecordMergeMode = modeBasedOnPayload != null ? 
modeBasedOnPayload : modeBasedOnStrategyId;
+      }
     }
     if (recordMergeMode != null) {
       checkArgument(inferredRecordMergeMode == recordMergeMode,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 25ad815f600..b43c92cb385 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -1384,17 +1384,17 @@ public class HoodieTableMetaClient implements 
Serializable {
       tableConfig.setValue(HoodieTableConfig.NAME, tableName);
       tableConfig.setValue(HoodieTableConfig.TYPE, tableType.name());
 
-      if (null != tableVersion) {
-        tableConfig.setTableVersion(tableVersion);
-        tableConfig.setInitialVersion(tableVersion);
-      } else {
-        tableConfig.setTableVersion(HoodieTableVersion.current());
-        tableConfig.setInitialVersion(HoodieTableVersion.current());
+      if (null == tableVersion) {
+        tableVersion = HoodieTableVersion.current();
       }
 
+      tableConfig.setTableVersion(tableVersion);
+      tableConfig.setInitialVersion(tableVersion);
+
       Triple<RecordMergeMode, String, String> mergeConfigs =
           HoodieTableConfig.inferCorrectMergingBehavior(
-              recordMergeMode, payloadClassName, recordMergerStrategyId, 
preCombineField);
+              recordMergeMode, payloadClassName, recordMergerStrategyId, 
preCombineField,
+              tableVersion);
       tableConfig.setValue(RECORD_MERGE_MODE, mergeConfigs.getLeft().name());
       tableConfig.setValue(PAYLOAD_CLASS_NAME.key(), mergeConfigs.getMiddle());
       tableConfig.setValue(RECORD_MERGE_STRATEGY_ID, mergeConfigs.getRight());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index e77bded8a77..3d6c23182c7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -109,7 +109,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     if 
(!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
       Triple<RecordMergeMode, String, String> triple = 
HoodieTableConfig.inferCorrectMergingBehavior(
           recordMergeMode, tableConfig.getPayloadClass(),
-          mergeStrategyId, null);
+          mergeStrategyId, null, tableConfig.getTableVersion());
       recordMergeMode = triple.getLeft();
       mergeStrategyId = triple.getRight();
     }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 3dacf596258..fbc778150cb 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -60,8 +60,10 @@ import static 
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.getRecordMergeStrategyId;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.inferRecordMergeModeFromPayloadClass;
 import static org.apache.hudi.common.util.ConfigUtils.recoverIfNeeded;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -305,133 +307,136 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
     Stream<Arguments> arguments = Stream.of(
         //test empty args with both null and ""
         arguments(null, null, null, null,
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, null, null, "",
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, null, null, orderingFieldName,
-            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, "", "", null,
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, "", "", orderingFieldName,
-            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
 
         //test legal event time ordering combos
         arguments(EVENT_TIME_ORDERING, null, null, null,
-            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(EVENT_TIME_ORDERING, null, null, orderingFieldName,
-            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(EVENT_TIME_ORDERING, defaultPayload, null, orderingFieldName,
-            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
-            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(EVENT_TIME_ORDERING, null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
-            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, defaultPayload, null, null,
-            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, defaultPayload, null, orderingFieldName,
-            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
orderingFieldName,
-            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
-            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, 
orderingFieldName,
-            false, EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
 
         //test legal commit time ordering combos
         arguments(COMMIT_TIME_ORDERING, null, null, null,
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(COMMIT_TIME_ORDERING, null, null, "",
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(COMMIT_TIME_ORDERING, null, null, orderingFieldName,
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(COMMIT_TIME_ORDERING, overwritePayload, null, null,
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(COMMIT_TIME_ORDERING, null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, overwritePayload, null, null,
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, overwritePayload, null, "",
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, overwritePayload, null, orderingFieldName,
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
orderingFieldName,
-            false, COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+            "false", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
 
         //test legal custom merge mode combos
         arguments(CUSTOM, customPayload, null, null,
-            false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+            "false", CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
         arguments(CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
null,
-            false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+            "false", CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
         arguments(null, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
-            false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+            "false", CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
         arguments(null, customPayload, null, null,
-            false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+            "false", CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
         arguments(CUSTOM, null, customStrategy, null,
-            false, CUSTOM, defaultPayload, customStrategy),
+            "false", CUSTOM, defaultPayload, customStrategy),
         arguments(CUSTOM, customPayload, customStrategy, null,
-            false, CUSTOM, customPayload, customStrategy),
+            "false", CUSTOM, customPayload, customStrategy),
 
         //test legal configs that work but should not be used usually
         arguments(CUSTOM, defaultPayload, customStrategy, null,
-            false, CUSTOM, defaultPayload, customStrategy),
+            "six-only", CUSTOM, defaultPayload, customStrategy),
         arguments(CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
null,
-            false, CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+            "six-only", CUSTOM, defaultPayload, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
         arguments(CUSTOM, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
null,
-            false, CUSTOM, overwritePayload, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+            "six-only", CUSTOM, overwritePayload, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
         arguments(null, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
null,
-            false, CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+            "false", null, defaultPayload, null),
         arguments(null, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, 
null,
-            false, CUSTOM, overwritePayload, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+            "false", null, overwritePayload, null),
 
         //test illegal combos due to missing info
         arguments(CUSTOM, null, null, null,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(CUSTOM, null, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
-            true, null, null, null),
+            "true", null, null, null),
 
         //test illegal combos
         arguments(EVENT_TIME_ORDERING, overwritePayload, null, 
orderingFieldName,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(EVENT_TIME_ORDERING, customPayload, null, orderingFieldName,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(EVENT_TIME_ORDERING, null, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(EVENT_TIME_ORDERING, null, customStrategy, orderingFieldName,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(EVENT_TIME_ORDERING, null, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(COMMIT_TIME_ORDERING, defaultPayload, null, null,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(COMMIT_TIME_ORDERING, customPayload, null, null,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(COMMIT_TIME_ORDERING, null, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(COMMIT_TIME_ORDERING, null, customStrategy, null,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(COMMIT_TIME_ORDERING, null, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(CUSTOM, defaultPayload, null, null,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(CUSTOM, overwritePayload, null, null,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(CUSTOM, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(CUSTOM, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(CUSTOM, defaultPayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
-            true, null, null, null),
+            "true", null, null, null),
         arguments(CUSTOM, overwritePayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
-            true, null, null, null),
+            "true", null, null, null),
+
+        // dimensions that should pass validation on table version 6, not 
table version 8
         arguments(null, defaultPayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, 
null,
-            true, null, null, null),
+            "eight-only", EVENT_TIME_ORDERING, defaultPayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
         arguments(null, overwritePayload, 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
-            true, null, null, null));
+            "eight-only", COMMIT_TIME_ORDERING, overwritePayload, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
+    );
     return arguments;
   }
 
@@ -439,19 +444,38 @@ public class TestHoodieTableConfig extends 
HoodieCommonTestHarness {
   @MethodSource("argumentsForInferringRecordMergeMode")
   public void testInferMergeMode(RecordMergeMode inputMergeMode, String 
inputPayloadClass,
                                  String inputMergeStrategy, String 
orderingFieldName,
-                                 boolean shouldThrow, RecordMergeMode 
outputMergeMode,
-                                 String outputPayloadClass, String 
outputMergeStrategy) {
-    if (shouldThrow) {
-      assertThrows(IllegalArgumentException.class,
-          () -> HoodieTableConfig.inferCorrectMergingBehavior(
-              inputMergeMode, inputPayloadClass, inputMergeStrategy, 
orderingFieldName));
-    } else {
-      Triple<RecordMergeMode, String, String> inferredConfigs =
-          HoodieTableConfig.inferCorrectMergingBehavior(
-              inputMergeMode, inputPayloadClass, inputMergeStrategy, 
orderingFieldName);
-      assertEquals(outputMergeMode, inferredConfigs.getLeft());
-      assertEquals(outputPayloadClass, inferredConfigs.getMiddle());
-      assertEquals(outputMergeStrategy, inferredConfigs.getRight());
-    }
+                                 String shouldThrowString, RecordMergeMode 
outputMergeMode,
+                                 String outputPayloadClass, String 
outputMergeStrategy) throws IOException {
+    Arrays.stream(new HoodieTableVersion[] {HoodieTableVersion.EIGHT, 
HoodieTableVersion.SIX})
+        .forEach(tableVersion -> {
+          boolean shouldThrow = "eight-only".equals(shouldThrowString)
+              ? tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+              : "six-only".equals(shouldThrowString)
+              ? !tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+              : Boolean.parseBoolean(shouldThrowString);
+          RecordMergeMode expectedMergeMode = outputMergeMode;
+          String expectedMergeStrategy = outputMergeStrategy;
+          if (!shouldThrow && outputMergeMode == null) {
+            expectedMergeMode = 
tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+                ? CUSTOM : 
inferRecordMergeModeFromPayloadClass(outputPayloadClass);
+            expectedMergeStrategy = 
tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+                ? PAYLOAD_BASED_MERGE_STRATEGY_UUID
+                : getRecordMergeStrategyId(expectedMergeMode, 
outputPayloadClass, null);
+          }
+          if (shouldThrow) {
+            assertThrows(IllegalArgumentException.class,
+                () -> HoodieTableConfig.inferCorrectMergingBehavior(
+                    inputMergeMode, inputPayloadClass, inputMergeStrategy, 
orderingFieldName,
+                    tableVersion));
+          } else {
+            Triple<RecordMergeMode, String, String> inferredConfigs =
+                HoodieTableConfig.inferCorrectMergingBehavior(
+                    inputMergeMode, inputPayloadClass, inputMergeStrategy, 
orderingFieldName,
+                    tableVersion);
+            assertEquals(expectedMergeMode, inferredConfigs.getLeft());
+            assertEquals(outputPayloadClass, inferredConfigs.getMiddle());
+            assertEquals(expectedMergeStrategy, inferredConfigs.getRight());
+          }
+        });
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 4a64319752f..31d87b8f6c7 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -37,7 +37,7 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion, TableSchemaResolver}
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator
 import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => 
HOption}
@@ -45,7 +45,6 @@ import 
org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, 
HoodieWriteConfig}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, 
INDEX_CLASS_NAME}
 import 
org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, 
WRITE_TABLE_VERSION}
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, 
HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, 
HoodieRecordCreationException, HoodieWriteConflictException}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
@@ -61,8 +60,8 @@ import org.apache.hudi.storage.HoodieStorage
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
 import 
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
+import org.apache.hudi.util.{SparkConfigUtils, SparkKeyGenUtils}
 import org.apache.hudi.util.SparkConfigUtils.getStringWithAltKeys
-import org.apache.hudi.util.SparkKeyGenUtils
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
@@ -1126,6 +1125,12 @@ class HoodieSparkSqlWriterInternal {
       mergedParams.put(HoodieTableConfig.DROP_PARTITION_COLUMNS.key, "false")
     }
 
+    val tableVersion = if (tableConfig != null) {
+      tableConfig.getTableVersion
+    } else {
+      HoodieTableVersion.fromVersionCode(
+        SparkConfigUtils.getStringWithAltKeys(mergedParams, 
WRITE_TABLE_VERSION).toInt)
+    }
     if (!mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_MODE.key())
       || 
!mergedParams.contains(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key())
       || 
!mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key())) {
@@ -1133,7 +1138,8 @@ class HoodieSparkSqlWriterInternal {
         
RecordMergeMode.getValue(mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_MODE.key(),
 null)),
         
mergedParams.getOrElse(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), ""),
         
mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(), 
""),
-        optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
+        optParams.getOrElse(PRECOMBINE_FIELD.key(), null),
+        tableVersion)
       mergedParams.put(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), 
inferredMergeConfigs.getLeft.name())
       mergedParams.put(HoodieTableConfig.RECORD_MERGE_MODE.key(), 
inferredMergeConfigs.getLeft.name())
       mergedParams.put(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), 
inferredMergeConfigs.getMiddle)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
index 107a3968c82..a795ec4caf8 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
@@ -32,7 +32,7 @@ object SparkConfigUtils {
    * @return String value if the config exists; default String value if the 
config does not exist
    *         and there is default value defined in the {@link ConfigProperty} 
config; {@code null} otherwise.
    */
-  def getStringWithAltKeys[T](props: Map[String, String], configProperty: 
ConfigProperty[T]): String = {
+  def getStringWithAltKeys[T](props: scala.collection.Map[String, String], 
configProperty: ConfigProperty[T]): String = {
     ConfigUtils.getStringWithAltKeys(JFunction.toJavaFunction[String, 
Object](key => props.getOrElse(key, null)), configProperty)
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index aac384b8192..73e104db8d4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -411,7 +411,8 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     Triple<RecordMergeMode, String, String> inferredMergeConfs =
         HoodieTableConfig.inferCorrectMergingBehavior(
             writeConfig.getRecordMergeMode(), writeConfig.getPayloadClass(),
-            writeConfig.getRecordMergeStrategyId(), 
writeConfig.getPreCombineField());
+            writeConfig.getRecordMergeStrategyId(), 
writeConfig.getPreCombineField(),
+            metaClient.getTableConfig().getTableVersion());
     HoodieTableConfig hoodieTableConfig =
         new HoodieTableConfig(this.storage, metaClient.getMetaPath(), 
inferredMergeConfs.getLeft(), inferredMergeConfs.getMiddle(), 
inferredMergeConfs.getRight());
     assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());
@@ -433,7 +434,8 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     Triple<RecordMergeMode, String, String> inferredMergeConfs2 =
         HoodieTableConfig.inferCorrectMergingBehavior(
             writeConfig2.getRecordMergeMode(), writeConfig2.getPayloadClass(),
-            writeConfig2.getRecordMergeStrategyId(), 
writeConfig2.getPreCombineField());
+            writeConfig2.getRecordMergeStrategyId(), 
writeConfig2.getPreCombineField(),
+            metaClient.getTableConfig().getTableVersion());
     HoodieTableConfig hoodieTableConfig2 =
         new HoodieTableConfig(this.storage, metaClient.getMetaPath(),
             inferredMergeConfs2.getLeft(),
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index cfe01dfde68..db4f6061f20 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CompactionUtils;
@@ -154,7 +155,8 @@ public class HoodieStreamer implements Serializable {
                         Option<TypedProperties> propsOverride, 
Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
     Triple<RecordMergeMode, String, String> mergingConfigs =
         HoodieTableConfig.inferCorrectMergingBehavior(
-            cfg.recordMergeMode, cfg.payloadClassName, 
cfg.recordMergeStrategyId, cfg.sourceOrderingField);
+            cfg.recordMergeMode, cfg.payloadClassName, 
cfg.recordMergeStrategyId, cfg.sourceOrderingField,
+            HoodieTableVersion.current());
     cfg.recordMergeMode = mergingConfigs.getLeft();
     cfg.payloadClassName = mergingConfigs.getMiddle();
     cfg.recordMergeStrategyId = mergingConfigs.getRight();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index b6ed6139547..004e29cd1fb 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -623,7 +623,8 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;
       Triple<RecordMergeMode, String, String> mergeCfgs =
           HoodieTableConfig.inferCorrectMergingBehavior(
-              cfg.recordMergeMode, cfg.payloadClassName, 
cfg.recordMergeStrategyId, cfg.sourceOrderingField);
+              cfg.recordMergeMode, cfg.payloadClassName, 
cfg.recordMergeStrategyId, cfg.sourceOrderingField,
+              HoodieTableVersion.current());
       cfg.recordMergeMode = mergeCfgs.getLeft();
       cfg.payloadClassName = mergeCfgs.getMiddle();
       cfg.recordMergeStrategyId = mergeCfgs.getRight();


Reply via email to