This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e6795136a [flink] ContinuousFileSplitEnumerator.assignSplits should be thread safe (#1507)
e6795136a is described below

commit e6795136ae2849573e110841f0ac359125c032b1
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 6 17:33:15 2023 +0800

    [flink] ContinuousFileSplitEnumerator.assignSplits should be thread safe (#1507)
---
 .../paimon/flink/source/ContinuousFileSplitEnumerator.java       | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index c7772d5fb..6dcad9d9e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -138,8 +138,7 @@ public class ContinuousFileSplitEnumerator
 
     @Override
     public PendingSplitsCheckpoint snapshotState(long checkpointId) {
-        List<FileStoreSourceSplit> splits = new ArrayList<>();
-        splits.addAll(splitAssigner.remainingSplits());
+        List<FileStoreSourceSplit> splits = new ArrayList<>(splitAssigner.remainingSplits());
         final PendingSplitsCheckpoint checkpoint =
                 new PendingSplitsCheckpoint(splits, nextSnapshotId);
 
@@ -172,7 +171,11 @@ public class ContinuousFileSplitEnumerator
         assignSplits();
     }
 
-    private void assignSplits() {
+    /**
+     * Method should be synchronized because {@link #handleSplitRequest} and {@link
+     * #processDiscoveredSplits} have thread conflicts.
+     */
+    private synchronized void assignSplits() {
         Map<Integer, List<FileStoreSourceSplit>> assignment = createAssignment();
         if (finished) {
             Iterator<Integer> iterator = readersAwaitingSplit.iterator();

Reply via email to