This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bb520c783ea fix: Propagate cfg.sourceOrderingFields in HoodieStreamer 
(#17984)
1bb520c783ea is described below

commit 1bb520c783ea820c67e71b5bb927226ad0f6b6e7
Author: voonhous <[email protected]>
AuthorDate: Fri Jan 23 10:44:30 2026 +0800

    fix: Propagate cfg.sourceOrderingFields in HoodieStreamer (#17984)
---
 .../hudi/utilities/streamer/HoodieStreamer.java    |  3 +++
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 25 +++++++++++++++++++---
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 3266025d4a89..850850356a00 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -204,6 +204,9 @@ public class HoodieStreamer implements Serializable {
         && !StringUtils.isNullOrEmpty(cfg.recordMergeImplClasses)) {
       hoodieConfig.setValue(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
cfg.recordMergeImplClasses);
     }
+    if (!StringUtils.isNullOrEmpty(cfg.sourceOrderingFields)) {
+      hoodieConfig.setValue(HoodieTableConfig.ORDERING_FIELDS.key(), 
cfg.sourceOrderingFields);
+    }
 
     return hoodieConfig.getProps(true);
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 94c2d1f7056f..9e7a92e9f6e3 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -79,7 +79,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -278,6 +277,26 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         props.getString("hoodie.datasource.write.keygenerator.class"));
   }
 
+  @Test
+  public void testCombinePropertiesWithSourceOrderingFields() {
+    HoodieStreamer.Config cfg = getBaseConfig();
+    cfg.sourceOrderingFields = "ts,seq_id";
+
+    TypedProperties props = HoodieStreamer.combineProperties(cfg, 
Option.empty(), jsc.hadoopConfiguration());
+
+    assertEquals("ts,seq_id", 
props.getString(HoodieTableConfig.ORDERING_FIELDS.key()));
+  }
+
+  @Test
+  public void testCombinePropertiesWithoutSourceOrderingFields() {
+    HoodieStreamer.Config cfg = getBaseConfig();
+    cfg.sourceOrderingFields = null;
+
+    TypedProperties props = HoodieStreamer.combineProperties(cfg, 
Option.empty(), jsc.hadoopConfiguration());
+
+    assertFalse(props.containsKey(HoodieTableConfig.ORDERING_FIELDS.key()));
+  }
+
   private static HoodieStreamer.Config getBaseConfig() {
     // Base config with all required fields
     HoodieStreamer.Config base = new HoodieStreamer.Config();
@@ -2780,12 +2799,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
 
     // Change ordering fields in deltastreamer
-    Exception e = assertThrows(HoodieValidationException.class, () -> {
+    Exception e = assertThrows(HoodieException.class, () -> {
       cfg.sourceOrderingFields = "timestamp";
       TestDataSource.recordInstantTime = Option.of("002");
       runStreamSync(cfg, false, 10, WriteOperationType.UPSERT);
     });
-    assertEquals("Configured ordering fields: timestamp do not match table 
ordering fields: [timestamp, rider]", e.getMessage());
+    assertTrue(e.getMessage().contains("hoodie.table.ordering.fields") && 
e.getMessage().contains("timestamp,rider"));
   }
 
   private static long getNumUpdates(HoodieCommitMetadata metadata) {

Reply via email to