ruanhang1993 commented on code in PR #4286:
URL: https://github.com/apache/flink-cdc/pull/4286#discussion_r2928768877


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java:
##########
@@ -23,10 +23,52 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 
 /** Utils for handling GTIDs. */
 public class GtidUtils {
 
+    /**
+     * Computes the merged GTID set for the LATEST new-channel-position mode.
+     *
+     * <p>For old channels (UUIDs present in checkpoint), non-contiguous GTID ranges are fixed via
+     * {@link #fixRestoredGtidSet} to avoid MySQL replaying pre-checkpoint transactions. For new
+     * channels (UUIDs not in checkpoint), the server's full GTID is used to skip all history.
+     *
+     * @param availableServerGtidSet the GTID set currently available on the MySQL server
+     * @param purgedServerGtid the GTID set already purged from the MySQL server
+     * @param checkpointGtidSet the GTID set restored from checkpoint (after source filter applied)
+     * @param gtidSourceFilter optional predicate to filter GTID source UUIDs; may be null
+     * @return the merged GTID set suitable for binlog subscription
+     */
+    public static GtidSet computeLatestModeGtidSet(
+            GtidSet availableServerGtidSet,
+            GtidSet purgedServerGtid,
+            GtidSet checkpointGtidSet,
+            Predicate<String> gtidSourceFilter) {
+        final GtidSet relevantAvailableServerGtidSet =
+                (gtidSourceFilter != null)
+                        ? availableServerGtidSet.retainAll(gtidSourceFilter)
+                        : availableServerGtidSet;
+
+        // Step 1: Fix old channels' GTID ranges
+        GtidSet fixedOldChannelsGtid =
+                fixRestoredGtidSet(
+                        mergeGtidSetInto(
+                                relevantAvailableServerGtidSet.retainAll(
+                                        uuid -> checkpointGtidSet.forServerWithId(uuid) != null),
+                                purgedServerGtid),
+                        checkpointGtidSet);
+
+        // Step 2: For new channels, use server's full GTID to skip all history
+        GtidSet newChannelsGtid =
+                relevantAvailableServerGtidSet.retainAll(
+                        uuid -> checkpointGtidSet.forServerWithId(uuid) == null);
+
+        // Step 3: Merge fixed old channels + new channels
+        return mergeGtidSetInto(fixedOldChannelsGtid, newChannelsGtid);
+    }

Review Comment:
   The `latest` path reuses all of the code from the `earliest` path. The only
   difference between the two is that `latest` additionally includes the GTIDs
   of the new channels, which makes the reader skip their history.
   
   Is it necessary to extract this part of the code into a new method for 
   reuse? @Hisoka-X 
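
To make the relationship concrete, here is a rough sketch of how the shared step could be factored out. It is only an illustration under simplifying assumptions: a GTID set is modeled as a plain `Map` from source UUID to an interval string instead of Debezium's `GtidSet`, and the names `GtidSplitSketch`, `retain`, `oldChannels`, `earliest`, and `latest` are hypothetical. The body of `oldChannels` is a placeholder for the real purged-GTID merge plus `fixRestoredGtidSet` call, not a reimplementation of it.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Simplified stand-in: a GTID set is modeled as source UUID -> interval string. */
public class GtidSplitSketch {

    /** Keeps only the entries whose UUID matches the predicate (mimics a retainAll-style filter). */
    static Map<String, String> retain(Map<String, String> set, Predicate<String> keep) {
        Map<String, String> out = new TreeMap<>();
        set.forEach((uuid, intervals) -> {
            if (keep.test(uuid)) {
                out.put(uuid, intervals);
            }
        });
        return out;
    }

    /**
     * Hypothetical shared step for channels already present in the checkpoint.
     * Placeholder logic: it simply resumes from the checkpointed intervals; the real
     * code would merge purged GTIDs and repair non-contiguous ranges here.
     */
    static Map<String, String> oldChannels(
            Map<String, String> server, Map<String, String> checkpoint) {
        Map<String, String> out = new TreeMap<>();
        retain(server, checkpoint::containsKey)
                .forEach((uuid, ignored) -> out.put(uuid, checkpoint.get(uuid)));
        return out;
    }

    /** "earliest": only the shared old-channel step, so new channels are read from the start. */
    static Map<String, String> earliest(
            Map<String, String> server, Map<String, String> checkpoint) {
        return oldChannels(server, checkpoint);
    }

    /** "latest": the shared step plus the server's full GTIDs for new channels (history skipped). */
    static Map<String, String> latest(
            Map<String, String> server, Map<String, String> checkpoint) {
        Map<String, String> merged = new TreeMap<>(oldChannels(server, checkpoint));
        merged.putAll(retain(server, uuid -> !checkpoint.containsKey(uuid)));
        return merged;
    }
}
```

With a server set containing UUIDs `A` and `B` and a checkpoint containing only `A`, `earliest` returns just the entry for `A`, while `latest` returns the same entry for `A` plus the server's entry for `B`.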


