This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new afd4a74c0 [FLINK-35802][pipeline-connectors/mysql] Clean ChangeEventQueue to avoid deadlock when calling BinaryLogClient#disconnect method
afd4a74c0 is described below

commit afd4a74c0a3dc4f6353a37e875023a7f849a634e
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Thu Jan 16 19:43:25 2025 +0800

    [FLINK-35802][pipeline-connectors/mysql] Clean ChangeEventQueue to avoid deadlock when calling BinaryLogClient#disconnect method
    
    This closes #3463
---
 .../mysql/debezium/reader/BinlogSplitReader.java   | 28 ++++++++++++++--------
 .../mysql/debezium/reader/SnapshotSplitReader.java | 26 +++++++++++++-------
 .../debezium/task/context/StatefulTaskContext.java | 23 +++++++++++++++++-
 .../mysql/source/reader/MySqlSplitReader.java      | 21 ++--------------
 4 files changed, 59 insertions(+), 39 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index dee96802c..cd3c697e4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.mysql.debezium.reader;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
 import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
 import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
@@ -61,6 +62,9 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
+import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient;
+import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
+
 /**
  * A Debezium binlog reader implementation that also support reads binlog and filter overlapping
  * snapshot data that {@link SnapshotSplitReader} read.
@@ -88,10 +92,19 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
 
     private static final long READER_CLOSE_TIMEOUT = 30L;
 
-    public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) {
+    public BinlogSplitReader(MySqlSourceConfig sourceConfig, int subtaskId) {
+        this(
+                new StatefulTaskContext(
+                        sourceConfig,
+                        createBinaryClient(sourceConfig.getDbzConfiguration()),
+                        createMySqlConnection(sourceConfig)),
+                subtaskId);
+    }
+
+    public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) {
         this.statefulTaskContext = statefulTaskContext;
         ThreadFactory threadFactory =
-                new ThreadFactoryBuilder().setNameFormat("binlog-reader-" + subTaskId).build();
+                new ThreadFactoryBuilder().setNameFormat("binlog-reader-" + subtaskId).build();
         this.executorService = Executors.newSingleThreadExecutor(threadFactory);
         this.currentTaskRunning = true;
         this.pureBinlogPhaseTables = new HashSet<>();
@@ -185,14 +198,10 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
     @Override
     public void close() {
         try {
-            if (statefulTaskContext.getConnection() != null) {
-                statefulTaskContext.getConnection().close();
-            }
-            if (statefulTaskContext.getBinaryLogClient() != null) {
-                statefulTaskContext.getBinaryLogClient().disconnect();
-            }
-
             stopBinlogReadTask();
+            if (statefulTaskContext != null) {
+                statefulTaskContext.close();
+            }
             if (executorService != null) {
                 executorService.shutdown();
                 if (!executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
@@ -201,7 +210,6 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
                             READER_CLOSE_TIMEOUT);
                 }
             }
-            statefulTaskContext.getDatabaseSchema().close();
         } catch (Exception e) {
             LOG.error("Close binlog reader error", e);
         }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
index f37428f8e..b86e6e527 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDisp
 import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
 import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask;
 import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
@@ -66,6 +67,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient;
+import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
+
 /**
  * A snapshot reader that reads data from Table in split level, the split is assigned by primary key
  * range.
@@ -93,6 +97,17 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
 
     private static final long READER_CLOSE_TIMEOUT = 30L;
 
+    public SnapshotSplitReader(
+            MySqlSourceConfig sourceConfig, int subtaskId, SnapshotPhaseHooks hooks) {
+        this(
+                new StatefulTaskContext(
+                        sourceConfig,
+                        createBinaryClient(sourceConfig.getDbzConfiguration()),
+                        createMySqlConnection(sourceConfig)),
+                subtaskId,
+                hooks);
+    }
+
     public SnapshotSplitReader(
             StatefulTaskContext statefulTaskContext, int subtaskId, SnapshotPhaseHooks hooks) {
         this.statefulTaskContext = statefulTaskContext;
@@ -148,7 +163,6 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
                         // Step 2: read binlog events between low and high watermark and backfill
                         // changes into snapshot
                         backfill(snapshotResult, sourceContext);
-
                     } catch (Exception e) {
                         setReadException(e);
                     } finally {
@@ -378,14 +392,8 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
     public void close() {
         try {
             stopCurrentTask();
-            if (statefulTaskContext.getConnection() != null) {
-                statefulTaskContext.getConnection().close();
-            }
-            if (statefulTaskContext.getBinaryLogClient() != null) {
-                statefulTaskContext.getBinaryLogClient().disconnect();
-            }
-            if (statefulTaskContext.getDatabaseSchema() != null) {
-                statefulTaskContext.getDatabaseSchema().close();
+            if (statefulTaskContext != null) {
+                statefulTaskContext.close();
             }
             if (executorService != null) {
                 executorService.shutdown();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
index fbce9cdd7..09a5fe8e8 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
@@ -71,7 +71,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUt
  * <p>The offset change and schema change should record to MySqlSplitState when emit the record,
  * thus the Flink's state mechanism can help to store/restore when failover happens.
  */
