This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b60855defe0c feat: support read splits limit in Hudi Flink Source V2 (#18370)
b60855defe0c is described below
commit b60855defe0c75f41f7d68a52741d90f125aa3b7
Author: Peter Huang <[email protected]>
AuthorDate: Mon Mar 23 19:16:18 2026 -0700
feat: support read splits limit in Hudi Flink Source V2 (#18370)
* feat: support read splits limit in Hudi Flink Source V2
* add test cases
---
.../org/apache/hudi/table/HoodieTableSource.java | 1 +
.../TestHoodieContinuousSplitEnumerator.java | 110 +++++++++++++++++++++
2 files changed, 111 insertions(+)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index d346a12f8be3..62852bce8875 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -331,6 +331,7 @@ public class HoodieTableSource extends FileIndexReader implements
.skipClustering(conf.get(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
.skipInsertOverwrite(conf.get(FlinkOptions.READ_STREAMING_SKIP_INSERT_OVERWRITE))
.maxCompactionMemoryInBytes(conf.get(FlinkOptions.COMPACTION_MAX_MEMORY))
+ .maxPendingSplits(conf.get(FlinkOptions.READ_SPLITS_LIMIT))
.partitionPruner(partitionPruner)
.isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
.build();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
index 548529e466cf..cc54fe2c7951 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
@@ -253,6 +253,116 @@ public class TestHoodieContinuousSplitEnumerator {
"Split should be added back to provider");
}
+  /**
+   * Verify that when pending splits exceed maxPendingSplits (READ_SPLITS_LIMIT), discovery
+   * is paused and the enumerator position is NOT advanced.
+   * This ensures that after a pause, the next scan resumes from the correct instant.
+   */
+  @Test
+  public void testReadSplitsLimitPositionPreservedWhenDiscoveryIsPaused() throws Exception {
+    // Use a small maxPendingSplits to make it easy to exceed
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.PATH, "/tmp/test");
+    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 1);
+    HoodieScanContext limitedScanContext = HoodieScanContext.builder()
+        .conf(conf)
+        .path(new StoragePath("/tmp/test"))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20231201000000000")
+        .maxPendingSplits(2)
+        .build();
+
+    String knownOffset = "20231201060000000";
+    HoodieSplitEnumeratorState initialState = new HoodieSplitEnumeratorState(
+        Collections.emptyList(),
+        Option.of("20231201000000000"),
+        Option.of(knownOffset));
+
+    // Fill provider with 3 splits, exceeding the limit of 2
+    splitProvider.onDiscoveredSplits(Arrays.asList(split1, split2, createTestSplit(3, "file3")));
+
+    // Configure discover to return new splits — but they should never be reached
+    splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+        Collections.singletonList(createTestSplit(4, "file4")), "20231201000000000", "20231201120000000"));
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        TABLE_NAME, context, splitProvider, splitDiscover, limitedScanContext, Option.of(initialState));
+    enumerator.start();
+    context.executeAsyncCallbacks();
+
+    // Position must remain at knownOffset; discovery was paused and returned EMPTY (no new offset)
+    HoodieSplitEnumeratorState state = enumerator.snapshotState(1L);
+    assertTrue(state.getLastEnumeratedInstantOffset().isPresent(),
+        "Position offset must not be cleared when discovery is paused by READ_SPLITS_LIMIT");
+    assertEquals(knownOffset, state.getLastEnumeratedInstantOffset().get(),
+        "Position offset must stay at last consumed offset when paused by READ_SPLITS_LIMIT");
+  }
+
+  /**
+   * Verify that discovery proceeds when pending splits are exactly at the threshold (not exceeding).
+   * The condition is strictly greater-than, so pending == limit should still allow discovery.
+   */
+  @Test
+  public void testReadSplitsLimitDiscoveryProceedsAtExactThreshold() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.PATH, "/tmp/test");
+    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 1);
+    HoodieScanContext limitedScanContext = HoodieScanContext.builder()
+        .conf(conf)
+        .path(new StoragePath("/tmp/test"))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20231201000000000")
+        .maxPendingSplits(2)
+        .build();
+
+    // Add exactly 2 splits — equal to limit, should NOT pause
+    splitProvider.onDiscoveredSplits(Arrays.asList(split1, split2));
+
+    splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+        Collections.singletonList(createTestSplit(3, "file3")), "20231201000000000", "20231201120000000"));
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        TABLE_NAME, context, splitProvider, splitDiscover, limitedScanContext, Option.empty());
+    enumerator.start();
+    context.executeAsyncCallbacks();
+
+    // One new split should have been discovered and added
+    assertEquals(3, splitProvider.pendingSplitCount(),
+        "Discovery should proceed when pending splits equal (not exceed) the limit");
+  }
+
+  /**
+   * Verify that discovery pauses when pending splits exceed the threshold by exactly one.
+   */
+  @Test
+  public void testReadSplitsLimitDiscoveryPausedExceedingByOne() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.PATH, "/tmp/test");
+    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 1);
+    HoodieScanContext limitedScanContext = HoodieScanContext.builder()
+        .conf(conf)
+        .path(new StoragePath("/tmp/test"))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20231201000000000")
+        .maxPendingSplits(2)
+        .build();
+
+    // Add 3 splits — exceeds limit by 1
+    splitProvider.onDiscoveredSplits(Arrays.asList(split1, split2, createTestSplit(3, "file3")));
+
+    splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+        Collections.singletonList(createTestSplit(4, "file4")), "20231201000000000", "20231201120000000"));
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        TABLE_NAME, context, splitProvider, splitDiscover, limitedScanContext, Option.empty());
+    enumerator.start();
+    int countBeforeCallback = splitProvider.pendingSplitCount();
+    context.executeAsyncCallbacks();
+
+    assertEquals(countBeforeCallback, splitProvider.pendingSplitCount(),
+        "Discovery should pause when pending splits exceed the limit by one");
+  }
+
/**
   * Verify that the enumerator position is NOT reset when no new commits are found
* (i.e., when the split discover returns a batch with an empty offset).