This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9c56a7ac05 [INLONG-8175][Sort] MySQL CDC support read data from
specific timestamp / earliest offset / specific offset (#8240)
9c56a7ac05 is described below
commit 9c56a7ac05de442cfc0031077a66741f708ee851
Author: emhui <[email protected]>
AuthorDate: Fri Jun 16 16:05:01 2023 +0800
[INLONG-8175][Sort] MySQL CDC support read data from specific timestamp /
earliest offset / specific offset (#8240)
---
.../inlong/sort/parser/MySqlLoadSqlParseTest.java | 136 +++++++++++++++
.../source/assigner/SnapshotSplitAssigner.java | 3 +-
.../apache/inlong/sort/cdc/mysql/MySqlSource.java | 7 +-
.../sort/cdc/mysql/debezium/DebeziumUtils.java | 6 +-
.../mysql/debezium/reader/BinlogSplitReader.java | 23 ++-
.../mysql/debezium/reader/SnapshotSplitReader.java | 3 +-
.../debezium/task/MySqlBinlogSplitReadTask.java | 13 +-
.../debezium/task/context/MySqlErrorHandler.java | 73 ++++++--
.../debezium/task/context/StatefulTaskContext.java | 10 +-
.../exception/SchemaOutOfSyncException.java | 29 ++++
.../source/assigners/MySqlBinlogSplitAssigner.java | 23 +--
.../source/assigners/MySqlHybridSplitAssigner.java | 4 +-
.../source/config/MySqlSourceConfigFactory.java | 8 -
.../mysql/source/config/MySqlSourceOptions.java | 24 ++-
.../sort/cdc/mysql/source/offset/BinlogOffset.java | 97 +++++++----
.../mysql/source/offset/BinlogOffsetBuilder.java | 118 +++++++++++++
.../cdc/mysql/source/offset/BinlogOffsetKind.java | 59 +++++++
.../cdc/mysql/source/offset/BinlogOffsetUtils.java | 64 +++++++
.../cdc/mysql/source/reader/MySqlSourceReader.java | 51 ++++--
.../sort/cdc/mysql/source/utils/RecordUtils.java | 4 +-
.../cdc/mysql/source/utils/SerializerUtils.java | 21 ++-
.../mysql/table/MySqlTableInlongSourceFactory.java | 81 ++++++---
.../sort/cdc/mysql/table/StartupOptions.java | 66 ++++----
licenses/inlong-sort-connectors/LICENSE | 184 +++++++++++----------
24 files changed, 846 insertions(+), 261 deletions(-)
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
index 473331e986..1c324b5532 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
@@ -63,6 +63,58 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase {
null, null);
}
+ /**
+ * build mysql extract node with specific timestamp
+ */
+ private MySqlExtractNode buildMySQLExtractNodeWithSpecificTimestamp() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()));
+ Map<String, String> map = new HashMap<>();
+ map.put("scan.startup.mode", "timestamp");
+ map.put("scan.startup.timestamp-millis", "1667232000000");
+ return new MySqlExtractNode("1", "mysql_input", fields,
+ null, map, "id",
+ Collections.singletonList("table_input"), "localhost",
"inlong", "inlong",
+ "inlong", null, null,
+ null, null);
+ }
+
+ /**
+ * build mysql extract node with specific binlog
+ */
+ private MySqlExtractNode buildMySQLExtractNodeWithSpecificBinlog() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()));
+ Map<String, String> map = new HashMap<>();
+ map.put("scan.startup.mode", "specific-offset");
+ map.put("scan.startup.specific-offset.file", "mysql-bin.000003");
+ map.put("scan.startup.specific-offset.pos", "4");
+ return new MySqlExtractNode("1", "mysql_input", fields,
+ null, map, "id",
+ Collections.singletonList("table_input"), "localhost",
"inlong", "inlong",
+ "inlong", null, null,
+ null, null);
+ }
+
+ /**
+ * build mysql extract node with specific gtid
+ */
+ private MySqlExtractNode buildMySQLExtractNodeWithSpecificGtid() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()));
+ Map<String, String> map = new HashMap<>();
+ map.put("scan.startup.mode", "specific-offset");
+ map.put("scan.startup.specific-offset.gtid-set",
"24DA167-0C0C-11E8-8442-00059A3C7B00:1-19");
+ return new MySqlExtractNode("1", "mysql_input", fields,
+ null, map, "id",
+ Collections.singletonList("table_input"), "localhost",
"inlong", "inlong",
+ "inlong", null, null,
+ null, null);
+ }
+
private Node buildMysqlLoadNode() {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
new FieldInfo("name", new StringFormatInfo()),
@@ -127,4 +179,88 @@ public class MySqlLoadSqlParseTest extends
AbstractTestBase {
Assert.assertTrue(result.tryExecute());
}
+ /**
+ * Test flink sql task for extract is mysql {@link MySqlExtractNode} and
load is mysql {@link MySqlLoadNode}
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testMySqlLoadStartupFromSpecificTimestampSqlParse() throws
Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ Node inputNode = buildMySQLExtractNodeWithSpecificTimestamp();
+ Node outputNode = buildMysqlLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode,
outputNode),
+
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1",
Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv,
groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+
+ /**
+ * Test flink sql task for extract is mysql {@link MySqlExtractNode} and
load is mysql {@link MySqlLoadNode}
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testMySqlLoadStartupFromSpecificBinlogSqlParse() throws
Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ Node inputNode = buildMySQLExtractNodeWithSpecificBinlog();
+ Node outputNode = buildMysqlLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode,
outputNode),
+
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1",
Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv,
groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+
+ /**
+ * Test flink sql task for extract is mysql {@link MySqlExtractNode} and
load is mysql {@link MySqlLoadNode}
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testMySqlLoadStartupFromSpecificGtidSqlParse() throws
Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ Node inputNode = buildMySQLExtractNodeWithSpecificGtid();
+ Node outputNode = buildMysqlLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode,
outputNode),
+
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1",
Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv,
groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+
}
diff --git
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/SnapshotSplitAssigner.java
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/SnapshotSplitAssigner.java
index a151b5c42e..0344a35824 100644
---
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/SnapshotSplitAssigner.java
+++
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/SnapshotSplitAssigner.java
@@ -52,8 +52,7 @@ import java.util.stream.Collectors;
* */
public class SnapshotSplitAssigner<C extends SourceConfig> implements
SplitAssigner {
- private static final Logger LOG = LoggerFactory.getLogger(
-
com.ververica.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(SnapshotSplitAssigner.class);
private final List<TableId> alreadyProcessedTables;
private final List<SchemalessSnapshotSplit> remainingSplits;
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
index 4f4de822ef..3576246b9a 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
@@ -248,8 +248,8 @@ public class MySqlSource {
specificOffset.setSourcePartition(sourcePartition);
Map<String, Object> sourceOffset = new HashMap<>();
- sourceOffset.put("file",
startupOptions.specificOffsetFile);
- sourceOffset.put("pos", startupOptions.specificOffsetPos);
+ sourceOffset.put("file",
startupOptions.binlogOffset.getFilename());
+ sourceOffset.put("pos",
startupOptions.binlogOffset.getPosition());
specificOffset.setSourceOffset(sourceOffset);
break;
@@ -258,7 +258,8 @@ public class MySqlSource {
props.setProperty("snapshot.mode", "never");
deserializer =
new SeekBinlogToTimestampFilter<>(
- startupOptions.startupTimestampMillis,
deserializer);
+
startupOptions.binlogOffset.getTimestampSec() * 1000,
+ deserializer);
break;
default:
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/DebeziumUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/DebeziumUtils.java
index d487120036..547018f07e 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/DebeziumUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/DebeziumUtils.java
@@ -129,8 +129,10 @@ public class DebeziumUtils {
final long binlogPosition = rs.getLong(2);
final String gtidSet =
rs.getMetaData().getColumnCount() > 4 ?
rs.getString(5) : null;
- return new BinlogOffset(
- binlogFilename, binlogPosition, 0L, 0, 0,
gtidSet, null);
+ return BinlogOffset.builder()
+ .setBinlogFilePosition(binlogFilename,
binlogPosition)
+ .setGtidSet(gtidSet)
+ .build();
} else {
throw new FlinkRuntimeException(
"Cannot read the binlog filename and
position via '"
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
index eee97a0fcb..b3714f4f72 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
@@ -20,12 +20,15 @@ package org.apache.inlong.sort.cdc.mysql.debezium.reader;
import org.apache.inlong.sort.cdc.mysql.debezium.task.MySqlBinlogSplitReadTask;
import
org.apache.inlong.sort.cdc.mysql.debezium.task.context.StatefulTaskContext;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind;
import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
import org.apache.inlong.sort.cdc.mysql.source.utils.ChunkUtils;
import org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventType;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
@@ -50,6 +53,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.function.Predicate;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getSplitInfoByBinarySearch;
@@ -107,7 +111,8 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecord, MySqlSpli
(MySqlStreamingChangeEventSourceMetrics)
statefulTaskContext
.getStreamingChangeEventSourceMetrics(),
statefulTaskContext.getTopicSelector().getPrimaryTopic(),
- currentBinlogSplit);
+ currentBinlogSplit,
+
createEventFilter(currentBinlogSplit.getStartingOffset()));
executor.submit(
() -> {
@@ -247,7 +252,7 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecord, MySqlSpli
currentBinlogSplit.getFinishedSnapshotSplitInfos();
Map<TableId, List<FinishedSnapshotSplitInfo>> splitsInfoMap = new
HashMap<>();
Map<TableId, BinlogOffset> tableIdBinlogPositionMap = new HashMap<>();
- // latest-offset mode
+ // specific offset mode
if (finishedSplitInfos.isEmpty()) {
for (TableId tableId :
currentBinlogSplit.getTableSchemas().keySet()) {
tableIdBinlogPositionMap.put(tableId,
currentBinlogSplit.getStartingOffset());
@@ -271,6 +276,20 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecord, MySqlSpli
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
}
+ private Predicate<Event> createEventFilter(BinlogOffset startingOffset) {
+ // If the startup mode is set as TIMESTAMP, we need to apply a filter
on event to drop
+ // events earlier than the specified timestamp.
+ if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind()))
{
+ long startTimestampSec = startingOffset.getTimestampSec();
+ // Notes:
+ // 1. Heartbeat event doesn't contain timestamp, so we just keep it
+ // 2. Timestamp of event is in epoch millisecond
+ return event ->
EventType.HEARTBEAT.equals(event.getHeader().getEventType())
+ || event.getHeader().getTimestamp() >= startTimestampSec *
1000;
+ }
+ return event -> true;
+ }
+
public void stopBinlogReadTask() {
this.currentTaskRunning = false;
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
index 9110bb53d6..c1200deac1 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
@@ -190,7 +190,8 @@ public class SnapshotSplitReader implements
DebeziumReader<SourceRecord, MySqlSp
statefulTaskContext.getTaskContext(),
(MySqlStreamingChangeEventSourceMetrics)
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
statefulTaskContext.getTopicSelector().getPrimaryTopic(),
- backfillBinlogSplit);
+ backfillBinlogSplit,
+ event -> true);
}
private void dispatchBinlogEndEvent(MySqlBinlogSplit backFillBinlogSplit)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
index 1068cb0de9..63bbd402de 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
@@ -40,8 +40,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.function.Predicate;
-import static
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset.NO_STOPPING_OFFSET;
+import static
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetUtils.isNonStoppingOffset;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
/**
@@ -56,6 +57,7 @@ public class MySqlBinlogSplitReadTask extends
MySqlStreamingChangeEventSource {
private final EventDispatcherImpl<TableId> eventDispatcher;
private final SignalEventDispatcher signalEventDispatcher;
private final ErrorHandler errorHandler;
+ private final Predicate<Event> eventFilter;
private ChangeEventSourceContext context;
private final MySqlTaskContext taskContext;
@@ -72,7 +74,8 @@ public class MySqlBinlogSplitReadTask extends
MySqlStreamingChangeEventSource {
MySqlTaskContext taskContext,
MySqlStreamingChangeEventSourceMetrics metrics,
String topic,
- MySqlBinlogSplit binlogSplit) {
+ MySqlBinlogSplit binlogSplit,
+ Predicate<Event> eventFilter) {
super(
connectorConfig,
offsetContext,
@@ -91,6 +94,7 @@ public class MySqlBinlogSplitReadTask extends
MySqlStreamingChangeEventSource {
this.signalEventDispatcher =
new SignalEventDispatcher(
offsetContext.getPartition(), topic,
eventDispatcher.getQueue());
+ this.eventFilter = eventFilter;
}
/**
@@ -124,6 +128,9 @@ public class MySqlBinlogSplitReadTask extends
MySqlStreamingChangeEventSource {
@Override
protected void handleEvent(Event event) {
+ if (!eventFilter.test(event)) {
+ return;
+ }
super.handleEvent(event);
// check do we need to stop for read binlog for snapshot split.
if (isBoundedRead()) {
@@ -148,6 +155,6 @@ public class MySqlBinlogSplitReadTask extends
MySqlStreamingChangeEventSource {
}
private boolean isBoundedRead() {
- return !NO_STOPPING_OFFSET.equals(binlogSplit.getEndingOffset());
+ return !isNonStoppingOffset(binlogSplit.getEndingOffset());
}
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlErrorHandler.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlErrorHandler.java
index 090dfe3176..d2e538a476 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlErrorHandler.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlErrorHandler.java
@@ -17,12 +17,18 @@
package org.apache.inlong.sort.cdc.mysql.debezium.task.context;
+import
org.apache.inlong.sort.cdc.mysql.debezium.task.context.exception.SchemaOutOfSyncException;
+import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
+import org.apache.inlong.sort.cdc.mysql.table.StartupMode;
+
import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.relational.TableId;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,11 +47,16 @@ public class MySqlErrorHandler extends ErrorHandler {
"Encountered change event for table (.+)\\.(.+) whose
schema isn't known to this connector");
MySqlTaskContext context;
+ private final MySqlSourceConfig sourceConfig;
public MySqlErrorHandler(
- String logicalName, ChangeEventQueue<?> queue, MySqlTaskContext
context) {
+ String logicalName,
+ ChangeEventQueue<?> queue,
+ MySqlTaskContext context,
+ MySqlSourceConfig sourceConfig) {
super(MySqlConnector.class, logicalName, queue);
this.context = context;
+ this.sourceConfig = sourceConfig;
}
@Override
@@ -55,20 +66,56 @@ public class MySqlErrorHandler extends ErrorHandler {
@Override
public void setProducerThrowable(Throwable producerThrowable) {
- if (producerThrowable.getCause() instanceof DebeziumException) {
- DebeziumException e = (DebeziumException)
producerThrowable.getCause();
- String detailMessage = e.getMessage();
- Matcher matcher =
NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
- if (matcher.find()) {
- String databaseName = matcher.group(1);
- String tableName = matcher.group(2);
- TableId tableId = new TableId(databaseName, null, tableName);
- if (context.getSchema().schemaFor(tableId) == null) {
- LOG.warn("Schema for table " + tableId + " is null");
- return;
- }
+ if (isTableNotFoundException(producerThrowable)) {
+ Matcher matcher =
+
NOT_FOUND_TABLE_MSG_PATTERN.matcher(producerThrowable.getCause().getMessage());
+ String databaseName = matcher.group(1);
+ String tableName = matcher.group(2);
+ TableId tableId = new TableId(databaseName, null, tableName);
+ if (context.getSchema().schemaFor(tableId) == null) {
+ LOG.warn("Schema for table " + tableId + " is null");
+ return;
}
}
+
+ if (isSchemaOutOfSyncException(producerThrowable)) {
+ super.setProducerThrowable(
+ new SchemaOutOfSyncException(
+ "Internal schema representation is probably out of
sync with real database schema. "
+ + "The reason could be that the table
schema was changed after the starting "
+ + "binlog offset, which is not supported
when startup mode is set to "
+ +
sourceConfig.getStartupOptions().startupMode,
+ producerThrowable));
+ return;
+ }
+
super.setProducerThrowable(producerThrowable);
}
+
+ private boolean isTableNotFoundException(Throwable t) {
+ if (!(t.getCause() instanceof DebeziumException)) {
+ return false;
+ }
+ DebeziumException e = (DebeziumException) t.getCause();
+ String detailMessage = e.getMessage();
+ Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
+ return matcher.find();
+ }
+
+ private boolean isSchemaOutOfSyncException(Throwable t) {
+ Throwable rootCause = ExceptionUtils.getRootCause(t);
+ return rootCause instanceof ConnectException
+ && rootCause
+ .getMessage()
+ .endsWith(
+ "internal schema representation is probably
out of sync with real database schema")
+ && isSettingStartingOffset();
+ }
+
+ private boolean isSettingStartingOffset() {
+ StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
+ return startupMode == StartupMode.EARLIEST_OFFSET
+ || startupMode == StartupMode.TIMESTAMP
+ || startupMode == StartupMode.SPECIFIC_OFFSETS;
+ }
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
index 2464756d9c..7082845a56 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
@@ -57,6 +57,7 @@ import java.util.List;
import java.util.Map;
import static
org.apache.inlong.sort.cdc.mysql.debezium.task.context.StatefulTaskContext.MySqlEventMetadataProvider.BINLOG_FILENAME_OFFSET_KEY;
+import static
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset;
/**
* A stateful task context that contains entries the debezium mysql connector
task required.
@@ -160,7 +161,7 @@ public class StatefulTaskContext {
changeEventSourceMetricsFactory.getStreamingMetrics(
taskContext, queue, metadataProvider);
this.errorHandler =
- new MySqlErrorHandler(connectorConfig.getLogicalName(), queue,
taskContext);
+ new MySqlErrorHandler(connectorConfig.getLogicalName(), queue,
taskContext, sourceConfig);
}
private void validateAndLoadDatabaseHistory(
@@ -176,8 +177,11 @@ public class StatefulTaskContext {
OffsetContext.Loader loader, MySqlSplit mySqlSplit) {
BinlogOffset offset =
mySqlSplit.isSnapshotSplit()
- ? BinlogOffset.INITIAL_OFFSET
- : mySqlSplit.asBinlogSplit().getStartingOffset();
+ ? BinlogOffset.ofEarliest()
+ : initializeEffectiveOffset(
+
mySqlSplit.asBinlogSplit().getStartingOffset(), connection);
+
+ LOG.info("Starting offset is initialized to {}", offset);
MySqlOffsetContext mySqlOffsetContext =
(MySqlOffsetContext) loader.load(offset.getOffset());
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/exception/SchemaOutOfSyncException.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/exception/SchemaOutOfSyncException.java
new file mode 100644
index 0000000000..edfb7e1082
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/exception/SchemaOutOfSyncException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.debezium.task.context.exception;
+
+/**
+ * A wrapper class for clearly show the possible reason of a
schema-out-of-sync exception thrown
+ * inside Debezium.
+ */
+public class SchemaOutOfSyncException extends Exception {
+
+ public SchemaOutOfSyncException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlBinlogSplitAssigner.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlBinlogSplitAssigner.java
index 0123e3f2a8..68b375d301 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlBinlogSplitAssigner.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlBinlogSplitAssigner.java
@@ -29,13 +29,10 @@ import
org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
import io.debezium.connector.mysql.MySqlConnection;
-import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import org.apache.flink.util.FlinkRuntimeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -47,14 +44,12 @@ import java.util.Map;
import java.util.Optional;
import static
com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables;
-import static
org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.currentBinlogOffset;
/**
* A {@link MySqlSplitAssigner} which only read binlog from current binlog
position.
*/
public class MySqlBinlogSplitAssigner implements MySqlSplitAssigner {
- private static final Logger LOG =
LoggerFactory.getLogger(MySqlBinlogSplitAssigner.class);
private static final String BINLOG_SPLIT_ID = "binlog-split";
private final MySqlSourceConfig sourceConfig;
@@ -149,17 +144,13 @@ public class MySqlBinlogSplitAssigner implements
MySqlSplitAssigner {
private MySqlBinlogSplit createBinlogSplit() {
Map<TableId, TableChange> tableSchemas =
discoverCapturedTableSchemas();
- try (JdbcConnection jdbc =
DebeziumUtils.openJdbcConnection(sourceConfig)) {
- return new MySqlBinlogSplit(
- BINLOG_SPLIT_ID,
- currentBinlogOffset(jdbc),
- BinlogOffset.NO_STOPPING_OFFSET,
- new ArrayList<>(),
- tableSchemas,
- 0);
- } catch (Exception e) {
- throw new FlinkRuntimeException("Read the binlog offset error", e);
- }
+ return new MySqlBinlogSplit(
+ BINLOG_SPLIT_ID,
+ sourceConfig.getStartupOptions().binlogOffset,
+ BinlogOffset.ofNonStopping(),
+ new ArrayList<>(),
+ tableSchemas,
+ 0);
}
private Map<TableId, TableChange> discoverCapturedTableSchemas() {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
index 5b6fbaff5e..5576786e60 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
@@ -218,8 +218,8 @@ public class MySqlHybridSplitAssigner implements
MySqlSplitAssigner {
boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() >
splitMetaGroupSize;
return new MySqlBinlogSplit(
BINLOG_SPLIT_ID,
- minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET :
minBinlogOffset,
- BinlogOffset.NO_STOPPING_OFFSET,
+ minBinlogOffset == null ? BinlogOffset.ofEarliest() :
minBinlogOffset,
+ BinlogOffset.ofNonStopping(),
divideMetaToGroups ? new ArrayList<>() :
finishedSnapshotSplitInfos,
new HashMap<>(),
finishedSnapshotSplitInfos.size());
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
index 17e751b409..db9475ebcb 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
@@ -282,14 +282,6 @@ public class MySqlSourceConfigFactory implements
Serializable {
* Specifies the startup options.
*/
public MySqlSourceConfigFactory startupOptions(StartupOptions
startupOptions) {
- switch (startupOptions.startupMode) {
- case INITIAL:
- case LATEST_OFFSET:
- break;
- default:
- throw new UnsupportedOperationException(
- "Unsupported startup mode: " +
startupOptions.startupMode);
- }
this.startupOptions = startupOptions;
return this;
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index e7be00ec6c..8faabdca41 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -162,9 +162,9 @@ public class MySqlSourceOptions {
.withDescription(
"Optional offsets used in case of
\"specific-offset\" startup mode");
- public static final ConfigOption<Integer> SCAN_STARTUP_SPECIFIC_OFFSET_POS
=
+ public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
ConfigOptions.key("scan.startup.specific-offset.pos")
- .intType()
+ .longType()
.noDefaultValue()
.withDescription(
"Optional offsets used in case of
\"specific-offset\" startup mode");
@@ -269,4 +269,24 @@ public class MySqlSourceOptions {
.defaultValue(false)
.withDescription(
"Whether capture the scan the newly added tables
or not, by default is false.");
+
+ public static final ConfigOption<String>
SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET =
+ ConfigOptions.key("scan.startup.specific-offset.gtid-set")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional GTID set used in case of
\"specific-offset\" startup mode");
+
+ public static final ConfigOption<Long>
SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS =
+ ConfigOptions.key("scan.startup.specific-offset.skip-events")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional number of events to skip after the
specific starting offset");
+
+ public static final ConfigOption<Long>
SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS =
+ ConfigOptions.key("scan.startup.specific-offset.skip-rows")
+ .longType()
+ .noDefaultValue()
+ .withDescription("Optional number of rows to skip after
the specific offset");
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffset.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffset.java
index 20c7e60ec6..04decfb9ba 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffset.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffset.java
@@ -19,15 +19,21 @@ package org.apache.inlong.sort.cdc.mysql.source.offset;
import io.debezium.connector.mysql.GtidSet;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.kafka.connect.errors.ConnectException;
import javax.annotation.Nullable;
import java.io.Serializable;
-import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import static
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind.EARLIEST;
+import static
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind.LATEST;
+import static
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind.NON_STOPPING;
+import static
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind.TIMESTAMP;
+
/**
* A structure describes a fine grained offset in a binlog event including
binlog position and gtid
* set etc.
@@ -48,43 +54,54 @@ public class BinlogOffset implements
Comparable<BinlogOffset>, Serializable {
public static final String GTID_SET_KEY = "gtids";
public static final String TIMESTAMP_KEY = "ts_sec";
public static final String SERVER_ID_KEY = "server_id";
-
- public static final BinlogOffset INITIAL_OFFSET = new BinlogOffset("", 0);
- public static final BinlogOffset NO_STOPPING_OFFSET = new BinlogOffset("",
Long.MIN_VALUE);
+ public static final String OFFSET_KIND_KEY = "kind";
private final Map<String, String> offset;
- public BinlogOffset(Map<String, String> offset) {
- this.offset = offset;
+ // ------------------------------- Builders
--------------------------------
+
+ /** Create a {@link BinlogOffsetBuilder}. */
+ public static BinlogOffsetBuilder builder() {
+ return new BinlogOffsetBuilder();
}
- public BinlogOffset(String filename, long position) {
- this(filename, position, 0L, 0L, 0L, null, null);
+ /** Create offset from binlog filename and position. */
+ public static BinlogOffset ofBinlogFilePosition(String filename, long
position) {
+ return builder().setBinlogFilePosition(filename, position).build();
}
- public BinlogOffset(
- String filename,
- long position,
- long restartSkipEvents,
- long restartSkipRows,
- long binlogEpochSecs,
- @Nullable String restartGtidSet,
- @Nullable Integer serverId) {
- Map<String, String> offsetMap = new HashMap<>();
- offsetMap.put(BINLOG_FILENAME_OFFSET_KEY, filename);
- offsetMap.put(BINLOG_POSITION_OFFSET_KEY, String.valueOf(position));
- offsetMap.put(EVENTS_TO_SKIP_OFFSET_KEY,
String.valueOf(restartSkipEvents));
- offsetMap.put(ROWS_TO_SKIP_OFFSET_KEY,
String.valueOf(restartSkipRows));
- offsetMap.put(TIMESTAMP_KEY, String.valueOf(binlogEpochSecs));
- if (restartGtidSet != null) {
- offsetMap.put(GTID_SET_KEY, restartGtidSet);
- }
- if (serverId != null) {
- offsetMap.put(SERVER_ID_KEY, String.valueOf(serverId));
- }
- this.offset = offsetMap;
+ /** Create offset from GTID set. */
+ public static BinlogOffset ofGtidSet(String gtidSet) {
+ return builder().setBinlogFilePosition("",
0).setGtidSet(gtidSet).build();
+ }
+
+ /** Create offset which represents the earliest accessible binlog offset.
*/
+ public static BinlogOffset ofEarliest() {
+ return builder().setOffsetKind(EARLIEST).build();
}
+ /** Create offset which represents the latest offset at the point of
access. */
+ public static BinlogOffset ofLatest() {
+ return builder().setOffsetKind(LATEST).build();
+ }
+
+ /** Create offset specified by a timestamp in second. */
+ public static BinlogOffset ofTimestampSec(long timestampSec) {
+ return
builder().setOffsetKind(TIMESTAMP).setTimestampSec(timestampSec).build();
+ }
+
+ @Internal
+ public static BinlogOffset ofNonStopping() {
+ return builder().setOffsetKind(NON_STOPPING).build();
+ }
+
+ @VisibleForTesting
+ public BinlogOffset(Map<String, String> offset) {
+ this.offset = offset;
+ }
+
+ // ------------------------------ Field getters
-----------------------------
+
public Map<String, String> getOffset() {
return offset;
}
@@ -109,7 +126,7 @@ public class BinlogOffset implements
Comparable<BinlogOffset>, Serializable {
return offset.get(GTID_SET_KEY);
}
- public long getTimestamp() {
+ public long getTimestampSec() {
return longOffsetValue(offset, TIMESTAMP_KEY);
}
@@ -117,6 +134,14 @@ public class BinlogOffset implements
Comparable<BinlogOffset>, Serializable {
return longOffsetValue(offset, SERVER_ID_KEY);
}
+ @Nullable
+ public BinlogOffsetKind getOffsetKind() {
+ if (offset.get(OFFSET_KIND_KEY) == null) {
+ return null;
+ }
+ return BinlogOffsetKind.valueOf(offset.get(OFFSET_KIND_KEY));
+ }
+
private long longOffsetValue(Map<String, ?> values, String key) {
Object obj = values.get(key);
if (obj == null) {
@@ -142,14 +167,14 @@ public class BinlogOffset implements
Comparable<BinlogOffset>, Serializable {
*/
@Override
public int compareTo(BinlogOffset that) {
- // the NO_STOPPING_OFFSET is the max offset
- if (NO_STOPPING_OFFSET.equals(that) &&
NO_STOPPING_OFFSET.equals(this)) {
+ // the NON_STOPPING is the max offset
+ if (that.getOffsetKind() == NON_STOPPING && this.getOffsetKind() ==
NON_STOPPING) {
return 0;
}
- if (NO_STOPPING_OFFSET.equals(this)) {
+ if (this.getOffsetKind() == NON_STOPPING) {
return 1;
}
- if (NO_STOPPING_OFFSET.equals(that)) {
+ if (that.getOffsetKind() == NON_STOPPING) {
return -1;
}
@@ -201,8 +226,8 @@ public class BinlogOffset implements
Comparable<BinlogOffset>, Serializable {
// the only thing we can do
// is compare timestamps, and we have to assume that the server
timestamps can be
// compared ...
- long timestamp = this.getTimestamp();
- long targetTimestamp = that.getTimestamp();
+ long timestamp = this.getTimestampSec();
+ long targetTimestamp = that.getTimestampSec();
return Long.compare(timestamp, targetTimestamp);
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetBuilder.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetBuilder.java
new file mode 100644
index 0000000000..4b00e5e2f5
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetBuilder.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.source.offset;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind.EARLIEST;
+import static
org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind.NON_STOPPING;
+
+/** Builder for {@link BinlogOffset}. */
+public class BinlogOffsetBuilder {
+
+ private final Map<String, String> offsetMap = new HashMap<>();
+
+ public BinlogOffsetBuilder() {
+ // Initialize default values
+ offsetMap.put(BinlogOffset.EVENTS_TO_SKIP_OFFSET_KEY, "0");
+ offsetMap.put(BinlogOffset.ROWS_TO_SKIP_OFFSET_KEY, "0");
+ offsetMap.put(BinlogOffset.TIMESTAMP_KEY, "0");
+ offsetMap.put(BinlogOffset.OFFSET_KIND_KEY,
String.valueOf(BinlogOffsetKind.SPECIFIC));
+ }
+
+ public BinlogOffsetBuilder setBinlogFilePosition(String filename, long
position) {
+ offsetMap.put(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY, filename);
+ offsetMap.put(BinlogOffset.BINLOG_POSITION_OFFSET_KEY,
String.valueOf(position));
+ return this;
+ }
+
+ public BinlogOffsetBuilder setSkipEvents(long skipEvents) {
+ offsetMap.put(BinlogOffset.EVENTS_TO_SKIP_OFFSET_KEY,
String.valueOf(skipEvents));
+ return this;
+ }
+
+ public BinlogOffsetBuilder setSkipRows(long skipRows) {
+ offsetMap.put(BinlogOffset.ROWS_TO_SKIP_OFFSET_KEY,
String.valueOf(skipRows));
+ return this;
+ }
+
+ public BinlogOffsetBuilder setTimestampSec(long timestampSec) {
+ offsetMap.put(BinlogOffset.TIMESTAMP_KEY,
String.valueOf(timestampSec));
+ return this;
+ }
+
+ public BinlogOffsetBuilder setGtidSet(String gtidSet) {
+ offsetMap.put(BinlogOffset.GTID_SET_KEY, gtidSet);
+ return this;
+ }
+
+ public BinlogOffsetBuilder setServerId(long serverId) {
+ offsetMap.put(BinlogOffset.SERVER_ID_KEY, String.valueOf(serverId));
+ return this;
+ }
+
+ public BinlogOffsetBuilder setOffsetKind(BinlogOffsetKind offsetKind) {
+ offsetMap.put(BinlogOffset.OFFSET_KIND_KEY,
String.valueOf(offsetKind));
+ if (offsetKind == EARLIEST) {
+ setBinlogFilePosition("", 0);
+ }
+ if (offsetKind == NON_STOPPING) {
+ setBinlogFilePosition("", Long.MIN_VALUE);
+ }
+ return this;
+ }
+
+ public BinlogOffsetBuilder setOffsetMap(Map<String, String> offsetMap) {
+ this.offsetMap.putAll(offsetMap);
+ return this;
+ }
+
+ public BinlogOffset build() {
+ sanityCheck();
+ return new BinlogOffset(offsetMap);
+ }
+
+ private void sanityCheck() {
+ checkArgument(
+ offsetMap.containsKey(BinlogOffset.OFFSET_KIND_KEY),
+ "Binlog offset kind is required");
+ BinlogOffsetKind offsetKind =
+
BinlogOffsetKind.valueOf(offsetMap.get(BinlogOffset.OFFSET_KIND_KEY));
+ switch (offsetKind) {
+ case SPECIFIC:
+ checkArgument(
+
offsetMap.containsKey(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY)
+ ||
offsetMap.containsKey(BinlogOffset.BINLOG_POSITION_OFFSET_KEY)
+ ||
offsetMap.containsKey(BinlogOffset.GTID_SET_KEY),
+ "Either binlog file / position or GTID set is required
if offset kind is SPECIFIC");
+ break;
+ case TIMESTAMP:
+
checkArgument(offsetMap.containsKey(BinlogOffset.TIMESTAMP_KEY));
+ break;
+ case EARLIEST:
+ case LATEST:
+ case NON_STOPPING:
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unrecognized offset kind \"%s\"",
offsetKind));
+ }
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetKind.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetKind.java
new file mode 100644
index 0000000000..9eaeb8749f
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetKind.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.source.offset;
+
+/**
+ * Predefined kind of binlog offset.
+ *
+ * <p>Binlog offset kind describes some special binlog offsets such as
earliest accessible binlog,
+ * the latest binlog offset currently, and a specific offset described by
binlog file + position or
+ * GTID set.
+ */
+public enum BinlogOffsetKind {
+
+ /** The earliest accessible offset in the binlog. */
+ EARLIEST,
+
+ /** The latest offset in the binlog. */
+ LATEST,
+
+ /**
+ * The binlog offset is described by a timestamp, for example "1667232000"
means start reading
+ * from events happens at 2022-11-01 00:00:00.
+ */
+ TIMESTAMP,
+
+ /**
+ * A specific offset described by binlog file name and position, or GTID
set if GTID is enabled
+ * on the cluster. For example:
+ *
+ * <ul>
+ * <li>Binlog file "mysql-bin.000002" and position 4
+ * <li>GTID set "24DA167-0C0C-11E8-8442-00059A3C7B00:1-19"
+ * </ul>
+ */
+ SPECIFIC,
+
+ /**
+ * A special offset indicating there's no ending offsets for the binlog
reader.
+ *
+ * <p>Please note that this is an INTERNAL kind and should not be used for
specifying starting
+ * offset.
+ */
+ NON_STOPPING
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetUtils.java
new file mode 100644
index 0000000000..5fb0d47856
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.source.offset;
+
+import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils;
+import
org.apache.inlong.sort.cdc.mysql.debezium.task.context.StatefulTaskContext;
+
+import io.debezium.connector.mysql.MySqlConnection;
+
+/** Utils for handling {@link BinlogOffset}. */
+public class BinlogOffsetUtils {
+
+ /**
+ * Initialize the binlog offset according to the kind of binlog offset, so
that the debezium
+ * reader could interpret it and seek the reader to the offset.
+ *
+ * <p>This method will be used in binlog reading phase, when the {@link
StatefulTaskContext} is
+ * being initialized to load the actual effective binlog offset.
+ *
+ * <p>The binlog offset kind will be overridden to {@link
BinlogOffsetKind#SPECIFIC} after the
+ * initialization, as the initialized effective offset describes a
specific position in binlog.
+ *
+ * <p>Initialization strategy:
+ *
+ * <ul>
+ * <li>EARLIEST: binlog filename = "", position = 0
+ * <li>TIMESTAMP: set to earliest, as the current implementation is
reading from the earliest
+ * offset and drop events earlier than the specified timestamp.
+ * <li>LATEST: fetch the current binlog by JDBC
+ * </ul>
+ */
+ public static BinlogOffset initializeEffectiveOffset(
+ BinlogOffset offset, MySqlConnection connection) {
+ BinlogOffsetKind offsetKind = offset.getOffsetKind();
+ switch (offsetKind) {
+ case EARLIEST:
+ case TIMESTAMP:
+ return BinlogOffset.ofBinlogFilePosition("", 0);
+ case LATEST:
+ return DebeziumUtils.currentBinlogOffset(connection);
+ default:
+ return offset;
+ }
+ }
+
+ public static boolean isNonStoppingOffset(BinlogOffset binlogOffset) {
+ return
BinlogOffsetKind.NON_STOPPING.equals(binlogOffset.getOffsetKind());
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
index 91a3c96841..2337cfc230 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
@@ -152,20 +152,13 @@ public class MySqlSourceReader<T>
if (suspendedBinlogSplit != null) {
unfinishedSplits.add(suspendedBinlogSplit);
}
- SourceTableMetricData sourceMetricData =
sourceReaderMetrics.getSourceMetricData();
- LOG.info("inlong-metric-states snapshot sourceMetricData:{}",
sourceMetricData);
- if (sourceMetricData != null) {
- long countNumBytesIn = sourceMetricData.getNumBytesIn().getCount();
- long countNumRecordsIn =
sourceMetricData.getNumRecordsIn().getCount();
- Map<String, Long> readPhaseMetricMap =
sourceMetricData.getReadPhaseMetricMap().entrySet().stream().collect(
- Collectors.toMap(v -> v.getKey().getPhase(), e ->
e.getValue().getReadPhase().getCount()));
- Map<String, MySqlTableMetric> tableMetricMap =
sourceMetricData.getSubSourceMetricMap().entrySet().stream()
- .collect(Collectors.toMap(Entry::getKey,
- e -> new
MySqlTableMetric(e.getValue().getNumRecordsIn().getCount(),
- e.getValue().getNumBytesIn().getCount())));
- unfinishedSplits
- .add(new MySqlMetricSplit(countNumBytesIn,
countNumRecordsIn, readPhaseMetricMap, tableMetricMap));
- }
+
+ // add mysql metric split
+ addMySqlMetricSplit(unfinishedSplits);
+
+ // log current binlog offsets
+ logCurrentBinlogOffsets(unfinishedSplits, checkpointId);
+
return unfinishedSplits;
}
@@ -390,6 +383,36 @@ public class MySqlSourceReader<T>
}
}
+ private void addMySqlMetricSplit(List<MySqlSplit> unfinishedSplits) {
+ SourceTableMetricData sourceMetricData =
sourceReaderMetrics.getSourceMetricData();
+ LOG.info("inlong-metric-states snapshot sourceMetricData:{}",
sourceMetricData);
+ if (sourceMetricData != null) {
+ long countNumBytesIn = sourceMetricData.getNumBytesIn().getCount();
+ long countNumRecordsIn =
sourceMetricData.getNumRecordsIn().getCount();
+ Map<String, Long> readPhaseMetricMap =
sourceMetricData.getReadPhaseMetricMap().entrySet().stream().collect(
+ Collectors.toMap(v -> v.getKey().getPhase(), e ->
e.getValue().getReadPhase().getCount()));
+ Map<String, MySqlTableMetric> tableMetricMap =
sourceMetricData.getSubSourceMetricMap().entrySet().stream()
+ .collect(Collectors.toMap(Entry::getKey,
+ e -> new
MySqlTableMetric(e.getValue().getNumRecordsIn().getCount(),
+ e.getValue().getNumBytesIn().getCount())));
+ unfinishedSplits
+ .add(new MySqlMetricSplit(countNumBytesIn,
countNumRecordsIn, readPhaseMetricMap, tableMetricMap));
+ }
+ }
+
+ private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long
checkpointId) {
+ if (!LOG.isInfoEnabled()) {
+ return;
+ }
+ for (MySqlSplit split : splits) {
+ if (!split.isBinlogSplit()) {
+ return;
+ }
+ BinlogOffset offset = split.asBinlogSplit().getStartingOffset();
+ LOG.info("Binlog offset on checkpoint {}: {}", checkpointId,
offset);
+ }
+ }
+
@Override
protected MySqlSplit toSplitType(String splitId, MySqlSplitState
splitState) {
return splitState.toMySqlSplit();
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index eaddb356d8..e3f32b6fa3 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -367,7 +367,7 @@ public class RecordUtils {
List<FinishedSnapshotSplitInfo> finishedSnapshotSplits) {
BinlogOffset startOffset =
finishedSnapshotSplits.isEmpty()
- ? BinlogOffset.INITIAL_OFFSET
+ ? BinlogOffset.ofEarliest()
: finishedSnapshotSplits.get(0).getHighWatermark();
for (FinishedSnapshotSplitInfo finishedSnapshotSplit :
finishedSnapshotSplits) {
if
(finishedSnapshotSplit.getHighWatermark().isBefore(startOffset)) {
@@ -410,7 +410,7 @@ public class RecordUtils {
offsetStrMap.put(
entry.getKey(), entry.getValue() == null ? null :
entry.getValue().toString());
}
- return new BinlogOffset(offsetStrMap);
+ return BinlogOffset.builder().setOffsetMap(offsetStrMap).build();
}
/**
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/SerializerUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/SerializerUtils.java
index 95bab14cf3..4f7b85f6a2 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/SerializerUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/SerializerUtils.java
@@ -18,10 +18,12 @@
package org.apache.inlong.sort.cdc.mysql.source.utils;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetKind;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetSerializer;
import io.debezium.DebeziumException;
import io.debezium.util.HexConverter;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
@@ -53,7 +55,7 @@ public class SerializerUtils {
throws IOException {
switch (offsetVersion) {
case 1:
- return in.readBoolean() ? new BinlogOffset(in.readUTF(),
in.readLong()) : null;
+ return in.readBoolean() ?
BinlogOffset.ofBinlogFilePosition(in.readUTF(), in.readLong()) : null;
case 2:
case 3:
case 4:
@@ -69,7 +71,22 @@ public class SerializerUtils {
int binlogOffsetBytesLength = in.readInt();
byte[] binlogOffsetBytes = new byte[binlogOffsetBytesLength];
in.readFully(binlogOffsetBytes);
- return
BinlogOffsetSerializer.INSTANCE.deserialize(binlogOffsetBytes);
+ BinlogOffset offset =
BinlogOffsetSerializer.INSTANCE.deserialize(binlogOffsetBytes);
+ // Old version of binlog offset without offset kind
+ if (offset.getOffsetKind() == null) {
+ if (StringUtils.isEmpty(offset.getFilename()) &&
offset.getPosition() == Long.MIN_VALUE) {
+ return BinlogOffset.ofNonStopping();
+ }
+ if (StringUtils.isEmpty(offset.getFilename()) &&
offset.getPosition() == 0L) {
+ return BinlogOffset.ofEarliest();
+ }
+ // For other cases we treat it as a specific offset
+ return BinlogOffset.builder()
+ .setOffsetKind(BinlogOffsetKind.SPECIFIC)
+ .setOffsetMap(offset.getOffset())
+ .build();
+ }
+ return offset;
} else {
return null;
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index 0ee5276479..0234e8e283 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -20,6 +20,8 @@ package org.apache.inlong.sort.cdc.mysql.table;
import org.apache.inlong.sort.cdc.base.debezium.table.DebeziumOptions;
import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions;
import org.apache.inlong.sort.cdc.mysql.source.config.ServerIdRange;
+import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffsetBuilder;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
@@ -28,11 +30,11 @@ import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.util.Preconditions;
import java.time.Duration;
import java.time.ZoneId;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
@@ -63,7 +65,10 @@ import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_MODE;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
+import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS;
+import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS;
+import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SERVER_ID;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE;
@@ -96,17 +101,14 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
return StartupOptions.latest();
case SCAN_STARTUP_MODE_VALUE_EARLIEST:
+ return StartupOptions.earliest();
+
case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET:
+ validateSpecificOffset(config);
+ return getSpecificOffset(config);
+
case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
- throw new ValidationException(
- String.format(
- "Unsupported option value '%s', the options
[%s, %s, %s] "
- + "are not supported correctly, "
- + "please do not use them until
they're correctly supported",
- modeString,
- SCAN_STARTUP_MODE_VALUE_EARLIEST,
- SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,
- SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+ return
StartupOptions.timestamp(config.get(SCAN_STARTUP_TIMESTAMP_MILLIS));
default:
throw new ValidationException(
@@ -163,7 +165,6 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
final boolean ghostDdlChange = config.get(GH_OST_DDL_CHANGE);
final String ghostTableRegex = config.get(GH_OST_TABLE_REGEX);
if (enableParallelRead) {
- validateStartupOptionIfEnableParallel(startupOptions);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize,
1);
validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
@@ -233,6 +234,9 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
options.add(SCAN_STARTUP_MODE);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+ options.add(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
+ options.add(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS);
+ options.add(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS);
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
@@ -267,17 +271,6 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
}
}
- private void validateStartupOptionIfEnableParallel(StartupOptions
startupOptions) {
- // validate mode
- Preconditions.checkState(
- startupOptions.startupMode == StartupMode.INITIAL
- || startupOptions.startupMode ==
StartupMode.LATEST_OFFSET,
- String.format(
- "MySql Parallel Source only supports startup mode
'initial' and 'latest-offset',"
- + " but actual is %s",
- startupOptions.startupMode));
- }
-
private String validateAndGetServerId(ReadableConfig configuration) {
final String serverIdValue =
configuration.get(MySqlSourceOptions.SERVER_ID);
if (serverIdValue != null) {
@@ -351,4 +344,48 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
1.0d,
distributionFactorLower));
}
+
+ private static void validateSpecificOffset(ReadableConfig config) {
+ Optional<String> gtidSet = config.getOptional(
+ MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
+ Optional<String> binlogFilename = config.getOptional(
+ MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+ Optional<Long> binlogPosition = config.getOptional(
+ MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+ if (!gtidSet.isPresent() && !(binlogFilename.isPresent() &&
binlogPosition.isPresent())) {
+ throw new ValidationException(
+ String.format(
+ "Unable to find a valid binlog offset. Either %s,
or %s and %s are required.",
+
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET.key(),
+
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE.key(),
+
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS.key()));
+ }
+ }
+
+ private static StartupOptions getSpecificOffset(ReadableConfig config) {
+ BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
+
+ // GTID set
+ config.getOptional(
+ MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
+ .ifPresent(offsetBuilder::setGtidSet);
+
+ // Binlog file + pos
+ Optional<String> binlogFilename =
config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+ Optional<Long> binlogPosition =
config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+ if (binlogFilename.isPresent() && binlogPosition.isPresent()) {
+ offsetBuilder.setBinlogFilePosition(binlogFilename.get(),
binlogPosition.get());
+ } else {
+ offsetBuilder.setBinlogFilePosition("", 0);
+ }
+
+ config.getOptional(
+ MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
+ .ifPresent(offsetBuilder::setSkipEvents);
+ config.getOptional(
+ MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
+ .ifPresent(offsetBuilder::setSkipRows);
+ return StartupOptions.specificOffset(offsetBuilder.build());
+ }
+
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupOptions.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupOptions.java
index 172c6ae0ac..d97cbe1870 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupOptions.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupOptions.java
@@ -17,6 +17,10 @@
package org.apache.inlong.sort.cdc.mysql.table;
+import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
+
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.Objects;
@@ -28,16 +32,15 @@ public final class StartupOptions implements Serializable {
private static final long serialVersionUID = 1L;
public final StartupMode startupMode;
- public final String specificOffsetFile;
- public final Integer specificOffsetPos;
- public final Long startupTimestampMillis;
+ @Nullable
+ public final BinlogOffset binlogOffset;
/**
* Performs an initial snapshot on the monitored database tables upon
first startup, and
* continue to read the latest binlog.
*/
public static StartupOptions initial() {
- return new StartupOptions(StartupMode.INITIAL, null, null, null);
+ return new StartupOptions(StartupMode.INITIAL, null);
}
/**
@@ -46,7 +49,7 @@ public final class StartupOptions implements Serializable {
* binlog is guaranteed to contain the entire history of the database.
*/
public static StartupOptions earliest() {
- return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null,
null);
+ return new StartupOptions(StartupMode.EARLIEST_OFFSET,
BinlogOffset.ofEarliest());
}
/**
@@ -54,16 +57,25 @@ public final class StartupOptions implements Serializable {
* the end of the binlog which means only have the changes since the
connector was started.
*/
public static StartupOptions latest() {
- return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null);
+ return new StartupOptions(StartupMode.LATEST_OFFSET,
BinlogOffset.ofLatest());
}
/**
* Never to perform snapshot on the monitored database tables upon first
startup, and directly
* read binlog from the specified offset.
*/
- public static StartupOptions specificOffset(String specificOffsetFile, int
specificOffsetPos) {
+ public static StartupOptions specificOffset(String specificOffsetFile,
long specificOffsetPos) {
return new StartupOptions(
- StartupMode.SPECIFIC_OFFSETS, specificOffsetFile,
specificOffsetPos, null);
+ StartupMode.SPECIFIC_OFFSETS,
+ BinlogOffset.ofBinlogFilePosition(specificOffsetFile,
specificOffsetPos));
+ }
+
+ public static StartupOptions specificOffset(String gtidSet) {
+ return new StartupOptions(StartupMode.SPECIFIC_OFFSETS,
BinlogOffset.ofGtidSet(gtidSet));
+ }
+
+ public static StartupOptions specificOffset(BinlogOffset binlogOffset) {
+ return new StartupOptions(StartupMode.SPECIFIC_OFFSETS, binlogOffset);
}
/**
@@ -76,33 +88,15 @@ public final class StartupOptions implements Serializable {
* @param startupTimestampMillis timestamp for the startup offsets, as
milliseconds from epoch.
*/
public static StartupOptions timestamp(long startupTimestampMillis) {
- return new StartupOptions(StartupMode.TIMESTAMP, null, null,
startupTimestampMillis);
+ return new StartupOptions(StartupMode.TIMESTAMP,
BinlogOffset.ofTimestampSec(startupTimestampMillis / 1000));
}
- private StartupOptions(
- StartupMode startupMode,
- String specificOffsetFile,
- Integer specificOffsetPos,
- Long startupTimestampMillis) {
+ private StartupOptions(StartupMode startupMode, BinlogOffset binlogOffset)
{
this.startupMode = startupMode;
- this.specificOffsetFile = specificOffsetFile;
- this.specificOffsetPos = specificOffsetPos;
- this.startupTimestampMillis = startupTimestampMillis;
-
- switch (startupMode) {
- case INITIAL:
- case EARLIEST_OFFSET:
- case LATEST_OFFSET:
- break;
- case SPECIFIC_OFFSETS:
- checkNotNull(specificOffsetFile, "specificOffsetFile shouldn't
be null");
- checkNotNull(specificOffsetPos, "specificOffsetPos shouldn't
be null");
- break;
- case TIMESTAMP:
- checkNotNull(startupTimestampMillis, "startupTimestampMillis
shouldn't be null");
- break;
- default:
- throw new UnsupportedOperationException(startupMode + " mode
is not supported.");
+ this.binlogOffset = binlogOffset;
+ if (startupMode != StartupMode.INITIAL) {
+ checkNotNull(
+ binlogOffset, "Binlog offset is required if startup mode
is %s", startupMode);
}
}
@@ -115,15 +109,11 @@ public final class StartupOptions implements Serializable
{
return false;
}
StartupOptions that = (StartupOptions) o;
- return startupMode == that.startupMode
- && Objects.equals(specificOffsetFile, that.specificOffsetFile)
- && Objects.equals(specificOffsetPos, that.specificOffsetPos)
- && Objects.equals(startupTimestampMillis,
that.startupTimestampMillis);
+ return startupMode == that.startupMode && Objects.equals(binlogOffset,
that.binlogOffset);
}
@Override
public int hashCode() {
- return Objects.hash(
- startupMode, specificOffsetFile, specificOffsetPos,
startupTimestampMillis);
+ return Objects.hash(startupMode, binlogOffset);
}
}
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index 5310204943..55d8862b8c 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -344,96 +344,100 @@
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/src/main/java/org/apache/inlong/sort/cdc/base/util/DatabaseHistoryUtil.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/src/main/java/org/apache/inlong/sort/cdc/base/util/TemporalConversions.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSchemalessSnapshotSplit.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReaderContext.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlTypeUtils.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/StatementUtils.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlSchema.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlDeserializationConverterFactory.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/JdbcConnectionFactory.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/FinishedSnapshotSplitsReportEvent.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/HybridPendingSplitsState.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/ConnectionPoolId.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/JsonDebeziumDeserializationSchema.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/ConnectionPools.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlRecords.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/enumerator/MySqlSourceEnumerator.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlTableDefinition.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/BinlogSplitMetaRequestEvent.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/Validator.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/DebeziumUtils.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplit.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplitState.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ChunkUtils.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/LatestFinishedSplitsSizeRequestEvent.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/EventDispatcherImpl.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetSerializer.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkRange.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsStateSerializer.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/BinlogPendingSplitsState.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSplitReader.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlErrorHandler.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlBinlogSplitAssigner.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/SerializerUtils.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/AssignerStatus.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/SnapshotPendingSplitsState.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/StringDebeziumDeserializationSchema.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/FinishedSnapshotSplitsAckEvent.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffset.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupMode.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/OldFieldMetadataConverter.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ObjectUtils.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/BinlogSplitMetaEvent.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsState.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitState.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/SuspendBinlogReaderEvent.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/FinishedSnapshotSplitsRequestEvent.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupOptions.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/JdbcUrlUtils.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/ServerIdRange.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/EmbeddedFlinkDatabaseHistory.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplitState.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlValidator.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/WakeupReaderEvent.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/DebeziumReader.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/SignalEventDispatcher.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSplitAssigner.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/SeekBinlogToTimestampFilter.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/LatestFinishedSplitsSizeEvent.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/PooledDataSourceFactory.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/JdbcConnectionPools.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/SuspendBinlogReaderAckEvent.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlTaskContextImpl.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlFieldDefinition.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/FinishedSnapshotSplitInfo.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeFetcher.java
-
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/sort-mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSchemalessSnapshotSplit.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReaderContext.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlTypeUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/StatementUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlSchema.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlDeserializationConverterFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/JdbcConnectionFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/FinishedSnapshotSplitsReportEvent.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/HybridPendingSplitsState.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/ConnectionPoolId.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/JsonDebeziumDeserializationSchema.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/ConnectionPools.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlRecords.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/SnapshotSplitReader.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/enumerator/MySqlSourceEnumerator.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlTableDefinition.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/BinlogSplitMetaRequestEvent.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/Validator.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/DebeziumUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlHybridSplitAssigner.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplit.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplitState.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ChunkUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/LatestFinishedSplitsSizeRequestEvent.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/EventDispatcherImpl.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetSerializer.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkRange.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/TableDiscoveryUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsStateSerializer.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/BinlogPendingSplitsState.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSplitReader.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/exception/SchemaOutOfSyncException.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlErrorHandler.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlBinlogSplitAssigner.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/SerializerUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/AssignerStatus.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/SnapshotPendingSplitsState.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/StringDebeziumDeserializationSchema.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/FinishedSnapshotSplitsAckEvent.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffset.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetBuilder.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetKind.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/offset/BinlogOffsetUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupMode.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/OldFieldMetadataConverter.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ObjectUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/BinlogSplitMetaEvent.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/state/PendingSplitsState.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitState.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/SuspendBinlogReaderEvent.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/FinishedSnapshotSplitsRequestEvent.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/StartupOptions.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/JdbcUrlUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/ServerIdRange.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/EmbeddedFlinkDatabaseHistory.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplitState.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlValidator.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/WakeupReaderEvent.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/DebeziumReader.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/dispatcher/SignalEventDispatcher.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSplitAssigner.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/SeekBinlogToTimestampFilter.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/LatestFinishedSplitsSizeEvent.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/PooledDataSourceFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/connection/JdbcConnectionPools.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/events/SuspendBinlogReaderAckEvent.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/MySqlTaskContextImpl.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/schema/MySqlFieldDefinition.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/FinishedSnapshotSplitInfo.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeFetcher.java
+
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/ColumnImpl.java