This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0e4190ab2e [Improve][CDC] support exactly-once of cdc and fix the
BinlogOffset comparing bug (#5057)
0e4190ab2e is described below
commit 0e4190ab2efddf2d3a1b498e22614f7828eec0fe
Author: happyboy1024 <[email protected]>
AuthorDate: Thu Jul 27 11:58:15 2023 +0800
[Improve][CDC] support exactly-once of cdc and fix the BinlogOffset
comparing bug (#5057)
* [Improve][CDC] support exactly-once of cdc, fix the BinlogOffset
comparing bug
* [Improve][CDC] adjust code style
* [Improve][CDC] fix ci error
---------
Co-authored-by: happyboy1024 <[email protected]>
---
.../external/IncrementalSourceScanFetcher.java | 9 +--
.../external/IncrementalSourceStreamFetcher.java | 77 +++++++++++++++++++++-
.../cdc/mysql/source/offset/BinlogOffset.java | 8 ++-
3 files changed, 84 insertions(+), 10 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index 7a09ac6bc4..97c0c523e6 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -223,14 +223,11 @@ public class IncrementalSourceScanFetcher implements
Fetcher<SourceRecords, Sour
private boolean isChangeRecordInChunkRange(SourceRecord record) {
if (taskContext.isDataChangeRecord(record)) {
+ // fix the between condition
return taskContext.isRecordBetween(
record,
- null == currentSnapshotSplit.getSplitStart()
- ? null
- : new Object[]
{currentSnapshotSplit.getSplitStart()},
- null == currentSnapshotSplit.getSplitEnd()
- ? null
- : new Object[]
{currentSnapshotSplit.getSplitEnd()});
+ currentSnapshotSplit.getSplitStart(),
+ currentSnapshotSplit.getSplitEnd());
}
return false;
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 5257064dc1..2b8e9f7725 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.cdc.base.source.reader.external;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
+import
org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
@@ -32,8 +33,12 @@ import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -49,6 +54,8 @@ import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.g
public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords,
SourceSplitBase> {
private final FetchTask.Context taskContext;
private final ExecutorService executorService;
+ // has entered pure binlog mode
+ private final Set<TableId> pureBinlogPhaseTables;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile Throwable readException;
@@ -58,6 +65,11 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
private Offset splitStartWatermark;
+ // maximum watermark for each table
+ private Map<TableId, Offset> maxSplitHighWatermarkMap;
+    // finished split info
+ private Map<TableId, List<CompletedSnapshotSplitInfo>> finishedSplitsInfo;
+
private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;
public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int
subTaskId) {
@@ -65,6 +77,7 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" +
subTaskId).build();
this.executorService =
Executors.newSingleThreadExecutor(threadFactory);
+ this.pureBinlogPhaseTables = new HashSet<>();
}
@Override
@@ -157,14 +170,72 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
tableId);
return position.isAfter(splitStartWatermark);
}
- // TODO only the table who captured snapshot splits need to
filter( Used to support
- // Exactly-Once )
- return position.isAfter(splitStartWatermark);
+ // check whether the pure binlog mode has been entered
+ if (hasEnterPureBinlogPhase(tableId, position)) {
+ return true;
+ }
+            // not yet in pure binlog mode; need to check whether the
current record meets the
+            // emitting conditions.
+ if (finishedSplitsInfo.containsKey(tableId)) {
+ for (CompletedSnapshotSplitInfo splitInfo :
finishedSplitsInfo.get(tableId)) {
+ if (taskContext.isRecordBetween(
+ sourceRecord,
+ splitInfo.getSplitStart(),
+ splitInfo.getSplitEnd())
+ &&
position.isAfter(splitInfo.getWatermark().getHighWatermark())) {
+ return true;
+ }
+ }
+ }
+ return false;
}
return true;
}
+ private boolean hasEnterPureBinlogPhase(TableId tableId, Offset position) {
+        // only the tables that captured snapshot splits need to be filtered
+ if (pureBinlogPhaseTables.contains(tableId)) {
+ return true;
+ }
+        // existing tables that have finished snapshot reading
+ if (maxSplitHighWatermarkMap.containsKey(tableId)
+ &&
position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
+ pureBinlogPhaseTables.add(tableId);
+ return true;
+ }
+ return false;
+ }
+
private void configureFilter() {
splitStartWatermark = currentIncrementalSplit.getStartupOffset();
+ Map<TableId, List<CompletedSnapshotSplitInfo>> splitsInfoMap = new
HashMap<>();
+ Map<TableId, Offset> tableIdBinlogPositionMap = new HashMap<>();
+ List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos =
+ currentIncrementalSplit.getCompletedSnapshotSplitInfos();
+
+ // latest-offset mode
+ if (completedSnapshotSplitInfos.isEmpty()) {
+ for (TableId tableId : currentIncrementalSplit.getTableIds()) {
+ tableIdBinlogPositionMap.put(tableId,
currentIncrementalSplit.getStartupOffset());
+ }
+ }
+
+ // calculate the max high watermark of every table
+ for (CompletedSnapshotSplitInfo finishedSplitInfo :
completedSnapshotSplitInfos) {
+ TableId tableId = finishedSplitInfo.getTableId();
+ List<CompletedSnapshotSplitInfo> list =
+ splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
+ list.add(finishedSplitInfo);
+ splitsInfoMap.put(tableId, list);
+
+ Offset highWatermark =
finishedSplitInfo.getWatermark().getHighWatermark();
+ Offset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
+ if (maxHighWatermark == null ||
highWatermark.isAfter(maxHighWatermark)) {
+ tableIdBinlogPositionMap.put(tableId, highWatermark);
+ }
+ }
+ this.finishedSplitsInfo = splitsInfoMap;
+ this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
+ this.pureBinlogPhaseTables.clear();
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
index 195b1a5a7c..0d91c02fee 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
@@ -176,7 +176,13 @@ public class BinlogOffset extends Offset {
// compared ...
long timestamp = this.getTimestamp();
long targetTimestamp = that.getTimestamp();
- return Long.compare(timestamp, targetTimestamp);
+            // Only compare timestamps when both of them exist,
+            // because timestamps do not exist for low watermark and high
watermark.
+            // Without this check, comparing a real binlog offset
against a watermark
+            // would always return true.
+ if (timestamp != 0 && targetTimestamp != 0) {
+ return Long.compare(timestamp, targetTimestamp);
+ }
}
// First compare the MySQL binlog filenames