nsivabalan commented on code in PR #13615:
URL: https://github.com/apache/hudi/pull/13615#discussion_r2232300319


##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -807,6 +843,82 @@ public String getRecordMergeStrategyId() {
     return getString(RECORD_MERGE_STRATEGY_ID);
   }
 
+  /**
+   * Handle table config creation logic when creating a table for Table 
Version 9,
+   * since it has some different logic from previous versions.
+   */
+  public static Map<String, String> 
inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+                                                                   String 
payloadClassName,
+                                                                   String 
recordMergeStrategyId,
+                                                                   
HoodieTableVersion tableVersion) {
+    Map<String, String> finalConfigs = new HashMap<>();
+    if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+      return finalConfigs;
+    }
+
+    // Properties for all tables.
+    // Since PARTIAL_UPDATE_MODE does not have default value, do not set it.
+    // For tables using MERGE MODEs (event time/commit time), or CUSTOM 
mergers.
+    if (StringUtils.isNullOrEmpty(payloadClassName)) {
+      checkIfConsistent(recordMergeMode, recordMergeStrategyId);
+      finalConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+      finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), recordMergeStrategyId);
+      // TODO: We need to make sure these two are consistent.
+      // NOTE: No payload class should be set.
+    } else { // For tables using custom payload classes.
+      // For tables using CUSTOM payload classes.
+      if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+        finalConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
+        finalConfigs.put(PAYLOAD_CLASS_NAME.key(), payloadClassName);
+        finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+      } else { // For tables using builtin payload classes.
+        // Merge mode.
+        // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of 
PAYLOAD_CLASS_NAME here.
+        if (EVENT_TIME_BASED_PAYLOADS.contains(payloadClassName)) {
+          finalConfigs.put(RECORD_MERGE_MODE.key(), 
EVENT_TIME_ORDERING.name());
+          finalConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+          finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+        } else {
+          finalConfigs.put(RECORD_MERGE_MODE.key(), 
COMMIT_TIME_ORDERING.name());
+          finalConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+          finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+        }
+        // Partial update mode.
+        if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+            || 
payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()))
 {
+          finalConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.IGNORE_DEFAULTS.name());
+        } else if 
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+          finalConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.IGNORE_MARKERS.name());
+        }
+        // Merge properties.
+        if 
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+          finalConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + 
PARTIAL_UPDATE_CUSTOM_MARKER, DEBEZIUM_UNAVAILABLE_VALUE);
+        } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName())) 
{
+          finalConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_KEY, "Op");
+          finalConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_MARKER, "D");
+        }
+      }
+    }
+    return finalConfigs;
+  }
+
+  private static void checkIfConsistent(RecordMergeMode mode, String 
strategyId) {
+    boolean isConsistent;

Review Comment:
   don't we already have similar methods ?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -807,6 +843,82 @@ public String getRecordMergeStrategyId() {
     return getString(RECORD_MERGE_STRATEGY_ID);
   }
 
+  /**
+   * Handle table config creation logic when creating a table for Table 
Version 9,
+   * since it has some different logic from previous versions.
+   */
+  public static Map<String, String> 
inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+                                                                   String 
payloadClassName,
+                                                                   String 
recordMergeStrategyId,
+                                                                   
HoodieTableVersion tableVersion) {
+    Map<String, String> finalConfigs = new HashMap<>();

Review Comment:
   how about reconciledConfigs?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -119,6 +129,35 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final String HOODIE_TABLE_NAME_KEY = "hoodie.table.name";
   public static final String PARTIAL_UPDATE_CUSTOM_MARKER = 
"hoodie.write.partial.update.custom.marker";
   public static final String DEBEZIUM_UNAVAILABLE_VALUE = 
"__debezium_unavailable_value";
+  // This prefix is used to set merging related properties.
+  // A reader might need to read some writer properties to function as 
expected,
+  // and Hudi stores properties with this prefix so the reader parses these 
properties,
+  // and produces a map of key value pairs (Key1->Value1, Key2->Value2, ...) 
to use.

Review Comment:
   mis placed java docs



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -807,6 +844,133 @@ public String getRecordMergeStrategyId() {
     return getString(RECORD_MERGE_STRATEGY_ID);
   }
 
+  /**
+   * Handle table config creation logic when creating a table for Table 
Version 9,
+   * since it has some different logic from previous versions.
+   */
+  public static Map<String, String> 
inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+                                                                   String 
payloadClassName,
+                                                                   String 
recordMergeStrategyId,
+                                                                   
HoodieTableVersion tableVersion) {
+    Map<String, String> finalConfigs = new HashMap<>();
+    if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+      return finalConfigs;
+    }
+
+    // Infer merge mode and strategy id if needed.
+    Pair<RecordMergeMode, String> mergeModeAndStrategyId =
+        inferMergeModeOrStrategyId(recordMergeMode, recordMergeStrategyId, 
payloadClassName);
+    recordMergeMode = mergeModeAndStrategyId.getLeft();
+    recordMergeStrategyId = mergeModeAndStrategyId.getRight();
+
+    // Properties for all tables.
+    // Since PARTIAL_UPDATE_MODE does not have default value, do not set it.
+    // For tables using MERGE MODEs (event time/commit time), or CUSTOM 
mergers.
+    if (StringUtils.isNullOrEmpty(payloadClassName)) {
+      finalConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+      finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), recordMergeStrategyId);
+      // NOTE: No payload class should be set.
+    } else { // For tables using custom payload classes.
+      // For tables using CUSTOM payload classes.
+      if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+        finalConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
+        finalConfigs.put(PAYLOAD_CLASS_NAME.key(), payloadClassName);
+        finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+      } else { // For tables using builtin payload classes.
+        // Merge mode.
+        // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of 
PAYLOAD_CLASS_NAME here.
+        if (EVENT_TIME_BASED_PAYLOADS.contains(payloadClassName)) {
+          finalConfigs.put(RECORD_MERGE_MODE.key(), 
EVENT_TIME_ORDERING.name());
+          finalConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+          finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+        } else {
+          finalConfigs.put(RECORD_MERGE_MODE.key(), 
COMMIT_TIME_ORDERING.name());
+          finalConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+          finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+        }
+        // Partial update mode.
+        if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+            || 
payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()))
 {
+          finalConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.IGNORE_DEFAULTS.name());
+        } else if 
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+          finalConfigs.put(PARTIAL_UPDATE_MODE.key(), 
PartialUpdateMode.IGNORE_MARKERS.name());
+        }
+        // Merge properties.
+        if 
(payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+          finalConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + 
PARTIAL_UPDATE_CUSTOM_MARKER, DEBEZIUM_UNAVAILABLE_VALUE);
+        } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName())) 
{
+          finalConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_KEY, "Op");

