This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 21b8075f64eb refactor: Address follow up issues for payload deprecation (#13950)
21b8075f64eb is described below
commit 21b8075f64ebc7f870936803853cf291a97c463e
Author: Lin Liu <[email protected]>
AuthorDate: Wed Oct 1 09:52:19 2025 -0700
refactor: Address follow up issues for payload deprecation (#13950)
---
.../client/bootstrap/BootstrapRecordPayload.java | 51 ----------
.../table/upgrade/EightToNineUpgradeHandler.java | 21 ++--
.../table/upgrade/NineToEightDowngradeHandler.java | 14 ++-
.../upgrade/TestEightToNineUpgradeHandler.java | 23 +++--
.../bootstrap/OrcBootstrapMetadataHandler.java | 6 +-
.../bootstrap/ParquetBootstrapMetadataHandler.java | 6 +-
.../hudi/common/table/HoodieTableConfig.java | 112 ++++++++++++++-------
.../apache/hudi/common/util/HoodieRecordUtils.java | 2 +-
.../org/apache/hudi/util/FlinkWriteClients.java | 6 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 3 +-
.../test/java/org/apache/hudi/utils/TestData.java | 3 +-
.../apache/hudi/utils/TestFlinkWriteClients.java | 8 +-
.../org/apache/hudi/utils/TestStreamerUtil.java | 12 +--
.../hudi/common/table/TestHoodieTableConfig.java | 28 +++---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 12 ++-
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 15 +++
.../hudi/table/upgrade/TestUpgradeDowngrade.java | 51 +++++-----
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 20 +++-
.../TestHoodieSparkSqlWriterWithTestFormat.scala | 11 +-
.../functional/TestPayloadDeprecationFlow.scala | 48 +++------
.../hudi/functional/TestSparkDataSource.scala | 52 ++++++----
.../sql/hudi/dml/insert/TestInsertTable.scala | 5 +-
.../others/TestMergeModeCommitTimeOrdering.scala | 29 ++++--
.../others/TestMergeModeEventTimeOrdering.scala | 38 ++++---
.../sql/hudi/procedure/TestRepairsProcedure.scala | 1 -
.../apache/hudi/utilities/streamer/StreamSync.java | 18 ++--
.../deltastreamer/TestHoodieDeltaStreamer.java | 1 -
27 files changed, 326 insertions(+), 270 deletions(-)
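At a high level, the patch deletes the BootstrapRecordPayload shim and stops persisting hoodie.record.merge.strategy.id for tables whose merge mode is COMMIT_TIME_ORDERING or EVENT_TIME_ORDERING; only CUSTOM tables keep an explicit strategy id. A minimal plain-Java sketch of that rule follows (the enum mirrors org.apache.hudi.common.config.RecordMergeMode; the helper itself is illustrative, not a Hudi API):

    // Sketch only: the persistence rule the diffs below implement.
    enum RecordMergeMode { COMMIT_TIME_ORDERING, EVENT_TIME_ORDERING, CUSTOM }

    final class MergeStrategyIdRule {
      // Returns the strategy id to store in hoodie.properties, or null.
      static String persistedStrategyId(RecordMergeMode mode, String customStrategyUuid) {
        // Commit-time and event-time ordering imply their strategy, so no id is stored.
        return mode == RecordMergeMode.CUSTOM ? customStrategyUuid : null;
      }
    }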
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java
deleted file mode 100644
index a60a0d39f7c5..000000000000
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client.bootstrap;
-
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-
-public class BootstrapRecordPayload implements HoodieRecordPayload<BootstrapRecordPayload> {
-
- private final GenericRecord record;
-
- public BootstrapRecordPayload(GenericRecord record) {
- this.record = record;
- }
-
- @Override
- public BootstrapRecordPayload preCombine(BootstrapRecordPayload oldValue) {
- return this;
- }
-
- @Override
-  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) {
- return Option.ofNullable(record);
- }
-
- @Override
- public Option<IndexedRecord> getInsertValue(Schema schema) {
- return Option.ofNullable(record);
- }
-
-}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
index d5d731d51833..788a6deb7171 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
@@ -127,7 +127,7 @@ public class EightToNineUpgradeHandler implements UpgradeHandler {
metaClient.getTableConfig().getTableVersion());
}
// Handle merge mode config.
- reconcileMergeModeConfig(tablePropsToAdd, tableConfig);
+ reconcileMergeModeConfig(tablePropsToAdd, tablePropsToRemove, tableConfig);
// Handle partial update mode config.
reconcilePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
// Handle merge properties config.
@@ -140,12 +140,18 @@ public class EightToNineUpgradeHandler implements UpgradeHandler {
   }
   private void reconcileMergeModeConfig(Map<ConfigProperty, String> tablePropsToAdd,
+                                        Set<ConfigProperty> tablePropsToRemove,
                                         HoodieTableConfig tableConfig) {
     String payloadClass = tableConfig.getPayloadClass();
-    String mergeStrategy = tableConfig.getRecordMergeStrategyId();
-    if (!BUILTIN_MERGE_STRATEGIES.contains(mergeStrategy) || StringUtils.isNullOrEmpty(payloadClass)) {
+    RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
+    if (mergeMode != RecordMergeMode.CUSTOM) {
+      // For commit time or event time based tables, remove the merge strategy id.
+      tablePropsToRemove.add(RECORD_MERGE_STRATEGY_ID);
+    } else if (StringUtils.isNullOrEmpty(payloadClass)) {
+      // For tables using a custom merger, keep their configs.
       return;
     }
+
     if (PAYLOADS_MAPPED_TO_COMMIT_TIME_MERGE_MODE.contains(payloadClass)) {
       tablePropsToAdd.put(RECORD_MERGE_MODE, RecordMergeMode.COMMIT_TIME_ORDERING.name());
       tablePropsToAdd.put(RECORD_MERGE_STRATEGY_ID, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
@@ -160,8 +166,7 @@ public class EightToNineUpgradeHandler implements UpgradeHandler {
                                               Set<ConfigProperty> tablePropsToRemove,
                                               HoodieTableConfig tableConfig) {
     String payloadClass = tableConfig.getPayloadClass();
-    String mergeStrategy = tableConfig.getRecordMergeStrategyId();
-    if (!BUILTIN_MERGE_STRATEGIES.contains(mergeStrategy) || StringUtils.isNullOrEmpty(payloadClass)) {
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
return;
}
if (PAYLOAD_CLASSES_TO_HANDLE.contains(payloadClass)) {
@@ -173,8 +178,7 @@ public class EightToNineUpgradeHandler implements UpgradeHandler {
   private void reconcilePartialUpdateModeConfig(Map<ConfigProperty, String> tablePropsToAdd,
                                                 HoodieTableConfig tableConfig) {
     String payloadClass = tableConfig.getPayloadClass();
-    String mergeStrategy = tableConfig.getRecordMergeStrategyId();
-    if (!BUILTIN_MERGE_STRATEGIES.contains(mergeStrategy) || StringUtils.isNullOrEmpty(payloadClass)) {
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
return;
}
     if (payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())
@@ -187,8 +191,7 @@ public class EightToNineUpgradeHandler implements UpgradeHandler {
   private void reconcileMergePropertiesConfig(Map<ConfigProperty, String> tablePropsToAdd, HoodieTableConfig tableConfig, HoodieWriteConfig writeConfig) {
     String payloadClass = tableConfig.getPayloadClass();
-    String mergeStrategy = tableConfig.getRecordMergeStrategyId();
-    if (!BUILTIN_MERGE_STRATEGIES.contains(mergeStrategy) || StringUtils.isNullOrEmpty(payloadClass)) {
+    if (StringUtils.isNullOrEmpty(payloadClass)) {
return;
}
if (payloadClass.equals(AWSDmsAvroPayload.class.getName())) {
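The handler above also encodes the payload-to-mode mapping: payloads listed in PAYLOADS_MAPPED_TO_COMMIT_TIME_MERGE_MODE receive COMMIT_TIME_ORDERING plus the commit-time strategy UUID, and the rest receive the event-time pair. A hedged sketch of that shape (the set contents below are assumptions for illustration, not the actual constant):

    // Sketch: how a deprecated payload class picks its merge mode on upgrade.
    import java.util.Set;

    final class PayloadToModeSketch {
      // Stand-in for PAYLOADS_MAPPED_TO_COMMIT_TIME_MERGE_MODE; contents assumed.
      static final Set<String> COMMIT_TIME_PAYLOADS =
          Set.of("org.apache.hudi.common.model.OverwriteWithLatestAvroPayload");

      static String mergeModeFor(String payloadClass) {
        return COMMIT_TIME_PAYLOADS.contains(payloadClass)
            ? "COMMIT_TIME_ORDERING" : "EVENT_TIME_ORDERING";
      }
    }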
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
index aa62e1a20a16..5d547f16e7dd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java
@@ -44,6 +44,8 @@ import java.util.Set;
 import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
 import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
+import static org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
@@ -100,9 +102,17 @@ public class NineToEightDowngradeHandler implements DowngradeHandler {
                                  HoodieTableConfig tableConfig) {
     // Update table properties.
     propertiesToRemove.add(PARTIAL_UPDATE_MODE);
-    // For specified payload classes, add strategy id and custom merge mode.
     String legacyPayloadClass = tableConfig.getLegacyPayloadClass();
-    if (!StringUtils.isNullOrEmpty(legacyPayloadClass) && (PAYLOAD_CLASSES_TO_HANDLE.contains(legacyPayloadClass))) {
+    // For tables with commit time or event time mode, add strategy id.
+    RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
+    if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
+      propertiesToAdd.put(RECORD_MERGE_STRATEGY_ID, EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+    } else if (mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING) {
+      propertiesToAdd.put(RECORD_MERGE_STRATEGY_ID, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+    }
+    // For specified payload classes, add strategy id and custom merge mode.
+    if (!StringUtils.isNullOrEmpty(legacyPayloadClass)
+        && (PAYLOAD_CLASSES_TO_HANDLE.contains(legacyPayloadClass))) {
       propertiesToRemove.add(LEGACY_PAYLOAD_CLASS_NAME);
       propertiesToAdd.put(PAYLOAD_CLASS_NAME, legacyPayloadClass);
       if (!legacyPayloadClass.equals(OverwriteWithLatestAvroPayload.class.getName())
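On downgrade, the strategy id that the upgrade removed must be re-materialized from the merge mode, since v8 readers expect it in hoodie.properties. A sketch of the mapping, reusing the illustrative RecordMergeMode enum from the first sketch (the Optional-returning helper is hypothetical; the UUID arguments stand for the HoodieRecordMerger constants imported above):

    // Illustrative only: re-derive the strategy id a v8 table expects.
    import java.util.Optional;

    final class DowngradeStrategyIdSketch {
      static Optional<String> strategyIdFor(RecordMergeMode mode,
                                            String eventTimeUuid, String commitTimeUuid) {
        switch (mode) {
          case EVENT_TIME_ORDERING:  return Optional.of(eventTimeUuid);
          case COMMIT_TIME_ORDERING: return Optional.of(commitTimeUuid);
          default:                   return Optional.empty(); // CUSTOM keeps its own id
        }
      }
    }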
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
index ef867d376f8f..eb885f8be5ad 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
@@ -77,6 +77,7 @@ import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAV
 import static org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
import static org.apache.hudi.common.table.PartialUpdateMode.FILL_UNAVAILABLE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -277,7 +278,8 @@ class TestEightToNineUpgradeHandler {
           handler.upgrade(config, context, INSTANT_TIME, upgradeDowngradeHelper);
       // Verify
       assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToUpdate(), result.propertiesToUpdate());
-      assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToDelete(), result.propertiesToDelete());
+      assertEquals(
+          Collections.singleton(RECORD_MERGE_STRATEGY_ID), result.propertiesToDelete());
}
}
@@ -285,12 +287,13 @@ class TestEightToNineUpgradeHandler {
Set<ConfigProperty> propertiesToRemove,
String payloadClass) {
     if (payloadClass.equals(MySqlDebeziumAvroPayload.class.getName()) || payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
- assertEquals(2, propertiesToRemove.size());
+ assertEquals(3, propertiesToRemove.size());
assertTrue(propertiesToRemove.contains(HoodieTableConfig.PRECOMBINE_FIELD));
} else {
- assertEquals(1, propertiesToRemove.size());
+ assertEquals(2, propertiesToRemove.size());
}
assertTrue(propertiesToRemove.contains(PAYLOAD_CLASS_NAME));
+ assertTrue(propertiesToRemove.contains(RECORD_MERGE_STRATEGY_ID));
assertTrue(propertiesToAdd.containsKey(LEGACY_PAYLOAD_CLASS_NAME));
assertEquals(
payloadClass,
@@ -342,7 +345,9 @@ class TestEightToNineUpgradeHandler {
// Verify
       assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToUpdate(), result.propertiesToUpdate());
-      assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToDelete(), result.propertiesToDelete());
+ assertEquals(
+ Collections.singleton(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID),
+ result.propertiesToDelete());
       // Verify storage methods were called correctly
       // Note: createFileInPath directly calls create() when contentWriter is present
@@ -426,7 +431,8 @@ class TestEightToNineUpgradeHandler {
           handler.upgrade(config, context, INSTANT_TIME, upgradeDowngradeHelper);
       // Verify
       assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToUpdate(), result.propertiesToUpdate());
-      assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToDelete(), result.propertiesToDelete());
+      assertEquals(
+          Collections.singleton(RECORD_MERGE_STRATEGY_ID), result.propertiesToDelete());
}
}
@@ -451,7 +457,8 @@ class TestEightToNineUpgradeHandler {
           handler.upgrade(config, context, INSTANT_TIME, upgradeDowngradeHelper);
       // Verify
       assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToUpdate(), result.propertiesToUpdate());
-      assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToDelete(), result.propertiesToDelete());
+      assertEquals(
+          Collections.singleton(RECORD_MERGE_STRATEGY_ID), result.propertiesToDelete());
}
}
@@ -492,7 +499,9 @@ class TestEightToNineUpgradeHandler {
// Verify
       assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToUpdate(), result.propertiesToUpdate());
-      assertEquals(DEFAULT_UPGRADE_RESULT.propertiesToDelete(), result.propertiesToDelete());
+ assertEquals(
+ Collections.singleton(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID),
+ result.propertiesToDelete());
       // Verify storage methods were called correctly
       // Note: createFileInPath directly calls create() when contentWriter is present
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
index 86944ae3f5bf..c0981bdaeb62 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
@@ -19,8 +19,7 @@
package org.apache.hudi.table.action.bootstrap;
import org.apache.hudi.avro.model.HoodieFileStatus;
-import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
-import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -85,8 +84,7 @@ class OrcBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
           String recKey = keyGenerator.getKey(inp).getRecordKey();
           GenericRecord gr = new GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA);
           gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
-          BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
-          HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
+          HoodieRecord rec = new HoodieAvroIndexedRecord(new HoodieKey(recKey, partitionPath), gr);
return rec;
}, table.getPreExecuteRunnable());
executor.execute();
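Both bootstrap handlers now wrap the skeleton Avro record directly instead of routing it through a payload. A small usage sketch mirroring the constructor call in the hunk above (the method and schema parameter are illustrative; HoodieAvroIndexedRecord, HoodieKey, and HoodieRecord are the Hudi types used in the patch):

    // Sketch: build a metadata-bootstrap record without a payload class.
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieRecord;

    class BootstrapRecordSketch {
      HoodieRecord buildSkeletonRecord(Schema schema, String recKey, String partitionPath) {
        GenericRecord gr = new GenericData.Record(schema);
        gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
        // The IndexedRecord is carried as-is; no preCombine/combineAndGetUpdateValue hooks.
        return new HoodieAvroIndexedRecord(new HoodieKey(recKey, partitionPath), gr);
      }
    }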
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 7b1f48cd1aff..44d4a18b781e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -19,8 +19,7 @@
package org.apache.hudi.table.action.bootstrap;
import org.apache.hudi.avro.model.HoodieFileStatus;
-import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
-import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieSparkRecord;
@@ -120,8 +119,7 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
       case AVRO:
         GenericRecord avroRecord = new GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA);
         avroRecord.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey);
-        BootstrapRecordPayload payload = new BootstrapRecordPayload(avroRecord);
-        return new HoodieAvroRecord<>(hoodieKey, payload);
+        return new HoodieAvroIndexedRecord(hoodieKey, avroRecord);
       case SPARK:
         StructType schema = HoodieInternalRowUtils$.MODULE$.getCachedSchema(METADATA_BOOTSTRAP_RECORD_SCHEMA);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index cdace761e6bc..9b24f7d0c989 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -833,15 +833,24 @@ public class HoodieTableConfig extends HoodieConfig {
recordMergeStrategyId = inferredConfigs.getRight();
// Step 2: Handle Version 9 specific logic.
-    // CASE 0: For tables with special merger properties, e.g., with non-builtin mergers.
-    // CASE 1: For tables using MERGE MODE, or CUSTOM builtin mergers.
-    // NOTE: Payload class should NOT be set for these cases.
-    if (!BUILTIN_MERGE_STRATEGIES.contains(recordMergeStrategyId)
+    if (recordMergeMode != CUSTOM && StringUtils.isNullOrEmpty(payloadClassName)) {
+      // CASE 0: For commit time and event time based tables, only merge mode is set.
+      // `StringUtils.isNullOrEmpty(payloadClassName)` is added since
+      // `DefaultHoodieRecordPayload`, `OverwriteWithLatestAvroPayload` and `EventTimeAvroPayload`
+      // have the commit/event time merge mode through `inferMergingConfigsForPreV9Table`.
+      // NOTE: Payload class / strategy id should NOT be set for this case.
+      reconciledConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+    } else if (!BUILTIN_MERGE_STRATEGIES.contains(recordMergeStrategyId)
         || StringUtils.isNullOrEmpty(payloadClassName)) {
+      // CASE 1: For tables using a non-builtin custom merger
+      //         (payload class is inferred in `inferMergingConfigsForPreV9Table`),
+      //         or for tables using a builtin custom merger without a payload class;
+      //         in this case, merge mode and strategy id should be set.
+      // NOTE: Payload class should NOT be set for these cases.
       reconciledConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
       reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), recordMergeStrategyId);
     } else {
-      // For tables using payload classes.
+      // For tables using payload classes in custom merge mode.
       // CASE 2: Custom payload class. We set these properties explicitly.
       if (!PayloadGroupings.getPayloadsUnderDeprecation().contains(payloadClassName)) {
         reconciledConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
@@ -849,46 +858,58 @@ public class HoodieTableConfig extends HoodieConfig {
         reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), PAYLOAD_BASED_MERGE_STRATEGY_UUID);
} else { // CASE 3: Payload classes are under deprecation.
// Standard merging configs.
-        // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of PAYLOAD_CLASS_NAME here.
-        if (PayloadGroupings.getEventTimeOrderingPayloads().contains(payloadClassName)) {
-          reconciledConfigs.put(RECORD_MERGE_MODE.key(), EVENT_TIME_ORDERING.name());
-          reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
-          reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
-        } else {
-          reconciledConfigs.put(RECORD_MERGE_MODE.key(), COMMIT_TIME_ORDERING.name());
-          reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
-          reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
-        }
+        handleMergeModeConfigs(payloadClassName, reconciledConfigs);
         // Partial update mode config.
-        // Certain payloads are migrated to non payload way from 1.1 Hudi binary.
-        // Hence we need to set the right value for partial update mode for some of the cases.
-        if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
-            || payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())) {
-          reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), PartialUpdateMode.IGNORE_DEFAULTS.name());
-        } else if (payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
-          reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), PartialUpdateMode.FILL_UNAVAILABLE.name());
-        }
-        // Additional custom merge properties.
+        handlePartialUpdateModeConfigs(payloadClassName, reconciledConfigs);
+        // Additional custom merge properties.
         // Certain payloads are migrated to non payload way from 1.1 Hudi binary and the reader might need certain properties for the
         // merge to function as expected. Handing such special cases here.
-        if (payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE, DEBEZIUM_UNAVAILABLE_VALUE);
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, DebeziumConstants.FLATTENED_OP_COL_NAME);
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, DebeziumConstants.DELETE_OP);
-          reconciledConfigs.put(ORDERING_FIELDS.key(), DebeziumConstants.FLATTENED_LSN_COL_NAME);
-        } else if (payloadClassName.equals(MySqlDebeziumAvroPayload.class.getName())) {
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, DebeziumConstants.FLATTENED_OP_COL_NAME);
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, DebeziumConstants.DELETE_OP);
-          reconciledConfigs.put(ORDERING_FIELDS.key(), DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME);
-        } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName())) {
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, OP_FIELD);
-          reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, DELETE_OPERATION_VALUE);
-        }
+        handlePayloadAdhocConfigs(payloadClassName, reconciledConfigs);
}
}
return reconciledConfigs;
}
+  private static void handleMergeModeConfigs(String payloadClassName, Map<String, String> reconciledConfigs) {
+    // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of PAYLOAD_CLASS_NAME here.
+    if (PayloadGroupings.getEventTimeOrderingPayloads().contains(payloadClassName)) {
+      reconciledConfigs.put(RECORD_MERGE_MODE.key(), EVENT_TIME_ORDERING.name());
+      reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+    } else {
+      reconciledConfigs.put(RECORD_MERGE_MODE.key(), COMMIT_TIME_ORDERING.name());
+      reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+    }
+  }
+
+  private static void handlePartialUpdateModeConfigs(String payloadClassName, Map<String, String> reconciledConfigs) {
+    // Certain payloads are migrated to the non-payload way from the 1.1 Hudi binary.
+    // Hence we need to set the right value for partial update mode in some of the cases.
+    if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+        || payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())) {
+      reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), PartialUpdateMode.IGNORE_DEFAULTS.name());
+    } else if (payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+      reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), PartialUpdateMode.FILL_UNAVAILABLE.name());
+    }
+  }
+
+  private static void handlePayloadAdhocConfigs(String payloadClassName, Map<String, String> reconciledConfigs) {
+    // Certain payloads are migrated to the non-payload way from the 1.1 Hudi binary, and the reader might
+    // need certain properties for the merge to function as expected. Handling such special cases here.
+    if (payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE, DEBEZIUM_UNAVAILABLE_VALUE);
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, DebeziumConstants.FLATTENED_OP_COL_NAME);
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, DebeziumConstants.DELETE_OP);
+      reconciledConfigs.put(ORDERING_FIELDS.key(), DebeziumConstants.FLATTENED_LSN_COL_NAME);
+    } else if (payloadClassName.equals(MySqlDebeziumAvroPayload.class.getName())) {
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, DebeziumConstants.FLATTENED_OP_COL_NAME);
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, DebeziumConstants.DELETE_OP);
+      reconciledConfigs.put(ORDERING_FIELDS.key(), DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME);
+    } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName())) {
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY, OP_FIELD);
+      reconciledConfigs.put(RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER, DELETE_OPERATION_VALUE);
+    }
+  }
+
/**
* To be invoked for table creation flows or writer flows.
* @return the merging configs to use.
@@ -898,7 +919,18 @@ public class HoodieTableConfig extends HoodieConfig {
String recordMergeStrategyId,
String orderingFieldNamesAsString,
HoodieTableVersion tableVersion) {
-    return inferMergingConfigsForPreV9Table(recordMergeMode, payloadClassName, recordMergeStrategyId, orderingFieldNamesAsString, tableVersion);
+    if (tableVersion.greaterThanOrEquals(HoodieTableVersion.NINE)) {
+      Map<String, String> inferredMergingConfigs = inferMergingConfigsForV9TableCreation(
+          recordMergeMode, payloadClassName, recordMergeStrategyId, orderingFieldNamesAsString, tableVersion);
+      checkArgument(inferredMergingConfigs.containsKey(HoodieTableConfig.RECORD_MERGE_MODE.key()));
+      return Triple.of(
+          RecordMergeMode.valueOf(inferredMergingConfigs.get(RECORD_MERGE_MODE.key())),
+          inferredMergingConfigs.getOrDefault(PAYLOAD_CLASS_NAME.key(), null),
+          inferredMergingConfigs.getOrDefault(RECORD_MERGE_STRATEGY_ID.key(), null));
+    }
+    // For table version <= 8.
+    return inferMergingConfigsForPreV9Table(
+        recordMergeMode, payloadClassName, recordMergeStrategyId, orderingFieldNamesAsString, tableVersion);
}
/**
@@ -944,7 +976,9 @@ public class HoodieTableConfig extends HoodieConfig {
         inferredRecordMergeMode = modeBasedOnPayload != null ? modeBasedOnPayload : modeBasedOnStrategyId;
       }
     }
-    if (recordMergeMode != null) {
+
+    if (recordMergeMode != null && (tableVersion.lesserThan(HoodieTableVersion.NINE)
+        || !PayloadGroupings.getPayloadsUnderDeprecation().contains(payloadClassName))) {
       checkArgument(inferredRecordMergeMode == recordMergeMode,
           String.format("Configured record merge mode (%s) is inconsistent with payload class (%s) "
               + "or record merge strategy ID (%s) configured. Please revisit the configs.",
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index ae86f95448fc..14b5ad8efd45 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -110,7 +110,7 @@ public class HoodieRecordUtils {
return Option.fromJavaOptional(mergeImplClassList.stream()
.map(clazz -> loadRecordMerger(clazz))
.filter(Objects::nonNull)
-        .filter(merger -> merger.getMergingStrategy().equals(recordMergeStrategyId))
+        .filter(merger -> StringUtils.isNullOrEmpty(recordMergeStrategyId) || merger.getMergingStrategy().equals(recordMergeStrategyId))
         .filter(merger -> recordTypeCompatibleEngine(merger.getRecordType(), engineType))
.findFirst());
}
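With the relaxed filter, an empty or null requested strategy id now matches any loaded merger rather than none. A plain-Java rendering of the new predicate semantics (matches() is illustrative, not a Hudi method):

    // Illustrative predicate: a null/empty requested id matches every merger.
    static boolean matches(String requestedStrategyId, String mergerStrategyId) {
      return requestedStrategyId == null || requestedStrategyId.isEmpty()
          || mergerStrategyId.equals(requestedStrategyId);
    }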
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 03ef1a46a588..3fef94c8e60b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -245,9 +245,11 @@ public class FlinkWriteClients {
// <merge_mode, payload_class, merge_strategy_id>
     Triple<RecordMergeMode, String, String> mergingBehavior = StreamerUtil.inferMergingBehavior(conf);
-    builder.withRecordMergeStrategyId(mergingBehavior.getRight())
-        .withRecordMergeMode(mergingBehavior.getLeft())
+    builder.withRecordMergeMode(mergingBehavior.getLeft())
         .withRecordMergeImplClasses(StreamerUtil.getMergerClasses(conf, mergingBehavior.getLeft(), mergingBehavior.getMiddle()));
+ if (mergingBehavior.getRight() != null) {
+ builder.withRecordMergeStrategyId(mergingBehavior.getRight());
+ }
Option<HoodieLockConfig> lockConfig = getLockConfig(conf);
if (lockConfig.isPresent()) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 11d3817e2dd1..a5901fc69387 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -398,8 +398,7 @@ public class StreamerUtil {
         getMergeMode(conf), payloadClassName, getMergeStrategyId(conf), OptionsResolver.getOrderingFieldsStr(conf), HoodieTableVersion.current());
     String mergeMode = mergeConf.get(HoodieTableConfig.RECORD_MERGE_MODE.key());
     String mergeStrategyId = mergeConf.get(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key());
-    ValidationUtils.checkArgument(mergeMode != null && mergeStrategyId != null,
-        "Both merge mode and merge strategy id should not be null");
+    ValidationUtils.checkArgument(mergeMode != null, "Merge mode should not be null");
     return Triple.of(RecordMergeMode.valueOf(mergeMode), payloadClassName, mergeStrategyId);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 21b8d5a2b0be..6b5f79c6872b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -961,7 +961,8 @@ public class TestData {
.build();
// deal with partial update merger
     if (config.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME).contains(PartialUpdateAvroPayload.class.getSimpleName())
-        || config.getString(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID).equalsIgnoreCase(HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID)) {
+        || (config.getString(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID) != null
+        && config.getString(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID).equalsIgnoreCase(HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID))) {
       config.setValue(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), PartialUpdateFlinkRecordMerger.class.getName());
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index 27bbf96a38df..d823a01f5be1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -28,7 +28,6 @@ import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
-import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
@@ -54,6 +53,7 @@ import java.io.File;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -109,7 +109,7 @@ public class TestFlinkWriteClients {
HoodieTableConfig tableConfig = metaClient.getTableConfig();
     assertThat(tableConfig.getRecordMergeMode(), is(RecordMergeMode.EVENT_TIME_ORDERING));
-    assertThat(tableConfig.getRecordMergeStrategyId(), is(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID));
+    assertNull(tableConfig.getRecordMergeStrategyId());
     assertThat(tableConfig.getPayloadClass(), is(DefaultHoodieRecordPayload.class.getName()));
     HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, false);
@@ -129,7 +129,7 @@ public class TestFlinkWriteClients {
HoodieTableConfig tableConfig = metaClient.getTableConfig();
     assertThat(tableConfig.getRecordMergeMode(), is(RecordMergeMode.COMMIT_TIME_ORDERING));
-    assertThat(tableConfig.getRecordMergeStrategyId(), is(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID));
+    assertNull(tableConfig.getRecordMergeStrategyId());
     assertThat(tableConfig.getPayloadClass(), is(OverwriteWithLatestAvroPayload.class.getName()));
@@ -154,7 +154,7 @@ public class TestFlinkWriteClients {
HoodieTableConfig tableConfig = metaClient.getTableConfig();
     assertThat(tableConfig.getRecordMergeMode(), is(RecordMergeMode.EVENT_TIME_ORDERING));
-    assertThat(tableConfig.getRecordMergeStrategyId(), is(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID));
+    assertNull(tableConfig.getRecordMergeStrategyId());
     assertThat(tableConfig.getPayloadClass(), is(DefaultHoodieRecordPayload.class.getName()));
     HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, false);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index f8d9fd55f98d..8e19c8c2e8d4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utils;
import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.EventTimeAvroPayload;
-import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -52,6 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -70,7 +70,7 @@ class TestStreamerUtil {
     Triple<RecordMergeMode, String, String> mergeBehavior = StreamerUtil.inferMergingBehavior(conf);
     assertEquals(RecordMergeMode.EVENT_TIME_ORDERING, mergeBehavior.getLeft());
     assertEquals(EventTimeAvroPayload.class.getName(), mergeBehavior.getMiddle());
-    assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, mergeBehavior.getRight());
+ assertNull(mergeBehavior.getRight());
// set commit time merge mode
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
@@ -78,7 +78,7 @@ class TestStreamerUtil {
mergeBehavior = StreamerUtil.inferMergingBehavior(conf);
     assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING, mergeBehavior.getLeft());
     assertEquals(OverwriteWithLatestAvroPayload.class.getName(), mergeBehavior.getMiddle());
-    assertEquals(HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, mergeBehavior.getRight());
+ assertNull(mergeBehavior.getRight());
// set partial update merger.
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
@@ -86,7 +86,7 @@ class TestStreamerUtil {
mergeBehavior = StreamerUtil.inferMergingBehavior(conf);
assertEquals(RecordMergeMode.EVENT_TIME_ORDERING, mergeBehavior.getLeft());
     assertEquals(PartialUpdateAvroPayload.class.getName(), mergeBehavior.getMiddle());
-    assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, mergeBehavior.getRight());
+ assertNull(mergeBehavior.getRight());
// set partial update payload
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
@@ -94,7 +94,7 @@ class TestStreamerUtil {
mergeBehavior = StreamerUtil.inferMergingBehavior(conf);
assertEquals(RecordMergeMode.EVENT_TIME_ORDERING, mergeBehavior.getLeft());
     assertEquals(PartialUpdateAvroPayload.class.getName(), mergeBehavior.getMiddle());
-    assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, mergeBehavior.getRight());
+ assertNull(mergeBehavior.getRight());
// set partial update payload
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
@@ -103,7 +103,7 @@ class TestStreamerUtil {
mergeBehavior = StreamerUtil.inferMergingBehavior(conf);
assertEquals(RecordMergeMode.EVENT_TIME_ORDERING, mergeBehavior.getLeft());
     assertEquals(PartialUpdateAvroPayload.class.getName(), mergeBehavior.getMiddle());
-    assertEquals(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID, mergeBehavior.getRight());
+ assertNull(mergeBehavior.getRight());
}
@Test
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 25042d59783e..a59ae8c5c3f2 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -599,11 +599,11 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness {
       // Test case: Version 9 table with null payload class and event time ordering
       arguments("Version 9 with event time ordering", EVENT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.NINE,
-          2, EVENT_TIME_ORDERING.name(), null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+          1, EVENT_TIME_ORDERING.name(), null, null, null, null, null, null, null),
       // Test case: Version 9 table with null payload class and commit time ordering
       arguments("Version 9 with commit time ordering", COMMIT_TIME_ORDERING, null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE,
-          2, COMMIT_TIME_ORDERING.name(), null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, null, null, null, null, null),
+          1, COMMIT_TIME_ORDERING.name(), null, null, null, null, null, null, null),
       // Test case: Version 9 table with null payload class and custom merge mode
       arguments("Version 9 with custom merge mode", CUSTOM, null, CUSTOM_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE,
@@ -615,38 +615,38 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness {
       // Test case: Version 9 table with event time based payload (DefaultHoodieRecordPayload)
       arguments("Version 9 with DefaultHoodieRecordPayload", null, DefaultHoodieRecordPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
-          3, EVENT_TIME_ORDERING.name(), null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, DefaultHoodieRecordPayload.class.getName(), null, null, null, null),
+          2, EVENT_TIME_ORDERING.name(), null, null, DefaultHoodieRecordPayload.class.getName(), null, null, null, null),
       // Test case: Version 9 table with commit time based payload (OverwriteWithLatestAvroPayload)
       arguments("Version 9 with OverwriteWithLatestAvroPayload", null, OverwriteWithLatestAvroPayload.class.getName(), null, null, HoodieTableVersion.NINE,
-          3, COMMIT_TIME_ORDERING.name(), null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, OverwriteWithLatestAvroPayload.class.getName(), null, null, null, null),
+          2, COMMIT_TIME_ORDERING.name(), null, null, OverwriteWithLatestAvroPayload.class.getName(), null, null, null, null),
       // Test case: Version 9 table with PartialUpdateAvroPayload (should set partial update mode)
       arguments("Version 9 with PartialUpdateAvroPayload", null, PartialUpdateAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
-          4, EVENT_TIME_ORDERING.name(), null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, PartialUpdateAvroPayload.class.getName(), PartialUpdateMode.IGNORE_DEFAULTS.name(),
+          3, EVENT_TIME_ORDERING.name(), null, null, PartialUpdateAvroPayload.class.getName(), PartialUpdateMode.IGNORE_DEFAULTS.name(),
           null, null, null),
       // Test case: Version 9 table with OverwriteNonDefaultsWithLatestAvroPayload (should set partial update mode)
       arguments("Version 9 with OverwriteNonDefaultsWithLatestAvroPayload", null, OverwriteNonDefaultsWithLatestAvroPayload.class.getName(), null, null, HoodieTableVersion.NINE,
-          4, COMMIT_TIME_ORDERING.name(), null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
+          3, COMMIT_TIME_ORDERING.name(), null, null, OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
           PartialUpdateMode.IGNORE_DEFAULTS.name(), null, null, null),
       // Test case: Version 9 table with PostgresDebeziumAvroPayload (should set partial update mode and custom properties)
       arguments("Version 9 with PostgresDebeziumAvroPayload", null, PostgresDebeziumAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
-          8, EVENT_TIME_ORDERING.name(), null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, PostgresDebeziumAvroPayload.class.getName(),
+          7, EVENT_TIME_ORDERING.name(), null, null, PostgresDebeziumAvroPayload.class.getName(),
           PartialUpdateMode.FILL_UNAVAILABLE.name(), HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE, "_change_operation_type", "d"),
       // Test case: Version 9 table with AWSDmsAvroPayload (should set custom delete properties)
       arguments("Version 9 with AWSDmsAvroPayload", null, AWSDmsAvroPayload.class.getName(), null, null, HoodieTableVersion.NINE,
-          5, COMMIT_TIME_ORDERING.name(), null, COMMIT_TIME_BASED_MERGE_STRATEGY_UUID, AWSDmsAvroPayload.class.getName(), null, null, "Op", "D"),
+          4, COMMIT_TIME_ORDERING.name(), null, null, AWSDmsAvroPayload.class.getName(), null, null, "Op", "D"),
       // Test case: Version 9 table with EventTimeAvroPayload (event time based payload)
       arguments("Version 9 with EventTimeAvroPayload", null, EventTimeAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
-          3, EVENT_TIME_ORDERING.name(), null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, EventTimeAvroPayload.class.getName(), null, null, null, null),
+          2, EVENT_TIME_ORDERING.name(), null, null, EventTimeAvroPayload.class.getName(), null, null, null, null),
       // Test case: Version 9 table with MySqlDebeziumAvroPayload (event time based payload)
       arguments("Version 9 with MySqlDebeziumAvroPayload", null, MySqlDebeziumAvroPayload.class.getName(), null, "ts", HoodieTableVersion.NINE,
-          6, EVENT_TIME_ORDERING.name(), null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, MySqlDebeziumAvroPayload.class.getName(), null, null, "_change_operation_type", "d")
+          5, EVENT_TIME_ORDERING.name(), null, null, MySqlDebeziumAvroPayload.class.getName(), null, null, "_change_operation_type", "d")
     );
}
@@ -747,9 +747,8 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness {
     // Test case: Empty string payload class should be treated as null
     Map<String, String> configs = HoodieTableConfig.inferMergingConfigsForV9TableCreation(
         EVENT_TIME_ORDERING, "", EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", HoodieTableVersion.NINE);
-    assertEquals(2, configs.size());
+    assertEquals(1, configs.size());
     assertEquals(EVENT_TIME_ORDERING.name(), configs.get(RECORD_MERGE_MODE.key()));
-    assertEquals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID, configs.get(RECORD_MERGE_STRATEGY_ID.key()));
     // Test case: Non-version 9 table with all parameters should throw
     assertExceptionWithInferMergingConfigsForV9TableCreation(EVENT_TIME_ORDERING, DefaultHoodieRecordPayload.class.getName(),
@@ -758,9 +757,8 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness {
     // Test case: Version 9 table with null ordering field for event time ordering should still work
     configs = HoodieTableConfig.inferMergingConfigsForV9TableCreation(
        EVENT_TIME_ORDERING, null, EVENT_TIME_BASED_MERGE_STRATEGY_UUID, null, HoodieTableVersion.NINE);
-    assertEquals(2, configs.size());
+    assertEquals(1, configs.size());
     assertEquals(EVENT_TIME_ORDERING.name(), configs.get(RECORD_MERGE_MODE.key()));
-    assertEquals(EVENT_TIME_BASED_MERGE_STRATEGY_UUID, configs.get(RECORD_MERGE_STRATEGY_ID.key()));
}
@Test
@@ -774,7 +772,7 @@ class TestHoodieTableConfig extends HoodieCommonTestHarness {
       Map<String, String> configs = HoodieTableConfig.inferMergingConfigsForV9TableCreation(
           EVENT_TIME_ORDERING, DefaultHoodieRecordPayload.class.getName(), EVENT_TIME_BASED_MERGE_STRATEGY_UUID, "ts", version);
-      assertEquals(3, configs.size(), "Table version 9 and above should return 3 configs");
+      assertEquals(2, configs.size(), "Table version 9 and above should return 2 configs");
}
}
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c8e70d5db833..009c2c4f8d8e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -1137,10 +1137,14 @@ class HoodieSparkSqlWriterInternal {
                              tableVersion)
       mergedParams.put(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), inferredMergeConfigs.getLeft.name())
       mergedParams.put(HoodieTableConfig.RECORD_MERGE_MODE.key(), inferredMergeConfigs.getLeft.name())
-      mergedParams.put(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), inferredMergeConfigs.getMiddle)
-      mergedParams.put(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), inferredMergeConfigs.getMiddle)
-      mergedParams.put(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(), inferredMergeConfigs.getRight)
-      mergedParams.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), inferredMergeConfigs.getRight)
+      if (inferredMergeConfigs.getMiddle != null) {
+        mergedParams.put(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), inferredMergeConfigs.getMiddle)
+        mergedParams.put(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), inferredMergeConfigs.getMiddle)
+      }
+      if (inferredMergeConfigs.getRight != null) {
+        mergedParams.put(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key(), inferredMergeConfigs.getRight)
+        mergedParams.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), inferredMergeConfigs.getRight)
+      }
     } else {
       mergedParams.put(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), mergedParams(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()))
       mergedParams.put(HoodieTableConfig.RECORD_MERGE_MODE.key(), mergedParams(HoodieWriteConfig.RECORD_MERGE_MODE.key()))
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index c9807da5a434..520fd996b74f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -314,6 +314,21 @@ object HoodieWriterUtils {
         && currentPartitionFields != tableConfigPartitionFields) {
         diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
       }
+      // The value of `HoodieTableConfig.RECORD_MERGE_STRATEGY_ID` can be NULL or non-NULL.
+      // A non-NULL value has already been validated above in the regular code path; when the
+      // table config value is NULL that check is skipped, so here we throw if the write config
+      // carries a non-null merge strategy id. There are two exclusions:
+      //   CASE 1: For < v9 tables, we skip the check completely for backward compatibility.
+      //   CASE 2: For >= v9 tables, merge-into queries.
+      if (tableConfig.getInt(HoodieTableConfig.VERSION) >= HoodieTableVersion.NINE.versionCode()
+        && !params.getOrElse(PAYLOAD_CLASS_NAME.key(), "").equals(EXPRESSION_PAYLOAD_CLASS_NAME)
+        && StringUtils.isNullOrEmpty(tableConfig.getStringOrDefault(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key, null))) {
+        val mergeStrategyId = params.getOrElse(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), null)
+        if (!StringUtils.isNullOrEmpty(mergeStrategyId)) {
+          diffConfigs.append(s"${HoodieTableConfig.RECORD_MERGE_STRATEGY_ID}:\t$mergeStrategyId\tnull\n")
+        }
+      }
     }
if (diffConfigs.nonEmpty) {
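The same null-versus-set distinction now matters to writers: a v9+ table that persists no strategy id should flag a write config that insists on one. A hedged plain-Java rendering of the check above (the method name and boolean flags are illustrative; the real logic is the Scala in this hunk):

    // Illustrative only: when does a writer-supplied strategy id conflict?
    static boolean conflictsWithTable(int tableVersion, String tableStrategyId,
                                      String writeConfigStrategyId, boolean isMergeInto) {
      if (tableVersion < 9 || isMergeInto) {
        return false; // exclusions: pre-v9 tables and merge-into queries
      }
      boolean tableHasNone = tableStrategyId == null || tableStrategyId.isEmpty();
      boolean writerHasOne = writeConfigStrategyId != null && !writeConfigStrategyId.isEmpty();
      return tableHasNone && writerHasOne;
    }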
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 13f5ba7ba60d..d3452220d7b0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -726,47 +726,46 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness {
}
private void validateVersion8Properties(HoodieTableConfig tableConfig) {
+ validatePropertiesForV8Plus(tableConfig);
+    assertTrue(tableConfig.contains(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID),
+        "Record merge strategy ID should be set for V8");
+ }
+
+  private void validateVersion9Properties(HoodieTableMetaClient metaClient, HoodieTableConfig tableConfig) {
+ validatePropertiesForV8Plus(tableConfig);
+
+ // Check if index metadata exists and has proper version information
+ Option<HoodieIndexMetadata> indexMetadata = metaClient.getIndexMetadata();
+ if (indexMetadata.isPresent()) {
+      indexMetadata.get().getIndexDefinitions().forEach((indexName, indexDef) -> {
+ assertNotNull(indexDef.getVersion(),
+ "Index " + indexName + " should have version information in V9");
+ });
+ }
+ }
+
+ private void validatePropertiesForV8Plus(HoodieTableConfig tableConfig) {
     Option<TimelineLayoutVersion> layoutVersion = tableConfig.getTimelineLayoutVersion();
     assertTrue(layoutVersion.isPresent(), "Timeline layout version should be present for V8+");
     assertEquals(TimelineLayoutVersion.LAYOUT_VERSION_2, layoutVersion.get(), "Timeline layout should be V2 for V8+");
-
assertTrue(tableConfig.contains(HoodieTableConfig.TIMELINE_PATH),
- "Timeline path should be set for V8");
+ "Timeline path should be set for V8+");
     assertEquals(HoodieTableConfig.TIMELINE_PATH.defaultValue(), tableConfig.getString(HoodieTableConfig.TIMELINE_PATH),
         "Timeline path should have default value");
-
assertTrue(tableConfig.contains(HoodieTableConfig.RECORD_MERGE_MODE),
- "Record merge mode should be set for V8");
+ "Record merge mode should be set for V8+");
RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
assertNotNull(mergeMode, "Merge mode should not be null");
-
-    assertTrue(tableConfig.contains(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID),
-        "Record merge strategy ID should be set for V8");
-
     assertTrue(tableConfig.contains(HoodieTableConfig.INITIAL_VERSION),
-        "Initial version should be set for V8");
-
+        "Initial version should be set for V8+");
if (tableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)) {
       assertTrue(tableConfig.contains(HoodieTableConfig.KEY_GENERATOR_TYPE),
           "Key generator type should be set when key generator class is present");
}
}
-  private void validateVersion9Properties(HoodieTableMetaClient metaClient, HoodieTableConfig tableConfig) {
- validateVersion8Properties(tableConfig);
-
- // Check if index metadata exists and has proper version information
- Option<HoodieIndexMetadata> indexMetadata = metaClient.getIndexMetadata();
- if (indexMetadata.isPresent()) {
-      indexMetadata.get().getIndexDefinitions().forEach((indexName, indexDef) -> {
- assertNotNull(indexDef.getVersion(),
- "Index " + indexName + " should have version information in V9");
- });
- }
- }
-
/**
* Read table data for validation purposes.
*/
@@ -780,15 +779,15 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness {
.load(basePath);
assertNotNull(tableData, "Table read should not return null " + stage);
-
+
// Force execution to ensure data is read immediately (not lazily)
List<Row> rows = tableData.collectAsList();
long rowCount = rows.size();
assertTrue(rowCount >= 0, "Row count should be non-negative " + stage);
-
+
// Convert collected rows back to Dataset for use in validation
       Dataset<Row> materializedData = sqlContext().createDataFrame(rows, tableData.schema());
-
+
       LOG.info("Successfully read and materialized table data {} ({} rows)", stage, rowCount);
return materializedData;
} catch (Exception e) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 37fdd03be341..496c7b1790f2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
 import org.apache.hudi.DataSourceWriteOptions.{DROP_INSERT_DUP_POLICY, FAIL_INSERT_DUP_POLICY, INSERT_DROP_DUPS, INSERT_DUP_POLICY}
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig}
+import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, RecordMergeMode}
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieReplaceCommitMetadata, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.TimelineUtils
@@ -618,10 +618,20 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
       .setPartitionFields(fooTableParams(DataSourceWriteOptions.PARTITIONPATH_FIELD.key))
       .setKeyGeneratorClassProp(fooTableParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue()))
-    if(addBootstrapPath) {
-      tableMetaClientBuilder
-        .setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
-    }
+    if (fooTableParams.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key())) {
+      tableMetaClientBuilder.setPayloadClassName(fooTableParams(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key))
+    }
+    if (fooTableParams.contains(HoodieWriteConfig.RECORD_MERGE_MODE.key)) {
+      tableMetaClientBuilder.setRecordMergeMode(RecordMergeMode.valueOf(
+        fooTableParams(HoodieWriteConfig.RECORD_MERGE_MODE.key)))
+    }
+    if (fooTableParams.contains(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key)) {
+      tableMetaClientBuilder.setRecordMergeStrategyId(fooTableParams(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key))
+    }
+    if(addBootstrapPath) {
+      tableMetaClientBuilder
+        .setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
+    }
     if (initBasePath) {
       tableMetaClientBuilder.initTable(HadoopFSUtils.getStorageConfWithCopy(sc.hadoopConfiguration), tempBasePath)
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
index d5cafe06d5cf..0282b7375552 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -489,6 +489,15 @@ class TestHoodieSparkSqlWriterWithTestFormat extends HoodieSparkWriterTestBase {
.setPartitionFields(fooTableParams(DataSourceWriteOptions.PARTITIONPATH_FIELD.key))
.setKeyGeneratorClassProp(fooTableParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue()))
+ if (fooTableParams.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key)) {
+ tableMetaClientBuilder.setPayloadClassName(fooTableParams(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key))
+ }
+ if (fooTableParams.contains(HoodieWriteConfig.RECORD_MERGE_MODE.key)) {
+ tableMetaClientBuilder.setRecordMergeMode(RecordMergeMode.valueOf(fooTableParams(HoodieWriteConfig.RECORD_MERGE_MODE.key)))
+ }
+ if (fooTableParams.contains(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key)) {
+ tableMetaClientBuilder.setRecordMergeStrategyId(fooTableParams(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key))
+ }
if (addBootstrapPath) {
tableMetaClientBuilder
.setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
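Both writer-test helpers above now seed merge-related table properties only when the corresponding write option is actually supplied, leaving the rest to table-version defaulting. A condensed, hedged sketch of the same guard pattern, where `params` and `builder` are illustrative stand-ins for the tests' fooTableParams and tableMetaClientBuilder:

  // Sketch: only propagate merge settings that were explicitly configured.
  params.get(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key)
    .foreach(c => builder.setPayloadClassName(c))
  params.get(HoodieWriteConfig.RECORD_MERGE_MODE.key)
    .foreach(m => builder.setRecordMergeMode(RecordMergeMode.valueOf(m)))
  params.get(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key)
    .foreach(id => builder.setRecordMergeStrategyId(id))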
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
index 1146371be515..0db232308ca2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
@@ -450,8 +450,7 @@ object TestPayloadDeprecationFlow {
"false",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
),
Arguments.of(
"COPY_ON_WRITE",
@@ -459,8 +458,7 @@ object TestPayloadDeprecationFlow {
"true",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
),
Arguments.of(
"COPY_ON_WRITE",
@@ -468,8 +466,7 @@ object TestPayloadDeprecationFlow {
"false",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName)
),
Arguments.of(
"COPY_ON_WRITE",
@@ -478,7 +475,6 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PartialUpdateAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
),
Arguments.of(
@@ -488,7 +484,6 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PostgresDebeziumAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "FILL_UNAVAILABLE",
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE -> "__debezium_unavailable_value")
@@ -500,8 +495,8 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
- HoodieTableConfig.ORDERING_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME))),
+ HoodieTableConfig.ORDERING_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME))
+ ),
Arguments.of(
"COPY_ON_WRITE",
classOf[AWSDmsAvroPayload].getName,
@@ -509,7 +504,6 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[AWSDmsAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
),
@@ -519,9 +513,7 @@ object TestPayloadDeprecationFlow {
"false",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
- )
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName)
),
Arguments.of(
"COPY_ON_WRITE",
@@ -530,9 +522,7 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
- HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"
- )
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
),
Arguments.of(
"MERGE_ON_READ",
@@ -540,8 +530,7 @@ object TestPayloadDeprecationFlow {
"false",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
),
Arguments.of(
"MERGE_ON_READ",
@@ -549,8 +538,7 @@ object TestPayloadDeprecationFlow {
"true",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
),
Arguments.of(
"MERGE_ON_READ",
@@ -558,8 +546,7 @@ object TestPayloadDeprecationFlow {
"false",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName)
),
Arguments.of(
"MERGE_ON_READ",
@@ -568,7 +555,6 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PartialUpdateAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
),
Arguments.of(
@@ -578,7 +564,6 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PostgresDebeziumAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "FILL_UNAVAILABLE",
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE -> "__debezium_unavailable_value")
@@ -590,8 +575,8 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
- HoodieTableConfig.ORDERING_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME))),
+ HoodieTableConfig.ORDERING_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME))
+ ),
Arguments.of(
"MERGE_ON_READ",
classOf[AWSDmsAvroPayload].getName,
@@ -599,7 +584,6 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[AWSDmsAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
),
@@ -609,9 +593,7 @@ object TestPayloadDeprecationFlow {
"false",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
- )
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName)
),
Arguments.of(
"MERGE_ON_READ",
@@ -620,9 +602,7 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
- HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"
- )
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
)
)
}
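A recurring theme in the fixtures above: hoodie.record.merge.strategy.id is dropped from the expected table properties, since the strategy follows from the record merge mode once the legacy payload classes are deprecated. A hedged sketch of the kind of assertion these fixtures now support, with metaClient standing in for a test's meta client:

  // Sketch: assert on the merge mode; the strategy id is no longer among the
  // explicitly stored table properties that these fixtures expect.
  assertEquals(RecordMergeMode.EVENT_TIME_ORDERING,
    metaClient.getTableConfig.getRecordMergeMode)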
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 123d23954efa..5ccde850c5b7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -21,7 +21,7 @@ package org.apache.hudi.functional
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, HoodieRecordMerger, HoodieTableType, OverwriteWithLatestAvroPayload, PartialUpdateAvroPayload}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, EventTimeAvroPayload, HoodieRecord, HoodieRecordMerger, HoodieTableType, OverwriteWithLatestAvroPayload, PartialUpdateAvroPayload}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.read.CustomPayloadForTesting
@@ -433,7 +433,9 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness {
val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
assertEquals(tableVersion, metaClient.getTableConfig.getTableVersion.versionCode())
assertEquals(mergeMode, metaClient.getTableConfig.getRecordMergeMode)
- assertEquals(strategyId, metaClient.getTableConfig.getRecordMergeStrategyId)
+ if (tableVersion < 9) {
+ assertEquals(strategyId, metaClient.getTableConfig.getRecordMergeStrategyId)
+ }
val df1 = df.limit(1)
val diffMergeMode = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
@@ -445,35 +447,51 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness {
df1.write.format("hudi")
.option(HoodieWriteConfig.RECORD_MERGE_MODE.key, diffMergeMode.name)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+ .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
.mode(SaveMode.Append)
.save(basePath)
val finalDf = spark.read.format("hudi")
.options(readOpts)
.load(basePath)
+ df1.write.format("hudi")
+ .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, classOf[EventTimeAvroPayload].getName)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+ .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
+ .mode(SaveMode.Append)
+ .save(basePath)
+ df1.write.format("hudi")
+ .option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, classOf[EventTimeAvroPayload].getName)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+ .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
+ .mode(SaveMode.Append)
+ .save(basePath)
assertEquals(399, finalDf.count())
} else {
Assertions.assertThrows(classOf[HoodieException], () => {
df1.write.format("hudi")
- .option(HoodieWriteConfig.RECORD_MERGE_MODE.key, "any_other_payload")
+ .option(HoodieWriteConfig.RECORD_MERGE_MODE.key, diffMergeMode.name)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+ .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
+ .mode(SaveMode.Append)
+ .save(basePath)
+ })
+ Assertions.assertThrows(classOf[HoodieException], () => {
+ df1.write.format("hudi")
+ .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, classOf[EventTimeAvroPayload].getName)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+ .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
+ .mode(SaveMode.Append)
+ .save(basePath)
+ })
+ Assertions.assertThrows(classOf[HoodieException], () => {
+ df1.write.format("hudi")
+ .option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+ .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
.mode(SaveMode.Append)
.save(basePath)
})
}
- Assertions.assertThrows(classOf[HoodieException], () => {
- df1.write.format("hudi")
- .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, "any_other_payload")
- .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
- .mode(SaveMode.Append)
- .save(basePath)
- })
- Assertions.assertThrows(classOf[HoodieException], () => {
- df1.write.format("hudi")
- .option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, "any_other_strategy_id")
- .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
- .mode(SaveMode.Append)
- .save(basePath)
- })
}
def ingestNewBatch(tableType: HoodieTableType, recordsToUpdate: Integer, structType: StructType, inserts: java.util.List[Row],
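The reworked assertions above disable auto upgrade on each append (HoodieWriteConfig.AUTO_UPGRADE_VERSION set to "false") so that an incompatible merge setting fails fast instead of silently upgrading the table. A hedged sketch of the negative path, with df1 and basePath standing in for the test's values:

  // Sketch: with auto upgrade off, switching the merge strategy id on an
  // existing table is expected to surface as a HoodieException.
  Assertions.assertThrows(classOf[HoodieException], () => {
    df1.write.format("hudi")
      .option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID)
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
      .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
      .mode(SaveMode.Append)
      .save(basePath)
  })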
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
index 1bb82673acdf..627c6a34e606 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
@@ -545,6 +545,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
// disable this config to avoid affect other test in this class.
spark.sql(s"set hoodie.sql.insert.mode=upsert")
+ spark.sql("set hoodie.merge.allow.duplicate.on.inserts=true")
}
}
@@ -818,7 +819,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
("date", "DATE'2021-05-20'")
)
typeAndValue.foreach { case (partitionType, partitionValue) =>
- val tableName = generateTableName
+ val tableName = s"${generateTableName}_timestamp_type"
spark.sql(s"set
hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true")
validateDifferentTypesOfPartitionColumn(tmp, partitionType,
partitionValue, tableName)
}
@@ -1010,7 +1011,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
withSQLConf("hoodie.sql.insert.mode" -> "non-strict") {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
- withTable(generateTableName) { tableMultiPartition =>
+ withTable(s"${generateTableName}_multi_partition") {
tableMultiPartition =>
spark.sql(
s"""
|create table $tableMultiPartition (
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
index 7f7e3a9f4986..629c61af6c3a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeCommitTimeOrdering.scala
@@ -33,7 +33,8 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
Seq(
"cow,current,false,false", "cow,current,false,true",
"cow,current,true,false",
"mor,current,false,false", "mor,current,false,true",
"mor,current,true,false",
- "cow,6,true,false", "cow,6,true,true", "mor,6,true,true").foreach { args =>
+ "cow,6,true,false", "cow,6,true,true", "mor,6,true,true",
+ "cow,8,true,false", "cow,8,true,true", "mor,8,true,true").foreach { args =>
val argList = args.split(',')
val tableType = argList(0)
val tableVersion = if (argList(1).equals("current")) {
@@ -63,15 +64,23 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
} else {
""
}
- val expectedMergeConfigs = if (tableVersion.toInt == 6) {
- Map(
- HoodieTableConfig.VERSION.key -> "6",
- HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName)
- } else {
- Map(
- HoodieTableConfig.VERSION.key -> tableVersion,
- HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
+ val expectedMergeConfigs: Map[String, String] = tableVersion.toInt match {
+ case 6 =>
+ Map(
+ HoodieTableConfig.VERSION.key -> "6",
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName
+ )
+ case 8 =>
+ Map(
+ HoodieTableConfig.VERSION.key -> tableVersion,
+ HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
+ )
+ case _ =>
+ Map(
+ HoodieTableConfig.VERSION.key -> tableVersion,
+ HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name()
+ )
}
val nonExistentConfigs = if (tableVersion.toInt == 6) {
Seq(HoodieTableConfig.RECORD_MERGE_MODE.key, HoodieTableConfig.ORDERING_FIELDS.key)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
index a3bc2af09106..cbcc9c5989fd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeModeEventTimeOrdering.scala
@@ -62,23 +62,29 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
} else {
""
}
- val writeTableVersionClause = if (tableVersion.toInt == 6) {
- s"hoodie.write.table.version = $tableVersion,"
- } else {
- ""
+ val writeTableVersionClause = tableVersion.toInt match {
+ case 6 => s"hoodie.write.table.version = $tableVersion,"
+ case 8 => s"hoodie.write.table.version = $tableVersion,"
+ case _ => ""
}
- val expectedMergeConfigs = if (tableVersion.toInt == 6) {
- Map(
- HoodieTableConfig.VERSION.key -> "6",
- HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[DefaultHoodieRecordPayload].getName,
- HoodieTableConfig.ORDERING_FIELDS.key -> "ts"
- )
- } else {
- Map(
- HoodieTableConfig.VERSION.key -> String.valueOf(HoodieTableVersion.current().versionCode()),
- HoodieTableConfig.ORDERING_FIELDS.key -> "ts",
- HoodieTableConfig.RECORD_MERGE_MODE.key -> EVENT_TIME_ORDERING.name(),
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+ val expectedMergeConfigs: Map[String, String] = tableVersion.toInt match {
+ case 6 =>
+ Map(
+ HoodieTableConfig.VERSION.key -> "6",
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[DefaultHoodieRecordPayload].getName,
+ HoodieTableConfig.ORDERING_FIELDS.key -> "ts"
+ )
+ case 8 =>
+ Map(
+ HoodieTableConfig.VERSION.key -> "8",
+ HoodieTableConfig.RECORD_MERGE_MODE.key -> EVENT_TIME_ORDERING.name(),
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> EVENT_TIME_BASED_MERGE_STRATEGY_UUID
+ )
+ case _ =>
+ Map(
+ HoodieTableConfig.VERSION.key -> "9",
+ HoodieTableConfig.RECORD_MERGE_MODE.key -> EVENT_TIME_ORDERING.name()
+ )
}
val nonExistentConfigs = if (tableVersion.toInt == 6) {
Seq(HoodieTableConfig.RECORD_MERGE_MODE.key)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index 726bbd5070b8..b0a33e203127 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -141,7 +141,6 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
|[hoodie.datasource.write.hive_style_partitioning,true,null]
|[hoodie.datasource.write.partitionpath.urlencode,false,null]
|[hoodie.record.merge.mode,EVENT_TIME_ORDERING,null]
- |[hoodie.record.merge.strategy.id,eeb8d96f-b1e4-49fd-bbf8-28ac514178e5,null]
|[hoodie.table.checksum,,]
|[hoodie.table.create.schema,,]
|[hoodie.table.format,native,null]
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 5b5d4d8b83ef..afaf728d3fe0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -59,6 +59,7 @@ import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
@@ -1136,16 +1137,21 @@ public class StreamSync implements Serializable, Closeable {
.withInlineCompaction(cfg.isInlineCompactionEnabled())
.build()
)
- .withPayloadConfig(
- HoodiePayloadConfig.newBuilder()
- .withPayloadClass(cfg.payloadClassName)
- .withPayloadOrderingFields(cfg.sourceOrderingFields)
- .build())
.withRecordMergeMode(cfg.recordMergeMode)
- .withRecordMergeStrategyId(cfg.recordMergeStrategyId)
.withRecordMergeImplClasses(cfg.recordMergeImplClasses)
.forTable(cfg.targetTableName)
.withProps(props);
+ // Merge strategy id can be NULL.
+ if (!StringUtils.isNullOrEmpty(cfg.recordMergeStrategyId)) {
+ builder.withRecordMergeStrategyId(cfg.recordMergeStrategyId);
+ }
+ HoodiePayloadConfig.Builder payloadConfigBuilder =
+ HoodiePayloadConfig.newBuilder().withPayloadOrderingFields(cfg.sourceOrderingFields);
+ // Payload class can be NULL.
+ if (!StringUtils.isNullOrEmpty(cfg.payloadClassName)) {
+ payloadConfigBuilder.withPayloadClass(cfg.payloadClassName);
+ }
+ builder.withPayloadConfig(payloadConfigBuilder.build());
// If schema is required in the config, we need to handle the case where the target schema is null and should be fetched from previous commits
final Schema returnSchema;
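The StreamSync change above stops forcing a payload class and merge strategy id into the write config when the streamer config leaves them unset, so table-version defaults can apply instead. A minimal sketch of the guard (written in Scala for consistency with the other sketches, although StreamSync itself is Java; cfg and builder stand in for its fields):

  // Sketch: skip builder calls for optional merge settings that are unset.
  if (!StringUtils.isNullOrEmpty(cfg.recordMergeStrategyId)) {
    builder.withRecordMergeStrategyId(cfg.recordMergeStrategyId)
  }
  val payloadConfigBuilder = HoodiePayloadConfig.newBuilder()
    .withPayloadOrderingFields(cfg.sourceOrderingFields)
  if (!StringUtils.isNullOrEmpty(cfg.payloadClassName)) {
    payloadConfigBuilder.withPayloadClass(cfg.payloadClassName)
  }
  builder.withPayloadConfig(payloadConfigBuilder.build())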
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index c9614a765a8a..89781741e39f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -212,7 +212,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
opts.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), DefaultSparkRecordMerger.class.getName());
opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
opts.put(HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.EVENT_TIME_ORDERING.name());
- opts.put(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
for (Map.Entry<String, String> entry : opts.entrySet()) {
hoodieConfig.add(String.format("%s=%s", entry.getKey(), entry.getValue()));
}