lzshlzsh commented on code in PR #3845:
URL: https://github.com/apache/flink-cdc/pull/3845#discussion_r2303231371


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java:
##########
@@ -36,36 +38,64 @@ public class GtidUtils {
     public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) {
         Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
         serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));
-        for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
-            GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
+        for (GtidSet.UUIDSet restoredUuidSet : restoredGtidSet.getUUIDSets()) {
+            GtidSet.UUIDSet serverUuidSet = newSet.get(restoredUuidSet.getUUID());
             if (serverUuidSet != null) {
-                long restoredIntervalEnd = getIntervalEnd(uuidSet);
-                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals =
-                        new ArrayList<>();
-                for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) {
-                    if (serverInterval.getEnd() <= restoredIntervalEnd) {
-                        newIntervals.add(
-                                new com.github.shyiko.mysql.binlog.GtidSet.Interval(
-                                        serverInterval.getStart(), serverInterval.getEnd()));
-                    } else if (serverInterval.getStart() <= restoredIntervalEnd
-                            && serverInterval.getEnd() > restoredIntervalEnd) {
-                        newIntervals.add(
+                List<GtidSet.Interval> serverIntervals = serverUuidSet.getIntervals();
+                List<GtidSet.Interval> restoredIntervals = restoredUuidSet.getIntervals();
+
+                long earliestRestoredTx = getMinIntervalStart(restoredIntervals);
+
+                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> merged = new ArrayList<>();
+
+                // Process each server interval
+                for (GtidSet.Interval serverInterval : serverIntervals) {
+                    // First, check if any part comes before earliest restored
+                    if (serverInterval.getStart() < earliestRestoredTx) {
+                        long end = Math.min(serverInterval.getEnd(), earliestRestoredTx - 1);
+                        merged.add(
                                  new com.github.shyiko.mysql.binlog.GtidSet.Interval(
-                                        serverInterval.getStart(), restoredIntervalEnd));
+                                        serverInterval.getStart(), end));

Review Comment:
   > Considering that the gtid in the state may be incomplete, I think the 
merging is acceptable. But we need to add handling for the `PreviousGtidsEvent` 
so that we can use cleaner methods to deal with this in future versions.
   
   In my opinion, handling PreviousGtidsEvent cannot eliminate the small risk
of data loss. Merging the server-side GTIDs that precede the minimum restored
GTID can lose data in rare cases. This merging logic existed before this PR.
I described a data-loss case above; let me briefly explain it again here.
   
   1. Only one binlog file is available in MySQL, and the GTIDs in it are out
of order: 182580, 182579, 182578, corresponding to three updates. The
PreviousGtidsEvent is A:1-182571. When starting with
`'scan.startup.mode' = 'earliest-offset'`, a checkpoint is taken after the data
for 182580 and 182579 has been read. The saved GTID is
A:182580-182580, "row": "1", "event": "2". The GTID set known to the server
from the `show master status` command is A:1-1825771:182578-182580. That is,
182578 has not been read yet.
   
   2. When recovering from the checkpoint, the merged GTID set used to connect
to MySQL is A:1-1825771:182578-182580, so the data for 182578 will not be read
and is lost.
   
   The PreviousGtidsEvent A:1-182571 cannot be used to avoid this data loss.
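
   The two steps above can be reproduced with a minimal sketch. It does not use
Debezium's `GtidSet`; `GtidMergeSketch`, `mergeBeforeEarliestRestored`, and
`contains` are hypothetical names, intervals are plain `{start, end}` pairs, and
the server set is assumed to be a contiguous 1-182580 because only its coverage
of 182578 matters here. Appending the restored intervals unchanged is also an
assumption about the part of the merge that the hunk does not show.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GtidMergeSketch {

    /**
     * Hypothetical simplification of the merge: keep the part of every server
     * interval that lies before the earliest restored transaction, then append
     * the restored intervals unchanged. Intervals are closed {start, end} pairs.
     */
    public static List<long[]> mergeBeforeEarliestRestored(
            List<long[]> server, List<long[]> restored) {
        long earliestRestoredTx = Long.MAX_VALUE;
        for (long[] r : restored) {
            earliestRestoredTx = Math.min(earliestRestoredTx, r[0]);
        }
        List<long[]> merged = new ArrayList<>();
        for (long[] s : server) {
            if (s[0] < earliestRestoredTx) {
                merged.add(new long[] {s[0], Math.min(s[1], earliestRestoredTx - 1)});
            }
        }
        merged.addAll(restored);
        return merged;
    }

    /** Returns true if tx falls inside any of the closed intervals. */
    public static boolean contains(List<long[]> intervals, long tx) {
        for (long[] i : intervals) {
            if (i[0] <= tx && tx <= i[1]) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Assumption: the server has executed every transaction up to 182580.
        List<long[]> server = Arrays.asList(new long[] {1, 182580});
        // Checkpoint taken after reading 182580 and 182579; only 182580 is saved.
        List<long[]> restored = Arrays.asList(new long[] {182580, 182580});

        List<long[]> merged = mergeBeforeEarliestRestored(server, restored);
        // 182578 was never read, but the merged set marks it as already seen,
        // so the server will not send it again after the restore.
        System.out.println("182578 skipped on restore: " + contains(merged, 182578L));
    }
}
```

   The restored state records only the highest GTID seen, so everything below
it is filled in from the server set, including 182578, which the job never
consumed.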
   
   Besides that, I think the merging is acceptable.


