This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b60855defe0c feat: support read splits limit in Hudi Flink Source V2 (#18370)
b60855defe0c is described below

commit b60855defe0c75f41f7d68a52741d90f125aa3b7
Author: Peter Huang <[email protected]>
AuthorDate: Mon Mar 23 19:16:18 2026 -0700

    feat: support read splits limit in Hudi Flink Source V2 (#18370)
    
    * feat: support read splits limit in Hudi Flink Source V2
    * add test cases
---
 .../org/apache/hudi/table/HoodieTableSource.java   |   1 +
 .../TestHoodieContinuousSplitEnumerator.java       | 110 +++++++++++++++++++++
 2 files changed, 111 insertions(+)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index d346a12f8be3..62852bce8875 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -331,6 +331,7 @@ public class HoodieTableSource extends FileIndexReader implements
         .skipClustering(conf.get(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
         .skipInsertOverwrite(conf.get(FlinkOptions.READ_STREAMING_SKIP_INSERT_OVERWRITE))
         .maxCompactionMemoryInBytes(conf.get(FlinkOptions.COMPACTION_MAX_MEMORY))
+        .maxPendingSplits(conf.get(FlinkOptions.READ_SPLITS_LIMIT))
         .partitionPruner(partitionPruner)
         .isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
         .build();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
index 548529e466cf..cc54fe2c7951 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
@@ -253,6 +253,116 @@ public class TestHoodieContinuousSplitEnumerator {
         "Split should be added back to provider");
   }
 
+  /**
+   * Verify that when pending splits exceed maxPendingSplits (READ_SPLITS_LIMIT), discovery
+   * is paused and the enumerator position is NOT advanced.
+   * This ensures that after a pause, the next scan resumes from the correct instant.
+   */
+  @Test
+  public void testReadSplitsLimitPositionPreservedWhenDiscoveryIsPaused() throws Exception {
+    // Use a small maxPendingSplits to make it easy to exceed
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.PATH, "/tmp/test");
+    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 1);
+    HoodieScanContext limitedScanContext = HoodieScanContext.builder()
+        .conf(conf)
+        .path(new StoragePath("/tmp/test"))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20231201000000000")
+        .maxPendingSplits(2)
+        .build();
+
+    String knownOffset = "20231201060000000";
+    HoodieSplitEnumeratorState initialState = new HoodieSplitEnumeratorState(
+        Collections.emptyList(),
+        Option.of("20231201000000000"),
+        Option.of(knownOffset));
+
+    // Fill provider with 3 splits, exceeding the limit of 2
+    splitProvider.onDiscoveredSplits(Arrays.asList(split1, split2, createTestSplit(3, "file3")));
+
+    // Configure discover to return new splits — but they should never be reached
+    splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+        Collections.singletonList(createTestSplit(4, "file4")), "20231201000000000", "20231201120000000"));
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        TABLE_NAME, context, splitProvider, splitDiscover, limitedScanContext, Option.of(initialState));
+    enumerator.start();
+    context.executeAsyncCallbacks();
+
+    // Position must remain at knownOffset; discovery was paused and returned EMPTY (no new offset)
+    HoodieSplitEnumeratorState state = enumerator.snapshotState(1L);
+    assertTrue(state.getLastEnumeratedInstantOffset().isPresent(),
+        "Position offset must not be cleared when discovery is paused by READ_SPLITS_LIMIT");
+    assertEquals(knownOffset, state.getLastEnumeratedInstantOffset().get(),
+        "Position offset must stay at last consumed offset when paused by READ_SPLITS_LIMIT");
+  }
+
+  /**
+   * Verify that discovery proceeds when pending splits are exactly at the threshold (not exceeding).
+   * The condition is strictly greater-than, so pending == limit should still allow discovery.
+   */
+  @Test
+  public void testReadSplitsLimitDiscoveryProceedsAtExactThreshold() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.PATH, "/tmp/test");
+    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 1);
+    HoodieScanContext limitedScanContext = HoodieScanContext.builder()
+        .conf(conf)
+        .path(new StoragePath("/tmp/test"))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20231201000000000")
+        .maxPendingSplits(2)
+        .build();
+
+    // Add exactly 2 splits — equal to limit, should NOT pause
+    splitProvider.onDiscoveredSplits(Arrays.asList(split1, split2));
+
+    splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+        Collections.singletonList(createTestSplit(3, "file3")), "20231201000000000", "20231201120000000"));
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        TABLE_NAME, context, splitProvider, splitDiscover, limitedScanContext, Option.empty());
+    enumerator.start();
+    context.executeAsyncCallbacks();
+
+    // One new split should have been discovered and added
+    assertEquals(3, splitProvider.pendingSplitCount(),
+        "Discovery should proceed when pending splits equal (not exceed) the limit");
+  }
+
+  /**
+   * Verify that discovery pauses when pending splits exceed the threshold by exactly one.
+   */
+  @Test
+  public void testReadSplitsLimitDiscoveryPausedExceedingByOne() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.PATH, "/tmp/test");
+    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 1);
+    HoodieScanContext limitedScanContext = HoodieScanContext.builder()
+        .conf(conf)
+        .path(new StoragePath("/tmp/test"))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20231201000000000")
+        .maxPendingSplits(2)
+        .build();
+
+    // Add 3 splits — exceeds limit by 1
+    splitProvider.onDiscoveredSplits(Arrays.asList(split1, split2, createTestSplit(3, "file3")));
+
+    splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+        Collections.singletonList(createTestSplit(4, "file4")), "20231201000000000", "20231201120000000"));
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        TABLE_NAME, context, splitProvider, splitDiscover, limitedScanContext, Option.empty());
+    enumerator.start();
+    int countBeforeCallback = splitProvider.pendingSplitCount();
+    context.executeAsyncCallbacks();
+
+    assertEquals(countBeforeCallback, splitProvider.pendingSplitCount(),
+        "Discovery should pause when pending splits exceed the limit by one");
+  }
+
   /**
    * Verify that the enumerator position is NOT reset when no new commits are found
    * (i.e., when the split discover returns a batch with an empty offset).

Reply via email to