This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a165de66546 [HUDI-8571] Fix upgrade for explicit OverwriteWithLatestAvroPayload (#12330)
a165de66546 is described below

commit a165de66546e6446ed4dde0e93477a033c6fa0ca
Author: Lin Liu <[email protected]>
AuthorDate: Mon Nov 25 10:21:41 2024 -0800

    [HUDI-8571] Fix upgrade for explicit OverwriteWithLatestAvroPayload (#12330)
    
    * Add initial changes
    
    * Fix merge mode upgrade
    
    * Fix an issue
    
    * Remove a code
    
    * Fix a bug
    
    * Comments
    
    * Fix the meta client table config to reload from properties file
    
    ---------
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../table/upgrade/SevenToEightUpgradeHandler.java  | 23 ++++++++++++++++++++++
 .../hudi/common/table/HoodieTableConfig.java       | 20 +++++++++++++++++++
 .../hudi/common/table/HoodieTableMetaClient.java   |  3 +--
 .../hudi/functional/TestSevenToEightUpgrade.scala  | 22 +++++++++++++++++++--
 4 files changed, 64 insertions(+), 4 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
index b34d876ff4f..4926ac1dc2f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
@@ -19,7 +19,11 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -65,6 +69,7 @@ public class SevenToEightUpgradeHandler implements UpgradeHandler {
     Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
     tablePropsToAdd.put(HoodieTableConfig.TIMELINE_PATH, HoodieTableConfig.TIMELINE_PATH.defaultValue());
     upgradePartitionFields(config, tableConfig, tablePropsToAdd);
+    upgradeMergeMode(config, tableConfig, tablePropsToAdd);
 
     // Handle timeline upgrade:
     //  - Rewrite instants in active timeline to new format
@@ -120,4 +125,22 @@ public class SevenToEightUpgradeHandler implements UpgradeHandler {
       tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS, partitionPathField);
     }
   }
+
+  private static void upgradeMergeMode(HoodieWriteConfig config, HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
+    if (tableConfig.getPayloadClass() != null
+        && tableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName())) {
+      if (HoodieTableType.COPY_ON_WRITE == tableConfig.getTableType()) {
+        tablePropsToAdd.put(
+            HoodieTableConfig.PAYLOAD_CLASS_NAME,
+            DefaultHoodieRecordPayload.class.getName());
+        tablePropsToAdd.put(
+            HoodieTableConfig.RECORD_MERGE_MODE,
+            RecordMergeMode.EVENT_TIME_ORDERING.name());
+      } else {
+        tablePropsToAdd.put(
+            HoodieTableConfig.RECORD_MERGE_MODE,
+            RecordMergeMode.COMMIT_TIME_ORDERING.name());
+      }
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index e83c1e70e4e..b225bacce0c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -367,11 +367,31 @@ public class HoodieTableConfig extends HoodieConfig {
         .collect(Collectors.toList());
   }
 
+  /**
+   * Loads the table config from properties file.
+   *
+   * @param storage  The storage.
+   * @param basePath The table base path.
+   *
+   * @return The reloaded table config.
+   */
   public static HoodieTableConfig loadFromHoodieProps(HoodieStorage storage, String basePath) {
     StoragePath metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
     return new HoodieTableConfig(storage, metaPath);
   }
 
+  /**
+   * Loads the table config from properties file.
+   *
+   * @param storage  The storage.
+   * @param metaPath The table metadata path.
+   *
+   * @return The reloaded table config.
+   */
+  public static HoodieTableConfig loadFromHoodieProps(HoodieStorage storage, StoragePath metaPath) {
+    return new HoodieTableConfig(storage, metaPath);
+  }
+
   private HoodieTableConfig(HoodieStorage storage, StoragePath metaPath) {
     this(storage, metaPath, null, null, null, false);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 7c6dbe49f8a..423e6b7a2df 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -480,8 +480,7 @@ public class HoodieTableMetaClient implements Serializable {
    * Reload the table config properties.
    */
   public synchronized void reloadTableConfig() {
-    this.tableConfig = new HoodieTableConfig(this.storage, metaPath,
-        this.tableConfig.getRecordMergeMode(), this.tableConfig.getPayloadClass(), this.tableConfig.getRecordMergeStrategyId());
+    this.tableConfig = HoodieTableConfig.loadFromHoodieProps(this.storage, metaPath);
     reloadTimelineLayoutAndPath();
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
index d220c6f9206..18eafd16029 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
@@ -19,7 +19,8 @@
 package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions
-import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.config.RecordMergeMode
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, HoodieTableType, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
 import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
@@ -39,7 +40,10 @@ class TestSevenToEightUpgrade extends RecordLevelIndexTestBase {
       DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
       DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> KeyGeneratorType.CUSTOM.getClassName,
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> partitionFields,
-      "hoodie.metadata.enable" -> "false")
+      "hoodie.metadata.enable" -> "false",
+      // "OverwriteWithLatestAvroPayload" is used to trigger merge mode upgrade/downgrade.
+      DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
+      DataSourceWriteOptions.RECORD_MERGE_MODE.key -> RecordMergeMode.COMMIT_TIME_ORDERING.name)
 
     doWriteAndValidateDataAndRecordIndex(hudiOpts,
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
@@ -50,6 +54,7 @@ class TestSevenToEightUpgrade extends RecordLevelIndexTestBase {
     // assert table version is eight and the partition fields in table config has partition type
     assertEquals(HoodieTableVersion.EIGHT, metaClient.getTableConfig.getTableVersion)
     assertEquals(partitionFields, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+    assertEquals(classOf[OverwriteWithLatestAvroPayload].getName, metaClient.getTableConfig.getPayloadClass)
 
     // downgrade table props to version seven
     // assert table version is seven and the partition fields in table config does not have partition type
@@ -58,6 +63,8 @@ class TestSevenToEightUpgrade extends RecordLevelIndexTestBase {
     metaClient = HoodieTableMetaClient.reload(metaClient)
     assertEquals(HoodieTableVersion.SEVEN, metaClient.getTableConfig.getTableVersion)
     assertEquals("partition", HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+    // The payload class should be maintained.
+    assertEquals(classOf[OverwriteWithLatestAvroPayload].getName, metaClient.getTableConfig.getPayloadClass)
 
     // auto upgrade the table
     // assert table version is eight and the partition fields in table config has partition type
@@ -65,8 +72,19 @@ class TestSevenToEightUpgrade extends RecordLevelIndexTestBase {
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append,
       validate = false)
+
     metaClient = HoodieTableMetaClient.reload(metaClient)
     assertEquals(HoodieTableVersion.EIGHT, metaClient.getTableConfig.getTableVersion)
     assertEquals(partitionFields, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+    // After upgrade, based on the payload and table type, the merge mode is updated accordingly.
+    if (HoodieTableType.COPY_ON_WRITE == tableType) {
+      assertEquals(classOf[DefaultHoodieRecordPayload].getName, metaClient.getTableConfig.getPayloadClass)
+      assertEquals(RecordMergeMode.EVENT_TIME_ORDERING.name, metaClient.getTableConfig.getRecordMergeMode.name)
+    } else {
+      assertEquals(classOf[OverwriteWithLatestAvroPayload].getName, metaClient.getTableConfig.getPayloadClass)
+      assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING.name, metaClient.getTableConfig.getRecordMergeMode.name)
+      assertEquals(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, metaClient.getTableConfig.getRecordMergeStrategyId)
+    }
+    }
   }
 }

Reply via email to