This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 81a8c26ee739 feat: support read commits limit in Hudi Flink Source V2 (#18369)
81a8c26ee739 is described below

commit 81a8c26ee7391ae5adf2799e75e68296abbec757
Author: Peter Huang <[email protected]>
AuthorDate: Sun Mar 22 20:37:41 2026 -0700

    feat: support read commits limit in Hudi Flink Source V2 (#18369)
---
 .../HoodieContinuousSplitEnumerator.java           | 11 ++-
 .../TestHoodieContinuousSplitEnumerator.java       | 87 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
index 903dc5e3feff..b222ab17f197 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.source.enumerator;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.source.HoodieScanContext;
 import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
 import org.apache.hudi.source.split.HoodieContinuousSplitDiscover;
@@ -121,8 +122,14 @@ public class HoodieContinuousSplitEnumerator extends AbstractHoodieSplitEnumerat
           position.get().getIssuedOffset(),
           result.getOffset());
     }
-    position.set(HoodieEnumeratorPosition.of(result.getEndInstant(), result.getOffset()));
-    LOG.info("Update enumerator position to {}", position.get());
+    // Only advance position when there is a valid new offset from the analyzed commits.
+    // An empty/null offset means either no new commits were found or split discovery was paused
+    // due to too many pending splits. Preserving the current position ensures the next scan
+    // resumes from the correct point, which is required for READ_COMMITS_LIMIT to work correctly.
+    if (!StringUtils.isNullOrEmpty(result.getOffset())) {
+      position.set(HoodieEnumeratorPosition.of(result.getEndInstant(), result.getOffset()));
+      LOG.info("Update enumerator position to {}", position.get());
+    }
   }
 
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
index 4045676fd05f..548529e466cf 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
@@ -253,6 +253,87 @@ public class TestHoodieContinuousSplitEnumerator {
         "Split should be added back to provider");
   }
 
+  /**
+   * Verify that the enumerator position is NOT reset when no new commits are found
+   * (i.e., when the split discover returns a batch with an empty offset).
+   * This is critical for READ_COMMITS_LIMIT: after catching up to the latest commit,
+   * subsequent scans must resume from the last consumed offset, not from the beginning.
+   */
+  @Test
+  public void testPositionPreservedWhenNoNewCommits() throws Exception {
+    // Initialize enumerator with a known position
+    HoodieSplitEnumeratorState initialState = new HoodieSplitEnumeratorState(
+        Collections.emptyList(),
+        Option.of("20231201000000000"),
+        Option.of("20231201120000000"));
+
+    splitDiscover.setNextBatch(HoodieContinuousSplitBatch.EMPTY);
+    enumerator = new HoodieContinuousSplitEnumerator(
+        TABLE_NAME, context, splitProvider, splitDiscover, scanContext, Option.of(initialState));
+    enumerator.start();
+    context.executeAsyncCallbacks();
+
+    // Position must remain at the initial offset, not be reset to empty
+    HoodieSplitEnumeratorState state = enumerator.snapshotState(1L);
+    assertTrue(state.getLastEnumeratedInstantOffset().isPresent(),
+        "Position offset must not be cleared when no new commits are found");
+    assertEquals("20231201120000000", state.getLastEnumeratedInstantOffset().get(),
+        "Position offset must be preserved at the last consumed offset");
+  }
+
+  /**
+   * Verify that the enumerator position advances correctly when READ_COMMITS_LIMIT constrains
+   * the batch to commits that contain no data (empty commits).
+   * The position must advance to the end of the limited batch so the next scan picks up
+   * the subsequent commits rather than re-processing the same ones.
+   */
+  @Test
+  public void testReadCommitsLimitPositionAdvancesWithEmptyCommitBatch() throws Exception {
+    // Simulate READ_COMMITS_LIMIT returning a limited batch of empty commits:
+    // splits are empty but the batch carries a valid offset from the analyzed commits.
+    HoodieContinuousSplitBatch emptyCommitsBatch = new HoodieContinuousSplitBatch(
+        Collections.emptyList(), "20231201000000000", "20231201120000000");
+    splitDiscover.setNextBatch(emptyCommitsBatch);
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        TABLE_NAME, context, splitProvider, splitDiscover, scanContext, Option.empty());
+    enumerator.start();
+    context.executeAsyncCallbacks();
+
+    // Position must advance to the batch offset even though no splits were produced
+    HoodieSplitEnumeratorState state = enumerator.snapshotState(1L);
+    assertTrue(state.getLastEnumeratedInstantOffset().isPresent(),
+        "Position offset must be set after processing a limited batch of empty commits");
+    assertEquals("20231201120000000", state.getLastEnumeratedInstantOffset().get(),
+        "Position offset must advance to the end of the limited commit batch");
+  }
+
+  /**
+   * Verify that after READ_COMMITS_LIMIT advances the position to offset X,
+   * the next discover call passes X as the lastInstant so that only subsequent
+   * commits are scanned (no re-reading of already-consumed commits).
+   */
+  @Test
+  public void testReadCommitsLimitPassesLastInstantToNextDiscover() throws Exception {
+    // Initialize enumerator with a known position (simulating state after first limited batch)
+    String previousOffset = "20231201120000000";
+    HoodieSplitEnumeratorState initialState = new HoodieSplitEnumeratorState(
+        Collections.emptyList(),
+        Option.of("20231201000000000"),
+        Option.of(previousOffset));
+
+    splitDiscover.setNextBatch(HoodieContinuousSplitBatch.EMPTY);
+    enumerator = new HoodieContinuousSplitEnumerator(
+        TABLE_NAME, context, splitProvider, splitDiscover, scanContext, Option.of(initialState));
+    enumerator.start();
+    context.executeAsyncCallbacks();
+
+    // The discover call must receive the previous batch's offset so scanning resumes
+    // from after the last consumed commit, not from READ_START_COMMIT
+    assertEquals(previousOffset, splitDiscover.getLastDiscoveredInstant(),
+        "discover must be called with the last consumed offset to avoid re-reading commits");
+  }
+
   private HoodieSourceSplit createTestSplit(int splitNum, String fileId) {
     return new HoodieSourceSplit(
         splitNum,
@@ -273,6 +354,7 @@ public class TestHoodieContinuousSplitEnumerator {
   private static class MockContinuousSplitDiscover implements HoodieContinuousSplitDiscover {
     private HoodieContinuousSplitBatch nextBatch = HoodieContinuousSplitBatch.EMPTY;
     private boolean throwException = false;
+    private String lastDiscoveredInstant;
 
     public void setNextBatch(HoodieContinuousSplitBatch batch) {
       this.nextBatch = batch;
@@ -282,8 +364,13 @@ public class TestHoodieContinuousSplitEnumerator {
       this.throwException = throwException;
     }
 
+    public String getLastDiscoveredInstant() {
+      return lastDiscoveredInstant;
+    }
+
     @Override
     public HoodieContinuousSplitBatch discoverSplits(String lastInstant) {
+      this.lastDiscoveredInstant = lastInstant;
       if (throwException) {
         throw new RuntimeException("Mock exception during split discovery");
       }

Reply via email to