This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 94a416737c [flink] ContinuousFileSplitEnumerator supports setting splitMaxPerTask for SplitAssigner. (#5835)
94a416737c is described below

commit 94a416737c875716941730a479195697b4bac50d
Author: zhuanshenbsj1 <34104400+zhuanshenb...@users.noreply.github.com>
AuthorDate: Fri Jul 4 20:47:04 2025 +0800

    [flink] ContinuousFileSplitEnumerator supports setting splitMaxPerTask for SplitAssigner. (#5835)
---
 .../source/ContinuousFileSplitEnumerator.java      |  8 +++--
 .../source/ContinuousFileSplitEnumeratorTest.java  | 41 ++++++++++++++++------
 2 files changed, 37 insertions(+), 12 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 4db86da117..38c593e75d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -69,6 +69,8 @@ public class ContinuousFileSplitEnumerator
 
     protected final StreamTableScan scan;
 
+    protected final int splitMaxPerTask;
+
     protected final SplitAssigner splitAssigner;
 
     protected final ConsumerProgressCalculator consumerProgressCalculator;
@@ -104,8 +106,9 @@ public class ContinuousFileSplitEnumerator
         this.readersAwaitingSplit = new LinkedHashSet<>();
         this.splitGenerator = new FileStoreSourceSplitGenerator();
         this.scan = scan;
-        this.splitAssigner = createSplitAssigner(unawareBucket);
+        this.splitMaxPerTask = splitMaxPerTask;
         this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
+        this.splitAssigner = createSplitAssigner(unawareBucket);
         this.shuffleBucketWithPartition = shuffleBucketWithPartition;
         addSplits(remainSplits);
 
@@ -311,7 +314,8 @@ public class ContinuousFileSplitEnumerator
     protected SplitAssigner createSplitAssigner(boolean unawareBucket) {
         return unawareBucket
                 ? new FIFOSplitAssigner(Collections.emptyList())
-                : new PreAssignSplitAssigner(1, context, 
Collections.emptyList());
+                : new PreAssignSplitAssigner(
+                        this.splitMaxPerTask, context, 
Collections.emptyList());
     }
 
     protected boolean noMoreSplits() {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 05e3d3b0b6..90c35350eb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -74,6 +74,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setSplitEnumeratorContext(context)
                         .setInitialSplits(initialSplits)
                         .setDiscoveryInterval(3)
+                        .withSplitMaxPerTask(1)
                         .build();
 
         // The first time split is allocated, split1 and split2 should be 
allocated
@@ -125,6 +126,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setSplitEnumeratorContext(context)
                         .setInitialSplits(initialSplits)
                         .setDiscoveryInterval(3)
+                        .withSplitMaxPerTask(1)
                         .build();
 
         // The first time split is allocated, split1 and split2 should be 
allocated
@@ -166,6 +168,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setSplitEnumeratorContext(context)
                         .setInitialSplits(initialSplits)
                         .setDiscoveryInterval(3)
+                        .withSplitMaxPerTask(1)
                         .build();
 
         // each time a split is allocated from bucket-0 and bucket-1
@@ -205,6 +208,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
+                        .withSplitMaxPerTask(1)
                         .build();
         enumerator.start();
 
@@ -231,11 +235,17 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
         assertThat(toDataSplits(assignments.get(0).getAssignedSplits()))
                 .containsExactly(splits.get(0), splits.get(2));
 
+        // assign to task 1
+        enumerator.handleSplitRequest(1, "test-host");
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsKey(1);
+        assertThat(toDataSplits(assignments.get(1).getAssignedSplits()))
+                .containsExactly(splits.get(1));
+
         // no more splits task 0
         enumerator.handleSplitRequest(0, "test-host");
         context.triggerAllActions();
         assignments = context.getSplitAssignments();
-        assertThat(assignments).containsOnlyKeys(0);
         
assertThat(assignments.get(0).hasReceivedNoMoreSplitsSignal()).isTrue();
         assignments.clear();
 
@@ -244,14 +254,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
         assignments = context.getSplitAssignments();
         assertThat(assignments).containsOnlyKeys(1);
         assertThat(toDataSplits(assignments.get(1).getAssignedSplits()))
-                .containsExactly(splits.get(1));
-
-        // assign to task 1
-        enumerator.handleSplitRequest(1, "test-host");
-        assignments = context.getSplitAssignments();
-        assertThat(assignments).containsOnlyKeys(1);
-        assertThat(toDataSplits(assignments.get(1).getAssignedSplits()))
-                .containsExactly(splits.get(1), splits.get(3));
+                .containsExactly(splits.get(3));
 
         // no more splits task 1
         enumerator.handleSplitRequest(1, "test-host");
@@ -273,6 +276,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
+                        .withSplitMaxPerTask(1)
                         .unawareBucket(true)
                         .build();
         enumerator.start();
@@ -315,6 +319,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
+                        .withSplitMaxPerTask(1)
                         .unawareBucket(true)
                         .build();
         enumerator.start();
@@ -374,6 +379,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
+                        .withSplitMaxPerTask(1)
                         .unawareBucket(true)
                         .build();
         enumerator.start();
@@ -430,6 +436,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
+                        .withSplitMaxPerTask(1)
                         .unawareBucket(true)
                         .build();
         enumerator.start();
@@ -469,6 +476,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
+                        .withSplitMaxPerTask(1)
                         .unawareBucket(true)
                         .build();
         enumerator.start();
@@ -501,6 +509,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
+                        .withSplitMaxPerTask(1)
                         .build();
         enumerator.start();
 
@@ -542,6 +551,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
+                        .withSplitMaxPerTask(1)
                         .build();
         enumerator.start();
         enumerator.handleSplitRequest(0, "test-host");
@@ -641,6 +651,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
+                        .withSplitMaxPerTask(1)
                         .build();
         enumerator.start();
 
@@ -709,6 +720,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setSplitEnumeratorContext(context)
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
+                        .withSplitMaxPerTask(1)
                         .setScan(scan)
                         .build();
         enumerator.start();
@@ -765,6 +777,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
+                        .withSplitMaxPerTask(10)
                         .unawareBucket(true)
                         .build();
         enumerator.start();
@@ -816,6 +829,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setScan(scan)
                         .unawareBucket(true)
                         .withMaxSnapshotCount(1)
+                        .withSplitMaxPerTask(1)
                         .build();
         enumerator.start();
 
@@ -901,6 +915,8 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
         private boolean unawareBucket = false;
         private int maxSnapshotCount = -1;
 
+        private int splitMaxPerTask = 10;
+
         public Builder setSplitEnumeratorContext(
                 SplitEnumeratorContext<FileStoreSourceSplit> context) {
             this.context = context;
@@ -932,6 +948,11 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
             return this;
         }
 
+        public Builder withSplitMaxPerTask(int splitMaxPerTask) {
+            this.splitMaxPerTask = splitMaxPerTask;
+            return this;
+        }
+
         public ContinuousFileSplitEnumerator build() {
             return new ContinuousFileSplitEnumerator(
                     context,
@@ -940,7 +961,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                     discoveryInterval,
                     scan,
                     unawareBucket,
-                    10,
+                    this.splitMaxPerTask,
                     false,
                     maxSnapshotCount);
         }

Reply via email to