-public class StatefulTaskContext {
+public class StatefulTaskContext implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(StatefulTaskContext.class);
     private static final int DEFAULT_BINLOG_QUEUE_SIZE_IN_SNAPSHOT_SCAN = 1024;
@@ -300,6 +300,27 @@ public class StatefulTaskContext {
         return found;
     }
 
+    @Override
+    public void close() throws Exception {
+        // BinaryLogClient still tries to enqueue records, if this queue is full, it will
+        // lead to deadlock.
+        if (queue != null) {
+            queue.poll();
+        }
+        if (connection != null) {
+            connection.close();
+        }
+        if (binaryLogClient != null) {
+            binaryLogClient.disconnect();
+        }
+        if (databaseSchema != null) {
+            databaseSchema.close();
+        }
+        if (dispatcher != null) {
+            dispatcher.close();
+        }
+    }
+
     /** Copied from debezium for accessing here. */
     public static class MySqlEventMetadataProvider implements 
EventMetadataProvider {
         public static final String SERVER_ID_KEY = "server_id";
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java
index 4a46b7dc3..e1178982b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java
@@ -17,11 +17,9 @@
 
 package org.apache.flink.cdc.connectors.mysql.source.reader;
 
-import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
 import org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader;
 import org.apache.flink.cdc.connectors.mysql.debezium.reader.DebeziumReader;
 import org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader;
-import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
 import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
@@ -35,8 +33,6 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 
-import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import io.debezium.connector.mysql.MySqlConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -233,27 +229,14 @@ public class MySqlSplitReader implements SplitReader<SourceRecords, MySqlSplit>
 
     private SnapshotSplitReader getSnapshotSplitReader() {
         if (reusedSnapshotReader == null) {
-            final MySqlConnection jdbcConnection =
-                    DebeziumUtils.createMySqlConnection(sourceConfig);
-            final BinaryLogClient binaryLogClient =
-                    DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
-            final StatefulTaskContext statefulTaskContext =
-                    new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
-            reusedSnapshotReader =
-                    new SnapshotSplitReader(statefulTaskContext, subtaskId, snapshotHooks);
+            reusedSnapshotReader = new SnapshotSplitReader(sourceConfig, subtaskId, snapshotHooks);
         }
         return reusedSnapshotReader;
     }
 
     private BinlogSplitReader getBinlogSplitReader() {
         if (reusedBinlogReader == null) {
-            final MySqlConnection jdbcConnection =
-                    DebeziumUtils.createMySqlConnection(sourceConfig);
-            final BinaryLogClient binaryLogClient =
-                    DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
-            final StatefulTaskContext statefulTaskContext =
-                    new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
-            reusedBinlogReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
+            reusedBinlogReader = new BinlogSplitReader(sourceConfig, subtaskId);
         }
         return reusedBinlogReader;
     }

Reply via email to