This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 11d7c95fc [fix] Fix ConcurrentModificationException when readers are removed in Flink context (#1509)
11d7c95fc is described below

commit 11d7c95fcb61949c0687847e3e879e270b8d44f0
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jul 6 21:35:32 2023 +0800

    [fix] Fix ConcurrentModificationException when readers are removed in Flink context (#1509)
---
 .../source/ContinuousFileSplitEnumerator.java      | 25 ++++++++--------
 .../source/ContinuousFileSplitEnumeratorTest.java  | 34 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 13 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 6dcad9d9e..14c5c39af 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -193,19 +193,18 @@ public class ContinuousFileSplitEnumerator
 
     private Map<Integer, List<FileStoreSourceSplit>> createAssignment() {
         Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
-        readersAwaitingSplit.forEach(
-                task -> {
-                    // if the reader that requested another split has failed 
in the meantime, remove
-                    // it from the list of waiting readers
-                    if (!context.registeredReaders().containsKey(task)) {
-                        readersAwaitingSplit.remove(task);
-                        return;
-                    }
-                    List<FileStoreSourceSplit> splits = 
splitAssigner.getNext(task, null);
-                    if (splits.size() > 0) {
-                        assignment.put(task, splits);
-                    }
-                });
+        Iterator<Integer> readersAwait = readersAwaitingSplit.iterator();
+        while (readersAwait.hasNext()) {
+            Integer task = readersAwait.next();
+            if (!context.registeredReaders().containsKey(task)) {
+                readersAwait.remove();
+                continue;
+            }
+            List<FileStoreSourceSplit> splits = splitAssigner.getNext(task, 
null);
+            if (splits.size() > 0) {
+                assignment.put(task, splits);
+            }
+        }
         return assignment;
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index a885e9596..bce90c3a9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -45,6 +45,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext.SplitAssignmentState;
 import static 
org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 
 /** Unit tests for the {@link ContinuousFileSplitEnumerator}. */
 public class ContinuousFileSplitEnumeratorTest {
@@ -461,6 +462,39 @@ public class ContinuousFileSplitEnumeratorTest {
         
assertThat(toDataSplits(assignments.get(1).getAssignedSplits()).size()).isEqualTo(1);
     }
 
+    @Test
+    public void testRemoveReadersAwaitSuccessful() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(2);
+        context.registerReader(0, "test-host");
+        context.registerReader(1, "test-host");
+
+        Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+        StreamTableScan scan = new MockScan(results);
+        ContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setScan(scan)
+                        .withBucketMode(BucketMode.UNAWARE)
+                        .build();
+        enumerator.start();
+        enumerator.handleSplitRequest(1, "test-host");
+
+        long snapshot = 0;
+        List<DataSplit> splits = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
+        }
+        results.add(new DataFilePlan(splits));
+
+        context.registeredReaders().remove(1);
+        // assign to task 0
+        assertThatCode(() -> enumerator.handleSplitRequest(0, "test-host"))
+                .doesNotThrowAnyException();
+    }
+
     private static List<DataSplit> toDataSplits(List<FileStoreSourceSplit> 
splits) {
         return splits.stream()
                 .map(FileStoreSourceSplit::split)

Reply via email to