This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ae981df67 [feature][connector] add mysql cdc reader (#3455)
ae981df67 is described below
commit ae981df6751ecaad9c19d336f2ae7d428c1ada99
Author: ic4y <[email protected]>
AuthorDate: Thu Nov 17 17:41:18 2022 +0800
[feature][connector] add mysql cdc reader (#3455)
* [feature][connector] add mysql cdc reader
* Update seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
Co-authored-by: hailin0 <[email protected]>
Co-authored-by: hailin0 <[email protected]>
---
LICENSE | 1 +
.../connector-cdc/connector-cdc-base/pom.xml | 1 -
.../cdc/base/config/JdbcSourceConfigFactory.java | 182 +++++++++
.../cdc/base/option/JdbcSourceOptions.java | 107 +++++
.../external/IncrementalSourceStreamFetcher.java | 52 +--
.../{ => connector-cdc-mysql}/pom.xml | 41 +-
.../cdc/mysql/config/MySqlSourceConfig.java | 86 ++++
.../cdc/mysql/config/MySqlSourceConfigFactory.java | 123 ++++++
.../seatunnel/cdc/mysql/config/ServerIdRange.java | 112 ++++++
.../seatunnel/cdc/mysql/source/MySqlDialect.java | 116 ++++++
.../mysql/source/MysqlPooledDataSourceFactory.java | 37 ++
.../cdc/mysql/source/offset/BinlogOffset.java | 211 ++++++++++
.../reader/fetch/MySqlSourceFetchTaskContext.java | 350 +++++++++++++++++
.../reader/fetch/binlog/MySqlBinlogFetchTask.java | 74 ++++
.../reader/fetch/scan/MySqlSnapshotFetchTask.java | 67 ++++
.../fetch/scan/MySqlSnapshotSplitReadTask.java | 371 ++++++++++++++++++
.../SnapshotSplitChangeEventSourceContext.java | 48 +++
.../cdc/mysql/utils/MySqlConnectionUtils.java | 175 +++++++++
.../seatunnel/cdc/mysql/utils/MySqlTypeUtils.java | 159 ++++++++
.../seatunnel/cdc/mysql/utils/MySqlUtils.java | 432 +++++++++++++++++++++
.../cdc/mysql/utils/TableDiscoveryUtils.java | 90 +++++
seatunnel-connectors-v2/connector-cdc/pom.xml | 2 +
22 files changed, 2779 insertions(+), 58 deletions(-)
diff --git a/LICENSE b/LICENSE
index 0abce8328..6d2243fa1 100644
--- a/LICENSE
+++ b/LICENSE
@@ -216,6 +216,7 @@
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connec
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/
from https://github.com/apache/flink
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/
from https://github.com/apache/iceberg
seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/base
from https://github.com/ververica/flink-cdc-connectors
+seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql
from https://github.com/ververica/flink-cdc-connectors
seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium
from https://github.com/ververica/flink-cdc-connectors
generate_client_protocol.sh
from
https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
from https://github.com/hazelcast/hazelcast
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
index feff219d1..cc7ff4991 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
@@ -29,7 +29,6 @@
<artifactId>connector-cdc-base</artifactId>
<properties>
- <debezium.version>1.6.4.Final</debezium.version>
<hikaricp.version>4.0.3</hikaricp.version>
</properties>
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
new file mode 100644
index 000000000..041999adb
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.config;
+
+import org.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.seatunnel.connectors.cdc.base.option.SourceOptions;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/** A {@link SourceConfig.Factory} to provide {@link SourceConfig} of JDBC
data source. */
+@SuppressWarnings("checkstyle:MagicNumber")
+public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<JdbcSourceConfig> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected int port;
+ protected String hostname;
+ protected String username;
+ protected String password;
+ protected List<String> databaseList;
+ protected List<String> tableList;
+ protected StartupConfig startupConfig;
+ protected StopConfig stopConfig;
+ protected boolean includeSchemaChanges = false;
+ protected double distributionFactorUpper = 1000.0d;
+ protected double distributionFactorLower = 0.05d;
+ protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
+ protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
+ protected String serverTimeZone =
JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
+ protected Duration connectTimeout =
JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue();
+ protected int connectMaxRetries =
JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
+ protected int connectionPoolSize =
JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
+ protected Properties dbzProperties;
+
+ /** IP address or hostname of the database server. */
+ public JdbcSourceConfigFactory hostname(String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ /** Integer port number of the database server. */
+ public JdbcSourceConfigFactory port(int port) {
+ this.port = port;
+ return this;
+ }
+
+ /**
+ * An optional list of regular expressions that match database names to be
monitored; any
+ * database name not included in the whitelist will be excluded from
monitoring. By default all
+ * databases will be monitored.
+ */
+ public JdbcSourceConfigFactory databaseList(String... databaseList) {
+ this.databaseList = Arrays.asList(databaseList);
+ return this;
+ }
+
+ /**
+ * An optional list of regular expressions that match fully-qualified
table identifiers for
+ * tables to be monitored; any table not included in the list will be
excluded from monitoring.
+ * Each identifier is of the form databaseName.tableName. by default the
connector will monitor
+ * every non-system table in each monitored database.
+ */
+ public JdbcSourceConfigFactory tableList(String... tableList) {
+ this.tableList = Arrays.asList(tableList);
+ return this;
+ }
+
+ /** Name of the user to use when connecting to the database server. */
+ public JdbcSourceConfigFactory username(String username) {
+ this.username = username;
+ return this;
+ }
+
+ /** Password to use when connecting to the database server. */
+ public JdbcSourceConfigFactory password(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /**
+ * The session time zone in database server, e.g. "America/Los_Angeles".
It controls how the
+ * TIMESTAMP type converted to STRING. See more
+ *
https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types
+ */
+ public JdbcSourceConfigFactory serverTimeZone(String timeZone) {
+ this.serverTimeZone = timeZone;
+ return this;
+ }
+
+ /**
+ * The split size (number of rows) of table snapshot, captured tables are
split into multiple
+ * splits when read the snapshot of table.
+ */
+ public JdbcSourceConfigFactory splitSize(int splitSize) {
+ this.splitSize = splitSize;
+ return this;
+ }
+
+ /**
+ * The upper bound of split key evenly distribution factor, the factor is
used to determine
+ * whether the table is evenly distribution or not.
+ */
+ public JdbcSourceConfigFactory distributionFactorUpper(double
distributionFactorUpper) {
+ this.distributionFactorUpper = distributionFactorUpper;
+ return this;
+ }
+
+ /**
+ * The lower bound of split key evenly distribution factor, the factor is
used to determine
+ * whether the table is evenly distribution or not.
+ */
+ public JdbcSourceConfigFactory distributionFactorLower(double
distributionFactorLower) {
+ this.distributionFactorLower = distributionFactorLower;
+ return this;
+ }
+
+ /** The maximum fetch size for per poll when read table snapshot. */
+ public JdbcSourceConfigFactory fetchSize(int fetchSize) {
+ this.fetchSize = fetchSize;
+ return this;
+ }
+
+ /**
+ * The maximum time that the connector should wait after trying to connect
to the database
+ * server before timing out.
+ */
+ public JdbcSourceConfigFactory connectTimeout(Duration connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ return this;
+ }
+
+ /** The connection pool size. */
+ public JdbcSourceConfigFactory connectionPoolSize(int connectionPoolSize) {
+ this.connectionPoolSize = connectionPoolSize;
+ return this;
+ }
+
+ /** The max retry times to get connection. */
+ public JdbcSourceConfigFactory connectMaxRetries(int connectMaxRetries) {
+ this.connectMaxRetries = connectMaxRetries;
+ return this;
+ }
+
+ /** Whether the {@link SourceConfig} should output the schema changes or
not. */
+ public JdbcSourceConfigFactory includeSchemaChanges(boolean
includeSchemaChanges) {
+ this.includeSchemaChanges = includeSchemaChanges;
+ return this;
+ }
+
+ /** The Debezium connector properties. For example, "snapshot.mode". */
+ public JdbcSourceConfigFactory debeziumProperties(Properties properties) {
+ this.dbzProperties = properties;
+ return this;
+ }
+
+ /** Specifies the startup options. */
+ public JdbcSourceConfigFactory startupOptions(StartupConfig startupConfig)
{
+ this.startupConfig = startupConfig;
+ return this;
+ }
+
+ @Override
+ public abstract JdbcSourceConfig create(int subtask);
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
new file mode 100644
index 000000000..9f387dea7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.option;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import org.seatunnel.connectors.cdc.base.source.IncrementalSource;
+
+import java.time.Duration;
+
+/** Configurations for {@link IncrementalSource} of JDBC data source. */
+@SuppressWarnings("checkstyle:MagicNumber")
+public class JdbcSourceOptions extends SourceOptions {
+
+ public static final Option<String> HOSTNAME =
+ Options.key("hostname")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("IP address or hostname of the database
server.");
+
+ public static final Option<Integer> PORT =
+ Options.key("port")
+ .intType()
+ .defaultValue(3306)
+ .withDescription("Integer port number of the database
server.");
+
+ public static final Option<String> USERNAME =
+ Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the user to use when connecting to
the database server.");
+
+ public static final Option<String> PASSWORD =
+ Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Password to use when connecting to the
database server.");
+
+ public static final Option<String> DATABASE_NAME =
+ Options.key("database-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Database name of the database to
monitor.");
+
+ public static final Option<String> TABLE_NAME =
+ Options.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name of the database to monitor.");
+
+ public static final Option<String> SERVER_TIME_ZONE =
+ Options.key("server-time-zone")
+ .stringType()
+ .defaultValue("UTC")
+ .withDescription("The session time zone in database
server.");
+
+ public static final Option<String> SERVER_ID =
+ Options.key("server-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "A numeric ID or a numeric ID range of this
database client, "
+ + "The numeric ID syntax is like '5400',
the numeric ID range syntax "
+ + "is like '5400-5408', The numeric ID
range syntax is recommended when "
+ + "'scan.incremental.snapshot.enabled'
enabled. Every ID must be unique across all "
+ + "currently-running database processes in
the MySQL cluster. This connector"
+ + " joins the MySQL cluster as another
server (with this unique ID) "
+ + "so it can read the binlog. By default,
a random number is generated between"
+ + " 5400 and 6400, though we recommend
setting an explicit value.");
+
+ public static final Option<Duration> CONNECT_TIMEOUT =
+ Options.key("connect.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription(
+ "The maximum time that the connector should wait
after trying to connect to the database server before timing out.");
+
+ public static final Option<Integer> CONNECTION_POOL_SIZE =
+ Options.key("connection.pool.size")
+ .intType()
+ .defaultValue(20)
+ .withDescription("The connection pool size.");
+
+ public static final Option<Integer> CONNECT_MAX_RETRIES =
+ Options.key("connect.max-retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription(
+ "The max retry times that the connector should
retry to build database server connection.");
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 6b6e22104..99d468da2 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.common.utils.SeaTunnelException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
-import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.source.SourceRecord;
import org.seatunnel.connectors.cdc.base.source.offset.Offset;
@@ -31,12 +30,8 @@ import
org.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -49,8 +44,6 @@ import java.util.concurrent.TimeUnit;
public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords,
SourceSplitBase> {
private final FetchTask.Context taskContext;
private final ExecutorService executorService;
- private final Set<TableId> pureStreamPhaseTables;
-
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile Throwable readException;
@@ -58,7 +51,7 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
private IncrementalSplit currentIncrementalSplit;
- private Map<TableId, Offset> maxSplitHighWatermarkMap;
+ private Offset splitStartWatermark;
private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;
@@ -67,7 +60,6 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" +
subTaskId).build();
this.executorService =
Executors.newSingleThreadExecutor(threadFactory);
- this.pureStreamPhaseTables = new HashSet<>();
}
@Override
@@ -143,55 +135,17 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
/**
* Returns the record should emit or not.
- *
- * <p>The watermark signal algorithm is the incremental split reader only
sends the change event that
- * belongs to its finished snapshot splits. For each snapshot split, the
change event is valid
- * since the offset is after its high watermark.
- *
- * <pre> E.g: the data input is :
- * snapshot-split-0 info : [0, 1024) highWatermark0
- * snapshot-split-1 info : [1024, 2048) highWatermark1
- * the data output is:
- * only the change event belong to [0, 1024) and offset is after
highWatermark0 should send,
- * only the change event belong to [1024, 2048) and offset is after
highWatermark1 should send.
- * </pre>
*/
private boolean shouldEmit(SourceRecord sourceRecord) {
if (taskContext.isDataChangeRecord(sourceRecord)) {
- TableId tableId = taskContext.getTableId(sourceRecord);
Offset position = taskContext.getStreamOffset(sourceRecord);
- return hasEnterPureStreamPhase(tableId, position);
+ return position.isAtOrAfter(splitStartWatermark);
// TODO only the table who captured snapshot splits need to
filter( Used to support Exactly-Once )
- // not in the monitored splits scope, do not emit
}
- // always send the schema change event and signal event
- // we need record them to state of SeaTunnel
return true;
}
- private boolean hasEnterPureStreamPhase(TableId tableId, Offset position) {
- if (pureStreamPhaseTables.contains(tableId)) {
- return true;
- }
- // the existed tables those have finished snapshot reading
- if (maxSplitHighWatermarkMap.containsKey(tableId)
- && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
- pureStreamPhaseTables.add(tableId);
- return true;
- }
-
- return !maxSplitHighWatermarkMap.containsKey(tableId)
- && taskContext.getTableFilter().isIncluded(tableId);
- }
-
private void configureFilter() {
- Map<TableId, Offset> tableIdOffsetPositionMap = new HashMap<>();
- // latest-offset mode
-
- for (TableId tableId : currentIncrementalSplit.getTableIds()) {
- tableIdOffsetPositionMap.put(tableId,
currentIncrementalSplit.getStartupOffset());
- }
- this.maxSplitHighWatermarkMap = tableIdOffsetPositionMap;
- this.pureStreamPhaseTables.clear();
+ splitStartWatermark = currentIncrementalSplit.getStartupOffset();
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
similarity index 53%
copy from seatunnel-connectors-v2/connector-cdc/pom.xml
copy to seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
index cf6a2e6c0..e5ec3a4b4 100644
--- a/seatunnel-connectors-v2/connector-cdc/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
@@ -21,18 +21,43 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel-connectors-v2</artifactId>
+ <artifactId>connector-cdc</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>connector-cdc</artifactId>
- <packaging>pom</packaging>
+ <artifactId>connector-cdc-mysql</artifactId>
- <properties>
- </properties>
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-base</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-mysql</artifactId>
+ </dependency>
+ </dependencies>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-base</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-mysql</artifactId>
+ <version>${debezium.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ </dependencies>
+ </dependencyManagement>
- <modules>
- <module>connector-cdc-base</module>
- </modules>
</project>
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
new file mode 100644
index 000000000..317bc5b8a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
+
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.relational.RelationalTableFilters;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.seatunnel.connectors.cdc.base.config.StartupConfig;
+import org.seatunnel.connectors.cdc.base.config.StopConfig;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Describes the connection information of the Mysql database and the
configuration information for
+ * performing snapshotting and streaming reading, such as splitSize.
+ */
+public class MySqlSourceConfig extends JdbcSourceConfig {
+
+ private static final long serialVersionUID = 1L;
+
+ public MySqlSourceConfig(
+ StartupConfig startupConfig,
+ StopConfig stopConfig,
+ List<String> databaseList,
+ List<String> tableList,
+ int splitSize,
+ double distributionFactorUpper,
+ double distributionFactorLower,
+ Properties dbzProperties,
+ String driverClassName,
+ String hostname,
+ int port,
+ String username,
+ String password,
+ int fetchSize,
+ String serverTimeZone,
+ Duration connectTimeout,
+ int connectMaxRetries,
+ int connectionPoolSize) {
+ super(
+ startupConfig,
+ stopConfig,
+ databaseList,
+ tableList,
+ splitSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ dbzProperties,
+ driverClassName,
+ hostname,
+ port,
+ username,
+ password,
+ fetchSize,
+ serverTimeZone,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize);
+ }
+
+ @Override
+ public MySqlConnectorConfig getDbzConnectorConfig() {
+ return new MySqlConnectorConfig(getDbzConfiguration());
+ }
+
+ public RelationalTableFilters getTableFilters() {
+ return getDbzConnectorConfig().getTableFilters();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
new file mode 100644
index 000000000..25d427a9b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
+
+import java.util.Properties;
+
+/** A factory to initialize {@link MySqlSourceConfig}. */
+public class MySqlSourceConfigFactory extends JdbcSourceConfigFactory {
+
+ private ServerIdRange serverIdRange;
+
+ /**
+ * A numeric ID or a numeric ID range of this database client, The numeric
ID syntax is like
+ * '5400', the numeric ID range syntax is like '5400-5408', The numeric ID
range syntax is
+ * required when 'scan.incremental.snapshot.enabled' enabled. Every ID
must be unique across all
+ * currently-running database processes in the MySQL cluster. This
connector joins the MySQL
+ * cluster as another server (with this unique ID) so it can read the
binlog. By default, a
+ * random number is generated between 5400 and 6400, though we recommend
setting an explicit
+ value.
+ */
+ public MySqlSourceConfigFactory serverId(String serverId) {
+ this.serverIdRange = ServerIdRange.from(serverId);
+ return this;
+ }
+
+ /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code
subtaskId}. */
+ public MySqlSourceConfig create(int subtaskId) {
+ Properties props = new Properties();
+ // hard code server name, because we don't need to distinguish it,
docs:
+ // Logical name that identifies and provides a namespace for the
particular
+ // MySQL database server/cluster being monitored. The logical name
should be
+ // unique across all other connectors, since it is used as a prefix
for all
+ // Kafka topic names emanating from this connector.
+ // Only alphanumeric characters and underscores should be used.
+ props.setProperty("database.server.name", "mysql_binlog_source");
+ props.setProperty("database.hostname", checkNotNull(hostname));
+ props.setProperty("database.user", checkNotNull(username));
+ props.setProperty("database.password", checkNotNull(password));
+ props.setProperty("database.port", String.valueOf(port));
+ props.setProperty("database.fetchSize", String.valueOf(fetchSize));
+ props.setProperty("database.responseBuffering", "adaptive");
+ props.setProperty("database.serverTimezone", serverTimeZone);
+
+ props.setProperty("connect.timeout.ms",
String.valueOf(connectTimeout.toMillis()));
+ // the underlying debezium reader should always capture the schema
changes and forward them.
+ // Note: the includeSchemaChanges parameter is used to control
emitting the schema record,
+ // only DataStream API program need to emit the schema record, the
Table API need not
+
+ //TODO Not yet supported
+ props.setProperty("include.schema.changes", String.valueOf(false));
+ // disable the offset flush totally
+ props.setProperty("offset.flush.interval.ms",
String.valueOf(Long.MAX_VALUE));
+ // disable tombstones
+ props.setProperty("tombstones.on.delete", String.valueOf(false));
+ // debezium use "long" mode to handle unsigned bigint by default,
+ // but it'll cause lose of precise when the value is larger than 2^63,
+ // so use "precise" mode to avoid it.
+ props.put("bigint.unsigned.handling.mode", "precise");
+
+ if (serverIdRange != null) {
+ props.setProperty("database.server.id.range",
String.valueOf(serverIdRange));
+ int serverId = serverIdRange.getServerId(subtaskId);
+ props.setProperty("database.server.id", String.valueOf(serverId));
+ }
+ if (databaseList != null) {
+ props.setProperty("database.include.list", String.join(",",
databaseList));
+ }
+ if (tableList != null) {
+ props.setProperty("table.include.list", String.join(",",
tableList));
+ }
+ if (serverTimeZone != null) {
+ props.setProperty("database.serverTimezone", serverTimeZone);
+ }
+
+ // override the user-defined debezium properties
+ if (dbzProperties != null) {
+ dbzProperties.forEach(props::put);
+ }
+
+ Configuration dbzConfiguration = Configuration.from(props);
+ String driverClassName =
dbzConfiguration.getString(MySqlConnectorConfig.JDBC_DRIVER);
+ return new MySqlSourceConfig(
+ startupConfig,
+ stopConfig,
+ databaseList,
+ tableList,
+ splitSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ props,
+ driverClassName,
+ hostname,
+ port,
+ username,
+ password,
+ fetchSize,
+ serverTimeZone,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/ServerIdRange.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/ServerIdRange.java
new file mode 100644
index 000000000..cc36d8f18
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/ServerIdRange.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+
+import java.io.Serializable;
+
/**
 * This class defines a range of server id. The boundaries of the range are inclusive.
 *
 * <p>Each source subtask must present a distinct MySQL server id when it connects as a
 * replication client, so the range has to contain at least as many ids as the source
 * parallelism.
 *
 * @see JdbcSourceOptions#SERVER_ID
 */
public class ServerIdRange implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Start of the range (inclusive). */
    private final int startServerId;

    /** End of the range (inclusive). */
    private final int endServerId;

    public ServerIdRange(int startServerId, int endServerId) {
        this.startServerId = startServerId;
        this.endServerId = endServerId;
    }

    public int getStartServerId() {
        return startServerId;
    }

    public int getEndServerId() {
        return endServerId;
    }

    /**
     * Returns the server id assigned to the given subtask.
     *
     * @param subTaskId zero-based subtask index
     * @return {@code startServerId + subTaskId}
     * @throws IllegalArgumentException if {@code subTaskId} is negative or the range does
     *     not contain enough server ids for this subtask index
     */
    public int getServerId(int subTaskId) {
        // Validated inline (instead of Guava's checkArgument) with the same message and
        // exception type; keeps this class free of third-party dependencies.
        if (subTaskId < 0) {
            throw new IllegalArgumentException(
                    String.format("Subtask ID %s shouldn't be a negative number.", subTaskId));
        }
        // Valid subtask ids are [0, numberOfServerIds - 1]. The previous `>` comparison was
        // off by one: subTaskId == numberOfServerIds was accepted and produced
        // endServerId + 1, which lies outside the inclusive range.
        if (subTaskId >= getNumberOfServerIds()) {
            throw new IllegalArgumentException(
                    String.format(
                            "Subtask ID %s is out of server id range %s, "
                                    + "please adjust the server id range to "
                                    + "make the number of server id larger than "
                                    + "the source parallelism.",
                            subTaskId, this));
        }
        return startServerId + subTaskId;
    }

    /** Returns how many distinct server ids this range contains (boundaries inclusive). */
    public int getNumberOfServerIds() {
        return endServerId - startServerId + 1;
    }

    @Override
    public String toString() {
        // A single-id range renders as "5400", a real range as "5400-5408".
        if (startServerId == endServerId) {
            return String.valueOf(startServerId);
        } else {
            return startServerId + "-" + endServerId;
        }
    }

    /**
     * Returns a {@link ServerIdRange} from a server id range string which likes '5400-5408' or a
     * single server id likes '5400'. Returns {@code null} for a {@code null} input.
     */
    public static ServerIdRange from(String range) {
        if (range == null) {
            return null;
        }
        if (range.contains("-")) {
            String[] idArray = range.split("-");
            if (idArray.length != 2) {
                throw new IllegalArgumentException(
                        String.format(
                                "The server id range should be syntax like '5400-5500', but got: %s",
                                range));
            }
            return new ServerIdRange(
                    parseServerId(idArray[0].trim()), parseServerId(idArray[1].trim()));
        } else {
            int serverId = parseServerId(range);
            return new ServerIdRange(serverId, serverId);
        }
    }

    /** Parses a single server id; wraps the NumberFormatException with a clearer message. */
    private static int parseServerId(String serverIdValue) {
        try {
            return Integer.parseInt(serverIdValue);
        } catch (NumberFormatException e) {
            throw new IllegalStateException(
                    String.format("The server id %s is not a valid numeric.", serverIdValue), e);
        }
    }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
new file mode 100644
index 000000000..d7cf8c3db
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive;
+
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog.MySqlBinlogFetchTask;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotFetchTask;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.connector.mysql.legacy.MySqlSchema;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
+import
org.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
+import org.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+
+import java.sql.SQLException;
+import java.util.List;
+
+/** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
+
+public class MySqlDialect implements JdbcDataSourceDialect {
+
+ private static final long serialVersionUID = 1L;
+ private final MySqlSourceConfigFactory configFactory;
+ private final MySqlSourceConfig sourceConfig;
+ private transient MySqlSchema mySqlSchema;
+
+ public MySqlDialect(MySqlSourceConfigFactory configFactory) {
+ this.configFactory = configFactory;
+ this.sourceConfig = configFactory.create(0);
+ }
+
+ @Override
+ public String getName() {
+ return "MySQL";
+ }
+
+ @Override
+ public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig
sourceConfig) {
+ try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig))
{
+ return isTableIdCaseSensitive(jdbcConnection);
+ } catch (SQLException e) {
+ throw new SeaTunnelException("Error reading MySQL variables: " +
e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
+ //TODO waiting for other pr
+ return null;
+ }
+
+ @Override
+ public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
+ return new MysqlPooledDataSourceFactory();
+ }
+
+ @Override
+ public List<TableId> discoverDataCollections(JdbcSourceConfig
sourceConfig) {
+ MySqlSourceConfig mySqlSourceConfig = (MySqlSourceConfig) sourceConfig;
+ try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig))
{
+ return TableDiscoveryUtils.listTables(
+ jdbcConnection, mySqlSourceConfig.getTableFilters());
+ } catch (SQLException e) {
+ throw new SeaTunnelException("Error to discover tables: " +
e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public MySqlSourceFetchTaskContext createFetchTaskContext(
+ SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
+ final MySqlConnection jdbcConnection =
+ createMySqlConnection(taskSourceConfig.getDbzConfiguration());
+ final BinaryLogClient binaryLogClient =
+ createBinaryClient(taskSourceConfig.getDbzConfiguration());
+ return new MySqlSourceFetchTaskContext(
+ taskSourceConfig, this, jdbcConnection, binaryLogClient);
+ }
+
+ @Override
+ public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase
sourceSplitBase) {
+ if (sourceSplitBase.isSnapshotSplit()) {
+ return new
MySqlSnapshotFetchTask(sourceSplitBase.asSnapshotSplit());
+ } else {
+ return new
MySqlBinlogFetchTask(sourceSplitBase.asIncrementalSplit());
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MysqlPooledDataSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MysqlPooledDataSourceFactory.java
new file mode 100644
index 000000000..9f65815f3
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MysqlPooledDataSourceFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import
org.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
+
+/** A MySQL datasource factory. */
+
+public class MysqlPooledDataSourceFactory extends JdbcConnectionPoolFactory {
+
+ public static final String JDBC_URL_PATTERN =
+
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL";
+
+ @Override
+ public String getJdbcUrl(JdbcSourceConfig sourceConfig) {
+ String hostName = sourceConfig.getHostname();
+ int port = sourceConfig.getPort();
+
+ return String.format(JDBC_URL_PATTERN, hostName, port);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
new file mode 100644
index 000000000..050f5c31c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset;
+
+import io.debezium.connector.mysql.GtidSet;
+import org.apache.commons.lang3.StringUtils;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+
+import java.util.HashMap;
+import java.util.Map;
+
/**
 * A structure describes a fine grained offset in a binlog event including binlog position and gtid
 * set etc.
 *
 * <p>This structure can also be used to deal the binlog event in transaction, a transaction may
 * contains multiple change events, and each change event may contain multiple rows. When restart
 * from a specific {@link BinlogOffset}, we need to skip the processed change events and the
 * processed rows.
 */
public class BinlogOffset extends Offset {

    private static final long serialVersionUID = 1L;

    // Keys of the underlying offset map. They mirror the field names used by the
    // Debezium MySQL connector's offset/source-info structures.
    public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
    public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
    public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event";
    public static final String ROWS_TO_SKIP_OFFSET_KEY = "row";
    public static final String GTID_SET_KEY = "gtids";
    public static final String TIMESTAMP_KEY = "ts_sec";
    public static final String SERVER_ID_KEY = "server_id";

    /** The earliest possible offset: empty filename, position 0. */
    public static final BinlogOffset INITIAL_OFFSET = new BinlogOffset("", 0);
    /** Sentinel treated as the maximum offset by {@link #compareTo(Offset)}. */
    public static final BinlogOffset NO_STOPPING_OFFSET = new BinlogOffset("", Long.MIN_VALUE);

    /**
     * Wraps an existing offset map as-is.
     *
     * <p>NOTE(review): the {@code offset} field is inherited from {@link Offset} — confirm
     * its mutability expectations before sharing the map across instances.
     */
    public BinlogOffset(Map<String, String> offset) {
        this.offset = offset;
    }

    /** Creates an offset with only a filename and position; skip counters default to 0. */
    public BinlogOffset(String filename, long position) {
        this(filename, position, 0L, 0L, 0L, null, null);
    }

    /**
     * Creates a fully-specified offset.
     *
     * @param filename binlog file name
     * @param position byte position within the binlog file
     * @param restartSkipEvents number of already-processed change events to skip on restart
     * @param restartSkipRows number of already-processed rows to skip on restart
     * @param binlogEpochSecs event timestamp in epoch seconds
     * @param restartGtidSet GTID set to restart from; omitted from the map when null
     * @param serverId originating MySQL server id; omitted from the map when null
     */
    public BinlogOffset(
            String filename,
            long position,
            long restartSkipEvents,
            long restartSkipRows,
            long binlogEpochSecs,
            String restartGtidSet,
            Integer serverId) {
        Map<String, String> offsetMap = new HashMap<>();
        offsetMap.put(BINLOG_FILENAME_OFFSET_KEY, filename);
        offsetMap.put(BINLOG_POSITION_OFFSET_KEY, String.valueOf(position));
        offsetMap.put(EVENTS_TO_SKIP_OFFSET_KEY, String.valueOf(restartSkipEvents));
        offsetMap.put(ROWS_TO_SKIP_OFFSET_KEY, String.valueOf(restartSkipRows));
        offsetMap.put(TIMESTAMP_KEY, String.valueOf(binlogEpochSecs));
        // Optional entries: absent keys indicate "unknown" rather than an empty value.
        if (restartGtidSet != null) {
            offsetMap.put(GTID_SET_KEY, restartGtidSet);
        }
        if (serverId != null) {
            offsetMap.put(SERVER_ID_KEY, String.valueOf(serverId));
        }
        this.offset = offsetMap;
    }

    public String getFilename() {
        return offset.get(BINLOG_FILENAME_OFFSET_KEY);
    }

    public long getPosition() {
        // longOffsetValue is inherited from Offset; presumably returns 0 for a missing
        // key — confirm in the base class.
        return longOffsetValue(offset, BINLOG_POSITION_OFFSET_KEY);
    }

    public long getRestartSkipEvents() {
        return longOffsetValue(offset, EVENTS_TO_SKIP_OFFSET_KEY);
    }

    public long getRestartSkipRows() {
        return longOffsetValue(offset, ROWS_TO_SKIP_OFFSET_KEY);
    }

    /** May return null when no GTID set was recorded. */
    public String getGtidSet() {
        return offset.get(GTID_SET_KEY);
    }

    public long getTimestamp() {
        return longOffsetValue(offset, TIMESTAMP_KEY);
    }

    // NOTE(review): declared as Long but compareTo unboxes the result; if
    // longOffsetValue can return null for a missing server_id this would NPE — confirm.
    public Long getServerId() {
        return longOffsetValue(offset, SERVER_ID_KEY);
    }

    /**
     * This method is inspired by {@link io.debezium.relational.history.HistoryRecordComparator}.
     *
     * <p>Ordering strategy: NO_STOPPING_OFFSET sorts last; otherwise GTID sets are
     * compared when available, then server ids / timestamps, then binlog file, position,
     * event count and row count as successive tie-breakers.
     */
    @Override
    public int compareTo(Offset offset) {
        BinlogOffset that = (BinlogOffset) offset;
        // the NO_STOPPING_OFFSET is the max offset
        if (NO_STOPPING_OFFSET.equals(that) && NO_STOPPING_OFFSET.equals(this)) {
            return 0;
        }
        if (NO_STOPPING_OFFSET.equals(this)) {
            return 1;
        }
        if (NO_STOPPING_OFFSET.equals(that)) {
            return -1;
        }

        String gtidSetStr = this.getGtidSet();
        String targetGtidSetStr = that.getGtidSet();
        if (StringUtils.isNotEmpty(targetGtidSetStr)) {
            // The target offset uses GTIDs, so we ideally compare using GTIDs ...
            if (StringUtils.isNotEmpty(gtidSetStr)) {
                // Both have GTIDs, so base the comparison entirely on the GTID sets.
                GtidSet gtidSet = new GtidSet(gtidSetStr);
                GtidSet targetGtidSet = new GtidSet(targetGtidSetStr);
                if (gtidSet.equals(targetGtidSet)) {
                    // Identical GTID sets: fall back to the in-transaction event counter.
                    long restartSkipEvents = this.getRestartSkipEvents();
                    long targetRestartSkipEvents = that.getRestartSkipEvents();
                    return Long.compare(restartSkipEvents, targetRestartSkipEvents);
                }
                // The GTIDs are not an exact match, so figure out if this is a subset of the target
                // offset
                // ...
                return gtidSet.isContainedWithin(targetGtidSet) ? -1 : 1;
            }
            // The target offset did use GTIDs while this did not use GTIDs. So, we assume
            // that this offset is older since GTIDs are often enabled but rarely disabled.
            // And if they are disabled,
            // it is likely that this offset would not include GTIDs as we would be trying
            // to read the binlog of a
            // server that no longer has GTIDs. And if they are enabled, disabled, and re-enabled,
            // per
            // https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html all properly
            // configured slaves that
            // use GTIDs should always have the complete set of GTIDs copied from the master, in
            // which case
            // again we know that this offset not having GTIDs is before the target offset ...
            return -1;
        } else if (StringUtils.isNotEmpty(gtidSetStr)) {
            // This offset has a GTID but the target offset does not, so per the previous paragraph
            // we
            // assume that previous
            // is not at or before ...
            return 1;
        }

        // Both offsets are missing GTIDs. Look at the servers ...
        // NOTE(review): getServerId() returns a boxed Long — this unboxing NPEs if the
        // server_id key is absent and longOffsetValue returns null; confirm base behavior.
        long serverId = this.getServerId();
        long targetServerId = that.getServerId();

        if (serverId != targetServerId) {
            // These are from different servers, and their binlog coordinates are not related. So
            // the only thing we can do
            // is compare timestamps, and we have to assume that the server timestamps can be
            // compared ...
            long timestamp = this.getTimestamp();
            long targetTimestamp = that.getTimestamp();
            return Long.compare(timestamp, targetTimestamp);
        }

        // First compare the MySQL binlog filenames
        if (this.getFilename().compareToIgnoreCase(that.getFilename()) != 0) {
            return this.getFilename().compareToIgnoreCase(that.getFilename());
        }

        // The filenames are the same, so compare the positions
        if (this.getPosition() != that.getPosition()) {
            return Long.compare(this.getPosition(), that.getPosition());
        }

        // The positions are the same, so compare the completed events in the transaction ...
        if (this.getRestartSkipEvents() != that.getRestartSkipEvents()) {
            return Long.compare(this.getRestartSkipEvents(), that.getRestartSkipEvents());
        }

        // The completed events are the same, so compare the row number ...
        return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
    }

    // Equality is defined entirely by the offset map.
    // NOTE(review): equals is overridden without hashCode here (checkstyle suppressed);
    // presumably hashCode comes from Offset — confirm before using instances as map keys.
    @SuppressWarnings("checkstyle:EqualsHashCode")
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof BinlogOffset)) {
            return false;
        }
        BinlogOffset that = (BinlogOffset) o;
        return offset.equals(that.offset);
    }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
new file mode 100644
index 000000000..dec05b07b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch;
+
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.connector.base.ChangeEventQueue;
+import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.connector.mysql.MySqlDatabaseSchema;
+import io.debezium.connector.mysql.MySqlErrorHandler;
+import io.debezium.connector.mysql.MySqlOffsetContext;
+import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
+import io.debezium.connector.mysql.MySqlTaskContext;
+import io.debezium.connector.mysql.MySqlTopicSelector;
+import io.debezium.data.Envelope;
+import io.debezium.pipeline.DataChangeEvent;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
+import io.debezium.pipeline.source.spi.EventMetadataProvider;
+import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.schema.DataCollectionId;
+import io.debezium.schema.TopicSelector;
+import io.debezium.util.Collect;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
+import org.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import
org.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
/**
 * The context for fetch task that fetching data of snapshot split from MySQL data source.
 *
 * <p>Bundles the Debezium runtime objects (schema, offset context, event queue, dispatcher,
 * metrics, error handler) that both snapshot and binlog fetch tasks need. {@link #configure}
 * must be called before the other getters return usable objects.
 */
public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {

    private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);

    // Injected collaborators, shared across splits handled by this reader.
    private final MySqlConnection connection;
    private final BinaryLogClient binaryLogClient;
    private final MySqlEventMetadataProvider metadataProvider;
    // Stateful objects (re)built in configure() for each split.
    private MySqlDatabaseSchema databaseSchema;
    private MySqlTaskContextImpl taskContext;
    private MySqlOffsetContext offsetContext;
    private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
    private MySqlStreamingChangeEventSourceMetrics streamingChangeEventSourceMetrics;
    private TopicSelector<TableId> topicSelector;
    private JdbcSourceEventDispatcher dispatcher;
    private ChangeEventQueue<DataChangeEvent> queue;
    private MySqlErrorHandler errorHandler;

    /**
     * @param sourceConfig the JDBC source configuration
     * @param dataSourceDialect the owning MySQL dialect
     * @param connection JDBC connection reused for schema/offset queries
     * @param binaryLogClient binlog client reused by the streaming source (see
     *     {@link MySqlTaskContextImpl})
     */
    public MySqlSourceFetchTaskContext(
            JdbcSourceConfig sourceConfig,
            JdbcDataSourceDialect dataSourceDialect,
            MySqlConnection connection,
            BinaryLogClient binaryLogClient) {
        super(sourceConfig, dataSourceDialect);
        this.connection = connection;
        this.binaryLogClient = binaryLogClient;
        this.metadataProvider = new MySqlEventMetadataProvider();
    }

    /**
     * Builds all Debezium runtime state for the given split. The order matters: the schema
     * must exist before the offset state is validated/recovered, and the task context and
     * queue must exist before the dispatcher and metrics that reference them.
     */
    @Override
    public void configure(SourceSplitBase sourceSplitBase) {
        // initial stateful objects
        final MySqlConnectorConfig connectorConfig = getDbzConnectorConfig();
        // NOTE(review): variable is named "Insensitive" but stores isTableIdCaseSensitive();
        // verify the flag is consumed with the intended polarity downstream.
        final boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive();
        this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);

        this.databaseSchema =
                MySqlUtils.createMySqlDatabaseSchema(connectorConfig, tableIdCaseInsensitive);
        this.offsetContext =
                loadStartingOffsetState(
                        new MySqlOffsetContext.Loader(connectorConfig), sourceSplitBase);
        validateAndLoadDatabaseHistory(offsetContext, databaseSchema);

        this.taskContext =
                new MySqlTaskContextImpl(connectorConfig, databaseSchema, binaryLogClient);
        // Snapshot splits are bounded, so the queue may grow without limit; binlog reads
        // use the connector's configured max queue size for back-pressure.
        final int queueSize =
                sourceSplitBase.isSnapshotSplit() ? Integer.MAX_VALUE
                : getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
        this.queue =
                new ChangeEventQueue.Builder<DataChangeEvent>()
                        .pollInterval(connectorConfig.getPollInterval())
                        .maxBatchSize(connectorConfig.getMaxBatchSize())
                        .maxQueueSize(queueSize)
                        .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
                        .loggingContextSupplier(
                                () ->
                                        taskContext.configureLoggingContext(
                                                "mysql-cdc-connector-task"))
                        // do not buffer any element, we use signal event
                        // .buffering()
                        .build();
        // schemaNameAdjuster is presumably provided by JdbcSourceFetchTaskContext — confirm.
        this.dispatcher =
                new JdbcSourceEventDispatcher(
                        connectorConfig,
                        topicSelector,
                        databaseSchema,
                        queue,
                        connectorConfig.getTableFilters().dataCollectionFilter(),
                        DataChangeEvent::new,
                        metadataProvider,
                        schemaNameAdjuster);

        final MySqlChangeEventSourceMetricsFactory changeEventSourceMetricsFactory =
                new MySqlChangeEventSourceMetricsFactory(
                        new MySqlStreamingChangeEventSourceMetrics(
                                taskContext, queue, metadataProvider));
        this.snapshotChangeEventSourceMetrics =
                changeEventSourceMetricsFactory.getSnapshotMetrics(
                        taskContext, queue, metadataProvider);
        this.streamingChangeEventSourceMetrics =
                (MySqlStreamingChangeEventSourceMetrics)
                        changeEventSourceMetricsFactory.getStreamingMetrics(
                                taskContext, queue, metadataProvider);
        this.errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue);
    }

    @Override
    public MySqlSourceConfig getSourceConfig() {
        return (MySqlSourceConfig) sourceConfig;
    }

    public MySqlConnection getConnection() {
        return connection;
    }

    public BinaryLogClient getBinaryLogClient() {
        return binaryLogClient;
    }

    public MySqlTaskContextImpl getTaskContext() {
        return taskContext;
    }

    @Override
    public MySqlConnectorConfig getDbzConnectorConfig() {
        return (MySqlConnectorConfig) super.getDbzConnectorConfig();
    }

    @Override
    public MySqlOffsetContext getOffsetContext() {
        return offsetContext;
    }

    public SnapshotChangeEventSourceMetrics getSnapshotChangeEventSourceMetrics() {
        return snapshotChangeEventSourceMetrics;
    }

    public MySqlStreamingChangeEventSourceMetrics getStreamingChangeEventSourceMetrics() {
        return streamingChangeEventSourceMetrics;
    }

    @Override
    public ErrorHandler getErrorHandler() {
        return errorHandler;
    }

    @Override
    public MySqlDatabaseSchema getDatabaseSchema() {
        return databaseSchema;
    }

    @Override
    public SeaTunnelRowType getSplitType(Table table) {
        return MySqlUtils.getSplitType(table);
    }

    @Override
    public JdbcSourceEventDispatcher getDispatcher() {
        return dispatcher;
    }

    @Override
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return queue;
    }

    @Override
    public Tables.TableFilter getTableFilter() {
        return getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
    }

    /** Extracts the binlog position carried by a Kafka-Connect-style source record. */
    @Override
    public Offset getStreamOffset(SourceRecord sourceRecord) {
        return MySqlUtils.getBinlogPosition(sourceRecord);
    }

    /**
     * Loads the connector's persistent offset (if present) via the given loader.
     *
     * <p>Snapshot splits always start from the initial offset; incremental splits resume
     * from their recorded startup offset.
     *
     * @throws IllegalStateException if the required binlog file has been purged on the server
     */
    private MySqlOffsetContext loadStartingOffsetState(
            OffsetContext.Loader loader, SourceSplitBase mySqlSplit) {
        Offset offset =
                mySqlSplit.isSnapshotSplit() ? BinlogOffset.INITIAL_OFFSET
                : mySqlSplit.asIncrementalSplit().getStartupOffset();

        MySqlOffsetContext mySqlOffsetContext =
                (MySqlOffsetContext) loader.load(offset.getOffset());

        if (!isBinlogAvailable(mySqlOffsetContext)) {
            throw new IllegalStateException(
                    "The connector is trying to read binlog starting at "
                            + mySqlOffsetContext.getSourceInfo()
                            + ", but this is no longer "
                            + "available on the server. Reconfigure the connector to use a snapshot when needed.");
        }
        return mySqlOffsetContext;
    }

    /**
     * Returns whether the binlog file referenced by the offset still exists on the server.
     * A null filename means "start at current position", an empty one "start at beginning";
     * both are always available.
     */
    private boolean isBinlogAvailable(MySqlOffsetContext offset) {
        String binlogFilename = offset.getSourceInfo().getString(BINLOG_FILENAME_OFFSET_KEY);
        if (binlogFilename == null) {
            return true; // start at current position
        }
        if (binlogFilename.equals("")) {
            return true; // start at beginning
        }

        // Accumulate the available binlog filenames ...
        List<String> logNames = connection.availableBinlogFiles();

        // And compare with the one we're supposed to use ...
        boolean found = logNames.stream().anyMatch(binlogFilename::equals);
        if (!found) {
            LOG.info(
                    "Connector requires binlog file '{}', but MySQL only has {}",
                    binlogFilename,
                    String.join(", ", logNames));
        } else {
            LOG.info("MySQL has the binlog file '{}' required by the connector", binlogFilename);
        }
        return found;
    }

    // Recovers the schema history up to the given offset before any events are processed.
    private void validateAndLoadDatabaseHistory(
            MySqlOffsetContext offset, MySqlDatabaseSchema schema) {
        schema.initializeStorage();
        schema.recover(offset);
    }

    /**
     * A subclass implementation of {@link MySqlTaskContext} which reuses one BinaryLogClient.
     *
     * <p>NOTE(review): this is a non-static inner class, so each instance holds an implicit
     * reference to the enclosing context — confirm that is intended if instances are retained.
     */
    public class MySqlTaskContextImpl extends MySqlTaskContext {

        private final BinaryLogClient reusedBinaryLogClient;

        public MySqlTaskContextImpl(
                MySqlConnectorConfig config,
                MySqlDatabaseSchema schema,
                BinaryLogClient reusedBinaryLogClient) {
            super(config, schema);
            this.reusedBinaryLogClient = reusedBinaryLogClient;
        }

        @Override
        public BinaryLogClient getBinaryLogClient() {
            return reusedBinaryLogClient;
        }
    }

    /**
     * Copied from debezium for accessing here.
     *
     * <p>Extracts per-event metadata (timestamp, position, transaction id) from the SOURCE
     * struct of MySQL change events.
     */
    public static class MySqlEventMetadataProvider implements EventMetadataProvider {
        public static final String SERVER_ID_KEY = "server_id";

        public static final String GTID_KEY = "gtid";
        public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
        public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
        public static final String BINLOG_ROW_IN_EVENT_OFFSET_KEY = "row";
        public static final String THREAD_KEY = "thread";
        public static final String QUERY_KEY = "query";

        @Override
        public Instant getEventTimestamp(
                DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
            // NOTE(review): this guards the 'source' parameter but then dereferences
            // 'sourceInfo'; if the SOURCE struct can be absent this NPEs — confirm
            // (the upstream Debezium code has the same shape).
            if (source == null) {
                return null;
            }
            final Long timestamp = sourceInfo.getInt64(AbstractSourceInfo.TIMESTAMP_KEY);
            return timestamp == null ? null : Instant.ofEpochMilli(timestamp);
        }

        @Override
        public Map<String, String> getEventSourcePosition(
                DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
            // NOTE(review): same 'source' vs 'sourceInfo' guard mismatch as above — confirm.
            if (source == null) {
                return null;
            }
            return Collect.hashMapOf(
                    BINLOG_FILENAME_OFFSET_KEY,
                    sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY),
                    BINLOG_POSITION_OFFSET_KEY,
                    Long.toString(sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY)),
                    BINLOG_ROW_IN_EVENT_OFFSET_KEY,
                    Integer.toString(sourceInfo.getInt32(BINLOG_ROW_IN_EVENT_OFFSET_KEY)));
        }

        @Override
        public String getTransactionId(
                DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            return ((MySqlOffsetContext) offset).getTransactionId();
        }
    }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
new file mode 100644
index 000000000..21da9227a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog;
+
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
+
+import io.debezium.connector.mysql.MySqlStreamingChangeEventSource;
+import io.debezium.pipeline.source.spi.ChangeEventSource;
+import io.debezium.util.Clock;
+import org.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
+import org.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+
+public class MySqlBinlogFetchTask implements FetchTask<SourceSplitBase> {
+ private final IncrementalSplit split;
+ private volatile boolean taskRunning = false;
+
+ public MySqlBinlogFetchTask(IncrementalSplit split) {
+ this.split = split;
+ }
+
+ @Override
+ public void execute(Context context) throws Exception {
+ MySqlSourceFetchTaskContext sourceFetchContext =
(MySqlSourceFetchTaskContext) context;
+ taskRunning = true;
+
+ MySqlStreamingChangeEventSource mySqlStreamingChangeEventSource = new
MySqlStreamingChangeEventSource(
+ sourceFetchContext.getDbzConnectorConfig(),
+ sourceFetchContext.getConnection(),
+ sourceFetchContext.getDispatcher(),
+ sourceFetchContext.getErrorHandler(),
+ Clock.SYSTEM,
+ sourceFetchContext.getTaskContext(),
+ sourceFetchContext.getStreamingChangeEventSourceMetrics());
+
+ BinlogSplitChangeEventSourceContext changeEventSourceContext =
+ new BinlogSplitChangeEventSourceContext();
+
+ mySqlStreamingChangeEventSource.execute(changeEventSourceContext,
sourceFetchContext.getOffsetContext());
+ }
+
+ @Override
+ public boolean isRunning() {
+ return taskRunning;
+ }
+
+ @Override
+ public SourceSplitBase getSplit() {
+ return split;
+ }
+
+ private class BinlogSplitChangeEventSourceContext
+ implements ChangeEventSource.ChangeEventSourceContext {
+ @Override
+ public boolean isRunning() {
+ return taskRunning;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
new file mode 100644
index 000000000..832bc8b6d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan;
+
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
+
+import org.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+
+public class MySqlSnapshotFetchTask implements FetchTask<SourceSplitBase> {
+
+ private final SnapshotSplit split;
+
+ private volatile boolean taskRunning = false;
+
+ private MySqlSnapshotSplitReadTask snapshotSplitReadTask;
+
+ public MySqlSnapshotFetchTask(SnapshotSplit split) {
+ this.split = split;
+ }
+
+ @Override
+ public void execute(Context context) throws Exception {
+ MySqlSourceFetchTaskContext sourceFetchContext =
(MySqlSourceFetchTaskContext) context;
+ taskRunning = true;
+ snapshotSplitReadTask =
+ new MySqlSnapshotSplitReadTask(
+ sourceFetchContext.getDbzConnectorConfig(),
+ sourceFetchContext.getOffsetContext(),
+ sourceFetchContext.getSnapshotChangeEventSourceMetrics(),
+ sourceFetchContext.getDatabaseSchema(),
+ sourceFetchContext.getConnection(),
+ sourceFetchContext.getDispatcher(),
+ split);
+ SnapshotSplitChangeEventSourceContext changeEventSourceContext =
+ new SnapshotSplitChangeEventSourceContext();
+ snapshotSplitReadTask.execute(
+ changeEventSourceContext,
sourceFetchContext.getOffsetContext());
+ taskRunning = false;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return taskRunning;
+ }
+
+ @Override
+ public SourceSplitBase getSplit() {
+ return split;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
new file mode 100644
index 000000000..0699ec394
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan;
+
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.currentBinlogOffset;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils.buildSplitScanQuery;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils.readTableSplitDataStatement;
+
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.connector.mysql.MySqlDatabaseSchema;
+import io.debezium.connector.mysql.MySqlOffsetContext;
+import io.debezium.connector.mysql.MySqlValueConverters;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
+import io.debezium.pipeline.source.spi.SnapshotProgressListener;
+import io.debezium.pipeline.spi.ChangeRecordEmitter;
+import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.pipeline.spi.SnapshotResult;
+import io.debezium.relational.Column;
+import io.debezium.relational.RelationalSnapshotChangeEventSource;
+import io.debezium.relational.SnapshotChangeRecordEmitter;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.ColumnUtils;
+import io.debezium.util.Strings;
+import io.debezium.util.Threads;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.sql.Blob;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.time.Duration;
+import java.util.Calendar;
+
+public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSource {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MySqlSnapshotSplitReadTask.class);
+
+ /** Interval for showing a log statement with the progress while scanning
a single table. */
+ private static final Duration LOG_INTERVAL = Duration.ofMillis(10_000);
+
+ private final MySqlConnectorConfig connectorConfig;
+ private final MySqlDatabaseSchema databaseSchema;
+ private final MySqlConnection jdbcConnection;
+ private final JdbcSourceEventDispatcher dispatcher;
+ private final Clock clock;
+ private final SnapshotSplit snapshotSplit;
+ private final MySqlOffsetContext offsetContext;
+ private final SnapshotProgressListener snapshotProgressListener;
+
+ public MySqlSnapshotSplitReadTask(
+ MySqlConnectorConfig connectorConfig,
+ MySqlOffsetContext previousOffset,
+ SnapshotProgressListener snapshotProgressListener,
+ MySqlDatabaseSchema databaseSchema,
+ MySqlConnection jdbcConnection,
+ JdbcSourceEventDispatcher dispatcher,
+ SnapshotSplit snapshotSplit) {
+ super(connectorConfig, snapshotProgressListener);
+ this.offsetContext = previousOffset;
+ this.connectorConfig = connectorConfig;
+ this.databaseSchema = databaseSchema;
+ this.jdbcConnection = jdbcConnection;
+ this.dispatcher = dispatcher;
+ this.clock = Clock.SYSTEM;
+ this.snapshotSplit = snapshotSplit;
+ this.snapshotProgressListener = snapshotProgressListener;
+ }
+
+ @Override
+ public SnapshotResult execute(
+ ChangeEventSourceContext context, OffsetContext previousOffset)
+ throws InterruptedException {
+ SnapshottingTask snapshottingTask =
getSnapshottingTask(previousOffset);
+ final SnapshotContext ctx;
+ try {
+ ctx = prepare(context);
+ } catch (Exception e) {
+ LOG.error("Failed to initialize snapshot context.", e);
+ throw new RuntimeException(e);
+ }
+ try {
+ return doExecute(context, previousOffset, ctx, snapshottingTask);
+ } catch (InterruptedException e) {
+ LOG.warn("Snapshot was interrupted before completion");
+ throw e;
+ } catch (Exception t) {
+ throw new DebeziumException(t);
+ }
+ }
+
+ @Override
+ protected SnapshotResult doExecute(
+ ChangeEventSourceContext context,
+ OffsetContext previousOffset,
+ SnapshotContext snapshotContext,
+ SnapshottingTask snapshottingTask)
+ throws Exception {
+ final RelationalSnapshotChangeEventSource.RelationalSnapshotContext
ctx =
+ (RelationalSnapshotChangeEventSource.RelationalSnapshotContext)
snapshotContext;
+ ctx.offset = offsetContext;
+
+ final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
+ LOG.info(
+ "Snapshot step 1 - Determining low watermark {} for split {}",
+ lowWatermark,
+ snapshotSplit);
+ ((SnapshotSplitChangeEventSourceContext)
context).setLowWatermark(lowWatermark);
+ dispatcher.dispatchWatermarkEvent(
+ offsetContext.getPartition(), snapshotSplit, lowWatermark,
WatermarkKind.LOW);
+
+ LOG.info("Snapshot step 2 - Snapshotting data");
+ createDataEvents(ctx, snapshotSplit.getTableId());
+
+ final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
+ LOG.info(
+ "Snapshot step 3 - Determining high watermark {} for split {}",
+ highWatermark,
+ snapshotSplit);
+ ((SnapshotSplitChangeEventSourceContext)
context).setHighWatermark(lowWatermark);
+ dispatcher.dispatchWatermarkEvent(
+ offsetContext.getPartition(), snapshotSplit, highWatermark,
WatermarkKind.HIGH);
+ return SnapshotResult.completed(ctx.offset);
+ }
+
+ @Override
+ protected SnapshottingTask getSnapshottingTask(OffsetContext
previousOffset) {
+ return new SnapshottingTask(false, true);
+ }
+
+ @Override
+ protected SnapshotContext prepare(ChangeEventSourceContext
changeEventSourceContext)
+ throws Exception {
+ return new MySqlSnapshotContext();
+ }
+
+ private void createDataEvents(
+ RelationalSnapshotChangeEventSource.RelationalSnapshotContext
snapshotContext,
+ TableId tableId)
+ throws Exception {
+ EventDispatcher.SnapshotReceiver snapshotReceiver =
+ dispatcher.getSnapshotChangeEventReceiver();
+ LOG.debug("Snapshotting table {}", tableId);
+ createDataEventsForTable(
+ snapshotContext, snapshotReceiver,
databaseSchema.tableFor(tableId));
+ snapshotReceiver.completeSnapshot();
+ }
+
+ /** Dispatches the data change events for the records of a single table. */
+ private void createDataEventsForTable(
+ RelationalSnapshotChangeEventSource.RelationalSnapshotContext
snapshotContext,
+ EventDispatcher.SnapshotReceiver snapshotReceiver,
+ Table table)
+ throws InterruptedException {
+
+ long exportStart = clock.currentTimeInMillis();
+ LOG.info(
+ "Exporting data from split '{}' of table {}",
+ snapshotSplit.splitId(),
+ table.id());
+
+ final String selectSql =
+ buildSplitScanQuery(
+ snapshotSplit.getTableId(),
+ snapshotSplit.getSplitKeyType(),
+ snapshotSplit.getSplitStart() == null,
+ snapshotSplit.getSplitEnd() == null);
+ LOG.info(
+ "For split '{}' of table {} using select statement: '{}'",
+ snapshotSplit.splitId(),
+ table.id(),
+ selectSql);
+
+ try (PreparedStatement selectStatement =
+ readTableSplitDataStatement(
+ jdbcConnection,
+ selectSql,
+ snapshotSplit.getSplitStart() == null,
+ snapshotSplit.getSplitEnd() == null,
+ new Object[]{snapshotSplit.getSplitStart()},
+ new Object[]{snapshotSplit.getSplitEnd()},
+ snapshotSplit.getSplitKeyType().getTotalFields(),
+ connectorConfig.getQueryFetchSize());
+ ResultSet rs = selectStatement.executeQuery()) {
+
+ ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs,
table);
+ long rows = 0;
+ Threads.Timer logTimer = getTableScanLogTimer();
+
+ while (rs.next()) {
+ rows++;
+ final Object[] row = new
Object[columnArray.getGreatestColumnPosition()];
+ for (int i = 0; i < columnArray.getColumns().length; i++) {
+ Column actualColumn = table.columns().get(i);
+ row[columnArray.getColumns()[i].position() - 1] =
+ readField(rs, i + 1, actualColumn, table);
+ }
+ if (logTimer.expired()) {
+ long stop = clock.currentTimeInMillis();
+ LOG.info(
+ "Exported {} records for split '{}' after {}",
+ rows,
+ snapshotSplit.splitId(),
+ Strings.duration(stop - exportStart));
+ snapshotProgressListener.rowsScanned(table.id(), rows);
+ logTimer = getTableScanLogTimer();
+ }
+ dispatcher.dispatchSnapshotEvent(
+ table.id(),
+ getChangeRecordEmitter(snapshotContext, table.id(), row),
+ snapshotReceiver);
+ }
+ LOG.info(
+ "Finished exporting {} records for split '{}', total duration
'{}'",
+ rows,
+ snapshotSplit.splitId(),
+ Strings.duration(clock.currentTimeInMillis() - exportStart));
+ } catch (SQLException e) {
+ throw new ConnectException("Snapshotting of table " + table.id() +
" failed", e);
+ }
+ }
+
+ protected ChangeRecordEmitter getChangeRecordEmitter(
+ SnapshotContext snapshotContext, TableId tableId, Object[] row) {
+ snapshotContext.offset.event(tableId, clock.currentTime());
+ return new SnapshotChangeRecordEmitter(snapshotContext.offset, row,
clock);
+ }
+
+ private Threads.Timer getTableScanLogTimer() {
+ return Threads.timer(clock, LOG_INTERVAL);
+ }
+
+ /**
+ * Read JDBC return value and deal special type like time, timestamp.
+ *
+ * <p>Note https://issues.redhat.com/browse/DBZ-3238 has fixed this issue,
please remove
+ * this method once we bump Debezium version to 1.6
+ */
+ private Object readField(ResultSet rs, int fieldNo, Column actualColumn,
Table actualTable)
+ throws SQLException {
+ if (actualColumn.jdbcType() == Types.TIME) {
+ return readTimeField(rs, fieldNo);
+ } else if (actualColumn.jdbcType() == Types.DATE) {
+ return readDateField(rs, fieldNo, actualColumn, actualTable);
+ }
+ // This is for DATETIME columns (a logical date + time without time
zone)
+ // by reading them with a calendar based on the default time zone, we
make sure that the
+ // value
+ // is constructed correctly using the database's (or connection's)
time zone
+ else if (actualColumn.jdbcType() == Types.TIMESTAMP) {
+ return readTimestampField(rs, fieldNo, actualColumn, actualTable);
+ }
+ // JDBC's rs.GetObject() will return a Boolean for all TINYINT(1)
columns.
+ // TINYINT columns are reprtoed as SMALLINT by JDBC driver
+ else if (actualColumn.jdbcType() == Types.TINYINT
+ || actualColumn.jdbcType() == Types.SMALLINT) {
+ // It seems that rs.wasNull() returns false when default value is
set and NULL is
+ // inserted
+ // We thus need to use getObject() to identify if the value was
provided and if yes
+ // then
+ // read it again to get correct scale
+ return rs.getObject(fieldNo) == null ? null : rs.getInt(fieldNo);
+ }
+ // DBZ-2673
+ // It is necessary to check the type names as types like ENUM and SET
are
+ // also reported as JDBC type char
+ else if ("CHAR".equals(actualColumn.typeName())
+ || "VARCHAR".equals(actualColumn.typeName())
+ || "TEXT".equals(actualColumn.typeName())) {
+ return rs.getBytes(fieldNo);
+ } else {
+ return rs.getObject(fieldNo);
+ }
+ }
+
+ /**
+ * As MySQL connector/J implementation is broken for MySQL type "TIME" we
have to use a
+ * binary-ish workaround. https://issues.jboss.org/browse/DBZ-342
+ */
+ private Object readTimeField(ResultSet rs, int fieldNo) throws
SQLException {
+ Blob b = rs.getBlob(fieldNo);
+ if (b == null) {
+ return null; // Don't continue parsing time field if it is null
+ }
+
+ try {
+ return MySqlValueConverters.stringToDuration(
+ new String(b.getBytes(1, (int) (b.length())), "UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("Could not read MySQL TIME value as UTF-8");
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * In non-string mode the date field can contain zero in any of the date
part which we need
+ * to handle as all-zero.
+ */
+ private Object readDateField(ResultSet rs, int fieldNo, Column column,
Table table)
+ throws SQLException {
+ Blob b = rs.getBlob(fieldNo);
+ if (b == null) {
+ return null; // Don't continue parsing date field if it is null
+ }
+
+ try {
+ return MySqlValueConverters.stringToLocalDate(
+ new String(b.getBytes(1, (int) (b.length())), "UTF-8"),
column, table);
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("Could not read MySQL TIME value as UTF-8");
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * In non-string mode the time field can contain zero in any of the date
part which we need
+ * to handle as all-zero.
+ */
+ private Object readTimestampField(ResultSet rs, int fieldNo, Column
column, Table table)
+ throws SQLException {
+ Blob b = rs.getBlob(fieldNo);
+ if (b == null) {
+ return null; // Don't continue parsing timestamp field if it is
null
+ }
+
+ try {
+ return MySqlValueConverters.containsZeroValuesInDatePart(
+ new String(b.getBytes(1, (int) (b.length())), "UTF-8"),
+ column,
+ table) ? null
+ : rs.getTimestamp(fieldNo, Calendar.getInstance());
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("Could not read MySQL TIME value as UTF-8");
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static class MySqlSnapshotContext extends
RelationalSnapshotChangeEventSource.RelationalSnapshotContext {
+ public MySqlSnapshotContext() throws SQLException {
+ super("");
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/SnapshotSplitChangeEventSourceContext.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/SnapshotSplitChangeEventSourceContext.java
new file mode 100644
index 000000000..982226ac0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/SnapshotSplitChangeEventSourceContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan;
+
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
+
+import io.debezium.pipeline.source.spi.ChangeEventSource;
+
+public class SnapshotSplitChangeEventSourceContext implements
ChangeEventSource.ChangeEventSourceContext{
+ private BinlogOffset lowWatermark;
+ private BinlogOffset highWatermark;
+
+ public BinlogOffset getLowWatermark() {
+ return lowWatermark;
+ }
+
+ public void setLowWatermark(BinlogOffset lowWatermark) {
+ this.lowWatermark = lowWatermark;
+ }
+
+ public BinlogOffset getHighWatermark() {
+ return highWatermark;
+ }
+
+ public void setHighWatermark(BinlogOffset highWatermark) {
+ this.highWatermark = highWatermark;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return lowWatermark != null && highWatermark != null;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
new file mode 100644
index 000000000..691f2f1ae
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.connector.mysql.MySqlDatabaseSchema;
+import io.debezium.connector.mysql.MySqlSystemVariables;
+import io.debezium.connector.mysql.MySqlTopicSelector;
+import io.debezium.connector.mysql.MySqlValueConverters;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcValueConverters;
+import io.debezium.jdbc.TemporalPrecisionMode;
+import io.debezium.relational.TableId;
+import io.debezium.schema.TopicSelector;
+import io.debezium.util.SchemaNameAdjuster;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * MySQL connection Utilities.
+ */
+public class MySqlConnectionUtils {
+
+ /**
+ * Creates a new {@link MySqlConnection}, but not open the connection.
+ */
+ public static MySqlConnection createMySqlConnection(Configuration
dbzConfiguration) {
+ return new MySqlConnection(
+ new
MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration));
+ }
+
+ /**
+ * Creates a new {@link BinaryLogClient} for consuming mysql binlog.
+ */
+ public static BinaryLogClient createBinaryClient(Configuration
dbzConfiguration) {
+ final MySqlConnectorConfig connectorConfig = new
MySqlConnectorConfig(dbzConfiguration);
+ return new BinaryLogClient(
+ connectorConfig.hostname(),
+ connectorConfig.port(),
+ connectorConfig.username(),
+ connectorConfig.password());
+ }
+
+ /**
+ * Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql
database schemas.
+ */
+ public static MySqlDatabaseSchema createMySqlDatabaseSchema(
+ MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) {
+ TopicSelector<TableId> topicSelector =
MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
+ SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
+ MySqlValueConverters valueConverters =
getValueConverters(dbzMySqlConfig);
+ return new MySqlDatabaseSchema(
+ dbzMySqlConfig,
+ valueConverters,
+ topicSelector,
+ schemaNameAdjuster,
+ isTableIdCaseSensitive);
+ }
+
+ /**
+ * Fetch current binlog offsets in MySql Server.
+ */
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
+ final String showMasterStmt = "SHOW MASTER STATUS";
+ try {
+ return jdbc.queryAndMap(
+ showMasterStmt,
+ rs -> {
+ if (rs.next()) {
+ final String binlogFilename = rs.getString(1);
+ final long binlogPosition = rs.getLong(2);
+ final String gtidSet =
+ rs.getMetaData().getColumnCount() > 4 ?
rs.getString(5) : null;
+ return new BinlogOffset(
+ binlogFilename, binlogPosition, 0L, 0, 0, gtidSet,
null);
+ } else {
+ throw new SeaTunnelException(
+ "Cannot read the binlog filename and position via
'"
+ + showMasterStmt
+ + "'. Make sure your server is correctly
configured");
+ }
+ });
+ } catch (SQLException e) {
+ throw new SeaTunnelException(
+ "Cannot read the binlog filename and position via '"
+ + showMasterStmt
+ + "'. Make sure your server is correctly configured",
+ e);
+ }
+ }
+
+ //
--------------------------------------------------------------------------------------------
+
+ private static MySqlValueConverters
getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
+ TemporalPrecisionMode timePrecisionMode =
dbzMySqlConfig.getTemporalPrecisionMode();
+ JdbcValueConverters.DecimalMode decimalMode =
dbzMySqlConfig.getDecimalMode();
+ String bigIntUnsignedHandlingModeStr =
+ dbzMySqlConfig
+ .getConfig()
+ .getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
+ MySqlConnectorConfig.BigIntUnsignedHandlingMode
bigIntUnsignedHandlingMode =
+ MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
+ bigIntUnsignedHandlingModeStr);
+ JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
+ bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
+
+ boolean timeAdjusterEnabled =
+
dbzMySqlConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
+ return new MySqlValueConverters(
+ decimalMode,
+ timePrecisionMode,
+ bigIntUnsignedMode,
+ dbzMySqlConfig.binaryHandlingMode(),
+ timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x ->
x,
+ MySqlValueConverters::defaultParsingErrorHandler);
+ }
+
+ public static boolean isTableIdCaseSensitive(JdbcConnection connection) {
+ return !"0"
+ .equals(
+ readMySqlSystemVariables(connection)
+ .get(MySqlSystemVariables.LOWER_CASE_TABLE_NAMES));
+ }
+
+ public static Map<String, String> readMySqlSystemVariables(JdbcConnection
connection) {
+ // Read the system variables from the MySQL instance and get the
current database name ...
+ return querySystemVariables(connection, "SHOW VARIABLES");
+ }
+
+ private static Map<String, String> querySystemVariables(
+ JdbcConnection connection, String statement) {
+ final Map<String, String> variables = new HashMap<>();
+ try {
+ connection.query(
+ statement,
+ rs -> {
+ while (rs.next()) {
+ String varName = rs.getString(1);
+ String value = rs.getString(2);
+ if (varName != null && value != null) {
+ variables.put(varName, value);
+ }
+ }
+ });
+ } catch (SQLException e) {
+ throw new SeaTunnelException("Error reading MySQL variables: " +
e.getMessage(), e);
+ }
+
+ return variables;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
new file mode 100644
index 000000000..8250e3cf2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import io.debezium.relational.Column;
+import lombok.extern.slf4j.Slf4j;
+
+/** Utilities for converting from MySQL types to Flink types. */
+
+@Slf4j
+public class MySqlTypeUtils {
+
+ // ============================data types=====================
+
+ private static final String MYSQL_UNKNOWN = "UNKNOWN";
+ private static final String MYSQL_BIT = "BIT";
+
+ // -------------------------number----------------------------
+ private static final String MYSQL_TINYINT = "TINYINT";
+ private static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+ private static final String MYSQL_SMALLINT = "SMALLINT";
+ private static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+ private static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+ private static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT
UNSIGNED";
+ private static final String MYSQL_INT = "INT";
+ private static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+ private static final String MYSQL_INTEGER = "INTEGER";
+ private static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+ private static final String MYSQL_BIGINT = "BIGINT";
+ private static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+ private static final String MYSQL_DECIMAL = "DECIMAL";
+ private static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+ private static final String MYSQL_FLOAT = "FLOAT";
+ private static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+ private static final String MYSQL_DOUBLE = "DOUBLE";
+ private static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+ // -------------------------string----------------------------
+ private static final String MYSQL_CHAR = "CHAR";
+ private static final String MYSQL_VARCHAR = "VARCHAR";
+ private static final String MYSQL_TINYTEXT = "TINYTEXT";
+ private static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+ private static final String MYSQL_TEXT = "TEXT";
+ private static final String MYSQL_LONGTEXT = "LONGTEXT";
+ private static final String MYSQL_JSON = "JSON";
+
+ // ------------------------------time-------------------------
+ private static final String MYSQL_DATE = "DATE";
+ private static final String MYSQL_DATETIME = "DATETIME";
+ private static final String MYSQL_TIME = "TIME";
+ private static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+ private static final String MYSQL_YEAR = "YEAR";
+
+ // ------------------------------blob-------------------------
+ private static final String MYSQL_TINYBLOB = "TINYBLOB";
+ private static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+ private static final String MYSQL_BLOB = "BLOB";
+ private static final String MYSQL_LONGBLOB = "LONGBLOB";
+ private static final String MYSQL_BINARY = "BINARY";
+ private static final String MYSQL_VARBINARY = "VARBINARY";
+ private static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static SeaTunnelDataType<?> convertFromColumn(Column column) {
+ String typeName = column.typeName();
+ switch (typeName) {
+ case MYSQL_BIT:
+ return BasicType.BOOLEAN_TYPE;
+ case MYSQL_TINYINT:
+ case MYSQL_TINYINT_UNSIGNED:
+ case MYSQL_SMALLINT:
+ case MYSQL_SMALLINT_UNSIGNED:
+ case MYSQL_MEDIUMINT:
+ case MYSQL_MEDIUMINT_UNSIGNED:
+ case MYSQL_INT:
+ case MYSQL_INTEGER:
+ case MYSQL_YEAR:
+ return BasicType.INT_TYPE;
+ case MYSQL_INT_UNSIGNED:
+ case MYSQL_INTEGER_UNSIGNED:
+ case MYSQL_BIGINT:
+ return BasicType.LONG_TYPE;
+ case MYSQL_BIGINT_UNSIGNED:
+ return new DecimalType(20, 0);
+ case MYSQL_DECIMAL:
+ return new DecimalType(column.length(),
column.scale().orElse(0));
+ case MYSQL_FLOAT:
+ return BasicType.FLOAT_TYPE;
+ case MYSQL_FLOAT_UNSIGNED:
+ log.warn("{} will probably cause value overflow.",
MYSQL_FLOAT_UNSIGNED);
+ return BasicType.FLOAT_TYPE;
+ case MYSQL_DOUBLE:
+ return BasicType.DOUBLE_TYPE;
+ case MYSQL_DOUBLE_UNSIGNED:
+ log.warn("{} will probably cause value overflow.",
MYSQL_DOUBLE_UNSIGNED);
+ return BasicType.DOUBLE_TYPE;
+ case MYSQL_CHAR:
+ case MYSQL_TINYTEXT:
+ case MYSQL_MEDIUMTEXT:
+ case MYSQL_TEXT:
+ case MYSQL_VARCHAR:
+ case MYSQL_JSON:
+ return BasicType.STRING_TYPE;
+ case MYSQL_LONGTEXT:
+ log.warn(
+ "Type '{}' has a maximum precision of 536870911 in MySQL. "
+ + "Due to limitations in the seatunnel type system, "
+ + "the precision will be set to 2147483647.",
+ MYSQL_LONGTEXT);
+ return BasicType.STRING_TYPE;
+ case MYSQL_DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case MYSQL_TIME:
+ return LocalTimeType.LOCAL_TIME_TYPE;
+ case MYSQL_DATETIME:
+ case MYSQL_TIMESTAMP:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+
+ case MYSQL_TINYBLOB:
+ case MYSQL_MEDIUMBLOB:
+ case MYSQL_BLOB:
+ case MYSQL_LONGBLOB:
+ case MYSQL_VARBINARY:
+ case MYSQL_BINARY:
+ return PrimitiveByteArrayType.INSTANCE;
+
+ //Doesn't support yet
+ case MYSQL_GEOMETRY:
+ case MYSQL_UNKNOWN:
+ default:
+ final String columnName = column.name();
+ throw new UnsupportedOperationException(
+ String.format(
+ "Doesn't support MySQL type '%s' on column '%s' yet.",
+ typeName, columnName));
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
new file mode 100644
index 000000000..cbad9cde8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+
+import static
org.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.rowToArray;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
+
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.connector.mysql.MySqlDatabaseSchema;
+import io.debezium.connector.mysql.MySqlTopicSelector;
+import io.debezium.connector.mysql.MySqlValueConverters;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcValueConverters;
+import io.debezium.jdbc.TemporalPrecisionMode;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.schema.TopicSelector;
+import io.debezium.util.SchemaNameAdjuster;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Utils to prepare MySQL SQL statement.
+ */
+public class MySqlUtils {
+
+ private MySqlUtils() {
+ }
+
+ public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId,
String columnName)
+ throws SQLException {
+ final String minMaxQuery =
+ String.format(
+ "SELECT MIN(%s), MAX(%s) FROM %s",
+ quote(columnName), quote(columnName), quote(tableId));
+ return jdbc.queryAndMap(
+ minMaxQuery,
+ rs -> {
+ if (!rs.next()) {
+ // this should never happen
+ throw new SQLException(
+ String.format(
+ "No result returned after running query [%s]",
+ minMaxQuery));
+ }
+ return rowToArray(rs, 2);
+ });
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId
tableId)
+ throws SQLException {
+ // The statement used to get approximate row count which is less
+ // accurate than COUNT(*), but is more efficient for large table.
+ final String useDatabaseStatement = String.format("USE %s;",
quote(tableId.catalog()));
+ final String rowCountQuery = String.format("SHOW TABLE STATUS LIKE
'%s';", tableId.table());
+ jdbc.executeWithoutCommitting(useDatabaseStatement);
+ return jdbc.queryAndMap(
+ rowCountQuery,
+ rs -> {
+ if (!rs.next() || rs.getMetaData().getColumnCount() < 5) {
+ throw new SQLException(
+ String.format(
+ "No result returned after running query [%s]",
+ rowCountQuery));
+ }
+ return rs.getLong(5);
+ });
+ }
+
+ public static Object queryMin(
+ JdbcConnection jdbc, TableId tableId, String columnName, Object
excludedLowerBound)
+ throws SQLException {
+ final String minQuery =
+ String.format(
+ "SELECT MIN(%s) FROM %s WHERE %s > ?",
+ quote(columnName), quote(tableId), quote(columnName));
+ return jdbc.prepareQueryAndMap(
+ minQuery,
+ ps -> ps.setObject(1, excludedLowerBound),
+ rs -> {
+ if (!rs.next()) {
+ // this should never happen
+ throw new SQLException(
+ String.format(
+ "No result returned after running query [%s]",
minQuery));
+ }
+ return rs.getObject(1);
+ });
+ }
+
+ public static Object queryNextChunkMax(
+ JdbcConnection jdbc,
+ TableId tableId,
+ String splitColumnName,
+ int chunkSize,
+ Object includedLowerBound)
+ throws SQLException {
+ String quotedColumn = quote(splitColumnName);
+ String query =
+ String.format(
+ "SELECT MAX(%s) FROM ("
+ + "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT
%s"
+ + ") AS T",
+ quotedColumn,
+ quotedColumn,
+ quote(tableId),
+ quotedColumn,
+ quotedColumn,
+ chunkSize);
+ return jdbc.prepareQueryAndMap(
+ query,
+ ps -> ps.setObject(1, includedLowerBound),
+ rs -> {
+ if (!rs.next()) {
+ // this should never happen
+ throw new SQLException(
+ String.format(
+ "No result returned after running query [%s]",
query));
+ }
+ return rs.getObject(1);
+ });
+ }
+
+ public static String buildSplitScanQuery(
+ TableId tableId, SeaTunnelRowType rowType, boolean isFirstSplit,
boolean isLastSplit) {
+ return buildSplitQuery(tableId, rowType, isFirstSplit, isLastSplit,
-1, true);
+ }
+
/**
 * Builds the SELECT statement that reads one snapshot split.
 *
 * <p>The WHERE clause depends on the split position:
 *
 * <ul>
 *   <li>first AND last split (single-split table): no condition;
 *   <li>first split: {@code pk <= ?} (high bound) and, when scanning data, additionally
 *       excluding the row equal to the high bound via {@code AND NOT (pk = ?)} — that row
 *       belongs to the next split;
 *   <li>last split: {@code pk >= ?} (low bound);
 *   <li>middle split: {@code pk >= ?} plus the high-bound exclusion (when scanning data) plus
 *       {@code pk <= ?}.
 * </ul>
 *
 * <p>The placeholder order produced here must match the binding order in
 * {@code readTableSplitDataStatement}.
 *
 * @param tableId table to read
 * @param rowType the split key columns (primary key fields)
 * @param isFirstSplit whether this is the table's first split
 * @param isLastSplit whether this is the table's last split
 * @param limitSize row limit; non-positive means no LIMIT clause
 * @param isScanningData true to read full rows ({@code SELECT *}); false to probe split
 *     boundaries (MAX of the key columns over an ordered, limited subquery)
 * @return SQL text with {@code ?} placeholders for the split bounds
 */
private static String buildSplitQuery(
        TableId tableId,
        SeaTunnelRowType rowType,
        boolean isFirstSplit,
        boolean isLastSplit,
        int limitSize,
        boolean isScanningData) {
    final String condition;

    if (isFirstSplit && isLastSplit) {
        // Whole table in one split: no WHERE clause at all.
        condition = null;
    } else if (isFirstSplit) {
        final StringBuilder sql = new StringBuilder();
        addPrimaryKeyColumnsToCondition(rowType, sql, " <= ?");
        if (isScanningData) {
            // The row equal to the high bound is read by the next split, not this one.
            sql.append(" AND NOT (");
            addPrimaryKeyColumnsToCondition(rowType, sql, " = ?");
            sql.append(")");
        }
        condition = sql.toString();
    } else if (isLastSplit) {
        final StringBuilder sql = new StringBuilder();
        addPrimaryKeyColumnsToCondition(rowType, sql, " >= ?");
        condition = sql.toString();
    } else {
        // Middle split: low bound, optional high-bound exclusion, then high bound.
        final StringBuilder sql = new StringBuilder();
        addPrimaryKeyColumnsToCondition(rowType, sql, " >= ?");
        if (isScanningData) {
            sql.append(" AND NOT (");
            addPrimaryKeyColumnsToCondition(rowType, sql, " = ?");
            sql.append(")");
        }
        sql.append(" AND ");
        addPrimaryKeyColumnsToCondition(rowType, sql, " <= ?");
        condition = sql.toString();
    }

    if (isScanningData) {
        return buildSelectWithRowLimits(
                tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
    } else {
        // Boundary probing: select MAX(pk columns) over the ordered, limited subset.
        final String orderBy =
                String.join(", ", rowType.getFieldNames());
        return buildSelectWithBoundaryRowLimits(
                tableId,
                limitSize,
                getPrimaryKeyColumnsProjection(rowType),
                getMaxPrimaryKeyColumnsProjection(rowType),
                Optional.ofNullable(condition),
                orderBy);
    }
}
+
/**
 * Creates and binds the {@link PreparedStatement} for a split scan built by
 * {@code buildSplitScanQuery}.
 *
 * <p>The binding layout mirrors the placeholder order produced by {@code buildSplitQuery}:
 * a single-split table has no placeholders; the first split binds the high bound twice (the
 * {@code <= ?} range check plus the {@code NOT (= ?)} exclusion); the last split binds only
 * the low bound; a middle split binds low, high, and high again for the exclusion.
 *
 * @param jdbc open JDBC connection
 * @param sql the split-scan SQL with {@code ?} placeholders
 * @param isFirstSplit whether this is the table's first split
 * @param isLastSplit whether this is the table's last split
 * @param splitStart low-bound values per primary-key column (unused for the first split)
 * @param splitEnd high-bound values per primary-key column (unused for the last split)
 * @param primaryKeyNum number of primary-key columns
 * @param fetchSize JDBC fetch size for streaming the result set
 * @return the prepared, fully bound statement
 * @throws RuntimeException wrapping any failure while preparing or binding
 */
public static PreparedStatement readTableSplitDataStatement(
        JdbcConnection jdbc,
        String sql,
        boolean isFirstSplit,
        boolean isLastSplit,
        Object[] splitStart,
        Object[] splitEnd,
        int primaryKeyNum,
        int fetchSize) {
    try {
        final PreparedStatement statement = initStatement(jdbc, sql, fetchSize);
        if (isFirstSplit && isLastSplit) {
            // Single-split table: the query has no placeholders.
            return statement;
        }
        if (isFirstSplit) {
            // Placeholders: pk <= ? ... AND NOT (pk = ? ...): high bound twice.
            for (int i = 0; i < primaryKeyNum; i++) {
                statement.setObject(i + 1, splitEnd[i]);
                statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
            }
        } else if (isLastSplit) {
            // Placeholders: pk >= ? ...: low bound only.
            for (int i = 0; i < primaryKeyNum; i++) {
                statement.setObject(i + 1, splitStart[i]);
            }
        } else {
            // Placeholders: pk >= ? ... AND NOT (pk = ? ...) AND pk <= ? ...
            for (int i = 0; i < primaryKeyNum; i++) {
                statement.setObject(i + 1, splitStart[i]);
                statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
                statement.setObject(i + 1 + 2 * primaryKeyNum, splitEnd[i]);
            }
        }
        return statement;
    } catch (Exception e) {
        throw new RuntimeException("Failed to build the split data read statement.", e);
    }
}
+
+ public static SeaTunnelRowType getSplitType(Table table) {
+ List<Column> primaryKeys = table.primaryKeyColumns();
+ if (primaryKeys.isEmpty()) {
+ throw new SeaTunnelException(
+ String.format(
+ "Incremental snapshot for tables requires primary key,"
+ + " but table %s doesn't have primary key.",
+ table.id()));
+ }
+
+ // use first field in primary key as the split key
+ return getSplitType(primaryKeys.get(0));
+ }
+
+ /**
+ * Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql
database schemas.
+ */
+ public static MySqlDatabaseSchema createMySqlDatabaseSchema(
+ MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) {
+ TopicSelector<TableId> topicSelector =
MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
+ SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
+ MySqlValueConverters valueConverters =
getValueConverters(dbzMySqlConfig);
+ return new MySqlDatabaseSchema(
+ dbzMySqlConfig,
+ valueConverters,
+ topicSelector,
+ schemaNameAdjuster,
+ isTableIdCaseSensitive);
+ }
+
/**
 * Builds the {@link MySqlValueConverters} used by the database schema, wired from the
 * Debezium connector configuration (decimal handling, temporal precision, unsigned-BIGINT
 * handling, binary handling, and the optional time adjuster).
 *
 * <p>NOTE(review): an identically wired copy of this method exists in the connection utils
 * class — keep the two in sync.
 *
 * @param dbzMySqlConfig Debezium MySQL connector configuration
 * @return converters matching the configured value-handling modes
 */
private static MySqlValueConverters getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
    TemporalPrecisionMode timePrecisionMode = dbzMySqlConfig.getTemporalPrecisionMode();
    JdbcValueConverters.DecimalMode decimalMode = dbzMySqlConfig.getDecimalMode();
    // bigint.unsigned.handling.mode is stored as a string and parsed into the enum.
    String bigIntUnsignedHandlingModeStr =
            dbzMySqlConfig
                    .getConfig()
                    .getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
    MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode =
            MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
                    bigIntUnsignedHandlingModeStr);
    JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
            bigIntUnsignedHandlingMode.asBigIntUnsignedMode();

    boolean timeAdjusterEnabled =
            dbzMySqlConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
    return new MySqlValueConverters(
            decimalMode,
            timePrecisionMode,
            bigIntUnsignedMode,
            dbzMySqlConfig.binaryHandlingMode(),
            // When disabled, temporal values pass through unadjusted (identity function).
            timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> x,
            MySqlValueConverters::defaultParsingErrorHandler);
}
+
+ public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) {
+ return getBinlogPosition(dataRecord.sourceOffset());
+ }
+
+ public static BinlogOffset getBinlogPosition(Map<String, ?> offset) {
+ Map<String, String> offsetStrMap = new HashMap<>();
+ for (Map.Entry<String, ?> entry : offset.entrySet()) {
+ offsetStrMap.put(
+ entry.getKey(), entry.getValue() == null ? null :
entry.getValue().toString());
+ }
+ return new BinlogOffset(offsetStrMap);
+ }
+
+ public static SeaTunnelRowType getSplitType(Column splitColumn) {
+ return new SeaTunnelRowType(new String[]{splitColumn.name()},
+ new
SeaTunnelDataType<?>[]{MySqlTypeUtils.convertFromColumn(splitColumn)});
+ }
+
+ public static Column getSplitColumn(Table table) {
+ List<Column> primaryKeys = table.primaryKeyColumns();
+ if (primaryKeys.isEmpty()) {
+ throw new SeaTunnelException(
+ String.format(
+ "Incremental snapshot for tables requires primary key,"
+ + " but table %s doesn't have primary key.",
+ table.id()));
+ }
+
+ // use first field in primary key as the split key
+ return primaryKeys.get(0);
+ }
+
+ public static String quote(String dbOrTableName) {
+ return "`" + dbOrTableName + "`";
+ }
+
+ public static String quote(TableId tableId) {
+ return tableId.toQuotedString('`');
+ }
+
+ private static PreparedStatement initStatement(JdbcConnection jdbc, String
sql, int fetchSize)
+ throws SQLException {
+ final Connection connection = jdbc.connection();
+ connection.setAutoCommit(false);
+ final PreparedStatement statement = connection.prepareStatement(sql);
+ statement.setFetchSize(fetchSize);
+ return statement;
+ }
+
+ private static void addPrimaryKeyColumnsToCondition(
+ SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
+ for (Iterator<String> fieldNamesIt =
Arrays.stream(rowType.getFieldNames()).iterator();
+ fieldNamesIt.hasNext(); ) {
+ sql.append(fieldNamesIt.next()).append(predicate);
+ if (fieldNamesIt.hasNext()) {
+ sql.append(" AND ");
+ }
+ }
+ }
+
+ private static String getPrimaryKeyColumnsProjection(SeaTunnelRowType
rowType) {
+ StringBuilder sql = new StringBuilder();
+ for (Iterator<String> fieldNamesIt =
Arrays.stream(rowType.getFieldNames()).iterator();
+ fieldNamesIt.hasNext(); ) {
+ sql.append(fieldNamesIt.next());
+ if (fieldNamesIt.hasNext()) {
+ sql.append(" , ");
+ }
+ }
+ return sql.toString();
+ }
+
+ private static String getMaxPrimaryKeyColumnsProjection(SeaTunnelRowType
rowType) {
+ StringBuilder sql = new StringBuilder();
+ for (Iterator<String> fieldNamesIt =
Arrays.stream(rowType.getFieldNames()).iterator();
+ fieldNamesIt.hasNext(); ) {
+ sql.append("MAX(" + fieldNamesIt.next() + ")");
+ if (fieldNamesIt.hasNext()) {
+ sql.append(" , ");
+ }
+ }
+ return sql.toString();
+ }
+
+ private static String buildSelectWithRowLimits(
+ TableId tableId,
+ int limit,
+ String projection,
+ Optional<String> condition,
+ Optional<String> orderBy) {
+ final StringBuilder sql = new StringBuilder("SELECT ");
+ sql.append(projection).append(" FROM ");
+ sql.append(quotedTableIdString(tableId));
+ if (condition.isPresent()) {
+ sql.append(" WHERE ").append(condition.get());
+ }
+ if (orderBy.isPresent()) {
+ sql.append(" ORDER BY ").append(orderBy.get());
+ }
+ if (limit > 0) {
+ sql.append(" LIMIT ").append(limit);
+ }
+ return sql.toString();
+ }
+
+ private static String buildSelectWithBoundaryRowLimits(
+ TableId tableId,
+ int limit,
+ String projection,
+ String maxColumnProjection,
+ Optional<String> condition,
+ String orderBy) {
+ final StringBuilder sql = new StringBuilder("SELECT ");
+ sql.append(maxColumnProjection);
+ sql.append(" FROM (");
+ sql.append("SELECT ");
+ sql.append(projection);
+ sql.append(" FROM ");
+ sql.append(quotedTableIdString(tableId));
+ if (condition.isPresent()) {
+ sql.append(" WHERE ").append(condition.get());
+ }
+ sql.append(" ORDER BY ").append(orderBy).append(" LIMIT
").append(limit);
+ sql.append(") T");
+ return sql.toString();
+ }
+
+ private static String quotedTableIdString(TableId tableId) {
+ return tableId.toQuotedString('`');
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/TableDiscoveryUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/TableDiscoveryUtils.java
new file mode 100644
index 000000000..1e3e65a6c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/TableDiscoveryUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils.quote;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utilities to discovery matched tables.
+ */
+public class TableDiscoveryUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(TableDiscoveryUtils.class);
+
+ public static List<TableId> listTables(JdbcConnection jdbc,
RelationalTableFilters tableFilters)
+ throws SQLException {
+ final List<TableId> capturedTableIds = new ArrayList<>();
+ // -------------------
+ // READ DATABASE NAMES
+ // -------------------
+ // Get the list of databases ...
+ LOG.info("Read list of available databases");
+ final List<String> databaseNames = new ArrayList<>();
+
+ jdbc.query(
+ "SHOW DATABASES",
+ rs -> {
+ while (rs.next()) {
+ databaseNames.add(rs.getString(1));
+ }
+ });
+ LOG.info("\t list of available databases is: {}", databaseNames);
+
+ // ----------------
+ // READ TABLE NAMES
+ // ----------------
+ // Get the list of table IDs for each database. We can't use a
prepared statement with
+ // MySQL, so we have to build the SQL statement each time. Although in
other cases this
+ // might lead to SQL injection, in our case we are reading the
database names from the
+ // database and not taking them from the user ...
+ LOG.info("Read list of available tables in each database");
+ for (String dbName : databaseNames) {
+ try {
+ jdbc.query(
+ "SHOW FULL TABLES IN " + quote(dbName) + " where
Table_Type = 'BASE TABLE'",
+ rs -> {
+ while (rs.next()) {
+ TableId tableId = new TableId(dbName, null,
rs.getString(1));
+ if
(tableFilters.dataCollectionFilter().isIncluded(tableId)) {
+ capturedTableIds.add(tableId);
+ LOG.info("\t including '{}' for further
processing", tableId);
+ } else {
+ LOG.info("\t '{}' is filtered out of
capturing", tableId);
+ }
+ }
+ });
+ } catch (SQLException e) {
+ // We were unable to execute the query or process the results,
so skip this ...
+ LOG.warn(
+ "\t skipping database '{}' due to error reading tables:
{}",
+ dbName,
+ e.getMessage());
+ }
+ }
+ return capturedTableIds;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/pom.xml
b/seatunnel-connectors-v2/connector-cdc/pom.xml
index cf6a2e6c0..722d678b3 100644
--- a/seatunnel-connectors-v2/connector-cdc/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/pom.xml
@@ -30,9 +30,11 @@
<packaging>pom</packaging>
<properties>
+ <debezium.version>1.6.4.Final</debezium.version>
</properties>
<modules>
<module>connector-cdc-base</module>
+ <module>connector-cdc-mysql</module>
</modules>
</project>