This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 72f4452981eb9299cbb84508570f9e702622e341
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Oct 28 07:50:23 2025 +0530

    fix: Fix upgrade handling for MySqlDebeziumAvroPayload with deltastreamer (#14159)
    
    
    ---------
    
    Co-authored-by: Lokesh Jain <[email protected]>
    Co-authored-by: Lokesh Jain <[email protected]>
    Co-authored-by: sivabalan <[email protected]>
---
 .../table/upgrade/EightToNineUpgradeHandler.java   |  44 ++++-----
 .../upgrade/TestEightToNineUpgradeHandler.java     | 107 +++++++++------------
 .../hudi/common/model/HoodieRecordPayload.java     |  21 ++--
 .../model/debezium/MySqlDebeziumAvroPayload.java   |   5 +
 .../hudi/common/table/HoodieTableConfig.java       |   7 ++
 .../hudi/utilities/streamer/HoodieStreamer.java    |   1 -
 .../utilities/streamer/HoodieStreamerUtils.java    |   8 +-
 .../apache/hudi/utilities/streamer/StreamSync.java |  20 +++-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  28 ++++++
 .../streamer/TestHoodieStreamerUtils.java          |   5 +-
 10 files changed, 140 insertions(+), 106 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
index 788a6deb7171..1621cecfd65d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java
@@ -48,8 +48,6 @@ import java.util.Set;
 
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
-import static 
org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_FILE_COL_NAME;
-import static 
org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_POS_COL_NAME;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
@@ -127,22 +125,22 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
           metaClient.getTableConfig().getTableVersion());
     }
     // Handle merge mode config.
-    reconcileMergeModeConfig(tablePropsToAdd, tablePropsToRemove, tableConfig);
+    reconcileMergeModeConfig(tablePropsToAdd, tablePropsToRemove, tableConfig, 
config);
     // Handle partial update mode config.
-    reconcilePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
+    reconcilePartialUpdateModeConfig(tablePropsToAdd, tableConfig, config);
     // Handle merge properties config.
     reconcileMergePropertiesConfig(tablePropsToAdd, tableConfig, config);
     // Handle payload class configs.
-    reconcilePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, 
tableConfig);
+    reconcilePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, 
tableConfig, config);
     // Handle ordering fields config.
-    reconcileOrderingFieldsConfig(tablePropsToAdd, tablePropsToRemove, 
tableConfig);
+    reconcileOrderingFieldsConfig(tablePropsToAdd, tablePropsToRemove, 
tableConfig, config);
     return new UpgradeDowngrade.TableConfigChangeSet(tablePropsToAdd, 
tablePropsToRemove);
   }
 
