This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0d2877631a2 [HUDI-8409] Fixing merge mode config during upgrade and
downgrade from version 7 to 8 and back (#13046)
0d2877631a2 is described below
commit 0d2877631a20133d4f9801e8d1453a65dec62767
Author: Lokesh Jain <[email protected]>
AuthorDate: Sat Mar 29 12:03:35 2025 +0530
[HUDI-8409] Fixing merge mode config during upgrade and downgrade from
version 7 to 8 and back (#13046)
Update the upgrade/downgrade logic for merge mode related configs.
---------
Co-authored-by: Lin Liu <[email protected]>
Co-authored-by: sivabalan <[email protected]>
---
.../hudi/table/upgrade/DowngradeHandler.java | 7 ++-
.../upgrade/EightToSevenDowngradeHandler.java | 43 +++++++++-----
.../table/upgrade/FiveToFourDowngradeHandler.java | 6 +-
.../table/upgrade/FourToThreeDowngradeHandler.java | 6 +-
.../table/upgrade/OneToZeroDowngradeHandler.java | 5 +-
.../table/upgrade/SevenToEightUpgradeHandler.java | 68 +++++++++++++++++++---
.../table/upgrade/SevenToSixDowngradeHandler.java | 6 +-
.../table/upgrade/SixToFiveDowngradeHandler.java | 7 ++-
.../table/upgrade/ThreeToTwoDowngradeHandler.java | 6 +-
.../table/upgrade/TwoToOneDowngradeHandler.java | 5 +-
.../hudi/table/upgrade/UpgradeDowngrade.java | 23 ++++++--
.../upgrade/TestEightToSevenDowngradeHandler.java | 45 +++++++++++++-
.../upgrade/TestSevenToEightUpgradeHandler.java | 60 +++++++++++++++----
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 41 +++++++++++--
.../hudi/functional/TestSevenToEightUpgrade.scala | 15 +++--
15 files changed, 277 insertions(+), 66 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java
index 51981e441de..69e0d82aa3f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java
@@ -20,8 +20,10 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import java.util.List;
import java.util.Map;
/**
@@ -36,9 +38,10 @@ public interface DowngradeHandler {
* @param context instance of {@link HoodieEngineContext} to
be used.
* @param instantTime current instant time that should not be
touched.
* @param upgradeDowngradeHelper instance of {@link
SupportsUpgradeDowngrade} to be used.
- * @return Map of config properties and its values to be added to table
properties.
+ * @return Map of config properties and its values to be added to table
properties along with the list
+ * of config properties to be removed from the table config.
*/
- Map<ConfigProperty, String> downgrade(
+ Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(
HoodieWriteConfig config, HoodieEngineContext context, String
instantTime,
SupportsUpgradeDowngrade upgradeDowngradeHelper);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
index 0df93dc51ec..5bf9ce22eff 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
@@ -25,7 +25,9 @@ import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BootstrapIndexType;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -42,10 +44,11 @@ import
org.apache.hudi.common.table.timeline.versioning.v2.ArchivedTimelineLoade
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -61,6 +64,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -95,7 +99,7 @@ public class EightToSevenDowngradeHandler implements
DowngradeHandler {
private static final Set<String> SUPPORTED_METADATA_PARTITION_PATHS =
getSupportedMetadataPartitionPaths();
@Override
- public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config,
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
// Rollback and run compaction in one step
@@ -133,7 +137,9 @@ public class EightToSevenDowngradeHandler implements
DowngradeHandler {
// downgrade table properties
downgradePartitionFields(config, metaClient.getTableConfig(),
tablePropsToAdd);
unsetInitialVersion(metaClient.getTableConfig(), tablePropsToAdd);
- unsetRecordMergeMode(metaClient.getTableConfig(), tablePropsToAdd);
+ List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
+ tablePropsToRemove.addAll(unsetRecordMergeMode(config,
metaClient.getTableConfig(), tablePropsToAdd));
+ tablePropsToRemove.add(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID);
downgradeKeyGeneratorType(metaClient.getTableConfig(), tablePropsToAdd);
downgradeBootstrapIndexType(metaClient.getTableConfig(), tablePropsToAdd);
@@ -143,7 +149,7 @@ public class EightToSevenDowngradeHandler implements
DowngradeHandler {
downgradeMetadataPartitions(context, metaClient.getStorage(),
metaClient, tablePropsToAdd);
UpgradeDowngradeUtils.updateMetadataTableVersion(context,
HoodieTableVersion.SEVEN, metaClient);
}
- return tablePropsToAdd;
+ return Pair.of(tablePropsToAdd, tablePropsToRemove);
}
static void downgradePartitionFields(HoodieWriteConfig config,
@@ -161,19 +167,24 @@ public class EightToSevenDowngradeHandler implements
DowngradeHandler {
tableConfig.getProps().remove(HoodieTableConfig.INITIAL_VERSION.key());
}
- static void unsetRecordMergeMode(HoodieTableConfig tableConfig,
Map<ConfigProperty, String> tablePropsToAdd) {
- Triple<RecordMergeMode, String, String> mergingConfigs =
- HoodieTableConfig.inferCorrectMergingBehavior(
- tableConfig.getRecordMergeMode(), tableConfig.getPayloadClass(),
- tableConfig.getRecordMergeStrategyId(),
tableConfig.getPreCombineField(),
- tableConfig.getTableVersion());
- if (StringUtils.nonEmpty(mergingConfigs.getMiddle())) {
- tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME,
mergingConfigs.getMiddle());
- }
- if (StringUtils.nonEmpty(mergingConfigs.getRight())) {
- tablePropsToAdd.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID,
mergingConfigs.getRight());
+ static List<ConfigProperty> unsetRecordMergeMode(HoodieWriteConfig config,
HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
+ String payloadClass = tableConfig.getPayloadClass();
+ if (StringUtils.isNullOrEmpty(payloadClass)) {
+ RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
+ switch (mergeMode) {
+ case EVENT_TIME_ORDERING:
+ tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME,
DefaultHoodieRecordPayload.class.getName());
+ break;
+ case COMMIT_TIME_ORDERING:
+ tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME,
OverwriteWithLatestAvroPayload.class.getName());
+ break;
+ case CUSTOM:
+ throw new HoodieUpgradeDowngradeException("Custom payload class must
be available for downgrading custom merge mode");
+ default:
+ throw new HoodieUpgradeDowngradeException("Downgrade is not handled
for " + mergeMode);
+ }
}
- tableConfig.getProps().remove(HoodieTableConfig.RECORD_MERGE_MODE.key());
+ return Collections.singletonList(HoodieTableConfig.RECORD_MERGE_MODE);
}
static void downgradeBootstrapIndexType(HoodieTableConfig tableConfig,
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToFourDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToFourDowngradeHandler.java
index e51f5496c2d..57ed25b8051 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToFourDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToFourDowngradeHandler.java
@@ -21,15 +21,17 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
public class FiveToFourDowngradeHandler implements DowngradeHandler {
@Override
- public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config,
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
- return Collections.emptyMap();
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+ return Pair.of(Collections.emptyMap(), Collections.emptyList());
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java
index 86a594af17c..c4454c0164c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java
@@ -21,10 +21,12 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
/**
@@ -33,12 +35,12 @@ import java.util.Map;
public class FourToThreeDowngradeHandler implements DowngradeHandler {
@Override
- public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config,
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
if (config.isMetadataTableEnabled()) {
// Metadata Table in version 4 has a schema that is not forward
compatible.
// Hence, it is safe to delete the metadata table, which will be
re-initialized in subsequent commit.
HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(),
context);
}
- return Collections.emptyMap();
+ return Pair.of(Collections.emptyMap(), Collections.emptyList());
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
index 9e4a1fbd638..aaabd1ca2b8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
@@ -38,7 +39,7 @@ import java.util.stream.Collectors;
public class OneToZeroDowngradeHandler implements DowngradeHandler {
@Override
- public Map<ConfigProperty, String> downgrade(
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(
HoodieWriteConfig config, HoodieEngineContext context, String
instantTime,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
@@ -50,6 +51,6 @@ public class OneToZeroDowngradeHandler implements
DowngradeHandler {
WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), table,
inflightInstant.requestedTime());
writeMarkers.quietDeleteMarkerDir(context,
config.getMarkersDeleteParallelism());
}
- return Collections.emptyMap();
+ return Pair.of(Collections.emptyMap(), Collections.emptyList());
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
index b7d74ff7b74..4e0d7ec004c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.BootstrapIndexType;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -178,23 +179,76 @@ public class SevenToEightUpgradeHandler implements
UpgradeHandler {
}
static void upgradeMergeMode(HoodieTableConfig tableConfig,
Map<ConfigProperty, String> tablePropsToAdd) {
- if (tableConfig.getPayloadClass() != null
- &&
tableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName()))
{
- if (HoodieTableType.COPY_ON_WRITE == tableConfig.getTableType()) {
- tablePropsToAdd.put(
- HoodieTableConfig.RECORD_MERGE_MODE,
- RecordMergeMode.COMMIT_TIME_ORDERING.name());
- } else {
+ String payloadClass = tableConfig.getPayloadClass();
+ String preCombineField = tableConfig.getPreCombineField();
+ if (isCustomPayloadClass(payloadClass)) {
+ // This contains a special case: HoodieMetadataPayload.
+ tablePropsToAdd.put(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME,
+ payloadClass);
+ tablePropsToAdd.put(
+ HoodieTableConfig.RECORD_MERGE_MODE,
+ RecordMergeMode.CUSTOM.name());
+ tablePropsToAdd.put(
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID,
+ HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+ } else if (tableConfig.getTableType() == HoodieTableType.COPY_ON_WRITE) {
+ setEventTimeOrCommitTimeBasedOnPayload(payloadClass, tablePropsToAdd);
+ } else { // MOR table
+ if (StringUtils.nonEmpty(preCombineField)) {
+ // This contains a special case: OverwriteWithLatestPayload with
preCombine field.
tablePropsToAdd.put(
HoodieTableConfig.PAYLOAD_CLASS_NAME,
DefaultHoodieRecordPayload.class.getName());
tablePropsToAdd.put(
HoodieTableConfig.RECORD_MERGE_MODE,
RecordMergeMode.EVENT_TIME_ORDERING.name());
+ tablePropsToAdd.put(
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID,
+ HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+ } else {
+ setEventTimeOrCommitTimeBasedOnPayload(payloadClass, tablePropsToAdd);
}
}
}
+ private static void setEventTimeOrCommitTimeBasedOnPayload(String
payloadClass, Map<ConfigProperty, String> tablePropsToAdd) {
+ // DefaultRecordPayload without preCombine Field.
+ // This is unlikely to happen.
+ if (useDefaultHoodieRecordPayload(payloadClass)) {
+ tablePropsToAdd.put(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME,
+ DefaultHoodieRecordPayload.class.getName());
+ tablePropsToAdd.put(
+ HoodieTableConfig.RECORD_MERGE_MODE,
+ RecordMergeMode.EVENT_TIME_ORDERING.name());
+ tablePropsToAdd.put(
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID,
+ HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+ } else {
+ tablePropsToAdd.put(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME,
+ OverwriteWithLatestAvroPayload.class.getName());
+ tablePropsToAdd.put(
+ HoodieTableConfig.RECORD_MERGE_MODE,
+ RecordMergeMode.COMMIT_TIME_ORDERING.name());
+ tablePropsToAdd.put(
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID,
+ HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+ }
+ }
+
+ static boolean useDefaultHoodieRecordPayload(String payloadClass) {
+ return !StringUtils.isNullOrEmpty(payloadClass)
+ && payloadClass.equals(DefaultHoodieRecordPayload.class.getName());
+ }
+
+ static boolean isCustomPayloadClass(String payloadClass) {
+ return !StringUtils.isNullOrEmpty(payloadClass)
+ && !payloadClass.equals(DefaultHoodieRecordPayload.class.getName())
+ &&
!payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName());
+ }
+
static void setInitialVersion(HoodieTableConfig tableConfig,
Map<ConfigProperty, String> tablePropsToAdd) {
if (tableConfig.contains(HoodieTableConfig.VERSION)) {
tablePropsToAdd.put(HoodieTableConfig.INITIAL_VERSION,
String.valueOf(tableConfig.getTableVersion().versionCode()));
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToSixDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToSixDowngradeHandler.java
index 7a5280a57a3..3aae6f68504 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToSixDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToSixDowngradeHandler.java
@@ -21,10 +21,12 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
/**
@@ -36,9 +38,9 @@ import java.util.Map;
public class SevenToSixDowngradeHandler implements DowngradeHandler {
@Override
- public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config,
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
UpgradeDowngradeUtils.updateMetadataTableVersion(context,
HoodieTableVersion.SIX, table.getMetaClient());
- return Collections.emptyMap();
+ return Pair.of(Collections.emptyMap(), Collections.emptyList());
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
index bfeaf00447b..1cbb5936a2b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
@@ -25,11 +25,14 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.table.HoodieTable;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static
org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -47,7 +50,7 @@ import static
org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PART
public class SixToFiveDowngradeHandler implements DowngradeHandler {
@Override
- public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config,
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
// Since version 6 includes a new schema field for metadata table(MDT),
the MDT needs to be deleted during downgrade to avoid column drop error.
@@ -64,7 +67,7 @@ public class SixToFiveDowngradeHandler implements
DowngradeHandler {
.ifPresent(v -> updatedTableProps.put(TABLE_METADATA_PARTITIONS, v));
Option.ofNullable(tableConfig.getString(TABLE_METADATA_PARTITIONS_INFLIGHT))
.ifPresent(v ->
updatedTableProps.put(TABLE_METADATA_PARTITIONS_INFLIGHT, v));
- return updatedTableProps;
+ return Pair.of(updatedTableProps, Collections.emptyList());
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
index 4f209f05ffc..bc5baaa3302 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java
@@ -21,10 +21,12 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
/**
@@ -33,13 +35,13 @@ import java.util.Map;
public class ThreeToTwoDowngradeHandler implements DowngradeHandler {
@Override
- public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config,
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
if (config.isMetadataTableEnabled()) {
// Metadata Table in version 3 is synchronous and in version 2 is
asynchronous. Downgrading to asynchronous
// removes the checks in code to decide whether to use a LogBlock or
not. Also, the schema for the
// table has been updated and is not forward compatible. Hence, we need
to delete the table.
HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(),
context);
}
- return Collections.emptyMap();
+ return Pair.of(Collections.emptyMap(), Collections.emptyList());
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
index 59193bfc9ce..05e0d1c31c4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.HoodieStorage;
@@ -52,7 +53,7 @@ import static
org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX;
public class TwoToOneDowngradeHandler implements DowngradeHandler {
@Override
- public Map<ConfigProperty, String> downgrade(
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(
HoodieWriteConfig config, HoodieEngineContext context, String
instantTime,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
@@ -70,7 +71,7 @@ public class TwoToOneDowngradeHandler implements
DowngradeHandler {
throw new HoodieException("Converting marker files to DIRECT style
failed during downgrade", e);
}
}
- return Collections.emptyMap();
+ return Pair.of(Collections.emptyMap(), Collections.emptyList());
}
/**
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index 2ffabc14128..124cdad0a4c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -40,7 +41,9 @@ import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Hashtable;
+import java.util.List;
import java.util.Map;
/**
@@ -151,13 +154,14 @@ public class UpgradeDowngrade {
// Perform the actual upgrade/downgrade; this has to be idempotent, for
now.
LOG.info("Attempting to move table from version " + fromVersion + " to " +
toVersion);
- Map<ConfigProperty, String> tableProps = new Hashtable<>();
+ Map<ConfigProperty, String> tablePropsToAdd = new Hashtable<>();
+ List<ConfigProperty> tablePropsToRemove = new ArrayList<>();
boolean isDowngrade = false;
if (fromVersion.versionCode() < toVersion.versionCode()) {
// upgrade
while (fromVersion.versionCode() < toVersion.versionCode()) {
HoodieTableVersion nextVersion =
HoodieTableVersion.fromVersionCode(fromVersion.versionCode() + 1);
- tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime));
+ tablePropsToAdd.putAll(upgrade(fromVersion, nextVersion, instantTime));
fromVersion = nextVersion;
}
} else {
@@ -165,7 +169,9 @@ public class UpgradeDowngrade {
isDowngrade = true;
while (fromVersion.versionCode() > toVersion.versionCode()) {
HoodieTableVersion prevVersion =
HoodieTableVersion.fromVersionCode(fromVersion.versionCode() - 1);
- tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime));
+ Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
tablePropsToAddAndRemove = downgrade(fromVersion, prevVersion, instantTime);
+ tablePropsToAdd.putAll(tablePropsToAddAndRemove.getLeft());
+ tablePropsToRemove.addAll(tablePropsToAddAndRemove.getRight());
fromVersion = prevVersion;
}
}
@@ -174,8 +180,15 @@ public class UpgradeDowngrade {
metaClient = HoodieTableMetaClient.reload(metaClient);
}
// Write out the current version in hoodie.properties.updated file
- for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) {
+ for (Map.Entry<ConfigProperty, String> entry : tablePropsToAdd.entrySet())
{
+ // add alternate keys.
metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
+ entry.getKey().getAlternatives().forEach(alternateKey -> {
+ metaClient.getTableConfig().setValue((String)alternateKey,
entry.getValue());
+ });
+ }
+ for (ConfigProperty configProperty : tablePropsToRemove) {
+ metaClient.getTableConfig().clearValue(configProperty);
}
// user could have disabled auto upgrade (probably to deploy the new
binary only),
// in which case, we should not update the table version
@@ -234,7 +247,7 @@ public class UpgradeDowngrade {
}
}
- protected Map<ConfigProperty, String> downgrade(HoodieTableVersion
fromVersion, HoodieTableVersion toVersion, String instantTime) {
+ protected Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String
instantTime) {
if (fromVersion == HoodieTableVersion.ONE && toVersion ==
HoodieTableVersion.ZERO) {
return new OneToZeroDowngradeHandler().downgrade(config, context,
instantTime, upgradeDowngradeHelper);
} else if (fromVersion == HoodieTableVersion.TWO && toVersion ==
HoodieTableVersion.ONE) {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java
index 300b00a4880..62f0e54f9ed 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java
@@ -28,7 +28,9 @@ import org.apache.hudi.common.model.BootstrapIndexType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -41,8 +43,11 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;
import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.File;
@@ -67,6 +72,7 @@ import static
org.apache.hudi.metadata.MetadataPartitionType.FILES;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
@@ -154,6 +160,7 @@ class TestEightToSevenDowngradeHandler {
existingTableProps.put(RECORD_MERGE_MODE.key(),
RecordMergeMode.EVENT_TIME_ORDERING.name());
existingTableProps.put(BOOTSTRAP_INDEX_TYPE.key(),
BootstrapIndexType.HFILE.name());
existingTableProps.put(KEY_GENERATOR_TYPE.key(),
KeyGeneratorType.CUSTOM.name());
+
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING);
when(tableConfig.getProps()).thenReturn(new
TypedProperties(existingTableProps));
when(config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())).thenReturn("partition_field");
when(tableConfig.getPartitionFieldProp()).thenReturn("partition_field");
@@ -163,8 +170,8 @@ class TestEightToSevenDowngradeHandler {
assertEquals("partition_field", tablePropsToAdd.get(PARTITION_FIELDS));
EightToSevenDowngradeHandler.unsetInitialVersion(tableConfig,
tablePropsToAdd);
assertFalse(tableConfig.getProps().containsKey(INITIAL_VERSION.key()));
- EightToSevenDowngradeHandler.unsetRecordMergeMode(tableConfig,
tablePropsToAdd);
- assertFalse(tableConfig.getProps().containsKey(RECORD_MERGE_MODE.key()));
+ List<ConfigProperty> propsToRemove =
EightToSevenDowngradeHandler.unsetRecordMergeMode(config, tableConfig,
tablePropsToAdd);
+ assertTrue(propsToRemove.contains(RECORD_MERGE_MODE));
assertTrue(tablePropsToAdd.containsKey(PAYLOAD_CLASS_NAME));
EightToSevenDowngradeHandler.downgradeBootstrapIndexType(tableConfig,
tablePropsToAdd);
assertFalse(tablePropsToAdd.containsKey(BOOTSTRAP_INDEX_TYPE));
@@ -173,4 +180,38 @@ class TestEightToSevenDowngradeHandler {
assertFalse(tablePropsToAdd.containsKey(KEY_GENERATOR_TYPE));
assertFalse(tablePropsToAdd.containsKey(KEY_GENERATOR_CLASS_NAME));
}
+
+ @ParameterizedTest
+ @CsvSource({
+ "com.example.CustomPayload, CUSTOM, com.example.CustomPayload",
+ ", CUSTOM, ",
+ "org.apache.hudi.metadata.HoodieMetadataPayload, CUSTOM,
org.apache.hudi.metadata.HoodieMetadataPayload",
+ "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload,
COMMIT_TIME_ORDERING,
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+ "org.apache.hudi.common.model.DefaultHoodieRecordPayload,
EVENT_TIME_ORDERING, org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+ ", EVENT_TIME_ORDERING,
org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+ ", COMMIT_TIME_ORDERING,
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
+ })
+ void testUnsetRecordMergeMode(String payloadClass, String recordMergeMode,
String expectedPayloadClass) {
+ HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+
+ when(tableConfig.getPayloadClass()).thenReturn(payloadClass);
+ if (StringUtils.isNullOrEmpty(payloadClass)) {
+
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.valueOf(recordMergeMode));
+ }
+
+ if (!StringUtils.isNullOrEmpty(recordMergeMode) &&
recordMergeMode.equals("CUSTOM") && StringUtils.isNullOrEmpty(payloadClass)) {
+ assertThrows(HoodieUpgradeDowngradeException.class, () ->
EightToSevenDowngradeHandler.unsetRecordMergeMode(config, tableConfig,
tablePropsToAdd));
+ } else {
+ List<ConfigProperty> propsToRemove =
EightToSevenDowngradeHandler.unsetRecordMergeMode(config, tableConfig,
tablePropsToAdd);
+ assertTrue(propsToRemove.stream().anyMatch(cfg ->
cfg.key().equals(RECORD_MERGE_MODE.key())));
+
assertTrue(!tablePropsToAdd.containsKey(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID));
+
+ if (!StringUtils.isNullOrEmpty(payloadClass)) {
+
assertFalse(tablePropsToAdd.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+ } else {
+ assertEquals(expectedPayloadClass,
tablePropsToAdd.get(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+ }
+ }
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
index d6d3cb4998c..bcc9d897620 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestSevenToEightUpgradeHandler.java
@@ -22,18 +22,18 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.RecordMergeMode;
-import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-import org.apache.hudi.table.HoodieTable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.HashMap;
@@ -56,14 +56,7 @@ import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-public class TestSevenToEightUpgradeHandler {
-
- @Mock
- private HoodieTable table;
- @Mock
- private HoodieTableMetaClient metaClient;
- @Mock
- private HoodieEngineContext context;
+class TestSevenToEightUpgradeHandler {
@Mock
private HoodieWriteConfig config;
@Mock
@@ -122,4 +115,49 @@ public class TestSevenToEightUpgradeHandler {
assertTrue(tablePropsToAdd.containsKey(KEY_GENERATOR_CLASS_NAME));
assertTrue(tablePropsToAdd.containsKey(KEY_GENERATOR_TYPE));
}
+
+ @ParameterizedTest
+ @CsvSource({
+ // hard coding all merge strategy Ids since older version of hudi has
different variable name to represent the merge strategy id.
+ "com.example.CustomPayload, , CUSTOM, " +
"00000000-0000-0000-0000-000000000000" + ", com.example.CustomPayload",
+ "com.example.CustomPayload, preCombineFieldValue, CUSTOM, " +
"00000000-0000-0000-0000-000000000000" + ", com.example.CustomPayload",
+ "org.apache.hudi.metadata.HoodieMetadataPayload, , CUSTOM, " +
"00000000-0000-0000-0000-000000000000" + ",
org.apache.hudi.metadata.HoodieMetadataPayload",
+ "org.apache.hudi.metadata.HoodieMetadataPayload, preCombineFieldValue,
CUSTOM, " + "00000000-0000-0000-0000-000000000000" + ",
org.apache.hudi.metadata.HoodieMetadataPayload",
+ "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload, ,
COMMIT_TIME_ORDERING, " + "ce9acb64-bde0-424c-9b91-f6ebba25356d"
+ + ", org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+ "org.apache.hudi.common.model.DefaultHoodieRecordPayload, ,
EVENT_TIME_ORDERING, " + "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5"
+ + ", org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+ ", preCombineFieldValue, EVENT_TIME_ORDERING, " +
"eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + ",
org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+ "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload,
preCombineFieldValue, EVENT_TIME_ORDERING,"
+ + "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + ",
org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+ ", preCombineFieldValue, EVENT_TIME_ORDERING, " +
"eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + ",
org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+ "org.apache.hudi.common.model.DefaultHoodieRecordPayload,
preCombineFieldValue, EVENT_TIME_ORDERING,"
+ + "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5" + ",
org.apache.hudi.common.model.DefaultHoodieRecordPayload",
+ ", , COMMIT_TIME_ORDERING, " + "ce9acb64-bde0-424c-9b91-f6ebba25356d" +
", org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
+ })
+ void testUpgradeMergeMode(String payloadClass, String preCombineField,
String expectedMergeMode, String expectedStrategy, String expectedPayloadClass)
{
+ HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
+ Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+
+ when(tableConfig.getPayloadClass()).thenReturn(payloadClass);
+ when(tableConfig.getPreCombineField()).thenReturn(preCombineField);
+
+ SevenToEightUpgradeHandler.upgradeMergeMode(tableConfig, tablePropsToAdd);
+
+ assertEquals(expectedMergeMode,
tablePropsToAdd.get(HoodieTableConfig.RECORD_MERGE_MODE));
+ assertEquals(expectedStrategy,
tablePropsToAdd.get(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID));
+ if (expectedPayloadClass != null) {
+ assertEquals(expectedPayloadClass,
tablePropsToAdd.get(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+ } else {
+
assertTrue(!tablePropsToAdd.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+ }
+ }
+
+ private static Map<ConfigProperty, String> createMap(Object... keyValues) {
+ Map<ConfigProperty, String> map = new HashMap<>();
+ for (int i = 0; i < keyValues.length; i += 2) {
+ map.put((ConfigProperty) keyValues[i], (String) keyValues[i + 1]);
+ }
+ return map;
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 8f3da8cf31e..e815de43968 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -22,8 +22,8 @@ import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.{DFSPropertiesConfiguration,
HoodieCommonConfig, HoodieConfig, TypedProperties}
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
-import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord,
OverwriteWithLatestAvroPayload, WriteOperationType}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.config.HoodieWriteConfig.{RECORD_MERGE_MODE,
SPARK_SQL_MERGE_INTO_PREPPED_KEY}
import org.apache.hudi.exception.HoodieException
@@ -172,14 +172,45 @@ object HoodieWriterUtils {
|| key.equals(RECORD_MERGE_MODE.key())
|| key.equals(RECORD_MERGE_STRATEGY_ID.key())))
- //don't validate the payload only in the case that insert into is using
fallback to some legacy configs
- ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) &&
value.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME))
+ ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) &&
shouldIgnorePayloadValidation(value, params, tableConfig))
// If hoodie.database.name is empty, ignore validation.
ignoreConfig = ignoreConfig ||
(key.equals(HoodieTableConfig.DATABASE_NAME.key()) &&
isNullOrEmpty(getStringFromTableConfigWithAlternatives(tableConfig, key)))
-
ignoreConfig
}
+ def shouldIgnorePayloadValidation(value: String, params: Map[String,
String], tableConfig: HoodieConfig): Boolean = {
+ //don't validate the payload only in the case that insert into is using
fallback to some legacy configs
+ val ignoreConfig = value.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME)
+ if (ignoreConfig) {
+ ignoreConfig
+ } else {
+ if (tableConfig == null) {
+ true
+ } else {
+ // In table version 8, if table Config payload refers to
DefaultHoodieRecordPayload and if initial table version is 6, payload class
config
+ // writer props are allowed to be OverwriteWithLatest
+ val tableVersion = if
(tableConfig.contains(HoodieTableConfig.VERSION.key())) {
+
HoodieTableVersion.fromVersionCode(tableConfig.getInt(HoodieTableConfig.VERSION))
+ } else {
+ HoodieTableVersion.current()
+ }
+ val initTableVersion = if
(tableConfig.contains(HoodieTableConfig.INITIAL_VERSION.key())) {
+
HoodieTableVersion.fromVersionCode(tableConfig.getInt(HoodieTableConfig.INITIAL_VERSION))
+ } else {
+ HoodieTableVersion.current()
+ }
+
+ if (tableVersion == HoodieTableVersion.EIGHT &&
initTableVersion.lesserThan(HoodieTableVersion.EIGHT)
+ && value.equals(classOf[OverwriteWithLatestAvroPayload].getName)
+ &&
tableConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()).equals(classOf[DefaultHoodieRecordPayload].getName))
{
+ true
+ } else {
+ ignoreConfig
+ }
+ }
+ }
+ }
+
def validateTableConfig(spark: SparkSession, params: Map[String, String],
tableConfig: HoodieConfig): Unit = {
validateTableConfig(spark, params, tableConfig, false)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
index 3595850841e..32f08888ae0 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig,
RecordMergeMode, Typ
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload,
HoodieRecordMerger, HoodieRecordPayload, HoodieTableType,
OverwriteWithLatestAvroPayload, TableServiceType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
HoodieTableVersion}
import
org.apache.hudi.common.table.timeline.InstantComparison.{compareTimestamps,
GREATER_THAN_OR_EQUALS}
-import org.apache.hudi.common.util.Option
+import org.apache.hudi.common.util.{Option, StringUtils}
import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig,
HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorType
@@ -101,13 +101,20 @@ class TestSevenToEightUpgrade extends
RecordLevelIndexTestBase {
assertEquals(partitionFields,
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
// After upgrade, based on the payload and table type, the merge mode is
updated accordingly.
- if (HoodieTableType.COPY_ON_WRITE == tableType) {
+ if (metaClient.getTableConfig.getTableType ==
HoodieTableType.COPY_ON_WRITE) {
assertEquals(classOf[OverwriteWithLatestAvroPayload].getName,
metaClient.getTableConfig.getPayloadClass)
assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING.name,
metaClient.getTableConfig.getRecordMergeMode.name)
assertEquals(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
metaClient.getTableConfig.getRecordMergeStrategyId)
} else {
- assertEquals(classOf[DefaultHoodieRecordPayload].getName,
metaClient.getTableConfig.getPayloadClass)
- assertEquals(RecordMergeMode.EVENT_TIME_ORDERING.name,
metaClient.getTableConfig.getRecordMergeMode.name)
+ if
(StringUtils.isNullOrEmpty(metaClient.getTableConfig.getPreCombineField)) {
+ assertEquals(classOf[OverwriteWithLatestAvroPayload].getName,
metaClient.getTableConfig.getPayloadClass)
+ assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING.name,
metaClient.getTableConfig.getRecordMergeMode.name)
+ assertEquals(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
metaClient.getTableConfig.getRecordMergeStrategyId)
+ } else {
+ assertEquals(classOf[DefaultHoodieRecordPayload].getName,
metaClient.getTableConfig.getPayloadClass)
+ assertEquals(RecordMergeMode.EVENT_TIME_ORDERING.name,
metaClient.getTableConfig.getRecordMergeMode.name)
+ assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
metaClient.getTableConfig.getRecordMergeStrategyId)
+ }
}
}