This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new db8eb8c16ac [HUDI-8942] Validate merge mode based on payload class and merger strategy only for table versions 8 or higher (#12743)
db8eb8c16ac is described below
commit db8eb8c16ac2cf62e7bf0a78dd8a00ea0ebe6030
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Jan 31 08:34:21 2025 +0530
[HUDI-8942] Validate merge mode based on payload class and merger strategy only for table versions 8 or higher (#12743)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../upgrade/EightToSevenDowngradeHandler.java | 3 +-
.../hudi/common/table/HoodieTableConfig.java | 12 +-
.../hudi/common/table/HoodieTableMetaClient.java | 14 +-
.../common/table/read/HoodieFileGroupReader.java | 2 +-
.../hudi/common/table/TestHoodieTableConfig.java | 168 ++++++++++++---------
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 14 +-
.../org/apache/hudi/util/SparkConfigUtils.scala | 2 +-
.../functional/TestHoodieBackedMetadata.java | 6 +-
.../hudi/utilities/streamer/HoodieStreamer.java | 4 +-
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 3 +-
10 files changed, 135 insertions(+), 93 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
index 4980fcec4f7..52bbc830238 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
@@ -168,7 +168,8 @@ public class EightToSevenDowngradeHandler implements DowngradeHandler {
Triple<RecordMergeMode, String, String> mergingConfigs =
HoodieTableConfig.inferCorrectMergingBehavior(
tableConfig.getRecordMergeMode(), tableConfig.getPayloadClass(),
- tableConfig.getRecordMergeStrategyId(),
tableConfig.getPreCombineField());
+ tableConfig.getRecordMergeStrategyId(),
tableConfig.getPreCombineField(),
+ tableConfig.getTableVersion());
if (StringUtils.nonEmpty(mergingConfigs.getMiddle())) {
tablePropsToAdd.put(HoodieTableConfig.PAYLOAD_CLASS_NAME,
mergingConfigs.getMiddle());
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 1b85a1707ae..c7f948e5d24 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -760,7 +760,8 @@ public class HoodieTableConfig extends HoodieConfig {
public static Triple<RecordMergeMode, String, String>
inferCorrectMergingBehavior(RecordMergeMode recordMergeMode,
String payloadClassName,
String recordMergeStrategyId,
-
String orderingFieldName) {
+
String orderingFieldName,
+
HoodieTableVersion tableVersion) {
RecordMergeMode inferredRecordMergeMode;
String inferredPayloadClassName;
String inferredRecordMergeStrategyId;
@@ -781,13 +782,18 @@ public class HoodieTableConfig extends HoodieConfig {
+ "strategy ID (%s).", payloadClassName, recordMergeStrategyId));
// TODO(HUDI-8925): once payload class name is not required, remove the
check on
// modeBasedOnStrategyId
- if (modeBasedOnStrategyId != CUSTOM && modeBasedOnPayload != null &&
modeBasedOnStrategyId != null) {
+ if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+ && modeBasedOnStrategyId != CUSTOM && modeBasedOnPayload != null &&
modeBasedOnStrategyId != null) {
checkArgument(modeBasedOnPayload.equals(modeBasedOnStrategyId),
String.format("Configured payload class (%s) and record merge
strategy ID (%s) conflict "
+ "with each other. Please only set one of them in the
write config.",
payloadClassName, recordMergeStrategyId));
}
- inferredRecordMergeMode = modeBasedOnStrategyId != null ?
modeBasedOnStrategyId : modeBasedOnPayload;
+ if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+ inferredRecordMergeMode = modeBasedOnStrategyId != null ?
modeBasedOnStrategyId : modeBasedOnPayload;
+ } else {
+ inferredRecordMergeMode = modeBasedOnPayload != null ?
modeBasedOnPayload : modeBasedOnStrategyId;
+ }
}
if (recordMergeMode != null) {
checkArgument(inferredRecordMergeMode == recordMergeMode,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 25ad815f600..b43c92cb385 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -1384,17 +1384,17 @@ public class HoodieTableMetaClient implements Serializable {
tableConfig.setValue(HoodieTableConfig.NAME, tableName);
tableConfig.setValue(HoodieTableConfig.TYPE, tableType.name());
- if (null != tableVersion) {
- tableConfig.setTableVersion(tableVersion);
- tableConfig.setInitialVersion(tableVersion);
- } else {
- tableConfig.setTableVersion(HoodieTableVersion.current());
- tableConfig.setInitialVersion(HoodieTableVersion.current());
+ if (null == tableVersion) {
+ tableVersion = HoodieTableVersion.current();
}
+ tableConfig.setTableVersion(tableVersion);
+ tableConfig.setInitialVersion(tableVersion);
+
Triple<RecordMergeMode, String, String> mergeConfigs =
HoodieTableConfig.inferCorrectMergingBehavior(
- recordMergeMode, payloadClassName, recordMergerStrategyId,
preCombineField);
+ recordMergeMode, payloadClassName, recordMergerStrategyId,
preCombineField,
+ tableVersion);
tableConfig.setValue(RECORD_MERGE_MODE, mergeConfigs.getLeft().name());
tableConfig.setValue(PAYLOAD_CLASS_NAME.key(), mergeConfigs.getMiddle());
tableConfig.setValue(RECORD_MERGE_STRATEGY_ID, mergeConfigs.getRight());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index e77bded8a77..3d6c23182c7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -109,7 +109,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
if
(!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
Triple<RecordMergeMode, String, String> triple =
HoodieTableConfig.inferCorrectMergingBehavior(
recordMergeMode, tableConfig.getPayloadClass(),
- mergeStrategyId, null);
+ mergeStrategyId, null, tableConfig.getTableVersion());
recordMergeMode = triple.getLeft();
mergeStrategyId = triple.getRight();
}
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 3dacf596258..fbc778150cb 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -60,8 +60,10 @@ import static org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
import static
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+import static
org.apache.hudi.common.model.HoodieRecordMerger.getRecordMergeStrategyId;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
+import static
org.apache.hudi.common.table.HoodieTableConfig.inferRecordMergeModeFromPayloadClass;
import static org.apache.hudi.common.util.ConfigUtils.recoverIfNeeded;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -305,133 +307,136 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
Stream<Arguments> arguments = Stream.of(
//test empty args with both null and ""
arguments(null, null, null, null,
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, null, null, "",
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, null, null, orderingFieldName,
- false, EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, "", "", null,
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, "", "", orderingFieldName,
- false, EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
//test legal event time ordering combos
arguments(EVENT_TIME_ORDERING, null, null, null,
- false, EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(EVENT_TIME_ORDERING, null, null, orderingFieldName,
- false, EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(EVENT_TIME_ORDERING, defaultPayload, null, orderingFieldName,
- false, EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
- false, EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(EVENT_TIME_ORDERING, null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
- false, EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, defaultPayload, null, null,
- false, EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, defaultPayload, null, orderingFieldName,
- false, EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, defaultPayload, EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
orderingFieldName,
- false, EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
- false, EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
orderingFieldName,
- false, EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
//test legal commit time ordering combos
arguments(COMMIT_TIME_ORDERING, null, null, null,
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(COMMIT_TIME_ORDERING, null, null, "",
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(COMMIT_TIME_ORDERING, null, null, orderingFieldName,
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(COMMIT_TIME_ORDERING, overwritePayload, null, null,
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(COMMIT_TIME_ORDERING, null,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, overwritePayload, null, null,
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, overwritePayload, null, "",
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, overwritePayload, null, orderingFieldName,
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
orderingFieldName,
- false, COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ "false", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
//test legal custom merge mode combos
arguments(CUSTOM, customPayload, null, null,
- false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+ "false", CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
arguments(CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID,
null,
- false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+ "false", CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
arguments(null, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
- false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+ "false", CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
arguments(null, customPayload, null, null,
- false, CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+ "false", CUSTOM, customPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
arguments(CUSTOM, null, customStrategy, null,
- false, CUSTOM, defaultPayload, customStrategy),
+ "false", CUSTOM, defaultPayload, customStrategy),
arguments(CUSTOM, customPayload, customStrategy, null,
- false, CUSTOM, customPayload, customStrategy),
+ "false", CUSTOM, customPayload, customStrategy),
//test legal configs that work but should not be used usually
arguments(CUSTOM, defaultPayload, customStrategy, null,
- false, CUSTOM, defaultPayload, customStrategy),
+ "six-only", CUSTOM, defaultPayload, customStrategy),
arguments(CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID,
null,
- false, CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+ "six-only", CUSTOM, defaultPayload,
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
arguments(CUSTOM, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID,
null,
- false, CUSTOM, overwritePayload,
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+ "six-only", CUSTOM, overwritePayload,
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
arguments(null, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID,
null,
- false, CUSTOM, defaultPayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+ "false", null, defaultPayload, null),
arguments(null, overwritePayload, PAYLOAD_BASED_MERGE_STRATEGY_UUID,
null,
- false, CUSTOM, overwritePayload,
PAYLOAD_BASED_MERGE_STRATEGY_UUID),
+ "false", null, overwritePayload, null),
//test illegal combos due to missing info
arguments(CUSTOM, null, null, null,
- true, null, null, null),
+ "true", null, null, null),
arguments(CUSTOM, null, PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
- true, null, null, null),
+ "true", null, null, null),
//test illegal combos
arguments(EVENT_TIME_ORDERING, overwritePayload, null,
orderingFieldName,
- true, null, null, null),
+ "true", null, null, null),
arguments(EVENT_TIME_ORDERING, customPayload, null, orderingFieldName,
- true, null, null, null),
+ "true", null, null, null),
arguments(EVENT_TIME_ORDERING, null,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
- true, null, null, null),
+ "true", null, null, null),
arguments(EVENT_TIME_ORDERING, null, customStrategy, orderingFieldName,
- true, null, null, null),
+ "true", null, null, null),
arguments(EVENT_TIME_ORDERING, null,
PAYLOAD_BASED_MERGE_STRATEGY_UUID, orderingFieldName,
- true, null, null, null),
+ "true", null, null, null),
arguments(COMMIT_TIME_ORDERING, defaultPayload, null, null,
- true, null, null, null),
+ "true", null, null, null),
arguments(COMMIT_TIME_ORDERING, customPayload, null, null,
- true, null, null, null),
+ "true", null, null, null),
arguments(COMMIT_TIME_ORDERING, null,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
- true, null, null, null),
+ "true", null, null, null),
arguments(COMMIT_TIME_ORDERING, null, customStrategy, null,
- true, null, null, null),
+ "true", null, null, null),
arguments(COMMIT_TIME_ORDERING, null,
PAYLOAD_BASED_MERGE_STRATEGY_UUID, null,
- true, null, null, null),
+ "true", null, null, null),
arguments(CUSTOM, defaultPayload, null, null,
- true, null, null, null),
+ "true", null, null, null),
arguments(CUSTOM, overwritePayload, null, null,
- true, null, null, null),
+ "true", null, null, null),
arguments(CUSTOM, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
- true, null, null, null),
+ "true", null, null, null),
arguments(CUSTOM, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
- true, null, null, null),
+ "true", null, null, null),
arguments(CUSTOM, defaultPayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null,
- true, null, null, null),
+ "true", null, null, null),
arguments(CUSTOM, overwritePayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
- true, null, null, null),
+ "true", null, null, null),
+
+ // dimensions that should pass validation on table version 6, not
table version 8
arguments(null, defaultPayload, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
null,
- true, null, null, null),
+ "eight-only", EVENT_TIME_ORDERING, defaultPayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
arguments(null, overwritePayload,
EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null,
- true, null, null, null));
+ "eight-only", COMMIT_TIME_ORDERING, overwritePayload,
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
+ );
return arguments;
}
@@ -439,19 +444,38 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
@MethodSource("argumentsForInferringRecordMergeMode")
public void testInferMergeMode(RecordMergeMode inputMergeMode, String
inputPayloadClass,
String inputMergeStrategy, String
orderingFieldName,
- boolean shouldThrow, RecordMergeMode
outputMergeMode,
- String outputPayloadClass, String
outputMergeStrategy) {
- if (shouldThrow) {
- assertThrows(IllegalArgumentException.class,
- () -> HoodieTableConfig.inferCorrectMergingBehavior(
- inputMergeMode, inputPayloadClass, inputMergeStrategy,
orderingFieldName));
- } else {
- Triple<RecordMergeMode, String, String> inferredConfigs =
- HoodieTableConfig.inferCorrectMergingBehavior(
- inputMergeMode, inputPayloadClass, inputMergeStrategy,
orderingFieldName);
- assertEquals(outputMergeMode, inferredConfigs.getLeft());
- assertEquals(outputPayloadClass, inferredConfigs.getMiddle());
- assertEquals(outputMergeStrategy, inferredConfigs.getRight());
- }
+ String shouldThrowString, RecordMergeMode
outputMergeMode,
+ String outputPayloadClass, String
outputMergeStrategy) throws IOException {
+ Arrays.stream(new HoodieTableVersion[] {HoodieTableVersion.EIGHT,
HoodieTableVersion.SIX})
+ .forEach(tableVersion -> {
+ boolean shouldThrow = "eight-only".equals(shouldThrowString)
+ ? tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+ : "six-only".equals(shouldThrowString)
+ ? !tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+ : Boolean.parseBoolean(shouldThrowString);
+ RecordMergeMode expectedMergeMode = outputMergeMode;
+ String expectedMergeStrategy = outputMergeStrategy;
+ if (!shouldThrow && outputMergeMode == null) {
+ expectedMergeMode =
tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+ ? CUSTOM :
inferRecordMergeModeFromPayloadClass(outputPayloadClass);
+ expectedMergeStrategy =
tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+ ? PAYLOAD_BASED_MERGE_STRATEGY_UUID
+ : getRecordMergeStrategyId(expectedMergeMode,
outputPayloadClass, null);
+ }
+ if (shouldThrow) {
+ assertThrows(IllegalArgumentException.class,
+ () -> HoodieTableConfig.inferCorrectMergingBehavior(
+ inputMergeMode, inputPayloadClass, inputMergeStrategy,
orderingFieldName,
+ tableVersion));
+ } else {
+ Triple<RecordMergeMode, String, String> inferredConfigs =
+ HoodieTableConfig.inferCorrectMergingBehavior(
+ inputMergeMode, inputPayloadClass, inputMergeStrategy,
orderingFieldName,
+ tableVersion);
+ assertEquals(expectedMergeMode, inferredConfigs.getLeft());
+ assertEquals(outputPayloadClass, inferredConfigs.getMiddle());
+ assertEquals(expectedMergeStrategy, inferredConfigs.getRight());
+ }
+ });
}
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 4a64319752f..31d87b8f6c7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -37,7 +37,7 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE,
MERGE_ON_READ}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
HoodieTableVersion, TableSchemaResolver}
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator
import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option =>
HOption}
@@ -45,7 +45,6 @@ import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig,
HoodieWriteConfig}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH,
INDEX_CLASS_NAME}
import
org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY,
WRITE_TABLE_VERSION}
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig,
HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieException,
HoodieRecordCreationException, HoodieWriteConflictException}
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
@@ -61,8 +60,8 @@ import org.apache.hudi.storage.HoodieStorage
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
+import org.apache.hudi.util.{SparkConfigUtils, SparkKeyGenUtils}
import org.apache.hudi.util.SparkConfigUtils.getStringWithAltKeys
-import org.apache.hudi.util.SparkKeyGenUtils
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
@@ -1126,6 +1125,12 @@ class HoodieSparkSqlWriterInternal {
mergedParams.put(HoodieTableConfig.DROP_PARTITION_COLUMNS.key, "false")
}
+ val tableVersion = if (tableConfig != null) {
+ tableConfig.getTableVersion
+ } else {
+ HoodieTableVersion.fromVersionCode(
+ SparkConfigUtils.getStringWithAltKeys(mergedParams,
WRITE_TABLE_VERSION).toInt)
+ }
if (!mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_MODE.key())
||
!mergedParams.contains(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key())
||
!mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key())) {
@@ -1133,7 +1138,8 @@ class HoodieSparkSqlWriterInternal {
RecordMergeMode.getValue(mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_MODE.key(),
null)),
mergedParams.getOrElse(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), ""),
mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(),
""),
- optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
+ optParams.getOrElse(PRECOMBINE_FIELD.key(), null),
+ tableVersion)
mergedParams.put(DataSourceWriteOptions.RECORD_MERGE_MODE.key(),
inferredMergeConfigs.getLeft.name())
mergedParams.put(HoodieTableConfig.RECORD_MERGE_MODE.key(),
inferredMergeConfigs.getLeft.name())
mergedParams.put(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(),
inferredMergeConfigs.getMiddle)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
index 107a3968c82..a795ec4caf8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/util/SparkConfigUtils.scala
@@ -32,7 +32,7 @@ object SparkConfigUtils {
* @return String value if the config exists; default String value if the
config does not exist
* and there is default value defined in the {@link ConfigProperty}
config; {@code null} otherwise.
*/
- def getStringWithAltKeys[T](props: Map[String, String], configProperty:
ConfigProperty[T]): String = {
+ def getStringWithAltKeys[T](props: scala.collection.Map[String, String],
configProperty: ConfigProperty[T]): String = {
ConfigUtils.getStringWithAltKeys(JFunction.toJavaFunction[String,
Object](key => props.getOrElse(key, null)), configProperty)
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index aac384b8192..73e104db8d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -411,7 +411,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
Triple<RecordMergeMode, String, String> inferredMergeConfs =
HoodieTableConfig.inferCorrectMergingBehavior(
writeConfig.getRecordMergeMode(), writeConfig.getPayloadClass(),
- writeConfig.getRecordMergeStrategyId(),
writeConfig.getPreCombineField());
+ writeConfig.getRecordMergeStrategyId(),
writeConfig.getPreCombineField(),
+ metaClient.getTableConfig().getTableVersion());
HoodieTableConfig hoodieTableConfig =
new HoodieTableConfig(this.storage, metaClient.getMetaPath(),
inferredMergeConfs.getLeft(), inferredMergeConfs.getMiddle(),
inferredMergeConfs.getRight());
assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());
@@ -433,7 +434,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
Triple<RecordMergeMode, String, String> inferredMergeConfs2 =
HoodieTableConfig.inferCorrectMergingBehavior(
writeConfig2.getRecordMergeMode(), writeConfig2.getPayloadClass(),
- writeConfig2.getRecordMergeStrategyId(),
writeConfig2.getPreCombineField());
+ writeConfig2.getRecordMergeStrategyId(),
writeConfig2.getPreCombineField(),
+ metaClient.getTableConfig().getTableVersion());
HoodieTableConfig hoodieTableConfig2 =
new HoodieTableConfig(this.storage, metaClient.getMetaPath(),
inferredMergeConfs2.getLeft(),
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index cfe01dfde68..db4f6061f20 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CompactionUtils;
@@ -154,7 +155,8 @@ public class HoodieStreamer implements Serializable {
Option<TypedProperties> propsOverride,
Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
Triple<RecordMergeMode, String, String> mergingConfigs =
HoodieTableConfig.inferCorrectMergingBehavior(
- cfg.recordMergeMode, cfg.payloadClassName,
cfg.recordMergeStrategyId, cfg.sourceOrderingField);
+ cfg.recordMergeMode, cfg.payloadClassName,
cfg.recordMergeStrategyId, cfg.sourceOrderingField,
+ HoodieTableVersion.current());
cfg.recordMergeMode = mergingConfigs.getLeft();
cfg.payloadClassName = mergingConfigs.getMiddle();
cfg.recordMergeStrategyId = mergingConfigs.getRight();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index b6ed6139547..004e29cd1fb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -623,7 +623,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;
Triple<RecordMergeMode, String, String> mergeCfgs =
HoodieTableConfig.inferCorrectMergingBehavior(
- cfg.recordMergeMode, cfg.payloadClassName,
cfg.recordMergeStrategyId, cfg.sourceOrderingField);
+ cfg.recordMergeMode, cfg.payloadClassName,
cfg.recordMergeStrategyId, cfg.sourceOrderingField,
+ HoodieTableVersion.current());
cfg.recordMergeMode = mergeCfgs.getLeft();
cfg.payloadClassName = mergeCfgs.getMiddle();
cfg.recordMergeStrategyId = mergeCfgs.getRight();