-  private void reconcileMergeModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
-                                        Set<ConfigProperty> tablePropsToRemove,
-                                        HoodieTableConfig tableConfig) {
-    String payloadClass = tableConfig.getPayloadClass();
+  private void reconcileMergeModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd, Set<ConfigProperty> tablePropsToRemove,
+                                        HoodieTableConfig tableConfig, 
HoodieWriteConfig config) {
+    String payloadClass = tableConfig.getPayloadClassIfPresent()
+        .orElse(config.getPayloadClass());
     RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
     if (mergeMode != RecordMergeMode.CUSTOM) {
       // For commit time or event time based table, remove merge strategy id.
@@ -162,10 +160,10 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
     // else: No op, which means merge strategy id and merge mode are not 
changed.
   }
 
-  private void reconcilePayloadClassConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
-                                           Set<ConfigProperty> 
tablePropsToRemove,
-                                           HoodieTableConfig tableConfig) {
-    String payloadClass = tableConfig.getPayloadClass();
+  private void reconcilePayloadClassConfig(Map<ConfigProperty, String> 
tablePropsToAdd, Set<ConfigProperty> tablePropsToRemove,
+                                           HoodieTableConfig tableConfig, 
HoodieWriteConfig config) {
+    String payloadClass = tableConfig.getPayloadClassIfPresent()
+        .orElse(config.getPayloadClass());
     if (StringUtils.isNullOrEmpty(payloadClass)) {
       return;
     }
@@ -176,8 +174,9 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
   }
 
   private void reconcilePartialUpdateModeConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
-                                                HoodieTableConfig tableConfig) 
{
-    String payloadClass = tableConfig.getPayloadClass();
+                                                HoodieTableConfig tableConfig, 
HoodieWriteConfig config) {
+    String payloadClass = tableConfig.getPayloadClassIfPresent()
+        .orElse(config.getPayloadClass());
     if (StringUtils.isNullOrEmpty(payloadClass)) {
       return;
     }
@@ -190,7 +189,8 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
   }
 
   private void reconcileMergePropertiesConfig(Map<ConfigProperty, String> 
tablePropsToAdd, HoodieTableConfig tableConfig, HoodieWriteConfig writeConfig) {
-    String payloadClass = tableConfig.getPayloadClass();
+    String payloadClass = tableConfig.getPayloadClassIfPresent()
+        .orElse(writeConfig.getPayloadClass());
     if (StringUtils.isNullOrEmpty(payloadClass)) {
       return;
     }
@@ -224,13 +224,13 @@ public class EightToNineUpgradeHandler implements 
UpgradeHandler {
     }
   }
 
