Copilot commented on code in PR #3666:
URL: https://github.com/apache/flink-cdc/pull/3666#discussion_r2964278202
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java:
##########
@@ -769,4 +784,27 @@ public <T extends DatabaseSchema<TableId>> Object
getColumnValue(
public String quotedTableIdString(TableId tableId) {
return tableId.toQuotedString('`');
}
+
+ public String getShowBinaryLogStatement() {
+ return showBinaryLogStatement;
+ }
+
+ private String probeShowBinaryLogStatement() {
+ LOGGER.info("Probing binary log statement.");
+ try {
+ // Attempt to query
+ query(MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT, rs -> {});
+ LOGGER.info(
+ "Successfully found show binary log statement with `{}`.",
+ MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT);
+ return MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT;
+ } catch (SQLException e) {
+ LOGGER.info(
+ "Probing with {} failed, fallback to classic {}. Caused
by: {}",
+ MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT,
+ MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT,
+ e.getMessage());
+ return MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT;
Review Comment:
`probeShowBinaryLogStatement()` swallows every `SQLException` (including
auth/connection failures) and falls back to `SHOW MASTER STATUS`. This can mask
real connection problems, and it can permanently select the legacy statement on
MySQL 8.4 if `SHOW BINARY LOG STATUS` fails for a transient reason or a missing
privilege. Consider deferring the probe until the first actual binlog-offset
read, falling back only when the failure clearly indicates the statement is
unsupported (e.g., a syntax error or unknown command), and rethrowing the
exception otherwise.
```suggestion
if (showBinaryLogStatement == null) {
showBinaryLogStatement = probeShowBinaryLogStatement();
}
return showBinaryLogStatement;
}
private String probeShowBinaryLogStatement() {
LOGGER.info("Probing binary log statement.");
try {
// Attempt to query the new-style statement; if it works, prefer
it.
query(MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT, rs -> {});
LOGGER.info(
"Successfully found show binary log statement with
`{}`.",
MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT);
return MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT;
} catch (SQLException e) {
final String sqlState = e.getSQLState();
final String message = e.getMessage() != null ? e.getMessage() :
"";
// Only fall back to the classic statement if the failure
clearly indicates
// that the new statement is not supported
(syntax/unknown-command).
final boolean syntaxOrUnknownCommand =
(sqlState != null && sqlState.startsWith("42"))
|| message.toLowerCase().contains("you have an
error in your sql syntax")
|| message.toLowerCase().contains("unknown
command")
|| message.toLowerCase().contains("unknown
statement");
if (syntaxOrUnknownCommand) {
LOGGER.info(
"Probing with {} failed due to unsupported syntax,
falling back to classic {}. Cause: {}",
MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT,
MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT,
message);
return MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT;
}
// For all other SQLExceptions (auth, connection, privileges,
etc.), do not mask
// the problem as a fallback; surface it to the caller.
LOGGER.warn(
"Probing show binary log statement with {} failed due to
an unexpected error; "
+ "not falling back to {}.",
MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT,
MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT,
e);
throw new DebeziumException(
"Error while probing show binary log statement using '"
+ MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT + "'", e);
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineCompatibilityITCase.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
+import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
+import static
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests to check MySQL pipeline connector works well with
different MySQL server
+ * versions.
+ */
+@ParameterizedClass
+@EnumSource(
+ value = MySqlVersion.class,
+ names = {"V5_7", "V8_0", "V8_4"})
+class MySqlPipelineCompatibilityITCase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MySqlPipelineCompatibilityITCase.class);
+
+ private static Path tempFolder;
+ private static File resourceFolder;
+
+ private final MySqlVersion version;
+ private final MySqlContainer mySqlContainer;
+ private final UniqueDatabase testDatabase;
+
+ private final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+
+ MySqlPipelineCompatibilityITCase(MySqlVersion version) {
+ this.version = version;
+ this.mySqlContainer =
+ (MySqlContainer)
+ new MySqlContainer(version)
+
.withConfigurationOverride(buildCustomMySqlConfig(version))
+ .withSetupSQL("docker/setup.sql")
+ .withDatabaseName("flink-test")
+ .withUsername("flinkuser")
+ .withPassword("flinkpw")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ this.testDatabase =
+ new UniqueDatabase(mySqlContainer, "inventory", TEST_USER,
TEST_PASSWORD);
+ }
+
+ @BeforeEach
+ void setup() throws Exception {
+ // Initialize static resources if needed
+ if (resourceFolder == null) {
+ resourceFolder =
+ Paths.get(
+ Objects.requireNonNull(
+
MySqlPipelineCompatibilityITCase.class
+ .getClassLoader()
+ .getResource("."))
+ .toURI())
+ .toFile();
+ tempFolder = Files.createTempDirectory(resourceFolder.toPath(),
"mysql-config");
+ }
+
+ env.setParallelism(4);
+ env.enableCheckpointing(200);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ LOG.info("Starting container for MySQL {}...", version.getVersion());
+ Startables.deepStart(Stream.of(mySqlContainer)).join();
+ LOG.info("Container is started.");
+
+ testDatabase.createAndInitialize();
+ }
+
+ @AfterEach
+ void tearDown() {
+ testDatabase.dropDatabase();
+ if (mySqlContainer != null) {
+ LOG.info("Stopping container for MySQL {}...",
version.getVersion());
+ mySqlContainer.stop();
+ LOG.info("Container is stopped.");
Review Comment:
`tearDown()` stops the container only after `testDatabase.dropDatabase()`,
but `dropDatabase()` throws on failure. If dropping the database fails, the
container is never stopped and the test leaks resources. Wrap the cleanup in
try/finally so that `mySqlContainer.stop()` always runs, and log (rather than
rethrow) drop failures during cleanup.
```suggestion
try {
testDatabase.dropDatabase();
} catch (Exception e) {
LOG.warn("Failed to drop test database during teardown.", e);
} finally {
if (mySqlContainer != null) {
LOG.info("Stopping container for MySQL {}...",
version.getVersion());
mySqlContainer.stop();
LOG.info("Container is stopped.");
}
```
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java:
##########
@@ -56,85 +59,78 @@
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInOrder;
/** Integration tests to check mysql-cdc works well with different MySQL
server version. */
+@ParameterizedClass
+@EnumSource(
+ value = MySqlVersion.class,
+ names = {"V5_7", "V8_0", "V8_4"})
class MySqlCompatibilityITCase {
private static final Logger LOG =
LoggerFactory.getLogger(MySqlCompatibilityITCase.class);
- private Path tempFolder;
+ private static Path tempFolder;
private static File resourceFolder;
+ private final MySqlVersion version;
+ private final MySqlContainer mySqlContainer;
+ private final UniqueDatabase testDatabase;
+
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().inStreamingMode().build());
- @BeforeEach
- public void setup() throws Exception {
- resourceFolder =
- Paths.get(
- Objects.requireNonNull(
- MySqlValidatorTest.class
- .getClassLoader()
- .getResource("."))
- .toURI())
- .toFile();
- env.setParallelism(4);
- env.enableCheckpointing(200);
- tempFolder = Files.createTempDirectory(resourceFolder.toPath(),
"mysql-config");
- }
-
- @Test
- void testMySqlV56() throws Exception {
- testDifferentMySqlVersion(MySqlVersion.V5_6, false);
- }
-
- @Test
- void testMySqlV56WithGtidModeOn() throws Exception {
- testDifferentMySqlVersion(MySqlVersion.V5_6, true);
- }
-
- @Test
- void testMySqlV57() throws Exception {
- testDifferentMySqlVersion(MySqlVersion.V5_7, false);
- }
-
- @Test
- void testMySqlV57WithGtidModeOn() throws Exception {
- testDifferentMySqlVersion(MySqlVersion.V5_7, true);
- }
-
- @Test
- void testMySqlV8() throws Exception {
- testDifferentMySqlVersion(MySqlVersion.V8_0, false);
- }
-
- @Test
- void testMySqlV8WithGtidModeOn() throws Exception {
- testDifferentMySqlVersion(MySqlVersion.V8_0, true);
- }
-
- private void testDifferentMySqlVersion(MySqlVersion version, boolean
enableGtid)
- throws Exception {
- final MySqlContainer mySqlContainer =
+ MySqlCompatibilityITCase(MySqlVersion version) {
+ this.version = version;
+ this.mySqlContainer =
(MySqlContainer)
new MySqlContainer(version)
- .withConfigurationOverride(
- buildCustomMySqlConfig(version,
enableGtid))
+
.withConfigurationOverride(buildCustomMySqlConfig(version))
.withSetupSQL("docker/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withLogConsumer(new Slf4jLogConsumer(LOG));
+ this.testDatabase = new UniqueDatabase(mySqlContainer, "inventory",
"mysqluser", "mysqlpw");
+ }
- LOG.info("Starting containers...");
+ @BeforeEach
+ void setup() throws Exception {
+ // Initialize static resources if needed
+ if (resourceFolder == null) {
+ resourceFolder =
+ Paths.get(
+ Objects.requireNonNull(
+ MySqlValidatorTest.class
+ .getClassLoader()
+ .getResource("."))
+ .toURI())
+ .toFile();
+ tempFolder = Files.createTempDirectory(resourceFolder.toPath(),
"mysql-config");
+ }
+
+ env.setParallelism(4);
+ env.enableCheckpointing(200);
+
+ LOG.info("Starting container for MySQL {}...", version.getVersion());
Startables.deepStart(Stream.of(mySqlContainer)).join();
- LOG.info("Containers are started.");
+ LOG.info("Container is started.");
- UniqueDatabase testDatabase =
- new UniqueDatabase(mySqlContainer, "inventory", "mysqluser",
"mysqlpw");
testDatabase.createAndInitialize();
+ }
+
+ @AfterEach
+ void tearDown() {
+ testDatabase.dropDatabase();
+ if (mySqlContainer != null) {
+ LOG.info("Stopping container for MySQL {}...",
version.getVersion());
+ mySqlContainer.stop();
+ LOG.info("Container is stopped.");
Review Comment:
`tearDown()` calls `testDatabase.dropDatabase()` before stopping the
container, but `dropDatabase()` throws `IllegalStateException` on failure. If
dropping the database fails, the container stop never runs and the test may
leak resources. Wrap the cleanup in try/finally (stopping the container in the
`finally` block) and log drop failures instead of rethrowing them so teardown
can proceed.
```suggestion
try {
testDatabase.dropDatabase();
} catch (IllegalStateException e) {
LOG.warn(
"Failed to drop test database during tearDown;
continuing with container shutdown.",
e);
} finally {
if (mySqlContainer != null) {
LOG.info("Stopping container for MySQL {}...",
version.getVersion());
mySqlContainer.stop();
LOG.info("Container is stopped.");
}
```
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java:
##########
@@ -0,0 +1,717 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at
http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.mysql;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.connector.mysql.legacy.MySqlJdbcContext.DatabaseLocales;
+import io.debezium.data.Envelope;
+import io.debezium.function.BlockingConsumer;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.relational.RelationalSnapshotChangeEventSource;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.schema.SchemaChangeEvent;
+import io.debezium.util.Clock;
+import io.debezium.util.Collect;
+import io.debezium.util.Strings;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Copied from Debezium project(1.9.8.Final) to fix MySQL 8.x compatibility.
+ *
+ * <p>Line 338: Use probing methods to determine the statement.
+ */
+public class MySqlSnapshotChangeEventSource
+ extends RelationalSnapshotChangeEventSource<MySqlPartition,
MySqlOffsetContext> {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(MySqlSnapshotChangeEventSource.class);
+
+ private final MySqlConnectorConfig connectorConfig;
+ private final MySqlConnection connection;
+ private long globalLockAcquiredAt = -1;
+ private long tableLockAcquiredAt = -1;
+ private final RelationalTableFilters filters;
+ private final MySqlSnapshotChangeEventSourceMetrics metrics;
+ private final MySqlDatabaseSchema databaseSchema;
+ private final List<SchemaChangeEvent> schemaEvents = new ArrayList<>();
+ private Set<TableId> delayedSchemaSnapshotTables = Collections.emptySet();
+ private final BlockingConsumer<Function<SourceRecord, SourceRecord>>
lastEventProcessor;
+
+ public MySqlSnapshotChangeEventSource(
+ MySqlConnectorConfig connectorConfig,
+ MySqlConnection connection,
+ MySqlDatabaseSchema schema,
+ EventDispatcher<MySqlPartition, TableId> dispatcher,
+ Clock clock,
+ MySqlSnapshotChangeEventSourceMetrics metrics,
+ BlockingConsumer<Function<SourceRecord, SourceRecord>>
lastEventProcessor) {
+ super(connectorConfig, connection, schema, dispatcher, clock, metrics);
+ this.connectorConfig = connectorConfig;
+ this.connection = connection;
+ this.filters = connectorConfig.getTableFilters();
+ this.metrics = metrics;
+ this.databaseSchema = schema;
+ this.lastEventProcessor = lastEventProcessor;
+ }
+
+ @Override
+ protected SnapshottingTask getSnapshottingTask(
+ MySqlPartition partition, MySqlOffsetContext previousOffset) {
+ boolean snapshotSchema = true;
+ boolean snapshotData = true;
+
+ // found a previous offset and the earlier snapshot has completed
+ if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
+ LOGGER.info(
+ "A previous offset indicating a completed snapshot has
been found. Neither schema nor data will be snapshotted.");
+ snapshotSchema = databaseSchema.isStorageInitializationExecuted();
+ snapshotData = false;
+ } else {
+ LOGGER.info("No previous offset has been found");
+ if (connectorConfig.getSnapshotMode().includeData()) {
+ LOGGER.info(
+ "According to the connector configuration both schema
and data will be snapshotted");
+ } else {
+ LOGGER.info(
+ "According to the connector configuration only schema
will be snapshotted");
+ }
+ snapshotData = connectorConfig.getSnapshotMode().includeData();
+ snapshotSchema = connectorConfig.getSnapshotMode().includeSchema();
+ }
+
+ return new SnapshottingTask(snapshotSchema, snapshotData);
+ }
+
+ @Override
+ protected SnapshotContext<MySqlPartition, MySqlOffsetContext>
prepare(MySqlPartition partition)
+ throws Exception {
+ return new MySqlSnapshotContext(partition);
+ }
+
+ @Override
+ protected void connectionCreated(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext)
+ throws Exception {}
+
+ @Override
+ protected Set<TableId> getAllTableIds(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> ctx)
throws Exception {
+ // -------------------
+ // READ DATABASE NAMES
+ // -------------------
+ // Get the list of databases ...
+ LOGGER.info("Read list of available databases");
+ final List<String> databaseNames = new ArrayList<>();
+ connection.query(
+ "SHOW DATABASES",
+ rs -> {
+ while (rs.next()) {
+ databaseNames.add(rs.getString(1));
+ }
+ });
+ LOGGER.info("\t list of available databases is: {}", databaseNames);
+
+ // ----------------
+ // READ TABLE NAMES
+ // ----------------
+ // Get the list of table IDs for each database. We can't use a
prepared statement with
+ // MySQL, so we have to
+ // build the SQL statement each time. Although in other cases this
might lead to SQL
+ // injection, in our case
+ // we are reading the database names from the database and not taking
them from the user ...
+ LOGGER.info("Read list of available tables in each database");
+ final Set<TableId> tableIds = new HashSet<>();
+ final Set<String> readableDatabaseNames = new HashSet<>();
+ for (String dbName : databaseNames) {
+ try {
+ // MySQL sometimes considers some local files as databases
(see DBZ-164),
+ // so we will simply try each one and ignore the problematic
ones ...
+ connection.query(
+ "SHOW FULL TABLES IN " + quote(dbName) + " where
Table_Type = 'BASE TABLE'",
+ rs -> {
+ while (rs.next()) {
+ TableId id = new TableId(dbName, null,
rs.getString(1));
+ tableIds.add(id);
+ }
+ });
+ readableDatabaseNames.add(dbName);
+ } catch (SQLException e) {
+ // We were unable to execute the query or process the results,
so skip this ...
+ LOGGER.warn(
+ "\t skipping database '{}' due to error reading
tables: {}",
+ dbName,
+ e.getMessage());
+ }
+ }
+ final Set<String> includedDatabaseNames =
+ readableDatabaseNames.stream()
+ .filter(filters.databaseFilter())
+ .collect(Collectors.toSet());
+ LOGGER.info("\tsnapshot continuing with database(s): {}",
includedDatabaseNames);
+ return tableIds;
+ }
+
+ @Override
+ protected void lockTablesForSchemaSnapshot(
+ ChangeEventSourceContext sourceContext,
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext)
+ throws SQLException, InterruptedException {
+ // Set the transaction isolation level to REPEATABLE READ. This is the
default, but the
+ // default can be changed
+ // which is why we explicitly set it here.
+ //
+ // With REPEATABLE READ, all SELECT queries within the scope of a
transaction (which we
+ // don't yet have) will read
+ // from the same MVCC snapshot. Thus each plain (non-locking) SELECT
statements within the
+ // same transaction are
+ // consistent also with respect to each other.
+ //
+ // See: https://dev.mysql.com/doc/refman/5.7/en/set-transaction.html
+ // See:
https://dev.mysql.com/doc/refman/5.7/en/innodb-transaction-isolation-levels.html
+ // See:
https://dev.mysql.com/doc/refman/5.7/en/innodb-consistent-read.html
+
connection.connection().setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+ connection.executeWithoutCommitting(
+ "SET SESSION lock_wait_timeout="
+ + connectorConfig.snapshotLockTimeout().getSeconds());
+ try {
+ connection.executeWithoutCommitting(
+ "SET SESSION innodb_lock_wait_timeout="
+ +
connectorConfig.snapshotLockTimeout().getSeconds());
+ } catch (SQLException e) {
+ LOGGER.warn("Unable to set innodb_lock_wait_timeout", e);
+ }
+
+ // ------------------------------------
+ // LOCK TABLES
+ // ------------------------------------
+ // Obtain read lock on all tables. This statement closes all open
tables and locks all
+ // tables
+ // for all databases with a global read lock, and it prevents ALL
updates while we have this
+ // lock.
+ // It also ensures that everything we do while we have this lock will
be consistent.
+ if (connectorConfig.getSnapshotLockingMode().usesLocking()
+ && connectorConfig.useGlobalLock()) {
+ try {
+ globalLock();
+ metrics.globalLockAcquired();
+ } catch (SQLException e) {
+ LOGGER.info(
+ "Unable to flush and acquire global read lock, will
use table read locks after reading table names");
+ // Continue anyway, since RDS (among others) don't allow
setting a global lock
+ assert !isGloballyLocked();
+ }
+ if
(connectorConfig.getSnapshotLockingMode().flushResetsIsolationLevel()) {
+ // FLUSH TABLES resets TX and isolation level
+ connection.executeWithoutCommitting(
+ "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
+ }
+ }
+ }
+
+ @Override
+ protected void releaseSchemaSnapshotLocks(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext)
+ throws SQLException {
+ if (connectorConfig.getSnapshotLockingMode().usesMinimalLocking()) {
+ if (isGloballyLocked()) {
+ globalUnlock();
+ }
+ if (isTablesLocked()) {
+ // We could not acquire a global read lock and instead had to
obtain individual
+ // table-level read locks
+ // using 'FLUSH TABLE <tableName> WITH READ LOCK'. However, if
we were to do this,
+ // the 'UNLOCK TABLES'
+ // would implicitly commit our active transaction, and this
would break our
+ // consistent snapshot logic.
+ // Therefore, we cannot unlock the tables here!
+ // https://dev.mysql.com/doc/refman/5.7/en/flush.html
+ LOGGER.warn(
+ "Tables were locked explicitly, but to get a
consistent snapshot we cannot release the locks until we've read all tables.");
+ }
+ }
+ }
+
+ @Override
+ protected void releaseDataSnapshotLocks(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext)
+ throws Exception {
+ if (isGloballyLocked()) {
+ globalUnlock();
+ }
+ if (isTablesLocked()) {
+ tableUnlock();
+ if (!delayedSchemaSnapshotTables.isEmpty()) {
+ schemaEvents.clear();
+ createSchemaEventsForTables(snapshotContext,
delayedSchemaSnapshotTables, false);
+
+ for (Iterator<SchemaChangeEvent> i = schemaEvents.iterator();
i.hasNext(); ) {
+ final SchemaChangeEvent event = i.next();
+
+ if (databaseSchema.storeOnlyCapturedTables()
+ && event.getDatabase() != null
+ && event.getDatabase().length() != 0
+ && !connectorConfig
+ .getTableFilters()
+ .databaseFilter()
+ .test(event.getDatabase())) {
+ LOGGER.debug(
+ "Skipping schema event as it belongs to a
non-captured database: '{}'",
+ event);
+ continue;
+ }
+
+ LOGGER.debug("Processing schema event {}", event);
+
+ final TableId tableId =
+ event.getTables().isEmpty()
+ ? null
+ : event.getTables().iterator().next().id();
+ snapshotContext.offset.event(tableId,
getClock().currentTime());
+
+ if (!i.hasNext()) {
+ super.lastSnapshotRecord(snapshotContext);
+ }
+
+ dispatcher.dispatchSchemaChangeEvent(
+ snapshotContext.partition,
+ tableId,
+ (receiver) -> receiver.schemaChangeEvent(event));
+ }
+
+ // Make schema available for snapshot source
+ databaseSchema
+ .tableIds()
+ .forEach(
+ x ->
+ snapshotContext.tables.overwriteTable(
+ databaseSchema.tableFor(x)));
+ }
+ }
+ }
+
+ @Override
+ protected void determineSnapshotOffset(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> ctx,
+ MySqlOffsetContext previousOffset)
+ throws Exception {
+ if (!isGloballyLocked()
+ && !isTablesLocked()
+ && connectorConfig.getSnapshotLockingMode().usesLocking()) {
+ return;
+ }
+ if (previousOffset != null) {
+ ctx.offset = previousOffset;
+ tryStartingSnapshot(ctx);
+ return;
+ }
+ final MySqlOffsetContext offsetContext =
MySqlOffsetContext.initial(connectorConfig);
+ ctx.offset = offsetContext;
+ LOGGER.info("Read binlog position of MySQL primary server");
+ final String showMasterStmt = connection.getShowBinaryLogStatement();
+ connection.query(
+ showMasterStmt,
+ rs -> {
+ if (rs.next()) {
+ final String binlogFilename = rs.getString(1);
+ final long binlogPosition = rs.getLong(2);
+ offsetContext.setBinlogStartPoint(binlogFilename,
binlogPosition);
+ if (rs.getMetaData().getColumnCount() > 4) {
+ // This column exists only in MySQL 5.6.5 or later
...
+ final String gtidSet =
+ rs.getString(
+ 5); // GTID set, may be null,
blank, or contain a GTID
+ // set
+ offsetContext.setCompletedGtidSet(gtidSet);
+ LOGGER.info(
+ "\t using binlog '{}' at position '{}' and
gtid '{}'",
+ binlogFilename,
+ binlogPosition,
+ gtidSet);
+ } else {
+ LOGGER.info(
+ "\t using binlog '{}' at position '{}'",
+ binlogFilename,
+ binlogPosition);
+ }
+ } else {
+ throw new DebeziumException(
+ "Cannot read the binlog filename and position
via '"
+ + showMasterStmt
+ + "'. Make sure your server is
correctly configured");
+ }
+ });
+ tryStartingSnapshot(ctx);
+ }
+
+ private void addSchemaEvent(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext,
+ String database,
+ String ddl) {
+ schemaEvents.addAll(
+ databaseSchema.parseSnapshotDdl(
+ snapshotContext.partition,
+ ddl,
+ database,
+ snapshotContext.offset,
+ clock.currentTimeAsInstant()));
+ }
+
+ @Override
+ protected void readTableStructure(
+ ChangeEventSourceContext sourceContext,
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext,
+ MySqlOffsetContext offsetContext)
+ throws Exception {
+ Set<TableId> capturedSchemaTables;
+ if (twoPhaseSchemaSnapshot()) {
+ // Capture schema of captured tables after they are locked
+ tableLock(snapshotContext);
+ determineSnapshotOffset(snapshotContext, offsetContext);
+ capturedSchemaTables = snapshotContext.capturedTables;
+ LOGGER.info(
+ "Table level locking is in place, the schema will be
capture in two phases, now capturing: {}",
+ capturedSchemaTables);
+ delayedSchemaSnapshotTables =
+ Collect.minus(
+ snapshotContext.capturedSchemaTables,
snapshotContext.capturedTables);
+ LOGGER.info("Tables for delayed schema capture: {}",
delayedSchemaSnapshotTables);
+ }
+ if (databaseSchema.storeOnlyCapturedTables()) {
+ capturedSchemaTables = snapshotContext.capturedTables;
+ LOGGER.info(
+ "Only captured tables schema should be captured,
capturing: {}",
+ capturedSchemaTables);
+ } else {
+ capturedSchemaTables = snapshotContext.capturedSchemaTables;
+ LOGGER.info(
+ "All eligible tables schema should be captured, capturing:
{}",
+ capturedSchemaTables);
+ }
+ final Map<String, List<TableId>> tablesToRead =
+ capturedSchemaTables.stream()
+ .collect(
+ Collectors.groupingBy(
+ TableId::catalog, LinkedHashMap::new,
Collectors.toList()));
+ final Set<String> databases = tablesToRead.keySet();
+
+ // Record default charset
+ addSchemaEvent(
+ snapshotContext,
+ "",
+
connection.setStatementFor(connection.readMySqlCharsetSystemVariables()));
+
+ for (TableId tableId : capturedSchemaTables) {
+ if (!sourceContext.isRunning()) {
+ throw new InterruptedException(
+ "Interrupted while emitting initial DROP TABLE
events");
+ }
+ addSchemaEvent(
+ snapshotContext, tableId.catalog(), "DROP TABLE IF EXISTS
" + quote(tableId));
+ }
+
+ final Map<String, DatabaseLocales> databaseCharsets =
connection.readDatabaseCollations();
+ for (String database : databases) {
+ if (!sourceContext.isRunning()) {
+ throw new InterruptedException(
+ "Interrupted while reading structure of schema " +
databases);
+ }
+
+ LOGGER.info("Reading structure of database '{}'", database);
+ addSchemaEvent(snapshotContext, database, "DROP DATABASE IF EXISTS
" + quote(database));
+ final StringBuilder createDatabaseDddl =
+ new StringBuilder("CREATE DATABASE " + quote(database));
+ final DatabaseLocales defaultDatabaseLocales =
databaseCharsets.get(database);
+ if (defaultDatabaseLocales != null) {
+ defaultDatabaseLocales.appendToDdlStatement(database,
createDatabaseDddl);
+ }
+ addSchemaEvent(snapshotContext, database,
createDatabaseDddl.toString());
+ addSchemaEvent(snapshotContext, database, "USE " +
quote(database));
+
+ createSchemaEventsForTables(snapshotContext,
tablesToRead.get(database), true);
+ }
+ }
+
+ void createSchemaEventsForTables(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext,
+ final Collection<TableId> tablesToRead,
+ final boolean firstPhase)
+ throws SQLException {
+ for (TableId tableId : tablesToRead) {
+ if (firstPhase && delayedSchemaSnapshotTables.contains(tableId)) {
+ continue;
+ }
+ connection.query(
+ "SHOW CREATE TABLE " + quote(tableId),
+ rs -> {
+ if (rs.next()) {
+ addSchemaEvent(snapshotContext, tableId.catalog(),
rs.getString(2));
+ }
+ });
+ }
+ }
+
+ private boolean twoPhaseSchemaSnapshot() {
+ return connectorConfig.getSnapshotLockingMode().usesLocking() &&
!isGloballyLocked();
+ }
+
+ @Override
+ protected SchemaChangeEvent getCreateTableEvent(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext,
+ Table table)
+ throws SQLException {
+ return SchemaChangeEvent.ofSnapshotCreate(
+ snapshotContext.partition,
+ snapshotContext.offset,
+ snapshotContext.catalogName,
+ table);
+ }
+
+ @Override
+ protected void complete(SnapshotContext<MySqlPartition,
MySqlOffsetContext> snapshotContext) {}
+
+ /**
+ * Generate a valid MySQL query string for the specified table and columns
+ *
+ * @param tableId the table to generate a query for
+ * @return a valid query string
+ */
+ @Override
+ protected Optional<String> getSnapshotSelect(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext,
+ TableId tableId,
+ List<String> columns) {
+ String snapshotSelectColumns =
columns.stream().collect(Collectors.joining(", "));
+
+ return Optional.of(
+ String.format(
+ "SELECT %s FROM `%s`.`%s`",
+ snapshotSelectColumns, tableId.catalog(),
tableId.table()));
+ }
+
    // A sentinel of -1 means the global read lock is not currently held.
    private boolean isGloballyLocked() {
        return globalLockAcquiredAt != -1;
    }
+
    // A sentinel of -1 means no per-table read locks are currently held.
    private boolean isTablesLocked() {
        return tableLockAcquiredAt != -1;
    }
+
    /**
     * Flushes tables and acquires the global read lock, using the statement supplied by the
     * configured snapshot locking mode, and records the acquisition time so the lock
     * duration can be reported when it is released.
     */
    private void globalLock() throws SQLException {
        LOGGER.info("Flush and obtain global read lock to prevent writes to database");
        connection.executeWithoutCommitting(
                connectorConfig.getSnapshotLockingMode().getLockStatement());
        globalLockAcquiredAt = clock.currentTimeInMillis();
    }
+
+ private void globalUnlock() throws SQLException {
+ LOGGER.info("Releasing global read lock to enable MySQL writes");
+ connection.executeWithoutCommitting("UNLOCK TABLES");
+ long lockReleased = clock.currentTimeInMillis();
+ metrics.globalLockReleased();
+ LOGGER.info(
+ "Writes to MySQL tables prevented for a total of {}",
+ Strings.duration(lockReleased - globalLockAcquiredAt));
+ globalLockAcquiredAt = -1;
+ }
+
+ private void tableLock(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext)
+ throws SQLException {
+ // ------------------------------------
+ // LOCK TABLES and READ BINLOG POSITION
+ // ------------------------------------
+ // We were not able to acquire the global read lock, so instead we
have to obtain a read
+ // lock on each table.
+ // This requires different privileges than normal, and also means we
can't unlock the tables
+ // without
+ // implicitly committing our transaction ...
+ if (!connection.userHasPrivileges("LOCK TABLES")) {
+ // We don't have the right privileges
+ throw new DebeziumException(
+ "User does not have the 'LOCK TABLES' privilege required
to obtain a "
+ + "consistent snapshot by preventing concurrent
writes to tables.");
+ }
+ // We have the required privileges, so try to lock all of the tables
we're interested in ...
+ LOGGER.info(
+ "Flush and obtain read lock for {} tables (preventing writes)",
+ snapshotContext.capturedTables);
+ if (!snapshotContext.capturedTables.isEmpty()) {
+ final String tableList =
+ snapshotContext.capturedTables.stream()
+ .map(tid -> quote(tid))
+ .collect(Collectors.joining(","));
+ connection.executeWithoutCommitting("FLUSH TABLES " + tableList +
" WITH READ LOCK");
+ }
+ tableLockAcquiredAt = clock.currentTimeInMillis();
+ metrics.globalLockAcquired();
+ }
+
+ private void tableUnlock() throws SQLException {
+ LOGGER.info("Releasing table read lock to enable MySQL writes");
+ connection.executeWithoutCommitting("UNLOCK TABLES");
+ long lockReleased = clock.currentTimeInMillis();
+ metrics.globalLockReleased();
+ LOGGER.info(
+ "Writes to MySQL tables prevented for a total of {}",
+ Strings.duration(lockReleased - tableLockAcquiredAt));
+ tableLockAcquiredAt = -1;
+ }
+
+ private String quote(String dbOrTableName) {
+ return "`" + dbOrTableName + "`";
+ }
+
+ private String quote(TableId id) {
+ return quote(id.catalog()) + "." + quote(id.table());
+ }
+
    // Delegates to the connection's estimated table size; the estimate may be absent.
    @Override
    protected OptionalLong rowCountForTable(TableId tableId) {
        return connection.getEstimatedTableSize(tableId);
    }
+
+ @Override
+ protected Statement readTableStatement(OptionalLong rowCount) throws
SQLException {
+ final long largeTableRowCount =
connectorConfig.rowCountForLargeTable();
+ if (!rowCount.isPresent()
+ || largeTableRowCount == 0
+ || rowCount.getAsLong() <= largeTableRowCount) {
+ return super.readTableStatement(rowCount);
+ }
+ return createStatementWithLargeResultSet();
+ }
+
    /**
     * Create a JDBC statement that can be used for large result sets.
     *
     * <p>By default, the MySQL Connector/J driver retrieves all rows for ResultSets and stores them
     * in memory. In most cases this is the most efficient way to operate and, due to the design of
     * the MySQL network protocol, is easier to implement. However, when ResultSets have a large
     * number of rows or large values, the driver may not be able to allocate heap space in the JVM
     * and may result in an {@link OutOfMemoryError}. See <a
     * href="https://issues.jboss.org/browse/DBZ-94">DBZ-94</a> for details.
     *
     * <p>This method handles such cases using the <a
     * href="https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html">recommended
     * technique</a> for MySQL by creating the JDBC {@link Statement} with {@link
     * ResultSet#TYPE_FORWARD_ONLY forward-only} cursor and {@link ResultSet#CONCUR_READ_ONLY
     * read-only concurrency} flags, and with the connector's configured snapshot fetch size as the
     * {@link Statement#setFetchSize(int) fetch size hint}.
     *
     * @return the statement; never null
     * @throws SQLException if there is a problem creating the statement
     */
    private Statement createStatementWithLargeResultSet() throws SQLException {
        int fetchSize = connectorConfig.getSnapshotFetchSize();
        Statement stmt =
                connection
                        .connection()
                        .createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        stmt.setFetchSize(fetchSize);
        return stmt;
    }
+
    /** Mutable context which is populated in the course of snapshotting. */
    private static class MySqlSnapshotContext
            extends RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> {

        // NOTE(review): the catalog name is passed as "" — presumably because MySQL table ids
        // already carry their database name; confirm against the superclass contract.
        public MySqlSnapshotContext(MySqlPartition partition) throws SQLException {
            super(partition, "");
        }
    }
+
    /**
     * Replays the previously collected schema events ({@code schemaEvents}) through the
     * dispatcher, advancing the snapshot offset per event, and finally copies all table
     * definitions known to the database schema into the snapshot context's table set.
     *
     * @throws InterruptedException if the change event source stops while events remain
     */
    @Override
    protected void createSchemaChangeEventsForTables(
            ChangeEventSourceContext sourceContext,
            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext,
            SnapshottingTask snapshottingTask)
            throws Exception {
        tryStartingSnapshot(snapshotContext);

        for (Iterator<SchemaChangeEvent> i = schemaEvents.iterator(); i.hasNext(); ) {
            final SchemaChangeEvent event = i.next();
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while processing event " + event);
            }

            if (databaseSchema.skipSchemaChangeEvent(event)) {
                continue;
            }

            LOGGER.debug("Processing schema event {}", event);

            // Events with no associated tables (e.g. database-level DDL) use a null table id.
            final TableId tableId =
                    event.getTables().isEmpty() ? null : event.getTables().iterator().next().id();
            snapshotContext.offset.event(tableId, getClock().currentTime());

            // If data are not snapshotted then the last schema change must set last snapshot flag
            if (!snapshottingTask.snapshotData() && !i.hasNext()) {
                lastSnapshotRecord(snapshotContext);
            }
            dispatcher.dispatchSchemaChangeEvent(
                    snapshotContext.partition,
                    tableId,
                    (receiver) -> receiver.schemaChangeEvent(event));
        }

        // Make schema available for snapshot source
        databaseSchema
                .tableIds()
                .forEach(x -> snapshotContext.tables.overwriteTable(databaseSchema.tableFor(x)));
    }
+
+ @Override
+ protected void lastSnapshotRecord(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext) {
+ if (delayedSchemaSnapshotTables.isEmpty()) {
+ super.lastSnapshotRecord(snapshotContext);
+ }
+ }
+
+ @Override
+ protected void postSnapshot() throws InterruptedException {
+        // We cannot be sure that the last event is actually the last one, because:
+        // - the last table could be empty
+        // - the data snapshot may not have been executed
+        // - the last table whose schema was snapshotted is not monitored and storing of
monitored tables is disabled
Review Comment:
Spelling/grammar in this comment block is off (e.g., "schema snaphsotted" ->
"schema snapshotted", and "the last event as the last one"). Please fix to keep
the copied Debezium code comments clear and professional.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]