nsivabalan commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2214490669


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,105 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.table.HoodieTable;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.SevenToEightUpgradeHandler.isMetadataTableBehindDataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+public class EightToNineUpgradeHandler implements UpgradeHandler {

Review Comment:
   Please add Javadoc for this class.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java:
##########
@@ -20,18 +20,78 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.table.HoodieTable;
 
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-public class NineToEightDowngradeHandler implements DowngradeHandler {
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.SevenToEightUpgradeHandler.isMetadataTableBehindDataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+public class NineToEightDowngradeHandler implements DowngradeHandler {
   @Override
-  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    return Pair.of(Collections.emptyMap(), Collections.emptyList());
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config,
+                                                                           
HoodieEngineContext context,
+                                                                           
String instantTime,
+                                                                           
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it
+    if (!table.isMetadataTable()
+        && config.isMetadataTableEnabled()
+        && isMetadataTableBehindDataTable(config, metaClient)) {
+      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);
+    }
+
+    // Rollback and run compaction in one step
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.NINE);
+
+    List<ConfigProperty> propertiesToRemove = new ArrayList<>();
+    propertiesToRemove.add(MERGE_PROPERTIES);
+    propertiesToRemove.add(PARTIAL_UPDATE_MODE);
+
+    Map<ConfigProperty, String> propertiesToAdd = new HashMap<>();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    String payloadClass = tableConfig.getPayloadClass();
+    Set<String> payloadClassesToHandle = new HashSet<>(Arrays.asList(
+        OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),

Review Comment:
   These are static, right? Why can't we define a static set upfront rather than
   constructing a Set every time?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,105 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.table.HoodieTable;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.SevenToEightUpgradeHandler.isMetadataTableBehindDataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+public class EightToNineUpgradeHandler implements UpgradeHandler {
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
-                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    
-    return Collections.emptyMap();
+  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
+                                             HoodieEngineContext context,
+                                             String instantTime,
+                                             SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    String payloadClass = tableConfig.getPayloadClass();
+
+    // If auto upgrade is disabled, set writer version to 8 and return
+    if (!config.autoUpgrade()) {
+      config.setValue(
+          HoodieWriteConfig.WRITE_TABLE_VERSION,
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+      return tablePropsToAdd;
+    }
+
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it
+    if (!table.isMetadataTable()
+        && config.isMetadataTableEnabled()
+        && isMetadataTableBehindDataTable(config, metaClient)) {
+      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);
+    }
+
+    // Rollback and run compaction in one step
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.EIGHT);
+
+    // Handle table configs.
+    String mergeProperties = tableConfig.getMergeProperties();
+    if (!StringUtils.isNullOrEmpty(payloadClass)) {

Review Comment:
   Do you know if there can be a version 8 table where only the payload is set,
   but not the merge mode?
   If yes, then we should be accounting for all payloads here, right?
   For example, OverwriteWithLatestAvroPayload is not seen here.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java:
##########
@@ -20,18 +20,78 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.table.HoodieTable;
 
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-public class NineToEightDowngradeHandler implements DowngradeHandler {
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.SevenToEightUpgradeHandler.isMetadataTableBehindDataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+public class NineToEightDowngradeHandler implements DowngradeHandler {
   @Override
-  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    return Pair.of(Collections.emptyMap(), Collections.emptyList());
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config,
+                                                                           
HoodieEngineContext context,
+                                                                           
String instantTime,
+                                                                           
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it
+    if (!table.isMetadataTable()
+        && config.isMetadataTableEnabled()
+        && isMetadataTableBehindDataTable(config, metaClient)) {
+      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);

Review Comment:
   why add this to downgrade as well? 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,105 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.table.HoodieTable;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.SevenToEightUpgradeHandler.isMetadataTableBehindDataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+public class EightToNineUpgradeHandler implements UpgradeHandler {
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
-                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    
-    return Collections.emptyMap();
+  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
+                                             HoodieEngineContext context,
+                                             String instantTime,
+                                             SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    String payloadClass = tableConfig.getPayloadClass();
+
+    // If auto upgrade is disabled, set writer version to 8 and return
+    if (!config.autoUpgrade()) {
+      config.setValue(
+          HoodieWriteConfig.WRITE_TABLE_VERSION,
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+      return tablePropsToAdd;
+    }
+
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it
+    if (!table.isMetadataTable()
+        && config.isMetadataTableEnabled()

Review Comment:
   Let's introduce something called `BaseUpgradeHandler` which implements
   UpgradeHandler, and all existing AtoBUpgradeHandlers can extend from
   this new base class.
   Then move all common methods to the base class and mark them protected.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -19,18 +19,105 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.table.HoodieTable;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
-public class EightToNineUpgradeHandler implements UpgradeHandler {
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.SevenToEightUpgradeHandler.isMetadataTableBehindDataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+public class EightToNineUpgradeHandler implements UpgradeHandler {
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
-                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    
-    return Collections.emptyMap();
+  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
+                                             HoodieEngineContext context,
+                                             String instantTime,
+                                             SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    String payloadClass = tableConfig.getPayloadClass();
+
+    // If auto upgrade is disabled, set writer version to 8 and return
+    if (!config.autoUpgrade()) {
+      config.setValue(
+          HoodieWriteConfig.WRITE_TABLE_VERSION,
+          String.valueOf(HoodieTableVersion.EIGHT.versionCode()));
+      return tablePropsToAdd;
+    }
+
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it
+    if (!table.isMetadataTable()
+        && config.isMetadataTableEnabled()
+        && isMetadataTableBehindDataTable(config, metaClient)) {
+      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);
+    }
+
+    // Rollback and run compaction in one step
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.EIGHT);
+
+    // Handle table configs.
+    String mergeProperties = tableConfig.getMergeProperties();
+    if (!StringUtils.isNullOrEmpty(payloadClass)) {
+      // Add MERGE Mode.
+      if 
(payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())
+          || payloadClass.equals(AWSDmsAvroPayload.class.getName())) {
+        tablePropsToAdd.put(RECORD_MERGE_STRATEGY_ID, 
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+        tablePropsToAdd.put(RECORD_MERGE_MODE, 
RecordMergeMode.COMMIT_TIME_ORDERING.name());
+      } else if (payloadClass.equals(PartialUpdateAvroPayload.class.getName())

Review Comment:
   I don't see the payloads below handled here:
   
   EventTimeAvroPayload
   MySqlDebeziumAvroPayload
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java:
##########
@@ -20,18 +20,78 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.table.HoodieTable;
 
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-public class NineToEightDowngradeHandler implements DowngradeHandler {
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.SevenToEightUpgradeHandler.isMetadataTableBehindDataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+public class NineToEightDowngradeHandler implements DowngradeHandler {
   @Override
-  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    return Pair.of(Collections.emptyMap(), Collections.emptyList());
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config,
+                                                                           
HoodieEngineContext context,
+                                                                           
String instantTime,
+                                                                           
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it
+    if (!table.isMetadataTable()
+        && config.isMetadataTableEnabled()
+        && isMetadataTableBehindDataTable(config, metaClient)) {
+      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);
+    }
+
+    // Rollback and run compaction in one step
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.NINE);
+
+    List<ConfigProperty> propertiesToRemove = new ArrayList<>();
+    propertiesToRemove.add(MERGE_PROPERTIES);
+    propertiesToRemove.add(PARTIAL_UPDATE_MODE);
+
+    Map<ConfigProperty, String> propertiesToAdd = new HashMap<>();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    String payloadClass = tableConfig.getPayloadClass();
+    Set<String> payloadClassesToHandle = new HashSet<>(Arrays.asList(
+        OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
+        PartialUpdateAvroPayload.class.getName(),
+        AWSDmsAvroPayload.class.getName(),
+        PostgresDebeziumAvroPayload.class.getName()
+    ));
+    if (payloadClassesToHandle.contains(payloadClass)) {

Review Comment:
   Why are we considering just these 4 payloads?
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java:
##########
@@ -20,18 +20,78 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.table.HoodieTable;
 
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-public class NineToEightDowngradeHandler implements DowngradeHandler {
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.table.upgrade.SevenToEightUpgradeHandler.isMetadataTableBehindDataTable;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.rollbackFailedWritesAndCompact;
 
+public class NineToEightDowngradeHandler implements DowngradeHandler {
   @Override
-  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    return Pair.of(Collections.emptyMap(), Collections.emptyList());
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config,
+                                                                           
HoodieEngineContext context,
+                                                                           
String instantTime,
+                                                                           
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+
+    // If metadata is enabled for the data table, and
+    // existing metadata table is behind the data table, then delete it
+    if (!table.isMetadataTable()
+        && config.isMetadataTableEnabled()
+        && isMetadataTableBehindDataTable(config, metaClient)) {
+      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);
+    }
+
+    // Rollback and run compaction in one step
+    rollbackFailedWritesAndCompact(
+        table, context, config, upgradeDowngradeHelper,
+        
HoodieTableType.MERGE_ON_READ.equals(table.getMetaClient().getTableType()),
+        HoodieTableVersion.NINE);
+
+    List<ConfigProperty> propertiesToRemove = new ArrayList<>();
+    propertiesToRemove.add(MERGE_PROPERTIES);
+    propertiesToRemove.add(PARTIAL_UPDATE_MODE);
+
+    Map<ConfigProperty, String> propertiesToAdd = new HashMap<>();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    String payloadClass = tableConfig.getPayloadClass();
+    Set<String> payloadClassesToHandle = new HashSet<>(Arrays.asList(
+        OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
+        PartialUpdateAvroPayload.class.getName(),
+        AWSDmsAvroPayload.class.getName(),
+        PostgresDebeziumAvroPayload.class.getName()
+    ));
+    if (payloadClassesToHandle.contains(payloadClass)) {
+      propertiesToAdd.put(RECORD_MERGE_STRATEGY_ID, 
PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+      propertiesToAdd.put(RECORD_MERGE_MODE, RecordMergeMode.CUSTOM.name());

Review Comment:
   Why add merge mode to version 8 tables?
   I thought that for these custom payloads we don't set any merge mode
   in v8 tables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to