This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 72f4452981eb9299cbb84508570f9e702622e341 Author: Lokesh Jain <[email protected]> AuthorDate: Tue Oct 28 07:50:23 2025 +0530 fix: Fix upgrade handling for MySqlDebeziumAvroPayload with deltastreamer (#14159) --------- Co-authored-by: Lokesh Jain <[email protected]> Co-authored-by: Lokesh Jain <[email protected]> Co-authored-by: sivabalan <[email protected]> --- .../table/upgrade/EightToNineUpgradeHandler.java | 44 ++++----- .../upgrade/TestEightToNineUpgradeHandler.java | 107 +++++++++------------ .../hudi/common/model/HoodieRecordPayload.java | 21 ++-- .../model/debezium/MySqlDebeziumAvroPayload.java | 5 + .../hudi/common/table/HoodieTableConfig.java | 7 ++ .../hudi/utilities/streamer/HoodieStreamer.java | 1 - .../utilities/streamer/HoodieStreamerUtils.java | 8 +- .../apache/hudi/utilities/streamer/StreamSync.java | 20 +++- .../deltastreamer/TestHoodieDeltaStreamer.java | 28 ++++++ .../streamer/TestHoodieStreamerUtils.java | 5 +- 10 files changed, 140 insertions(+), 106 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java index 788a6deb7171..1621cecfd65d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java @@ -48,8 +48,6 @@ import java.util.Set; import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY; import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER; -import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_FILE_COL_NAME; -import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_POS_COL_NAME; import static org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID; import static org.apache.hudi.common.model.HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID; import static org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID; @@ -127,22 +125,22 @@ public class EightToNineUpgradeHandler implements UpgradeHandler { metaClient.getTableConfig().getTableVersion()); } // Handle merge mode config. - reconcileMergeModeConfig(tablePropsToAdd, tablePropsToRemove, tableConfig); + reconcileMergeModeConfig(tablePropsToAdd, tablePropsToRemove, tableConfig, config); // Handle partial update mode config. - reconcilePartialUpdateModeConfig(tablePropsToAdd, tableConfig); + reconcilePartialUpdateModeConfig(tablePropsToAdd, tableConfig, config); // Handle merge properties config. reconcileMergePropertiesConfig(tablePropsToAdd, tableConfig, config); // Handle payload class configs. - reconcilePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, tableConfig); + reconcilePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, tableConfig, config); // Handle ordering fields config. - reconcileOrderingFieldsConfig(tablePropsToAdd, tablePropsToRemove, tableConfig); + reconcileOrderingFieldsConfig(tablePropsToAdd, tablePropsToRemove, tableConfig, config); return new UpgradeDowngrade.TableConfigChangeSet(tablePropsToAdd, tablePropsToRemove); } - private void reconcileMergeModeConfig(Map<ConfigProperty, String> tablePropsToAdd, - Set<ConfigProperty> tablePropsToRemove, - HoodieTableConfig tableConfig) { - String payloadClass = tableConfig.getPayloadClass(); + private void reconcileMergeModeConfig(Map<ConfigProperty, String> tablePropsToAdd, Set<ConfigProperty> tablePropsToRemove, + HoodieTableConfig tableConfig, HoodieWriteConfig config) { + String payloadClass = tableConfig.getPayloadClassIfPresent() + .orElse(config.getPayloadClass()); RecordMergeMode mergeMode = tableConfig.getRecordMergeMode(); if (mergeMode != RecordMergeMode.CUSTOM) { // For commit time or event time based table, remove merge strategy id. @@ -162,10 +160,10 @@ public class EightToNineUpgradeHandler implements UpgradeHandler { // else: No op, which means merge strategy id and merge mode are not changed. } - private void reconcilePayloadClassConfig(Map<ConfigProperty, String> tablePropsToAdd, - Set<ConfigProperty> tablePropsToRemove, - HoodieTableConfig tableConfig) { - String payloadClass = tableConfig.getPayloadClass(); + private void reconcilePayloadClassConfig(Map<ConfigProperty, String> tablePropsToAdd, Set<ConfigProperty> tablePropsToRemove, + HoodieTableConfig tableConfig, HoodieWriteConfig config) { + String payloadClass = tableConfig.getPayloadClassIfPresent() + .orElse(config.getPayloadClass()); if (StringUtils.isNullOrEmpty(payloadClass)) { return; } @@ -176,8 +174,9 @@ public class EightToNineUpgradeHandler implements UpgradeHandler { } private void reconcilePartialUpdateModeConfig(Map<ConfigProperty, String> tablePropsToAdd, - HoodieTableConfig tableConfig) { - String payloadClass = tableConfig.getPayloadClass(); + HoodieTableConfig tableConfig, HoodieWriteConfig config) { + String payloadClass = tableConfig.getPayloadClassIfPresent() + .orElse(config.getPayloadClass()); if (StringUtils.isNullOrEmpty(payloadClass)) { return; } @@ -190,7 +189,8 @@ public class EightToNineUpgradeHandler implements UpgradeHandler { } private void reconcileMergePropertiesConfig(Map<ConfigProperty, String> tablePropsToAdd, HoodieTableConfig tableConfig, HoodieWriteConfig writeConfig) { - String payloadClass = tableConfig.getPayloadClass(); + String payloadClass = tableConfig.getPayloadClassIfPresent() + .orElse(writeConfig.getPayloadClass()); if (StringUtils.isNullOrEmpty(payloadClass)) { return; } @@ -224,13 +224,13 @@ public class EightToNineUpgradeHandler implements UpgradeHandler { } } - private void reconcileOrderingFieldsConfig(Map<ConfigProperty, String> tablePropsToAdd, - Set<ConfigProperty> tablePropsToRemove, - HoodieTableConfig tableConfig) { - String payloadClass = tableConfig.getPayloadClass(); + private void reconcileOrderingFieldsConfig(Map<ConfigProperty, String> tablePropsToAdd, Set<ConfigProperty> tablePropsToRemove, + HoodieTableConfig tableConfig, HoodieWriteConfig config) { + String payloadClass = tableConfig.getPayloadClassIfPresent() + .orElse(config.getPayloadClass()); Option<String> orderingFieldsOpt; if (MySqlDebeziumAvroPayload.class.getName().equals(payloadClass)) { - orderingFieldsOpt = Option.of(FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME); + orderingFieldsOpt = Option.of(MySqlDebeziumAvroPayload.ORDERING_FIELDS); } else if (PostgresDebeziumAvroPayload.class.getName().equals(payloadClass)) { orderingFieldsOpt = Option.of(DebeziumConstants.FLATTENED_LSN_COL_NAME); } else { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java index eb885f8be5ad..58946703091c 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java @@ -56,9 +56,11 @@ import org.mockito.MockedStatic; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Stream; @@ -117,6 +119,7 @@ class TestEightToNineUpgradeHandler { when(table.getMetaClient()).thenReturn(metaClient); when(metaClient.getTableConfig()).thenReturn(tableConfig); when(config.autoUpgrade()).thenReturn(true); + when(config.getPayloadClass()).thenReturn(null); // Setup common mocks when(upgradeDowngradeHelper.getTable(config, context)).thenReturn(table); @@ -125,6 +128,7 @@ class TestEightToNineUpgradeHandler { when(metaClient.getStorage()).thenReturn(storage); when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.EIGHT); when(tableConfig.getOrderingFieldsStr()).thenReturn(Option.empty()); + when(tableConfig.getPayloadClassIfPresent()).thenReturn(Option.empty()); // Use a temp file for index definition path indexDefPath = new StoragePath(tempDir.resolve("index.json").toString()); @@ -143,81 +147,55 @@ class TestEightToNineUpgradeHandler { } static Stream<Arguments> payloadClassTestCases() { - return Stream.of( - Arguments.of( - DefaultHoodieRecordPayload.class.getName(), - "", - null, - null, - "DefaultHoodieRecordPayload" - ), - Arguments.of( - EventTimeAvroPayload.class.getName(), - "", - EVENT_TIME_ORDERING.name(), - null, - "EventTimeAvroPayload" - ), - Arguments.of( - OverwriteWithLatestAvroPayload.class.getName(), - "", - null, - null, - "OverwriteWithLatestAvroPayload" - ), - Arguments.of( - AWSDmsAvroPayload.class.getName(), - RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=Op," - + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=D", // mergeProperties - COMMIT_TIME_ORDERING.name(), - null, - "AWSDmsAvroPayload" - ), - Arguments.of( - PostgresDebeziumAvroPayload.class.getName(), - RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE + "=" + DEBEZIUM_UNAVAILABLE_VALUE + "," - + RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=_change_operation_type," - + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d", - EVENT_TIME_ORDERING.name(), - FILL_UNAVAILABLE.name(), - "PostgresDebeziumAvroPayload" - ), - Arguments.of( - PartialUpdateAvroPayload.class.getName(), - "", - EVENT_TIME_ORDERING.name(), - PartialUpdateMode.IGNORE_DEFAULTS.name(), - "PartialUpdateAvroPayload" - ), - Arguments.of( - MySqlDebeziumAvroPayload.class.getName(), - RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=_change_operation_type," - + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d", - EVENT_TIME_ORDERING.name(), - null, - "MySqlDebeziumAvroPayload" - ), - Arguments.of( - OverwriteNonDefaultsWithLatestAvroPayload.class.getName(), - "", - COMMIT_TIME_ORDERING.name(), - PartialUpdateMode.IGNORE_DEFAULTS.name(), - "OverwriteNonDefaultsWithLatestAvroPayload" - ) - ); + List<Arguments> arguments = new ArrayList<>(); + arguments.addAll(getArguments(DefaultHoodieRecordPayload.class.getName(), "", + null, null, "DefaultHoodieRecordPayload")); + arguments.addAll(getArguments(EventTimeAvroPayload.class.getName(), "", + EVENT_TIME_ORDERING.name(), null, "EventTimeAvroPayload")); + arguments.addAll(getArguments(OverwriteWithLatestAvroPayload.class.getName(), "", + null, null, "OverwriteWithLatestAvroPayload")); + arguments.addAll(getArguments(AWSDmsAvroPayload.class.getName(), RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=Op," + + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=D", // mergeProperties + COMMIT_TIME_ORDERING.name(), null, "AWSDmsAvroPayload")); + arguments.addAll(getArguments(PostgresDebeziumAvroPayload.class.getName(), RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE + "=" + DEBEZIUM_UNAVAILABLE_VALUE + "," + + RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=_change_operation_type," + + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d", + EVENT_TIME_ORDERING.name(), FILL_UNAVAILABLE.name(), "PostgresDebeziumAvroPayload")); + arguments.addAll(getArguments(PartialUpdateAvroPayload.class.getName(), "", + EVENT_TIME_ORDERING.name(), PartialUpdateMode.IGNORE_DEFAULTS.name(), "PartialUpdateAvroPayload")); + arguments.addAll(getArguments(MySqlDebeziumAvroPayload.class.getName(), RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=_change_operation_type," + + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d", + EVENT_TIME_ORDERING.name(), null, "MySqlDebeziumAvroPayload")); + arguments.addAll(getArguments(OverwriteNonDefaultsWithLatestAvroPayload.class.getName(), "", + COMMIT_TIME_ORDERING.name(), PartialUpdateMode.IGNORE_DEFAULTS.name(), "OverwriteNonDefaultsWithLatestAvroPayload")); + return arguments.stream(); + } + + private static List<Arguments> getArguments(String payloadClassName, String expectedMergeProperties, + String expectedRecordMergeMode, String expectedPartialUpdateMode, + String testName) { + return Arrays.asList( + Arguments.of(payloadClassName, expectedMergeProperties, + expectedRecordMergeMode, expectedPartialUpdateMode, testName, true), + Arguments.of(payloadClassName, expectedMergeProperties, + expectedRecordMergeMode, expectedPartialUpdateMode, testName, false)); } @ParameterizedTest(name = "testUpgradeWith{4}") @MethodSource("payloadClassTestCases") void testUpgradeWithPayloadClass(String payloadClassName, String expectedMergeProperties, String expectedRecordMergeMode, String expectedPartialUpdateMode, - String testName) { + String testName, boolean isPayloadClassConfiguredInTableConfig) { try (org.mockito.MockedStatic<UpgradeDowngradeUtils> utilities = org.mockito.Mockito.mockStatic(UpgradeDowngradeUtils.class)) { utilities.when(() -> UpgradeDowngradeUtils.rollbackFailedWritesAndCompact( any(), any(), any(), any(), anyBoolean(), any())) .thenAnswer(invocation -> null); - when(tableConfig.getPayloadClass()).thenReturn(payloadClassName); + if (isPayloadClassConfiguredInTableConfig) { + when(tableConfig.getPayloadClassIfPresent()).thenReturn(Option.ofNullable(payloadClassName)); + } else { + when(config.getPayloadClass()).thenReturn(payloadClassName); + } when(tableConfig.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ); when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID); when(metaClient.getIndexMetadata()).thenReturn(Option.empty()); @@ -301,6 +279,7 @@ class TestEightToNineUpgradeHandler { if (payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) { assertTrue(propertiesToAdd.containsKey(HoodieTableConfig.ORDERING_FIELDS)); assertEquals(FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME, propertiesToAdd.get(HoodieTableConfig.ORDERING_FIELDS)); + assertTrue(propertiesToRemove.contains(HoodieTableConfig.PRECOMBINE_FIELD)); } else if (payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) { assertEquals(FLATTENED_LSN_COL_NAME, propertiesToAdd.get(HoodieTableConfig.ORDERING_FIELDS)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java index af838e31bade..45856dbf4f2e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java @@ -184,19 +184,7 @@ public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Seri } static String getPayloadClassName(Properties props) { - Option<String> payloadOpt = getPayloadClassNameIfPresent(props); - if (payloadOpt.isPresent()) { - return payloadOpt.get(); - } - // Note: starting from version 9, payload class is not necessary set, but - // merge mode must exist. Therefore, we use merge mode to infer - // the payload class for certain corner cases, like for MIT command. - if (ConfigUtils.containsConfigProperty(props, RECORD_MERGE_MODE) - && ConfigUtils.getStringWithAltKeys(props, RECORD_MERGE_MODE, StringUtils.EMPTY_STRING) - .equals(RecordMergeMode.COMMIT_TIME_ORDERING.name())) { - return OverwriteWithLatestAvroPayload.class.getName(); - } - return HoodieTableConfig.getDefaultPayloadClassName(); + return getPayloadClassNameIfPresent(props).orElse(HoodieTableConfig.getDefaultPayloadClassName()); } // NOTE: PAYLOAD_CLASS_NAME is before LEGACY_PAYLOAD_CLASS_NAME to make sure @@ -207,6 +195,13 @@ public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Seri payloadClassName = ConfigUtils.getStringWithAltKeys(props, PAYLOAD_CLASS_NAME); } else if (props.containsKey("hoodie.datasource.write.payload.class")) { payloadClassName = props.getProperty("hoodie.datasource.write.payload.class"); + } else if (ConfigUtils.containsConfigProperty(props, RECORD_MERGE_MODE) + && ConfigUtils.getStringWithAltKeys(props, RECORD_MERGE_MODE, StringUtils.EMPTY_STRING) + .equals(RecordMergeMode.COMMIT_TIME_ORDERING.name())) { + // Note: starting from version 9, payload class is not necessary set, but + // merge mode must exist. Therefore, we use merge mode to infer + // the payload class for certain corner cases, like for MIT command. + payloadClassName = OverwriteWithLatestAvroPayload.class.getName(); } // There could be tables written with payload class from com.uber.hoodie. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java index 46bf1b896ca2..aa3cf5eaea61 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java @@ -32,6 +32,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Objects; +import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_FILE_COL_NAME; +import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_POS_COL_NAME; + /** * Provides support for seamlessly applying changes captured via Debezium for MysqlDB. * <p> @@ -48,6 +51,8 @@ public class MySqlDebeziumAvroPayload extends AbstractDebeziumAvroPayload { private static final Logger LOG = LoggerFactory.getLogger(MySqlDebeziumAvroPayload.class); + public static final String ORDERING_FIELDS = FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME; + public MySqlDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) { super(record, orderingVal); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 9b24f7d0c989..c5bcfbaff6f6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -801,6 +801,13 @@ public class HoodieTableConfig extends HoodieConfig { return HoodieRecordPayload.getPayloadClassName(this); } + /** + * Read the payload class if present for HoodieRecords from the table properties. + */ + public Option<String> getPayloadClassIfPresent() { + return HoodieRecordPayload.getPayloadClassNameIfPresent(this.getProps()); + } + public String getLegacyPayloadClass() { return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, EMPTY_STRING); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java index fda1b8b6391d..6b3c1b2f94a4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java @@ -161,7 +161,6 @@ public class HoodieStreamer implements Serializable { cfg.recordMergeMode, cfg.payloadClassName, cfg.recordMergeStrategyId, cfg.sourceOrderingFields, HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(this.properties, HoodieWriteConfig.WRITE_TABLE_VERSION))); cfg.recordMergeMode = mergingConfigs.getLeft(); - cfg.payloadClassName = mergingConfigs.getMiddle(); cfg.recordMergeStrategyId = mergingConfigs.getRight(); if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) { InitialCheckPointProvider checkPointProvider = diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java index 2d6b27f13434..7ab24034b633 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieSparkRecord; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.read.DeleteContext; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Either; @@ -88,9 +89,10 @@ public class HoodieStreamerUtils { */ public static Option<JavaRDD<HoodieRecord>> createHoodieRecords(HoodieStreamer.Config cfg, TypedProperties props, Option<JavaRDD<GenericRecord>> avroRDDOptional, SchemaProvider schemaProvider, HoodieRecord.HoodieRecordType recordType, boolean autoGenerateRecordKeys, - String instantTime, Option<BaseErrorTableWriter> errorTableWriter) { + String instantTime, Option<BaseErrorTableWriter> errorTableWriter, HoodieTableConfig tableConfig) { boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT); - boolean shouldUseOrderingField = shouldCombine && !StringUtils.isNullOrEmpty(cfg.sourceOrderingFields); + String orderingFieldsStr = tableConfig.getOrderingFieldsStr().orElse(cfg.sourceOrderingFields); + boolean shouldUseOrderingField = shouldCombine && !StringUtils.isNullOrEmpty(orderingFieldsStr); boolean shouldErrorTable = errorTableWriter.isPresent() && props.getBoolean(ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(), ERROR_ENABLE_VALIDATE_RECORD_CREATION.defaultValue()); boolean useConsistentLogicalTimestamp = ConfigUtils.getBooleanWithAltKeys( props, KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED); @@ -129,7 +131,7 @@ public class HoodieStreamerUtils { GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec; boolean isDelete = AvroRecordContext.getFieldAccessorInstance().isDeleteRecord(gr, deleteContext); Comparable orderingValue = shouldUseOrderingField - ? OrderingValues.create(cfg.sourceOrderingFields.split(","), + ? OrderingValues.create(orderingFieldsStr.split(","), field -> (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, field, false, useConsistentLogicalTimestamp)) : null; HoodieRecord record = shouldUseOrderingField ? HoodieRecordUtils.createHoodieRecord(gr, orderingValue, hoodieKey, payloadClassName, requiresPayload, isDelete) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index 5985abcf5423..fb55eb144cb4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -47,6 +47,8 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.model.debezium.DebeziumConstants; +import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -75,6 +77,7 @@ import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetaSyncException; +import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; @@ -951,10 +954,11 @@ public class StreamSync implements Serializable, Closeable { BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient, instantTime); writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, !HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses()); } else { + metaClient = HoodieTableMetaClient.reload(metaClient); TypedProperties mergeProps = ConfigUtils.getMergeProps(props, metaClient.getTableConfig()); HoodieRecordType recordType = createRecordMerger(mergeProps).getRecordType(); Option<JavaRDD<HoodieRecord>> recordsOption = HoodieStreamerUtils.createHoodieRecords(cfg, mergeProps, inputBatch.getBatch(), inputBatch.getSchemaProvider(), - recordType, autoGenerateRecordKeys, instantTime, errorTableWriter); + recordType, autoGenerateRecordKeys, instantTime, errorTableWriter, metaClient.getTableConfig()); JavaRDD<HoodieRecord> records = recordsOption.orElseGet(() -> hoodieSparkContext.emptyRDD()); // filter dupes if needed if (cfg.filterDupes) { @@ -1149,6 +1153,20 @@ public class StreamSync implements Serializable, Closeable { if (!StringUtils.isNullOrEmpty(cfg.recordMergeStrategyId)) { builder.withRecordMergeStrategyId(cfg.recordMergeStrategyId); } + + if (metaClient != null) { + HoodieTableConfig tableConfig = metaClient.getTableConfig(); + // After upgrade to table version 9 with MySqlDebeziumAvroPayload, ordering fields are changed from + // `_event_seq` to `_event_bin_file,_event_pos`. The logic here ensures that deltastreamer config is updated + // if it points to older ordering field `_event_seq`. + if (tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.NINE) && tableConfig.getLegacyPayloadClass().equals(MySqlDebeziumAvroPayload.class.getCanonicalName()) + && cfg.sourceOrderingFields.equals(DebeziumConstants.ADDED_SEQ_COL_NAME)) { + cfg.sourceOrderingFields = MySqlDebeziumAvroPayload.ORDERING_FIELDS; + } else if (tableConfig.getOrderingFieldsStr().isPresent() && !StringUtils.isNullOrEmpty(cfg.sourceOrderingFields) + && !tableConfig.getOrderingFieldsStr().orElse("").equals(cfg.sourceOrderingFields)) { + throw new HoodieValidationException(String.format("Configured ordering fields: %s do not match table ordering fields: %s", cfg.sourceOrderingFields, tableConfig.getOrderingFields())); + } + } HoodiePayloadConfig.Builder payloadConfigBuilder = HoodiePayloadConfig.newBuilder().withPayloadOrderingFields(cfg.sourceOrderingFields); // Payload class can be NULL. diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 42f5aab7559a..a5c393af1a0a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -74,6 +74,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.hadoop.fs.HadoopFSUtils; @@ -2267,6 +2268,33 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testDeltaStreamerFailureWithChangingOrderingFields(HoodieTableType tableType) throws Exception { + String tableBasePath = basePath + "/test_with_changing_ordering_fields"; + HoodieDeltaStreamer.Config cfg = + TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); + cfg.tableType = tableType.name(); + cfg.filterDupes = true; + cfg.sourceOrderingFields = "timestamp,rider"; + cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING; + cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName(); + cfg.recordMergeStrategyId = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID; + + TestDataSource.recordInstantTime = Option.of("001"); + new HoodieStreamer(cfg, jsc).sync(); + assertRecordCount(1000, tableBasePath, sqlContext); + TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); + + // Change ordering fields in deltastreamer + Exception e = assertThrows(HoodieValidationException.class, () -> { + cfg.sourceOrderingFields = "timestamp"; + TestDataSource.recordInstantTime = Option.of("002"); + runStreamSync(cfg, false, 10, WriteOperationType.UPSERT); + }); + assertTrue(e.getMessage().equals("Configured ordering fields: timestamp do not match table ordering fields: [timestamp, rider]")); + } + private static long getNumUpdates(HoodieCommitMetadata metadata) { return metadata.getPartitionToWriteStats().values().stream() .flatMap(Collection::stream) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java index 1c40eedf588c..eae9c0f0308d 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieKeyException; import org.apache.hudi.exception.HoodieRecordCreationException; @@ -108,7 +109,7 @@ public class TestHoodieStreamerUtils extends UtilitiesTestBase { doNothing().when(errorTableWriter.get()).addErrorEvents(errorEventCaptor.capture()); } Option<JavaRDD<HoodieRecord>> recordOpt = HoodieStreamerUtils.createHoodieRecords(cfg, props, Option.of(recordRdd), - new SimpleSchemaProvider(jsc, schema, props), recordType, false, "000", errorTableWriter); + new SimpleSchemaProvider(jsc, schema, props), recordType, false, "000", errorTableWriter, new HoodieTableConfig()); if (errorTableWriter.isPresent()) { assertEquals(0, errorEventCaptor.getValue().collect().size()); @@ -153,7 +154,7 @@ public class TestHoodieStreamerUtils extends UtilitiesTestBase { doNothing().when(errorTableWriter.get()).addErrorEvents(errorEventCaptor.capture()); } Option<JavaRDD<HoodieRecord>> records = HoodieStreamerUtils.createHoodieRecords(cfg, props, Option.of(recordRdd), - schemaProvider, recordType, false, "000", errorTableWriter); + schemaProvider, recordType, false, "000", errorTableWriter, new HoodieTableConfig()); assertTrue(records.isPresent()); if (enableErrorTableWriter) {
