This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new a165de66546 [HUDI-8571] Fix upgrade for explicit OverwriteWithLatestAvroPayload (#12330)
a165de66546 is described below
commit a165de66546e6446ed4dde0e93477a033c6fa0ca
Author: Lin Liu <[email protected]>
AuthorDate: Mon Nov 25 10:21:41 2024 -0800
[HUDI-8571] Fix upgrade for explicit OverwriteWithLatestAvroPayload (#12330)
* Add initial changes
* Fix merge mode upgrade
* Fix an issue
* Remove a code
* Fix a bug
* Comments
* Fix the meta client table config to reload from properties file
---------
Co-authored-by: danny0405 <[email protected]>
---
.../table/upgrade/SevenToEightUpgradeHandler.java | 23 ++++++++++++++++++++++
.../hudi/common/table/HoodieTableConfig.java | 20 +++++++++++++++++++
.../hudi/common/table/HoodieTableMetaClient.java | 3 +--
.../hudi/functional/TestSevenToEightUpgrade.scala | 22 +++++++++++++++++++--
4 files changed, 64 insertions(+), 4 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
index b34d876ff4f..4926ac1dc2f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
@@ -19,7 +19,11 @@
package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -65,6 +69,7 @@ public class SevenToEightUpgradeHandler implements
UpgradeHandler {
Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
tablePropsToAdd.put(HoodieTableConfig.TIMELINE_PATH,
HoodieTableConfig.TIMELINE_PATH.defaultValue());
upgradePartitionFields(config, tableConfig, tablePropsToAdd);
+ upgradeMergeMode(config, tableConfig, tablePropsToAdd);
// Handle timeline upgrade:
// - Rewrite instants in active timeline to new format
@@ -120,4 +125,22 @@ public class SevenToEightUpgradeHandler implements
UpgradeHandler {
tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS,
partitionPathField);
}
}
+
+  private static void upgradeMergeMode(HoodieWriteConfig config, HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
+    if (tableConfig.getPayloadClass() != null
+        && tableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName())) {
+ if (HoodieTableType.COPY_ON_WRITE == tableConfig.getTableType()) {
+ tablePropsToAdd.put(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME,
+ DefaultHoodieRecordPayload.class.getName());
+ tablePropsToAdd.put(
+ HoodieTableConfig.RECORD_MERGE_MODE,
+ RecordMergeMode.EVENT_TIME_ORDERING.name());
+ } else {
+ tablePropsToAdd.put(
+ HoodieTableConfig.RECORD_MERGE_MODE,
+ RecordMergeMode.COMMIT_TIME_ORDERING.name());
+ }
+ }
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index e83c1e70e4e..b225bacce0c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -367,11 +367,31 @@ public class HoodieTableConfig extends HoodieConfig {
.collect(Collectors.toList());
}
+ /**
+ * Loads the table config from properties file.
+ *
+ * @param storage The storage.
+ * @param basePath The table base path.
+ *
+ * @return The reloaded table config.
+ */
   public static HoodieTableConfig loadFromHoodieProps(HoodieStorage storage, String basePath) {
StoragePath metaPath = new StoragePath(basePath,
HoodieTableMetaClient.METAFOLDER_NAME);
return new HoodieTableConfig(storage, metaPath);
}
+ /**
+ * Loads the table config from properties file.
+ *
+ * @param storage The storage.
+ * @param metaPath The table metadata path.
+ *
+ * @return The reloaded table config.
+ */
+  public static HoodieTableConfig loadFromHoodieProps(HoodieStorage storage, StoragePath metaPath) {
+ return new HoodieTableConfig(storage, metaPath);
+ }
+
private HoodieTableConfig(HoodieStorage storage, StoragePath metaPath) {
this(storage, metaPath, null, null, null, false);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 7c6dbe49f8a..423e6b7a2df 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -480,8 +480,7 @@ public class HoodieTableMetaClient implements Serializable {
* Reload the table config properties.
*/
public synchronized void reloadTableConfig() {
-    this.tableConfig = new HoodieTableConfig(this.storage, metaPath,
-        this.tableConfig.getRecordMergeMode(), this.tableConfig.getPayloadClass(), this.tableConfig.getRecordMergeStrategyId());
+    this.tableConfig = HoodieTableConfig.loadFromHoodieProps(this.storage, metaPath);
reloadTimelineLayoutAndPath();
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
index d220c6f9206..18eafd16029 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
@@ -19,7 +19,8 @@
package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions
-import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.config.RecordMergeMode
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, HoodieTableType, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
HoodieTableVersion}
import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper,
UpgradeDowngrade}
@@ -39,7 +40,10 @@ class TestSevenToEightUpgrade extends
RecordLevelIndexTestBase {
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
KeyGeneratorType.CUSTOM.getClassName,
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> partitionFields,
- "hoodie.metadata.enable" -> "false")
+ "hoodie.metadata.enable" -> "false",
+      // "OverwriteWithLatestAvroPayload" is used to trigger merge mode upgrade/downgrade.
+      DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
+      DataSourceWriteOptions.RECORD_MERGE_MODE.key -> RecordMergeMode.COMMIT_TIME_ORDERING.name)
doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
@@ -50,6 +54,7 @@ class TestSevenToEightUpgrade extends
RecordLevelIndexTestBase {
// assert table version is eight and the partition fields in table config
has partition type
assertEquals(HoodieTableVersion.EIGHT,
metaClient.getTableConfig.getTableVersion)
assertEquals(partitionFields,
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+    assertEquals(classOf[OverwriteWithLatestAvroPayload].getName, metaClient.getTableConfig.getPayloadClass)
// downgrade table props to version seven
// assert table version is seven and the partition fields in table config
does not have partition type
@@ -58,6 +63,8 @@ class TestSevenToEightUpgrade extends
RecordLevelIndexTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient)
assertEquals(HoodieTableVersion.SEVEN,
metaClient.getTableConfig.getTableVersion)
assertEquals("partition",
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+    // The payload class should be maintained.
+    assertEquals(classOf[OverwriteWithLatestAvroPayload].getName, metaClient.getTableConfig.getPayloadClass)
// auto upgrade the table
// assert table version is eight and the partition fields in table config
has partition type
@@ -65,8 +72,19 @@ class TestSevenToEightUpgrade extends
RecordLevelIndexTestBase {
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
validate = false)
+
metaClient = HoodieTableMetaClient.reload(metaClient)
assertEquals(HoodieTableVersion.EIGHT,
metaClient.getTableConfig.getTableVersion)
assertEquals(partitionFields,
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+    // After upgrade, based on the payload and table type, the merge mode is updated accordingly.
+    if (HoodieTableType.COPY_ON_WRITE == tableType) {
+      assertEquals(classOf[DefaultHoodieRecordPayload].getName, metaClient.getTableConfig.getPayloadClass)
+      assertEquals(RecordMergeMode.EVENT_TIME_ORDERING.name, metaClient.getTableConfig.getRecordMergeMode.name)
+    } else {
+      assertEquals(classOf[OverwriteWithLatestAvroPayload].getName, metaClient.getTableConfig.getPayloadClass)
+      assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING.name, metaClient.getTableConfig.getRecordMergeMode.name)
+      assertEquals(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, metaClient.getTableConfig.getRecordMergeStrategyId)
+    }
}
}