This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9a2efa51c [Bugfix][CDC Base] Solving the ConcurrentModificationException caused by snapshotState being modified concurrently. (#4877)
9a2efa51c is described below

commit 9a2efa51c7180b369ecfea17efaae813d9d0bfc5
Author: ic4y <[email protected]>
AuthorDate: Mon Jun 5 16:49:50 2023 +0800

    [Bugfix][CDC Base] Solving the ConcurrentModificationException caused by snapshotState being modified concurrently. (#4877)
---
 .../cdc/base/source/enumerator/IncrementalSourceEnumerator.java  | 9 +++++++--
 .../seatunnel/cdc/sqlserver/source/source/offset/LsnOffset.java  | 4 ++--
 .../seatunnel/engine/server/task/SourceSplitEnumeratorTask.java  | 1 +
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
index fa0dddddb..86f7ac42d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -109,7 +109,9 @@ public class IncrementalSourceEnumerator
                     (CompletedSnapshotSplitsReportEvent) sourceEvent;
             List<SnapshotSplitWatermark> completedSplitWatermarks =
                     reportEvent.getCompletedSnapshotSplitWatermarks();
-            splitAssigner.onCompletedSplits(completedSplitWatermarks);
+            synchronized (context) {
+                splitAssigner.onCompletedSplits(completedSplitWatermarks);
+            }
 
             // send acknowledge event
             CompletedSnapshotSplitsAckEvent ackEvent =
@@ -153,7 +155,10 @@ public class IncrementalSourceEnumerator
                 continue;
             }
 
-            Optional<SourceSplitBase> split = splitAssigner.getNext();
+            Optional<SourceSplitBase> split;
+            synchronized (context) {
+                split = splitAssigner.getNext();
+            }
             if (split.isPresent()) {
                 final SourceSplitBase sourceSplit = split.get();
                 context.assignSplit(nextAwaiting, sourceSplit);
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/offset/LsnOffset.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/offset/LsnOffset.java
index 61a7f5a4c..277c05457 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/offset/LsnOffset.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/offset/LsnOffset.java
@@ -61,8 +61,8 @@ public class LsnOffset extends Offset {
         return Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY));
     }
 
-    public Long getEventSerialNo() {
-        return Long.valueOf(offset.get(SourceInfo.EVENT_SERIAL_NO_KEY));
+    public Object getEventSerialNo() {
+        return offset.get(SourceInfo.EVENT_SERIAL_NO_KEY);
     }
 
     public int compareTo(Offset o) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index b39c282f3..0d177dc90 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -142,6 +142,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         final long barrierId = barrier.getId();
         Serializable snapshotState = null;
         byte[] serialize = null;
+        // Do not modify this lock object, as it is also used in the SourceSplitEnumerator.
         synchronized (enumeratorContext) {
             if (barrier.snapshot()) {
                 snapshotState = enumerator.snapshotState(barrierId);

Reply via email to