This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f122bf26a [flink] unaware-bucket mode parallelism read (streaming and 
batch) (#1372)
f122bf26a is described below

commit f122bf26a4acdcc52f9ba04a5a01c350d3ca1157
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 27 13:53:19 2023 +0800

    [flink] unaware-bucket mode parallelism read (streaming and batch) (#1372)
---
 .../paimon/table/AppendOnlyFileStoreTable.java     |   4 +-
 .../table/source/AppendOnlySplitGenerator.java     |  20 +-
 .../table/source/MergeTreeSplitGenerator.java      |   9 +-
 .../apache/paimon/table/source/SplitGenerator.java |   4 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  |  23 +-
 .../paimon/table/AppendOnlyFileStoreTableTest.java |  56 +++++
 .../paimon/table/source/SplitGeneratorTest.java    |  20 +-
 .../source/ContinuousFileSplitEnumerator.java      |  88 ++++----
 .../flink/source/ContinuousFileStoreSource.java    |  18 +-
 .../paimon/flink/source/FlinkSourceBuilder.java    |  10 +-
 .../paimon/flink/source/StaticFileStoreSource.java |  11 +-
 .../source/StaticFileStoreSplitEnumerator.java     |   2 +-
 ...veSplitAssigner.java => FIFOSplitAssigner.java} |  15 +-
 ...itAssigner.java => PreAssignSplitAssigner.java} |  20 +-
 .../flink/source/assigners/SplitAssigner.java      |   5 +-
 .../source/ContinuousFileSplitEnumeratorTest.java  | 242 ++++++++++++++++++++-
 .../source/FileStoreSourceSplitGeneratorTest.java  |  15 +-
 17 files changed, 461 insertions(+), 101 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index e5f65eace..68f274465 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -86,7 +86,9 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
     @Override
     public SplitGenerator splitGenerator() {
         return new AppendOnlySplitGenerator(
-                store().options().splitTargetSize(), 
store().options().splitOpenFileCost());
+                store().options().splitTargetSize(),
+                store().options().splitOpenFileCost(),
+                bucketMode());
     }
 
     /**
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
index b00139ab9..e9c1b4b0f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
@@ -19,9 +19,11 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.utils.BinPacking;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
 
@@ -32,17 +34,31 @@ public class AppendOnlySplitGenerator implements 
SplitGenerator {
 
     private final long targetSplitSize;
     private final long openFileCost;
+    private final BucketMode bucketMode;
 
-    public AppendOnlySplitGenerator(long targetSplitSize, long openFileCost) {
+    public AppendOnlySplitGenerator(
+            long targetSplitSize, long openFileCost, BucketMode bucketMode) {
         this.targetSplitSize = targetSplitSize;
         this.openFileCost = openFileCost;
+        this.bucketMode = bucketMode;
     }
 
     @Override
-    public List<List<DataFileMeta>> split(List<DataFileMeta> input) {
+    public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> input) {
         List<DataFileMeta> files = new ArrayList<>(input);
         files.sort(fileComparator(false));
         Function<DataFileMeta, Long> weightFunc = file -> 
Math.max(file.fileSize(), openFileCost);
         return BinPacking.packForOrdered(files, weightFunc, targetSplitSize);
     }
+
+    @Override
+    public List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> 
files) {
+        // When the bucket mode is unaware, we spit the files as batch, 
because unaware-bucket table
+        // only contains one bucket (bucket 0).
+        if (bucketMode == BucketMode.UNAWARE) {
+            return splitForBatch(files);
+        } else {
+            return Collections.singletonList(files);
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
index af9484804..cd024e1e7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
@@ -25,6 +25,7 @@ import org.apache.paimon.mergetree.compact.IntervalPartition;
 import org.apache.paimon.utils.BinPacking;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.function.Function;
@@ -47,7 +48,7 @@ public class MergeTreeSplitGenerator implements 
SplitGenerator {
     }
 
     @Override
-    public List<List<DataFileMeta>> split(List<DataFileMeta> files) {
+    public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files) {
         /*
          * The generator aims to parallel the scan execution by slicing the 
files of each bucket
          * into multiple splits. The generation has one constraint: files with 
intersected key
@@ -76,6 +77,12 @@ public class MergeTreeSplitGenerator implements 
SplitGenerator {
         return packSplits(sections);
     }
 
+    @Override
+    public List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> 
files) {
+        // We don't split streaming scan files
+        return Collections.singletonList(files);
+    }
+
     private List<List<DataFileMeta>> packSplits(List<List<DataFileMeta>> 
sections) {
         Function<List<DataFileMeta>, Long> weightFunc =
                 file -> Math.max(totalSize(file), openFileCost);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
index 7677875d0..f4aa7c7e2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
@@ -25,5 +25,7 @@ import java.util.List;
 /** Generate splits from {@link DataFileMeta}s. */
 public interface SplitGenerator {
 
-    List<List<DataFileMeta>> split(List<DataFileMeta> files);
+    List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files);
+
+    List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> files);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index c82936434..15f4ab72c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -285,17 +285,20 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
             for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : 
