This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 986f37b30 [FLINK-35674][cdc-connector][mysql]Fix blocking caused by 
searching for timestamp in binlog file (#3432)
986f37b30 is described below

commit 986f37b30750da0a84e2da8d0ff9554702dacc01
Author: Thorne <[email protected]>
AuthorDate: Tue Aug 6 15:49:50 2024 +0800

    [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for 
timestamp in binlog file (#3432)
---
 .../connectors/mysql/debezium/DebeziumUtils.java   |  7 ++--
 .../debezium/task/context/StatefulTaskContext.java |  4 ++-
 .../mysql/source/offset/BinlogOffsetUtils.java     |  6 ++--
 .../debezium/reader/BinlogSplitReaderTest.java     |  3 +-
 .../mysql/source/SpecificStartingOffsetITCase.java | 38 ++++++++++++++++++----
 5 files changed, 45 insertions(+), 13 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
index 33dac6771..ac3b20c4f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
@@ -240,12 +240,15 @@ public class DebeziumUtils {
         return variables;
     }
 
-    public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection 
connection) {
+    public static BinlogOffset findBinlogOffset(
+            long targetMs, MySqlConnection connection, MySqlSourceConfig 
mySqlSourceConfig) {
         MySqlConnection.MySqlConnectionConfiguration config = 
connection.connectionConfig();
         BinaryLogClient client =
                 new BinaryLogClient(
                         config.hostname(), config.port(), config.username(), 
config.password());
-
+        if (mySqlSourceConfig.getServerIdRange() != null) {
+            
client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId());
+        }
         List<String> binlogFiles = new ArrayList<>();
         JdbcConnection.ResultSetConsumer rsc =
                 rs -> {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
index 5fc342a4c..fbce9cdd7 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
@@ -193,7 +193,9 @@ public class StatefulTaskContext {
                 mySqlSplit.isSnapshotSplit()
                         ? BinlogOffset.ofEarliest()
                         : initializeEffectiveOffset(
-                                
mySqlSplit.asBinlogSplit().getStartingOffset(), connection);
+                                mySqlSplit.asBinlogSplit().getStartingOffset(),
+                                connection,
+                                sourceConfig);
 
         LOG.info("Starting offset is initialized to {}", offset);
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java
index ce82d6a1b..3c192eaa7 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.mysql.source.offset;
 
 import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
 import 
org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 
 import io.debezium.connector.mysql.MySqlConnection;
 
@@ -45,13 +46,14 @@ public class BinlogOffsetUtils {
      * </ul>
      */
     public static BinlogOffset initializeEffectiveOffset(
-            BinlogOffset offset, MySqlConnection connection) {
+            BinlogOffset offset, MySqlConnection connection, MySqlSourceConfig 
mySqlSourceConfig) {
         BinlogOffsetKind offsetKind = offset.getOffsetKind();
         switch (offsetKind) {
             case EARLIEST:
                 return BinlogOffset.ofBinlogFilePosition("", 0);
             case TIMESTAMP:
-                return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() 
* 1000, connection);
+                return DebeziumUtils.findBinlogOffset(
+                        offset.getTimestampSec() * 1000, connection, 
mySqlSourceConfig);
             case LATEST:
                 return DebeziumUtils.currentBinlogOffset(connection);
             default:
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index 05e603c00..f9c070d96 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -1262,7 +1262,8 @@ public class BinlogSplitReaderTest extends 
MySqlSourceTestBase {
                             ? BinlogOffset.ofEarliest()
                             : initializeEffectiveOffset(
                                     
mySqlSplit.asBinlogSplit().getStartingOffset(),
-                                    getConnection());
+                                    getConnection(),
+                                    getSourceConfig());
 
             LOG.info("Starting offset is initialized to {}", offset);
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java
index 9761a6f38..44d72a64b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.mysql.source;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
@@ -263,9 +264,12 @@ public class SpecificStartingOffsetITCase {
         // Purge binary log at first
         purgeBinaryLogs();
 
+        long t0 = System.currentTimeMillis();
+        String servedId0 = "5400";
         Assert.equals(
                 BinlogOffset.ofBinlogFilePosition("mysql-bin.000004", 0),
-                DebeziumUtils.findBinlogOffset(System.currentTimeMillis(), 
connection));
+                DebeziumUtils.findBinlogOffset(
+                        t0, connection, getMySqlSourceConfig(t0, servedId0)));
 
         executeStatements(
                 String.format(
@@ -273,6 +277,7 @@ public class SpecificStartingOffsetITCase {
                         customers.getTableId()));
         Thread.sleep(1000);
         long t1 = System.currentTimeMillis();
+        String servedId1 = "5401";
         flushLogs();
 
         executeStatements(
@@ -281,6 +286,7 @@ public class SpecificStartingOffsetITCase {
                         customers.getTableId()));
         Thread.sleep(1000);
         long t2 = System.currentTimeMillis();
+        String servedId2 = "5402";
         flushLogs();
 
         executeStatements(
@@ -289,6 +295,7 @@ public class SpecificStartingOffsetITCase {
                         customers.getTableId()));
         Thread.sleep(1000);
         long t3 = System.currentTimeMillis();
+        String servedId3 = "5403";
         flushLogs();
 
         executeStatements(
@@ -297,6 +304,7 @@ public class SpecificStartingOffsetITCase {
                         customers.getTableId()));
         Thread.sleep(1000);
         long t4 = System.currentTimeMillis();
+        String servedId4 = "5404";
         flushLogs();
 
         executeStatements(
@@ -305,28 +313,35 @@ public class SpecificStartingOffsetITCase {
                         customers.getTableId()));
         Thread.sleep(1000);
         long t5 = System.currentTimeMillis();
+        String servedId5 = "5405";
         flushLogs();
 
         Assert.equals(
                 BinlogOffset.ofBinlogFilePosition("mysql-bin.000005", 0),
-                DebeziumUtils.findBinlogOffset(t1, connection));
+                DebeziumUtils.findBinlogOffset(
+                        t1, connection, getMySqlSourceConfig(t1, servedId1)));
         Assert.equals(
                 BinlogOffset.ofBinlogFilePosition("mysql-bin.000006", 0),
-                DebeziumUtils.findBinlogOffset(t2, connection));
+                DebeziumUtils.findBinlogOffset(
+                        t2, connection, getMySqlSourceConfig(t1, servedId2)));
         Assert.equals(
                 BinlogOffset.ofBinlogFilePosition("mysql-bin.000007", 0),
-                DebeziumUtils.findBinlogOffset(t3, connection));
+                DebeziumUtils.findBinlogOffset(
+                        t3, connection, getMySqlSourceConfig(t1, servedId3)));
         Assert.equals(
                 BinlogOffset.ofBinlogFilePosition("mysql-bin.000008", 0),
-                DebeziumUtils.findBinlogOffset(t4, connection));
+                DebeziumUtils.findBinlogOffset(
+                        t4, connection, getMySqlSourceConfig(t1, servedId4)));
         Assert.equals(
                 BinlogOffset.ofBinlogFilePosition("mysql-bin.000009", 0),
-                DebeziumUtils.findBinlogOffset(t5, connection));
+                DebeziumUtils.findBinlogOffset(
+                        t5, connection, getMySqlSourceConfig(t1, servedId5)));
 
         purgeBinaryLogs();
         Assert.equals(
                 BinlogOffset.ofBinlogFilePosition("mysql-bin.000009", 0),
-                DebeziumUtils.findBinlogOffset(t3, connection));
+                DebeziumUtils.findBinlogOffset(
+                        t5, connection, getMySqlSourceConfig(t1, servedId5)));
     }
 
     @Test
@@ -440,6 +455,15 @@ public class SpecificStartingOffsetITCase {
         return DebeziumUtils.createMySqlConnection(configuration, new 
Properties());
     }
 
+    private MySqlSourceConfig getMySqlSourceConfig(Long timestamp, String 
serverId) {
+        return getSourceBuilder()
+                .startupOptions(StartupOptions.timestamp(timestamp))
+                .serverId(serverId)
+                .build()
+                .getConfigFactory()
+                .createConfig(0);
+    }
+
     private void executeStatements(String... statements) throws Exception {
         connection.execute(statements);
         connection.commit();

Reply via email to