This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 85a79aae2 [FLINK-37065]: Fix the problem that MySQL cdc may lose/skip data during recovering from the checkpoint (#3845) 85a79aae2 is described below commit 85a79aae2676d571fa952377055edcfafabd973e Author: Ihor Mielientiev <ihor.mielient...@gmail.com> AuthorDate: Thu Sep 18 11:20:57 2025 +0200 [FLINK-37065]: Fix the problem that MySQL cdc may lose/skip data during recovering from the checkpoint (#3845) --- .../io/debezium/connector/mysql/GtidUtils.java | 73 ++++++++----- .../io/debezium/connector/mysql/GtidUtilsTest.java | 83 ++++++++++----- .../connectors/mysql/MysqlGtidRecoveryTest.java | 117 +++++++++++++++++++++ 3 files changed, 222 insertions(+), 51 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java index fe25208f6..f6c4987e7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java @@ -18,6 +18,8 @@ package io.debezium.connector.mysql; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -36,36 +38,64 @@ public class GtidUtils { public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) { Map<String, GtidSet.UUIDSet> newSet = new HashMap<>(); serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet)); - for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) { - GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID()); + for (GtidSet.UUIDSet restoredUuidSet : restoredGtidSet.getUUIDSets()) { + GtidSet.UUIDSet serverUuidSet = newSet.get(restoredUuidSet.getUUID()); if (serverUuidSet != null) { - long restoredIntervalEnd = getIntervalEnd(uuidSet); - List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals = - new ArrayList<>(); - for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) { - if (serverInterval.getEnd() <= restoredIntervalEnd) { - newIntervals.add( - new com.github.shyiko.mysql.binlog.GtidSet.Interval( - serverInterval.getStart(), serverInterval.getEnd())); - } else if (serverInterval.getStart() <= restoredIntervalEnd - && serverInterval.getEnd() > restoredIntervalEnd) { - newIntervals.add( + List<GtidSet.Interval> serverIntervals = serverUuidSet.getIntervals(); + List<GtidSet.Interval> restoredIntervals = restoredUuidSet.getIntervals(); + + long earliestRestoredTx = getMinIntervalStart(restoredIntervals); + + List<com.github.shyiko.mysql.binlog.GtidSet.Interval> merged = new ArrayList<>(); + + // Process each server interval + for (GtidSet.Interval serverInterval : serverIntervals) { + // First, check if any part comes before earliest restored + if (serverInterval.getStart() < earliestRestoredTx) { + long end = Math.min(serverInterval.getEnd(), earliestRestoredTx - 1); + merged.add( new com.github.shyiko.mysql.binlog.GtidSet.Interval( - serverInterval.getStart(), restoredIntervalEnd)); + serverInterval.getStart(), end)); + } + + // Then check for overlaps with restored intervals + for (GtidSet.Interval restoredInterval : restoredIntervals) { + if (serverInterval.getStart() <= restoredInterval.getEnd() + && serverInterval.getEnd() >= restoredInterval.getStart()) { + // There's an overlap - add the intersection + long intersectionStart = + Math.max( + serverInterval.getStart(), restoredInterval.getStart()); + long intersectionEnd = + Math.min(serverInterval.getEnd(), restoredInterval.getEnd()); + + if (intersectionStart <= intersectionEnd) { + merged.add( + new com.github.shyiko.mysql.binlog.GtidSet.Interval( + intersectionStart, intersectionEnd)); + } + } } } - newSet.put( - uuidSet.getUUID(), + + GtidSet.UUIDSet mergedUuidSet = new GtidSet.UUIDSet( new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet( - uuidSet.getUUID(), newIntervals))); + restoredUuidSet.getUUID(), merged)); + + newSet.put(restoredUuidSet.getUUID(), mergedUuidSet); } else { - newSet.put(uuidSet.getUUID(), uuidSet); + newSet.put(restoredUuidSet.getUUID(), restoredUuidSet); } } return new GtidSet(newSet); } + private static long getMinIntervalStart(List<GtidSet.Interval> intervals) { + return Collections.min(intervals, Comparator.comparingLong(GtidSet.Interval::getStart)) + .getStart(); + } + /** * This method merges one GTID set (toMerge) into another (base), without overwriting the * existing elements in the base GTID set. @@ -80,11 +110,4 @@ public class GtidUtils { } return new GtidSet(newSet); } - - private static long getIntervalEnd(GtidSet.UUIDSet uuidSet) { - return uuidSet.getIntervals().stream() - .mapToLong(GtidSet.Interval::getEnd) - .max() - .getAsLong(); - } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java index d0269df4c..88d4b8aba 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java @@ -18,6 +18,11 @@ package io.debezium.connector.mysql; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; import static io.debezium.connector.mysql.GtidUtils.fixRestoredGtidSet; import static io.debezium.connector.mysql.GtidUtils.mergeGtidSetInto; @@ -25,30 +30,57 @@ import static org.assertj.core.api.Assertions.assertThat; /** Unit test for {@link GtidUtils}. */ class GtidUtilsTest { - @Test - void testFixingRestoredGtidSet() { - GtidSet serverGtidSet = new GtidSet("A:1-100"); - GtidSet restoredGtidSet = new GtidSet("A:30-100"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)).hasToString("A:1-100"); - serverGtidSet = new GtidSet("A:1-100"); - restoredGtidSet = new GtidSet("A:30-50"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)).hasToString("A:1-50"); + @ParameterizedTest(name = "{0}") + @MethodSource("gtidSetsProvider") + void testFixingRestoredGtidSet( + String description, String serverStr, String restoredStr, String expectedStr) { + GtidSet serverGtidSet = new GtidSet(serverStr); + GtidSet restoredGtidSet = new GtidSet(restoredStr); - serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); - restoredGtidSet = new GtidSet("A:106-150"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)) - .hasToString("A:1-100:102-150,B:20-200"); + GtidSet result = fixRestoredGtidSet(serverGtidSet, restoredGtidSet); - serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); - restoredGtidSet = new GtidSet("A:106-150,C:1-100"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)) - .hasToString("A:1-100:102-150,B:20-200,C:1-100"); + assertThat(result).hasToString(expectedStr); + } - serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); - restoredGtidSet = new GtidSet("A:106-150:152-200,C:1-100"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet)) - .hasToString("A:1-100:102-200,B:20-200,C:1-100"); + private static Stream<Arguments> gtidSetsProvider() { + return Stream.of( + Arguments.of( + "Basic example with a straightforward subset", + "A:1-100", + "A:1-50:63-100", + "A:1-50:63-100"), + Arguments.of( + "Multiple intervals with gaps in restored", + "A:1-100", + "A:45-80:83-90:92-98", + "A:1-80:83-90:92-98"), + Arguments.of( + "Server has disjoint intervals, restored partially overlaps", + "A:1-50:60-90:95-200", + "A:45-50:65-70:96-100", + "A:1-50:65-70:96-100"), + Arguments.of( + "Restored partially covers server range", + "A:1-100:102-200", + "A:106-150:152-200", + "A:1-100:102-150:152-200"), + Arguments.of( + "Restored end exceeds server range", + "A:1-100,B:1-200:205-300", + "A:1-110,B:1-201:210-230:245-305", + "A:1-100,B:1-200:210-230:245-300"), + Arguments.of( + "Multiple UUIDs with different overlaps", + "A:1-100,B:1-50", + "A:45-80,B:30-60,C:1-20", + "A:1-80,B:1-50,C:1-20"), + Arguments.of("Restored starts after server ends", "A:1-100", "A:80-150", "A:1-100"), + Arguments.of( + "Complex overlapping intervals", + "A:1-20:30-50:60-80", + "A:15-35:45-65:75-85", + "A:1-20:30-35:45-50:60-65:75-80")); } @Test @@ -58,11 +90,10 @@ class GtidUtilsTest { assertThat(mergeGtidSetInto(base, toMerge)).hasToString("A:1-100"); base = new GtidSet("A:1-100"); - toMerge = new GtidSet("B:1-10"); - assertThat(mergeGtidSetInto(base, toMerge)).hasToString("A:1-100,B:1-10"); - - base = new GtidSet("A:1-100,C:1-100"); - toMerge = new GtidSet("A:1-10,B:1-10"); - assertThat(mergeGtidSetInto(base, toMerge)).hasToString("A:1-100,B:1-10,C:1-100"); + toMerge = new GtidSet("C:1-10"); + assertThat(mergeGtidSetInto(base, toMerge)).hasToString("A:1-100,C:1-10"); + base = new GtidSet("A:1-100,B:1-100"); + toMerge = new GtidSet("A:1-10,C:1-10"); + assertThat(mergeGtidSetInto(base, toMerge)).hasToString("A:1-100,B:1-100,C:1-10"); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/MysqlGtidRecoveryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/MysqlGtidRecoveryTest.java new file mode 100644 index 000000000..4c1927c52 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/MysqlGtidRecoveryTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql; + +import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.event.GtidEventData; +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test gtid recovery in mysql. */ +public class MysqlGtidRecoveryTest extends MySqlSourceTestBase { + + private static final String SCHEMA = "flink-test"; + private static final String USERNAME = "mysqluser"; + private static final String PASSWORD = "mysqlpw"; + + @Test + public void testGtidGapsPreservedDuringRecovery() throws Exception { + // After creation of the table GTID Set will be <gtid>:1-23 + createTableAndFillWithData(); + String serverUuid = getServerUuid(); + + String gtidSetWithGaps = serverUuid + ":1-10:14-16:19-19:21-22"; + + List<Integer> expectedReceivedGtids = Arrays.asList(11, 12, 13, 17, 18, 20, 23); + List<Integer> receivedGtidNumbers = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(expectedReceivedGtids.size()); + + BinaryLogClient client = + new BinaryLogClient( + MYSQL_CONTAINER.getHost(), + MYSQL_CONTAINER.getMappedPort(3306), + SCHEMA, + USERNAME, + PASSWORD); + + client.setGtidSet(gtidSetWithGaps); + client.setServerId(System.currentTimeMillis()); + + client.registerEventListener( + event -> { + if (event.getData() instanceof GtidEventData) { + String gtid = ((GtidEventData) event.getData()).getGtid(); + int gtidNum = Integer.parseInt(gtid.split(":")[1]); + receivedGtidNumbers.add(gtidNum); + latch.countDown(); + } + }); + + // Connect in background thread, since connect is blocking method + new Thread( + () -> { + try { + client.connect(); + } catch (Exception ignored) { + } + }) + .start(); + + assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); + client.disconnect(); + + assertEqualsInAnyOrder(expectedReceivedGtids, receivedGtidNumbers); + } + + private static void createTableAndFillWithData() throws SQLException { + try (Connection conn = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), USERNAME, PASSWORD); + Statement stmt = conn.createStatement()) { + stmt.execute("CREATE TABLE IF NOT EXISTS test_table (id INT PRIMARY KEY)"); + for (int i = 1; i <= 10; i++) { + stmt.execute("INSERT INTO test_table VALUES (" + i + ")"); + } + } + } + + private String getServerUuid() throws Exception { + try (Connection conn = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), USERNAME, PASSWORD); + Statement stmt = conn.createStatement()) { + + ResultSet rs = stmt.executeQuery("SELECT @@server_uuid"); + return rs.next() ? rs.getString(1) : ""; + } + } +}