Review Comment:
   lets declare static variables for "Op" and "D" 
   Infact AWSDmsAvroPayload should already have it 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -807,6 +844,133 @@ public String getRecordMergeStrategyId() {
     return getString(RECORD_MERGE_STRATEGY_ID);
   }
 
+  /**
+   * Handle table config creation logic when creating a table for Table 
Version 9,
+   * since it has some different logic from previous versions.
+   */
+  public static Map<String, String> 
inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+                                                                   String 
payloadClassName,
+                                                                   String 
recordMergeStrategyId,
+                                                                   
HoodieTableVersion tableVersion) {
+    Map<String, String> finalConfigs = new HashMap<>();
+    if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+      return finalConfigs;
+    }
+
+    // Infer merge mode and strategy id if needed.
+    Pair<RecordMergeMode, String> mergeModeAndStrategyId =
+        inferMergeModeOrStrategyId(recordMergeMode, recordMergeStrategyId, 
payloadClassName);
+    recordMergeMode = mergeModeAndStrategyId.getLeft();
+    recordMergeStrategyId = mergeModeAndStrategyId.getRight();
+
+    // Properties for all tables.
+    // Since PARTIAL_UPDATE_MODE does not have default value, do not set it.
+    // For tables using MERGE MODEs (event time/commit time), or CUSTOM 
mergers.
+    if (StringUtils.isNullOrEmpty(payloadClassName)) {
+      finalConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+      finalConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), recordMergeStrategyId);
+      // NOTE: No payload class should be set.
+    } else { // For tables using custom payload classes.
+      // For tables using CUSTOM payload classes.
+      if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+        finalConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());

Review Comment:
   can we validate that this is only possible w/ CUSTOM merge mode.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -1476,13 +1477,21 @@ public Properties build() {
       tableConfig.setTableVersion(tableVersion);
       tableConfig.setInitialVersion(tableVersion);
 
-      Triple<RecordMergeMode, String, String> mergeConfigs =
-          HoodieTableConfig.inferCorrectMergingBehavior(
-              recordMergeMode, payloadClassName, recordMergerStrategyId, 
preCombineField,
-              tableVersion);
-      tableConfig.setValue(RECORD_MERGE_MODE, mergeConfigs.getLeft().name());
-      tableConfig.setValue(PAYLOAD_CLASS_NAME.key(), mergeConfigs.getMiddle());
-      tableConfig.setValue(RECORD_MERGE_STRATEGY_ID, mergeConfigs.getRight());
+      if (tableVersion.lesserThan(HoodieTableVersion.NINE)) {
+        Triple<RecordMergeMode, String, String> mergeConfigs =
+            HoodieTableConfig.inferCorrectMergingBehavior(
+                recordMergeMode, payloadClassName, recordMergerStrategyId, 
preCombineField,
+                tableVersion);
+        tableConfig.setValue(RECORD_MERGE_MODE, mergeConfigs.getLeft().name());
+        tableConfig.setValue(PAYLOAD_CLASS_NAME.key(), 
mergeConfigs.getMiddle());
+        tableConfig.setValue(RECORD_MERGE_STRATEGY_ID, 
mergeConfigs.getRight());
+      } else {
+        Map<String, String> mergeConfigs = inferMergingConfigsForVersion9(

Review Comment:
   we can simplify this. 
   we can call the same infer func that we do for table 8 version. and then do 
the same mapping we do during 8 to 9 upgrade. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -807,6 +843,82 @@ public String getRecordMergeStrategyId() {
     return getString(RECORD_MERGE_STRATEGY_ID);
   }
 
+  /**
+   * Handle table config creation logic when creating a table for Table 
Version 9,
+   * since it has some different logic from previous versions.
+   */
+  public static Map<String, String> 
inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,

Review Comment:
   why public?
   Also, lets avoid making static methods everywhere. if we can keep it 
private, lets go for it. 
   why make every method static 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to