This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 85a79aae2 [FLINK-37065]: Fix the problem that MySQL cdc may lose/skip 
data during recovering from the checkpoint (#3845)
85a79aae2 is described below

commit 85a79aae2676d571fa952377055edcfafabd973e
Author: Ihor Mielientiev <ihor.mielient...@gmail.com>
AuthorDate: Thu Sep 18 11:20:57 2025 +0200

    [FLINK-37065]: Fix the problem that MySQL cdc may lose/skip data during 
recovering from the checkpoint (#3845)
---
 .../io/debezium/connector/mysql/GtidUtils.java     |  73 ++++++++-----
 .../io/debezium/connector/mysql/GtidUtilsTest.java |  83 ++++++++++-----
 .../connectors/mysql/MysqlGtidRecoveryTest.java    | 117 +++++++++++++++++++++
 3 files changed, 222 insertions(+), 51 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
index fe25208f6..f6c4987e7 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
@@ -18,6 +18,8 @@
 package io.debezium.connector.mysql;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,36 +38,64 @@ public class GtidUtils {
     public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet 
restoredGtidSet) {
         Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
         serverGtidSet.getUUIDSets().forEach(uuidSet -> 
newSet.put(uuidSet.getUUID(), uuidSet));
-        for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
-            GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
+        for (GtidSet.UUIDSet restoredUuidSet : restoredGtidSet.getUUIDSets()) {
+            GtidSet.UUIDSet serverUuidSet = 
newSet.get(restoredUuidSet.getUUID());
             if (serverUuidSet != null) {
-                long restoredIntervalEnd = getIntervalEnd(uuidSet);
-                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> 
newIntervals =
-                        new ArrayList<>();
-                for (GtidSet.Interval serverInterval : 
serverUuidSet.getIntervals()) {
-                    if (serverInterval.getEnd() <= restoredIntervalEnd) {
-                        newIntervals.add(
-                                new 
com.github.shyiko.mysql.binlog.GtidSet.Interval(
-                                        serverInterval.getStart(), 
serverInterval.getEnd()));
-                    } else if (serverInterval.getStart() <= restoredIntervalEnd
-                            && serverInterval.getEnd() > restoredIntervalEnd) {
-                        newIntervals.add(
+                List<GtidSet.Interval> serverIntervals = 
serverUuidSet.getIntervals();
+                List<GtidSet.Interval> restoredIntervals = 
restoredUuidSet.getIntervals();
+
+                long earliestRestoredTx = 
getMinIntervalStart(restoredIntervals);
+
+                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> merged = 
new ArrayList<>();
+
+                // Process each server interval
+                for (GtidSet.Interval serverInterval : serverIntervals) {
+                    // First, keep the part of the server interval that precedes the earliest restored transaction
+                    if (serverInterval.getStart() < earliestRestoredTx) {
+                        long end = Math.min(serverInterval.getEnd(), 
earliestRestoredTx - 1);
+                        merged.add(
                                 new 
com.github.shyiko.mysql.binlog.GtidSet.Interval(
-                                        serverInterval.getStart(), 
restoredIntervalEnd));
+                                        serverInterval.getStart(), end));
+                    }
+
+                    // Then check for overlaps with restored intervals
+                    for (GtidSet.Interval restoredInterval : 
restoredIntervals) {
+                        if (serverInterval.getStart() <= 
restoredInterval.getEnd()
+                                && serverInterval.getEnd() >= 
restoredInterval.getStart()) {
+                            // There's an overlap - add the intersection
+                            long intersectionStart =
+                                    Math.max(
+                                            serverInterval.getStart(), 
restoredInterval.getStart());
+                            long intersectionEnd =
+                                    Math.min(serverInterval.getEnd(), 
restoredInterval.getEnd());
+
+                            if (intersectionStart <= intersectionEnd) {
+                                merged.add(
+                                        new 
com.github.shyiko.mysql.binlog.GtidSet.Interval(
+                                                intersectionStart, 
intersectionEnd));
+                            }
+                        }
                     }
                 }
-                newSet.put(
-                        uuidSet.getUUID(),
+
+                GtidSet.UUIDSet mergedUuidSet =
                         new GtidSet.UUIDSet(
                                 new 
com.github.shyiko.mysql.binlog.GtidSet.UUIDSet(
-                                        uuidSet.getUUID(), newIntervals)));
+                                        restoredUuidSet.getUUID(), merged));
+
+                newSet.put(restoredUuidSet.getUUID(), mergedUuidSet);
             } else {
-                newSet.put(uuidSet.getUUID(), uuidSet);
+                newSet.put(restoredUuidSet.getUUID(), restoredUuidSet);
             }
         }
         return new GtidSet(newSet);
     }
 
+    private static long getMinIntervalStart(List<GtidSet.Interval> intervals) {
+        return Collections.min(intervals, 
Comparator.comparingLong(GtidSet.Interval::getStart))
+                .getStart();
+    }
+
     /**
      * This method merges one GTID set (toMerge) into another (base), without 
overwriting the
      * existing elements in the base GTID set.
@@ -80,11 +110,4 @@ public class GtidUtils {
         }
         return new GtidSet(newSet);
     }
-
-    private static long getIntervalEnd(GtidSet.UUIDSet uuidSet) {
-        return uuidSet.getIntervals().stream()
-                .mapToLong(GtidSet.Interval::getEnd)
-                .max()
-                .getAsLong();
-    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
index d0269df4c..88d4b8aba 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
@@ -18,6 +18,11 @@
 package io.debezium.connector.mysql;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
 
 import static io.debezium.connector.mysql.GtidUtils.fixRestoredGtidSet;
 import static io.debezium.connector.mysql.GtidUtils.mergeGtidSetInto;
@@ -25,30 +30,57 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit test for {@link GtidUtils}. */
 class GtidUtilsTest {
-    @Test
-    void testFixingRestoredGtidSet() {
-        GtidSet serverGtidSet = new GtidSet("A:1-100");
-        GtidSet restoredGtidSet = new GtidSet("A:30-100");
-        assertThat(fixRestoredGtidSet(serverGtidSet, 
restoredGtidSet)).hasToString("A:1-100");
 
-        serverGtidSet = new GtidSet("A:1-100");
-        restoredGtidSet = new GtidSet("A:30-50");
-        assertThat(fixRestoredGtidSet(serverGtidSet, 
restoredGtidSet)).hasToString("A:1-50");
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("gtidSetsProvider")
+    void testFixingRestoredGtidSet(
+            String description, String serverStr, String restoredStr, String 
expectedStr) {
+        GtidSet serverGtidSet = new GtidSet(serverStr);
+        GtidSet restoredGtidSet = new GtidSet(restoredStr);
 
-        serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200");
-        restoredGtidSet = new GtidSet("A:106-150");
-        assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet))
-                .hasToString("A:1-100:102-150,B:20-200");
+        GtidSet result = fixRestoredGtidSet(serverGtidSet, restoredGtidSet);
 
-        serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200");
-        restoredGtidSet = new GtidSet("A:106-150,C:1-100");
-        assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet))
-                .hasToString("A:1-100:102-150,B:20-200,C:1-100");
+        assertThat(result).hasToString(expectedStr);
+    }
 
