This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cd2bd6a40 [Hotfix][Connector-V2] Fix ConcurrentModificationException when snapshotState based on SourceReaderBase (#4011)
cd2bd6a40 is described below

commit cd2bd6a40883d2e555f0ef3b5be2bb6dd495581a
Author: hailin0 <[email protected]>
AuthorDate: Wed Feb 1 10:08:42 2023 +0800

    [Hotfix][Connector-V2] Fix ConcurrentModificationException when snapshotState based on SourceReaderBase (#4011)
---
 .../connectors/seatunnel/common/source/reader/SourceReaderBase.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
index 23dff321f..3f856e465 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
@@ -36,6 +36,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * An abstract implementation of {@link SourceReader} which provides some synchronization between
@@ -55,7 +57,7 @@ import java.util.concurrent.BlockingQueue;
 public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
     implements SourceReader<T, SplitT> {
     private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
-    private final Map<String, SplitContext<T, SplitStateT>> splitStates;
+    private final ConcurrentMap<String, SplitContext<T, SplitStateT>> splitStates;
     protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
     protected final SplitFetcherManager<E, SplitT> splitFetcherManager;
     protected final SourceReaderOptions options;
@@ -74,7 +76,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
         this.elementsQueue = elementsQueue;
         this.splitFetcherManager = splitFetcherManager;
         this.recordEmitter = recordEmitter;
-        this.splitStates = new HashMap<>();
+        this.splitStates = new ConcurrentHashMap<>();
         this.options = options;
         this.context = context;
     }

Reply via email to