buckets.entrySet()) {
                 int bucket = bucketEntry.getKey();
                 if (isIncremental) {
-                    // Don't split when incremental
-                    splits.add(
-                            new DataSplit(
-                                    snapshotId,
-                                    partition,
-                                    bucket,
-                                    bucketEntry.getValue(),
-                                    true,
-                                    reverseRowKind));
+                    // streaming splits incremental data files
+                    
splitGenerator.splitForStreaming(bucketEntry.getValue()).stream()
+                            .map(
+                                    files ->
+                                            new DataSplit(
+                                                    snapshotId,
+                                                    partition,
+                                                    bucket,
+                                                    files,
+                                                    true,
+                                                    reverseRowKind))
+                            .forEach(splits::add);
                 } else {
-                    splitGenerator.split(bucketEntry.getValue()).stream()
+                    
splitGenerator.splitForBatch(bucketEntry.getValue()).stream()
                             .map(
                                     files ->
                                             new DataSplit(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 5599eec76..6c8546866 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -33,13 +33,16 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -171,6 +174,40 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         assertThat(partitions).containsExactly(1, 2, 3);
     }
 
+    @Test
+    public void testStreamingSplitInUnawareBucketMode() throws Exception {
+        // in unaware-bucket mode, we split files into splits all the time
+        FileStoreTable table =
+                createUnawareBucketFileStoreTable(
+                        options -> 
options.set(CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key(), "1 M"));
+
+        StreamTableScan scan = table.newStreamScan();
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        List<CommitMessage> result = new ArrayList<>();
+        write.write(rowData(3, 33, 303L));
+        result.addAll(write.prepareCommit(true, 0));
+        write.write(rowData(1, 10, 100L));
+        result.addAll(write.prepareCommit(true, 0));
+        write.write(rowData(2, 22, 202L));
+        result.addAll(write.prepareCommit(true, 0));
+        commit.commit(0, result);
+        result.clear();
+        Assertions.assertEquals(scan.plan().splits().size(), 3);
+
+        write.write(rowData(3, 33, 303L));
+        result.addAll(write.prepareCommit(true, 1));
+        write.write(rowData(1, 10, 100L));
+        result.addAll(write.prepareCommit(true, 1));
+        write.write(rowData(2, 22, 202L));
+        result.addAll(write.prepareCommit(true, 1));
+        commit.commit(1, result);
+        Assertions.assertEquals(scan.plan().splits().size(), 3);
+
+        write.close();
+    }
+
     @Test
     public void testStreamingProjection() throws Exception {
         writeData();
@@ -375,4 +412,23 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 ""));
         return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
     }
+
+    protected FileStoreTable 
createUnawareBucketFileStoreTable(Consumer<Options> configure)
+            throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+        conf.set(CoreOptions.BUCKET, -1);
+        configure.accept(conf);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                ROW_TYPE.getFields(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                conf.toMap(),
+                                ""));
+        return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index cc482e6ff..1d0bb649f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.BucketMode;
 
 import org.junit.jupiter.api.Test;
 
@@ -62,20 +63,29 @@ public class SplitGeneratorTest {
                         newFileFromSequence("4", 23, 41, 50),
                         newFileFromSequence("5", 4, 51, 60),
                         newFileFromSequence("6", 101, 61, 100));
-        assertThat(toNames(new AppendOnlySplitGenerator(40, 2).split(files)))
+        assertThat(
+                        toNames(
+                                new AppendOnlySplitGenerator(40, 2, 
BucketMode.FIXED)
+                                        .splitForBatch(files)))
                 .containsExactlyInAnyOrder(
                         Arrays.asList("1", "2"),
                         Collections.singletonList("3"),
                         Arrays.asList("4", "5"),
                         Collections.singletonList("6"));
 
-        assertThat(toNames(new AppendOnlySplitGenerator(70, 2).split(files)))
+        assertThat(
+                        toNames(
+                                new AppendOnlySplitGenerator(70, 2, 
BucketMode.FIXED)
+                                        .splitForBatch(files)))
                 .containsExactlyInAnyOrder(
                         Arrays.asList("1", "2", "3"),
                         Arrays.asList("4", "5"),
                         Collections.singletonList("6"));
 
