This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 94a416737c [flink] ContinuousFileSplitEnumerator supports setting splitMaxPerTask for SplitAssigner. (#5835) 94a416737c is described below commit 94a416737c875716941730a479195697b4bac50d Author: zhuanshenbsj1 <34104400+zhuanshenb...@users.noreply.github.com> AuthorDate: Fri Jul 4 20:47:04 2025 +0800 [flink] ContinuousFileSplitEnumerator supports setting splitMaxPerTask for SplitAssigner. (#5835) --- .../source/ContinuousFileSplitEnumerator.java | 8 +++-- .../source/ContinuousFileSplitEnumeratorTest.java | 41 ++++++++++++++++------ 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java index 4db86da117..38c593e75d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java @@ -69,6 +69,8 @@ public class ContinuousFileSplitEnumerator protected final StreamTableScan scan; + protected final int splitMaxPerTask; + protected final SplitAssigner splitAssigner; protected final ConsumerProgressCalculator consumerProgressCalculator; @@ -104,8 +106,9 @@ public class ContinuousFileSplitEnumerator this.readersAwaitingSplit = new LinkedHashSet<>(); this.splitGenerator = new FileStoreSourceSplitGenerator(); this.scan = scan; - this.splitAssigner = createSplitAssigner(unawareBucket); + this.splitMaxPerTask = splitMaxPerTask; this.splitMaxNum = context.currentParallelism() * splitMaxPerTask; + this.splitAssigner = createSplitAssigner(unawareBucket); this.shuffleBucketWithPartition = shuffleBucketWithPartition; addSplits(remainSplits); @@ -311,7 +314,8 @@ public class 
ContinuousFileSplitEnumerator protected SplitAssigner createSplitAssigner(boolean unawareBucket) { return unawareBucket ? new FIFOSplitAssigner(Collections.emptyList()) - : new PreAssignSplitAssigner(1, context, Collections.emptyList()); + : new PreAssignSplitAssigner( + this.splitMaxPerTask, context, Collections.emptyList()); } protected boolean noMoreSplits() { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java index 05e3d3b0b6..90c35350eb 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java @@ -74,6 +74,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setSplitEnumeratorContext(context) .setInitialSplits(initialSplits) .setDiscoveryInterval(3) + .withSplitMaxPerTask(1) .build(); // The first time split is allocated, split1 and split2 should be allocated @@ -125,6 +126,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setSplitEnumeratorContext(context) .setInitialSplits(initialSplits) .setDiscoveryInterval(3) + .withSplitMaxPerTask(1) .build(); // The first time split is allocated, split1 and split2 should be allocated @@ -166,6 +168,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setSplitEnumeratorContext(context) .setInitialSplits(initialSplits) .setDiscoveryInterval(3) + .withSplitMaxPerTask(1) .build(); // each time a split is allocated from bucket-0 and bucket-1 @@ -205,6 +208,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setInitialSplits(Collections.emptyList()) .setDiscoveryInterval(1) .setScan(scan) + 
.withSplitMaxPerTask(1) .build(); enumerator.start(); @@ -231,11 +235,17 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa assertThat(toDataSplits(assignments.get(0).getAssignedSplits())) .containsExactly(splits.get(0), splits.get(2)); + // assign to task 1 + enumerator.handleSplitRequest(1, "test-host"); + assignments = context.getSplitAssignments(); + assertThat(assignments).containsKey(1); + assertThat(toDataSplits(assignments.get(1).getAssignedSplits())) + .containsExactly(splits.get(1)); + // no more splits task 0 enumerator.handleSplitRequest(0, "test-host"); context.triggerAllActions(); assignments = context.getSplitAssignments(); - assertThat(assignments).containsOnlyKeys(0); assertThat(assignments.get(0).hasReceivedNoMoreSplitsSignal()).isTrue(); assignments.clear(); @@ -244,14 +254,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa assignments = context.getSplitAssignments(); assertThat(assignments).containsOnlyKeys(1); assertThat(toDataSplits(assignments.get(1).getAssignedSplits())) - .containsExactly(splits.get(1)); - - // assign to task 1 - enumerator.handleSplitRequest(1, "test-host"); - assignments = context.getSplitAssignments(); - assertThat(assignments).containsOnlyKeys(1); - assertThat(toDataSplits(assignments.get(1).getAssignedSplits())) - .containsExactly(splits.get(1), splits.get(3)); + .containsExactly(splits.get(3)); // no more splits task 1 enumerator.handleSplitRequest(1, "test-host"); @@ -273,6 +276,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setInitialSplits(Collections.emptyList()) .setDiscoveryInterval(1) .setScan(scan) + .withSplitMaxPerTask(1) .unawareBucket(true) .build(); enumerator.start(); @@ -315,6 +319,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setInitialSplits(Collections.emptyList()) .setDiscoveryInterval(1) .setScan(scan) + .withSplitMaxPerTask(1) .unawareBucket(true) 
.build(); enumerator.start(); @@ -374,6 +379,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setInitialSplits(Collections.emptyList()) .setDiscoveryInterval(1) .setScan(scan) + .withSplitMaxPerTask(1) .unawareBucket(true) .build(); enumerator.start(); @@ -430,6 +436,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setInitialSplits(Collections.emptyList()) .setDiscoveryInterval(1) .setScan(scan) + .withSplitMaxPerTask(1) .unawareBucket(true) .build(); enumerator.start(); @@ -469,6 +476,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setInitialSplits(Collections.emptyList()) .setDiscoveryInterval(1) .setScan(scan) + .withSplitMaxPerTask(1) .unawareBucket(true) .build(); enumerator.start(); @@ -501,6 +509,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setInitialSplits(Collections.emptyList()) .setDiscoveryInterval(1) .setScan(scan) + .withSplitMaxPerTask(1) .build(); enumerator.start(); @@ -542,6 +551,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setInitialSplits(Collections.emptyList()) .setDiscoveryInterval(1) .setScan(scan) + .withSplitMaxPerTask(1) .build(); enumerator.start(); enumerator.handleSplitRequest(0, "test-host"); @@ -641,6 +651,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setInitialSplits(Collections.emptyList()) .setDiscoveryInterval(1) .setScan(scan) + .withSplitMaxPerTask(1) .build(); enumerator.start(); @@ -709,6 +720,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setSplitEnumeratorContext(context) .setInitialSplits(Collections.emptyList()) .setDiscoveryInterval(1) + .withSplitMaxPerTask(1) .setScan(scan) .build(); enumerator.start(); @@ -765,6 +777,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setInitialSplits(Collections.emptyList()) 
.setDiscoveryInterval(1) .setScan(scan) + .withSplitMaxPerTask(10) .unawareBucket(true) .build(); enumerator.start(); @@ -816,6 +829,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa .setScan(scan) .unawareBucket(true) .withMaxSnapshotCount(1) + .withSplitMaxPerTask(1) .build(); enumerator.start(); @@ -901,6 +915,8 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa private boolean unawareBucket = false; private int maxSnapshotCount = -1; + private int splitMaxPerTask = 10; + public Builder setSplitEnumeratorContext( SplitEnumeratorContext<FileStoreSourceSplit> context) { this.context = context; @@ -932,6 +948,11 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa return this; } + public Builder withSplitMaxPerTask(int splitMaxPerTask) { + this.splitMaxPerTask = splitMaxPerTask; + return this; + } + public ContinuousFileSplitEnumerator build() { return new ContinuousFileSplitEnumerator( context, @@ -940,7 +961,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa discoveryInterval, scan, unawareBucket, - 10, + this.splitMaxPerTask, false, maxSnapshotCount); }