lzshlzsh commented on code in PR #3845: URL: https://github.com/apache/flink-cdc/pull/3845#discussion_r2303231371
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java:
##########
@@ -36,36 +38,64 @@ public class GtidUtils {
     public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) {
         Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
         serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));
-        for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
-            GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
+        for (GtidSet.UUIDSet restoredUuidSet : restoredGtidSet.getUUIDSets()) {
+            GtidSet.UUIDSet serverUuidSet = newSet.get(restoredUuidSet.getUUID());
             if (serverUuidSet != null) {
-                long restoredIntervalEnd = getIntervalEnd(uuidSet);
-                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals =
-                        new ArrayList<>();
-                for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) {
-                    if (serverInterval.getEnd() <= restoredIntervalEnd) {
-                        newIntervals.add(
-                                new com.github.shyiko.mysql.binlog.GtidSet.Interval(
-                                        serverInterval.getStart(), serverInterval.getEnd()));
-                    } else if (serverInterval.getStart() <= restoredIntervalEnd
-                            && serverInterval.getEnd() > restoredIntervalEnd) {
-                        newIntervals.add(
+                List<GtidSet.Interval> serverIntervals = serverUuidSet.getIntervals();
+                List<GtidSet.Interval> restoredIntervals = restoredUuidSet.getIntervals();
+
+                long earliestRestoredTx = getMinIntervalStart(restoredIntervals);
+
+                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> merged = new ArrayList<>();
+
+                // Process each server interval
+                for (GtidSet.Interval serverInterval : serverIntervals) {
+                    // First, check if any part comes before earliest restored
+                    if (serverInterval.getStart() < earliestRestoredTx) {
+                        long end = Math.min(serverInterval.getEnd(), earliestRestoredTx - 1);
+                        merged.add(
                                 new com.github.shyiko.mysql.binlog.GtidSet.Interval(
-                                        serverInterval.getStart(), restoredIntervalEnd));
+                                        serverInterval.getStart(), end));

Review Comment:
> Considering that the gtid in the state may be incomplete, I think the merging is acceptable. But we need to add handling for the `PreviousGtidsEvent` so that we can use cleaner methods to deal with this in future versions.

In my opinion, PreviousGtidsEvent cannot remove the small risk of data loss. Merging the server-side GTIDs that precede the minimum restored GTID can lose data. This merging logic already existed before this PR. I mentioned a data-loss case above; let me briefly explain it here.

1. Only one binlog file is available in MySQL, and its GTIDs are out of order: 182580, 182579, 182578, corresponding to three updates. The PreviousGtidsEvent is A:1-182571. When starting with `'scan.startup.mode' = 'earliest-offset'`, a checkpoint is taken after the data for 182580 and 182579 has been read. The saved GTID is A:182580-182580, with "row": "1", "event": "2". The GTID set reported by the server through the `show master status` command is A:1-1825771:182578-182580. That is, 182578 has not been read yet.
2. When recovering from the checkpoint, the merged GTID set used to connect to MySQL is A:1-1825771:182578-182580, so the data for 182578 is never read and is lost.

PreviousGtidsEvent A:1-182571 cannot be used to avoid this data loss. Besides that, I think the merging is acceptable.
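To make the two steps above concrete, here is a minimal, self-contained sketch of the scenario. It is not the `GtidUtils` code: `GtidMergeSketch`, `merge`, and `contains` are hypothetical names, intervals are plain `{start, end}` pairs instead of the binlog client's `GtidSet.Interval`, and the transaction numbers are scaled down (8, 9, 10 stand in for 182578, 182579, 182580). Appending the restored intervals after the trimmed server intervals is an assumption about the rest of the method, which the hunk does not show.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of the merge discussed in this comment.
class GtidMergeSketch {

    // Keeps the part of each server interval that lies strictly before the
    // earliest restored transaction, then appends the restored intervals
    // (assumption: the real method does something equivalent afterwards).
    // Every interval is {start, end}, both ends inclusive.
    static List<long[]> merge(List<long[]> server, List<long[]> restored) {
        long earliestRestored = Long.MAX_VALUE;
        for (long[] r : restored) {
            earliestRestored = Math.min(earliestRestored, r[0]);
        }
        List<long[]> merged = new ArrayList<>();
        for (long[] s : server) {
            if (s[0] < earliestRestored) {
                merged.add(new long[] {s[0], Math.min(s[1], earliestRestored - 1)});
            }
        }
        merged.addAll(restored);
        return merged;
    }

    // True if the transaction number falls inside any interval.
    static boolean contains(List<long[]> intervals, long tx) {
        for (long[] i : intervals) {
            if (i[0] <= tx && tx <= i[1]) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Binlog order is 10, 9, 8; the checkpoint was taken before 8 was read.
        List<long[]> server = Arrays.asList(new long[] {1, 7}, new long[] {8, 10});
        List<long[]> restored = Collections.singletonList(new long[] {10, 10});

        List<long[]> merged = merge(server, restored);
        // 8 was never read, yet the merged set marks it as already seen.
        System.out.println("8 treated as already read: " + contains(merged, 8));
    }
}
```

In this sketch the merged set covers 8 even though the restored state only contained 10, so a server that skips every GTID in the merged set would never send transaction 8. That is the loss described in step 2.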