This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 81a8c26ee739 feat: support read commits limit in Hudi Flink Source V2 (#18369)
81a8c26ee739 is described below
commit 81a8c26ee7391ae5adf2799e75e68296abbec757
Author: Peter Huang <[email protected]>
AuthorDate: Sun Mar 22 20:37:41 2026 -0700
feat: support read commits limit in Hudi Flink Source V2 (#18369)
---
.../HoodieContinuousSplitEnumerator.java | 11 ++-
.../TestHoodieContinuousSplitEnumerator.java | 87 ++++++++++++++++++++++
2 files changed, 96 insertions(+), 2 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
index 903dc5e3feff..b222ab17f197 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
@@ -19,6 +19,7 @@
package org.apache.hudi.source.enumerator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.source.HoodieScanContext;
import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
import org.apache.hudi.source.split.HoodieContinuousSplitDiscover;
@@ -121,8 +122,14 @@ public class HoodieContinuousSplitEnumerator extends AbstractHoodieSplitEnumerat
position.get().getIssuedOffset(),
result.getOffset());
}
- position.set(HoodieEnumeratorPosition.of(result.getEndInstant(), result.getOffset()));
- LOG.info("Update enumerator position to {}", position.get());
+ // Only advance position when there is a valid new offset from the analyzed commits.
+ // An empty/null offset means either no new commits were found or split discovery was paused
+ // due to too many pending splits. Preserving the current position ensures the next scan
+ // resumes from the correct point, which is required for READ_COMMITS_LIMIT to work correctly.
+ if (!StringUtils.isNullOrEmpty(result.getOffset())) {
+ position.set(HoodieEnumeratorPosition.of(result.getEndInstant(), result.getOffset()));
+ LOG.info("Update enumerator position to {}", position.get());
+ }
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
index 4045676fd05f..548529e466cf 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
@@ -253,6 +253,87 @@ public class TestHoodieContinuousSplitEnumerator {
"Split should be added back to provider");
}
+ /**
+ * Verify that the enumerator position is NOT reset when no new commits are found
+ * (i.e., when the split discover returns a batch with an empty offset).
+ * This is critical for READ_COMMITS_LIMIT: after catching up to the latest commit,
+ * subsequent scans must resume from the last consumed offset, not from the beginning.
+ */
+ @Test
+ public void testPositionPreservedWhenNoNewCommits() throws Exception {
+ // Initialize enumerator with a known position
+ HoodieSplitEnumeratorState initialState = new HoodieSplitEnumeratorState(
+ Collections.emptyList(),
+ Option.of("20231201000000000"),
+ Option.of("20231201120000000"));
+
+ splitDiscover.setNextBatch(HoodieContinuousSplitBatch.EMPTY);
+ enumerator = new HoodieContinuousSplitEnumerator(
+ TABLE_NAME, context, splitProvider, splitDiscover, scanContext, Option.of(initialState));
+ enumerator.start();
+ context.executeAsyncCallbacks();
+
+ // Position must remain at the initial offset, not be reset to empty
+ HoodieSplitEnumeratorState state = enumerator.snapshotState(1L);
+ assertTrue(state.getLastEnumeratedInstantOffset().isPresent(),
+ "Position offset must not be cleared when no new commits are found");
+ assertEquals("20231201120000000", state.getLastEnumeratedInstantOffset().get(),
+ "Position offset must be preserved at the last consumed offset");
+ }
+
+ /**
+ * Verify that the enumerator position advances correctly when READ_COMMITS_LIMIT constrains
+ * the batch to commits that contain no data (empty commits).
+ * The position must advance to the end of the limited batch so the next scan picks up
+ * the subsequent commits rather than re-processing the same ones.
+ */
+ @Test
+ public void testReadCommitsLimitPositionAdvancesWithEmptyCommitBatch() throws Exception {
+ // Simulate READ_COMMITS_LIMIT returning a limited batch of empty commits:
+ // splits are empty but the batch carries a valid offset from the analyzed commits.
+ HoodieContinuousSplitBatch emptyCommitsBatch = new HoodieContinuousSplitBatch(
+ Collections.emptyList(), "20231201000000000", "20231201120000000");
+ splitDiscover.setNextBatch(emptyCommitsBatch);
+
+ enumerator = new HoodieContinuousSplitEnumerator(
+ TABLE_NAME, context, splitProvider, splitDiscover, scanContext, Option.empty());
+ enumerator.start();
+ context.executeAsyncCallbacks();
+
+ // Position must advance to the batch offset even though no splits were produced
+ HoodieSplitEnumeratorState state = enumerator.snapshotState(1L);
+ assertTrue(state.getLastEnumeratedInstantOffset().isPresent(),
+ "Position offset must be set after processing a limited batch of empty commits");
+ assertEquals("20231201120000000", state.getLastEnumeratedInstantOffset().get(),
+ "Position offset must advance to the end of the limited commit batch");
+ }
+
+ /**
+ * Verify that after READ_COMMITS_LIMIT advances the position to offset X,
+ * the next discover call passes X as the lastInstant so that only subsequent
+ * commits are scanned (no re-reading of already-consumed commits).
+ */
+ @Test
+ public void testReadCommitsLimitPassesLastInstantToNextDiscover() throws Exception {
+ // Initialize enumerator with a known position (simulating state after first limited batch)
+ String previousOffset = "20231201120000000";
+ HoodieSplitEnumeratorState initialState = new HoodieSplitEnumeratorState(
+ Collections.emptyList(),
+ Option.of("20231201000000000"),
+ Option.of(previousOffset));
+
+ splitDiscover.setNextBatch(HoodieContinuousSplitBatch.EMPTY);
+ enumerator = new HoodieContinuousSplitEnumerator(
+ TABLE_NAME, context, splitProvider, splitDiscover, scanContext, Option.of(initialState));
+ enumerator.start();
+ context.executeAsyncCallbacks();
+
+ // The discover call must receive the previous batch's offset so scanning resumes
+ // from after the last consumed commit, not from READ_START_COMMIT
+ assertEquals(previousOffset, splitDiscover.getLastDiscoveredInstant(),
+ "discover must be called with the last consumed offset to avoid re-reading commits");
+ }
+
private HoodieSourceSplit createTestSplit(int splitNum, String fileId) {
return new HoodieSourceSplit(
splitNum,
@@ -273,6 +354,7 @@ public class TestHoodieContinuousSplitEnumerator {
private static class MockContinuousSplitDiscover implements HoodieContinuousSplitDiscover {
private HoodieContinuousSplitBatch nextBatch = HoodieContinuousSplitBatch.EMPTY;
private boolean throwException = false;
+ private String lastDiscoveredInstant;
public void setNextBatch(HoodieContinuousSplitBatch batch) {
this.nextBatch = batch;
@@ -282,8 +364,13 @@ public class TestHoodieContinuousSplitEnumerator {
this.throwException = throwException;
}
+ public String getLastDiscoveredInstant() {
+ return lastDiscoveredInstant;
+ }
+
@Override
public HoodieContinuousSplitBatch discoverSplits(String lastInstant) {
+ this.lastDiscoveredInstant = lastInstant;
if (throwException) {
throw new RuntimeException("Mock exception during split discovery");
}