-        serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200");
-        restoredGtidSet = new GtidSet("A:106-150:152-200,C:1-100");
-        assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet))
-                .hasToString("A:1-100:102-200,B:20-200,C:1-100");
+    private static Stream<Arguments> gtidSetsProvider() {
+        return Stream.of(
+                Arguments.of(
+                        "Basic example with a straightforward subset",
+                        "A:1-100",
+                        "A:1-50:63-100",
+                        "A:1-50:63-100"),
+                Arguments.of(
+                        "Multiple intervals with gaps in restored",
+                        "A:1-100",
+                        "A:45-80:83-90:92-98",
+                        "A:1-80:83-90:92-98"),
+                Arguments.of(
+                        "Server has disjoint intervals, restored partially 
overlaps",
+                        "A:1-50:60-90:95-200",
+                        "A:45-50:65-70:96-100",
+                        "A:1-50:65-70:96-100"),
+                Arguments.of(
+                        "Restored partially covers server range",
+                        "A:1-100:102-200",
+                        "A:106-150:152-200",
+                        "A:1-100:102-150:152-200"),
+                Arguments.of(
+                        "Restored end exceeds server range",
+                        "A:1-100,B:1-200:205-300",
+                        "A:1-110,B:1-201:210-230:245-305",
+                        "A:1-100,B:1-200:210-230:245-300"),
+                Arguments.of(
+                        "Multiple UUIDs with different overlaps",
+                        "A:1-100,B:1-50",
+                        "A:45-80,B:30-60,C:1-20",
+                        "A:1-80,B:1-50,C:1-20"),
+                Arguments.of("Restored starts after server ends", "A:1-100", 
"A:80-150", "A:1-100"),
+                Arguments.of(
+                        "Complex overlapping intervals",
+                        "A:1-20:30-50:60-80",
+                        "A:15-35:45-65:75-85",
+                        "A:1-20:30-35:45-50:60-65:75-80"));
     }
 
     @Test
