This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 37567ebb7e [Bugfix][CDC base] Fix CDC job being unable to consume incremental data after restore run (#625) (#6094)
37567ebb7e is described below

commit 37567ebb7e70c33f81346c4e0b0b403001f177e3
Author: ic4y <[email protected]>
AuthorDate: Fri Dec 29 10:54:17 2023 +0800

    [Bugfix][CDC base] Fix CDC job being unable to consume incremental data after restore run (#625) (#6094)
---
 .../source/enumerator/SnapshotSplitAssigner.java    | 11 +++++++++++
 .../base/source/reader/IncrementalSourceReader.java | 21 +++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
index bcf286bdcc..443343947c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
@@ -43,6 +43,7 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
 
 /** Assigner for snapshot split. */
 public class SnapshotSplitAssigner<C extends SourceConfig> implements 
SplitAssigner {
@@ -129,6 +130,16 @@ public class SnapshotSplitAssigner<C extends SourceConfig> 
implements SplitAssig
         this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
         this.isTableIdCaseSensitive = isTableIdCaseSensitive;
         this.dialect = dialect;
+
+        LOG.info("SnapshotSplitAssigner created with remaining tables: {}", 
this.remainingTables);
+        LOG.info(
+                "SnapshotSplitAssigner created with remaining splits: [{}]",
+                this.remainingSplits.stream()
+                        .map(SnapshotSplit::splitId)
+                        .collect(Collectors.joining(",")));
+        LOG.info(
+                "SnapshotSplitAssigner created with assigned splits: {}",
+                this.assignedSplits.keySet());
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index 9f1c5029d5..f37dd80818 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -45,6 +45,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -67,6 +68,8 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
     private final C sourceConfig;
     private final DebeziumDeserializationSchema<T> 
debeziumDeserializationSchema;
 
+    private final AtomicBoolean needSendSplitRequest = new 
AtomicBoolean(false);
+
     public IncrementalSourceReader(
             BlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue,
             Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier,
@@ -95,6 +98,10 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
             }
             running = true;
         }
+        if (needSendSplitRequest.get()) {
+            context.sendSplitRequest();
+            needSendSplitRequest.compareAndSet(true, false);
+        }
         super.pollNext(output);
     }
 
@@ -105,11 +112,19 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
     public void addSplits(List<SourceSplitBase> splits) {
         // restore for finishedUnackedSplits
         List<SourceSplitBase> unfinishedSplits = new ArrayList<>();
+        log.info(
+                "subtask {} add splits: {}",
+                subtaskId,
+                
splits.stream().map(SourceSplitBase::splitId).collect(Collectors.joining(",")));
         for (SourceSplitBase split : splits) {
             if (split.isSnapshotSplit()) {
                 SnapshotSplit snapshotSplit = split.asSnapshotSplit();
                 if (snapshotSplit.isSnapshotReadFinished()) {
                     finishedUnackedSplits.put(snapshotSplit.splitId(), 
snapshotSplit);
+                    log.info(
+                            "subtask {} add finished split: {}",
+                            subtaskId,
+                            snapshotSplit.splitId());
                 } else {
                     unfinishedSplits.add(split);
                 }
@@ -122,6 +137,12 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
         // add all un-finished splits (including incremental split) to 
SourceReaderBase
         if (!unfinishedSplits.isEmpty()) {
             super.addSplits(unfinishedSplits);
+        } else {
+            // If the split received is 'isSnapshotReadFinished', we will not 
run this split, hence
+            // we need to send the split request.
+            // We cannot directly execute context.sendSplitRequest() here, as 
it is a synchronous
+            // call and can lead to a deadlock.
+            needSendSplitRequest.set(true);
         }
     }
 

Reply via email to