This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ad33d7999e0c fix: Fix upgrade handling for MySqlDebeziumAvroPayload
with deltastreamer (#14159)
ad33d7999e0c is described below
commit ad33d7999e0ca2bde3985185eaa28e88cfed5113
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Oct 28 07:50:23 2025 +0530
fix: Fix upgrade handling for MySqlDebeziumAvroPayload with deltastreamer
(#14159)
---------
Co-authored-by: Lokesh Jain <[email protected]>
Co-authored-by: Lokesh Jain <[email protected]>
Co-authored-by: sivabalan <[email protected]>
---
.../table/upgrade/EightToNineUpgradeHandler.java | 44 ++++-----
.../upgrade/TestEightToNineUpgradeHandler.java | 107 +++++++++------------
.../hudi/common/model/HoodieRecordPayload.java | 21 ++--
.../model/debezium/MySqlDebeziumAvroPayload.java | 5 +
.../hudi/common/table/HoodieTableConfig.java | 7 ++
.../hudi/utilities/streamer/HoodieStreamer.java | 1 -
.../utilities/streamer/HoodieStreamerUtils.java | 8 +-
.../apache/hudi/utilities/streamer/StreamSync.java | 20 +++-
.../deltastreamer/TestHoodieDeltaStreamer.java | 28 ++++++
.../streamer/TestHoodieStreamerUtils.java | 5 +-
10 files changed, 140 insertions(+), 106 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
index 788a6deb7171..1621cecfd65d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
@@ -48,8 +48,6 @@ import java.util.Set;
import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
-import static
org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_FILE_COL_NAME;
-import static
org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_POS_COL_NAME;
import static
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.model.HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID;
import static
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
@@ -127,22 +125,22 @@ public class EightToNineUpgradeHandler implements
UpgradeHandler {
metaClient.getTableConfig().getTableVersion());
}
// Handle merge mode config.
- reconcileMergeModeConfig(tablePropsToAdd, tablePropsToRemove, tableConfig);
+ reconcileMergeModeConfig(tablePropsToAdd, tablePropsToRemove, tableConfig,
config);
// Handle partial update mode config.
- reconcilePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
+ reconcilePartialUpdateModeConfig(tablePropsToAdd, tableConfig, config);
// Handle merge properties config.
reconcileMergePropertiesConfig(tablePropsToAdd, tableConfig, config);
// Handle payload class configs.
- reconcilePayloadClassConfig(tablePropsToAdd, tablePropsToRemove,
tableConfig);
+ reconcilePayloadClassConfig(tablePropsToAdd, tablePropsToRemove,
tableConfig, config);
// Handle ordering fields config.
- reconcileOrderingFieldsConfig(tablePropsToAdd, tablePropsToRemove,
tableConfig);
+ reconcileOrderingFieldsConfig(tablePropsToAdd, tablePropsToRemove,
tableConfig, config);
return new UpgradeDowngrade.TableConfigChangeSet(tablePropsToAdd,
tablePropsToRemove);
}
- private void reconcileMergeModeConfig(Map<ConfigProperty, String>
tablePropsToAdd,
- Set<ConfigProperty> tablePropsToRemove,
- HoodieTableConfig tableConfig) {
- String payloadClass = tableConfig.getPayloadClass();
+ private void reconcileMergeModeConfig(Map<ConfigProperty, String>
tablePropsToAdd, Set<ConfigProperty> tablePropsToRemove,
+ HoodieTableConfig tableConfig,
HoodieWriteConfig config) {
+ String payloadClass = tableConfig.getPayloadClassIfPresent()
+ .orElse(config.getPayloadClass());
RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
if (mergeMode != RecordMergeMode.CUSTOM) {
// For commit time or event time based table, remove merge strategy id.
@@ -162,10 +160,10 @@ public class EightToNineUpgradeHandler implements
UpgradeHandler {
// else: No op, which means merge strategy id and merge mode are not
changed.
}
- private void reconcilePayloadClassConfig(Map<ConfigProperty, String>
tablePropsToAdd,
- Set<ConfigProperty>
tablePropsToRemove,
- HoodieTableConfig tableConfig) {
- String payloadClass = tableConfig.getPayloadClass();
+ private void reconcilePayloadClassConfig(Map<ConfigProperty, String>
tablePropsToAdd, Set<ConfigProperty> tablePropsToRemove,
+ HoodieTableConfig tableConfig,
HoodieWriteConfig config) {
+ String payloadClass = tableConfig.getPayloadClassIfPresent()
+ .orElse(config.getPayloadClass());
if (StringUtils.isNullOrEmpty(payloadClass)) {
return;
}
@@ -176,8 +174,9 @@ public class EightToNineUpgradeHandler implements
UpgradeHandler {
}
private void reconcilePartialUpdateModeConfig(Map<ConfigProperty, String>
tablePropsToAdd,
- HoodieTableConfig tableConfig)
{
- String payloadClass = tableConfig.getPayloadClass();
+ HoodieTableConfig tableConfig,
HoodieWriteConfig config) {
+ String payloadClass = tableConfig.getPayloadClassIfPresent()
+ .orElse(config.getPayloadClass());
if (StringUtils.isNullOrEmpty(payloadClass)) {
return;
}
@@ -190,7 +189,8 @@ public class EightToNineUpgradeHandler implements
UpgradeHandler {
}
private void reconcileMergePropertiesConfig(Map<ConfigProperty, String>
tablePropsToAdd, HoodieTableConfig tableConfig, HoodieWriteConfig writeConfig) {
- String payloadClass = tableConfig.getPayloadClass();
+ String payloadClass = tableConfig.getPayloadClassIfPresent()
+ .orElse(writeConfig.getPayloadClass());
if (StringUtils.isNullOrEmpty(payloadClass)) {
return;
}
@@ -224,13 +224,13 @@ public class EightToNineUpgradeHandler implements
UpgradeHandler {
}
}
- private void reconcileOrderingFieldsConfig(Map<ConfigProperty, String>
tablePropsToAdd,
- Set<ConfigProperty>
tablePropsToRemove,
- HoodieTableConfig tableConfig) {
- String payloadClass = tableConfig.getPayloadClass();
+ private void reconcileOrderingFieldsConfig(Map<ConfigProperty, String>
tablePropsToAdd, Set<ConfigProperty> tablePropsToRemove,
+ HoodieTableConfig tableConfig,
HoodieWriteConfig config) {
+ String payloadClass = tableConfig.getPayloadClassIfPresent()
+ .orElse(config.getPayloadClass());
Option<String> orderingFieldsOpt;
if (MySqlDebeziumAvroPayload.class.getName().equals(payloadClass)) {
- orderingFieldsOpt = Option.of(FLATTENED_FILE_COL_NAME + "," +
FLATTENED_POS_COL_NAME);
+ orderingFieldsOpt = Option.of(MySqlDebeziumAvroPayload.ORDERING_FIELDS);
} else if
(PostgresDebeziumAvroPayload.class.getName().equals(payloadClass)) {
orderingFieldsOpt = Option.of(DebeziumConstants.FLATTENED_LSN_COL_NAME);
} else {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
index eb885f8be5ad..58946703091c 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
@@ -56,9 +56,11 @@ import org.mockito.MockedStatic;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
@@ -117,6 +119,7 @@ class TestEightToNineUpgradeHandler {
when(table.getMetaClient()).thenReturn(metaClient);
when(metaClient.getTableConfig()).thenReturn(tableConfig);
when(config.autoUpgrade()).thenReturn(true);
+ when(config.getPayloadClass()).thenReturn(null);
// Setup common mocks
when(upgradeDowngradeHelper.getTable(config, context)).thenReturn(table);
@@ -125,6 +128,7 @@ class TestEightToNineUpgradeHandler {
when(metaClient.getStorage()).thenReturn(storage);
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.EIGHT);
when(tableConfig.getOrderingFieldsStr()).thenReturn(Option.empty());
+ when(tableConfig.getPayloadClassIfPresent()).thenReturn(Option.empty());
// Use a temp file for index definition path
indexDefPath = new StoragePath(tempDir.resolve("index.json").toString());
@@ -143,81 +147,55 @@ class TestEightToNineUpgradeHandler {
}
static Stream<Arguments> payloadClassTestCases() {
- return Stream.of(
- Arguments.of(
- DefaultHoodieRecordPayload.class.getName(),
- "",
- null,
- null,
- "DefaultHoodieRecordPayload"
- ),
- Arguments.of(
- EventTimeAvroPayload.class.getName(),
- "",
- EVENT_TIME_ORDERING.name(),
- null,
- "EventTimeAvroPayload"
- ),
- Arguments.of(
- OverwriteWithLatestAvroPayload.class.getName(),
- "",
- null,
- null,
- "OverwriteWithLatestAvroPayload"
- ),
- Arguments.of(
- AWSDmsAvroPayload.class.getName(),
- RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=Op,"
- + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=D", //
mergeProperties
- COMMIT_TIME_ORDERING.name(),
- null,
- "AWSDmsAvroPayload"
- ),
- Arguments.of(
- PostgresDebeziumAvroPayload.class.getName(),
- RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE +
"=" + DEBEZIUM_UNAVAILABLE_VALUE + ","
- + RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY +
"=_change_operation_type,"
- + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d",
- EVENT_TIME_ORDERING.name(),
- FILL_UNAVAILABLE.name(),
- "PostgresDebeziumAvroPayload"
- ),
- Arguments.of(
- PartialUpdateAvroPayload.class.getName(),
- "",
- EVENT_TIME_ORDERING.name(),
- PartialUpdateMode.IGNORE_DEFAULTS.name(),
- "PartialUpdateAvroPayload"
- ),
- Arguments.of(
- MySqlDebeziumAvroPayload.class.getName(),
- RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY +
"=_change_operation_type,"
- + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d",
- EVENT_TIME_ORDERING.name(),
- null,
- "MySqlDebeziumAvroPayload"
- ),
- Arguments.of(
- OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
- "",
- COMMIT_TIME_ORDERING.name(),
- PartialUpdateMode.IGNORE_DEFAULTS.name(),
- "OverwriteNonDefaultsWithLatestAvroPayload"
- )
- );
+ List<Arguments> arguments = new ArrayList<>();
+ arguments.addAll(getArguments(DefaultHoodieRecordPayload.class.getName(),
"",
+ null, null, "DefaultHoodieRecordPayload"));
+ arguments.addAll(getArguments(EventTimeAvroPayload.class.getName(), "",
+ EVENT_TIME_ORDERING.name(), null, "EventTimeAvroPayload"));
+
arguments.addAll(getArguments(OverwriteWithLatestAvroPayload.class.getName(),
"",
+ null, null, "OverwriteWithLatestAvroPayload"));
+ arguments.addAll(getArguments(AWSDmsAvroPayload.class.getName(),
RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=Op,"
+ + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=D", //
mergeProperties
+ COMMIT_TIME_ORDERING.name(), null, "AWSDmsAvroPayload"));
+ arguments.addAll(getArguments(PostgresDebeziumAvroPayload.class.getName(),
RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE + "=" +
DEBEZIUM_UNAVAILABLE_VALUE + ","
+ + RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY +
"=_change_operation_type,"
+ + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d",
+ EVENT_TIME_ORDERING.name(), FILL_UNAVAILABLE.name(),
"PostgresDebeziumAvroPayload"));
+ arguments.addAll(getArguments(PartialUpdateAvroPayload.class.getName(), "",
+ EVENT_TIME_ORDERING.name(), PartialUpdateMode.IGNORE_DEFAULTS.name(),
"PartialUpdateAvroPayload"));
+ arguments.addAll(getArguments(MySqlDebeziumAvroPayload.class.getName(),
RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=_change_operation_type,"
+ + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d",
+ EVENT_TIME_ORDERING.name(), null, "MySqlDebeziumAvroPayload"));
+
arguments.addAll(getArguments(OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
"",
+ COMMIT_TIME_ORDERING.name(), PartialUpdateMode.IGNORE_DEFAULTS.name(),
"OverwriteNonDefaultsWithLatestAvroPayload"));
+ return arguments.stream();
+ }
+
+ private static List<Arguments> getArguments(String payloadClassName, String
expectedMergeProperties,
+ String expectedRecordMergeMode,
String expectedPartialUpdateMode,
+ String testName) {
+ return Arrays.asList(
+ Arguments.of(payloadClassName, expectedMergeProperties,
+ expectedRecordMergeMode, expectedPartialUpdateMode, testName,
true),
+ Arguments.of(payloadClassName, expectedMergeProperties,
+ expectedRecordMergeMode, expectedPartialUpdateMode, testName,
false));
}
@ParameterizedTest(name = "testUpgradeWith{4}")
@MethodSource("payloadClassTestCases")
void testUpgradeWithPayloadClass(String payloadClassName, String
expectedMergeProperties,
String expectedRecordMergeMode, String
expectedPartialUpdateMode,
- String testName) {
+ String testName, boolean
isPayloadClassConfiguredInTableConfig) {
try (org.mockito.MockedStatic<UpgradeDowngradeUtils> utilities =
org.mockito.Mockito.mockStatic(UpgradeDowngradeUtils.class)) {
utilities.when(() ->
UpgradeDowngradeUtils.rollbackFailedWritesAndCompact(
any(), any(), any(), any(), anyBoolean(), any()))
.thenAnswer(invocation -> null);
- when(tableConfig.getPayloadClass()).thenReturn(payloadClassName);
+ if (isPayloadClassConfiguredInTableConfig) {
+
when(tableConfig.getPayloadClassIfPresent()).thenReturn(Option.ofNullable(payloadClassName));
+ } else {
+ when(config.getPayloadClass()).thenReturn(payloadClassName);
+ }
when(tableConfig.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ);
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID);
when(metaClient.getIndexMetadata()).thenReturn(Option.empty());
@@ -301,6 +279,7 @@ class TestEightToNineUpgradeHandler {
if (payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) {
assertTrue(propertiesToAdd.containsKey(HoodieTableConfig.ORDERING_FIELDS));
assertEquals(FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME,
propertiesToAdd.get(HoodieTableConfig.ORDERING_FIELDS));
+
assertTrue(propertiesToRemove.contains(HoodieTableConfig.PRECOMBINE_FIELD));
} else if
(payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
assertEquals(FLATTENED_LSN_COL_NAME,
propertiesToAdd.get(HoodieTableConfig.ORDERING_FIELDS));
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index af838e31bade..45856dbf4f2e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -184,19 +184,7 @@ public interface HoodieRecordPayload<T extends
HoodieRecordPayload> extends Seri
}
static String getPayloadClassName(Properties props) {
- Option<String> payloadOpt = getPayloadClassNameIfPresent(props);
- if (payloadOpt.isPresent()) {
- return payloadOpt.get();
- }
- // Note: starting from version 9, payload class is not necessary set, but
- // merge mode must exist. Therefore, we use merge mode to infer
- // the payload class for certain corner cases, like for MIT command.
- if (ConfigUtils.containsConfigProperty(props, RECORD_MERGE_MODE)
- && ConfigUtils.getStringWithAltKeys(props, RECORD_MERGE_MODE,
StringUtils.EMPTY_STRING)
- .equals(RecordMergeMode.COMMIT_TIME_ORDERING.name())) {
- return OverwriteWithLatestAvroPayload.class.getName();
- }
- return HoodieTableConfig.getDefaultPayloadClassName();
+ return
getPayloadClassNameIfPresent(props).orElse(HoodieTableConfig.getDefaultPayloadClassName());
}
// NOTE: PAYLOAD_CLASS_NAME is before LEGACY_PAYLOAD_CLASS_NAME to make sure
@@ -207,6 +195,13 @@ public interface HoodieRecordPayload<T extends
HoodieRecordPayload> extends Seri
payloadClassName = ConfigUtils.getStringWithAltKeys(props,
PAYLOAD_CLASS_NAME);
} else if (props.containsKey("hoodie.datasource.write.payload.class")) {
payloadClassName =
props.getProperty("hoodie.datasource.write.payload.class");
+ } else if (ConfigUtils.containsConfigProperty(props, RECORD_MERGE_MODE)
+ && ConfigUtils.getStringWithAltKeys(props, RECORD_MERGE_MODE,
StringUtils.EMPTY_STRING)
+ .equals(RecordMergeMode.COMMIT_TIME_ORDERING.name())) {
+ // Note: starting from version 9, payload class is not necessarily set, but
+ // merge mode must exist. Therefore, we use merge mode to infer
+ // the payload class for certain corner cases, like for MIT
command.
+ payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
}
// There could be tables written with payload class from com.uber.hoodie.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
index 46bf1b896ca2..aa3cf5eaea61 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
@@ -32,6 +32,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Objects;
+import static
org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_FILE_COL_NAME;
+import static
org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_POS_COL_NAME;
+
/**
* Provides support for seamlessly applying changes captured via Debezium for
MysqlDB.
* <p>
@@ -48,6 +51,8 @@ public class MySqlDebeziumAvroPayload extends
AbstractDebeziumAvroPayload {
private static final Logger LOG =
LoggerFactory.getLogger(MySqlDebeziumAvroPayload.class);
+ public static final String ORDERING_FIELDS = FLATTENED_FILE_COL_NAME + "," +
FLATTENED_POS_COL_NAME;
+
public MySqlDebeziumAvroPayload(GenericRecord record, Comparable
orderingVal) {
super(record, orderingVal);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 9b24f7d0c989..c5bcfbaff6f6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -801,6 +801,13 @@ public class HoodieTableConfig extends HoodieConfig {
return HoodieRecordPayload.getPayloadClassName(this);
}
+ /**
+ * Read the payload class if present for HoodieRecords from the table
properties.
+ */
+ public Option<String> getPayloadClassIfPresent() {
+ return HoodieRecordPayload.getPayloadClassNameIfPresent(this.getProps());
+ }
+
public String getLegacyPayloadClass() {
return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, EMPTY_STRING);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index fda1b8b6391d..6b3c1b2f94a4 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -161,7 +161,6 @@ public class HoodieStreamer implements Serializable {
cfg.recordMergeMode, cfg.payloadClassName,
cfg.recordMergeStrategyId, cfg.sourceOrderingFields,
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(this.properties,
HoodieWriteConfig.WRITE_TABLE_VERSION)));
cfg.recordMergeMode = mergingConfigs.getLeft();
- cfg.payloadClassName = mergingConfigs.getMiddle();
cfg.recordMergeStrategyId = mergingConfigs.getRight();
if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
InitialCheckPointProvider checkPointProvider =
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 2d6b27f13434..7ab24034b633 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.read.DeleteContext;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Either;
@@ -88,9 +89,10 @@ public class HoodieStreamerUtils {
*/
public static Option<JavaRDD<HoodieRecord>>
createHoodieRecords(HoodieStreamer.Config cfg, TypedProperties props,
Option<JavaRDD<GenericRecord>> avroRDDOptional,
SchemaProvider schemaProvider, HoodieRecord.HoodieRecordType recordType,
boolean autoGenerateRecordKeys,
- String
instantTime, Option<BaseErrorTableWriter> errorTableWriter) {
+ String
instantTime, Option<BaseErrorTableWriter> errorTableWriter, HoodieTableConfig
tableConfig) {
boolean shouldCombine = cfg.filterDupes ||
cfg.operation.equals(WriteOperationType.UPSERT);
- boolean shouldUseOrderingField = shouldCombine &&
!StringUtils.isNullOrEmpty(cfg.sourceOrderingFields);
+ String orderingFieldsStr =
tableConfig.getOrderingFieldsStr().orElse(cfg.sourceOrderingFields);
+ boolean shouldUseOrderingField = shouldCombine &&
!StringUtils.isNullOrEmpty(orderingFieldsStr);
boolean shouldErrorTable = errorTableWriter.isPresent() &&
props.getBoolean(ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(),
ERROR_ENABLE_VALIDATE_RECORD_CREATION.defaultValue());
boolean useConsistentLogicalTimestamp = ConfigUtils.getBooleanWithAltKeys(
props,
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
@@ -129,7 +131,7 @@ public class HoodieStreamerUtils {
GenericRecord gr = isDropPartitionColumns(props) ?
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
boolean isDelete =
AvroRecordContext.getFieldAccessorInstance().isDeleteRecord(gr, deleteContext);
Comparable orderingValue = shouldUseOrderingField
- ?
OrderingValues.create(cfg.sourceOrderingFields.split(","),
+ ? OrderingValues.create(orderingFieldsStr.split(","),
field -> (Comparable)
HoodieAvroUtils.getNestedFieldVal(gr, field, false,
useConsistentLogicalTimestamp))
: null;
HoodieRecord record = shouldUseOrderingField ?
HoodieRecordUtils.createHoodieRecord(gr, orderingValue, hoodieKey,
payloadClassName, requiresPayload, isDelete)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 5985abcf5423..fb55eb144cb4 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -47,6 +47,8 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.model.debezium.DebeziumConstants;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -75,6 +77,7 @@ import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetaSyncException;
+import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
@@ -951,10 +954,11 @@ public class StreamSync implements Serializable,
Closeable {
BaseDatasetBulkInsertCommitActionExecutor executor = new
HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig,
writeClient, instantTime);
writeClientWriteResult = new WriteClientWriteResult(executor.execute(df,
!HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
} else {
+ metaClient = HoodieTableMetaClient.reload(metaClient);
TypedProperties mergeProps = ConfigUtils.getMergeProps(props,
metaClient.getTableConfig());
HoodieRecordType recordType =
createRecordMerger(mergeProps).getRecordType();
Option<JavaRDD<HoodieRecord>> recordsOption =
HoodieStreamerUtils.createHoodieRecords(cfg, mergeProps, inputBatch.getBatch(),
inputBatch.getSchemaProvider(),
- recordType, autoGenerateRecordKeys, instantTime, errorTableWriter);
+ recordType, autoGenerateRecordKeys, instantTime, errorTableWriter,
metaClient.getTableConfig());
JavaRDD<HoodieRecord> records = recordsOption.orElseGet(() ->
hoodieSparkContext.emptyRDD());
// filter dupes if needed
if (cfg.filterDupes) {
@@ -1149,6 +1153,20 @@ public class StreamSync implements Serializable,
Closeable {
if (!StringUtils.isNullOrEmpty(cfg.recordMergeStrategyId)) {
builder.withRecordMergeStrategyId(cfg.recordMergeStrategyId);
}
+
+ if (metaClient != null) {
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+ // After upgrade to table version 9 with MySqlDebeziumAvroPayload,
ordering fields are changed from
+ // `_event_seq` to `_event_bin_file,_event_pos`. The logic here ensures
that deltastreamer config is updated
+ // if it points to older ordering field `_event_seq`.
+ if
(tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.NINE) &&
tableConfig.getLegacyPayloadClass().equals(MySqlDebeziumAvroPayload.class.getCanonicalName())
+ &&
cfg.sourceOrderingFields.equals(DebeziumConstants.ADDED_SEQ_COL_NAME)) {
+ cfg.sourceOrderingFields = MySqlDebeziumAvroPayload.ORDERING_FIELDS;
+ } else if (tableConfig.getOrderingFieldsStr().isPresent() &&
!StringUtils.isNullOrEmpty(cfg.sourceOrderingFields)
+ &&
!tableConfig.getOrderingFieldsStr().orElse("").equals(cfg.sourceOrderingFields))
{
+ throw new HoodieValidationException(String.format("Configured ordering
fields: %s do not match table ordering fields: %s", cfg.sourceOrderingFields,
tableConfig.getOrderingFields()));
+ }
+ }
HoodiePayloadConfig.Builder payloadConfigBuilder =
HoodiePayloadConfig.newBuilder().withPayloadOrderingFields(cfg.sourceOrderingFields);
// Payload class can be NULL.
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 42f5aab7559a..a5c393af1a0a 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -74,6 +74,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -2267,6 +2268,33 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void
testDeltaStreamerFailureWithChangingOrderingFields(HoodieTableType tableType)
throws Exception {
+ String tableBasePath = basePath + "/test_with_changing_ordering_fields";
+ HoodieDeltaStreamer.Config cfg =
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+ cfg.tableType = tableType.name();
+ cfg.filterDupes = true;
+ cfg.sourceOrderingFields = "timestamp,rider";
+ cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+ cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+ cfg.recordMergeStrategyId =
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+
+ TestDataSource.recordInstantTime = Option.of("001");
+ new HoodieStreamer(cfg, jsc).sync();
+ assertRecordCount(1000, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
+
+ // Change ordering fields in deltastreamer
+ Exception e = assertThrows(HoodieValidationException.class, () -> {
+ cfg.sourceOrderingFields = "timestamp";
+ TestDataSource.recordInstantTime = Option.of("002");
+ runStreamSync(cfg, false, 10, WriteOperationType.UPSERT);
+ });
+ assertTrue(e.getMessage().equals("Configured ordering fields: timestamp do
not match table ordering fields: [timestamp, rider]"));
+ }
+
private static long getNumUpdates(HoodieCommitMetadata metadata) {
return metadata.getPartitionToWriteStats().values().stream()
.flatMap(Collection::stream)
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
index 1c40eedf588c..eae9c0f0308d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieRecordCreationException;
@@ -108,7 +109,7 @@ public class TestHoodieStreamerUtils extends
UtilitiesTestBase {
doNothing().when(errorTableWriter.get()).addErrorEvents(errorEventCaptor.capture());
}
Option<JavaRDD<HoodieRecord>> recordOpt =
HoodieStreamerUtils.createHoodieRecords(cfg, props, Option.of(recordRdd),
- new SimpleSchemaProvider(jsc, schema, props), recordType, false,
"000", errorTableWriter);
+ new SimpleSchemaProvider(jsc, schema, props), recordType, false,
"000", errorTableWriter, new HoodieTableConfig());
if (errorTableWriter.isPresent()) {
assertEquals(0, errorEventCaptor.getValue().collect().size());
@@ -153,7 +154,7 @@ public class TestHoodieStreamerUtils extends
UtilitiesTestBase {
doNothing().when(errorTableWriter.get()).addErrorEvents(errorEventCaptor.capture());
}
Option<JavaRDD<HoodieRecord>> records =
HoodieStreamerUtils.createHoodieRecords(cfg, props, Option.of(recordRdd),
- schemaProvider, recordType, false, "000", errorTableWriter);
+ schemaProvider, recordType, false, "000", errorTableWriter, new
HoodieTableConfig());
assertTrue(records.isPresent());
if (enableErrorTableWriter) {