This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1bb520c783ea fix: Propagate cfg.sourceOrderingFields in HoodieStreamer (#17984)
1bb520c783ea is described below
commit 1bb520c783ea820c67e71b5bb927226ad0f6b6e7
Author: voonhous <[email protected]>
AuthorDate: Fri Jan 23 10:44:30 2026 +0800
fix: Propagate cfg.sourceOrderingFields in HoodieStreamer (#17984)
---
.../hudi/utilities/streamer/HoodieStreamer.java | 3 +++
.../deltastreamer/TestHoodieDeltaStreamer.java | 25 +++++++++++++++++++---
2 files changed, 25 insertions(+), 3 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 3266025d4a89..850850356a00 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -204,6 +204,9 @@ public class HoodieStreamer implements Serializable {
&& !StringUtils.isNullOrEmpty(cfg.recordMergeImplClasses)) {
hoodieConfig.setValue(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(),
cfg.recordMergeImplClasses);
}
+ if (!StringUtils.isNullOrEmpty(cfg.sourceOrderingFields)) {
+ hoodieConfig.setValue(HoodieTableConfig.ORDERING_FIELDS.key(), cfg.sourceOrderingFields);
+ }
return hoodieConfig.getProps(true);
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 94c2d1f7056f..9e7a92e9f6e3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -79,7 +79,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -278,6 +277,26 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
props.getString("hoodie.datasource.write.keygenerator.class"));
}
+ @Test
+ public void testCombinePropertiesWithSourceOrderingFields() {
+ HoodieStreamer.Config cfg = getBaseConfig();
+ cfg.sourceOrderingFields = "ts,seq_id";
+
+ TypedProperties props = HoodieStreamer.combineProperties(cfg, Option.empty(), jsc.hadoopConfiguration());
+
+ assertEquals("ts,seq_id", props.getString(HoodieTableConfig.ORDERING_FIELDS.key()));
+ }
+
+ @Test
+ public void testCombinePropertiesWithoutSourceOrderingFields() {
+ HoodieStreamer.Config cfg = getBaseConfig();
+ cfg.sourceOrderingFields = null;
+
+ TypedProperties props = HoodieStreamer.combineProperties(cfg, Option.empty(), jsc.hadoopConfiguration());
+
+ assertFalse(props.containsKey(HoodieTableConfig.ORDERING_FIELDS.key()));
+ }
+
private static HoodieStreamer.Config getBaseConfig() {
// Base config with all required fields
HoodieStreamer.Config base = new HoodieStreamer.Config();
@@ -2780,12 +2799,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
// Change ordering fields in deltastreamer
- Exception e = assertThrows(HoodieValidationException.class, () -> {
+ Exception e = assertThrows(HoodieException.class, () -> {
cfg.sourceOrderingFields = "timestamp";
TestDataSource.recordInstantTime = Option.of("002");
runStreamSync(cfg, false, 10, WriteOperationType.UPSERT);
});
- assertEquals("Configured ordering fields: timestamp do not match table ordering fields: [timestamp, rider]", e.getMessage());
+ assertTrue(e.getMessage().contains("hoodie.table.ordering.fields") && e.getMessage().contains("timestamp,rider"));
}
private static long getNumUpdates(HoodieCommitMetadata metadata) {