This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 11d7c95fc [fix] fix bug of ConcurrentModificationException while readers removed in flink context (#1509)
11d7c95fc is described below
commit 11d7c95fcb61949c0687847e3e879e270b8d44f0
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jul 6 21:35:32 2023 +0800
[fix] fix bug of ConcurrentModificationException while readers removed in flink context (#1509)
---
.../source/ContinuousFileSplitEnumerator.java | 25 ++++++++--------
.../source/ContinuousFileSplitEnumeratorTest.java | 34 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 13 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 6dcad9d9e..14c5c39af 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -193,19 +193,18 @@ public class ContinuousFileSplitEnumerator
private Map<Integer, List<FileStoreSourceSplit>> createAssignment() {
Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
- readersAwaitingSplit.forEach(
- task -> {
-                    // if the reader that requested another split has failed in the meantime, remove
- // it from the list of waiting readers
- if (!context.registeredReaders().containsKey(task)) {
- readersAwaitingSplit.remove(task);
- return;
- }
-                    List<FileStoreSourceSplit> splits = splitAssigner.getNext(task, null);
- if (splits.size() > 0) {
- assignment.put(task, splits);
- }
- });
+ Iterator<Integer> readersAwait = readersAwaitingSplit.iterator();
+ while (readersAwait.hasNext()) {
+ Integer task = readersAwait.next();
+ if (!context.registeredReaders().containsKey(task)) {
+ readersAwait.remove();
+ continue;
+ }
+            List<FileStoreSourceSplit> splits = splitAssigner.getNext(task, null);
+ if (splits.size() > 0) {
+ assignment.put(task, splits);
+ }
+ }
return assignment;
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index a885e9596..bce90c3a9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -45,6 +45,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext.SplitAssignmentState;
import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
/** Unit tests for the {@link ContinuousFileSplitEnumerator}. */
public class ContinuousFileSplitEnumeratorTest {
@@ -461,6 +462,39 @@ public class ContinuousFileSplitEnumeratorTest {
assertThat(toDataSplits(assignments.get(1).getAssignedSplits()).size()).isEqualTo(1);
}
+ @Test
+ public void testRemoveReadersAwaitSuccessful() {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(2);
+ context.registerReader(0, "test-host");
+ context.registerReader(1, "test-host");
+
+ Queue<TableScan.Plan> results = new LinkedBlockingQueue<>();
+ StreamTableScan scan = new MockScan(results);
+ ContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setScan(scan)
+ .withBucketMode(BucketMode.UNAWARE)
+ .build();
+ enumerator.start();
+ enumerator.handleSplitRequest(1, "test-host");
+
+ long snapshot = 0;
+ List<DataSplit> splits = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
+ }
+ results.add(new DataFilePlan(splits));
+
+ context.registeredReaders().remove(1);
+ // assign to task 0
+ assertThatCode(() -> enumerator.handleSplitRequest(0, "test-host"))
+ .doesNotThrowAnyException();
+ }
+
private static List<DataSplit> toDataSplits(List<FileStoreSourceSplit> splits) {
return splits.stream()
.map(FileStoreSourceSplit::split)