-  private void reconcileOrderingFieldsConfig(Map<ConfigProperty, String> 
tablePropsToAdd,
-                                             Set<ConfigProperty> 
tablePropsToRemove,
-                                             HoodieTableConfig tableConfig) {
-    String payloadClass = tableConfig.getPayloadClass();
+  private void reconcileOrderingFieldsConfig(Map<ConfigProperty, String> 
tablePropsToAdd, Set<ConfigProperty> tablePropsToRemove,
+                                             HoodieTableConfig tableConfig, 
HoodieWriteConfig config) {
+    String payloadClass = tableConfig.getPayloadClassIfPresent()
+        .orElse(config.getPayloadClass());
     Option<String> orderingFieldsOpt;
     if (MySqlDebeziumAvroPayload.class.getName().equals(payloadClass)) {
-      orderingFieldsOpt = Option.of(FLATTENED_FILE_COL_NAME + "," + 
FLATTENED_POS_COL_NAME);
+      orderingFieldsOpt = Option.of(MySqlDebeziumAvroPayload.ORDERING_FIELDS);
     } else if 
(PostgresDebeziumAvroPayload.class.getName().equals(payloadClass)) {
       orderingFieldsOpt = Option.of(DebeziumConstants.FLATTENED_LSN_COL_NAME);
     } else {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
index eb885f8be5ad..58946703091c 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java
@@ -56,9 +56,11 @@ import org.mockito.MockedStatic;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Stream;
@@ -117,6 +119,7 @@ class TestEightToNineUpgradeHandler {
     when(table.getMetaClient()).thenReturn(metaClient);
     when(metaClient.getTableConfig()).thenReturn(tableConfig);
     when(config.autoUpgrade()).thenReturn(true);
+    when(config.getPayloadClass()).thenReturn(null);
 
     // Setup common mocks
     when(upgradeDowngradeHelper.getTable(config, context)).thenReturn(table);
@@ -125,6 +128,7 @@ class TestEightToNineUpgradeHandler {
     when(metaClient.getStorage()).thenReturn(storage);
     when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.EIGHT);
     when(tableConfig.getOrderingFieldsStr()).thenReturn(Option.empty());
+    when(tableConfig.getPayloadClassIfPresent()).thenReturn(Option.empty());
 
     // Use a temp file for index definition path
     indexDefPath = new StoragePath(tempDir.resolve("index.json").toString());
@@ -143,81 +147,55 @@ class TestEightToNineUpgradeHandler {
   }
 
   static Stream<Arguments> payloadClassTestCases() {
-    return Stream.of(
-        Arguments.of(
-            DefaultHoodieRecordPayload.class.getName(),
-            "",
-            null,
-            null,
-            "DefaultHoodieRecordPayload"
-        ),
-        Arguments.of(
-            EventTimeAvroPayload.class.getName(),
-            "",
-            EVENT_TIME_ORDERING.name(),
-            null,
-            "EventTimeAvroPayload"
-        ),
-        Arguments.of(
-            OverwriteWithLatestAvroPayload.class.getName(),
-            "",
-            null,
-            null,
-            "OverwriteWithLatestAvroPayload"
-        ),
-        Arguments.of(
-            AWSDmsAvroPayload.class.getName(),
-            RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=Op,"
-                + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=D", // 
mergeProperties
-            COMMIT_TIME_ORDERING.name(),
-            null,
-            "AWSDmsAvroPayload"
-        ),
-        Arguments.of(
-            PostgresDebeziumAvroPayload.class.getName(),
-            RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE + 
"=" + DEBEZIUM_UNAVAILABLE_VALUE + ","
-                + RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + 
"=_change_operation_type,"
-                + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d",
-            EVENT_TIME_ORDERING.name(),
-            FILL_UNAVAILABLE.name(),
-            "PostgresDebeziumAvroPayload"
-        ),
-        Arguments.of(
-            PartialUpdateAvroPayload.class.getName(),
-            "",
-            EVENT_TIME_ORDERING.name(),
-            PartialUpdateMode.IGNORE_DEFAULTS.name(),
-            "PartialUpdateAvroPayload"
-        ),
-        Arguments.of(
-            MySqlDebeziumAvroPayload.class.getName(),
-            RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + 
"=_change_operation_type,"
-                + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d",
-            EVENT_TIME_ORDERING.name(),
-            null,
-            "MySqlDebeziumAvroPayload"
-        ),
-        Arguments.of(
-            OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
-            "",
-            COMMIT_TIME_ORDERING.name(),
-            PartialUpdateMode.IGNORE_DEFAULTS.name(),
-            "OverwriteNonDefaultsWithLatestAvroPayload"
-        )
-    );
+    List<Arguments> arguments = new ArrayList<>();
+    arguments.addAll(getArguments(DefaultHoodieRecordPayload.class.getName(), 
"",
+        null, null, "DefaultHoodieRecordPayload"));
+    arguments.addAll(getArguments(EventTimeAvroPayload.class.getName(), "",
+        EVENT_TIME_ORDERING.name(), null, "EventTimeAvroPayload"));
+    
arguments.addAll(getArguments(OverwriteWithLatestAvroPayload.class.getName(), 
"",
+        null, null, "OverwriteWithLatestAvroPayload"));
+    arguments.addAll(getArguments(AWSDmsAvroPayload.class.getName(), 
RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=Op,"
+            + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=D", // 
mergeProperties
+        COMMIT_TIME_ORDERING.name(), null, "AWSDmsAvroPayload"));
+    arguments.addAll(getArguments(PostgresDebeziumAvroPayload.class.getName(), 
RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE + "=" + 
DEBEZIUM_UNAVAILABLE_VALUE + ","
+            + RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + 
"=_change_operation_type,"
+            + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d",
+        EVENT_TIME_ORDERING.name(), FILL_UNAVAILABLE.name(), 
"PostgresDebeziumAvroPayload"));
+    arguments.addAll(getArguments(PartialUpdateAvroPayload.class.getName(), "",
+        EVENT_TIME_ORDERING.name(), PartialUpdateMode.IGNORE_DEFAULTS.name(), 
"PartialUpdateAvroPayload"));
+    arguments.addAll(getArguments(MySqlDebeziumAvroPayload.class.getName(), 
RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=_change_operation_type,"
+            + RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d",
+        EVENT_TIME_ORDERING.name(), null, "MySqlDebeziumAvroPayload"));
+    
arguments.addAll(getArguments(OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
 "",
+        COMMIT_TIME_ORDERING.name(), PartialUpdateMode.IGNORE_DEFAULTS.name(), 
"OverwriteNonDefaultsWithLatestAvroPayload"));
+    return arguments.stream();
+  }
+
+  private static List<Arguments> getArguments(String payloadClassName, String 
expectedMergeProperties,
+                                              String expectedRecordMergeMode, 
String expectedPartialUpdateMode,
+                                              String testName) {
+    return Arrays.asList(
+        Arguments.of(payloadClassName, expectedMergeProperties,
+            expectedRecordMergeMode, expectedPartialUpdateMode, testName, 
true),
+        Arguments.of(payloadClassName, expectedMergeProperties,
+            expectedRecordMergeMode, expectedPartialUpdateMode, testName, 
false));
   }
 
   @ParameterizedTest(name = "testUpgradeWith{4}")
   @MethodSource("payloadClassTestCases")
   void testUpgradeWithPayloadClass(String payloadClassName, String 
expectedMergeProperties,
                                    String expectedRecordMergeMode, String 
expectedPartialUpdateMode,
-                                   String testName) {
+                                   String testName, boolean 
isPayloadClassConfiguredInTableConfig) {
     try (org.mockito.MockedStatic<UpgradeDowngradeUtils> utilities =
              org.mockito.Mockito.mockStatic(UpgradeDowngradeUtils.class)) {
       utilities.when(() -> 
UpgradeDowngradeUtils.rollbackFailedWritesAndCompact(
               any(), any(), any(), any(), anyBoolean(), any()))
           .thenAnswer(invocation -> null);
-      when(tableConfig.getPayloadClass()).thenReturn(payloadClassName);
+      if (isPayloadClassConfiguredInTableConfig) {
+        
when(tableConfig.getPayloadClassIfPresent()).thenReturn(Option.ofNullable(payloadClassName));
+      } else {
+        when(config.getPayloadClass()).thenReturn(payloadClassName);
+      }
       
when(tableConfig.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ);
       
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID);
       when(metaClient.getIndexMetadata()).thenReturn(Option.empty());
@@ -301,6 +279,7 @@ class TestEightToNineUpgradeHandler {
     if (payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) {
       
assertTrue(propertiesToAdd.containsKey(HoodieTableConfig.ORDERING_FIELDS));
       assertEquals(FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME, 
propertiesToAdd.get(HoodieTableConfig.ORDERING_FIELDS));
+      
assertTrue(propertiesToRemove.contains(HoodieTableConfig.PRECOMBINE_FIELD));
     } else if 
(payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
       assertEquals(FLATTENED_LSN_COL_NAME, 
propertiesToAdd.get(HoodieTableConfig.ORDERING_FIELDS));
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index af838e31bade..45856dbf4f2e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -184,19 +184,7 @@ public interface HoodieRecordPayload<T extends 
HoodieRecordPayload> extends Seri
   }
 
   static String getPayloadClassName(Properties props) {
-    Option<String> payloadOpt = getPayloadClassNameIfPresent(props);
-    if (payloadOpt.isPresent()) {
-      return payloadOpt.get();
-    }
-    // Note: starting from version 9, payload class is not necessary set, but
-    //       merge mode must exist. Therefore, we use merge mode to infer
-    //       the payload class for certain corner cases, like for MIT command.
-    if (ConfigUtils.containsConfigProperty(props, RECORD_MERGE_MODE)
-        && ConfigUtils.getStringWithAltKeys(props, RECORD_MERGE_MODE, 
StringUtils.EMPTY_STRING)
-        .equals(RecordMergeMode.COMMIT_TIME_ORDERING.name())) {
-      return OverwriteWithLatestAvroPayload.class.getName();
-    }
-    return HoodieTableConfig.getDefaultPayloadClassName();
+    return 
getPayloadClassNameIfPresent(props).orElse(HoodieTableConfig.getDefaultPayloadClassName());
   }
 
   // NOTE: PAYLOAD_CLASS_NAME is before LEGACY_PAYLOAD_CLASS_NAME to make sure
@@ -207,6 +195,13 @@ public interface HoodieRecordPayload<T extends 
HoodieRecordPayload> extends Seri
       payloadClassName = ConfigUtils.getStringWithAltKeys(props, 
PAYLOAD_CLASS_NAME);
     } else if (props.containsKey("hoodie.datasource.write.payload.class")) {
       payloadClassName = 
props.getProperty("hoodie.datasource.write.payload.class");
+    } else if (ConfigUtils.containsConfigProperty(props, RECORD_MERGE_MODE)
+        && ConfigUtils.getStringWithAltKeys(props, RECORD_MERGE_MODE, 
StringUtils.EMPTY_STRING)
+        .equals(RecordMergeMode.COMMIT_TIME_ORDERING.name())) {
+      // Note: starting from version 9, payload class is not necessary set, but
+      //       merge mode must exist. Therefore, we use merge mode to infer
+      //       the payload class for certain corner cases, like for MIT 
command.
+      payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
     }
 
     // There could be tables written with payload class from com.uber.hoodie.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
index 46bf1b896ca2..aa3cf5eaea61 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
@@ -32,6 +32,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Objects;
 
+import static 
org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_FILE_COL_NAME;
+import static 
org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_POS_COL_NAME;
+
 /**
  * Provides support for seamlessly applying changes captured via Debezium for 
MysqlDB.
  * <p>
@@ -48,6 +51,8 @@ public class MySqlDebeziumAvroPayload extends 
AbstractDebeziumAvroPayload {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MySqlDebeziumAvroPayload.class);
 
+  public static final String ORDERING_FIELDS = FLATTENED_FILE_COL_NAME + "," + 
FLATTENED_POS_COL_NAME;
+
   public MySqlDebeziumAvroPayload(GenericRecord record, Comparable 
orderingVal) {
     super(record, orderingVal);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 9b24f7d0c989..c5bcfbaff6f6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -801,6 +801,13 @@ public class HoodieTableConfig extends HoodieConfig {
     return HoodieRecordPayload.getPayloadClassName(this);
   }
 
+  /**
+   * Read the payload class if present for HoodieRecords from the table 
properties.
+   */
+  public Option<String> getPayloadClassIfPresent() {
+    return HoodieRecordPayload.getPayloadClassNameIfPresent(this.getProps());
+  }
+
   public String getLegacyPayloadClass() {
     return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, EMPTY_STRING);
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index fda1b8b6391d..6b3c1b2f94a4 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -161,7 +161,6 @@ public class HoodieStreamer implements Serializable {
             cfg.recordMergeMode, cfg.payloadClassName, 
cfg.recordMergeStrategyId, cfg.sourceOrderingFields,
             
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(this.properties,
 HoodieWriteConfig.WRITE_TABLE_VERSION)));
     cfg.recordMergeMode = mergingConfigs.getLeft();
-    cfg.payloadClassName = mergingConfigs.getMiddle();
     cfg.recordMergeStrategyId = mergingConfigs.getRight();
     if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
       InitialCheckPointProvider checkPointProvider =
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 2d6b27f13434..7ab24034b633 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.read.DeleteContext;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Either;
@@ -88,9 +89,10 @@ public class HoodieStreamerUtils {
    */
   public static Option<JavaRDD<HoodieRecord>> 
createHoodieRecords(HoodieStreamer.Config cfg, TypedProperties props, 
Option<JavaRDD<GenericRecord>> avroRDDOptional,
                                                                   
SchemaProvider schemaProvider, HoodieRecord.HoodieRecordType recordType, 
boolean autoGenerateRecordKeys,
-                                                                  String 
instantTime, Option<BaseErrorTableWriter> errorTableWriter) {
+                                                                  String 
instantTime, Option<BaseErrorTableWriter> errorTableWriter, HoodieTableConfig 
tableConfig) {
     boolean shouldCombine = cfg.filterDupes || 
cfg.operation.equals(WriteOperationType.UPSERT);
-    boolean shouldUseOrderingField = shouldCombine && 
!StringUtils.isNullOrEmpty(cfg.sourceOrderingFields);
+    String orderingFieldsStr = 
tableConfig.getOrderingFieldsStr().orElse(cfg.sourceOrderingFields);
+    boolean shouldUseOrderingField = shouldCombine && 
!StringUtils.isNullOrEmpty(orderingFieldsStr);
     boolean shouldErrorTable = errorTableWriter.isPresent() && 
props.getBoolean(ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(), 
ERROR_ENABLE_VALIDATE_RECORD_CREATION.defaultValue());
     boolean useConsistentLogicalTimestamp = ConfigUtils.getBooleanWithAltKeys(
         props, 
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
@@ -129,7 +131,7 @@ public class HoodieStreamerUtils {
                   GenericRecord gr = isDropPartitionColumns(props) ? 
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
                   boolean isDelete = 
AvroRecordContext.getFieldAccessorInstance().isDeleteRecord(gr, deleteContext);
                   Comparable orderingValue = shouldUseOrderingField
-                      ? 
OrderingValues.create(cfg.sourceOrderingFields.split(","),
+                      ? OrderingValues.create(orderingFieldsStr.split(","),
                          field -> (Comparable) 
HoodieAvroUtils.getNestedFieldVal(gr, field, false, 
useConsistentLogicalTimestamp))
                       : null;
                   HoodieRecord record = shouldUseOrderingField ? 
HoodieRecordUtils.createHoodieRecord(gr, orderingValue, hoodieKey, 
payloadClassName, requiresPayload, isDelete)
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 5985abcf5423..fb55eb144cb4 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -47,6 +47,8 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.model.debezium.DebeziumConstants;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -75,6 +77,7 @@ import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetaSyncException;
+import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
@@ -951,10 +954,11 @@ public class StreamSync implements Serializable, 
Closeable {
       BaseDatasetBulkInsertCommitActionExecutor executor = new 
HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, 
writeClient, instantTime);
       writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, 
!HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
     } else {
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       TypedProperties mergeProps = ConfigUtils.getMergeProps(props, 
metaClient.getTableConfig());
       HoodieRecordType recordType = 
createRecordMerger(mergeProps).getRecordType();
       Option<JavaRDD<HoodieRecord>> recordsOption = 
HoodieStreamerUtils.createHoodieRecords(cfg, mergeProps, inputBatch.getBatch(), 
inputBatch.getSchemaProvider(),
-          recordType, autoGenerateRecordKeys, instantTime, errorTableWriter);
+          recordType, autoGenerateRecordKeys, instantTime, errorTableWriter, 
metaClient.getTableConfig());
       JavaRDD<HoodieRecord> records = recordsOption.orElseGet(() -> 
hoodieSparkContext.emptyRDD());
       // filter dupes if needed
       if (cfg.filterDupes) {
@@ -1149,6 +1153,20 @@ public class StreamSync implements Serializable, 
Closeable {
     if (!StringUtils.isNullOrEmpty(cfg.recordMergeStrategyId)) {
       builder.withRecordMergeStrategyId(cfg.recordMergeStrategyId);
     }
+
+    if (metaClient != null) {
+      HoodieTableConfig tableConfig = metaClient.getTableConfig();
+      // After upgrade to table version 9 with MySqlDebeziumAvroPayload, 
ordering fields are changed from
+      // `_event_seq` to `_event_bin_file,_event_pos`. The logic here ensures 
that deltastreamer config is updated
+      // if it points to older ordering field `_event_seq`.
+      if 
(tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.NINE) && 
tableConfig.getLegacyPayloadClass().equals(MySqlDebeziumAvroPayload.class.getCanonicalName())
+          && 
cfg.sourceOrderingFields.equals(DebeziumConstants.ADDED_SEQ_COL_NAME)) {
+        cfg.sourceOrderingFields = MySqlDebeziumAvroPayload.ORDERING_FIELDS;
+      } else if (tableConfig.getOrderingFieldsStr().isPresent() && 
!StringUtils.isNullOrEmpty(cfg.sourceOrderingFields)
+          && 
!tableConfig.getOrderingFieldsStr().orElse("").equals(cfg.sourceOrderingFields))
 {
+        throw new HoodieValidationException(String.format("Configured ordering 
fields: %s do not match table ordering fields: %s", cfg.sourceOrderingFields, 
tableConfig.getOrderingFields()));
+      }
+    }
     HoodiePayloadConfig.Builder payloadConfigBuilder =
         
HoodiePayloadConfig.newBuilder().withPayloadOrderingFields(cfg.sourceOrderingFields);
     // Payload class can be NULL.
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 42f5aab7559a..a5c393af1a0a 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -74,6 +74,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -2267,6 +2268,33 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void 
testDeltaStreamerFailureWithChangingOrderingFields(HoodieTableType tableType) 
throws Exception {
+    String tableBasePath = basePath + "/test_with_changing_ordering_fields";
+    HoodieDeltaStreamer.Config cfg =
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    cfg.tableType = tableType.name();
+    cfg.filterDupes = true;
+    cfg.sourceOrderingFields = "timestamp,rider";
+    cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+    cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    cfg.recordMergeStrategyId = 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+
+    TestDataSource.recordInstantTime = Option.of("001");
+    new HoodieStreamer(cfg, jsc).sync();
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
+
+    // Change ordering fields in deltastreamer
+    Exception e = assertThrows(HoodieValidationException.class, () -> {
+      cfg.sourceOrderingFields = "timestamp";
+      TestDataSource.recordInstantTime = Option.of("002");
+      runStreamSync(cfg, false, 10, WriteOperationType.UPSERT);
+    });
+    assertTrue(e.getMessage().equals("Configured ordering fields: timestamp do 
not match table ordering fields: [timestamp, rider]"));
+  }
+
   private static long getNumUpdates(HoodieCommitMetadata metadata) {
     return metadata.getPartitionToWriteStats().values().stream()
         .flatMap(Collection::stream)
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
index 1c40eedf588c..eae9c0f0308d 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.hudi.exception.HoodieRecordCreationException;
@@ -108,7 +109,7 @@ public class TestHoodieStreamerUtils extends 
UtilitiesTestBase {
       
doNothing().when(errorTableWriter.get()).addErrorEvents(errorEventCaptor.capture());
     }
     Option<JavaRDD<HoodieRecord>> recordOpt = 
HoodieStreamerUtils.createHoodieRecords(cfg, props, Option.of(recordRdd),
-        new SimpleSchemaProvider(jsc, schema, props), recordType, false, 
"000", errorTableWriter);
+        new SimpleSchemaProvider(jsc, schema, props), recordType, false, 
"000", errorTableWriter, new HoodieTableConfig());
 
     if (errorTableWriter.isPresent()) {
       assertEquals(0, errorEventCaptor.getValue().collect().size());
@@ -153,7 +154,7 @@ public class TestHoodieStreamerUtils extends 
UtilitiesTestBase {
       
doNothing().when(errorTableWriter.get()).addErrorEvents(errorEventCaptor.capture());
     }
     Option<JavaRDD<HoodieRecord>> records = 
HoodieStreamerUtils.createHoodieRecords(cfg, props, Option.of(recordRdd),
-        schemaProvider, recordType, false, "000", errorTableWriter);
+        schemaProvider, recordType, false, "000", errorTableWriter, new 
HoodieTableConfig());
     assertTrue(records.isPresent());
 
     if (enableErrorTableWriter) {

Reply via email to