@@ -58,11 +90,10 @@ class GtidUtilsTest {
         assertThat(mergeGtidSetInto(base, toMerge)).hasToString("A:1-100");
 
         base = new GtidSet("A:1-100");
-        toMerge = new GtidSet("B:1-10");
-        assertThat(mergeGtidSetInto(base, 
toMerge)).hasToString("A:1-100,B:1-10");
-
-        base = new GtidSet("A:1-100,C:1-100");
-        toMerge = new GtidSet("A:1-10,B:1-10");
-        assertThat(mergeGtidSetInto(base, 
toMerge)).hasToString("A:1-100,B:1-10,C:1-100");
+        toMerge = new GtidSet("C:1-10");
+        assertThat(mergeGtidSetInto(base, 
toMerge)).hasToString("A:1-100,C:1-10");
+        base = new GtidSet("A:1-100,B:1-100");
+        toMerge = new GtidSet("A:1-10,C:1-10");
+        assertThat(mergeGtidSetInto(base, 
toMerge)).hasToString("A:1-100,B:1-100,C:1-10");
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/MysqlGtidRecoveryTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/MysqlGtidRecoveryTest.java
new file mode 100644
index 000000000..4c1927c52
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/MysqlGtidRecoveryTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql;
+
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.GtidEventData;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests GTID recovery in MySQL. */
+public class MysqlGtidRecoveryTest extends MySqlSourceTestBase {
+
+    private static final String SCHEMA = "flink-test";
+    private static final String USERNAME = "mysqluser";
+    private static final String PASSWORD = "mysqlpw";
+
+    @Test
+    public void testGtidGapsPreservedDuringRecovery() throws Exception {
+        // After the table is created and filled, the server GTID set will be <server_uuid>:1-23
+        createTableAndFillWithData();
+        String serverUuid = getServerUuid();
+
+        String gtidSetWithGaps = serverUuid + ":1-10:14-16:19-19:21-22";
+
+        List<Integer> expectedReceivedGtids = Arrays.asList(11, 12, 13, 17, 
18, 20, 23);
+        List<Integer> receivedGtidNumbers = new ArrayList<>();
+        CountDownLatch latch = new 
CountDownLatch(expectedReceivedGtids.size());
+
+        BinaryLogClient client =
+                new BinaryLogClient(
+                        MYSQL_CONTAINER.getHost(),
+                        MYSQL_CONTAINER.getMappedPort(3306),
+                        SCHEMA,
+                        USERNAME,
+                        PASSWORD);
+
+        client.setGtidSet(gtidSetWithGaps);
+        client.setServerId(System.currentTimeMillis());
+
+        client.registerEventListener(
+                event -> {
+                    if (event.getData() instanceof GtidEventData) {
+                        String gtid = ((GtidEventData) 
event.getData()).getGtid();
+                        int gtidNum = Integer.parseInt(gtid.split(":")[1]);
+                        receivedGtidNumbers.add(gtidNum);
+                        latch.countDown();
+                    }
+                });
+
+        // Connect in a background thread, since connect() is a blocking method
+        new Thread(
+                        () -> {
+                            try {
+                                client.connect();
+                            } catch (Exception ignored) {
+                            }
+                        })
+                .start();
+
+        assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
+        client.disconnect();
+
+        assertEqualsInAnyOrder(expectedReceivedGtids, receivedGtidNumbers);
+    }
+
+    private static void createTableAndFillWithData() throws SQLException {
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), USERNAME, 
PASSWORD);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE TABLE IF NOT EXISTS test_table (id INT 
PRIMARY KEY)");
+            for (int i = 1; i <= 10; i++) {
+                stmt.execute("INSERT INTO test_table VALUES (" + i + ")");
+            }
+        }
+    }
+
+    private String getServerUuid() throws Exception {
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), USERNAME, 
PASSWORD);
+                Statement stmt = conn.createStatement()) {
+
+            ResultSet rs = stmt.executeQuery("SELECT @@server_uuid");
+            return rs.next() ? rs.getString(1) : "";
+        }
+    }
+}

Reply via email to