This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c56a7ac05 [INLONG-8175][Sort] MySQL CDC support read data from 
specific timestamp / earliest offset / specific offset (#8240)
9c56a7ac05 is described below

commit 9c56a7ac05de442cfc0031077a66741f708ee851
Author: emhui <[email protected]>
AuthorDate: Fri Jun 16 16:05:01 2023 +0800

    [INLONG-8175][Sort] MySQL CDC support read data from specific timestamp / 
earliest offset / specific offset (#8240)
---
 .../inlong/sort/parser/MySqlLoadSqlParseTest.java  | 136 +++++++++++++++
 .../source/assigner/SnapshotSplitAssigner.java     |   3 +-
 .../apache/inlong/sort/cdc/mysql/MySqlSource.java  |   7 +-
 .../sort/cdc/mysql/debezium/DebeziumUtils.java     |   6 +-
 .../mysql/debezium/reader/BinlogSplitReader.java   |  23 ++-
 .../mysql/debezium/reader/SnapshotSplitReader.java |   3 +-
 .../debezium/task/MySqlBinlogSplitReadTask.java    |  13 +-
 .../debezium/task/context/MySqlErrorHandler.java   |  73 ++++++--
 .../debezium/task/context/StatefulTaskContext.java |  10 +-
 .../exception/SchemaOutOfSyncException.java        |  29 ++++
 .../source/assigners/MySqlBinlogSplitAssigner.java |  23 +--
 .../source/assigners/MySqlHybridSplitAssigner.java |   4 +-
 .../source/config/MySqlSourceConfigFactory.java    |   8 -
 .../mysql/source/config/MySqlSourceOptions.java    |  24 ++-
 .../sort/cdc/mysql/source/offset/BinlogOffset.java |  97 +++++++----
 .../mysql/source/offset/BinlogOffsetBuilder.java   | 118 +++++++++++++
 .../cdc/mysql/source/offset/BinlogOffsetKind.java  |  59 +++++++
 .../cdc/mysql/source/offset/BinlogOffsetUtils.java |  64 +++++++
 .../cdc/mysql/source/reader/MySqlSourceReader.java |  51 ++++--
 .../sort/cdc/mysql/source/utils/RecordUtils.java   |   4 +-
 .../cdc/mysql/source/utils/SerializerUtils.java    |  21 ++-
 .../mysql/table/MySqlTableInlongSourceFactory.java |  81 ++++++---
 .../sort/cdc/mysql/table/StartupOptions.java       |  66 ++++----
 licenses/inlong-sort-connectors/LICENSE            | 184 +++++++++++----------
 24 files changed, 846 insertions(+), 261 deletions(-)

diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
index 473331e986..1c324b5532 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
@@ -63,6 +63,58 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase {
                 null, null);
     }
 
+    /**
+     * build mysql extract node with specific timestamp
+     */
+    private MySqlExtractNode buildMySQLExtractNodeWithSpecificTimestamp() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new 
LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()));
+        Map<String, String> map = new HashMap<>();
+        map.put("scan.startup.mode", "timestamp");
+        map.put("scan.startup.timestamp-millis", "1667232000000");
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("table_input"), "localhost", 
"inlong", "inlong",
+                "inlong", null, null,
+                null, null);
+    }
+
+    /**
+     * build mysql extract node with specific binlog
+     */
+    private MySqlExtractNode buildMySQLExtractNodeWithSpecificBinlog() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new 
LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()));
+        Map<String, String> map = new HashMap<>();
+        map.put("scan.startup.mode", "specific-offset");
+        map.put("scan.startup.specific-offset.file", "mysql-bin.000003");
+        map.put("scan.startup.specific-offset.pos", "4");
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("table_input"), "localhost", 
"inlong", "inlong",
+                "inlong", null, null,
+                null, null);
+    }
+
+    /**
+     * build mysql extract node with specific gtid
+     */
+    private MySqlExtractNode buildMySQLExtractNodeWithSpecificGtid() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new 
LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()));
+        Map<String, String> map = new HashMap<>();
+        map.put("scan.startup.mode", "specific-offset");
+        map.put("scan.startup.specific-offset.gtid-set", 
"24DA167-0C0C-11E8-8442-00059A3C7B00:1-19");
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("table_input"), "localhost", 
"inlong", "inlong",
+                "inlong", null, null,
+                null, null);
+    }
+
     private Node buildMysqlLoadNode() {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new 
LongFormatInfo()),
                 new FieldInfo("name", new StringFormatInfo()),
@@ -127,4 +179,88 @@ public class MySqlLoadSqlParseTest extends 
AbstractTestBase {
         Assert.assertTrue(result.tryExecute());
     }
 
+    /**
+     * Test Flink SQL for a {@link MySqlExtractNode} starting from a specific 
timestamp and a {@link MySqlLoadNode}
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testMySqlLoadStartupFromSpecificTimestampSqlParse() throws 
Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
+        Node inputNode = buildMySQLExtractNodeWithSpecificTimestamp();
+        Node outputNode = buildMysqlLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, 
outputNode),
+                
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", 
Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, 
groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
+    /**
+     * Test Flink SQL for a {@link MySqlExtractNode} starting from a specific 
binlog offset and a {@link MySqlLoadNode}
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testMySqlLoadStartupFromSpecificBinlogSqlParse() throws 
Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
+        Node inputNode = buildMySQLExtractNodeWithSpecificBinlog();
+        Node outputNode = buildMysqlLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, 
outputNode),
+                
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", 
Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, 
groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
+    /**
+     * Test Flink SQL for a {@link MySqlExtractNode} starting from a specific 
GTID set and a {@link MySqlLoadNode}
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testMySqlLoadStartupFromSpecificGtidSqlParse() throws 
Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
+        Node inputNode = buildMySQLExtractNodeWithSpecificGtid();
+        Node outputNode = buildMysqlLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, 
outputNode),
+                
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", 
Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, 
groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
 }
diff --git 
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/SnapshotSplitAssigner.java
 
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/SnapshotSplitAssigner.java
index a151b5c42e..0344a35824 100644
--- 
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/SnapshotSplitAssigner.java
+++ 
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/SnapshotSplitAssigner.java
@@ -52,8 +52,7 @@ import java.util.stream.Collectors;
  * */
 public class SnapshotSplitAssigner<C extends SourceConfig> implements 
