This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1548704d5 [flink][bug] FileStoreSourceReader in Flink connector should
only send split request after all given splits are consumed (#934)
1548704d5 is described below
commit 1548704d57b7181b4d65c6bf579ba40febf3f980
Author: tsreaper <[email protected]>
AuthorDate: Tue Apr 18 16:41:31 2023 +0800
[flink][bug] FileStoreSourceReader in Flink connector should only send
split request after all given splits are consumed (#934)
---
.../paimon/flink/source/FileStoreSourceReader.java | 7 +++++-
.../flink/source/FileStoreSourceReaderTest.java | 25 +++++++++++++++++++---
2 files changed, 28 insertions(+), 4 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index a001bbf5a..97f741910 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -69,7 +69,12 @@ public final class FileStoreSourceReader<T>
@Override
protected void onSplitFinished(Map<String, FileStoreSourceSplitState> finishedSplitIds) {
- context.sendSplitRequest();
+ // this method is called each time when we consume one split
+ // it is possible that one response from the coordinator contains multiple splits
+ // we should only require for more splits after we've consumed all given splits
+ if (getNumberOfCurrentlyAssignedSplits() == 0) {
+ context.sendSplitRequest();
+ }
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
index 883cb09b6..dc7e2c04b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
@@ -28,6 +28,8 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.table.data.RowData;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -78,13 +80,30 @@ public class FileStoreSourceReaderTest {
final TestingReaderContext context = new TestingReaderContext();
final FileStoreSourceReader<?> reader = createReader(context);
- reader.addSplits(Collections.singletonList(createTestFileSplit()));
+ reader.addSplits(Collections.singletonList(createTestFileSplit("id1")));
reader.start();
reader.close();
assertThat(context.getNumSplitRequests()).isEqualTo(0);
}
+ @Test
+ public void testAddMultipleSplits() throws Exception {
+ final TestingReaderContext context = new TestingReaderContext();
+ final FileStoreSourceReader<?> reader = createReader(context);
+
+ reader.start();
+ assertThat(context.getNumSplitRequests()).isEqualTo(1);
+
+ reader.addSplits(Arrays.asList(createTestFileSplit("id1"), createTestFileSplit("id2")));
+ TestingReaderOutput<RowData> output = new TestingReaderOutput<>();
+ while (reader.getNumberOfCurrentlyAssignedSplits() > 0) {
+ reader.pollNext(output);
+ Thread.sleep(10);
+ }
+ assertThat(context.getNumSplitRequests()).isEqualTo(2);
+ }
+
private FileStoreSourceReader<?> createReader(TestingReaderContext context) {
return new FileStoreSourceReader<>(
RecordsFunction.forIterate(),
@@ -93,7 +112,7 @@ public class FileStoreSourceReaderTest {
null);
}
- private static FileStoreSourceSplit createTestFileSplit() {
- return newSourceSplit("id1", row(1), 0, Collections.emptyList());
+ private static FileStoreSourceSplit createTestFileSplit(String id) {
+ return newSourceSplit(id, row(1), 0, Collections.emptyList());
}
}