This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0e4190ab2e [Improve][CDC] support exactly-once of cdc and fix the 
BinlogOffset comparing bug (#5057)
0e4190ab2e is described below

commit 0e4190ab2efddf2d3a1b498e22614f7828eec0fe
Author: happyboy1024 <[email protected]>
AuthorDate: Thu Jul 27 11:58:15 2023 +0800

    [Improve][CDC] support exactly-once of cdc and fix the BinlogOffset 
comparing bug (#5057)
    
    * [Improve][CDC] support exactly-once of cdc, fix the BinlogOffset 
comparing bug
    
    * [Improve][CDC] adjust code style
    
    * [Improve][CDC] fix ci error
    
    ---------
    
    Co-authored-by: happyboy1024 <[email protected]>
---
 .../external/IncrementalSourceScanFetcher.java     |  9 +--
 .../external/IncrementalSourceStreamFetcher.java   | 77 +++++++++++++++++++++-
 .../cdc/mysql/source/offset/BinlogOffset.java      |  8 ++-
 3 files changed, 84 insertions(+), 10 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index 7a09ac6bc4..97c0c523e6 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -223,14 +223,11 @@ public class IncrementalSourceScanFetcher implements 
Fetcher<SourceRecords, Sour
 
     private boolean isChangeRecordInChunkRange(SourceRecord record) {
         if (taskContext.isDataChangeRecord(record)) {
+            // fix the between condition
             return taskContext.isRecordBetween(
                     record,
-                    null == currentSnapshotSplit.getSplitStart()
-                            ? null
-                            : new Object[] 
{currentSnapshotSplit.getSplitStart()},
-                    null == currentSnapshotSplit.getSplitEnd()
-                            ? null
-                            : new Object[] 
{currentSnapshotSplit.getSplitEnd()});
+                    currentSnapshotSplit.getSplitStart(),
+                    currentSnapshotSplit.getSplitEnd());
         }
         return false;
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 5257064dc1..2b8e9f7725 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.cdc.base.source.reader.external;
 
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
+import 
org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
 import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
@@ -32,8 +33,12 @@ import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -49,6 +54,8 @@ import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.g
 public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, 
SourceSplitBase> {
     private final FetchTask.Context taskContext;
     private final ExecutorService executorService;
+    // has entered pure binlog mode
+    private final Set<TableId> pureBinlogPhaseTables;
     private volatile ChangeEventQueue<DataChangeEvent> queue;
     private volatile Throwable readException;
 
@@ -58,6 +65,11 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
 
     private Offset splitStartWatermark;
 
+    // maximum watermark for each table
+    private Map<TableId, Offset> maxSplitHighWatermarkMap;
+    // finished split info
+    private Map<TableId, List<CompletedSnapshotSplitInfo>> finishedSplitsInfo;
+
     private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;
 
     public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int 
subTaskId) {
@@ -65,6 +77,7 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
         ThreadFactory threadFactory =
                 new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + 
subTaskId).build();
         this.executorService = 
Executors.newSingleThreadExecutor(threadFactory);
+        this.pureBinlogPhaseTables = new HashSet<>();
     }
 
     @Override
@@ -157,14 +170,72 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
                         tableId);
                 return position.isAfter(splitStartWatermark);
             }
-            // TODO only the table who captured snapshot splits need to 
filter( Used to support
-            // Exactly-Once )
-            return position.isAfter(splitStartWatermark);
+            // check whether the pure binlog mode has been entered
+            if (hasEnterPureBinlogPhase(tableId, position)) {
+                return true;
+            }
+            // has not entered pure binlog mode; check whether the current
+            // record meets the emitting conditions.
+            if (finishedSplitsInfo.containsKey(tableId)) {
+                for (CompletedSnapshotSplitInfo splitInfo : 
finishedSplitsInfo.get(tableId)) {
+                    if (taskContext.isRecordBetween(
+                                    sourceRecord,
+                                    splitInfo.getSplitStart(),
+                                    splitInfo.getSplitEnd())
+                            && 
position.isAfter(splitInfo.getWatermark().getHighWatermark())) {
+                        return true;
+                    }
+                }
+            }
+            return false;
         }
         return true;
     }
 
+    private boolean hasEnterPureBinlogPhase(TableId tableId, Offset position) {
+        // only the table who captured snapshot splits need to filter
+        if (pureBinlogPhaseTables.contains(tableId)) {
+            return true;
+        }
+        // tables that have already finished snapshot reading
+        if (maxSplitHighWatermarkMap.containsKey(tableId)
+                && 
position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
+            pureBinlogPhaseTables.add(tableId);
+            return true;
+        }
+        return false;
+    }
+
     private void configureFilter() {
         splitStartWatermark = currentIncrementalSplit.getStartupOffset();
+        Map<TableId, List<CompletedSnapshotSplitInfo>> splitsInfoMap = new 
HashMap<>();
+        Map<TableId, Offset> tableIdBinlogPositionMap = new HashMap<>();
+        List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos =
+                currentIncrementalSplit.getCompletedSnapshotSplitInfos();
+
+        // latest-offset mode
+        if (completedSnapshotSplitInfos.isEmpty()) {
+            for (TableId tableId : currentIncrementalSplit.getTableIds()) {
+                tableIdBinlogPositionMap.put(tableId, 
currentIncrementalSplit.getStartupOffset());
+            }
+        }
+
+        // calculate the max high watermark of every table
+        for (CompletedSnapshotSplitInfo finishedSplitInfo : 
completedSnapshotSplitInfos) {
+            TableId tableId = finishedSplitInfo.getTableId();
+            List<CompletedSnapshotSplitInfo> list =
+                    splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
+            list.add(finishedSplitInfo);
+            splitsInfoMap.put(tableId, list);
+
+            Offset highWatermark = 
finishedSplitInfo.getWatermark().getHighWatermark();
+            Offset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
+            if (maxHighWatermark == null || 
highWatermark.isAfter(maxHighWatermark)) {
+                tableIdBinlogPositionMap.put(tableId, highWatermark);
+            }
+        }
+        this.finishedSplitsInfo = splitsInfoMap;
+        this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
+        this.pureBinlogPhaseTables.clear();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
index 195b1a5a7c..0d91c02fee 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
@@ -176,7 +176,13 @@ public class BinlogOffset extends Offset {
             // compared ...
             long timestamp = this.getTimestamp();
             long targetTimestamp = that.getTimestamp();
-            return Long.compare(timestamp, targetTimestamp);
+            // Only compare by timestamp when both offsets actually carry one:
+            // low-watermark and high-watermark offsets have no timestamp (0).
+            // Without this guard, comparing a real binlog offset against a
+            // watermark by timestamp would always produce a misleading result.
+            if (timestamp != 0 && targetTimestamp != 0) {
+                return Long.compare(timestamp, targetTimestamp);
+            }
         }
 
         // First compare the MySQL binlog filenames

Reply via email to