This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1548704d5 [flink][bug] FileStoreSourceReader in Flink connector should only send split request after all given splits are consumed (#934)
1548704d5 is described below

commit 1548704d57b7181b4d65c6bf579ba40febf3f980
Author: tsreaper <[email protected]>
AuthorDate: Tue Apr 18 16:41:31 2023 +0800

    [flink][bug] FileStoreSourceReader in Flink connector should only send split request after all given splits are consumed (#934)
---
 .../paimon/flink/source/FileStoreSourceReader.java |  7 +++++-
 .../flink/source/FileStoreSourceReaderTest.java    | 25 +++++++++++++++++++---
 2 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index a001bbf5a..97f741910 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -69,7 +69,12 @@ public final class FileStoreSourceReader<T>
 
     @Override
     protected void onSplitFinished(Map<String, FileStoreSourceSplitState> finishedSplitIds) {
-        context.sendSplitRequest();
+        // this method is called each time when we consume one split
+        // it is possible that one response from the coordinator contains multiple splits
+        // we should only require for more splits after we've consumed all given splits
+        if (getNumberOfCurrentlyAssignedSplits() == 0) {
+            context.sendSplitRequest();
+        }
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
index 883cb09b6..dc7e2c04b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
@@ -28,6 +28,8 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.table.data.RowData;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -78,13 +80,30 @@ public class FileStoreSourceReaderTest {
         final TestingReaderContext context = new TestingReaderContext();
         final FileStoreSourceReader<?> reader = createReader(context);
 
-        reader.addSplits(Collections.singletonList(createTestFileSplit()));
+        reader.addSplits(Collections.singletonList(createTestFileSplit("id1")));
         reader.start();
         reader.close();
 
         assertThat(context.getNumSplitRequests()).isEqualTo(0);
     }
 
+    @Test
+    public void testAddMultipleSplits() throws Exception {
+        final TestingReaderContext context = new TestingReaderContext();
+        final FileStoreSourceReader<?> reader = createReader(context);
+
+        reader.start();
+        assertThat(context.getNumSplitRequests()).isEqualTo(1);
+
+        reader.addSplits(Arrays.asList(createTestFileSplit("id1"), createTestFileSplit("id2")));
+        TestingReaderOutput<RowData> output = new TestingReaderOutput<>();
+        while (reader.getNumberOfCurrentlyAssignedSplits() > 0) {
+            reader.pollNext(output);
+            Thread.sleep(10);
+        }
+        assertThat(context.getNumSplitRequests()).isEqualTo(2);
+    }
+
     private FileStoreSourceReader<?> createReader(TestingReaderContext context) {
         return new FileStoreSourceReader<>(
                 RecordsFunction.forIterate(),
@@ -93,7 +112,7 @@ public class FileStoreSourceReaderTest {
                 null);
     }
 
-    private static FileStoreSourceSplit createTestFileSplit() {
-        return newSourceSplit("id1", row(1), 0, Collections.emptyList());
+    private static FileStoreSourceSplit createTestFileSplit(String id) {
+        return newSourceSplit(id, row(1), 0, Collections.emptyList());
     }
 }

Reply via email to