This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new afd4a74c0 [FLINK-35802][pipeline-connectors/mysql] Clean ChangeEventQueue to avoid deadlock when calling BinaryLogClient#disconnect method afd4a74c0 is described below commit afd4a74c0a3dc4f6353a37e875023a7f849a634e Author: Kunni <lvyanquan....@alibaba-inc.com> AuthorDate: Thu Jan 16 19:43:25 2025 +0800 [FLINK-35802][pipeline-connectors/mysql] Clean ChangeEventQueue to avoid deadlock when calling BinaryLogClient#disconnect method This closes #3463 --- .../mysql/debezium/reader/BinlogSplitReader.java | 28 ++++++++++++++-------- .../mysql/debezium/reader/SnapshotSplitReader.java | 26 +++++++++++++------- .../debezium/task/context/StatefulTaskContext.java | 23 +++++++++++++++++- .../mysql/source/reader/MySqlSplitReader.java | 21 ++-------------- 4 files changed, 59 insertions(+), 39 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index dee96802c..cd3c697e4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.mysql.debezium.reader; import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask; import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit; @@ -61,6 +62,9 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient; +import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection; + /** * A Debezium binlog reader implementation that also support reads binlog and filter overlapping * snapshot data that {@link SnapshotSplitReader} read. @@ -88,10 +92,19 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl private static final long READER_CLOSE_TIMEOUT = 30L; - public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) { + public BinlogSplitReader(MySqlSourceConfig sourceConfig, int subtaskId) { + this( + new StatefulTaskContext( + sourceConfig, + createBinaryClient(sourceConfig.getDbzConfiguration()), + createMySqlConnection(sourceConfig)), + subtaskId); + } + + public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) { this.statefulTaskContext = statefulTaskContext; ThreadFactory threadFactory = - new ThreadFactoryBuilder().setNameFormat("binlog-reader-" + subTaskId).build(); + new ThreadFactoryBuilder().setNameFormat("binlog-reader-" + subtaskId).build(); this.executorService = Executors.newSingleThreadExecutor(threadFactory); this.currentTaskRunning = true; this.pureBinlogPhaseTables = new HashSet<>(); @@ -185,14 +198,10 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl @Override public void close() { try { - if (statefulTaskContext.getConnection() != null) { - statefulTaskContext.getConnection().close(); - } - if (statefulTaskContext.getBinaryLogClient() != null) { - statefulTaskContext.getBinaryLogClient().disconnect(); - } - stopBinlogReadTask(); + if (statefulTaskContext != null) { + statefulTaskContext.close(); + } if (executorService != null) { executorService.shutdown(); if (!executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { @@ -201,7 +210,6 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl READER_CLOSE_TIMEOUT); } } - statefulTaskContext.getDatabaseSchema().close(); } catch (Exception e) { LOG.error("Close binlog reader error", e); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index f37428f8e..b86e6e527 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDisp import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask; import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask; import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; @@ -66,6 +67,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient; +import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection; + /** * A snapshot reader that reads data from Table in split level, the split is assigned by primary key * range. @@ -93,6 +97,17 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS private static final long READER_CLOSE_TIMEOUT = 30L; + public SnapshotSplitReader( + MySqlSourceConfig sourceConfig, int subtaskId, SnapshotPhaseHooks hooks) { + this( + new StatefulTaskContext( + sourceConfig, + createBinaryClient(sourceConfig.getDbzConfiguration()), + createMySqlConnection(sourceConfig)), + subtaskId, + hooks); + } + public SnapshotSplitReader( StatefulTaskContext statefulTaskContext, int subtaskId, SnapshotPhaseHooks hooks) { this.statefulTaskContext = statefulTaskContext; @@ -148,7 +163,6 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS // Step 2: read binlog events between low and high watermark and backfill // changes into snapshot backfill(snapshotResult, sourceContext); - } catch (Exception e) { setReadException(e); } finally { @@ -378,14 +392,8 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS public void close() { try { stopCurrentTask(); - if (statefulTaskContext.getConnection() != null) { - statefulTaskContext.getConnection().close(); - } - if (statefulTaskContext.getBinaryLogClient() != null) { - statefulTaskContext.getBinaryLogClient().disconnect(); - } - if (statefulTaskContext.getDatabaseSchema() != null) { - statefulTaskContext.getDatabaseSchema().close(); + if (statefulTaskContext != null) { + statefulTaskContext.close(); } if (executorService != null) { executorService.shutdown(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java index fbce9cdd7..09a5fe8e8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java @@ -71,7 +71,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUt * <p>The offset change and schema change should record to MySqlSplitState when emit the record, * thus the Flink's state mechanism can help to store/restore when failover happens. */ -public class StatefulTaskContext { +public class StatefulTaskContext implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(StatefulTaskContext.class); private static final int DEFAULT_BINLOG_QUEUE_SIZE_IN_SNAPSHOT_SCAN = 1024; @@ -300,6 +300,27 @@ public class StatefulTaskContext { return found; } + @Override + public void close() throws Exception { + // BinaryLogClient still tries to enqueue records, if this queue is full, it will + // lead to deadlock. + if (queue != null) { + queue.poll(); + } + if (connection != null) { + connection.close(); + } + if (binaryLogClient != null) { + binaryLogClient.disconnect(); + } + if (databaseSchema != null) { + databaseSchema.close(); + } + if (dispatcher != null) { + dispatcher.close(); + } + } + /** Copied from debezium for accessing here. */ public static class MySqlEventMetadataProvider implements EventMetadataProvider { public static final String SERVER_ID_KEY = "server_id"; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java index 4a46b7dc3..e1178982b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java @@ -17,11 +17,9 @@ package org.apache.flink.cdc.connectors.mysql.source.reader; -import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; import org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader; import org.apache.flink.cdc.connectors.mysql.debezium.reader.DebeziumReader; import org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader; -import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit; @@ -35,8 +33,6 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; -import com.github.shyiko.mysql.binlog.BinaryLogClient; -import io.debezium.connector.mysql.MySqlConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,27 +229,14 @@ public class MySqlSplitReader implements SplitReader<SourceRecords, MySqlSplit> private SnapshotSplitReader getSnapshotSplitReader() { if (reusedSnapshotReader == null) { - final MySqlConnection jdbcConnection = - DebeziumUtils.createMySqlConnection(sourceConfig); - final BinaryLogClient binaryLogClient = - DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); - final StatefulTaskContext statefulTaskContext = - new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection); - reusedSnapshotReader = - new SnapshotSplitReader(statefulTaskContext, subtaskId, snapshotHooks); + reusedSnapshotReader = new SnapshotSplitReader(sourceConfig, subtaskId, snapshotHooks); } return reusedSnapshotReader; } private BinlogSplitReader getBinlogSplitReader() { if (reusedBinlogReader == null) { - final MySqlConnection jdbcConnection = - DebeziumUtils.createMySqlConnection(sourceConfig); - final BinaryLogClient binaryLogClient = - DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); - final StatefulTaskContext statefulTaskContext = - new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection); - reusedBinlogReader = new BinlogSplitReader(statefulTaskContext, subtaskId); + reusedBinlogReader = new BinlogSplitReader(sourceConfig, subtaskId); } return reusedBinlogReader; }