This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f122bf26a [flink] unaware-bucket mode parallelism read (streaming and
batch) (#1372)
f122bf26a is described below
commit f122bf26a4acdcc52f9ba04a5a01c350d3ca1157
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 27 13:53:19 2023 +0800
[flink] unaware-bucket mode parallelism read (streaming and batch) (#1372)
---
.../paimon/table/AppendOnlyFileStoreTable.java | 4 +-
.../table/source/AppendOnlySplitGenerator.java | 20 +-
.../table/source/MergeTreeSplitGenerator.java | 9 +-
.../apache/paimon/table/source/SplitGenerator.java | 4 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 23 +-
.../paimon/table/AppendOnlyFileStoreTableTest.java | 56 +++++
.../paimon/table/source/SplitGeneratorTest.java | 20 +-
.../source/ContinuousFileSplitEnumerator.java | 88 ++++----
.../flink/source/ContinuousFileStoreSource.java | 18 +-
.../paimon/flink/source/FlinkSourceBuilder.java | 10 +-
.../paimon/flink/source/StaticFileStoreSource.java | 11 +-
.../source/StaticFileStoreSplitEnumerator.java | 2 +-
...veSplitAssigner.java => FIFOSplitAssigner.java} | 15 +-
...itAssigner.java => PreAssignSplitAssigner.java} | 20 +-
.../flink/source/assigners/SplitAssigner.java | 5 +-
.../source/ContinuousFileSplitEnumeratorTest.java | 242 ++++++++++++++++++++-
.../source/FileStoreSourceSplitGeneratorTest.java | 15 +-
17 files changed, 461 insertions(+), 101 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index e5f65eace..68f274465 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -86,7 +86,9 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
@Override
public SplitGenerator splitGenerator() {
return new AppendOnlySplitGenerator(
- store().options().splitTargetSize(),
store().options().splitOpenFileCost());
+ store().options().splitTargetSize(),
+ store().options().splitOpenFileCost(),
+ bucketMode());
}
/**
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
index b00139ab9..e9c1b4b0f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
@@ -19,9 +19,11 @@
package org.apache.paimon.table.source;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.BinPacking;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.function.Function;
@@ -32,17 +34,31 @@ public class AppendOnlySplitGenerator implements
SplitGenerator {
private final long targetSplitSize;
private final long openFileCost;
+ private final BucketMode bucketMode;
- public AppendOnlySplitGenerator(long targetSplitSize, long openFileCost) {
+ public AppendOnlySplitGenerator(
+ long targetSplitSize, long openFileCost, BucketMode bucketMode) {
this.targetSplitSize = targetSplitSize;
this.openFileCost = openFileCost;
+ this.bucketMode = bucketMode;
}
@Override
- public List<List<DataFileMeta>> split(List<DataFileMeta> input) {
+ public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> input) {
List<DataFileMeta> files = new ArrayList<>(input);
files.sort(fileComparator(false));
Function<DataFileMeta, Long> weightFunc = file ->
Math.max(file.fileSize(), openFileCost);
return BinPacking.packForOrdered(files, weightFunc, targetSplitSize);
}
+
+ @Override
+ public List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta>
files) {
+ // Unaware-bucket tables only contain one bucket (bucket 0), so their incremental files
+ // are split exactly like batch files to allow parallel reads. For any other bucket mode
+ // all files of the bucket are kept together in a single split.
+ if (bucketMode == BucketMode.UNAWARE) {
+ return splitForBatch(files);
+ } else {
+ return Collections.singletonList(files);
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
index af9484804..cd024e1e7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
@@ -25,6 +25,7 @@ import org.apache.paimon.mergetree.compact.IntervalPartition;
import org.apache.paimon.utils.BinPacking;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
@@ -47,7 +48,7 @@ public class MergeTreeSplitGenerator implements
SplitGenerator {
}
@Override
- public List<List<DataFileMeta>> split(List<DataFileMeta> files) {
+ public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files) {
/*
* The generator aims to parallel the scan execution by slicing the
files of each bucket
* into multiple splits. The generation has one constraint: files with
intersected key
@@ -76,6 +77,12 @@ public class MergeTreeSplitGenerator implements
SplitGenerator {
return packSplits(sections);
}
+ @Override
+ public List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta>
files) {
+ // We don't split streaming scan files
+ return Collections.singletonList(files);
+ }
+
private List<List<DataFileMeta>> packSplits(List<List<DataFileMeta>>
sections) {
Function<List<DataFileMeta>, Long> weightFunc =
file -> Math.max(totalSize(file), openFileCost);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
index 7677875d0..f4aa7c7e2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java
@@ -25,5 +25,7 @@ import java.util.List;
/** Generate splits from {@link DataFileMeta}s. */
public interface SplitGenerator {
- List<List<DataFileMeta>> split(List<DataFileMeta> files);
+ /** Groups the data files of one bucket into splits for a batch (non-incremental) scan. */
+ List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files);
+
+ /**
+ * Groups the data files of one bucket into splits for an incremental (streaming) scan.
+ * Implementations may return all files as a single split to keep a bucket's data together.
+ */
+ List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> files);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index c82936434..15f4ab72c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -285,17 +285,20 @@ public class SnapshotReaderImpl implements SnapshotReader
{
for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
buckets.entrySet()) {
int bucket = bucketEntry.getKey();
if (isIncremental) {
- // Don't split when incremental
- splits.add(
- new DataSplit(
- snapshotId,
- partition,
- bucket,
- bucketEntry.getValue(),
- true,
- reverseRowKind));
+ // streaming splits incremental data files
+
splitGenerator.splitForStreaming(bucketEntry.getValue()).stream()
+ .map(
+ files ->
+ new DataSplit(
+ snapshotId,
+ partition,
+ bucket,
+ files,
+ true,
+ reverseRowKind))
+ .forEach(splits::add);
} else {
- splitGenerator.split(bucketEntry.getValue()).stream()
+
splitGenerator.splitForBatch(bucketEntry.getValue()).stream()
.map(
files ->
new DataSplit(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 5599eec76..6c8546866 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -33,13 +33,16 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -171,6 +174,40 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
assertThat(partitions).containsExactly(1, 2, 3);
}
+ @Test
+ public void testStreamingSplitInUnawareBucketMode() throws Exception {
+ // In unaware-bucket mode, incremental (streaming) files are split like batch files.
+ FileStoreTable table =
+ createUnawareBucketFileStoreTable(
+ options -> options.set(CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key(), "1 M"));
+
+ StreamTableScan scan = table.newStreamScan();
+
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ List<CommitMessage> result = new ArrayList<>();
+ // Each prepareCommit flushes one small file. With a 1 M target split size (and an
+ // open-file cost presumably at least that large) every file becomes its own split.
+ write.write(rowData(3, 33, 303L));
+ result.addAll(write.prepareCommit(true, 0));
+ write.write(rowData(1, 10, 100L));
+ result.addAll(write.prepareCommit(true, 0));
+ write.write(rowData(2, 22, 202L));
+ result.addAll(write.prepareCommit(true, 0));
+ commit.commit(0, result);
+ result.clear();
+ // JUnit convention: expected value first, actual value second.
+ Assertions.assertEquals(3, scan.plan().splits().size());
+
+ write.write(rowData(3, 33, 303L));
+ result.addAll(write.prepareCommit(true, 1));
+ write.write(rowData(1, 10, 100L));
+ result.addAll(write.prepareCommit(true, 1));
+ write.write(rowData(2, 22, 202L));
+ result.addAll(write.prepareCommit(true, 1));
+ commit.commit(1, result);
+ Assertions.assertEquals(3, scan.plan().splits().size());
+
+ write.close();
+ commit.close();
+ }
+
@Test
public void testStreamingProjection() throws Exception {
writeData();
@@ -375,4 +412,23 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
""));
return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
}
+
+ protected FileStoreTable
createUnawareBucketFileStoreTable(Consumer<Options> configure)
+ throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+ conf.set(CoreOptions.BUCKET, -1);
+ configure.accept(conf);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ ROW_TYPE.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ conf.toMap(),
+ ""));
+ return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index cc482e6ff..1d0bb649f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.BucketMode;
import org.junit.jupiter.api.Test;
@@ -62,20 +63,29 @@ public class SplitGeneratorTest {
newFileFromSequence("4", 23, 41, 50),
newFileFromSequence("5", 4, 51, 60),
newFileFromSequence("6", 101, 61, 100));
- assertThat(toNames(new AppendOnlySplitGenerator(40, 2).split(files)))
+ assertThat(
+ toNames(
+ new AppendOnlySplitGenerator(40, 2,
BucketMode.FIXED)
+ .splitForBatch(files)))
.containsExactlyInAnyOrder(
Arrays.asList("1", "2"),
Collections.singletonList("3"),
Arrays.asList("4", "5"),
Collections.singletonList("6"));
- assertThat(toNames(new AppendOnlySplitGenerator(70, 2).split(files)))
+ assertThat(
+ toNames(
+ new AppendOnlySplitGenerator(70, 2,
BucketMode.FIXED)
+ .splitForBatch(files)))
.containsExactlyInAnyOrder(
Arrays.asList("1", "2", "3"),
Arrays.asList("4", "5"),
Collections.singletonList("6"));
- assertThat(toNames(new AppendOnlySplitGenerator(40, 20).split(files)))
+ assertThat(
+ toNames(
+ new AppendOnlySplitGenerator(40, 20,
BucketMode.FIXED)
+ .splitForBatch(files)))
.containsExactlyInAnyOrder(
Arrays.asList("1", "2"),
Collections.singletonList("3"),
@@ -95,11 +105,11 @@ public class SplitGeneratorTest {
fromMinMax("5", 82, 85),
fromMinMax("6", 100, 200));
Comparator<InternalRow> comparator = Comparator.comparingInt(o ->
o.getInt(0));
- assertThat(toNames(new MergeTreeSplitGenerator(comparator, 100,
2).split(files)))
+ assertThat(toNames(new MergeTreeSplitGenerator(comparator, 100,
2).splitForBatch(files)))
.containsExactlyInAnyOrder(
Arrays.asList("1", "2", "4", "3", "5"),
Collections.singletonList("6"));
- assertThat(toNames(new MergeTreeSplitGenerator(comparator, 100,
30).split(files)))
+ assertThat(toNames(new MergeTreeSplitGenerator(comparator, 100,
30).splitForBatch(files)))
.containsExactlyInAnyOrder(
Arrays.asList("1", "2", "4", "3"),
Collections.singletonList("5"),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 166c2acca..c7772d5fb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -18,6 +18,10 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
+import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
+import org.apache.paimon.flink.source.assigners.SplitAssigner;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.StreamTableScan;
@@ -35,10 +39,10 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -54,8 +58,6 @@ public class ContinuousFileSplitEnumerator
private final SplitEnumeratorContext<FileStoreSourceSplit> context;
- private final Map<Integer, LinkedList<FileStoreSourceSplit>> bucketSplits;
-
private final long discoveryInterval;
private final Set<Integer> readersAwaitingSplit;
@@ -64,8 +66,9 @@ public class ContinuousFileSplitEnumerator
private final StreamTableScan scan;
- /** Default batch splits size to avoid exceed `akka.framesize`. */
- private final int splitBatchSize;
+ private final SplitAssigner splitAssigner;
+
+ private final BucketMode bucketMode;
@Nullable private Long nextSnapshotId;
@@ -76,18 +79,21 @@ public class ContinuousFileSplitEnumerator
Collection<FileStoreSourceSplit> remainSplits,
@Nullable Long nextSnapshotId,
long discoveryInterval,
- int splitBatchSize,
- StreamTableScan scan) {
+ StreamTableScan scan,
+ BucketMode bucketMode) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
- this.bucketSplits = new HashMap<>();
- addSplits(remainSplits);
this.nextSnapshotId = nextSnapshotId;
this.discoveryInterval = discoveryInterval;
- this.splitBatchSize = splitBatchSize;
- this.readersAwaitingSplit = new HashSet<>();
+ this.readersAwaitingSplit = new LinkedHashSet<>();
this.splitGenerator = new FileStoreSourceSplitGenerator();
this.scan = scan;
+ this.bucketMode = bucketMode;
+ this.splitAssigner =
+ bucketMode == BucketMode.UNAWARE
+ ? new FIFOSplitAssigner(Collections.emptyList())
+ : new PreAssignSplitAssigner(1, context,
Collections.emptyList());
+ addSplits(remainSplits);
}
private void addSplits(Collection<FileStoreSourceSplit> splits) {
@@ -95,19 +101,7 @@ public class ContinuousFileSplitEnumerator
}
private void addSplit(FileStoreSourceSplit split) {
- bucketSplits
- .computeIfAbsent(((DataSplit) split.split()).bucket(), i ->
new LinkedList<>())
- .add(split);
- }
-
- private void addSplitsBack(Collection<FileStoreSourceSplit> splits) {
- new
LinkedList<>(splits).descendingIterator().forEachRemaining(this::addSplitToHead);
- }
-
- private void addSplitToHead(FileStoreSourceSplit split) {
- bucketSplits
- .computeIfAbsent(((DataSplit) split.split()).bucket(), i ->
new LinkedList<>())
- .addFirst(split);
+ splitAssigner.addSplit(assignTask(((DataSplit)
split.split()).bucket()), split);
}
@Override
@@ -139,13 +133,13 @@ public class ContinuousFileSplitEnumerator
@Override
public void addSplitsBack(List<FileStoreSourceSplit> splits, int
subtaskId) {
LOG.debug("File Source Enumerator adds splits back: {}", splits);
- addSplitsBack(splits);
+ splitAssigner.addSplitsBack(subtaskId, splits);
}
@Override
public PendingSplitsCheckpoint snapshotState(long checkpointId) {
List<FileStoreSourceSplit> splits = new ArrayList<>();
- bucketSplits.values().forEach(splits::addAll);
+ splits.addAll(splitAssigner.remainingSplits());
final PendingSplitsCheckpoint checkpoint =
new PendingSplitsCheckpoint(splits, nextSnapshotId);
@@ -196,28 +190,30 @@ public class ContinuousFileSplitEnumerator
+ /**
+ * Collects the next splits from the split assigner for every reader that is waiting for
+ * splits. Waiting readers that are no longer registered are dropped from the waiting set.
+ */
private Map<Integer, List<FileStoreSourceSplit>> createAssignment() {
Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
- bucketSplits.forEach(
- (bucket, splits) -> {
+ // Iterate explicitly so stale readers can be removed via Iterator#remove: calling
+ // readersAwaitingSplit.remove(...) inside readersAwaitingSplit.forEach(...) makes the
+ // fail-fast iteration throw ConcurrentModificationException.
+ Iterator<Integer> readersAwait = readersAwaitingSplit.iterator();
+ while (readersAwait.hasNext()) {
+ int task = readersAwait.next();
+ // if the reader that requested another split has failed in the meantime, remove
+ // it from the list of waiting readers
+ if (!context.registeredReaders().containsKey(task)) {
+ readersAwait.remove();
+ continue;
+ }
+ List<FileStoreSourceSplit> splits = splitAssigner.getNext(task, null);
if (splits.size() > 0) {
- // To ensure the order of consumption, the data of the
same bucket is given
- // to a task to be consumed.
- int task = bucket % context.currentParallelism();
- if (readersAwaitingSplit.contains(task)) {
- // if the reader that requested another split has
failed in the
- // meantime, remove
- // it from the list of waiting readers
- if
(!context.registeredReaders().containsKey(task)) {
- readersAwaitingSplit.remove(task);
- return;
- }
- List<FileStoreSourceSplit> taskAssignment =
- assignment.computeIfAbsent(task, i -> new
ArrayList<>());
- if (taskAssignment.size() < splitBatchSize) {
- taskAssignment.add(splits.poll());
- }
- }
+ assignment.put(task, splits);
}
- });
+ }
return assignment;
}
+
+ /**
+ * Chooses the subtask whose queue receives splits of the given bucket. In unaware-bucket mode
+ * the enumerator uses a FIFO assigner, which ignores the subtask, so 0 is returned. Otherwise
+ * buckets are mapped by {@code bucket % parallelism} so one bucket is always read by one task.
+ */
+ private int assignTask(int bucket) {
+ if (bucketMode == BucketMode.UNAWARE) {
+ // we just assign task 0 when bucket unaware
+ return 0;
+ } else {
+ // if not bucket unaware, we assign the bucket % parallelism, the
same bucket data go
+ // into the same task
+ return bucket % context.currentParallelism();
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index b4f571069..4a2be4a58 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;
@@ -39,11 +39,21 @@ public class ContinuousFileStoreSource extends FlinkSource {
private static final long serialVersionUID = 4L;
private final Map<String, String> options;
+ private final BucketMode bucketMode;
public ContinuousFileStoreSource(
ReadBuilder readBuilder, Map<String, String> options, @Nullable
Long limit) {
+ this(readBuilder, options, limit, BucketMode.FIXED);
+ }
+
+ public ContinuousFileStoreSource(
+ ReadBuilder readBuilder,
+ Map<String, String> options,
+ @Nullable Long limit,
+ BucketMode bucketMode) {
super(readBuilder, limit);
this.options = options;
+ this.bucketMode = bucketMode;
}
@Override
@@ -70,9 +80,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
splits,
nextSnapshotId,
coreOptions.continuousDiscoveryInterval().toMillis(),
- coreOptions
- .toConfiguration()
-
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
- scan);
+ scan,
+ bucketMode);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 2015a7c03..bab49ca4c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -28,6 +28,8 @@ import
org.apache.paimon.flink.source.operator.MonitorFunction;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
@@ -134,7 +136,13 @@ public class FlinkSourceBuilder {
private DataStream<RowData> buildContinuousFileSource() {
return toDataStream(
- new ContinuousFileStoreSource(createReadBuilder(),
table.options(), limit));
+ new ContinuousFileStoreSource(
+ createReadBuilder(),
+ table.options(),
+ limit,
+ table instanceof FileStoreTable
+ ? ((FileStoreTable) table).bucketMode()
+ : BucketMode.FIXED));
}
private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index 1e8e92089..d18a49253 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -18,8 +18,8 @@
package org.apache.paimon.flink.source;
-import org.apache.paimon.flink.source.assigners.FairSplitAssigner;
-import org.apache.paimon.flink.source.assigners.PreemptiveSplitAssigner;
+import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
+import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.table.source.ReadBuilder;
@@ -30,7 +30,6 @@ import
org.apache.flink.api.connector.source.SplitEnumeratorContext;
import javax.annotation.Nullable;
import java.util.Collection;
-import java.util.LinkedList;
import java.util.List;
import static org.apache.paimon.flink.FlinkConnectorOptions.SplitAssignMode;
@@ -82,12 +81,12 @@ public class StaticFileStoreSource extends FlinkSource {
Collection<FileStoreSourceSplit> splits) {
switch (splitAssignMode) {
case FAIR:
- return new FairSplitAssigner(splitBatchSize, context, splits);
+ return new PreAssignSplitAssigner(splitBatchSize, context,
splits);
case PREEMPTIVE:
- return new PreemptiveSplitAssigner(new LinkedList<>(splits));
+ return new FIFOSplitAssigner(splits);
default:
throw new UnsupportedOperationException(
- "Unsupported assign mode " +
splitAssignMode.toString());
+ "Unsupported assign mode " + splitAssignMode);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
index 7415acf23..4834aecc2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
@@ -72,7 +72,7 @@ public class StaticFileStoreSplitEnumerator
@Override
public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int
subtaskId) {
- splitAssigner.addSplits(subtaskId, backSplits);
+ splitAssigner.addSplitsBack(subtaskId, backSplits);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreemptiveSplitAssigner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
similarity index 79%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreemptiveSplitAssigner.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
index b39900ef3..87289b34d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreemptiveSplitAssigner.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
@@ -28,17 +28,17 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
-import java.util.Queue;
/**
* Splits are assigned preemptively in the order requested by the task. Only
one split is assigned
- * to the task at a time.
+ * to the task at a time. This is unaware of subtask, it does not care about
the relationship
+ * between splits and subtask, so only one queue is created and splits are
fetched in order.
*/
-public class PreemptiveSplitAssigner implements SplitAssigner {
+public class FIFOSplitAssigner implements SplitAssigner {
private final LinkedList<FileStoreSourceSplit> pendingSplitAssignment;
- public PreemptiveSplitAssigner(Queue<FileStoreSourceSplit> splits) {
+ public FIFOSplitAssigner(Collection<FileStoreSourceSplit> splits) {
this.pendingSplitAssignment = new LinkedList<>(splits);
}
@@ -49,7 +49,12 @@ public class PreemptiveSplitAssigner implements
SplitAssigner {
}
@Override
- public void addSplits(int subtask, List<FileStoreSourceSplit> splits) {
+ public void addSplit(int subtask, FileStoreSourceSplit split) {
+ pendingSplitAssignment.add(split);
+ }
+
+ @Override
+ public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
ListIterator<FileStoreSourceSplit> iterator =
splits.listIterator(splits.size());
while (iterator.hasPrevious()) {
pendingSplitAssignment.addFirst(iterator.previous());
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FairSplitAssigner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
similarity index 84%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FairSplitAssigner.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
index c8ff92356..3f45133c3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FairSplitAssigner.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
@@ -38,19 +38,20 @@ import java.util.Queue;
* Pre-calculate which splits each task should process according to the
weight, and then distribute
* the splits fairly.
*/
-public class FairSplitAssigner implements SplitAssigner {
+public class PreAssignSplitAssigner implements SplitAssigner {
/** Default batch splits size to avoid exceed `akka.framesize`. */
private final int splitBatchSize;
private final Map<Integer, LinkedList<FileStoreSourceSplit>>
pendingSplitAssignment;
- public FairSplitAssigner(
+ public PreAssignSplitAssigner(
int splitBatchSize,
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> splits) {
this.splitBatchSize = splitBatchSize;
- this.pendingSplitAssignment = createSplitAssignment(splits,
context.currentParallelism());
+ this.pendingSplitAssignment =
+ createBatchFairSplitAssignment(splits,
context.currentParallelism());
}
@Override
@@ -67,7 +68,12 @@ public class FairSplitAssigner implements SplitAssigner {
}
@Override
- public void addSplits(int subtask, List<FileStoreSourceSplit> splits) {
+ public void addSplit(int subtask, FileStoreSourceSplit split) {
+ pendingSplitAssignment.computeIfAbsent(subtask, k -> new
LinkedList<>()).add(split);
+ }
+
+ @Override
+ public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
LinkedList<FileStoreSourceSplit> remainingSplits =
pendingSplitAssignment.computeIfAbsent(subtask, k -> new
LinkedList<>());
ListIterator<FileStoreSourceSplit> iterator =
splits.listIterator(splits.size());
@@ -83,7 +89,11 @@ public class FairSplitAssigner implements SplitAssigner {
return splits;
}
- private static Map<Integer, LinkedList<FileStoreSourceSplit>>
createSplitAssignment(
+ /**
+ * Distributes the given splits across readers by weight. This is only suitable for batch
+ * execution: in streaming mode, the splits of a given bucket must always go to the same task.
+ */
+ private static Map<Integer, LinkedList<FileStoreSourceSplit>>
createBatchFairSplitAssignment(
Collection<FileStoreSourceSplit> splits, int numReaders) {
List<List<FileStoreSourceSplit>> assignmentList =
BinPacking.packForFixedBinNumber(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
index 1304d0938..ea0f99804 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java
@@ -39,11 +39,14 @@ public interface SplitAssigner {
*/
List<FileStoreSourceSplit> getNext(int subtask, @Nullable String hostname);
+ /**
+ * Adds one split to the assigner for the given subtask. Assigners that do not bind splits to
+ * subtasks may ignore the subtask.
+ */
+ void addSplit(int subtask, FileStoreSourceSplit split);
+
/**
* Adds a set of splits to this assigner. This happens for example when
some split processing
* failed and the splits need to be re-added, or when new splits got
discovered.
*/
- void addSplits(int subtask, List<FileStoreSourceSplit> splits);
+ void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits);
/** Gets the remaining splits that this assigner has pending. */
Collection<FileStoreSourceSplit> remainingSplits();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index af7101161..a885e9596 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
@@ -116,7 +117,6 @@ public class ContinuousFileSplitEnumeratorTest {
.setSplitEnumeratorContext(context)
.setInitialSplits(initialSplits)
.setDiscoveryInterval(3)
- .setSplitBatchSize(10)
.build();
// The first time split is allocated, split1 and split2 should be allocated
@@ -125,19 +125,19 @@ public class ContinuousFileSplitEnumeratorTest {
context.getSplitAssignments();
// Only subtask-0 is allocated.
assertThat(assignments).containsOnlyKeys(0);
- assertThat(assignments.get(0).getAssignedSplits()).hasSize(10);
+ assertThat(assignments.get(0).getAssignedSplits()).hasSize(1);
// test second batch assign
enumerator.handleSplitRequest(0, "test-host");
assertThat(assignments).containsOnlyKeys(0);
- assertThat(assignments.get(0).getAssignedSplits()).hasSize(18);
+ assertThat(assignments.get(0).getAssignedSplits()).hasSize(2);
// test third batch assign
enumerator.handleSplitRequest(0, "test-host");
assertThat(assignments).containsOnlyKeys(0);
- assertThat(assignments.get(0).getAssignedSplits()).hasSize(18);
+ assertThat(assignments.get(0).getAssignedSplits()).hasSize(3);
}
@Test
@@ -163,6 +163,7 @@ public class ContinuousFileSplitEnumeratorTest {
// each time a split is allocated from bucket-0 and bucket-1
enumerator.handleSplitRequest(0, "test-host");
+ enumerator.handleSplitRequest(0, "test-host");
Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
context.getSplitAssignments();
// Only subtask-0 is allocated.
@@ -176,6 +177,7 @@ public class ContinuousFileSplitEnumeratorTest {
// continuing to allocate the rest splits
enumerator.handleSplitRequest(0, "test-host");
+ enumerator.handleSplitRequest(0, "test-host");
assignments = context.getSplitAssignments();
// Only subtask-0 is allocated.
assertThat(assignments).containsOnlyKeys(0);
@@ -214,6 +216,13 @@ public class ContinuousFileSplitEnumeratorTest {
Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
context.getSplitAssignments();
assertThat(assignments).containsOnlyKeys(0);
+ assertThat(toDataSplits(assignments.get(0).getAssignedSplits()))
+ .containsExactly(splits.get(0));
+
+ // assign to task 0
+ enumerator.handleSplitRequest(0, "test-host");
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0);
assertThat(toDataSplits(assignments.get(0).getAssignedSplits()))
.containsExactly(splits.get(0), splits.get(2));
@@ -225,6 +234,13 @@ public class ContinuousFileSplitEnumeratorTest {
assertThat(assignments.get(0).hasReceivedNoMoreSplitsSignal()).isTrue();
assignments.clear();
+ // assign to task 1
+ enumerator.handleSplitRequest(1, "test-host");
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(1);
+ assertThat(toDataSplits(assignments.get(1).getAssignedSplits()))
+ .containsExactly(splits.get(1));
+
// assign to task 1
enumerator.handleSplitRequest(1, "test-host");
assignments = context.getSplitAssignments();
@@ -239,6 +255,212 @@ public class ContinuousFileSplitEnumeratorTest {
assertThat(assignments.get(1).hasReceivedNoMoreSplitsSignal()).isTrue();
}
+ @Test
+ public void testUnawareBucketEnumeratorWithBucket() {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(3);
+ context.registerReader(0, "test-host");
+
+ Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+ StreamTableScan scan = new MockScan(results);
+ ContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setScan(scan)
+ .withBucketMode(BucketMode.UNAWARE)
+ .build();
+ enumerator.start();
+
+ long snapshot = 0;
+ List<DataSplit> splits = new ArrayList<>();
+ splits.add(createDataSplit(snapshot, 1, Collections.emptyList()));
+ results.add(new DataFilePlan(splits));
+ context.triggerAllActions();
+
+ // assign to task 0
+ enumerator.handleSplitRequest(0, "test-host");
+ Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+ context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0);
+
assertThat(toDataSplits(assignments.get(0).getAssignedSplits()).size()).isEqualTo(1);
+
+ splits.clear();
+ splits.add(createDataSplit(snapshot, 2, Collections.emptyList()));
+ results.add(new DataFilePlan(splits));
+ context.triggerAllActions();
+
+ // assign to task 0
+ enumerator.handleSplitRequest(0, "test-host");
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0);
+
assertThat(toDataSplits(assignments.get(0).getAssignedSplits()).size()).isEqualTo(2);
+ }
+
+ @Test
+ public void testUnawareBucketEnumeratorLot() {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(4);
+ context.registerReader(0, "test-host");
+ context.registerReader(1, "test-host");
+ context.registerReader(2, "test-host");
+ context.registerReader(3, "test-host");
+
+ Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+ StreamTableScan scan = new MockScan(results);
+ ContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setScan(scan)
+ .withBucketMode(BucketMode.UNAWARE)
+ .build();
+ enumerator.start();
+
+ long snapshot = 0;
+ List<DataSplit> splits = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
+ }
+ results.add(new DataFilePlan(splits));
+ context.triggerAllActions();
+
+ // assign to task 0
+ enumerator.handleSplitRequest(0, "test-host");
+ Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+ context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0);
+
assertThat(toDataSplits(assignments.get(0).getAssignedSplits()).size()).isEqualTo(1);
+
+ // assign to task 1
+ enumerator.handleSplitRequest(1, "test-host");
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1);
+
assertThat(toDataSplits(assignments.get(1).getAssignedSplits()).size()).isEqualTo(1);
+
+ // assign to task 2
+ enumerator.handleSplitRequest(2, "test-host");
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1, 2);
+
assertThat(toDataSplits(assignments.get(2).getAssignedSplits()).size()).isEqualTo(1);
+
+ for (int i = 0; i < 97; i++) {
+ enumerator.handleSplitRequest(3, "test-host");
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1, 2, 3);
+
assertThat(toDataSplits(assignments.get(3).getAssignedSplits()).size())
+ .isEqualTo(i + 1);
+ }
+
+ enumerator.handleSplitRequest(3, "test-host");
+ context.triggerAllActions();
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1, 2, 3);
+
assertThat(assignments.get(3).hasReceivedNoMoreSplitsSignal()).isTrue();
+ }
+
+ @Test
+ public void testUnawareBucketEnumeratorAssignLater() {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(4);
+ context.registerReader(0, "test-host");
+ context.registerReader(1, "test-host");
+ context.registerReader(2, "test-host");
+ context.registerReader(3, "test-host");
+
+ Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+ StreamTableScan scan = new MockScan(results);
+ ContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setScan(scan)
+ .withBucketMode(BucketMode.UNAWARE)
+ .build();
+ enumerator.start();
+
+ // assign to task 0, but no assigned. add to wait list
+ enumerator.handleSplitRequest(0, "test-host");
+ Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+ context.getSplitAssignments();
+ assertThat(assignments.size()).isEqualTo(0);
+
+ // assign to task 1, but no assigned. add to wait list
+ enumerator.handleSplitRequest(1, "test-host");
+ assignments = context.getSplitAssignments();
+ assertThat(assignments.size()).isEqualTo(0);
+
+ long snapshot = 0;
+ List<DataSplit> splits = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
+ }
+ results.add(new DataFilePlan(splits));
+ // trigger assign task 0 and task 1 will get their assignment
+ context.triggerAllActions();
+
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1);
+ assertThat(assignments.get(0).getAssignedSplits().size()).isEqualTo(1);
+ assertThat(assignments.get(1).getAssignedSplits().size()).isEqualTo(1);
+
+ // assign to task 2
+ enumerator.handleSplitRequest(2, "test-host");
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1, 2);
+
assertThat(toDataSplits(assignments.get(2).getAssignedSplits()).size()).isEqualTo(1);
+
+ // assign to task 3
+ enumerator.handleSplitRequest(3, "test-host");
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1, 2, 3);
+
assertThat(toDataSplits(assignments.get(3).getAssignedSplits()).size()).isEqualTo(1);
+ }
+
+ @Test
+ public void testEnumeratorDeregisteredByContext() {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(2);
+ context.registerReader(0, "test-host");
+ context.registerReader(1, "test-host");
+
+ Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+ StreamTableScan scan = new MockScan(results);
+ ContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setScan(scan)
+ .withBucketMode(BucketMode.UNAWARE)
+ .build();
+ enumerator.start();
+
+ long snapshot = 0;
+ List<DataSplit> splits = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
+ }
+ results.add(new DataFilePlan(splits));
+ context.triggerAllActions();
+
+ // assign to task 0
+ context.registeredReaders().remove(0);
+ enumerator.handleSplitRequest(0, "test-host");
+ Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+ context.getSplitAssignments();
+ assertThat(assignments.size()).isEqualTo(0);
+
+ // assign to task 1
+ enumerator.handleSplitRequest(1, "test-host");
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(1);
+
assertThat(toDataSplits(assignments.get(1).getAssignedSplits()).size()).isEqualTo(1);
+ }
+
private static List<DataSplit> toDataSplits(List<FileStoreSourceSplit>
splits) {
return splits.stream()
.map(FileStoreSourceSplit::split)
@@ -264,8 +486,8 @@ public class ContinuousFileSplitEnumeratorTest {
private Collection<FileStoreSourceSplit> initialSplits =
Collections.emptyList();
private long discoveryInterval = Long.MAX_VALUE;
- private int splitBatchSize = 10;
private StreamTableScan scan;
+ private BucketMode bucketMode = BucketMode.FIXED;
public Builder setSplitEnumeratorContext(
SplitEnumeratorContext<FileStoreSourceSplit> context) {
@@ -283,19 +505,19 @@ public class ContinuousFileSplitEnumeratorTest {
return this;
}
- public Builder setSplitBatchSize(int splitBatchSize) {
- this.splitBatchSize = splitBatchSize;
+ public Builder setScan(StreamTableScan scan) {
+ this.scan = scan;
return this;
}
- public Builder setScan(StreamTableScan scan) {
- this.scan = scan;
+ public Builder withBucketMode(BucketMode bucketMode) {
+ this.bucketMode = bucketMode;
return this;
}
public ContinuousFileSplitEnumerator build() {
return new ContinuousFileSplitEnumerator(
- context, initialSplits, null, discoveryInterval,
splitBatchSize, scan);
+ context, initialSplits, null, discoveryInterval, scan,
bucketMode);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index ee7ff89af..87b082011 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.stats.StatsTestUtils;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import org.junit.jupiter.api.Test;
@@ -83,7 +84,19 @@ public class FileStoreSourceSplitGeneratorTest {
1L,
false,
false,
- Collections::singletonList,
+ new SplitGenerator() {
+ @Override
+ public List<List<DataFileMeta>> splitForBatch(
+ List<DataFileMeta> files) {
+ return Collections.singletonList(files);
+ }
+
+ @Override
+ public List<List<DataFileMeta>> splitForStreaming(
+ List<DataFileMeta> files) {
+ return null;
+ }
+ },
FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD)));
DataFilePlan tableScanPlan = new DataFilePlan(scanSplits);