This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 37567ebb7e [Bugfix][CDC base] Fix CDC job cannot consume incremental
data After restore run (#625) (#6094)
37567ebb7e is described below
commit 37567ebb7e70c33f81346c4e0b0b403001f177e3
Author: ic4y <[email protected]>
AuthorDate: Fri Dec 29 10:54:17 2023 +0800
[Bugfix][CDC base] Fix CDC job cannot consume incremental data After
restore run (#625) (#6094)
---
.../source/enumerator/SnapshotSplitAssigner.java | 11 +++++++++++
.../base/source/reader/IncrementalSourceReader.java | 21 +++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
index bcf286bdcc..443343947c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
@@ -43,6 +43,7 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
/** Assigner for snapshot split. */
public class SnapshotSplitAssigner<C extends SourceConfig> implements
SplitAssigner {
@@ -129,6 +130,16 @@ public class SnapshotSplitAssigner<C extends SourceConfig>
implements SplitAssig
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.dialect = dialect;
+
+ LOG.info("SnapshotSplitAssigner created with remaining tables: {}",
this.remainingTables);
+ LOG.info(
+ "SnapshotSplitAssigner created with remaining splits: [{}]",
+ this.remainingSplits.stream()
+ .map(SnapshotSplit::splitId)
+ .collect(Collectors.joining(",")));
+ LOG.info(
+ "SnapshotSplitAssigner created with assigned splits: {}",
+ this.assignedSplits.keySet());
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index 9f1c5029d5..f37dd80818 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -45,6 +45,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -67,6 +68,8 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
private final C sourceConfig;
private final DebeziumDeserializationSchema<T>
debeziumDeserializationSchema;
+ private final AtomicBoolean needSendSplitRequest = new
AtomicBoolean(false);
+
public IncrementalSourceReader(
BlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue,
Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier,
@@ -95,6 +98,10 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
}
running = true;
}
+ if (needSendSplitRequest.get()) {
+ context.sendSplitRequest();
+ needSendSplitRequest.compareAndSet(true, false);
+ }
super.pollNext(output);
}
@@ -105,11 +112,19 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
public void addSplits(List<SourceSplitBase> splits) {
// restore for finishedUnackedSplits
List<SourceSplitBase> unfinishedSplits = new ArrayList<>();
+ log.info(
+ "subtask {} add splits: {}",
+ subtaskId,
+
splits.stream().map(SourceSplitBase::splitId).collect(Collectors.joining(",")));
for (SourceSplitBase split : splits) {
if (split.isSnapshotSplit()) {
SnapshotSplit snapshotSplit = split.asSnapshotSplit();
if (snapshotSplit.isSnapshotReadFinished()) {
finishedUnackedSplits.put(snapshotSplit.splitId(),
snapshotSplit);
+ log.info(
+ "subtask {} add finished split: {}",
+ subtaskId,
+ snapshotSplit.splitId());
} else {
unfinishedSplits.add(split);
}
@@ -122,6 +137,12 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
// add all un-finished splits (including incremental split) to
SourceReaderBase
if (!unfinishedSplits.isEmpty()) {
super.addSplits(unfinishedSplits);
+ } else {
+ // If the split received is 'isSnapshotReadFinished', we will not
run this split, hence
+ // we need to send the split request.
+ // We cannot directly execute context.sendSplitRequest() here, as
it is a synchronous
+ // call and can lead to a deadlock.
+ needSendSplitRequest.set(true);
}
}