-        assertThat(toNames(new AppendOnlySplitGenerator(40, 20).split(files)))
+        assertThat(
+                        toNames(
+                                new AppendOnlySplitGenerator(40, 20, 
BucketMode.FIXED)
+                                        .splitForBatch(files)))
                 .containsExactlyInAnyOrder(
                         Arrays.asList("1", "2"),
                         Collections.singletonList("3"),
@@ -95,11 +105,11 @@ public class SplitGeneratorTest {
                         fromMinMax("5", 82, 85),
                         fromMinMax("6", 100, 200));
         Comparator<InternalRow> comparator = Comparator.comparingInt(o -> 
o.getInt(0));
-        assertThat(toNames(new MergeTreeSplitGenerator(comparator, 100, 
2).split(files)))
+        assertThat(toNames(new MergeTreeSplitGenerator(comparator, 100, 
2).splitForBatch(files)))
                 .containsExactlyInAnyOrder(
                         Arrays.asList("1", "2", "4", "3", "5"), 
Collections.singletonList("6"));
 
-        assertThat(toNames(new MergeTreeSplitGenerator(comparator, 100, 
30).split(files)))
+        assertThat(toNames(new MergeTreeSplitGenerator(comparator, 100, 
30).splitForBatch(files)))
                 .containsExactlyInAnyOrder(
                         Arrays.asList("1", "2", "4", "3"),
                         Collections.singletonList("5"),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 166c2acca..c7772d5fb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -18,6 +18,10 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
+import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
+import org.apache.paimon.flink.source.assigners.SplitAssigner;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.StreamTableScan;
@@ -35,10 +39,10 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -54,8 +58,6 @@ public class ContinuousFileSplitEnumerator
 
     private final SplitEnumeratorContext<FileStoreSourceSplit> context;
 
-    private final Map<Integer, LinkedList<FileStoreSourceSplit>> bucketSplits;
-
     private final long discoveryInterval;
 
     private final Set<Integer> readersAwaitingSplit;
@@ -64,8 +66,9 @@ public class ContinuousFileSplitEnumerator
 
     private final StreamTableScan scan;
 
-    /** Default batch splits size to avoid exceed `akka.framesize`. */
-    private final int splitBatchSize;
+    private final SplitAssigner splitAssigner;
+
+    private final BucketMode bucketMode;
 
     @Nullable private Long nextSnapshotId;
 
@@ -76,18 +79,21 @@ public class ContinuousFileSplitEnumerator
             Collection<FileStoreSourceSplit> remainSplits,
             @Nullable Long nextSnapshotId,
             long discoveryInterval,
-            int splitBatchSize,
-            StreamTableScan scan) {
+            StreamTableScan scan,
+            BucketMode bucketMode) {
         checkArgument(discoveryInterval > 0L);
         this.context = checkNotNull(context);
-        this.bucketSplits = new HashMap<>();
-        addSplits(remainSplits);
         this.nextSnapshotId = nextSnapshotId;
         this.discoveryInterval = discoveryInterval;
-        this.splitBatchSize = splitBatchSize;
-        this.readersAwaitingSplit = new HashSet<>();
+        this.readersAwaitingSplit = new LinkedHashSet<>();
         this.splitGenerator = new FileStoreSourceSplitGenerator();
         this.scan = scan;
+        this.bucketMode = bucketMode;
+        this.splitAssigner =
+                bucketMode == BucketMode.UNAWARE
+                        ? new FIFOSplitAssigner(Collections.emptyList())
+                        : new PreAssignSplitAssigner(1, context, 
Collections.emptyList());
+        addSplits(remainSplits);
     }
 
     private void addSplits(Collection<FileStoreSourceSplit> splits) {
@@ -95,19 +101,7 @@ public class ContinuousFileSplitEnumerator
     }
 
     private void addSplit(FileStoreSourceSplit split) {
-        bucketSplits
-                .computeIfAbsent(((DataSplit) split.split()).bucket(), i -> 
new LinkedList<>())
-                .add(split);
-    }
-
-    private void addSplitsBack(Collection<FileStoreSourceSplit> splits) {
-        new 
LinkedList<>(splits).descendingIterator().forEachRemaining(this::addSplitToHead);
-    }
-
-    private void addSplitToHead(FileStoreSourceSplit split) {
-        bucketSplits
-                .computeIfAbsent(((DataSplit) split.split()).bucket(), i -> 
new LinkedList<>())
-                .addFirst(split);
+        splitAssigner.addSplit(assignTask(((DataSplit) 
split.split()).bucket()), split);
     }
 
     @Override
@@ -139,13 +133,13 @@ public class ContinuousFileSplitEnumerator
     @Override
     public void addSplitsBack(List<FileStoreSourceSplit> splits, int 
subtaskId) {
         LOG.debug("File Source Enumerator adds splits back: {}", splits);
-        addSplitsBack(splits);
+        splitAssigner.addSplitsBack(subtaskId, splits);
     }
 
     @Override
     public PendingSplitsCheckpoint snapshotState(long checkpointId) {
         List<FileStoreSourceSplit> splits = new ArrayList<>();
-        bucketSplits.values().forEach(splits::addAll);
+        splits.addAll(splitAssigner.remainingSplits());
         final PendingSplitsCheckpoint checkpoint =
                 new PendingSplitsCheckpoint(splits, nextSnapshotId);
 
@@ -196,28 +190,30 @@ public class ContinuousFileSplitEnumerator
 
     private Map<Integer, List<FileStoreSourceSplit>> createAssignment() {
         Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
-        bucketSplits.forEach(
-                (bucket, splits) -> {
+        readersAwaitingSplit.forEach(
+                task -> {
+                    // if the reader that requested another split has failed 
in the meantime, remove
+                    // it from the list of waiting readers
+                    if (!context.registeredReaders().containsKey(task)) {
+                        readersAwaitingSplit.remove(task);
+                        return;
+                    }
+                    List<FileStoreSourceSplit> splits = 
splitAssigner.getNext(task, null);
                     if (splits.size() > 0) {
-                        // To ensure the order of consumption, the data of the 
same bucket is given
-                        // to a task to be consumed.
-                        int task = bucket % context.currentParallelism();
-                        if (readersAwaitingSplit.contains(task)) {
-                            // if the reader that requested another split has 
failed in the
-                            // meantime, remove
-                            // it from the list of waiting readers
-                            if 
(!context.registeredReaders().containsKey(task)) {
-                                readersAwaitingSplit.remove(task);
-                                return;
-                            }
-                            List<FileStoreSourceSplit> taskAssignment =
-                                    assignment.computeIfAbsent(task, i -> new 
ArrayList<>());
-                            if (taskAssignment.size() < splitBatchSize) {
-                                taskAssignment.add(splits.poll());
-                            }
-                        }
+                        assignment.put(task, splits);
                     }
                 });
         return assignment;
     }
+
+    private int assignTask(int bucket) {
+        if (bucketMode == BucketMode.UNAWARE) {
+            // we just assign task 0 when bucket unaware
+            return 0;
+        } else {
+            // if not bucket unaware, we assign the bucket % parallelism, the 
same bucket data go
+            // into the same task
+            return bucket % context.currentParallelism();
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index b4f571069..4a2be4a58 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.StreamTableScan;
 
@@ -39,11 +39,21 @@ public class ContinuousFileStoreSource extends FlinkSource {
     private static final long serialVersionUID = 4L;
 
     private final Map<String, String> options;
+    private final BucketMode bucketMode;
 
     public ContinuousFileStoreSource(
             ReadBuilder readBuilder, Map<String, String> options, @Nullable 
Long limit) {
+        this(readBuilder, options, limit, BucketMode.FIXED);
+    }
+
+    public ContinuousFileStoreSource(
+            ReadBuilder readBuilder,
+            Map<String, String> options,
+            @Nullable Long limit,
+            BucketMode bucketMode) {
         super(readBuilder, limit);
         this.options = options;
+        this.bucketMode = bucketMode;
     }
 
     @Override
@@ -70,9 +80,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
                 splits,
                 nextSnapshotId,
                 coreOptions.continuousDiscoveryInterval().toMillis(),
-                coreOptions
-                        .toConfiguration()
-                        
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
-                scan);
+                scan,
+                bucketMode);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 2015a7c03..bab49ca4c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -28,6 +28,8 @@ import 
org.apache.paimon.flink.source.operator.MonitorFunction;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 
@@ -134,7 +136,13 @@ public class FlinkSourceBuilder {
 
     private DataStream<RowData> buildContinuousFileSource() {
         return toDataStream(
-                new ContinuousFileStoreSource(createReadBuilder(), 
table.options(), limit));
+                new ContinuousFileStoreSource(
+                        createReadBuilder(),
+                        table.options(),
+                        limit,
+                        table instanceof FileStoreTable
+                                ? ((FileStoreTable) table).bucketMode()
+                                : BucketMode.FIXED));
     }
 
     private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index 1e8e92089..d18a49253 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -18,8 +18,8 @@
 
 package org.apache.paimon.flink.source;
 
-import org.apache.paimon.flink.source.assigners.FairSplitAssigner;
-import org.apache.paimon.flink.source.assigners.PreemptiveSplitAssigner;
+import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
+import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
 import org.apache.paimon.flink.source.assigners.SplitAssigner;
 import org.apache.paimon.table.source.ReadBuilder;
 
@@ -30,7 +30,6 @@ import 
org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 
 import static org.apache.paimon.flink.FlinkConnectorOptions.SplitAssignMode;
@@ -82,12 +81,12 @@ public class StaticFileStoreSource extends FlinkSource {
             Collection<FileStoreSourceSplit> splits) {
         switch (splitAssignMode) {
             case FAIR:
-                return new FairSplitAssigner(splitBatchSize, context, splits);
+                return new PreAssignSplitAssigner(splitBatchSize, context, 
splits);
             case PREEMPTIVE:
-                return new PreemptiveSplitAssigner(new LinkedList<>(splits));
+                return new FIFOSplitAssigner(splits);
             default:
                 throw new UnsupportedOperationException(
-                        "Unsupported assign mode " + 
splitAssignMode.toString());
+                        "Unsupported assign mode " + splitAssignMode);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
index 7415acf23..4834aecc2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
@@ -72,7 +72,7 @@ public class StaticFileStoreSplitEnumerator
 
     @Override
     public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int 
subtaskId) {
-        splitAssigner.addSplits(subtaskId, backSplits);
+        splitAssigner.addSplitsBack(subtaskId, backSplits);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreemptiveSplitAssigner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
similarity index 79%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreemptiveSplitAssigner.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
index b39900ef3..87289b34d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreemptiveSplitAssigner.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
@@ -28,17 +28,17 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
-import java.util.Queue;
 
 /**
  * Splits are assigned preemptively in the order requested by the task. Only 
one split is assigned
- * to the task at a time.
+ * to the task at a time. This is unaware of subtask, it does not care about 
the relationship
+ * between splits and subtask, so only one queue is created and splits are 
fetched in order.
  */
-public class PreemptiveSplitAssigner implements SplitAssigner {
+public class FIFOSplitAssigner implements SplitAssigner {
 
     private final LinkedList<FileStoreSourceSplit> pendingSplitAssignment;
 
-    public PreemptiveSplitAssigner(Queue<FileStoreSourceSplit> splits) {
+    public FIFOSplitAssigner(Collection<FileStoreSourceSplit> splits) {
         this.pendingSplitAssignment = new LinkedList<>(splits);
     }
 
@@ -49,7 +49,12 @@ public class PreemptiveSplitAssigner implements 
SplitAssigner {
     }
 
     @Override
-    public void addSplits(int subtask, List<FileStoreSourceSplit> splits) {
+    public void addSplit(int subtask, FileStoreSourceSplit split) {
+        pendingSplitAssignment.add(split);
+    }
+
+    @Override
+    public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
         ListIterator<FileStoreSourceSplit> iterator = 
splits.listIterator(splits.size());
         while (iterator.hasPrevious()) {
             pendingSplitAssignment.addFirst(iterator.previous());
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FairSplitAssigner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
similarity index 84%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FairSplitAssigner.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
index c8ff92356..3f45133c3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FairSplitAssigner.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
@@ -38,19 +38,20 @@ import java.util.Queue;
  * Pre-calculate which splits each task should process according to the 
weight, and then distribute
  * the splits fairly.
  */
-public class FairSplitAssigner implements SplitAssigner {
+public class PreAssignSplitAssigner implements SplitAssigner {
 
     /** Default batch splits size to avoid exceed `akka.framesize`. */
     private final int splitBatchSize;
 
     private final Map<Integer, LinkedList<FileStoreSourceSplit>> 
pendingSplitAssignment;
 
-    public FairSplitAssigner(
+    public PreAssignSplitAssigner(
             int splitBatchSize,
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             Collection<FileStoreSourceSplit> splits) {
         this.splitBatchSize = splitBatchSize;
-        this.pendingSplitAssignment = createSplitAssignment(splits, 
context.currentParallelism());
+        this.pendingSplitAssignment =
+                createBatchFairSplitAssignment(splits, 
context.currentParallelism());
     }
 
     @Override
@@ -67,7 +68,12 @@ public class FairSplitAssigner implements SplitAssigner {
     }
 
     @Override
-    public void addSplits(int subtask, List<FileStoreSourceSplit> splits) {
+    public void addSplit(int subtask, FileStoreSourceSplit split) {
+        pendingSplitAssignment.computeIfAbsent(subtask, k -> new 
LinkedList<>()).add(split);
+    }
+
+    @Override
+    public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
         LinkedList<FileStoreSourceSplit> remainingSplits =
                 pendingSplitAssignment.computeIfAbsent(subtask, k -> new 
LinkedList<>());
         ListIterator<FileStoreSourceSplit> iterator = 
splits.listIterator(splits.size());
@@ -83,7 +89,11 @@ public class FairSplitAssigner implements SplitAssigner {
         return splits;
     }
 
-    private static Map<Integer, LinkedList<FileStoreSourceSplit>> 
createSplitAssignment(
+    /**
+     * this method only reload restore for batch execute, because in streaming 
mode, we need to
+     * assign certain bucket to certain task.
+     */
+    private static Map<Integer, LinkedList<FileStoreSourceSplit>> 
createBatchFairSplitAssignment(
             Collection<FileStoreSourceSplit> splits, int numReaders) {
         List<List<FileStoreSourceSplit>> assignmentList =
                 BinPacking.packForFixedBinNumber(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
index 1304d0938..ea0f99804 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
@@ -39,11 +39,14 @@ public interface SplitAssigner {
      */
     List<FileStoreSourceSplit> getNext(int subtask, @Nullable String hostname);
 
+    /** Add one split of a specified subtask to the assigner. */
+    void addSplit(int subtask, FileStoreSourceSplit splits);
+
     /**
      * Adds a set of splits to this assigner. This happens for example when 
some split processing
      * failed and the splits need to be re-added, or when new splits got 
discovered.
      */
-    void addSplits(int subtask, List<FileStoreSourceSplit> splits);
+    void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits);
 
     /** Gets the remaining splits that this assigner has pending. */
     Collection<FileStoreSourceSplit> remainingSplits();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index af7101161..a885e9596 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
@@ -116,7 +117,6 @@ public class ContinuousFileSplitEnumeratorTest {
                         .setSplitEnumeratorContext(context)
                         .setInitialSplits(initialSplits)
                         .setDiscoveryInterval(3)
-                        .setSplitBatchSize(10)
                         .build();
 
         // The first time split is allocated, split1 and split2 should be 
allocated
@@ -125,19 +125,19 @@ public class ContinuousFileSplitEnumeratorTest {
                 context.getSplitAssignments();
         // Only subtask-0 is allocated.
         assertThat(assignments).containsOnlyKeys(0);
-        assertThat(assignments.get(0).getAssignedSplits()).hasSize(10);
+        assertThat(assignments.get(0).getAssignedSplits()).hasSize(1);
 
         // test second batch assign
         enumerator.handleSplitRequest(0, "test-host");
 
         assertThat(assignments).containsOnlyKeys(0);
-        assertThat(assignments.get(0).getAssignedSplits()).hasSize(18);
+        assertThat(assignments.get(0).getAssignedSplits()).hasSize(2);
 
         // test third batch assign
         enumerator.handleSplitRequest(0, "test-host");
 
         assertThat(assignments).containsOnlyKeys(0);
-        assertThat(assignments.get(0).getAssignedSplits()).hasSize(18);
+        assertThat(assignments.get(0).getAssignedSplits()).hasSize(3);
     }
 
     @Test
@@ -163,6 +163,7 @@ public class ContinuousFileSplitEnumeratorTest {
 
         // each time a split is allocated from bucket-0 and bucket-1
         enumerator.handleSplitRequest(0, "test-host");
+        enumerator.handleSplitRequest(0, "test-host");
         Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
                 context.getSplitAssignments();
         // Only subtask-0 is allocated.
@@ -176,6 +177,7 @@ public class ContinuousFileSplitEnumeratorTest {
 
         // continuing to allocate the rest splits
         enumerator.handleSplitRequest(0, "test-host");
+        enumerator.handleSplitRequest(0, "test-host");
         assignments = context.getSplitAssignments();
         // Only subtask-0 is allocated.
         assertThat(assignments).containsOnlyKeys(0);
@@ -214,6 +216,13 @@ public class ContinuousFileSplitEnumeratorTest {
         Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
                 context.getSplitAssignments();
         assertThat(assignments).containsOnlyKeys(0);
+        assertThat(toDataSplits(assignments.get(0).getAssignedSplits()))
+                .containsExactly(splits.get(0));
+
+        // assign to task 0
+        enumerator.handleSplitRequest(0, "test-host");
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0);
         assertThat(toDataSplits(assignments.get(0).getAssignedSplits()))
                 .containsExactly(splits.get(0), splits.get(2));
 
@@ -225,6 +234,13 @@ public class ContinuousFileSplitEnumeratorTest {
         
assertThat(assignments.get(0).hasReceivedNoMoreSplitsSignal()).isTrue();
         assignments.clear();
 
+        // assign to task 1
+        enumerator.handleSplitRequest(1, "test-host");
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(1);
+        assertThat(toDataSplits(assignments.get(1).getAssignedSplits()))
+                .containsExactly(splits.get(1));
+
         // assign to task 1
         enumerator.handleSplitRequest(1, "test-host");
         assignments = context.getSplitAssignments();
@@ -239,6 +255,212 @@ public class ContinuousFileSplitEnumeratorTest {
         
assertThat(assignments.get(1).hasReceivedNoMoreSplitsSignal()).isTrue();
     }
 
+    @Test
+    public void testUnawareBucketEnumeratorWithBucket() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(3);
+        context.registerReader(0, "test-host");
+
+        Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+        StreamTableScan scan = new MockScan(results);
+        ContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setScan(scan)
+                        .withBucketMode(BucketMode.UNAWARE)
+                        .build();
+        enumerator.start();
+
+        long snapshot = 0;
+        List<DataSplit> splits = new ArrayList<>();
+        splits.add(createDataSplit(snapshot, 1, Collections.emptyList()));
+        results.add(new DataFilePlan(splits));
+        context.triggerAllActions();
+
+        // assign to task 0
+        enumerator.handleSplitRequest(0, "test-host");
+        Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+                context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0);
+        
assertThat(toDataSplits(assignments.get(0).getAssignedSplits()).size()).isEqualTo(1);
+
+        splits.clear();
+        splits.add(createDataSplit(snapshot, 2, Collections.emptyList()));
+        results.add(new DataFilePlan(splits));
+        context.triggerAllActions();
+
+        // assign to task 0
+        enumerator.handleSplitRequest(0, "test-host");
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0);
+        
assertThat(toDataSplits(assignments.get(0).getAssignedSplits()).size()).isEqualTo(2);
+    }
+
+    @Test
+    public void testUnawareBucketEnumeratorLot() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(4);
+        context.registerReader(0, "test-host");
+        context.registerReader(1, "test-host");
+        context.registerReader(2, "test-host");
+        context.registerReader(3, "test-host");
+
+        Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+        StreamTableScan scan = new MockScan(results);
+        ContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setScan(scan)
+                        .withBucketMode(BucketMode.UNAWARE)
+                        .build();
+        enumerator.start();
+
+        long snapshot = 0;
+        List<DataSplit> splits = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
+        }
+        results.add(new DataFilePlan(splits));
+        context.triggerAllActions();
+
+        // assign to task 0
+        enumerator.handleSplitRequest(0, "test-host");
+        Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+                context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0);
+        
assertThat(toDataSplits(assignments.get(0).getAssignedSplits()).size()).isEqualTo(1);
+
+        // assign to task 1
+        enumerator.handleSplitRequest(1, "test-host");
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1);
+        
assertThat(toDataSplits(assignments.get(1).getAssignedSplits()).size()).isEqualTo(1);
+
+        // assign to task 2
+        enumerator.handleSplitRequest(2, "test-host");
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1, 2);
+        
assertThat(toDataSplits(assignments.get(2).getAssignedSplits()).size()).isEqualTo(1);
+
+        for (int i = 0; i < 97; i++) {
+            enumerator.handleSplitRequest(3, "test-host");
+            assignments = context.getSplitAssignments();
+            assertThat(assignments).containsOnlyKeys(0, 1, 2, 3);
+            
assertThat(toDataSplits(assignments.get(3).getAssignedSplits()).size())
+                    .isEqualTo(i + 1);
+        }
+
+        enumerator.handleSplitRequest(3, "test-host");
+        context.triggerAllActions();
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1, 2, 3);
+        
assertThat(assignments.get(3).hasReceivedNoMoreSplitsSignal()).isTrue();
+    }
+
+    @Test
+    public void testUnawareBucketEnumeratorAssignLater() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(4);
+        context.registerReader(0, "test-host");
+        context.registerReader(1, "test-host");
+        context.registerReader(2, "test-host");
+        context.registerReader(3, "test-host");
+
+        Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+        StreamTableScan scan = new MockScan(results);
+        ContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setScan(scan)
+                        .withBucketMode(BucketMode.UNAWARE)
+                        .build();
+        enumerator.start();
+
+        // assign to task 0, but no assigned. add to wait list
+        enumerator.handleSplitRequest(0, "test-host");
+        Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+                context.getSplitAssignments();
+        assertThat(assignments.size()).isEqualTo(0);
+
+        // assign to task 1, but no assigned. add to wait list
+        enumerator.handleSplitRequest(1, "test-host");
+        assignments = context.getSplitAssignments();
+        assertThat(assignments.size()).isEqualTo(0);
+
+        long snapshot = 0;
+        List<DataSplit> splits = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
+        }
+        results.add(new DataFilePlan(splits));
+        // trigger assign task 0 and task 1 will get their assignment
+        context.triggerAllActions();
+
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1);
+        assertThat(assignments.get(0).getAssignedSplits().size()).isEqualTo(1);
+        assertThat(assignments.get(1).getAssignedSplits().size()).isEqualTo(1);
+
+        // assign to task 2
+        enumerator.handleSplitRequest(2, "test-host");
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1, 2);
+        
assertThat(toDataSplits(assignments.get(2).getAssignedSplits()).size()).isEqualTo(1);
+
+        // assign to task 3
+        enumerator.handleSplitRequest(3, "test-host");
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1, 2, 3);
+        
assertThat(toDataSplits(assignments.get(3).getAssignedSplits()).size()).isEqualTo(1);
+    }
+
+    @Test
+    public void testEnumeratorDeregisteredByContext() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(2);
+        context.registerReader(0, "test-host");
+        context.registerReader(1, "test-host");
+
+        Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+        StreamTableScan scan = new MockScan(results);
+        ContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setScan(scan)
+                        .withBucketMode(BucketMode.UNAWARE)
+                        .build();
+        enumerator.start();
+
+        long snapshot = 0;
+        List<DataSplit> splits = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
+        }
+        results.add(new DataFilePlan(splits));
+        context.triggerAllActions();
+
+        // assign to task 0
+        context.registeredReaders().remove(0);
+        enumerator.handleSplitRequest(0, "test-host");
+        Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+                context.getSplitAssignments();
+        assertThat(assignments.size()).isEqualTo(0);
+
+        // assign to task 1
+        enumerator.handleSplitRequest(1, "test-host");
+        assignments = context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(1);
+        
assertThat(toDataSplits(assignments.get(1).getAssignedSplits()).size()).isEqualTo(1);
+    }
+
     private static List<DataSplit> toDataSplits(List<FileStoreSourceSplit> 
splits) {
         return splits.stream()
                 .map(FileStoreSourceSplit::split)
@@ -264,8 +486,8 @@ public class ContinuousFileSplitEnumeratorTest {
         private Collection<FileStoreSourceSplit> initialSplits = 
Collections.emptyList();
         private long discoveryInterval = Long.MAX_VALUE;
 
-        private int splitBatchSize = 10;
         private StreamTableScan scan;
+        private BucketMode bucketMode = BucketMode.FIXED;
 
         public Builder setSplitEnumeratorContext(
                 SplitEnumeratorContext<FileStoreSourceSplit> context) {
@@ -283,19 +505,19 @@ public class ContinuousFileSplitEnumeratorTest {
             return this;
         }
 
-        public Builder setSplitBatchSize(int splitBatchSize) {
-            this.splitBatchSize = splitBatchSize;
+        public Builder setScan(StreamTableScan scan) {
+            this.scan = scan;
             return this;
         }
 
-        public Builder setScan(StreamTableScan scan) {
-            this.scan = scan;
+        public Builder withBucketMode(BucketMode bucketMode) {
+            this.bucketMode = bucketMode;
             return this;
         }
 
         public ContinuousFileSplitEnumerator build() {
             return new ContinuousFileSplitEnumerator(
-                    context, initialSplits, null, discoveryInterval, 
splitBatchSize, scan);
+                    context, initialSplits, null, discoveryInterval, scan, 
bucketMode);
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index ee7ff89af..87b082011 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.stats.StatsTestUtils;
 import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 
 import org.junit.jupiter.api.Test;
@@ -83,7 +84,19 @@ public class FileStoreSourceSplitGeneratorTest {
                         1L,
                         false,
                         false,
-                        Collections::singletonList,
+                        new SplitGenerator() {
+                            @Override
+                            public List<List<DataFileMeta>> splitForBatch(
+                                    List<DataFileMeta> files) {
+                                return Collections.singletonList(files);
+                            }
+
+                            @Override
+                            public List<List<DataFileMeta>> splitForStreaming(
+                                    List<DataFileMeta> files) {
+                                return null;
+                            }
+                        },
                         
FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD)));
         DataFilePlan tableScanPlan = new DataFilePlan(scanSplits);
 

Reply via email to