hailin0 commented on code in PR #5057:
URL: https://github.com/apache/seatunnel/pull/5057#discussion_r1271770767


##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java:
##########
@@ -157,14 +170,72 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
                         tableId);
                 return position.isAfter(splitStartWatermark);
             }
-            // TODO only the table who captured snapshot splits need to filter( Used to support
-            // Exactly-Once )
-            return position.isAfter(splitStartWatermark);
+            // check whether the pure binlog mode has been entered
+            if (hasEnterPureBinlogPhase(tableId, position)) {
+                return true;
+            }
+            // not enter pure binlog mode and need to check whether the current record meets the
+            // emitting conditions.
+            if (finishedSplitsInfo.containsKey(tableId)) {
+                for (CompletedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
+                    if (taskContext.isRecordBetween(
+                                    sourceRecord,
+                                    splitInfo.getSplitStart(),
+                                    splitInfo.getSplitEnd())
+                            && position.isAfter(splitInfo.getWatermark().getHighWatermark())) {
+                        return true;
+                    }
+                }
+            }
+            return false;
         }
         return true;
     }
 
+    private boolean hasEnterPureBinlogPhase(TableId tableId, Offset position) {
+        // only the table who captured snapshot splits need to filter
+        if (pureBinlogPhaseTables.contains(tableId)) {
+            return true;
+        }
+        // the existed tables those have finished snapshot reading
+        if (maxSplitHighWatermarkMap.containsKey(tableId)
+                && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
+            pureBinlogPhaseTables.add(tableId);

Review Comment:
   ```suggestion
               pureBinlogPhaseTables.add(tableId);
               maxSplitHighWatermarkMap.remove(tableId);
   ```
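A minimal, self-contained sketch of the filtering in this hunk with the suggestion applied may help show why the removal is safe. It is not the SeaTunnel code. Binlog offsets are simplified to `long`, table ids to `String`, and a record's split key to a single `long` checked against a half-open range [start, end). `SplitInfo` (a stand-in for `CompletedSnapshotSplitInfo`), `addFinishedSplit`, and `trackedWatermarks` are hypothetical helpers. The hunk is cut off after `pureBinlogPhaseTables.add(tableId);`, so the sketch assumes `hasEnterPureBinlogPhase` returns `true` on that path and `false` when neither check matches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PureBinlogFilterSketch {

    /** Hypothetical stand-in for CompletedSnapshotSplitInfo. */
    static final class SplitInfo {
        final long splitStart;    // inclusive key bound (assumed)
        final long splitEnd;      // exclusive key bound (assumed)
        final long highWatermark; // binlog offset recorded when the split finished

        SplitInfo(long splitStart, long splitEnd, long highWatermark) {
            this.splitStart = splitStart;
            this.splitEnd = splitEnd;
            this.highWatermark = highWatermark;
        }
    }

    private final Set<String> pureBinlogPhaseTables = new HashSet<>();
    private final Map<String, Long> maxSplitHighWatermarkMap = new HashMap<>();
    private final Map<String, List<SplitInfo>> finishedSplitsInfo = new HashMap<>();

    /** Hypothetical helper: registers a finished split and keeps the table's max high watermark. */
    void addFinishedSplit(String tableId, SplitInfo split) {
        finishedSplitsInfo.computeIfAbsent(tableId, k -> new ArrayList<>()).add(split);
        maxSplitHighWatermarkMap.merge(tableId, split.highWatermark, Long::max);
    }

    boolean hasEnterPureBinlogPhase(String tableId, long position) {
        if (pureBinlogPhaseTables.contains(tableId)) {
            return true;
        }
        Long maxHighWatermark = maxSplitHighWatermarkMap.get(tableId);
        if (maxHighWatermark != null && position >= maxHighWatermark) {
            pureBinlogPhaseTables.add(tableId);
            // The suggested change: from now on the contains() check above
            // short-circuits for this table, so the watermark is never read again.
            maxSplitHighWatermarkMap.remove(tableId);
            return true;
        }
        return false; // assumed fallback; the hunk is truncated before this point
    }

    boolean shouldEmit(String tableId, long key, long position) {
        if (hasEnterPureBinlogPhase(tableId, position)) {
            return true;
        }
        // Not yet in pure binlog mode: emit only if the record lies in a finished
        // split's key range and is strictly after that split's high watermark.
        for (SplitInfo split : finishedSplitsInfo.getOrDefault(tableId, List.of())) {
            boolean inRange = key >= split.splitStart && key < split.splitEnd;
            if (inRange && position > split.highWatermark) {
                return true;
            }
        }
        return false;
    }

    /** Hypothetical helper: how many per-table watermarks are still held. */
    int trackedWatermarks() {
        return maxSplitHighWatermarkMap.size();
    }

    public static void main(String[] args) {
        PureBinlogFilterSketch f = new PureBinlogFilterSketch();
        f.addFinishedSplit("orders", new SplitInfo(0, 100, 10));
        f.addFinishedSplit("orders", new SplitInfo(100, 200, 20));
        System.out.println(f.shouldEmit("orders", 50, 15));  // true: past watermark 10 of [0,100)
        System.out.println(f.shouldEmit("orders", 150, 15)); // false: not past watermark 20 of [100,200)
        System.out.println(f.shouldEmit("orders", 150, 20)); // true: reached the max watermark 20
        System.out.println(f.trackedWatermarks());           // 0: entry removed on entering pure binlog
    }
}
```

Without the `remove` call, the last line would print `1`: the stale watermark would stay in the map for the lifetime of the reader even though nothing consults it after the table is added to `pureBinlogPhaseTables`.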



-- 
This is an automated message from the Apache Git Service.