SplitAssigner {
 
-    private static final Logger LOG = LoggerFactory.getLogger(
-            
com.ververica.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(SnapshotSplitAssigner.class);
 
     private final List<TableId> alreadyProcessedTables;
     private final List<SchemalessSnapshotSplit> remainingSplits;
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
index 4f4de822ef..3576246b9a 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
@@ -248,8 +248,8 @@ public class MySqlSource {
                     specificOffset.setSourcePartition(sourcePartition);
 
                     Map<String, Object> sourceOffset = new HashMap<>();
-                    sourceOffset.put("file", 
startupOptions.specificOffsetFile);
-                    sourceOffset.put("pos", startupOptions.specificOffsetPos);
+                    sourceOffset.put("file", 
startupOptions.binlogOffset.getFilename());
+                    sourceOffset.put("pos", 
startupOptions.binlogOffset.getPosition());
                     specificOffset.setSourceOffset(sourceOffset);
                     break;
 
@@ -258,7 +258,8 @@ public class MySqlSource {
                     props.setProperty("snapshot.mode", "never");
                     deserializer =
                             new SeekBinlogToTimestampFilter<>(
-                                    startupOptions.startupTimestampMillis, 
deserializer);
+                                    
startupOptions.binlogOffset.getTimestampSec() * 1000,
+                                    deserializer);
                     break;
 
                 default:
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/DebeziumUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/DebeziumUtils.java
index d487120036..547018f07e 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/DebeziumUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/DebeziumUtils.java
@@ -129,8 +129,10 @@ public class DebeziumUtils {
                             final long binlogPosition = rs.getLong(2);
                             final String gtidSet =
                                     rs.getMetaData().getColumnCount() > 4 ? 
rs.getString(5) : null;
-                            return new BinlogOffset(
-                                    binlogFilename, binlogPosition, 0L, 0, 0, 
gtidSet, null);
+                            return BinlogOffset.builder()
+                                    .setBinlogFilePosition(binlogFilename, 
binlogPosition)
+                                    .setGtidSet(gtidSet)
+                                    .build();
                         } else {
                             throw new FlinkRuntimeException(
                                     "Cannot read the binlog filename and 
position via '"
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
index eee97a0fcb..b3714f4f72 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
@@ -20,12 +20,15 @@ package org.apache.inlong.sort.cdc.mysql.debezium.reader;
 import org.apache.inlong.sort.cdc.mysql.debezium.task.MySqlBinlogSplitReadTask;
 import 
org.apache.inlong.sort.cdc.mysql.debezium.task.context.StatefulTaskContext;
 import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind;
 import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
 import org.apache.inlong.sort.cdc.mysql.source.utils.ChunkUtils;
 import org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils;
 
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventType;
 import io.debezium.connector.base.ChangeEventQueue;
 import io.debezium.connector.mysql.MySqlOffsetContext;
 import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
@@ -50,6 +53,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.function.Predicate;
 
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getSplitInfoByBinarySearch;
@@ -107,7 +111,8 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecord, MySqlSpli
                         (MySqlStreamingChangeEventSourceMetrics) 
statefulTaskContext
                                 .getStreamingChangeEventSourceMetrics(),
                         
statefulTaskContext.getTopicSelector().getPrimaryTopic(),
-                        currentBinlogSplit);
+                        currentBinlogSplit,
+                        
createEventFilter(currentBinlogSplit.getStartingOffset()));
 
         executor.submit(
                 () -> {
@@ -247,7 +252,7 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecord, MySqlSpli
                 currentBinlogSplit.getFinishedSnapshotSplitInfos();
         Map<TableId, List<FinishedSnapshotSplitInfo>> splitsInfoMap = new 
HashMap<>();
         Map<TableId, BinlogOffset> tableIdBinlogPositionMap = new HashMap<>();
-        // latest-offset mode
+        // specific offset mode
         if (finishedSplitInfos.isEmpty()) {
             for (TableId tableId : 
currentBinlogSplit.getTableSchemas().keySet()) {
                 tableIdBinlogPositionMap.put(tableId, 
currentBinlogSplit.getStartingOffset());
@@ -271,6 +276,20 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecord, MySqlSpli
         this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
     }
 
+    private Predicate<Event> createEventFilter(BinlogOffset startingOffset) {
+        // If the startup mode is set as TIMESTAMP, we need to apply a filter 
on event to drop
+        // events earlier than the specified timestamp.
+        if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) 
{
+            long startTimestampSec = startingOffset.getTimestampSec();
+            // Notes:
+            // 1. Heartbeat events don't contain a timestamp, so we always keep them
+            // 2. The timestamp of an event is in epoch milliseconds
+            return event -> 
EventType.HEARTBEAT.equals(event.getHeader().getEventType())
+                    || event.getHeader().getTimestamp() >= startTimestampSec * 
1000;
+        }
+        return event -> true;
+    }
+
     public void stopBinlogReadTask() {
         this.currentTaskRunning = false;
     }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
index 9110bb53d6..c1200deac1 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
@@ -190,7 +190,8 @@ public class SnapshotSplitReader implements 
DebeziumReader<SourceRecord, MySqlSp
                 statefulTaskContext.getTaskContext(),
                 (MySqlStreamingChangeEventSourceMetrics) 
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
                 statefulTaskContext.getTopicSelector().getPrimaryTopic(),
-                backfillBinlogSplit);
+                backfillBinlogSplit,
+                event -> true);
     }
 
     private void dispatchBinlogEndEvent(MySqlBinlogSplit backFillBinlogSplit)
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
index 1068cb0de9..63bbd402de 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
@@ -40,8 +40,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.function.Predicate;
 
-import static 
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset.NO_STOPPING_OFFSET;
+import static 
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetUtils.isNonStoppingOffset;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
 
 /**
@@ -56,6 +57,7 @@ public class MySqlBinlogSplitReadTask extends 
MySqlStreamingChangeEventSource {
     private final EventDispatcherImpl<TableId> eventDispatcher;
     private final SignalEventDispatcher signalEventDispatcher;
     private final ErrorHandler errorHandler;
+    private final Predicate<Event> eventFilter;
     private ChangeEventSourceContext context;
     private final MySqlTaskContext taskContext;
 
@@ -72,7 +74,8 @@ public class MySqlBinlogSplitReadTask extends 
MySqlStreamingChangeEventSource {
             MySqlTaskContext taskContext,
             MySqlStreamingChangeEventSourceMetrics metrics,
             String topic,
-            MySqlBinlogSplit binlogSplit) {
+            MySqlBinlogSplit binlogSplit,
+            Predicate<Event> eventFilter) {
         super(
                 connectorConfig,
                 offsetContext,
@@ -91,6 +94,7 @@ public class MySqlBinlogSplitReadTask extends 
MySqlStreamingChangeEventSource {
         this.signalEventDispatcher =
                 new SignalEventDispatcher(
                         offsetContext.getPartition(), topic, 
eventDispatcher.getQueue());
+        this.eventFilter = eventFilter;
     }
 
     /**
@@ -124,6 +128,9 @@ public class MySqlBinlogSplitReadTask extends 
MySqlStreamingChangeEventSource {
 
     @Override
     protected void handleEvent(Event event) {
+        if (!eventFilter.test(event)) {
+            return;
+        }
         super.handleEvent(event);
         // check do we need to stop for read binlog for snapshot split.
         if (isBoundedRead()) {
@@ -148,6 +155,6 @@ public class MySqlBinlogSplitReadTask extends 
MySqlStreamingChangeEventSource {
     }
 
     private boolean isBoundedRead() {
-        return !NO_STOPPING_OFFSET.equals(binlogSplit.getEndingOffset());
+        return !isNonStoppingOffset(binlogSplit.getEndingOffset());
     }
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlErrorHandler.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlErrorHandler.java
index 090dfe3176..d2e538a476 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlErrorHandler.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlErrorHandler.java
@@ -17,12 +17,18 @@
 
 package org.apache.inlong.sort.cdc.mysql.debezium.task.context;
 
+import 
org.apache.inlong.sort.cdc.mysql.debezium.task.context.exception.SchemaOutOfSyncException;
+import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
+import org.apache.inlong.sort.cdc.mysql.table.StartupMode;
+
 import io.debezium.DebeziumException;
 import io.debezium.connector.base.ChangeEventQueue;
 import io.debezium.connector.mysql.MySqlConnector;
 import io.debezium.connector.mysql.MySqlTaskContext;
 import io.debezium.pipeline.ErrorHandler;
 import io.debezium.relational.TableId;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,11 +47,16 @@ public class MySqlErrorHandler extends ErrorHandler {
                     "Encountered change event for table (.+)\\.(.+) whose 
schema isn't known to this connector");
 
     MySqlTaskContext context;
+    private final MySqlSourceConfig sourceConfig;
 
     public MySqlErrorHandler(
-            String logicalName, ChangeEventQueue<?> queue, MySqlTaskContext 
context) {
+            String logicalName,
+            ChangeEventQueue<?> queue,
+            MySqlTaskContext context,
+            MySqlSourceConfig sourceConfig) {
         super(MySqlConnector.class, logicalName, queue);
         this.context = context;
+        this.sourceConfig = sourceConfig;
     }
 
     @Override
@@ -55,20 +66,56 @@ public class MySqlErrorHandler extends ErrorHandler {
 
     @Override
     public void setProducerThrowable(Throwable producerThrowable) {
-        if (producerThrowable.getCause() instanceof DebeziumException) {
-            DebeziumException e = (DebeziumException) 
producerThrowable.getCause();
-            String detailMessage = e.getMessage();
-            Matcher matcher = 
NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
-            if (matcher.find()) {
-                String databaseName = matcher.group(1);
-                String tableName = matcher.group(2);
-                TableId tableId = new TableId(databaseName, null, tableName);
-                if (context.getSchema().schemaFor(tableId) == null) {
-                    LOG.warn("Schema for table " + tableId + " is null");
-                    return;
-                }
+        if (isTableNotFoundException(producerThrowable)) {
+            Matcher matcher =
+                    
NOT_FOUND_TABLE_MSG_PATTERN.matcher(producerThrowable.getCause().getMessage());
+            String databaseName = matcher.group(1);
+            String tableName = matcher.group(2);
+            TableId tableId = new TableId(databaseName, null, tableName);
+            if (context.getSchema().schemaFor(tableId) == null) {
+                LOG.warn("Schema for table " + tableId + " is null");
+                return;
             }
         }
+
+        if (isSchemaOutOfSyncException(producerThrowable)) {
+            super.setProducerThrowable(
+                    new SchemaOutOfSyncException(
+                            "Internal schema representation is probably out of 
sync with real database schema. "
+                                    + "The reason could be that the table 
schema was changed after the starting "
+                                    + "binlog offset, which is not supported 
when startup mode is set to "
+                                    + 
sourceConfig.getStartupOptions().startupMode,
+                            producerThrowable));
+            return;
+        }
+
         super.setProducerThrowable(producerThrowable);
     }
+
+    private boolean isTableNotFoundException(Throwable t) {
+        if (!(t.getCause() instanceof DebeziumException)) {
+            return false;
+        }
+        DebeziumException e = (DebeziumException) t.getCause();
+        String detailMessage = e.getMessage();
+        Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
+        return matcher.find();
+    }
+
+    private boolean isSchemaOutOfSyncException(Throwable t) {
+        Throwable rootCause = ExceptionUtils.getRootCause(t);
+        return rootCause instanceof ConnectException
+                && rootCause
+                        .getMessage()
+                        .endsWith(
+                                "internal schema representation is probably 
out of sync with real database schema")
+                && isSettingStartingOffset();
+    }
+
+    private boolean isSettingStartingOffset() {
+        StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
+        return startupMode == StartupMode.EARLIEST_OFFSET
+                || startupMode == StartupMode.TIMESTAMP
+                || startupMode == StartupMode.SPECIFIC_OFFSETS;
+    }
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
index 2464756d9c..7082845a56 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
@@ -57,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.inlong.sort.cdc.mysql.debezium.task.context.StatefulTaskContext.MySqlEventMetadataProvider.BINLOG_FILENAME_OFFSET_KEY;
+import static 
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset;
 
 /**
  * A stateful task context that contains entries the debezium mysql connector 
task required.
@@ -160,7 +161,7 @@ public class StatefulTaskContext {
                 changeEventSourceMetricsFactory.getStreamingMetrics(
                         taskContext, queue, metadataProvider);
         this.errorHandler =
-                new MySqlErrorHandler(connectorConfig.getLogicalName(), queue, 
taskContext);
+                new MySqlErrorHandler(connectorConfig.getLogicalName(), queue, 
taskContext, sourceConfig);
     }
 
     private void validateAndLoadDatabaseHistory(
@@ -176,8 +177,11 @@ public class StatefulTaskContext {
             OffsetContext.Loader loader, MySqlSplit mySqlSplit) {
         BinlogOffset offset =
                 mySqlSplit.isSnapshotSplit()
-                        ? BinlogOffset.INITIAL_OFFSET
-                        : mySqlSplit.asBinlogSplit().getStartingOffset();
+                        ? BinlogOffset.ofEarliest()
+                        : initializeEffectiveOffset(
+                                
mySqlSplit.asBinlogSplit().getStartingOffset(), connection);
+
+        LOG.info("Starting offset is initialized to {}", offset);
 
         MySqlOffsetContext mySqlOffsetContext =
                 (MySqlOffsetContext) loader.load(offset.getOffset());
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/exception/SchemaOutOfSyncException.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/exception/SchemaOutOfSyncException.java
new file mode 100644
index 0000000000..edfb7e1082
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/exception/SchemaOutOfSyncException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.debezium.task.context.exception;
+
+/**
+ * A checked exception that wraps a failure thrown inside Debezium so
+ * that its message can clearly state the possible reason for a
+ * schema-out-of-sync error.
+ */
+public class SchemaOutOfSyncException extends Exception {
+
+    /**
+     * Creates the exception.
+     *
+     * @param message description of the possible reason for the failure
+     * @param cause the underlying exception being wrapped
+     */
+    public SchemaOutOfSyncException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlBinlogSplitAssigner.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlBinlogSplitAssigner.java
index 0123e3f2a8..68b375d301 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlBinlogSplitAssigner.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlBinlogSplitAssigner.java
@@ -29,13 +29,10 @@ import 
org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
 
 import io.debezium.connector.mysql.MySqlConnection;
-import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.RelationalTableFilters;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges.TableChange;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -47,14 +44,12 @@ import java.util.Map;
 import java.util.Optional;
 
 import static 
com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables;
-import static 
org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.currentBinlogOffset;
 
 /**
  * A {@link MySqlSplitAssigner} which only read binlog from current binlog 
position.
  */
 public class MySqlBinlogSplitAssigner implements MySqlSplitAssigner {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlBinlogSplitAssigner.class);
     private static final String BINLOG_SPLIT_ID = "binlog-split";
 
     private final MySqlSourceConfig sourceConfig;
@@ -149,17 +144,13 @@ public class MySqlBinlogSplitAssigner implements 
MySqlSplitAssigner {
 
     private MySqlBinlogSplit createBinlogSplit() {
         Map<TableId, TableChange> tableSchemas = 
discoverCapturedTableSchemas();
-        try (JdbcConnection jdbc = 
DebeziumUtils.openJdbcConnection(sourceConfig)) {
-            return new MySqlBinlogSplit(
-                    BINLOG_SPLIT_ID,
-                    currentBinlogOffset(jdbc),
-                    BinlogOffset.NO_STOPPING_OFFSET,
-                    new ArrayList<>(),
-                    tableSchemas,
-                    0);
-        } catch (Exception e) {
-            throw new FlinkRuntimeException("Read the binlog offset error", e);
-        }
+        return new MySqlBinlogSplit(
+                BINLOG_SPLIT_ID,
+                sourceConfig.getStartupOptions().binlogOffset,
+                BinlogOffset.ofNonStopping(),
+                new ArrayList<>(),
+                tableSchemas,
+                0);
     }
 
     private Map<TableId, TableChange> discoverCapturedTableSchemas() {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
index 5b6fbaff5e..5576786e60 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
@@ -218,8 +218,8 @@ public class MySqlHybridSplitAssigner implements 
MySqlSplitAssigner {
         boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > 
splitMetaGroupSize;
         return new MySqlBinlogSplit(
                 BINLOG_SPLIT_ID,
-                minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : 
minBinlogOffset,
-                BinlogOffset.NO_STOPPING_OFFSET,
+                minBinlogOffset == null ? BinlogOffset.ofEarliest() : 
minBinlogOffset,
+                BinlogOffset.ofNonStopping(),
                 divideMetaToGroups ? new ArrayList<>() : 
finishedSnapshotSplitInfos,
                 new HashMap<>(),
                 finishedSnapshotSplitInfos.size());
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
index 17e751b409..db9475ebcb 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
@@ -282,14 +282,6 @@ public class MySqlSourceConfigFactory implements 
Serializable {
      * Specifies the startup options.
      */
     public MySqlSourceConfigFactory startupOptions(StartupOptions 
startupOptions) {
-        switch (startupOptions.startupMode) {
-            case INITIAL:
-            case LATEST_OFFSET:
-                break;
-            default:
-                throw new UnsupportedOperationException(
-                        "Unsupported startup mode: " + 
startupOptions.startupMode);
-        }
         this.startupOptions = startupOptions;
         return this;
     }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index e7be00ec6c..8faabdca41 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -162,9 +162,9 @@ public class MySqlSourceOptions {
                     .withDescription(
                             "Optional offsets used in case of 
\"specific-offset\" startup mode");
 
-    public static final ConfigOption<Integer> SCAN_STARTUP_SPECIFIC_OFFSET_POS 
=
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
             ConfigOptions.key("scan.startup.specific-offset.pos")
-                    .intType()
+                    .longType()
                     .noDefaultValue()
                     .withDescription(
                             "Optional offsets used in case of 
\"specific-offset\" startup mode");
@@ -269,4 +269,24 @@ public class MySqlSourceOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Whether capture the scan the newly added tables 
or not, by default is false.");
+
+    public static final ConfigOption<String> 
SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET =
+            ConfigOptions.key("scan.startup.specific-offset.gtid-set")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional GTID set used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> 
SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS =
+            ConfigOptions.key("scan.startup.specific-offset.skip-events")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional number of events to skip after the 
specific starting offset");
+
+    public static final ConfigOption<Long> 
SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS =
+            ConfigOptions.key("scan.startup.specific-offset.skip-rows")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("Optional number of rows to skip after 
the specific offset");
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffset.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffset.java
index 20c7e60ec6..04decfb9ba 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffset.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffset.java
@@ -19,15 +19,21 @@ package org.apache.inlong.sort.cdc.mysql.source.offset;
 
 import io.debezium.connector.mysql.GtidSet;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.kafka.connect.errors.ConnectException;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import static 
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind.EARLIEST;
+import static 
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind.LATEST;
+import static 
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind.NON_STOPPING;
+import static 
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind.TIMESTAMP;
+
 /**
  * A structure describes a fine grained offset in a binlog event including 
binlog position and gtid
  * set etc.
@@ -48,43 +54,54 @@ public class BinlogOffset implements 
Comparable<BinlogOffset>, Serializable {
     public static final String GTID_SET_KEY = "gtids";
     public static final String TIMESTAMP_KEY = "ts_sec";
     public static final String SERVER_ID_KEY = "server_id";
-
-    public static final BinlogOffset INITIAL_OFFSET = new BinlogOffset("", 0);
-    public static final BinlogOffset NO_STOPPING_OFFSET = new BinlogOffset("", 
Long.MIN_VALUE);
+    public static final String OFFSET_KIND_KEY = "kind";
 
     private final Map<String, String> offset;
 
-    public BinlogOffset(Map<String, String> offset) {
-        this.offset = offset;
+    // ------------------------------- Builders 
--------------------------------
+
+    /** Create a {@link BinlogOffsetBuilder}. */
+    public static BinlogOffsetBuilder builder() {
+        return new BinlogOffsetBuilder();
     }
 
-    public BinlogOffset(String filename, long position) {
-        this(filename, position, 0L, 0L, 0L, null, null);
+    /** Create offset from binlog filename and position. */
+    public static BinlogOffset ofBinlogFilePosition(String filename, long 
position) {
+        return builder().setBinlogFilePosition(filename, position).build();
     }
 
-    public BinlogOffset(
-            String filename,
-            long position,
-            long restartSkipEvents,
-            long restartSkipRows,
-            long binlogEpochSecs,
-            @Nullable String restartGtidSet,
-            @Nullable Integer serverId) {
-        Map<String, String> offsetMap = new HashMap<>();
-        offsetMap.put(BINLOG_FILENAME_OFFSET_KEY, filename);
-        offsetMap.put(BINLOG_POSITION_OFFSET_KEY, String.valueOf(position));
-        offsetMap.put(EVENTS_TO_SKIP_OFFSET_KEY, 
String.valueOf(restartSkipEvents));
-        offsetMap.put(ROWS_TO_SKIP_OFFSET_KEY, 
String.valueOf(restartSkipRows));
-        offsetMap.put(TIMESTAMP_KEY, String.valueOf(binlogEpochSecs));
-        if (restartGtidSet != null) {
-            offsetMap.put(GTID_SET_KEY, restartGtidSet);
-        }
-        if (serverId != null) {
-            offsetMap.put(SERVER_ID_KEY, String.valueOf(serverId));
-        }
-        this.offset = offsetMap;
+    /** Create offset from GTID set. */
+    public static BinlogOffset ofGtidSet(String gtidSet) {
+        return builder().setBinlogFilePosition("", 
0).setGtidSet(gtidSet).build();
+    }
+
+    /** Create offset which represents the earliest accessible binlog offset. 
*/
+    public static BinlogOffset ofEarliest() {
+        return builder().setOffsetKind(EARLIEST).build();
     }
 
+    /** Create offset which represents the latest offset at the point of 
access. */
+    public static BinlogOffset ofLatest() {
+        return builder().setOffsetKind(LATEST).build();
+    }
+
+    /** Create offset specified by a timestamp in second. */
+    public static BinlogOffset ofTimestampSec(long timestampSec) {
+        return 
builder().setOffsetKind(TIMESTAMP).setTimestampSec(timestampSec).build();
+    }
+
+    @Internal
+    public static BinlogOffset ofNonStopping() {
+        return builder().setOffsetKind(NON_STOPPING).build();
+    }
+
+    @VisibleForTesting
+    public BinlogOffset(Map<String, String> offset) {
+        this.offset = offset;
+    }
+
+    // ------------------------------ Field getters 
-----------------------------
+
     public Map<String, String> getOffset() {
         return offset;
     }
@@ -109,7 +126,7 @@ public class BinlogOffset implements 
Comparable<BinlogOffset>, Serializable {
         return offset.get(GTID_SET_KEY);
     }
 
-    public long getTimestamp() {
+    public long getTimestampSec() {
         return longOffsetValue(offset, TIMESTAMP_KEY);
     }
 
@@ -117,6 +134,14 @@ public class BinlogOffset implements 
Comparable<BinlogOffset>, Serializable {
         return longOffsetValue(offset, SERVER_ID_KEY);
     }
 
+    /**
+     * Returns the offset kind stored under {@link #OFFSET_KIND_KEY}.
+     *
+     * @return the parsed {@link BinlogOffsetKind}, or {@code null} when
+     *     no kind value is stored in the offset map
+     */
+    @Nullable
+    public BinlogOffsetKind getOffsetKind() {
+        String kindName = offset.get(OFFSET_KIND_KEY);
+        return kindName == null ? null : BinlogOffsetKind.valueOf(kindName);
+    }
+
     private long longOffsetValue(Map<String, ?> values, String key) {
         Object obj = values.get(key);
         if (obj == null) {
@@ -142,14 +167,14 @@ public class BinlogOffset implements 
Comparable<BinlogOffset>, Serializable {
      */
     @Override
     public int compareTo(BinlogOffset that) {
-        // the NO_STOPPING_OFFSET is the max offset
-        if (NO_STOPPING_OFFSET.equals(that) && 
NO_STOPPING_OFFSET.equals(this)) {
+        // the NON_STOPPING is the max offset
+        if (that.getOffsetKind() == NON_STOPPING && this.getOffsetKind() == 
NON_STOPPING) {
             return 0;
         }
-        if (NO_STOPPING_OFFSET.equals(this)) {
+        if (this.getOffsetKind() == NON_STOPPING) {
             return 1;
         }
-        if (NO_STOPPING_OFFSET.equals(that)) {
+        if (that.getOffsetKind() == NON_STOPPING) {
             return -1;
         }
 
@@ -201,8 +226,8 @@ public class BinlogOffset implements 
Comparable<BinlogOffset>, Serializable {
             // the only thing we can do
             // is compare timestamps, and we have to assume that the server 
timestamps can be
             // compared ...
-            long timestamp = this.getTimestamp();
-            long targetTimestamp = that.getTimestamp();
+            long timestamp = this.getTimestampSec();
+            long targetTimestamp = that.getTimestampSec();
             return Long.compare(timestamp, targetTimestamp);
         }
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetBuilder.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetBuilder.java
new file mode 100644
index 0000000000..4b00e5e2f5
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetBuilder.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.source.offset;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static 
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind.EARLIEST;
+import static 
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind.NON_STOPPING;
+
+/**
+ * Builder for {@link BinlogOffset}.
+ *
+ * <p>A new builder starts with skip-events, skip-rows and timestamp set
+ * to "0" and the offset kind set to {@link BinlogOffsetKind#SPECIFIC};
+ * each setter overwrites the matching entry of the offset map.
+ */
+public class BinlogOffsetBuilder {
+
+    private final Map<String, String> offsetMap = new HashMap<>();
+
+    /** Creates a builder pre-populated with the default entries. */
+    public BinlogOffsetBuilder() {
+        // Initialize default values
+        offsetMap.put(BinlogOffset.EVENTS_TO_SKIP_OFFSET_KEY, "0");
+        offsetMap.put(BinlogOffset.ROWS_TO_SKIP_OFFSET_KEY, "0");
+        offsetMap.put(BinlogOffset.TIMESTAMP_KEY, "0");
+        offsetMap.put(
+                BinlogOffset.OFFSET_KIND_KEY,
+                String.valueOf(BinlogOffsetKind.SPECIFIC));
+    }
+
+    /** Sets the binlog file name and the position inside that file. */
+    public BinlogOffsetBuilder setBinlogFilePosition(
+            String filename, long position) {
+        offsetMap.put(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY, filename);
+        offsetMap.put(
+                BinlogOffset.BINLOG_POSITION_OFFSET_KEY,
+                String.valueOf(position));
+        return this;
+    }
+
+    /** Sets the number of events to skip. */
+    public BinlogOffsetBuilder setSkipEvents(long skipEvents) {
+        offsetMap.put(
+                BinlogOffset.EVENTS_TO_SKIP_OFFSET_KEY,
+                String.valueOf(skipEvents));
+        return this;
+    }
+
+    /** Sets the number of rows to skip. */
+    public BinlogOffsetBuilder setSkipRows(long skipRows) {
+        offsetMap.put(
+                BinlogOffset.ROWS_TO_SKIP_OFFSET_KEY,
+                String.valueOf(skipRows));
+        return this;
+    }
+
+    /** Sets the timestamp of the offset, in seconds. */
+    public BinlogOffsetBuilder setTimestampSec(long timestampSec) {
+        offsetMap.put(
+                BinlogOffset.TIMESTAMP_KEY, String.valueOf(timestampSec));
+        return this;
+    }
+
+    /** Sets the GTID set of the offset. */
+    public BinlogOffsetBuilder setGtidSet(String gtidSet) {
+        offsetMap.put(BinlogOffset.GTID_SET_KEY, gtidSet);
+        return this;
+    }
+
+    /** Sets the server id of the offset. */
+    public BinlogOffsetBuilder setServerId(long serverId) {
+        offsetMap.put(BinlogOffset.SERVER_ID_KEY, String.valueOf(serverId));
+        return this;
+    }
+
+    /**
+     * Sets the offset kind.
+     *
+     * <p>{@code EARLIEST} additionally resets file name / position to
+     * {@code ""} / {@code 0}, and {@code NON_STOPPING} resets them to
+     * {@code ""} / {@code Long.MIN_VALUE}.
+     */
+    public BinlogOffsetBuilder setOffsetKind(BinlogOffsetKind offsetKind) {
+        offsetMap.put(
+                BinlogOffset.OFFSET_KIND_KEY, String.valueOf(offsetKind));
+        if (offsetKind == EARLIEST) {
+            setBinlogFilePosition("", 0);
+        }
+        if (offsetKind == NON_STOPPING) {
+            setBinlogFilePosition("", Long.MIN_VALUE);
+        }
+        return this;
+    }
+
+    /**
+     * Copies every entry of the given map into this builder, replacing
+     * entries (including the defaults) that share the same key.
+     */
+    public BinlogOffsetBuilder setOffsetMap(Map<String, String> offsetMap) {
+        this.offsetMap.putAll(offsetMap);
+        return this;
+    }
+
+    /**
+     * Validates the collected entries and creates the offset.
+     *
+     * @return a new {@link BinlogOffset} backed by a copy of the entries
+     * @throws IllegalArgumentException if the entries do not satisfy the
+     *     requirements of the configured offset kind
+     */
+    public BinlogOffset build() {
+        sanityCheck();
+        // Hand out a copy so that further use of this builder cannot
+        // change an offset that has already been built.
+        return new BinlogOffset(new HashMap<>(offsetMap));
+    }
+
+    /** Checks that the entries required by the offset kind are present. */
+    private void sanityCheck() {
+        checkArgument(
+                offsetMap.containsKey(BinlogOffset.OFFSET_KIND_KEY),
+                "Binlog offset kind is required");
+        BinlogOffsetKind offsetKind =
+                BinlogOffsetKind.valueOf(
+                        offsetMap.get(BinlogOffset.OFFSET_KIND_KEY));
+        switch (offsetKind) {
+            case SPECIFIC:
+                checkArgument(
+                        offsetMap.containsKey(
+                                        BinlogOffset.BINLOG_FILENAME_OFFSET_KEY)
+                                || offsetMap.containsKey(
+                                        BinlogOffset.BINLOG_POSITION_OFFSET_KEY)
+                                || offsetMap.containsKey(
+                                        BinlogOffset.GTID_SET_KEY),
+                        "Either binlog file / position or GTID set is "
+                                + "required if offset kind is SPECIFIC");
+                break;
+            case TIMESTAMP:
+                checkArgument(
+                        offsetMap.containsKey(BinlogOffset.TIMESTAMP_KEY),
+                        "Timestamp is required if offset kind is TIMESTAMP");
+                break;
+            case EARLIEST:
+            case LATEST:
+            case NON_STOPPING:
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unrecognized offset kind \"%s\"",
+                                offsetKind));
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetKind.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetKind.java
new file mode 100644
index 0000000000..9eaeb8749f
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetKind.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.source.offset;
+
+/**
+ * Predefined kinds of binlog offset.
+ *
+ * <p>The binlog offset kind describes some special binlog offsets such
+ * as the earliest accessible binlog offset, the current latest binlog
+ * offset, and a specific offset described by binlog file + position or
+ * by a GTID set.
+ */
+public enum BinlogOffsetKind {
+
+    /** The earliest accessible offset in the binlog. */
+    EARLIEST,
+
+    /** The latest offset in the binlog. */
+    LATEST,
+
+    /**
+     * The binlog offset is described by a timestamp in seconds, for
+     * example "1667232000" means start reading from events that happened
+     * at 2022-11-01 00:00:00 (UTC+8), i.e. 2022-10-31T16:00:00Z.
+     */
+    TIMESTAMP,
+
+    /**
+     * A specific offset described by binlog file name and position, or
+     * by a GTID set if GTID is enabled on the cluster. For example:
+     *
+     * <ul>
+     *   <li>Binlog file "mysql-bin.000002" and position 4
+     *   <li>GTID set "24DA167-0C0C-11E8-8442-00059A3C7B00:1-19"
+     * </ul>
+     */
+    SPECIFIC,
+
+    /**
+     * A special offset indicating that there is no ending offset for the
+     * binlog reader.
+     *
+     * <p>Please note that this is an INTERNAL kind and should not be used
+     * for specifying a starting offset.
+     */
+    NON_STOPPING
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetUtils.java
new file mode 100644
index 0000000000..5fb0d47856
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.source.offset;
+
+import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils;
+import 
org.apache.inlong.sort.cdc.mysql.debezium.task.context.StatefulTaskContext;
+
+import io.debezium.connector.mysql.MySqlConnection;
+
+/** Utils for handling {@link BinlogOffset}. */
+public class BinlogOffsetUtils {
+
+    /**
+     * Initialize the binlog offset according to the kind of binlog
+     * offset, so that the debezium reader could interpret it and seek the
+     * reader to the offset.
+     *
+     * <p>This method is used in the binlog reading phase, when the
+     * {@link StatefulTaskContext} is being initialized to load the actual
+     * effective binlog offset.
+     *
+     * <p>Initialization strategy:
+     *
+     * <ul>
+     *   <li>EARLIEST: binlog filename = "", position = 0
+     *   <li>TIMESTAMP: set to earliest, as the current implementation
+     *       reads from the earliest offset and drops events earlier than
+     *       the specified timestamp.
+     *   <li>LATEST: fetch the current binlog offset through the given
+     *       connection
+     *   <li>any other kind, or no kind at all: the offset is returned
+     *       unchanged
+     * </ul>
+     *
+     * @param offset the configured starting offset
+     * @param connection connection used to query the current binlog
+     *     offset when the kind is LATEST
+     * @return the offset the reader should actually start from
+     */
+    public static BinlogOffset initializeEffectiveOffset(
+            BinlogOffset offset, MySqlConnection connection) {
+        BinlogOffsetKind offsetKind = offset.getOffsetKind();
+        if (offsetKind == null) {
+            // getOffsetKind() is @Nullable and switching on a null enum
+            // throws NullPointerException; an offset without a kind is
+            // passed through unchanged, like the default branch below.
+            return offset;
+        }
+        switch (offsetKind) {
+            case EARLIEST:
+            case TIMESTAMP:
+                return BinlogOffset.ofBinlogFilePosition("", 0);
+            case LATEST:
+                return DebeziumUtils.currentBinlogOffset(connection);
+            default:
+                return offset;
+        }
+    }
+
+    /**
+     * Tells whether the given offset has the kind
+     * {@link BinlogOffsetKind#NON_STOPPING}.
+     */
+    public static boolean isNonStoppingOffset(BinlogOffset binlogOffset) {
+        return BinlogOffsetKind.NON_STOPPING.equals(
+                binlogOffset.getOffsetKind());
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
index 91a3c96841..2337cfc230 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
@@ -152,20 +152,13 @@ public class MySqlSourceReader<T>
         if (suspendedBinlogSplit != null) {
             unfinishedSplits.add(suspendedBinlogSplit);
         }
-        SourceTableMetricData sourceMetricData = 
sourceReaderMetrics.getSourceMetricData();
-        LOG.info("inlong-metric-states snapshot sourceMetricData:{}", 
sourceMetricData);
-        if (sourceMetricData != null) {
-            long countNumBytesIn = sourceMetricData.getNumBytesIn().getCount();
-            long countNumRecordsIn = 
sourceMetricData.getNumRecordsIn().getCount();
-            Map<String, Long> readPhaseMetricMap = 
sourceMetricData.getReadPhaseMetricMap().entrySet().stream().collect(
-                    Collectors.toMap(v -> v.getKey().getPhase(), e -> 
e.getValue().getReadPhase().getCount()));
-            Map<String, MySqlTableMetric> tableMetricMap = 
sourceMetricData.getSubSourceMetricMap().entrySet().stream()
-                    .collect(Collectors.toMap(Entry::getKey,
-                            e -> new 
MySqlTableMetric(e.getValue().getNumRecordsIn().getCount(),
-                                    e.getValue().getNumBytesIn().getCount())));
-            unfinishedSplits
-                    .add(new MySqlMetricSplit(countNumBytesIn, 
countNumRecordsIn, readPhaseMetricMap, tableMetricMap));
-        }
+
+        // add mysql metric split
+        addMySqlMetricSplit(unfinishedSplits);
+
+        // log current binlog offsets
+        logCurrentBinlogOffsets(unfinishedSplits, checkpointId);
+
         return unfinishedSplits;
     }
 
@@ -390,6 +383,36 @@ public class MySqlSourceReader<T>
         }
     }
 
+    /**
+     * Appends a {@link MySqlMetricSplit} built from the current source
+     * metric counters to the given list; nothing is appended when no
+     * source metric data is available.
+     */
+    private void addMySqlMetricSplit(List<MySqlSplit> unfinishedSplits) {
+        SourceTableMetricData metricData =
+                sourceReaderMetrics.getSourceMetricData();
+        LOG.info(
+                "inlong-metric-states snapshot sourceMetricData:{}",
+                metricData);
+        if (metricData == null) {
+            return;
+        }
+
+        long bytesIn = metricData.getNumBytesIn().getCount();
+        long recordsIn = metricData.getNumRecordsIn().getCount();
+
+        // phase name -> number of records read in that phase
+        Map<String, Long> phaseCounts =
+                metricData.getReadPhaseMetricMap().entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        p -> p.getKey().getPhase(),
+                                        p ->
+                                                p.getValue()
+                                                        .getReadPhase()
+                                                        .getCount()));
+
+        // sub-source key -> per-table record / byte counters
+        Map<String, MySqlTableMetric> tableMetrics =
+                metricData.getSubSourceMetricMap().entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Entry::getKey,
+                                        t ->
+                                                new MySqlTableMetric(
+                                                        t.getValue()
+                                                                .getNumRecordsIn()
+                                                                .getCount(),
+                                                        t.getValue()
+                                                                .getNumBytesIn()
+                                                                .getCount())));
+
+        unfinishedSplits.add(
+                new MySqlMetricSplit(
+                        bytesIn, recordsIn, phaseCounts, tableMetrics));
+    }
+
+    /**
+     * Logs, at INFO level, the starting offset of every binlog split in
+     * the given list.
+     *
+     * @param splits the splits captured for the checkpoint
+     * @param checkpointId id of the checkpoint, included in the message
+     */
+    private void logCurrentBinlogOffsets(
+            List<MySqlSplit> splits, long checkpointId) {
+        if (!LOG.isInfoEnabled()) {
+            return;
+        }
+        for (MySqlSplit split : splits) {
+            if (!split.isBinlogSplit()) {
+                // Skip this split but keep scanning: returning here would
+                // hide binlog splits that come later in the list.
+                continue;
+            }
+            BinlogOffset offset = split.asBinlogSplit().getStartingOffset();
+            LOG.info(
+                    "Binlog offset on checkpoint {}: {}",
+                    checkpointId,
+                    offset);
+        }
+    }
+
     @Override
     protected MySqlSplit toSplitType(String splitId, MySqlSplitState 
splitState) {
         return splitState.toMySqlSplit();
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index eaddb356d8..e3f32b6fa3 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -367,7 +367,7 @@ public class RecordUtils {
             List<FinishedSnapshotSplitInfo> finishedSnapshotSplits) {
         BinlogOffset startOffset =
                 finishedSnapshotSplits.isEmpty()
-                        ? BinlogOffset.INITIAL_OFFSET
+                        ? BinlogOffset.ofEarliest()
                         : finishedSnapshotSplits.get(0).getHighWatermark();
         for (FinishedSnapshotSplitInfo finishedSnapshotSplit : 
finishedSnapshotSplits) {
             if 
(finishedSnapshotSplit.getHighWatermark().isBefore(startOffset)) {
@@ -410,7 +410,7 @@ public class RecordUtils {
             offsetStrMap.put(
                     entry.getKey(), entry.getValue() == null ? null : 
entry.getValue().toString());
         }
-        return new BinlogOffset(offsetStrMap);
+        return BinlogOffset.builder().setOffsetMap(offsetStrMap).build();
     }
 
     /**
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/SerializerUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/SerializerUtils.java
index 95bab14cf3..4f7b85f6a2 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/SerializerUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/SerializerUtils.java
@@ -18,10 +18,12 @@
 package org.apache.inlong.sort.cdc.mysql.source.utils;
 
 import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind;
 import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetSerializer;
 
 import io.debezium.DebeziumException;
 import io.debezium.util.HexConverter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 
@@ -53,7 +55,7 @@ public class SerializerUtils {
             throws IOException {
         switch (offsetVersion) {
             case 1:
-                return in.readBoolean() ? new BinlogOffset(in.readUTF(), 
in.readLong()) : null;
+                return in.readBoolean() ? 
BinlogOffset.ofBinlogFilePosition(in.readUTF(), in.readLong()) : null;
             case 2:
             case 3:
             case 4:
@@ -69,7 +71,22 @@ public class SerializerUtils {
             int binlogOffsetBytesLength = in.readInt();
             byte[] binlogOffsetBytes = new byte[binlogOffsetBytesLength];
             in.readFully(binlogOffsetBytes);
-            return 
BinlogOffsetSerializer.INSTANCE.deserialize(binlogOffsetBytes);
+            BinlogOffset offset = 
BinlogOffsetSerializer.INSTANCE.deserialize(binlogOffsetBytes);
+            // Old version of binlog offset without offset kind
+            if (offset.getOffsetKind() == null) {
+                if (StringUtils.isEmpty(offset.getFilename()) && 
offset.getPosition() == Long.MIN_VALUE) {
+                    return BinlogOffset.ofNonStopping();
+                }
+                if (StringUtils.isEmpty(offset.getFilename()) && 
offset.getPosition() == 0L) {
+                    return BinlogOffset.ofEarliest();
+                }
+                // For other cases we treat it as a specific offset
+                return BinlogOffset.builder()
+                        .setOffsetKind(BinlogOffsetKind.SPECIFIC)
+                        .setOffsetMap(offset.getOffset())
+                        .build();
+            }
+            return offset;
         } else {
             return null;
         }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index 0ee5276479..0234e8e283 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -20,6 +20,8 @@ package org.apache.inlong.sort.cdc.mysql.table;
 import org.apache.inlong.sort.cdc.base.debezium.table.DebeziumOptions;
 import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions;
 import org.apache.inlong.sort.cdc.mysql.source.config.ServerIdRange;
+import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetBuilder;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
@@ -28,11 +30,11 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.util.Preconditions;
 
 import java.time.Duration;
 import java.time.ZoneId;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -63,7 +65,10 @@ import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_MODE;
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
+import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET;
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS;
+import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS;
+import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS;
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SERVER_ID;
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE;
@@ -96,17 +101,14 @@ public class MySqlTableInlongSourceFactory implements 
DynamicTableSourceFactory
                 return StartupOptions.latest();
 
             case SCAN_STARTUP_MODE_VALUE_EARLIEST:
+                return StartupOptions.earliest();
+
             case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET:
+                validateSpecificOffset(config);
+                return getSpecificOffset(config);
+
             case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
-                throw new ValidationException(
-                        String.format(
-                                "Unsupported option value '%s', the options 
[%s, %s, %s] "
-                                        + "are not supported correctly, "
-                                        + "please do not use them until 
they're correctly supported",
-                                modeString,
-                                SCAN_STARTUP_MODE_VALUE_EARLIEST,
-                                SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,
-                                SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+                return 
StartupOptions.timestamp(config.get(SCAN_STARTUP_TIMESTAMP_MILLIS));
 
             default:
                 throw new ValidationException(
@@ -163,7 +165,6 @@ public class MySqlTableInlongSourceFactory implements 
DynamicTableSourceFactory
         final boolean ghostDdlChange = config.get(GH_OST_DDL_CHANGE);
         final String ghostTableRegex = config.get(GH_OST_TABLE_REGEX);
         if (enableParallelRead) {
-            validateStartupOptionIfEnableParallel(startupOptions);
             validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 
splitSize, 1);
             validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 
1);
             validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
@@ -233,6 +234,9 @@ public class MySqlTableInlongSourceFactory implements 
DynamicTableSourceFactory
         options.add(SCAN_STARTUP_MODE);
         options.add(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
         options.add(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+        options.add(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
+        options.add(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS);
+        options.add(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS);
         options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
@@ -267,17 +271,6 @@ public class MySqlTableInlongSourceFactory implements 
DynamicTableSourceFactory
         }
     }
 
-    private void validateStartupOptionIfEnableParallel(StartupOptions 
startupOptions) {
-        // validate mode
-        Preconditions.checkState(
-                startupOptions.startupMode == StartupMode.INITIAL
-                        || startupOptions.startupMode == 
StartupMode.LATEST_OFFSET,
-                String.format(
-                        "MySql Parallel Source only supports startup mode 
'initial' and 'latest-offset',"
-                                + " but actual is %s",
-                        startupOptions.startupMode));
-    }
-
     private String validateAndGetServerId(ReadableConfig configuration) {
         final String serverIdValue = 
configuration.get(MySqlSourceOptions.SERVER_ID);
         if (serverIdValue != null) {
@@ -351,4 +344,48 @@ public class MySqlTableInlongSourceFactory implements 
DynamicTableSourceFactory
                         1.0d,
                         distributionFactorLower));
     }
+
+    /**
+     * Verifies that the configuration carries enough information to build a specific binlog offset.
+     *
+     * <p>The check passes when the GTID set option is present, or when both the binlog file and
+     * the binlog position options are present.
+     *
+     * @param config configuration the specific-offset options are read from
+     * @throws ValidationException if neither alternative is satisfied
+     */
+    private static void validateSpecificOffset(ReadableConfig config) {
+        boolean hasGtidSet =
+                config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET).isPresent();
+        boolean hasBinlogFile =
+                config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE).isPresent();
+        boolean hasBinlogPosition =
+                config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS).isPresent();
+        if (hasGtidSet || (hasBinlogFile && hasBinlogPosition)) {
+            return;
+        }
+        throw new ValidationException(
+                String.format(
+                        "Unable to find a valid binlog offset. Either %s, or %s and %s are required.",
+                        MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET.key(),
+                        MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE.key(),
+                        MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS.key()));
+    }
+
+    /**
+     * Builds startup options for the specific-offset mode from the specific-offset options in
+     * {@code config}.
+     *
+     * <p>The GTID set, skip-events and skip-rows values are applied only when configured. The
+     * binlog file and position are applied when both are configured; otherwise an empty filename
+     * and position 0 are set on the builder.
+     *
+     * @param config configuration the specific-offset options are read from
+     * @return startup options wrapping the assembled binlog offset
+     */
+    private static StartupOptions getSpecificOffset(ReadableConfig config) {
+        BinlogOffsetBuilder builder = BinlogOffset.builder();
+
+        // GTID set
+        Optional<String> gtidSet =
+                config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
+        if (gtidSet.isPresent()) {
+            builder.setGtidSet(gtidSet.get());
+        }
+
+        // Binlog file + pos: both must be configured, otherwise fall back to the empty position.
+        Optional<String> file =
+                config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+        Optional<Long> pos =
+                config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+        if (!(file.isPresent() && pos.isPresent())) {
+            builder.setBinlogFilePosition("", 0);
+        } else {
+            builder.setBinlogFilePosition(file.get(), pos.get());
+        }
+
+        // Optional skip counters
+        config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
+                .ifPresent(events -> builder.setSkipEvents(events));
+        config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
+                .ifPresent(rows -> builder.setSkipRows(rows));
+
+        return StartupOptions.specificOffset(builder.build());
+    }
+
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupOptions.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupOptions.java
index 172c6ae0ac..d97cbe1870 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupOptions.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupOptions.java
@@ -17,6 +17,10 @@
 
 package org.apache.inlong.sort.cdc.mysql.table;
 
+import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -28,16 +32,15 @@ public final class StartupOptions implements Serializable {
     private static final long serialVersionUID = 1L;
 
     public final StartupMode startupMode;
-    public final String specificOffsetFile;
-    public final Integer specificOffsetPos;
-    public final Long startupTimestampMillis;
+    @Nullable
+    public final BinlogOffset binlogOffset;
 
     /**
      * Performs an initial snapshot on the monitored database tables upon 
first startup, and
      * continue to read the latest binlog.
      */
     public static StartupOptions initial() {
-        return new StartupOptions(StartupMode.INITIAL, null, null, null);
+        return new StartupOptions(StartupMode.INITIAL, null);
     }
 
     /**
@@ -46,7 +49,7 @@ public final class StartupOptions implements Serializable {
      * binlog is guaranteed to contain the entire history of the database.
      */
     public static StartupOptions earliest() {
-        return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null, 
null);
+        return new StartupOptions(StartupMode.EARLIEST_OFFSET, 
BinlogOffset.ofEarliest());
     }
 
     /**
@@ -54,16 +57,25 @@ public final class StartupOptions implements Serializable {
      * the end of the binlog which means only have the changes since the 
connector was started.
      */
     public static StartupOptions latest() {
-        return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null);
+        return new StartupOptions(StartupMode.LATEST_OFFSET, 
BinlogOffset.ofLatest());
     }
 
     /**
      * Never to perform snapshot on the monitored database tables upon first 
startup, and directly
      * read binlog from the specified offset.
      */
-    public static StartupOptions specificOffset(String specificOffsetFile, int 
specificOffsetPos) {
+    public static StartupOptions specificOffset(String specificOffsetFile, 
long specificOffsetPos) {
         return new StartupOptions(
-                StartupMode.SPECIFIC_OFFSETS, specificOffsetFile, 
specificOffsetPos, null);
+                StartupMode.SPECIFIC_OFFSETS,
+                BinlogOffset.ofBinlogFilePosition(specificOffsetFile, 
specificOffsetPos));
+    }
+
+    /**
+     * Creates startup options in {@code SPECIFIC_OFFSETS} mode whose binlog offset is obtained
+     * from {@code BinlogOffset.ofGtidSet(gtidSet)}.
+     *
+     * @param gtidSet GTID set handed to {@code BinlogOffset.ofGtidSet}
+     */
+    public static StartupOptions specificOffset(String gtidSet) {
+        BinlogOffset gtidOffset = BinlogOffset.ofGtidSet(gtidSet);
+        return new StartupOptions(StartupMode.SPECIFIC_OFFSETS, gtidOffset);
+    }
+
+    /**
+     * Creates startup options in {@code SPECIFIC_OFFSETS} mode that carry the supplied binlog
+     * offset unchanged.
+     *
+     * @param binlogOffset offset stored in the returned options
+     */
+    public static StartupOptions specificOffset(BinlogOffset binlogOffset) {
+        final StartupMode mode = StartupMode.SPECIFIC_OFFSETS;
+        return new StartupOptions(mode, binlogOffset);
     }
 
     /**
@@ -76,33 +88,15 @@ public final class StartupOptions implements Serializable {
      * @param startupTimestampMillis timestamp for the startup offsets, as 
milliseconds from epoch.
      */
     public static StartupOptions timestamp(long startupTimestampMillis) {
-        return new StartupOptions(StartupMode.TIMESTAMP, null, null, 
startupTimestampMillis);
+        return new StartupOptions(StartupMode.TIMESTAMP, 
BinlogOffset.ofTimestampSec(startupTimestampMillis / 1000));
     }
 
-    private StartupOptions(
-            StartupMode startupMode,
-            String specificOffsetFile,
-            Integer specificOffsetPos,
-            Long startupTimestampMillis) {
+    private StartupOptions(StartupMode startupMode, BinlogOffset binlogOffset) 
{
         this.startupMode = startupMode;
-        this.specificOffsetFile = specificOffsetFile;
-        this.specificOffsetPos = specificOffsetPos;
-        this.startupTimestampMillis = startupTimestampMillis;
-
-        switch (startupMode) {
-            case INITIAL:
-            case EARLIEST_OFFSET:
-            case LATEST_OFFSET:
-                break;
-            case SPECIFIC_OFFSETS:
-                checkNotNull(specificOffsetFile, "specificOffsetFile shouldn't 
be null");
-                checkNotNull(specificOffsetPos, "specificOffsetPos shouldn't 
be null");
-                break;
-            case TIMESTAMP:
-                checkNotNull(startupTimestampMillis, "startupTimestampMillis 
shouldn't be null");
-                break;
-            default:
-                throw new UnsupportedOperationException(startupMode + " mode 
is not supported.");
+        this.binlogOffset = binlogOffset;
+        if (startupMode != StartupMode.INITIAL) {
+            checkNotNull(
+                    binlogOffset, "Binlog offset is required if startup mode 
is %s", startupMode);
         }
     }
 
@@ -115,15 +109,11 @@ public final class StartupOptions implements Serializable 
{
             return false;
         }
         StartupOptions that = (StartupOptions) o;
-        return startupMode == that.startupMode
-                && Objects.equals(specificOffsetFile, that.specificOffsetFile)
-                && Objects.equals(specificOffsetPos, that.specificOffsetPos)
-                && Objects.equals(startupTimestampMillis, 
that.startupTimestampMillis);
+        return startupMode == that.startupMode && Objects.equals(binlogOffset, 
that.binlogOffset);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(
-                startupMode, specificOffsetFile, specificOffsetPos, 
startupTimestampMillis);
+        return Objects.hash(startupMode, binlogOffset);
     }
 }
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index 5310204943..55d8862b8c 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -344,96 +344,100 @@
       
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/src/main/java/org/apache/inlong/sort/cdc/base/util/DatabaseHistoryUtil.java
       
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
       
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/src/main/java/org/apache/inlong/sort/cdc/base/util/TemporalConversions.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSchemalessSnapshotSplit.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReaderContext.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlTypeUtils.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/StatementUtils.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlSchema.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlDeserializationConverterFactory.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/JdbcConnectionFactory.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/FinishedSnapshotSplitsReportEvent.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/HybridPendingSplitsState.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/ConnectionPoolId.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/JsonDebeziumDeserializationSchema.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/ConnectionPools.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlRecords.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/enumerator/MySqlSourceEnumerator.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlTableDefinition.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/BinlogSplitMetaRequestEvent.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/Validator.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/DebeziumUtils.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplit.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplitState.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ChunkUtils.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/LatestFinishedSplitsSizeRequestEvent.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/EventDispatcherImpl.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetSerializer.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkRange.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsStateSerializer.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/BinlogPendingSplitsState.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSplitReader.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlErrorHandler.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlBinlogSplitAssigner.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/SerializerUtils.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/AssignerStatus.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/SnapshotPendingSplitsState.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/StringDebeziumDeserializationSchema.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/FinishedSnapshotSplitsAckEvent.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffset.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupMode.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/OldFieldMetadataConverter.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ObjectUtils.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/BinlogSplitMetaEvent.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsState.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitState.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/SuspendBinlogReaderEvent.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/FinishedSnapshotSplitsRequestEvent.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupOptions.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/JdbcUrlUtils.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/ServerIdRange.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/EmbeddedFlinkDatabaseHistory.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplitState.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlValidator.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/WakeupReaderEvent.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/DebeziumReader.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/SignalEventDispatcher.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSplitAssigner.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/SeekBinlogToTimestampFilter.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/LatestFinishedSplitsSizeEvent.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/PooledDataSourceFactory.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/JdbcConnectionPools.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/SuspendBinlogReaderAckEvent.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlTaskContextImpl.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlFieldDefinition.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/FinishedSnapshotSplitInfo.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeFetcher.java
-      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSchemalessSnapshotSplit.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReaderContext.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlTypeUtils.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/StatementUtils.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlSchema.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlDeserializationConverterFactory.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/JdbcConnectionFactory.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/FinishedSnapshotSplitsReportEvent.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/HybridPendingSplitsState.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/ConnectionPoolId.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/JsonDebeziumDeserializationSchema.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/ConnectionPools.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlRecords.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/enumerator/MySqlSourceEnumerator.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlTableDefinition.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/BinlogSplitMetaRequestEvent.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/Validator.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/DebeziumUtils.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplit.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplitState.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ChunkUtils.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/LatestFinishedSplitsSizeRequestEvent.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/EventDispatcherImpl.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetSerializer.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkRange.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsStateSerializer.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/BinlogPendingSplitsState.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSplitReader.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/exception/SchemaOutOfSyncException.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlErrorHandler.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlBinlogSplitAssigner.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/SerializerUtils.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/AssignerStatus.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/SnapshotPendingSplitsState.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/StringDebeziumDeserializationSchema.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/FinishedSnapshotSplitsAckEvent.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffset.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetBuilder.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetKind.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetUtils.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupMode.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/OldFieldMetadataConverter.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ObjectUtils.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/BinlogSplitMetaEvent.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsState.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitState.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/SuspendBinlogReaderEvent.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/FinishedSnapshotSplitsRequestEvent.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupOptions.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/JdbcUrlUtils.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/ServerIdRange.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/EmbeddedFlinkDatabaseHistory.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplitState.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlValidator.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/WakeupReaderEvent.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/DebeziumReader.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/SignalEventDispatcher.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSplitAssigner.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/SeekBinlogToTimestampFilter.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/LatestFinishedSplitsSizeEvent.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/PooledDataSourceFactory.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/JdbcConnectionPools.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/SuspendBinlogReaderAckEvent.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlTaskContextImpl.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlFieldDefinition.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/FinishedSnapshotSplitInfo.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeFetcher.java
+      
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
       
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java
       
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
       
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/ColumnImpl.java

Reply via email to