This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ae981df67 [feature][connector] add mysql cdc reader (#3455)
ae981df67 is described below

commit ae981df6751ecaad9c19d336f2ae7d428c1ada99
Author: ic4y <[email protected]>
AuthorDate: Thu Nov 17 17:41:18 2022 +0800

    [feature][connector] add mysql cdc reader (#3455)
    
    * [feature][connector] add mysql cdc reader
    
    * Update seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
    
    Co-authored-by: hailin0 <[email protected]>
    
    Co-authored-by: hailin0 <[email protected]>
---
 LICENSE                                            |   1 +
 .../connector-cdc/connector-cdc-base/pom.xml       |   1 -
 .../cdc/base/config/JdbcSourceConfigFactory.java   | 182 +++++++++
 .../cdc/base/option/JdbcSourceOptions.java         | 107 +++++
 .../external/IncrementalSourceStreamFetcher.java   |  52 +--
 .../{ => connector-cdc-mysql}/pom.xml              |  41 +-
 .../cdc/mysql/config/MySqlSourceConfig.java        |  86 ++++
 .../cdc/mysql/config/MySqlSourceConfigFactory.java | 123 ++++++
 .../seatunnel/cdc/mysql/config/ServerIdRange.java  | 112 ++++++
 .../seatunnel/cdc/mysql/source/MySqlDialect.java   | 116 ++++++
 .../mysql/source/MysqlPooledDataSourceFactory.java |  37 ++
 .../cdc/mysql/source/offset/BinlogOffset.java      | 211 ++++++++++
 .../reader/fetch/MySqlSourceFetchTaskContext.java  | 350 +++++++++++++++++
 .../reader/fetch/binlog/MySqlBinlogFetchTask.java  |  74 ++++
 .../reader/fetch/scan/MySqlSnapshotFetchTask.java  |  67 ++++
 .../fetch/scan/MySqlSnapshotSplitReadTask.java     | 371 ++++++++++++++++++
 .../SnapshotSplitChangeEventSourceContext.java     |  48 +++
 .../cdc/mysql/utils/MySqlConnectionUtils.java      | 175 +++++++++
 .../seatunnel/cdc/mysql/utils/MySqlTypeUtils.java  | 159 ++++++++
 .../seatunnel/cdc/mysql/utils/MySqlUtils.java      | 432 +++++++++++++++++++++
 .../cdc/mysql/utils/TableDiscoveryUtils.java       |  90 +++++
 seatunnel-connectors-v2/connector-cdc/pom.xml      |   2 +
 22 files changed, 2779 insertions(+), 58 deletions(-)

diff --git a/LICENSE b/LICENSE
index 0abce8328..6d2243fa1 100644
--- a/LICENSE
+++ b/LICENSE
@@ -216,6 +216,7 @@ 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connec
 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/
  from https://github.com/apache/flink
 
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/
   from https://github.com/apache/iceberg
 
seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/base
 from https://github.com/ververica/flink-cdc-connectors
+seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql
 from https://github.com/ververica/flink-cdc-connectors
 
seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium
 from https://github.com/ververica/flink-cdc-connectors
 generate_client_protocol.sh                                                    
                                                             from 
https://github.com/hazelcast/hazelcast
 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
                          from https://github.com/hazelcast/hazelcast
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
index feff219d1..cc7ff4991 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
@@ -29,7 +29,6 @@
     <artifactId>connector-cdc-base</artifactId>
 
     <properties>
-        <debezium.version>1.6.4.Final</debezium.version>
         <hikaricp.version>4.0.3</hikaricp.version>
     </properties>
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
new file mode 100644
index 000000000..041999adb
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.config;
+
+import org.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.seatunnel.connectors.cdc.base.option.SourceOptions;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/** A {@link SourceConfig.Factory} to provide {@link SourceConfig} of JDBC 
data source. */
+@SuppressWarnings("checkstyle:MagicNumber")
+public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<JdbcSourceConfig> {
+
+    private static final long serialVersionUID = 1L;
+
+    protected int port;
+    protected String hostname;
+    protected String username;
+    protected String password;
+    protected List<String> databaseList;
+    protected List<String> tableList;
+    protected StartupConfig startupConfig;
+    protected StopConfig stopConfig;
+    protected boolean includeSchemaChanges = false;
+    protected double distributionFactorUpper = 1000.0d;
+    protected double distributionFactorLower = 0.05d;
+    protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
+    protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
+    protected String serverTimeZone = 
JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
+    protected Duration connectTimeout = 
JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue();
+    protected int connectMaxRetries = 
JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
+    protected int connectionPoolSize = 
JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
+    protected Properties dbzProperties;
+
+    /** IP address or hostname of the database server. */
+    public JdbcSourceConfigFactory hostname(String hostname) {
+        this.hostname = hostname;
+        return this;
+    }
+
+    /** Integer port number of the database server. */
+    public JdbcSourceConfigFactory port(int port) {
+        this.port = port;
+        return this;
+    }
+
+    /**
+     * An optional list of regular expressions that match database names to be 
monitored; any
+     * database name not included in the whitelist will be excluded from 
monitoring. By default all
+     * databases will be monitored.
+     */
+    public JdbcSourceConfigFactory databaseList(String... databaseList) {
+        this.databaseList = Arrays.asList(databaseList);
+        return this;
+    }
+
+    /**
+     * An optional list of regular expressions that match fully-qualified 
table identifiers for
+     * tables to be monitored; any table not included in the list will be 
excluded from monitoring.
+     * Each identifier is of the form databaseName.tableName. By default the 
connector will monitor
+     * every non-system table in each monitored database.
+     */
+    public JdbcSourceConfigFactory tableList(String... tableList) {
+        this.tableList = Arrays.asList(tableList);
+        return this;
+    }
+
+    /** Name of the user to use when connecting to the database server. */
+    public JdbcSourceConfigFactory username(String username) {
+        this.username = username;
+        return this;
+    }
+
+    /** Password to use when connecting to the database server. */
+    public JdbcSourceConfigFactory password(String password) {
+        this.password = password;
+        return this;
+    }
+
+    /**
+     * The session time zone in database server, e.g. "America/Los_Angeles". 
It controls how the
+     * TIMESTAMP type is converted to STRING. See more
+     * 
https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types
+     */
+    public JdbcSourceConfigFactory serverTimeZone(String timeZone) {
+        this.serverTimeZone = timeZone;
+        return this;
+    }
+
+    /**
+     * The split size (number of rows) of table snapshot, captured tables are 
split into multiple
+     * splits when reading the snapshot of a table.
+     */
+    public JdbcSourceConfigFactory splitSize(int splitSize) {
+        this.splitSize = splitSize;
+        return this;
+    }
+
+    /**
+     * The upper bound of split key evenly distribution factor, the factor is 
used to determine
+     * whether the table is evenly distributed or not.
+     */
+    public JdbcSourceConfigFactory distributionFactorUpper(double 
distributionFactorUpper) {
+        this.distributionFactorUpper = distributionFactorUpper;
+        return this;
+    }
+
+    /**
+     * The lower bound of split key evenly distribution factor, the factor is 
used to determine
+     * whether the table is evenly distributed or not.
+     */
+    public JdbcSourceConfigFactory distributionFactorLower(double 
distributionFactorLower) {
+        this.distributionFactorLower = distributionFactorLower;
+        return this;
+    }
+
+    /** The maximum fetch size per poll when reading the table snapshot. */
+    public JdbcSourceConfigFactory fetchSize(int fetchSize) {
+        this.fetchSize = fetchSize;
+        return this;
+    }
+
+    /**
+     * The maximum time that the connector should wait after trying to connect 
to the database
+     * server before timing out.
+     */
+    public JdbcSourceConfigFactory connectTimeout(Duration connectTimeout) {
+        this.connectTimeout = connectTimeout;
+        return this;
+    }
+
+    /** The connection pool size. */
+    public JdbcSourceConfigFactory connectionPoolSize(int connectionPoolSize) {
+        this.connectionPoolSize = connectionPoolSize;
+        return this;
+    }
+
+    /** The max retry times to get connection. */
+    public JdbcSourceConfigFactory connectMaxRetries(int connectMaxRetries) {
+        this.connectMaxRetries = connectMaxRetries;
+        return this;
+    }
+
+    /** Whether the {@link SourceConfig} should output the schema changes or 
not. */
+    public JdbcSourceConfigFactory includeSchemaChanges(boolean 
includeSchemaChanges) {
+        this.includeSchemaChanges = includeSchemaChanges;
+        return this;
+    }
+
+    /** The Debezium connector properties. For example, "snapshot.mode". */
+    public JdbcSourceConfigFactory debeziumProperties(Properties properties) {
+        this.dbzProperties = properties;
+        return this;
+    }
+
+    /** Specifies the startup options. */
+    public JdbcSourceConfigFactory startupOptions(StartupConfig startupConfig) 
{
+        this.startupConfig = startupConfig;
+        return this;
+    }
+
+    @Override
+    public abstract JdbcSourceConfig create(int subtask);
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
new file mode 100644
index 000000000..9f387dea7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.option;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import org.seatunnel.connectors.cdc.base.source.IncrementalSource;
+
+import java.time.Duration;
+
+/** Configurations for {@link IncrementalSource} of JDBC data source. */
+@SuppressWarnings("checkstyle:MagicNumber")
+public class JdbcSourceOptions extends SourceOptions {
+
+    public static final Option<String> HOSTNAME =
+            Options.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the database 
server.");
+
+    public static final Option<Integer> PORT =
+            Options.key("port")
+                    .intType()
+                    .defaultValue(3306)
+                    .withDescription("Integer port number of the database 
server.");
+
+    public static final Option<String> USERNAME =
+            Options.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the database to use when connecting to 
the database server.");
+
+    public static final Option<String> PASSWORD =
+            Options.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Password to use when connecting to the 
database server.");
+
+    public static final Option<String> DATABASE_NAME =
+            Options.key("database-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Database name of the database to 
monitor.");
+
+    public static final Option<String> TABLE_NAME =
+            Options.key("table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Table name of the database to monitor.");
+
+    public static final Option<String> SERVER_TIME_ZONE =
+            Options.key("server-time-zone")
+                    .stringType()
+                    .defaultValue("UTC")
+                    .withDescription("The session time zone in database 
server.");
+
+    public static final Option<String> SERVER_ID =
+            Options.key("server-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "A numeric ID or a numeric ID range of this 
database client, "
+                                    + "The numeric ID syntax is like '5400', 
the numeric ID range syntax "
+                                    + "is like '5400-5408', The numeric ID 
range syntax is recommended when "
+                                    + "'scan.incremental.snapshot.enabled' 
enabled. Every ID must be unique across all "
+                                    + "currently-running database processes in 
the MySQL cluster. This connector"
+                                    + " joins the MySQL  cluster as another 
server (with this unique ID) "
+                                    + "so it can read the binlog. By default, 
a random number is generated between"
+                                    + " 5400 and 6400, though we recommend 
setting an explicit value.");
+
+    public static final Option<Duration> CONNECT_TIMEOUT =
+            Options.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the database server before timing out.");
+
+    public static final Option<Integer> CONNECTION_POOL_SIZE =
+            Options.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final Option<Integer> CONNECT_MAX_RETRIES =
+            Options.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build database server connection.");
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 6b6e22104..99d468da2 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.common.utils.SeaTunnelException;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.debezium.connector.base.ChangeEventQueue;
 import io.debezium.pipeline.DataChangeEvent;
-import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.seatunnel.connectors.cdc.base.source.offset.Offset;
@@ -31,12 +30,8 @@ import 
org.seatunnel.connectors.cdc.base.source.split.SourceRecords;
 import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -49,8 +44,6 @@ import java.util.concurrent.TimeUnit;
 public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, 
SourceSplitBase> {
     private final FetchTask.Context taskContext;
     private final ExecutorService executorService;
-    private final Set<TableId> pureStreamPhaseTables;
-
     private volatile ChangeEventQueue<DataChangeEvent> queue;
     private volatile Throwable readException;
 
@@ -58,7 +51,7 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
 
     private IncrementalSplit currentIncrementalSplit;
 
-    private Map<TableId, Offset> maxSplitHighWatermarkMap;
+    private Offset splitStartWatermark;
 
     private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;
 
@@ -67,7 +60,6 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
         ThreadFactory threadFactory =
             new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + 
subTaskId).build();
         this.executorService = 
Executors.newSingleThreadExecutor(threadFactory);
-        this.pureStreamPhaseTables = new HashSet<>();
     }
 
     @Override
@@ -143,55 +135,17 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
 
     /**
      * Returns the record should emit or not.
-     *
-     * <p>The watermark signal algorithm is the incremental split reader only 
sends the change event that
-     * belongs to its finished snapshot splits. For each snapshot split, the 
change event is valid
-     * since the offset is after its high watermark.
-     *
-     * <pre> E.g: the data input is :
-     *    snapshot-split-0 info : [0,    1024) highWatermark0
-     *    snapshot-split-1 info : [1024, 2048) highWatermark1
-     *  the data output is:
-     *  only the change event belong to [0,    1024) and offset is after 
highWatermark0 should send,
-     *  only the change event belong to [1024, 2048) and offset is after 
highWatermark1 should send.
-     * </pre>
      */
     private boolean shouldEmit(SourceRecord sourceRecord) {
         if (taskContext.isDataChangeRecord(sourceRecord)) {
-            TableId tableId = taskContext.getTableId(sourceRecord);
             Offset position = taskContext.getStreamOffset(sourceRecord);
-            return hasEnterPureStreamPhase(tableId, position);
+            return position.isAtOrAfter(splitStartWatermark);
             // TODO only the table who captured snapshot splits need to 
filter( Used to support Exactly-Once )
-            // not in the monitored splits scope, do not emit
         }
-        // always send the schema change event and signal event
-        // we need record them to state of SeaTunnel
         return true;
     }
 
-    private boolean hasEnterPureStreamPhase(TableId tableId, Offset position) {
-        if (pureStreamPhaseTables.contains(tableId)) {
-            return true;
-        }
-        // the existed tables those have finished snapshot reading
-        if (maxSplitHighWatermarkMap.containsKey(tableId)
-            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
-            pureStreamPhaseTables.add(tableId);
-            return true;
-        }
-
-        return !maxSplitHighWatermarkMap.containsKey(tableId)
-            && taskContext.getTableFilter().isIncluded(tableId);
-    }
-
     private void configureFilter() {
-        Map<TableId, Offset> tableIdOffsetPositionMap = new HashMap<>();
-        // latest-offset mode
-
-        for (TableId tableId : currentIncrementalSplit.getTableIds()) {
-            tableIdOffsetPositionMap.put(tableId, 
currentIncrementalSplit.getStartupOffset());
-        }
-        this.maxSplitHighWatermarkMap = tableIdOffsetPositionMap;
-        this.pureStreamPhaseTables.clear();
+        splitStartWatermark = currentIncrementalSplit.getStartupOffset();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-cdc/pom.xml 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
similarity index 53%
copy from seatunnel-connectors-v2/connector-cdc/pom.xml
copy to seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
index cf6a2e6c0..e5ec3a4b4 100644
--- a/seatunnel-connectors-v2/connector-cdc/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
@@ -21,18 +21,43 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-connectors-v2</artifactId>
+        <artifactId>connector-cdc</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>connector-cdc</artifactId>
-    <packaging>pom</packaging>
+    <artifactId>connector-cdc-mysql</artifactId>
 
-    <properties>
-    </properties>
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-cdc-base</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+        </dependency>
+    </dependencies>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.seatunnel</groupId>
+                <artifactId>connector-cdc-base</artifactId>
+                <version>${project.version}</version>
+                <scope>compile</scope>
+            </dependency>
+
+            <dependency>
+                <groupId>io.debezium</groupId>
+                <artifactId>debezium-connector-mysql</artifactId>
+                <version>${debezium.version}</version>
+                <scope>compile</scope>
+            </dependency>
+
+        </dependencies>
+    </dependencyManagement>
 
-    <modules>
-        <module>connector-cdc-base</module>
-    </modules>
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
new file mode 100644
index 000000000..317bc5b8a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
+
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.relational.RelationalTableFilters;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.seatunnel.connectors.cdc.base.config.StartupConfig;
+import org.seatunnel.connectors.cdc.base.config.StopConfig;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Describes the connection information of the MySQL database and the 
configuration information for
+ * performing snapshotting and streaming reading, such as splitSize.
+ */
+public class MySqlSourceConfig extends JdbcSourceConfig {
+
+    private static final long serialVersionUID = 1L;
+
+    public MySqlSourceConfig(
+            StartupConfig startupConfig,
+            StopConfig stopConfig,
+            List<String> databaseList,
+            List<String> tableList,
+            int splitSize,
+            double distributionFactorUpper,
+            double distributionFactorLower,
+            Properties dbzProperties,
+            String driverClassName,
+            String hostname,
+            int port,
+            String username,
+            String password,
+            int fetchSize,
+            String serverTimeZone,
+            Duration connectTimeout,
+            int connectMaxRetries,
+            int connectionPoolSize) {
+        super(
+                startupConfig,
+                stopConfig,
+                databaseList,
+                tableList,
+                splitSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                dbzProperties,
+                driverClassName,
+                hostname,
+                port,
+                username,
+                password,
+                fetchSize,
+                serverTimeZone,
+                connectTimeout,
+                connectMaxRetries,
+                connectionPoolSize);
+    }
+
+    @Override
+    public MySqlConnectorConfig getDbzConnectorConfig() {
+        return new MySqlConnectorConfig(getDbzConfiguration());
+    }
+
+    public RelationalTableFilters getTableFilters() {
+        return getDbzConnectorConfig().getTableFilters();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
new file mode 100644
index 000000000..25d427a9b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
+
+import java.util.Properties;
+
+/** A factory to initialize {@link MySqlSourceConfig}. */
+public class MySqlSourceConfigFactory extends JdbcSourceConfigFactory {
+
+    /** Parsed server id (or id range); stays {@code null} until {@link #serverId(String)} is called. */
+    private ServerIdRange serverIdRange;
+
+    /**
+     * Sets the numeric ID, or numeric ID range, of this database client.
+     *
+     * <p>The numeric ID syntax is like '5400', the numeric ID range syntax is like '5400-5408'.
+     * Every ID must be unique across all currently-running database processes in the MySQL
+     * cluster. This connector joins the MySQL cluster as another server (with this unique ID) so
+     * it can read the binlog.
+     *
+     * <p>NOTE(review): the original comment states that a random number between 5400 and 6400 is
+     * generated by default; no such default is assigned in this class - confirm where it is
+     * applied. Setting an explicit value is recommended.
+     *
+     * @param serverId a single id or an id range; parsed with {@link ServerIdRange#from(String)}
+     * @return this factory, for call chaining
+     */
+    public MySqlSourceConfigFactory serverId(String serverId) {
+        this.serverIdRange = ServerIdRange.from(serverId);
+        return this;
+    }
+
+    /**
+     * Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}.
+     *
+     * <p>The Debezium properties are assembled from the factory fields first; entries of
+     * {@code dbzProperties} are copied in last and therefore win over the values set here.
+     *
+     * @param subtaskId index of the subtask, used to pick one id out of the configured server id
+     *     range (ignored when no server id was configured)
+     * @return the source configuration for that subtask
+     * @throws NullPointerException if hostname, username or password is {@code null}
+     */
+    public MySqlSourceConfig create(int subtaskId) {
+        Properties props = new Properties();
+        // hard code server name, because we don't need to distinguish it, docs:
+        // Logical name that identifies and provides a namespace for the particular
+        // MySQL database server/cluster being monitored. The logical name should be
+        // unique across all other connectors, since it is used as a prefix for all
+        // Kafka topic names emanating from this connector.
+        // Only alphanumeric characters and underscores should be used.
+        props.setProperty("database.server.name", "mysql_binlog_source");
+        props.setProperty("database.hostname", checkNotNull(hostname));
+        props.setProperty("database.user", checkNotNull(username));
+        props.setProperty("database.password", checkNotNull(password));
+        props.setProperty("database.port", String.valueOf(port));
+        props.setProperty("database.fetchSize", String.valueOf(fetchSize));
+        props.setProperty("database.responseBuffering", "adaptive");
+        // Properties rejects null values, so the time zone is only forwarded when it is set.
+        if (serverTimeZone != null) {
+            props.setProperty("database.serverTimezone", serverTimeZone);
+        }
+
+        props.setProperty("connect.timeout.ms", String.valueOf(connectTimeout.toMillis()));
+
+        // TODO emitting schema change records is not yet supported
+        props.setProperty("include.schema.changes", String.valueOf(false));
+        // disable the offset flush totally
+        props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
+        // disable tombstones
+        props.setProperty("tombstones.on.delete", String.valueOf(false));
+        // debezium uses "long" mode to handle unsigned bigint by default,
+        // but it loses precision when the value is larger than 2^63,
+        // so use "precise" mode to avoid it.
+        props.setProperty("bigint.unsigned.handling.mode", "precise");
+
+        if (serverIdRange != null) {
+            props.setProperty("database.server.id.range", String.valueOf(serverIdRange));
+            int serverId = serverIdRange.getServerId(subtaskId);
+            props.setProperty("database.server.id", String.valueOf(serverId));
+        }
+        if (databaseList != null) {
+            props.setProperty("database.include.list", String.join(",", databaseList));
+        }
+        if (tableList != null) {
+            props.setProperty("table.include.list", String.join(",", tableList));
+        }
+
+        // user-defined debezium properties override the values set above
+        if (dbzProperties != null) {
+            dbzProperties.forEach(props::put);
+        }
+
+        Configuration dbzConfiguration = Configuration.from(props);
+        String driverClassName = dbzConfiguration.getString(MySqlConnectorConfig.JDBC_DRIVER);
+        return new MySqlSourceConfig(
+                startupConfig,
+                stopConfig,
+                databaseList,
+                tableList,
+                splitSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                props,
+                driverClassName,
+                hostname,
+                port,
+                username,
+                password,
+                fetchSize,
+                serverTimeZone,
+                connectTimeout,
+                connectMaxRetries,
+                connectionPoolSize);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/ServerIdRange.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/ServerIdRange.java
new file mode 100644
index 000000000..cc36d8f18
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/ServerIdRange.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+
+import java.io.Serializable;
+
+/**
+ * This class defines a range of server id. The boundaries of the range are inclusive.
+ *
+ * @see JdbcSourceOptions#SERVER_ID
+ */
+public class ServerIdRange implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** Start of the range (inclusive). */
+    private final int startServerId;
+
+    /** End of the range (inclusive). */
+    private final int endServerId;
+
+    /**
+     * Creates a range covering {@code startServerId} to {@code endServerId}, both inclusive.
+     *
+     * @param startServerId first server id of the range
+     * @param endServerId last server id of the range
+     */
+    public ServerIdRange(int startServerId, int endServerId) {
+        this.startServerId = startServerId;
+        this.endServerId = endServerId;
+    }
+
+    public int getStartServerId() {
+        return startServerId;
+    }
+
+    public int getEndServerId() {
+        return endServerId;
+    }
+
+    /**
+     * Returns the server id assigned to the given subtask, i.e. {@code startServerId + subTaskId}.
+     *
+     * @param subTaskId zero-based subtask index
+     * @return a server id inside this range
+     * @throws IllegalArgumentException if {@code subTaskId} is negative or the range holds too few
+     *     ids to give this subtask one
+     */
+    public int getServerId(int subTaskId) {
+        checkArgument(subTaskId >= 0, "Subtask ID %s shouldn't be a negative number.", subTaskId);
+        // Valid indexes are 0 .. getNumberOfServerIds() - 1; an index equal to the count would
+        // already map to endServerId + 1, which lies outside the inclusive range.
+        if (subTaskId >= getNumberOfServerIds()) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Subtask ID %s is out of server id range %s, "
+                                    + "please adjust the server id range to "
+                                    + "make the number of server id larger than "
+                                    + "the source parallelism.",
+                            subTaskId, this));
+        }
+        return startServerId + subTaskId;
+    }
+
+    /** Returns how many server ids the inclusive range contains. */
+    public int getNumberOfServerIds() {
+        return endServerId - startServerId + 1;
+    }
+
+    /** Renders a single id as '5400' and a real range as '5400-5408'. */
+    @Override
+    public String toString() {
+        if (startServerId == endServerId) {
+            return String.valueOf(startServerId);
+        } else {
+            return startServerId + "-" + endServerId;
+        }
+    }
+
+    /**
+     * Returns a {@link ServerIdRange} from a server id range string like '5400-5408' or a single
+     * server id like '5400'. Whitespace around each id is ignored.
+     *
+     * @param range the textual range, may be {@code null}
+     * @return the parsed range, or {@code null} when {@code range} is {@code null}
+     * @throws IllegalArgumentException if the text contains '-' but does not split into two parts
+     * @throws IllegalStateException if an id is not a valid integer
+     */
+    public static ServerIdRange from(String range) {
+        if (range == null) {
+            return null;
+        }
+        if (range.contains("-")) {
+            String[] idArray = range.split("-");
+            if (idArray.length != 2) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "The server id range should be syntax like '5400-5500', but got: %s",
+                                range));
+            }
+            return new ServerIdRange(
+                    parseServerId(idArray[0].trim()), parseServerId(idArray[1].trim()));
+        } else {
+            // trim here as well so a single id is parsed as leniently as the range form
+            int serverId = parseServerId(range.trim());
+            return new ServerIdRange(serverId, serverId);
+        }
+    }
+
+    /** Parses one server id, wrapping the parse failure with the offending text. */
+    private static int parseServerId(String serverIdValue) {
+        try {
+            return Integer.parseInt(serverIdValue);
+        } catch (NumberFormatException e) {
+            throw new IllegalStateException(
+                    String.format("The server id %s is not a valid numeric.", serverIdValue), e);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
new file mode 100644
index 000000000..d7cf8c3db
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive;
+
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog.MySqlBinlogFetchTask;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotFetchTask;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.connector.mysql.legacy.MySqlSchema;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
+import 
org.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
+import org.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+
+import java.sql.SQLException;
+import java.util.List;
+
+/** MySQL flavour of the {@link JdbcDataSourceDialect} contract. */
+public class MySqlDialect implements JdbcDataSourceDialect {
+
+    private static final long serialVersionUID = 1L;
+    private final MySqlSourceConfigFactory configFactory;
+    private final MySqlSourceConfig sourceConfig;
+    private transient MySqlSchema mySqlSchema;
+
+    /**
+     * Keeps the factory and eagerly builds the configuration for subtask 0 from it.
+     *
+     * @param configFactory factory used to create the MySQL source configuration
+     */
+    public MySqlDialect(MySqlSourceConfigFactory configFactory) {
+        this.configFactory = configFactory;
+        this.sourceConfig = configFactory.create(0);
+    }
+
+    @Override
+    public String getName() {
+        return "MySQL";
+    }
+
+    /** Opens a short-lived JDBC connection and asks the server whether table ids are case sensitive. */
+    @Override
+    public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
+        try (JdbcConnection connection = openJdbcConnection(sourceConfig)) {
+            return isTableIdCaseSensitive(connection);
+        } catch (SQLException sqlError) {
+            throw new SeaTunnelException(
+                    "Error reading MySQL variables: " + sqlError.getMessage(), sqlError);
+        }
+    }
+
+    /** Placeholder: the chunk splitter is not implemented yet, so {@code null} is returned. */
+    @Override
+    public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
+        //TODO waiting for other pr
+        return null;
+    }
+
+    @Override
+    public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
+        return new MysqlPooledDataSourceFactory();
+    }
+
+    /** Lists the tables that pass the table filters of the given (MySQL) source configuration. */
+    @Override
+    public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
+        final MySqlSourceConfig mysqlConfig = (MySqlSourceConfig) sourceConfig;
+        try (JdbcConnection connection = openJdbcConnection(sourceConfig)) {
+            return TableDiscoveryUtils.listTables(connection, mysqlConfig.getTableFilters());
+        } catch (SQLException sqlError) {
+            throw new SeaTunnelException(
+                    "Error to discover tables: " + sqlError.getMessage(), sqlError);
+        }
+    }
+
+    /** Creates the MySQL connection and the binlog client, then wraps both in a fetch task context. */
+    @Override
+    public MySqlSourceFetchTaskContext createFetchTaskContext(
+            SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
+        final MySqlConnection connection =
+                createMySqlConnection(taskSourceConfig.getDbzConfiguration());
+        final BinaryLogClient binlogClient =
+                createBinaryClient(taskSourceConfig.getDbzConfiguration());
+        return new MySqlSourceFetchTaskContext(taskSourceConfig, this, connection, binlogClient);
+    }
+
+    /** Picks the snapshot task for snapshot splits and the binlog task for every other split. */
+    @Override
+    public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
+        if (!sourceSplitBase.isSnapshotSplit()) {
+            return new MySqlBinlogFetchTask(sourceSplitBase.asIncrementalSplit());
+        }
+        return new MySqlSnapshotFetchTask(sourceSplitBase.asSnapshotSplit());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MysqlPooledDataSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MysqlPooledDataSourceFactory.java
new file mode 100644
index 000000000..9f65815f3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MysqlPooledDataSourceFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import 
org.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
+
+/** Connection pool factory that knows how to build the MySQL JDBC URL. */
+public class MysqlPooledDataSourceFactory extends JdbcConnectionPoolFactory {
+
+    /** URL template; the two placeholders are filled with host name and port, in that order. */
+    public static final String JDBC_URL_PATTERN =
+            "jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL";
+
+    /** Formats {@link #JDBC_URL_PATTERN} with the host name and port of the given configuration. */
+    @Override
+    public String getJdbcUrl(JdbcSourceConfig sourceConfig) {
+        final String jdbcHost = sourceConfig.getHostname();
+        final int jdbcPort = sourceConfig.getPort();
+        return String.format(JDBC_URL_PATTERN, jdbcHost, jdbcPort);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
new file mode 100644
index 000000000..050f5c31c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset;
+
+import io.debezium.connector.mysql.GtidSet;
+import org.apache.commons.lang3.StringUtils;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A structure describing a fine-grained offset in a binlog event, including binlog position and
+ * gtid set etc.
+ *
+ * <p>This structure can also be used to deal with binlog events inside a transaction: a
+ * transaction may contain multiple change events, and each change event may contain multiple
+ * rows. When restarting from a specific {@link BinlogOffset}, we need to skip the processed
+ * change events and the processed rows.
+ *
+ * <p>All values are kept as strings in the {@code offset} map; that field is not declared in
+ * this class (presumably inherited from {@link Offset}).
+ */
+public class BinlogOffset extends Offset {
+
+    private static final long serialVersionUID = 1L;
+
+    // Keys under which the individual parts of the offset are stored in the offset map.
+    public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
+    public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
+    public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event";
+    public static final String ROWS_TO_SKIP_OFFSET_KEY = "row";
+    public static final String GTID_SET_KEY = "gtids";
+    public static final String TIMESTAMP_KEY = "ts_sec";
+    public static final String SERVER_ID_KEY = "server_id";
+
+    /** Offset with an empty file name and position 0. */
+    public static final BinlogOffset INITIAL_OFFSET = new BinlogOffset("", 0);
+    /** Sentinel that {@link #compareTo(Offset)} treats as greater than every other offset. */
+    public static final BinlogOffset NO_STOPPING_OFFSET = new BinlogOffset("", 
Long.MIN_VALUE);
+
+    /**
+     * Wraps an existing offset map. The map is stored as-is, not copied.
+     *
+     * @param offset map using the {@code *_KEY} constants of this class as keys
+     */
+    public BinlogOffset(Map<String, String> offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * Creates an offset from file name and position; skip counters and timestamp are set to 0,
+     * gtid set and server id are left out.
+     */
+    public BinlogOffset(String filename, long position) {
+        this(filename, position, 0L, 0L, 0L, null, null);
+    }
+
+    /**
+     * Creates an offset from all of its parts.
+     *
+     * @param filename binlog file name
+     * @param position position inside the binlog file
+     * @param restartSkipEvents number of events to skip on restart
+     * @param restartSkipRows number of rows to skip on restart
+     * @param binlogEpochSecs timestamp stored under {@link #TIMESTAMP_KEY}
+     * @param restartGtidSet gtid set; not stored when {@code null}
+     * @param serverId server id; not stored when {@code null}
+     */
+    public BinlogOffset(
+            String filename,
+            long position,
+            long restartSkipEvents,
+            long restartSkipRows,
+            long binlogEpochSecs,
+            String restartGtidSet,
+            Integer serverId) {
+        Map<String, String> offsetMap = new HashMap<>();
+        offsetMap.put(BINLOG_FILENAME_OFFSET_KEY, filename);
+        offsetMap.put(BINLOG_POSITION_OFFSET_KEY, String.valueOf(position));
+        offsetMap.put(EVENTS_TO_SKIP_OFFSET_KEY, 
String.valueOf(restartSkipEvents));
+        offsetMap.put(ROWS_TO_SKIP_OFFSET_KEY, 
String.valueOf(restartSkipRows));
+        offsetMap.put(TIMESTAMP_KEY, String.valueOf(binlogEpochSecs));
+        // the optional parts are simply absent from the map when not provided
+        if (restartGtidSet != null) {
+            offsetMap.put(GTID_SET_KEY, restartGtidSet);
+        }
+        if (serverId != null) {
+            offsetMap.put(SERVER_ID_KEY, String.valueOf(serverId));
+        }
+        this.offset = offsetMap;
+    }
+
+    /** Returns the binlog file name stored in the offset map. */
+    public String getFilename() {
+        return offset.get(BINLOG_FILENAME_OFFSET_KEY);
+    }
+
+    /** Returns the binlog position, read through the {@code longOffsetValue} helper. */
+    public long getPosition() {
+        return longOffsetValue(offset, BINLOG_POSITION_OFFSET_KEY);
+    }
+
+    /** Returns the number of events to skip on restart. */
+    public long getRestartSkipEvents() {
+        return longOffsetValue(offset, EVENTS_TO_SKIP_OFFSET_KEY);
+    }
+
+    /** Returns the number of rows to skip on restart. */
+    public long getRestartSkipRows() {
+        return longOffsetValue(offset, ROWS_TO_SKIP_OFFSET_KEY);
+    }
+
+    /** Returns the gtid set string, or {@code null} when none is stored. */
+    public String getGtidSet() {
+        return offset.get(GTID_SET_KEY);
+    }
+
+    /** Returns the value stored under {@link #TIMESTAMP_KEY}. */
+    public long getTimestamp() {
+        return longOffsetValue(offset, TIMESTAMP_KEY);
+    }
+
+    /**
+     * Returns the value stored under {@link #SERVER_ID_KEY}.
+     *
+     * <p>NOTE(review): the key is optional and {@code longOffsetValue} is defined outside this
+     * class; confirm what it yields for a missing key, since {@link #compareTo(Offset)} unboxes
+     * the result.
+     */
+    public Long getServerId() {
+        return longOffsetValue(offset, SERVER_ID_KEY);
+    }
+
+    /**
+     * Orders two binlog offsets. {@link #NO_STOPPING_OFFSET} is the maximum; otherwise gtid sets
+     * are compared when present, then (for offsets without gtids) server id / timestamp, file
+     * name, position, skipped events and skipped rows, in that order.
+     *
+     * <p>This method is inspired by {@link io.debezium.relational.history.HistoryRecordComparator}.
+     *
+     * @param offset the offset to compare with; must be a {@link BinlogOffset}
+     */
+    @Override
+    public int compareTo(Offset offset) {
+        BinlogOffset that = (BinlogOffset) offset;
+        // the NO_STOPPING_OFFSET is the max offset
+        if (NO_STOPPING_OFFSET.equals(that) && 
NO_STOPPING_OFFSET.equals(this)) {
+            return 0;
+        }
+        if (NO_STOPPING_OFFSET.equals(this)) {
+            return 1;
+        }
+        if (NO_STOPPING_OFFSET.equals(that)) {
+            return -1;
+        }
+
+        String gtidSetStr = this.getGtidSet();
+        String targetGtidSetStr = that.getGtidSet();
+        if (StringUtils.isNotEmpty(targetGtidSetStr)) {
+            // The target offset uses GTIDs, so we ideally compare using GTIDs ...
+            if (StringUtils.isNotEmpty(gtidSetStr)) {
+                // Both have GTIDs, so base the comparison entirely on the GTID sets.
+                GtidSet gtidSet = new GtidSet(gtidSetStr);
+                GtidSet targetGtidSet = new GtidSet(targetGtidSetStr);
+                if (gtidSet.equals(targetGtidSet)) {
+                    long restartSkipEvents = this.getRestartSkipEvents();
+                    long targetRestartSkipEvents = that.getRestartSkipEvents();
+                    return Long.compare(restartSkipEvents, 
targetRestartSkipEvents);
+                }
+                // The GTIDs are not an exact match, so figure out if this is a subset of the
+                // target offset ...
+                return gtidSet.isContainedWithin(targetGtidSet) ? -1 : 1;
+            }
+            // The target offset did use GTIDs while this did not use GTIDs. So, we assume that
+            // this offset is older since GTIDs are often enabled but rarely disabled. And if
+            // they are disabled, it is likely that this offset would not include GTIDs as we
+            // would be trying to read the binlog of a server that no longer has GTIDs. And if
+            // they are enabled, disabled, and re-enabled, per
+            // https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html
+            // all properly configured slaves that use GTIDs should always have the complete set
+            // of GTIDs copied from the master, in which case again we know that this offset not
+            // having GTIDs is before the target offset ...
+            return -1;
+        } else if (StringUtils.isNotEmpty(gtidSetStr)) {
+            // This offset has a GTID but the target offset does not, so per the previous
+            // paragraph we assume that this offset is not at or before the target ...
+            return 1;
+        }
+
+        // Both offsets are missing GTIDs. Look at the servers ...
+        long serverId = this.getServerId();
+        long targetServerId = that.getServerId();
+
+        if (serverId != targetServerId) {
+            // These are from different servers, and their binlog coordinates are not related.
+            // So the only thing we can do is compare timestamps, and we have to assume that the
+            // server timestamps can be compared ...
+            long timestamp = this.getTimestamp();
+            long targetTimestamp = that.getTimestamp();
+            return Long.compare(timestamp, targetTimestamp);
+        }
+
+        // First compare the MySQL binlog filenames
+        if (this.getFilename().compareToIgnoreCase(that.getFilename()) != 0) {
+            return this.getFilename().compareToIgnoreCase(that.getFilename());
+        }
+
+        // The filenames are the same, so compare the positions
+        if (this.getPosition() != that.getPosition()) {
+            return Long.compare(this.getPosition(), that.getPosition());
+        }
+
+        // The positions are the same, so compare the completed events in the transaction ...
+        if (this.getRestartSkipEvents() != that.getRestartSkipEvents()) {
+            return Long.compare(this.getRestartSkipEvents(), 
that.getRestartSkipEvents());
+        }
+
+        // The completed events are the same, so compare the row number ...
+        return Long.compare(this.getRestartSkipRows(), 
that.getRestartSkipRows());
+    }
+
+    /**
+     * Two binlog offsets are equal when their offset maps are equal. {@code hashCode} is not
+     * overridden in this class, hence the checkstyle suppression.
+     */
+    @SuppressWarnings("checkstyle:EqualsHashCode")
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof BinlogOffset)) {
+            return false;
+        }
+        BinlogOffset that = (BinlogOffset) o;
+        return offset.equals(that.offset);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
new file mode 100644
index 000000000..dec05b07b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.connector.base.ChangeEventQueue;
+import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.connector.mysql.MySqlDatabaseSchema;
+import io.debezium.connector.mysql.MySqlErrorHandler;
+import io.debezium.connector.mysql.MySqlOffsetContext;
+import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
+import io.debezium.connector.mysql.MySqlTaskContext;
+import io.debezium.connector.mysql.MySqlTopicSelector;
+import io.debezium.data.Envelope;
+import io.debezium.pipeline.DataChangeEvent;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
+import io.debezium.pipeline.source.spi.EventMetadataProvider;
+import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.schema.DataCollectionId;
+import io.debezium.schema.TopicSelector;
+import io.debezium.util.Collect;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
+import org.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import 
org.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Fetch-task context for the MySQL CDC source.
+ *
+ * <p>Holds the JDBC connection and the shared {@link BinaryLogClient} and, once
+ * {@link #configure(SourceSplitBase)} has been called, the Debezium objects (schema, offset
+ * context, event queue, dispatcher, metrics and error handler) that the snapshot and binlog
+ * fetch tasks obtain through the getters of this class.
+ */
+public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);
+
+    private final MySqlConnection connection;
+    private final BinaryLogClient binaryLogClient;
+    private final MySqlEventMetadataProvider metadataProvider;
+    // The fields below stay null until configure(...) has been called.
+    private MySqlDatabaseSchema databaseSchema;
+    private MySqlTaskContextImpl taskContext;
+    private MySqlOffsetContext offsetContext;
+    private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
+    private MySqlStreamingChangeEventSourceMetrics streamingChangeEventSourceMetrics;
+    private TopicSelector<TableId> topicSelector;
+    private JdbcSourceEventDispatcher dispatcher;
+    private ChangeEventQueue<DataChangeEvent> queue;
+    private MySqlErrorHandler errorHandler;
+
+    /**
+     * Creates an unconfigured context.
+     *
+     * @param sourceConfig source configuration, passed to the base class
+     * @param dataSourceDialect dialect, passed to the base class
+     * @param connection JDBC connection used for metadata and binlog availability checks
+     * @param binaryLogClient binlog client handed to the task context for reuse
+     */
+    public MySqlSourceFetchTaskContext(
+        JdbcSourceConfig sourceConfig,
+        JdbcDataSourceDialect dataSourceDialect,
+        MySqlConnection connection,
+        BinaryLogClient binaryLogClient) {
+        super(sourceConfig, dataSourceDialect);
+        this.connection = connection;
+        this.binaryLogClient = binaryLogClient;
+        this.metadataProvider = new MySqlEventMetadataProvider();
+    }
+
+    /**
+     * Builds the stateful Debezium objects (schema, offset context, task context, queue,
+     * dispatcher, metrics, error handler) for the given split.
+     *
+     * @param sourceSplitBase the split this context is about to serve
+     * @throws IllegalStateException if the starting binlog file of the split is no longer
+     *     available on the server
+     */
+    @Override
+    public void configure(SourceSplitBase sourceSplitBase) {
+        // initial stateful objects
+        final MySqlConnectorConfig connectorConfig = getDbzConnectorConfig();
+        final boolean tableIdCaseSensitive = connection.isTableIdCaseSensitive();
+        this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
+
+        this.databaseSchema =
+            MySqlUtils.createMySqlDatabaseSchema(connectorConfig, tableIdCaseSensitive);
+        this.offsetContext =
+            loadStartingOffsetState(
+                new MySqlOffsetContext.Loader(connectorConfig), sourceSplitBase);
+        validateAndLoadDatabaseHistory(offsetContext, databaseSchema);
+
+        this.taskContext =
+            new MySqlTaskContextImpl(connectorConfig, databaseSchema, binaryLogClient);
+        // Snapshot splits use Integer.MAX_VALUE as queue size; other splits use the configured one.
+        final int queueSize =
+            sourceSplitBase.isSnapshotSplit() ? Integer.MAX_VALUE
+                : getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
+        this.queue =
+            new ChangeEventQueue.Builder<DataChangeEvent>()
+                .pollInterval(connectorConfig.getPollInterval())
+                .maxBatchSize(connectorConfig.getMaxBatchSize())
+                .maxQueueSize(queueSize)
+                .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
+                .loggingContextSupplier(
+                    () ->
+                        taskContext.configureLoggingContext(
+                            "mysql-cdc-connector-task"))
+                // do not buffer any element, we use signal event
+                // .buffering()
+                .build();
+        this.dispatcher =
+            new JdbcSourceEventDispatcher(
+                connectorConfig,
+                topicSelector,
+                databaseSchema,
+                queue,
+                connectorConfig.getTableFilters().dataCollectionFilter(),
+                DataChangeEvent::new,
+                metadataProvider,
+                schemaNameAdjuster);
+
+        final MySqlChangeEventSourceMetricsFactory changeEventSourceMetricsFactory =
+            new MySqlChangeEventSourceMetricsFactory(
+                new MySqlStreamingChangeEventSourceMetrics(
+                    taskContext, queue, metadataProvider));
+        this.snapshotChangeEventSourceMetrics =
+            changeEventSourceMetricsFactory.getSnapshotMetrics(
+                taskContext, queue, metadataProvider);
+        this.streamingChangeEventSourceMetrics =
+            (MySqlStreamingChangeEventSourceMetrics)
+                changeEventSourceMetricsFactory.getStreamingMetrics(
+                    taskContext, queue, metadataProvider);
+        this.errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue);
+    }
+
+    /** Returns the source configuration cast to {@link MySqlSourceConfig}. */
+    @Override
+    public MySqlSourceConfig getSourceConfig() {
+        return (MySqlSourceConfig) sourceConfig;
+    }
+
+    /** Returns the JDBC connection passed to the constructor. */
+    public MySqlConnection getConnection() {
+        return connection;
+    }
+
+    /** Returns the binlog client passed to the constructor. */
+    public BinaryLogClient getBinaryLogClient() {
+        return binaryLogClient;
+    }
+
+    /** Returns the task context created by {@link #configure(SourceSplitBase)}. */
+    public MySqlTaskContextImpl getTaskContext() {
+        return taskContext;
+    }
+
+    /** Returns the base class' Debezium connector config cast to {@link MySqlConnectorConfig}. */
+    @Override
+    public MySqlConnectorConfig getDbzConnectorConfig() {
+        return (MySqlConnectorConfig) super.getDbzConnectorConfig();
+    }
+
+    /** Returns the offset context loaded by {@link #configure(SourceSplitBase)}. */
+    @Override
+    public MySqlOffsetContext getOffsetContext() {
+        return offsetContext;
+    }
+
+    /** Returns the snapshot metrics created by {@link #configure(SourceSplitBase)}. */
+    public SnapshotChangeEventSourceMetrics getSnapshotChangeEventSourceMetrics() {
+        return snapshotChangeEventSourceMetrics;
+    }
+
+    /** Returns the streaming metrics created by {@link #configure(SourceSplitBase)}. */
+    public MySqlStreamingChangeEventSourceMetrics getStreamingChangeEventSourceMetrics() {
+        return streamingChangeEventSourceMetrics;
+    }
+
+    /** Returns the error handler created by {@link #configure(SourceSplitBase)}. */
+    @Override
+    public ErrorHandler getErrorHandler() {
+        return errorHandler;
+    }
+
+    /** Returns the database schema created by {@link #configure(SourceSplitBase)}. */
+    @Override
+    public MySqlDatabaseSchema getDatabaseSchema() {
+        return databaseSchema;
+    }
+
+    /** Delegates to {@code MySqlUtils.getSplitType} for the given table. */
+    @Override
+    public SeaTunnelRowType getSplitType(Table table) {
+        return MySqlUtils.getSplitType(table);
+    }
+
+    /** Returns the event dispatcher created by {@link #configure(SourceSplitBase)}. */
+    @Override
+    public JdbcSourceEventDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    /** Returns the change event queue created by {@link #configure(SourceSplitBase)}. */
+    @Override
+    public ChangeEventQueue<DataChangeEvent> getQueue() {
+        return queue;
+    }
+
+    /** Returns the data collection filter of the Debezium connector config. */
+    @Override
+    public Tables.TableFilter getTableFilter() {
+        return getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
+    }
+
+    /** Delegates to {@code MySqlUtils.getBinlogPosition} for the given record. */
+    @Override
+    public Offset getStreamOffset(SourceRecord sourceRecord) {
+        return MySqlUtils.getBinlogPosition(sourceRecord);
+    }
+
+    /**
+     * Loads the connector's persistent offset (if present) via the given loader.
+     *
+     * <p>Snapshot splits start from {@code BinlogOffset.INITIAL_OFFSET}; other splits start
+     * from the startup offset of their incremental split.
+     *
+     * @throws IllegalStateException if the binlog file named by the offset is not available
+     */
+    private MySqlOffsetContext loadStartingOffsetState(
+        OffsetContext.Loader loader, SourceSplitBase mySqlSplit) {
+        Offset offset =
+            mySqlSplit.isSnapshotSplit() ? BinlogOffset.INITIAL_OFFSET
+                : mySqlSplit.asIncrementalSplit().getStartupOffset();
+
+        MySqlOffsetContext mySqlOffsetContext =
+            (MySqlOffsetContext) loader.load(offset.getOffset());
+
+        if (!isBinlogAvailable(mySqlOffsetContext)) {
+            throw new IllegalStateException(
+                "The connector is trying to read binlog starting at "
+                    + mySqlOffsetContext.getSourceInfo()
+                    + ", but this is no longer "
+                    + "available on the server. Reconfigure the connector to use a snapshot when needed.");
+        }
+        return mySqlOffsetContext;
+    }
+
+    /**
+     * Checks whether the binlog file named in the offset is among the files reported by the
+     * connection. A null or empty file name is treated as available.
+     */
+    private boolean isBinlogAvailable(MySqlOffsetContext offset) {
+        String binlogFilename = offset.getSourceInfo().getString(BINLOG_FILENAME_OFFSET_KEY);
+        if (binlogFilename == null) {
+            return true; // start at current position
+        }
+        if (binlogFilename.isEmpty()) {
+            return true; // start at beginning
+        }
+
+        // Accumulate the available binlog filenames ...
+        List<String> logNames = connection.availableBinlogFiles();
+
+        // And compare with the one we're supposed to use ...
+        boolean found = logNames.contains(binlogFilename);
+        if (!found) {
+            LOG.info(
+                "Connector requires binlog file '{}', but MySQL only has {}",
+                binlogFilename,
+                String.join(", ", logNames));
+        } else {
+            LOG.info("MySQL has the binlog file '{}' required by the connector", binlogFilename);
+        }
+        return found;
+    }
+
+    /** Initializes the schema's storage and recovers the schema from the given offset. */
+    private void validateAndLoadDatabaseHistory(
+        MySqlOffsetContext offset, MySqlDatabaseSchema schema) {
+        schema.initializeStorage();
+        schema.recover(offset);
+    }
+
+    /**
+     * A subclass implementation of {@link MySqlTaskContext} which reuses one BinaryLogClient.
+     */
+    public class MySqlTaskContextImpl extends MySqlTaskContext {
+
+        private final BinaryLogClient reusedBinaryLogClient;
+
+        public MySqlTaskContextImpl(
+            MySqlConnectorConfig config,
+            MySqlDatabaseSchema schema,
+            BinaryLogClient reusedBinaryLogClient) {
+            super(config, schema);
+            this.reusedBinaryLogClient = reusedBinaryLogClient;
+        }
+
+        /** Returns the client supplied at construction time. */
+        @Override
+        public BinaryLogClient getBinaryLogClient() {
+            return reusedBinaryLogClient;
+        }
+    }
+
+    /**
+     * Copied from debezium for accessing here.
+     */
+    public static class MySqlEventMetadataProvider implements EventMetadataProvider {
+        public static final String SERVER_ID_KEY = "server_id";
+
+        public static final String GTID_KEY = "gtid";
+        public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
+        public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
+        public static final String BINLOG_ROW_IN_EVENT_OFFSET_KEY = "row";
+        public static final String THREAD_KEY = "thread";
+        public static final String QUERY_KEY = "query";
+
+        /**
+         * Reads the event timestamp from the source struct of the change event value.
+         *
+         * @return the timestamp, or {@code null} when the value, the collection id, the source
+         *     struct or the timestamp field is null
+         */
+        @Override
+        public Instant getEventTimestamp(
+            DataCollectionId source, OffsetContext offset, Object key, Struct value) {
+            if (value == null) {
+                return null;
+            }
+            final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
+            // Guard the struct that is dereferenced below, not only the collection id.
+            if (source == null || sourceInfo == null) {
+                return null;
+            }
+            final Long timestamp = sourceInfo.getInt64(AbstractSourceInfo.TIMESTAMP_KEY);
+            return timestamp == null ? null : Instant.ofEpochMilli(timestamp);
+        }
+
+        /**
+         * Builds a map with the binlog file, position and row taken from the source struct of
+         * the change event value.
+         *
+         * @return the position map, or {@code null} when the value, the collection id or the
+         *     source struct is null
+         */
+        @Override
+        public Map<String, String> getEventSourcePosition(
+            DataCollectionId source, OffsetContext offset, Object key, Struct value) {
+            if (value == null) {
+                return null;
+            }
+            final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
+            // Guard the struct that is dereferenced below, not only the collection id.
+            if (source == null || sourceInfo == null) {
+                return null;
+            }
+            return Collect.hashMapOf(
+                BINLOG_FILENAME_OFFSET_KEY,
+                sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY),
+                BINLOG_POSITION_OFFSET_KEY,
+                Long.toString(sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY)),
+                BINLOG_ROW_IN_EVENT_OFFSET_KEY,
+                Integer.toString(sourceInfo.getInt32(BINLOG_ROW_IN_EVENT_OFFSET_KEY)));
+        }
+
+        /** Returns the transaction id held by the given offset, cast to {@link MySqlOffsetContext}. */
+        @Override
+        public String getTransactionId(
+            DataCollectionId source, OffsetContext offset, Object key, Struct value) {
+            return ((MySqlOffsetContext) offset).getTransactionId();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
new file mode 100644
index 000000000..21da9227a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
+
+import io.debezium.connector.mysql.MySqlStreamingChangeEventSource;
+import io.debezium.pipeline.source.spi.ChangeEventSource;
+import io.debezium.util.Clock;
+import org.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
+import org.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+
+/**
+ * Fetch task that reads an {@link IncrementalSplit} by running a
+ * {@link MySqlStreamingChangeEventSource} built from the fetch context.
+ */
+public class MySqlBinlogFetchTask implements FetchTask<SourceSplitBase> {
+    private final IncrementalSplit split;
+    private volatile boolean taskRunning = false;
+
+    public MySqlBinlogFetchTask(IncrementalSplit split) {
+        this.split = split;
+    }
+
+    /**
+     * Marks the task as running, then builds and executes the streaming change event source
+     * with the offset context of the fetch context.
+     *
+     * <p>NOTE(review): nothing in this class resets the running flag; confirm whether a stop
+     * hook is expected elsewhere.
+     *
+     * @param context must be a {@link MySqlSourceFetchTaskContext}
+     */
+    @Override
+    public void execute(Context context) throws Exception {
+        final MySqlSourceFetchTaskContext fetchContext = (MySqlSourceFetchTaskContext) context;
+        taskRunning = true;
+
+        final MySqlStreamingChangeEventSource streamingSource =
+            new MySqlStreamingChangeEventSource(
+                fetchContext.getDbzConnectorConfig(),
+                fetchContext.getConnection(),
+                fetchContext.getDispatcher(),
+                fetchContext.getErrorHandler(),
+                Clock.SYSTEM,
+                fetchContext.getTaskContext(),
+                fetchContext.getStreamingChangeEventSourceMetrics());
+
+        streamingSource.execute(
+            new BinlogSplitChangeEventSourceContext(), fetchContext.getOffsetContext());
+    }
+
+    /** Returns {@code true} once {@link #execute(Context)} has started. */
+    @Override
+    public boolean isRunning() {
+        return taskRunning;
+    }
+
+    /** Returns the incremental split this task was created for. */
+    @Override
+    public SourceSplitBase getSplit() {
+        return split;
+    }
+
+    /** Change event source context whose running state mirrors the enclosing task's flag. */
+    private class BinlogSplitChangeEventSourceContext
+        implements ChangeEventSource.ChangeEventSourceContext {
+        @Override
+        public boolean isRunning() {
+            return taskRunning;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
new file mode 100644
index 000000000..832bc8b6d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
+
+import org.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+
+/**
+ * Fetch task that reads one {@link SnapshotSplit} by running a
+ * {@link MySqlSnapshotSplitReadTask} built from the fetch context.
+ */
+public class MySqlSnapshotFetchTask implements FetchTask<SourceSplitBase> {
+
+    private final SnapshotSplit split;
+
+    private volatile boolean taskRunning = false;
+
+    private MySqlSnapshotSplitReadTask snapshotSplitReadTask;
+
+    public MySqlSnapshotFetchTask(SnapshotSplit split) {
+        this.split = split;
+    }
+
+    /**
+     * Runs the snapshot read for the split. The running flag is set while the read task is
+     * executing and is cleared when it finishes, whether normally or with an exception.
+     *
+     * @param context must be a {@link MySqlSourceFetchTaskContext}
+     * @throws Exception whatever the snapshot read task throws
+     */
+    @Override
+    public void execute(Context context) throws Exception {
+        MySqlSourceFetchTaskContext sourceFetchContext = (MySqlSourceFetchTaskContext) context;
+        taskRunning = true;
+        try {
+            snapshotSplitReadTask =
+                new MySqlSnapshotSplitReadTask(
+                    sourceFetchContext.getDbzConnectorConfig(),
+                    sourceFetchContext.getOffsetContext(),
+                    sourceFetchContext.getSnapshotChangeEventSourceMetrics(),
+                    sourceFetchContext.getDatabaseSchema(),
+                    sourceFetchContext.getConnection(),
+                    sourceFetchContext.getDispatcher(),
+                    split);
+            SnapshotSplitChangeEventSourceContext changeEventSourceContext =
+                new SnapshotSplitChangeEventSourceContext();
+            snapshotSplitReadTask.execute(
+                changeEventSourceContext, sourceFetchContext.getOffsetContext());
+        } finally {
+            // Clear the flag on failure or interruption too, so the task never reports
+            // itself as running after execute() has exited.
+            taskRunning = false;
+        }
+    }
+
+    /** Returns {@code true} while {@link #execute(Context)} is in progress. */
+    @Override
+    public boolean isRunning() {
+        return taskRunning;
+    }
+
+    /** Returns the snapshot split this task was created for. */
+    @Override
+    public SourceSplitBase getSplit() {
+        return split;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
new file mode 100644
index 000000000..0699ec394
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.currentBinlogOffset;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils.buildSplitScanQuery;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils.readTableSplitDataStatement;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.connector.mysql.MySqlDatabaseSchema;
+import io.debezium.connector.mysql.MySqlOffsetContext;
+import io.debezium.connector.mysql.MySqlValueConverters;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
+import io.debezium.pipeline.source.spi.SnapshotProgressListener;
+import io.debezium.pipeline.spi.ChangeRecordEmitter;
+import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.pipeline.spi.SnapshotResult;
+import io.debezium.relational.Column;
+import io.debezium.relational.RelationalSnapshotChangeEventSource;
+import io.debezium.relational.SnapshotChangeRecordEmitter;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.ColumnUtils;
+import io.debezium.util.Strings;
+import io.debezium.util.Threads;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.sql.Blob;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.time.Duration;
+import java.util.Calendar;
+
+public class MySqlSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSource {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlSnapshotSplitReadTask.class);
+
+    /** Interval for showing a log statement with the progress while scanning 
a single table. */
+    private static final Duration LOG_INTERVAL = Duration.ofMillis(10_000);
+
+    private final MySqlConnectorConfig connectorConfig;
+    private final MySqlDatabaseSchema databaseSchema;
+    private final MySqlConnection jdbcConnection;
+    private final JdbcSourceEventDispatcher dispatcher;
+    private final Clock clock;
+    private final SnapshotSplit snapshotSplit;
+    private final MySqlOffsetContext offsetContext;
+    private final SnapshotProgressListener snapshotProgressListener;
+
+    /**
+     * Creates a read task for a single snapshot split.
+     *
+     * @param connectorConfig Debezium connector config, also passed to the base class
+     * @param previousOffset offset context stored for use during the snapshot
+     * @param snapshotProgressListener progress listener, also passed to the base class
+     * @param databaseSchema schema used to look up the split's table
+     * @param jdbcConnection connection used for reading watermarks and split data
+     * @param dispatcher dispatcher receiving watermark and snapshot events
+     * @param snapshotSplit the split to read
+     */
+    public MySqlSnapshotSplitReadTask(
+        MySqlConnectorConfig connectorConfig,
+        MySqlOffsetContext previousOffset,
+        SnapshotProgressListener snapshotProgressListener,
+        MySqlDatabaseSchema databaseSchema,
+        MySqlConnection jdbcConnection,
+        JdbcSourceEventDispatcher dispatcher,
+        SnapshotSplit snapshotSplit) {
+        super(connectorConfig, snapshotProgressListener);
+        // Configuration and collaborators
+        this.connectorConfig = connectorConfig;
+        this.snapshotProgressListener = snapshotProgressListener;
+        this.databaseSchema = databaseSchema;
+        this.jdbcConnection = jdbcConnection;
+        this.dispatcher = dispatcher;
+        // Split-specific state
+        this.snapshotSplit = snapshotSplit;
+        this.offsetContext = previousOffset;
+        this.clock = Clock.SYSTEM;
+    }
+
+    /**
+     * Prepares a snapshot context and runs {@link #doExecute} with it.
+     *
+     * @throws InterruptedException rethrown unchanged when the snapshot is interrupted
+     * @throws RuntimeException when preparing the snapshot context fails
+     * @throws DebeziumException wrapping any other exception raised while snapshotting
+     */
+    @Override
+    public SnapshotResult execute(
+        ChangeEventSourceContext context, OffsetContext previousOffset)
+        throws InterruptedException {
+        final SnapshottingTask task = getSnapshottingTask(previousOffset);
+        final SnapshotContext preparedContext = prepareOrFail(context);
+        try {
+            return doExecute(context, previousOffset, preparedContext, task);
+        } catch (InterruptedException interrupted) {
+            LOG.warn("Snapshot was interrupted before completion");
+            throw interrupted;
+        } catch (Exception failure) {
+            throw new DebeziumException(failure);
+        }
+    }
+
+    /** Calls {@link #prepare} and converts any failure into a logged {@link RuntimeException}. */
+    private SnapshotContext prepareOrFail(ChangeEventSourceContext context) {
+        try {
+            return prepare(context);
+        } catch (Exception prepareFailure) {
+            LOG.error("Failed to initialize snapshot context.", prepareFailure);
+            throw new RuntimeException(prepareFailure);
+        }
+    }
+
+    /**
+     * Reads the split in three steps: records and dispatches the low watermark, dispatches the
+     * split's rows as snapshot events, then records and dispatches the high watermark.
+     *
+     * @param context must be a {@link SnapshotSplitChangeEventSourceContext}; it receives both
+     *     watermarks
+     * @param previousOffset not used here; the offset context given to the constructor is used
+     * @param snapshotContext must be a relational snapshot context; its offset is overwritten
+     * @param snapshottingTask not used here
+     * @return a completed snapshot result carrying the context's offset
+     */
+    @Override
+    protected SnapshotResult doExecute(
+        ChangeEventSourceContext context,
+        OffsetContext previousOffset,
+        SnapshotContext snapshotContext,
+        SnapshottingTask snapshottingTask)
+        throws Exception {
+        final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
+            (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
+        ctx.offset = offsetContext;
+
+        final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
+        LOG.info(
+            "Snapshot step 1 - Determining low watermark {} for split {}",
+            lowWatermark,
+            snapshotSplit);
+        final SnapshotSplitChangeEventSourceContext splitContext =
+            (SnapshotSplitChangeEventSourceContext) context;
+        splitContext.setLowWatermark(lowWatermark);
+        dispatcher.dispatchWatermarkEvent(
+            offsetContext.getPartition(), snapshotSplit, lowWatermark, WatermarkKind.LOW);
+
+        LOG.info("Snapshot step 2 - Snapshotting data");
+        createDataEvents(ctx, snapshotSplit.getTableId());
+
+        final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
+        LOG.info(
+            "Snapshot step 3 - Determining high watermark {} for split {}",
+            highWatermark,
+            snapshotSplit);
+        // Store the offset taken after the scan; previously the low watermark was stored here.
+        splitContext.setHighWatermark(highWatermark);
+        dispatcher.dispatchWatermarkEvent(
+            offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH);
+        return SnapshotResult.completed(ctx.offset);
+    }
+
+    /**
+     * Returns the same snapshotting task regardless of the previous offset: first flag
+     * {@code false}, second flag {@code true}.
+     */
+    @Override
+    protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
+        // NOTE(review): presumably (snapshotSchema, snapshotData) - confirm against Debezium.
+        final boolean firstFlag = false;
+        final boolean secondFlag = true;
+        return new SnapshottingTask(firstFlag, secondFlag);
+    }
+
+    /** Creates a new {@code MySqlSnapshotContext}; the given source context is not used. */
+    @Override
+    protected SnapshotContext prepare(ChangeEventSourceContext changeEventSourceContext)
+        throws Exception {
+        final SnapshotContext freshContext = new MySqlSnapshotContext();
+        return freshContext;
+    }
+
+    /**
+     * Dispatches the snapshot events of the given table through the dispatcher's snapshot
+     * receiver, then signals the receiver that the snapshot is complete.
+     */
+    private void createDataEvents(
+        RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
+        TableId tableId)
+        throws Exception {
+        final EventDispatcher.SnapshotReceiver receiver =
+            dispatcher.getSnapshotChangeEventReceiver();
+        LOG.debug("Snapshotting table {}", tableId);
+        final Table splitTable = databaseSchema.tableFor(tableId);
+        createDataEventsForTable(snapshotContext, receiver, splitTable);
+        receiver.completeSnapshot();
+    }
+
+    /**
+     * Scans the rows of the current split and dispatches one snapshot event per row.
+     *
+     * <p>Progress is logged and reported to the progress listener each time the log timer
+     * expires.
+     *
+     * @throws ConnectException wrapping any {@link SQLException} raised by the scan
+     */
+    private void createDataEventsForTable(
+        RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
+        EventDispatcher.SnapshotReceiver snapshotReceiver,
+        Table table)
+        throws InterruptedException {
+
+        final long startedAt = clock.currentTimeInMillis();
+        LOG.info(
+            "Exporting data from split '{}' of table {}",
+            snapshotSplit.splitId(),
+            table.id());
+
+        // A null bound means the split is open-ended on that side.
+        final boolean noLowerBound = snapshotSplit.getSplitStart() == null;
+        final boolean noUpperBound = snapshotSplit.getSplitEnd() == null;
+        final String scanSql =
+            buildSplitScanQuery(
+                snapshotSplit.getTableId(),
+                snapshotSplit.getSplitKeyType(),
+                noLowerBound,
+                noUpperBound);
+        LOG.info(
+            "For split '{}' of table {} using select statement: '{}'",
+            snapshotSplit.splitId(),
+            table.id(),
+            scanSql);
+
+        try (PreparedStatement scanStatement =
+                 readTableSplitDataStatement(
+                     jdbcConnection,
+                     scanSql,
+                     noLowerBound,
+                     noUpperBound,
+                     new Object[]{snapshotSplit.getSplitStart()},
+                     new Object[]{snapshotSplit.getSplitEnd()},
+                     snapshotSplit.getSplitKeyType().getTotalFields(),
+                     connectorConfig.getQueryFetchSize());
+             ResultSet resultSet = scanStatement.executeQuery()) {
+
+            final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(resultSet, table);
+            final Column[] resultColumns = columnArray.getColumns();
+            final int rowWidth = columnArray.getGreatestColumnPosition();
+            long exported = 0;
+            Threads.Timer progressTimer = getTableScanLogTimer();
+
+            while (resultSet.next()) {
+                exported++;
+                final Object[] values = new Object[rowWidth];
+                for (int idx = 0; idx < resultColumns.length; idx++) {
+                    final Column tableColumn = table.columns().get(idx);
+                    final int target = resultColumns[idx].position() - 1;
+                    values[target] = readField(resultSet, idx + 1, tableColumn, table);
+                }
+                if (progressTimer.expired()) {
+                    final long now = clock.currentTimeInMillis();
+                    LOG.info(
+                        "Exported {} records for split '{}' after {}",
+                        exported,
+                        snapshotSplit.splitId(),
+                        Strings.duration(now - startedAt));
+                    snapshotProgressListener.rowsScanned(table.id(), exported);
+                    progressTimer = getTableScanLogTimer();
+                }
+                dispatcher.dispatchSnapshotEvent(
+                    table.id(),
+                    getChangeRecordEmitter(snapshotContext, table.id(), values),
+                    snapshotReceiver);
+            }
+            LOG.info(
+                "Finished exporting {} records for split '{}', total duration '{}'",
+                exported,
+                snapshotSplit.splitId(),
+                Strings.duration(clock.currentTimeInMillis() - startedAt));
+        } catch (SQLException e) {
+            throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
+        }
+    }
+
+    /**
+     * Builds the emitter used to dispatch a single snapshot row.
+     *
+     * <p>The offset held by {@code snapshotContext} is first updated with the table id and the
+     * current clock time, then the same offset is handed to the new emitter.
+     *
+     * @param snapshotContext context whose {@code offset} is updated and passed to the emitter
+     * @param tableId identifier of the table the row was read from
+     * @param row values of the row
+     * @return a {@link SnapshotChangeRecordEmitter} built from the offset, the row and the clock
+     */
+    protected ChangeRecordEmitter getChangeRecordEmitter(
+        SnapshotContext snapshotContext, TableId tableId, Object[] row) {
+        snapshotContext.offset.event(tableId, clock.currentTime());
+        return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, 
clock);
+    }
+
+    /**
+     * Creates a fresh timer from {@code clock} and {@code LOG_INTERVAL}; the export loop checks
+     * {@code expired()} on it to decide when to log scan progress.
+     */
+    private Threads.Timer getTableScanLogTimer() {
+        return Threads.timer(clock, LOG_INTERVAL);
+    }
+
+    /**
+     * Reads one column of the current row, with dedicated handling for temporal columns
+     * (TIME, DATE, TIMESTAMP), small integers and character columns.
+     *
+     * <p>Note https://issues.redhat.com/browse/DBZ-3238 has fixed this issue, please remove
+     * this method once we bump Debezium version to 1.6
+     *
+     * @param rs result set positioned on the row being read
+     * @param fieldNo index of the column inside {@code rs}
+     * @param actualColumn metadata of the column; its JDBC type and type name pick the strategy
+     * @param actualTable table that owns the column
+     * @return the value produced by the selected strategy (may be {@code null})
+     * @throws SQLException if reading from {@code rs} fails
+     */
+    private Object readField(ResultSet rs, int fieldNo, Column actualColumn, Table actualTable)
+        throws SQLException {
+        switch (actualColumn.jdbcType()) {
+            case Types.TIME:
+                return readTimeField(rs, fieldNo);
+            case Types.DATE:
+                return readDateField(rs, fieldNo, actualColumn, actualTable);
+            case Types.TIMESTAMP:
+                // This is for DATETIME columns (a logical date + time without time zone):
+                // by reading them with a calendar based on the default time zone, we make
+                // sure that the value is constructed correctly using the database's
+                // (or connection's) time zone
+                return readTimestampField(rs, fieldNo, actualColumn, actualTable);
+            case Types.TINYINT:
+            case Types.SMALLINT:
+                // JDBC's rs.GetObject() will return a Boolean for all TINYINT(1) columns.
+                // TINYINT columns are reported as SMALLINT by JDBC driver.
+                // It seems that rs.wasNull() returns false when default value is set and NULL
+                // is inserted. We thus need to use getObject() to identify if the value was
+                // provided and if yes then read it again to get correct scale
+                if (rs.getObject(fieldNo) == null) {
+                    return null;
+                }
+                return rs.getInt(fieldNo);
+            default:
+                break;
+        }
+
+        // DBZ-2673
+        // It is necessary to check the type names as types like ENUM and SET are
+        // also reported as JDBC type char
+        final String typeName = actualColumn.typeName();
+        final boolean isCharacterType =
+            "CHAR".equals(typeName) || "VARCHAR".equals(typeName) || "TEXT".equals(typeName);
+        if (isCharacterType) {
+            return rs.getBytes(fieldNo);
+        }
+        return rs.getObject(fieldNo);
+    }
+
+    /**
+     * As MySQL connector/J implementation is broken for MySQL type "TIME" we have to use a
+     * binary-ish workaround. https://issues.jboss.org/browse/DBZ-342
+     *
+     * @param rs result set positioned on the row being read
+     * @param fieldNo index of the TIME column inside {@code rs}
+     * @return the value parsed by {@code MySqlValueConverters.stringToDuration}, or
+     *     {@code null} when the column holds SQL NULL
+     * @throws SQLException if the raw bytes cannot be fetched from {@code rs}
+     */
+    private Object readTimeField(ResultSet rs, int fieldNo) throws SQLException {
+        final Blob blob = rs.getBlob(fieldNo);
+        if (blob == null) {
+            // SQL NULL: there is nothing to parse
+            return null;
+        }
+
+        final byte[] rawValue = blob.getBytes(1, (int) blob.length());
+        try {
+            final String timeText = new String(rawValue, "UTF-8");
+            return MySqlValueConverters.stringToDuration(timeText);
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("Could not read MySQL TIME value as UTF-8");
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * In non-string mode the date field can contain zero in any of the date part which we need
+     * to handle as all-zero.
+     *
+     * @param rs result set positioned on the row being read
+     * @param fieldNo index of the DATE column inside {@code rs}
+     * @param column metadata of the column, forwarded to the converter
+     * @param table table that owns the column, forwarded to the converter
+     * @return the value produced by {@code MySqlValueConverters.stringToLocalDate}, or
+     *     {@code null} when the column holds SQL NULL
+     * @throws SQLException if the raw bytes cannot be fetched from {@code rs}
+     */
+    private Object readDateField(ResultSet rs, int fieldNo, Column column, Table table)
+        throws SQLException {
+        Blob b = rs.getBlob(fieldNo);
+        if (b == null) {
+            return null; // Don't continue parsing date field if it is null
+        }
+
+        try {
+            return MySqlValueConverters.stringToLocalDate(
+                new String(b.getBytes(1, (int) (b.length())), "UTF-8"), column, table);
+        } catch (UnsupportedEncodingException e) {
+            // Name the column type actually being read so the log is not misleading.
+            LOG.error("Could not read MySQL DATE value as UTF-8");
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * In non-string mode the timestamp field can contain zero in any of the date part which we
+     * need to handle as all-zero.
+     *
+     * @param rs result set positioned on the row being read
+     * @param fieldNo index of the column inside {@code rs}
+     * @param column metadata of the column, forwarded to the zero-date check
+     * @param table table that owns the column, forwarded to the zero-date check
+     * @return {@code null} when the column holds SQL NULL or its text form contains zero values
+     *     in the date part; otherwise the value of {@code rs.getTimestamp} read with a
+     *     default {@link Calendar}
+     * @throws SQLException if reading from {@code rs} fails
+     */
+    private Object readTimestampField(ResultSet rs, int fieldNo, Column column, Table table)
+        throws SQLException {
+        Blob b = rs.getBlob(fieldNo);
+        if (b == null) {
+            return null; // Don't continue parsing timestamp field if it is null
+        }
+
+        try {
+            return MySqlValueConverters.containsZeroValuesInDatePart(
+                new String(b.getBytes(1, (int) (b.length())), "UTF-8"),
+                column,
+                table) ? null
+                : rs.getTimestamp(fieldNo, Calendar.getInstance());
+        } catch (UnsupportedEncodingException e) {
+            // Name the column type actually being read so the log is not misleading.
+            LOG.error("Could not read MySQL TIMESTAMP value as UTF-8");
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Snapshot context used by this task; it only forwards an empty string to the parent
+     * constructor.
+     */
+    private static class MySqlSnapshotContext extends 
RelationalSnapshotChangeEventSource.RelationalSnapshotContext {
+        public MySqlSnapshotContext() throws SQLException {
+            super("");
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/SnapshotSplitChangeEventSourceContext.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/SnapshotSplitChangeEventSourceContext.java
new file mode 100644
index 000000000..982226ac0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/SnapshotSplitChangeEventSourceContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
+
+import io.debezium.pipeline.source.spi.ChangeEventSource;
+
+/**
+ * {@link ChangeEventSource.ChangeEventSourceContext} for a snapshot split that carries the
+ * low and high binlog watermarks of the split.
+ */
+public class SnapshotSplitChangeEventSourceContext
+    implements ChangeEventSource.ChangeEventSourceContext {
+
+    private BinlogOffset lowWatermark;
+    private BinlogOffset highWatermark;
+
+    /**
+     * Reports the context as running only once both watermarks have been set.
+     *
+     * @return {@code true} when neither watermark is {@code null}
+     */
+    @Override
+    public boolean isRunning() {
+        return lowWatermark != null && highWatermark != null;
+    }
+
+    /** Returns the low watermark, or {@code null} if it has not been set. */
+    public BinlogOffset getLowWatermark() {
+        return lowWatermark;
+    }
+
+    /** Returns the high watermark, or {@code null} if it has not been set. */
+    public BinlogOffset getHighWatermark() {
+        return highWatermark;
+    }
+
+    /** Stores the low watermark. */
+    public void setLowWatermark(BinlogOffset lowWatermark) {
+        this.lowWatermark = lowWatermark;
+    }
+
+    /** Stores the high watermark. */
+    public void setHighWatermark(BinlogOffset highWatermark) {
+        this.highWatermark = highWatermark;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
new file mode 100644
index 000000000..691f2f1ae
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.connector.mysql.MySqlDatabaseSchema;
+import io.debezium.connector.mysql.MySqlSystemVariables;
+import io.debezium.connector.mysql.MySqlTopicSelector;
+import io.debezium.connector.mysql.MySqlValueConverters;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcValueConverters;
+import io.debezium.jdbc.TemporalPrecisionMode;
+import io.debezium.relational.TableId;
+import io.debezium.schema.TopicSelector;
+import io.debezium.util.SchemaNameAdjuster;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * MySQL connection Utilities.
+ *
+ * <p>All members are static; the class is not meant to be instantiated.
+ */
+public class MySqlConnectionUtils {
+
+    private MySqlConnectionUtils() {
+    }
+
+    /**
+     * Creates a new {@link MySqlConnection}, but not open the connection.
+     *
+     * @param dbzConfiguration Debezium configuration used to build the connection settings
+     * @return the new connection object
+     */
+    public static MySqlConnection createMySqlConnection(Configuration dbzConfiguration) {
+        return new MySqlConnection(
+            new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration));
+    }
+
+    /**
+     * Creates a new {@link BinaryLogClient} for consuming mysql binlog.
+     *
+     * @param dbzConfiguration Debezium configuration providing hostname, port, username and
+     *     password
+     * @return the new binlog client
+     */
+    public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration) {
+        final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dbzConfiguration);
+        return new BinaryLogClient(
+            connectorConfig.hostname(),
+            connectorConfig.port(),
+            connectorConfig.username(),
+            connectorConfig.password());
+    }
+
+    /**
+     * Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas.
+     *
+     * @param dbzMySqlConfig connector configuration used for the topic selector, the value
+     *     converters and the schema itself
+     * @param isTableIdCaseSensitive flag forwarded unchanged to the schema constructor
+     * @return the new database schema
+     */
+    public static MySqlDatabaseSchema createMySqlDatabaseSchema(
+        MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) {
+        TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
+        SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
+        MySqlValueConverters valueConverters = getValueConverters(dbzMySqlConfig);
+        return new MySqlDatabaseSchema(
+            dbzMySqlConfig,
+            valueConverters,
+            topicSelector,
+            schemaNameAdjuster,
+            isTableIdCaseSensitive);
+    }
+
+    /**
+     * Fetch current binlog offsets in MySql Server.
+     *
+     * @param jdbc connection used to run {@code SHOW MASTER STATUS}
+     * @return offset built from the first row of the result
+     * @throws SeaTunnelException if the statement returns no row or fails with a
+     *     {@link SQLException}
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
+        final String showMasterStmt = "SHOW MASTER STATUS";
+        // Shared by the "no row returned" and the "query failed" paths.
+        final String failureMessage =
+            "Cannot read the binlog filename and position via '"
+                + showMasterStmt
+                + "'. Make sure your server is correctly configured";
+        try {
+            return jdbc.queryAndMap(
+                showMasterStmt,
+                rs -> {
+                    if (rs.next()) {
+                        // Columns 1 and 2 are read as file name and position; column 5 is
+                        // read as the GTID set only when the result has more than 4 columns.
+                        final String binlogFilename = rs.getString(1);
+                        final long binlogPosition = rs.getLong(2);
+                        final String gtidSet =
+                            rs.getMetaData().getColumnCount() > 4 ? rs.getString(5) : null;
+                        return new BinlogOffset(
+                            binlogFilename, binlogPosition, 0L, 0, 0, gtidSet, null);
+                    } else {
+                        throw new SeaTunnelException(failureMessage);
+                    }
+                });
+        } catch (SQLException e) {
+            throw new SeaTunnelException(failureMessage, e);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Builds the value converters from the temporal precision, decimal, unsigned-bigint,
+     * binary-handling and time-adjuster settings of the connector configuration.
+     */
+    private static MySqlValueConverters getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
+        TemporalPrecisionMode timePrecisionMode = dbzMySqlConfig.getTemporalPrecisionMode();
+        JdbcValueConverters.DecimalMode decimalMode = dbzMySqlConfig.getDecimalMode();
+        String bigIntUnsignedHandlingModeStr =
+            dbzMySqlConfig
+                .getConfig()
+                .getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
+        MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode =
+            MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
+                bigIntUnsignedHandlingModeStr);
+        JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
+            bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
+
+        boolean timeAdjusterEnabled =
+            dbzMySqlConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
+        return new MySqlValueConverters(
+            decimalMode,
+            timePrecisionMode,
+            bigIntUnsignedMode,
+            dbzMySqlConfig.binaryHandlingMode(),
+            // Identity function when the time adjuster is disabled.
+            timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> x,
+            MySqlValueConverters::defaultParsingErrorHandler);
+    }
+
+    /**
+     * Returns {@code true} unless the server variable named by
+     * {@code MySqlSystemVariables.LOWER_CASE_TABLE_NAMES} equals {@code "0"}.
+     *
+     * <p>NOTE(review): per the MySQL manual, {@code lower_case_table_names=0} is the setting
+     * under which name comparisons are case-sensitive, so the method name reads inverted
+     * relative to the value returned. Confirm how the schema consumes this flag before
+     * changing it.
+     *
+     * @param connection connection used to read the server variables
+     */
+    public static boolean isTableIdCaseSensitive(JdbcConnection connection) {
+        return !"0"
+            .equals(
+                readMySqlSystemVariables(connection)
+                    .get(MySqlSystemVariables.LOWER_CASE_TABLE_NAMES));
+    }
+
+    /**
+     * Reads the server variables by running {@code SHOW VARIABLES}.
+     *
+     * @param connection connection used to run the statement
+     * @return variable name to value; entries with a {@code null} name or value are left out
+     * @throws SeaTunnelException if the statement fails
+     */
+    public static Map<String, String> readMySqlSystemVariables(JdbcConnection connection) {
+        return querySystemVariables(connection, "SHOW VARIABLES");
+    }
+
+    /**
+     * Runs {@code statement} and collects the first two columns of every row as a name/value
+     * pair, skipping rows in which either is {@code null}.
+     */
+    private static Map<String, String> querySystemVariables(
+        JdbcConnection connection, String statement) {
+        final Map<String, String> variables = new HashMap<>();
+        try {
+            connection.query(
+                statement,
+                rs -> {
+                    while (rs.next()) {
+                        String varName = rs.getString(1);
+                        String value = rs.getString(2);
+                        if (varName != null && value != null) {
+                            variables.put(varName, value);
+                        }
+                    }
+                });
+        } catch (SQLException e) {
+            throw new SeaTunnelException("Error reading MySQL variables: " + e.getMessage(), e);
+        }
+
+        return variables;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
new file mode 100644
index 000000000..8250e3cf2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import io.debezium.relational.Column;
+import lombok.extern.slf4j.Slf4j;
+
+/** Utilities for converting from MySQL types to SeaTunnel types. */
+@Slf4j
+public class MySqlTypeUtils {
+
+    // ============================data types=====================
+
+    private static final String MYSQL_UNKNOWN = "UNKNOWN";
+    private static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    private static final String MYSQL_TINYINT = "TINYINT";
+    private static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String MYSQL_SMALLINT = "SMALLINT";
+    private static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    private static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String MYSQL_INT = "INT";
+    private static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    private static final String MYSQL_INTEGER = "INTEGER";
+    private static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    private static final String MYSQL_BIGINT = "BIGINT";
+    private static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String MYSQL_DECIMAL = "DECIMAL";
+    private static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String MYSQL_FLOAT = "FLOAT";
+    private static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String MYSQL_DOUBLE = "DOUBLE";
+    private static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    private static final String MYSQL_CHAR = "CHAR";
+    private static final String MYSQL_VARCHAR = "VARCHAR";
+    private static final String MYSQL_TINYTEXT = "TINYTEXT";
+    private static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String MYSQL_TEXT = "TEXT";
+    private static final String MYSQL_LONGTEXT = "LONGTEXT";
+    private static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    private static final String MYSQL_DATE = "DATE";
+    private static final String MYSQL_DATETIME = "DATETIME";
+    private static final String MYSQL_TIME = "TIME";
+    private static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    private static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    private static final String MYSQL_TINYBLOB = "TINYBLOB";
+    private static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String MYSQL_BLOB = "BLOB";
+    private static final String MYSQL_LONGBLOB = "LONGBLOB";
+    private static final String MYSQL_BINARY = "BINARY";
+    private static final String MYSQL_VARBINARY = "VARBINARY";
+    private static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    private MySqlTypeUtils() {
+    }
+
+    /**
+     * Maps the type name of a column to the SeaTunnel data type used to represent it.
+     *
+     * @param column column whose {@code typeName()} selects the mapping; {@code length()} and
+     *     {@code scale()} are additionally read for decimal columns
+     * @return the SeaTunnel data type for the column
+     * @throws UnsupportedOperationException if the type name has no mapping (including
+     *     {@code GEOMETRY} and {@code UNKNOWN})
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static SeaTunnelDataType<?> convertFromColumn(Column column) {
+        String typeName = column.typeName();
+        switch (typeName) {
+            case MYSQL_BIT:
+                return BasicType.BOOLEAN_TYPE;
+            case MYSQL_TINYINT:
+            case MYSQL_TINYINT_UNSIGNED:
+            case MYSQL_SMALLINT:
+            case MYSQL_SMALLINT_UNSIGNED:
+            case MYSQL_MEDIUMINT:
+            case MYSQL_MEDIUMINT_UNSIGNED:
+            case MYSQL_INT:
+            case MYSQL_INTEGER:
+            case MYSQL_YEAR:
+                return BasicType.INT_TYPE;
+            case MYSQL_INT_UNSIGNED:
+            case MYSQL_INTEGER_UNSIGNED:
+            case MYSQL_BIGINT:
+                return BasicType.LONG_TYPE;
+            case MYSQL_BIGINT_UNSIGNED:
+                return new DecimalType(20, 0);
+            case MYSQL_DECIMAL:
+            case MYSQL_DECIMAL_UNSIGNED:
+                // UNSIGNED only restricts the sign; precision and scale are taken from the
+                // column exactly as for a signed DECIMAL.
+                return new DecimalType(column.length(), column.scale().orElse(0));
+            case MYSQL_FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case MYSQL_FLOAT_UNSIGNED:
+                log.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
+                return BasicType.FLOAT_TYPE;
+            case MYSQL_DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case MYSQL_DOUBLE_UNSIGNED:
+                log.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
+                return BasicType.DOUBLE_TYPE;
+            case MYSQL_CHAR:
+            case MYSQL_TINYTEXT:
+            case MYSQL_MEDIUMTEXT:
+            case MYSQL_TEXT:
+            case MYSQL_VARCHAR:
+            case MYSQL_JSON:
+                return BasicType.STRING_TYPE;
+            case MYSQL_LONGTEXT:
+                log.warn(
+                    "Type '{}' has a maximum precision of 536870911 in MySQL. "
+                        + "Due to limitations in the seatunnel type system, "
+                        + "the precision will be set to 2147483647.",
+                    MYSQL_LONGTEXT);
+                return BasicType.STRING_TYPE;
+            case MYSQL_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case MYSQL_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case MYSQL_DATETIME:
+            case MYSQL_TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+
+            case MYSQL_TINYBLOB:
+            case MYSQL_MEDIUMBLOB:
+            case MYSQL_BLOB:
+            case MYSQL_LONGBLOB:
+            case MYSQL_VARBINARY:
+            case MYSQL_BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+
+            // Not supported yet
+            case MYSQL_GEOMETRY:
+            case MYSQL_UNKNOWN:
+            default:
+                final String columnName = column.name();
+                throw new UnsupportedOperationException(
+                    String.format(
+                        "Doesn't support MySQL type '%s' on column '%s'  yet.",
+                        typeName, columnName));
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
new file mode 100644
index 000000000..cbad9cde8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+
+import static 
org.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.rowToArray;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
+
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.connector.mysql.MySqlDatabaseSchema;
+import io.debezium.connector.mysql.MySqlTopicSelector;
+import io.debezium.connector.mysql.MySqlValueConverters;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcValueConverters;
+import io.debezium.jdbc.TemporalPrecisionMode;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.schema.TopicSelector;
+import io.debezium.util.SchemaNameAdjuster;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Utils to prepare MySQL SQL statement.
+ */
+public class MySqlUtils {
+
+    // Private constructor: this class is not instantiated from outside.
+    private MySqlUtils() {
+    }
+
+    /**
+     * Queries the minimum and the maximum value of a column in one statement.
+     *
+     * @param jdbc connection used to run the query
+     * @param tableId table to query
+     * @param columnName column whose MIN and MAX are selected
+     * @return the first two columns of the single result row, as built by {@code rowToArray}
+     * @throws SQLException if the query fails or returns no row
+     */
+    public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
+        throws SQLException {
+        final String quotedColumn = quote(columnName);
+        final String minMaxQuery =
+            String.format(
+                "SELECT MIN(%s), MAX(%s) FROM %s",
+                quotedColumn, quotedColumn, quote(tableId));
+        return jdbc.queryAndMap(
+            minMaxQuery,
+            rs -> {
+                if (rs.next()) {
+                    return rowToArray(rs, 2);
+                }
+                // this should never happen
+                throw new SQLException(
+                    String.format(
+                        "No result returned after running query [%s]",
+                        minMaxQuery));
+            });
+    }
+
+    /**
+     * Returns the approximate row count of a table, read from the fifth column of
+     * {@code SHOW TABLE STATUS} after switching to the table's catalog with {@code USE}.
+     *
+     * <p>This is less accurate than {@code COUNT(*)}, but more efficient for large tables.
+     *
+     * <p>NOTE(review): the table name is used as a LIKE pattern, so {@code _} or {@code %} in
+     * the name act as wildcards; confirm this cannot pick up the row of another table.
+     *
+     * @param jdbc connection used to run both statements
+     * @param tableId table whose row count is requested
+     * @return value of the fifth column of the first result row
+     * @throws SQLException if a statement fails, no row is returned, or the result has fewer
+     *     than five columns
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
+        throws SQLException {
+        final String switchCatalogSql = String.format("USE %s;", quote(tableId.catalog()));
+        final String rowCountQuery =
+            String.format("SHOW TABLE STATUS LIKE '%s';", tableId.table());
+        jdbc.executeWithoutCommitting(switchCatalogSql);
+        return jdbc.queryAndMap(
+            rowCountQuery,
+            rs -> {
+                if (rs.next() && rs.getMetaData().getColumnCount() >= 5) {
+                    return rs.getLong(5);
+                }
+                throw new SQLException(
+                    String.format(
+                        "No result returned after running query [%s]",
+                        rowCountQuery));
+            });
+    }
+
+    /**
+     * Queries the smallest value of a column that is strictly greater than a given bound.
+     *
+     * @param jdbc connection used to run the query
+     * @param tableId table to query
+     * @param columnName column whose minimum is selected
+     * @param excludedLowerBound value bound to the {@code > ?} predicate
+     * @return first column of the single result row
+     * @throws SQLException if the query fails or returns no row
+     */
+    public static Object queryMin(
+        JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
+        throws SQLException {
+        final String quotedColumn = quote(columnName);
+        final String minQuery =
+            String.format(
+                "SELECT MIN(%s) FROM %s WHERE %s > ?",
+                quotedColumn, quote(tableId), quotedColumn);
+        return jdbc.prepareQueryAndMap(
+            minQuery,
+            ps -> ps.setObject(1, excludedLowerBound),
+            rs -> {
+                if (rs.next()) {
+                    return rs.getObject(1);
+                }
+                // this should never happen
+                throw new SQLException(
+                    String.format("No result returned after running query [%s]", minQuery));
+            });
+    }
+
+    /**
+     * Queries the largest split-column value among the first {@code chunkSize} rows, ordered
+     * ascending by that column, whose value is greater than or equal to a given bound.
+     *
+     * @param jdbc connection used to run the query
+     * @param tableId table to query
+     * @param splitColumnName column used for filtering, ordering and the MAX aggregate
+     * @param chunkSize value formatted into the {@code LIMIT} clause
+     * @param includedLowerBound value bound to the {@code >= ?} predicate
+     * @return first column of the single result row
+     * @throws SQLException if the query fails or returns no row
+     */
+    public static Object queryNextChunkMax(
+        JdbcConnection jdbc,
+        TableId tableId,
+        String splitColumnName,
+        int chunkSize,
+        Object includedLowerBound)
+        throws SQLException {
+        final String column = quote(splitColumnName);
+        final String template =
+            "SELECT MAX(%s) FROM (SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s) AS T";
+        final String query =
+            String.format(template, column, column, quote(tableId), column, column, chunkSize);
+        return jdbc.prepareQueryAndMap(
+            query,
+            ps -> ps.setObject(1, includedLowerBound),
+            rs -> {
+                if (rs.next()) {
+                    return rs.getObject(1);
+                }
+                // this should never happen
+                throw new SQLException(
+                    String.format("No result returned after running query [%s]", query));
+            });
+    }
+
+    /**
+     * Builds the SELECT statement used to scan the data of one split.
+     *
+     * <p>Delegates to {@code buildSplitQuery} with a limit size of {@code -1} and
+     * {@code isScanningData} set to {@code true}.
+     *
+     * @param tableId table to scan
+     * @param rowType row type forwarded to the condition builder
+     * @param isFirstSplit whether the split is the first one
+     * @param isLastSplit whether the split is the last one
+     * @return the SQL text of the scan query
+     */
+    public static String buildSplitScanQuery(
+        TableId tableId, SeaTunnelRowType rowType, boolean isFirstSplit, 
boolean isLastSplit) {
+        return buildSplitQuery(tableId, rowType, isFirstSplit, isLastSplit, 
-1, true);
+    }
+
+    /**
+     * Builds the SQL for one split: a WHERE condition chosen from the split position, wrapped
+     * in either a data-scan SELECT or a boundary SELECT.
+     *
+     * <p>The condition is assembled through {@code addPrimaryKeyColumnsToCondition}, which is
+     * presumably appending one predicate per field of {@code rowType} (TODO confirm against
+     * that helper). The {@code ?} placeholders are bound elsewhere, so their order here must
+     * stay in step with the code that prepares the statement.
+     *
+     * @param tableId table to query
+     * @param rowType row type forwarded to the condition and projection helpers
+     * @param isFirstSplit whether the split is the first one
+     * @param isLastSplit whether the split is the last one
+     * @param limitSize limit forwarded to the SELECT builders
+     * @param isScanningData {@code true} to build the {@code *} data scan, {@code false} to
+     *     build the ordered boundary query
+     * @return the SQL text
+     */
+    private static String buildSplitQuery(
+        TableId tableId,
+        SeaTunnelRowType rowType,
+        boolean isFirstSplit,
+        boolean isLastSplit,
+        int limitSize,
+        boolean isScanningData) {
+        final String condition;
+
+        if (isFirstSplit && isLastSplit) {
+            // The split covers the whole table: no WHERE condition.
+            condition = null;
+        } else if (isFirstSplit) {
+            // First split: "<= ?", plus "AND NOT (= ?)" when scanning data.
+            final StringBuilder sql = new StringBuilder();
+            addPrimaryKeyColumnsToCondition(rowType, sql, " <= ?");
+            if (isScanningData) {
+                sql.append(" AND NOT (");
+                addPrimaryKeyColumnsToCondition(rowType, sql, " = ?");
+                sql.append(")");
+            }
+            condition = sql.toString();
+        } else if (isLastSplit) {
+            // Last split: only the ">= ?" bound.
+            final StringBuilder sql = new StringBuilder();
+            addPrimaryKeyColumnsToCondition(rowType, sql, " >= ?");
+            condition = sql.toString();
+        } else {
+            // Middle split: ">= ?", optional "AND NOT (= ?)" when scanning data, then "<= ?".
+            final StringBuilder sql = new StringBuilder();
+            addPrimaryKeyColumnsToCondition(rowType, sql, " >= ?");
+            if (isScanningData) {
+                sql.append(" AND NOT (");
+                addPrimaryKeyColumnsToCondition(rowType, sql, " = ?");
+                sql.append(")");
+            }
+            sql.append(" AND ");
+            addPrimaryKeyColumnsToCondition(rowType, sql, " <= ?");
+            condition = sql.toString();
+        }
+
+        if (isScanningData) {
+            // Data scan: select every column, with no ORDER BY argument.
+            return buildSelectWithRowLimits(
+                tableId, limitSize, "*", Optional.ofNullable(condition), 
Optional.empty());
+        } else {
+            // Boundary query: ordered by the row type's field names.
+            final String orderBy =
+                String.join(", ", rowType.getFieldNames());
+            return buildSelectWithBoundaryRowLimits(
+                tableId,
+                limitSize,
+                getPrimaryKeyColumnsProjection(rowType),
+                getMaxPrimaryKeyColumnsProjection(rowType),
+                Optional.ofNullable(condition),
+                orderBy);
+        }
+    }
+
+    /**
+     * Prepares the split scan statement and binds the split bounds to its
+     * placeholders.
+     *
+     * <p>Binding layout, with {@code n = primaryKeyNum}:
+     * <ul>
+     *   <li>first and last split: nothing is bound;</li>
+     *   <li>first split: {@code splitEnd} to parameters {@code 1..n} and
+     *       again to {@code n+1..2n};</li>
+     *   <li>last split: {@code splitStart} to parameters {@code 1..n};</li>
+     *   <li>any other split: {@code splitStart} to {@code 1..n},
+     *       {@code splitEnd} to {@code n+1..2n} and again to
+     *       {@code 2n+1..3n}.</li>
+     * </ul>
+     *
+     * <p>If preparing or binding fails, the partially initialised statement
+     * is closed before the failure is reported, so it does not leak.
+     *
+     * @param jdbc connection the statement is created on
+     * @param sql split scan query text
+     * @param isFirstSplit whether the split has no lower bound
+     * @param isLastSplit whether the split has no upper bound
+     * @param splitStart lower bound values, read unless this is the first
+     *     split
+     * @param splitEnd upper bound values, read unless this is the last split
+     * @param primaryKeyNum number of key columns to bind per bound
+     * @param fetchSize fetch size applied to the statement
+     * @return the prepared statement, ready to execute
+     * @throws RuntimeException wrapping any failure while preparing or
+     *     binding
+     */
+    public static PreparedStatement readTableSplitDataStatement(
+        JdbcConnection jdbc,
+        String sql,
+        boolean isFirstSplit,
+        boolean isLastSplit,
+        Object[] splitStart,
+        Object[] splitEnd,
+        int primaryKeyNum,
+        int fetchSize) {
+        PreparedStatement statement = null;
+        try {
+            statement = initStatement(jdbc, sql, fetchSize);
+            if (isFirstSplit && isLastSplit) {
+                return statement;
+            }
+            if (isFirstSplit) {
+                for (int i = 0; i < primaryKeyNum; i++) {
+                    statement.setObject(i + 1, splitEnd[i]);
+                    statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
+                }
+            } else if (isLastSplit) {
+                for (int i = 0; i < primaryKeyNum; i++) {
+                    statement.setObject(i + 1, splitStart[i]);
+                }
+            } else {
+                for (int i = 0; i < primaryKeyNum; i++) {
+                    statement.setObject(i + 1, splitStart[i]);
+                    statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
+                    statement.setObject(
+                        i + 1 + 2 * primaryKeyNum, splitEnd[i]);
+                }
+            }
+            return statement;
+        } catch (Exception e) {
+            // The caller never sees the statement on failure: release it here.
+            if (statement != null) {
+                try {
+                    statement.close();
+                } catch (SQLException closeFailure) {
+                    e.addSuppressed(closeFailure);
+                }
+            }
+            throw new RuntimeException(
+                "Failed to build the split data read statement.", e);
+        }
+    }
+
+    /**
+     * Derives the single-field row type of the split key of {@code table}.
+     *
+     * @param table table whose first primary key column is the split key
+     * @return the row type describing that column
+     * @throws SeaTunnelException if the table has no primary key
+     */
+    public static SeaTunnelRowType getSplitType(Table table) {
+        // getSplitColumn rejects tables without a primary key and picks the
+        // first field in the primary key as the split key
+        return getSplitType(getSplitColumn(table));
+    }
+
+    /**
+     * Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql
+     * database schemas.
+     *
+     * @param dbzMySqlConfig connector configuration the topic selector and
+     *     the value converters are derived from
+     * @param isTableIdCaseSensitive passed through to the schema constructor
+     * @return the newly constructed schema
+     */
+    public static MySqlDatabaseSchema createMySqlDatabaseSchema(
+        MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) {
+        final TopicSelector<TableId> selector =
+            MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
+        final SchemaNameAdjuster nameAdjuster = SchemaNameAdjuster.create();
+        return new MySqlDatabaseSchema(
+            dbzMySqlConfig,
+            getValueConverters(dbzMySqlConfig),
+            selector,
+            nameAdjuster,
+            isTableIdCaseSensitive);
+    }
+
+    private static MySqlValueConverters 
getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
+        TemporalPrecisionMode timePrecisionMode = 
dbzMySqlConfig.getTemporalPrecisionMode();
+        JdbcValueConverters.DecimalMode decimalMode = 
dbzMySqlConfig.getDecimalMode();
+        String bigIntUnsignedHandlingModeStr =
+            dbzMySqlConfig
+                .getConfig()
+                .getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
+        MySqlConnectorConfig.BigIntUnsignedHandlingMode 
bigIntUnsignedHandlingMode =
+            MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
+                bigIntUnsignedHandlingModeStr);
+        JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
+            bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
+
+        boolean timeAdjusterEnabled =
+            
dbzMySqlConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
+        return new MySqlValueConverters(
+            decimalMode,
+            timePrecisionMode,
+            bigIntUnsignedMode,
+            dbzMySqlConfig.binaryHandlingMode(),
+            timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> 
x,
+            MySqlValueConverters::defaultParsingErrorHandler);
+    }
+
+    /**
+     * Extracts the binlog offset carried by the source offset of a record.
+     *
+     * @param dataRecord record whose {@code sourceOffset()} is converted
+     * @return the offset built from that map
+     */
+    public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) {
+        final Map<String, ?> sourceOffset = dataRecord.sourceOffset();
+        return getBinlogPosition(sourceOffset);
+    }
+
+    /**
+     * Builds a {@link BinlogOffset} from an offset map by converting every
+     * value to its string form; {@code null} values stay {@code null}.
+     *
+     * @param offset offset entries keyed by name
+     * @return the offset wrapping the stringified entries
+     */
+    public static BinlogOffset getBinlogPosition(Map<String, ?> offset) {
+        final Map<String, String> stringified = new HashMap<>();
+        for (Map.Entry<String, ?> entry : offset.entrySet()) {
+            final Object value = entry.getValue();
+            if (value == null) {
+                stringified.put(entry.getKey(), null);
+            } else {
+                stringified.put(entry.getKey(), value.toString());
+            }
+        }
+        return new BinlogOffset(stringified);
+    }
+
+    /**
+     * Wraps a split column into a single-field row type: the field is named
+     * after the column and typed by {@code MySqlTypeUtils.convertFromColumn}.
+     *
+     * @param splitColumn column used as the split key
+     * @return row type containing exactly that one field
+     */
+    public static SeaTunnelRowType getSplitType(Column splitColumn) {
+        final String[] fieldNames = {splitColumn.name()};
+        final SeaTunnelDataType<?>[] fieldTypes = {
+            MySqlTypeUtils.convertFromColumn(splitColumn)
+        };
+        return new SeaTunnelRowType(fieldNames, fieldTypes);
+    }
+
+    /**
+     * Picks the column used to split a table into chunks.
+     *
+     * @param table table to inspect
+     * @return the first column of the table's primary key
+     * @throws SeaTunnelException if the table has no primary key
+     */
+    public static Column getSplitColumn(Table table) {
+        final List<Column> keyColumns = table.primaryKeyColumns();
+        if (!keyColumns.isEmpty()) {
+            // use first field in primary key as the split key
+            return keyColumns.get(0);
+        }
+        throw new SeaTunnelException(
+            String.format(
+                "Incremental snapshot for tables requires primary key,"
+                    + " but table %s doesn't have primary key.",
+                table.id()));
+    }
+
+    /**
+     * Wraps an identifier in backticks for use in a MySQL statement.
+     *
+     * <p>Backticks inside the identifier are doubled, which is how MySQL
+     * escapes the quote character within a quoted identifier. Without this
+     * a name containing a backtick would end the quoting early and yield a
+     * malformed (or injectable) statement.
+     *
+     * @param dbOrTableName raw, unquoted identifier; {@code null} is rendered
+     *     as the text {@code null}, as plain string concatenation would
+     * @return the back-quoted identifier
+     */
+    public static String quote(String dbOrTableName) {
+        return "`" + String.valueOf(dbOrTableName).replace("`", "``") + "`";
+    }
+
+    /**
+     * Renders a table id with backtick as the quoting character.
+     *
+     * @param tableId table id to render
+     * @return the result of {@code tableId.toQuotedString} with a backtick
+     */
+    public static String quote(TableId tableId) {
+        final char backtick = '`';
+        return tableId.toQuotedString(backtick);
+    }
+
+    /**
+     * Prepares {@code sql} on the connection with the given fetch size.
+     *
+     * <p>Side effect: auto-commit is switched off on the underlying
+     * connection before the statement is prepared.
+     *
+     * <p>If the driver rejects the fetch size, the freshly prepared
+     * statement is closed before the failure propagates, so it does not
+     * leak.
+     *
+     * @param jdbc connection wrapper to prepare the statement on
+     * @param sql statement text
+     * @param fetchSize value passed to {@code setFetchSize}
+     * @return the prepared statement
+     * @throws SQLException if the connection or the statement cannot be set
+     *     up
+     */
+    private static PreparedStatement initStatement(
+        JdbcConnection jdbc, String sql, int fetchSize) throws SQLException {
+        final Connection connection = jdbc.connection();
+        connection.setAutoCommit(false);
+        final PreparedStatement statement = connection.prepareStatement(sql);
+        try {
+            statement.setFetchSize(fetchSize);
+        } catch (SQLException | RuntimeException e) {
+            // Nobody else holds a reference yet: release it here.
+            try {
+                statement.close();
+            } catch (SQLException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+        return statement;
+    }
+
+    private static void addPrimaryKeyColumnsToCondition(
+        SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
+        for (Iterator<String> fieldNamesIt = 
Arrays.stream(rowType.getFieldNames()).iterator();
+             fieldNamesIt.hasNext(); ) {
+            sql.append(fieldNamesIt.next()).append(predicate);
+            if (fieldNamesIt.hasNext()) {
+                sql.append(" AND ");
+            }
+        }
+    }
+
+    private static String getPrimaryKeyColumnsProjection(SeaTunnelRowType 
rowType) {
+        StringBuilder sql = new StringBuilder();
+        for (Iterator<String> fieldNamesIt = 
Arrays.stream(rowType.getFieldNames()).iterator();
+             fieldNamesIt.hasNext(); ) {
+            sql.append(fieldNamesIt.next());
+            if (fieldNamesIt.hasNext()) {
+                sql.append(" , ");
+            }
+        }
+        return sql.toString();
+    }
+
+    private static String getMaxPrimaryKeyColumnsProjection(SeaTunnelRowType 
rowType) {
+        StringBuilder sql = new StringBuilder();
+        for (Iterator<String> fieldNamesIt = 
Arrays.stream(rowType.getFieldNames()).iterator();
+             fieldNamesIt.hasNext(); ) {
+            sql.append("MAX(" + fieldNamesIt.next() + ")");
+            if (fieldNamesIt.hasNext()) {
+                sql.append(" , ");
+            }
+        }
+        return sql.toString();
+    }
+
+    private static String buildSelectWithRowLimits(
+        TableId tableId,
+        int limit,
+        String projection,
+        Optional<String> condition,
+        Optional<String> orderBy) {
+        final StringBuilder sql = new StringBuilder("SELECT ");
+        sql.append(projection).append(" FROM ");
+        sql.append(quotedTableIdString(tableId));
+        if (condition.isPresent()) {
+            sql.append(" WHERE ").append(condition.get());
+        }
+        if (orderBy.isPresent()) {
+            sql.append(" ORDER BY ").append(orderBy.get());
+        }
+        if (limit > 0) {
+            sql.append(" LIMIT ").append(limit);
+        }
+        return sql.toString();
+    }
+
+    private static String buildSelectWithBoundaryRowLimits(
+        TableId tableId,
+        int limit,
+        String projection,
+        String maxColumnProjection,
+        Optional<String> condition,
+        String orderBy) {
+        final StringBuilder sql = new StringBuilder("SELECT ");
+        sql.append(maxColumnProjection);
+        sql.append(" FROM (");
+        sql.append("SELECT ");
+        sql.append(projection);
+        sql.append(" FROM ");
+        sql.append(quotedTableIdString(tableId));
+        if (condition.isPresent()) {
+            sql.append(" WHERE ").append(condition.get());
+        }
+        sql.append(" ORDER BY ").append(orderBy).append(" LIMIT 
").append(limit);
+        sql.append(") T");
+        return sql.toString();
+    }
+
+    /**
+     * Renders a table id back-quoted; same output as {@code quote(TableId)}.
+     *
+     * @param tableId table id to render
+     * @return the back-quoted table id
+     */
+    private static String quotedTableIdString(TableId tableId) {
+        return quote(tableId);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/TableDiscoveryUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/TableDiscoveryUtils.java
new file mode 100644
index 000000000..1e3e65a6c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/TableDiscoveryUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils.quote;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utilities to discovery matched tables.
+ */
+public class TableDiscoveryUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TableDiscoveryUtils.class);
+
+    public static List<TableId> listTables(JdbcConnection jdbc, 
RelationalTableFilters tableFilters)
+        throws SQLException {
+        final List<TableId> capturedTableIds = new ArrayList<>();
+        // -------------------
+        // READ DATABASE NAMES
+        // -------------------
+        // Get the list of databases ...
+        LOG.info("Read list of available databases");
+        final List<String> databaseNames = new ArrayList<>();
+
+        jdbc.query(
+            "SHOW DATABASES",
+            rs -> {
+                while (rs.next()) {
+                    databaseNames.add(rs.getString(1));
+                }
+            });
+        LOG.info("\t list of available databases is: {}", databaseNames);
+
+        // ----------------
+        // READ TABLE NAMES
+        // ----------------
+        // Get the list of table IDs for each database. We can't use a 
prepared statement with
+        // MySQL, so we have to build the SQL statement each time. Although in 
other cases this
+        // might lead to SQL injection, in our case we are reading the 
database names from the
+        // database and not taking them from the user ...
+        LOG.info("Read list of available tables in each database");
+        for (String dbName : databaseNames) {
+            try {
+                jdbc.query(
+                    "SHOW FULL TABLES IN " + quote(dbName) + " where 
Table_Type = 'BASE TABLE'",
+                    rs -> {
+                        while (rs.next()) {
+                            TableId tableId = new TableId(dbName, null, 
rs.getString(1));
+                            if 
(tableFilters.dataCollectionFilter().isIncluded(tableId)) {
+                                capturedTableIds.add(tableId);
+                                LOG.info("\t including '{}' for further 
processing", tableId);
+                            } else {
+                                LOG.info("\t '{}' is filtered out of 
capturing", tableId);
+                            }
+                        }
+                    });
+            } catch (SQLException e) {
+                // We were unable to execute the query or process the results, 
so skip this ...
+                LOG.warn(
+                    "\t skipping database '{}' due to error reading tables: 
{}",
+                    dbName,
+                    e.getMessage());
+            }
+        }
+        return capturedTableIds;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/pom.xml 
b/seatunnel-connectors-v2/connector-cdc/pom.xml
index cf6a2e6c0..722d678b3 100644
--- a/seatunnel-connectors-v2/connector-cdc/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/pom.xml
@@ -30,9 +30,11 @@
     <packaging>pom</packaging>
 
     <properties>
+        <debezium.version>1.6.4.Final</debezium.version>
     </properties>
 
     <modules>
         <module>connector-cdc-base</module>
+        <module>connector-cdc-mysql</module>
     </modules>
 </project>


Reply via email to