This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b2f70fd40b [Hotfix][CDC] Fix thread-unsafe collection container in cdc 
enumerator (#5614)
b2f70fd40b is described below

commit b2f70fd40b738c7ea60054d440f2356f496458b7
Author: hailin0 <[email protected]>
AuthorDate: Tue Oct 17 18:55:47 2023 +0800

    [Hotfix][CDC] Fix thread-unsafe collection container in cdc enumerator 
(#5614)
---
 .../source/enumerator/SnapshotSplitAssigner.java   | 29 ++++++++++++++--------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
index ce2373bc62..a1e6729bff 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
@@ -32,12 +32,17 @@ import io.debezium.relational.TableId;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 /** Assigner for snapshot split. */
 public class SnapshotSplitAssigner<C extends SourceConfig> implements 
SplitAssigner {
@@ -47,12 +52,12 @@ public class SnapshotSplitAssigner<C extends SourceConfig> 
implements SplitAssig
 
     private final C sourceConfig;
     private final List<TableId> alreadyProcessedTables;
-    private final List<SnapshotSplit> remainingSplits;
+    private final Queue<SnapshotSplit> remainingSplits;
     private final Map<String, SnapshotSplit> assignedSplits;
     private final Map<String, SnapshotSplitWatermark> splitCompletedOffsets;
     private boolean assignerCompleted;
     private final int currentParallelism;
-    private final LinkedList<TableId> remainingTables;
+    private final Deque<TableId> remainingTables;
     private final boolean isRemainingTablesCheckpointed;
 
     private ChunkSplitter chunkSplitter;
@@ -115,12 +120,12 @@ public class SnapshotSplitAssigner<C extends 
SourceConfig> implements SplitAssig
         this.context = context;
         this.sourceConfig = context.getSourceConfig();
         this.currentParallelism = currentParallelism;
-        this.alreadyProcessedTables = alreadyProcessedTables;
-        this.remainingSplits = remainingSplits;
-        this.assignedSplits = assignedSplits;
-        this.splitCompletedOffsets = splitCompletedOffsets;
+        this.alreadyProcessedTables = 
Collections.synchronizedList(alreadyProcessedTables);
+        this.remainingSplits = new ConcurrentLinkedQueue(remainingSplits);
+        this.assignedSplits = new ConcurrentHashMap<>(assignedSplits);
+        this.splitCompletedOffsets = new 
ConcurrentHashMap<>(splitCompletedOffsets);
         this.assignerCompleted = assignerCompleted;
-        this.remainingTables = new LinkedList<>(remainingTables);
+        this.remainingTables = new ConcurrentLinkedDeque<>(remainingTables);
         this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
         this.isTableIdCaseSensitive = isTableIdCaseSensitive;
         this.dialect = dialect;
@@ -211,11 +216,15 @@ public class SnapshotSplitAssigner<C extends 
SourceConfig> implements SplitAssig
         SnapshotPhaseState state =
                 new SnapshotPhaseState(
                         alreadyProcessedTables,
-                        remainingSplits,
+                        remainingSplits.isEmpty()
+                                ? Collections.emptyList()
+                                : new ArrayList<>(remainingSplits),
                         assignedSplits,
                         splitCompletedOffsets,
                         assignerCompleted,
-                        remainingTables,
+                        remainingTables.isEmpty()
+                                ? Collections.emptyList()
+                                : new ArrayList<>(remainingTables),
                         isTableIdCaseSensitive,
                         true);
         // we need a complete checkpoint before mark this assigner to be 
completed, to wait for all

Reply via email to