This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 95fe4d3d3 [FLINK-36684][cdc-connector/mysql] Support read changelog as
append only mode
95fe4d3d3 is described below
commit 95fe4d3d36d0363540a3fd83cdf5e00f5dc6bcf0
Author: Runkang He <[email protected]>
AuthorDate: Thu Apr 17 16:09:10 2025 +0800
[FLINK-36684][cdc-connector/mysql] Support read changelog as append only
mode
This closes #3708
---
.../docs/connectors/flink-sources/mysql-cdc.md | 22 +++++
.../docs/connectors/flink-sources/mysql-cdc.md | 13 +++
.../debezium/table/AppendMetadataCollector.java | 15 ++-
.../table/RowDataDebeziumDeserializeSchema.java | 15 ++-
.../mysql/source/config/MySqlSourceOptions.java | 7 ++
.../connectors/mysql/table/MySqlTableSource.java | 22 ++++-
.../mysql/table/MySqlTableSourceFactory.java | 7 +-
.../mysql/table/MySqlConnectorITCase.java | 102 +++++++++++++++++++++
.../mysql/table/MySqlTableSourceFactoryTest.java | 36 +++++---
9 files changed, 217 insertions(+), 22 deletions(-)
diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
index 9df1b129c..7ae1ae449 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -400,6 +400,19 @@ Flink SQL> SELECT * FROM orders;
这是一项实验特性,默认为 false。
</td>
</tr>
+ <tr>
+ <td>scan.read-changelog-as-append-only.enabled</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>
+ 是否将 changelog 数据流转换为 append-only 数据流。<br>
+ 仅在需要保存上游表删除消息等特殊场景下开启使用,比如在逻辑删除场景下,用户不允许物理删除下游消息,此时使用该特性,并配合 row_kind
元数据字段,下游可以先保存所有明细数据,再通过 row_kind 字段判断是否进行逻辑删除。<br>
+ 参数取值如下:<br>
+ <li>true:所有类型的消息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都会转换成
INSERT 类型的消息。</li>
+ <li>false(默认):所有类型的消息都保持原样下发。</li>
+ </td>
+ </tr>
</tbody>
</table>
</div>
@@ -433,6 +446,13 @@ Flink SQL> SELECT * FROM orders;
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>当前记录表在数据库中更新的时间。 <br>如果从表的快照而不是 binlog 读取记录,该值将始终为0。</td>
</tr>
+ <tr>
+ <td>row_kind</td>
+ <td>STRING NOT NULL</td>
+ <td>当前记录的变更类型。<br>
+ 注意:如果 Source 算子选择为每条记录输出 row_kind 列,则下游 SQL
操作符在处理回撤时可能会由于此新添加的列而无法比较,导致出现非确定性更新问题。建议仅在简单的同步作业中使用此元数据列。<br>
+ '+I' 表示 INSERT 消息,'-D' 表示 DELETE 消息,'-U' 表示 UPDATE_BEFORE 消息,'+U' 表示
UPDATE_AFTER 消息。</td>
+ </tr>
</tbody>
</table>
@@ -442,6 +462,7 @@ CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+ operation STRING METADATA FROM 'row_kind' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
@@ -466,6 +487,7 @@ CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+ operation STRING METADATA FROM 'row_kind' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md
b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
index 45988afd9..2a7f3ca72 100644
--- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -426,6 +426,19 @@ During a snapshot operation, the connector will query each
included table to pro
Experimental option, defaults to false.
</td>
</tr>
+ <tr>
+ <td>scan.read-changelog-as-append-only.enabled</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>
+ Whether to convert the changelog stream to an append-only stream.<br>
+ Enable this option only in special scenarios where upstream deletion
messages must be preserved. For example, in a logical-deletion scenario where
downstream records must not be physically deleted, use this option together
with the row_kind metadata field: the downstream can first save every change
record in full and then use the row_kind field to decide whether to mark a
record as logically deleted.<br>
+ The option values are as follows:<br>
+ <li>true: All types of messages (including INSERT, DELETE,
UPDATE_BEFORE, and UPDATE_AFTER) will be converted into INSERT messages.</li>
+ <li>false (default): All types of messages are sent as is.</li>
+ </td>
+ </tr>
</tbody>
</table>
</div>
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/AppendMetadataCollector.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/AppendMetadataCollector.java
index 6b8d95ab0..fc3f116c0 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/AppendMetadataCollector.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/AppendMetadataCollector.java
@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;
@@ -37,8 +38,15 @@ public final class AppendMetadataCollector implements
Collector<RowData>, Serial
public transient SourceRecord inputRecord;
public transient Collector<RowData> outputCollector;
+ private final boolean appendOnly;
+
public AppendMetadataCollector(MetadataConverter[] metadataConverters) {
+ this(metadataConverters, false);
+ }
+
+ public AppendMetadataCollector(MetadataConverter[] metadataConverters,
boolean appendOnly) {
this.metadataConverters = metadataConverters;
+ this.appendOnly = appendOnly;
}
@Override
@@ -55,7 +63,12 @@ public final class AppendMetadataCollector implements
Collector<RowData>, Serial
metaRow.setField(i, meta);
}
- RowData outRow = new JoinedRowData(physicalRow.getRowKind(),
physicalRow, metaRow);
+ RowData outRow;
+ if (appendOnly) {
+ outRow = new JoinedRowData(RowKind.INSERT, physicalRow, metaRow);
+ } else {
+ outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow,
metaRow);
+ }
outputCollector.collect(outRow);
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index b68dcd171..9530490de 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -104,9 +104,10 @@ public final class RowDataDebeziumDeserializeSchema
ValueValidator validator,
ZoneId serverTimeZone,
DeserializationRuntimeConverterFactory userDefinedConverterFactory,
- DebeziumChangelogMode changelogMode) {
+ DebeziumChangelogMode changelogMode,
+ boolean appendOnly) {
this.hasMetadata = checkNotNull(metadataConverters).length > 0;
- this.appendMetadataCollector = new
AppendMetadataCollector(metadataConverters);
+ this.appendMetadataCollector = new
AppendMetadataCollector(metadataConverters, appendOnly);
this.physicalConverter =
createConverter(
checkNotNull(physicalDataType),
@@ -190,6 +191,8 @@ public final class RowDataDebeziumDeserializeSchema
DeserializationRuntimeConverterFactory.DEFAULT;
private DebeziumChangelogMode changelogMode =
DebeziumChangelogMode.ALL;
+ private boolean appendOnly = false;
+
public Builder setPhysicalRowType(RowType physicalRowType) {
this.physicalRowType = physicalRowType;
return this;
@@ -226,6 +229,11 @@ public final class RowDataDebeziumDeserializeSchema
return this;
}
+ public Builder setAppendOnly(boolean appendOnly) {
+ this.appendOnly = appendOnly;
+ return this;
+ }
+
public RowDataDebeziumDeserializeSchema build() {
return new RowDataDebeziumDeserializeSchema(
physicalRowType,
@@ -234,7 +242,8 @@ public final class RowDataDebeziumDeserializeSchema
validator,
serverTimeZone,
userDefinedConverterFactory,
- changelogMode);
+ changelogMode,
+ appendOnly);
}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index 71e50d883..a00d6d564 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -191,6 +191,13 @@ public class MySqlSourceOptions {
.withDescription(
"Optional interval of sending heartbeat event for
tracing the latest available binlog offsets");
+ public static final ConfigOption<Boolean>
SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED =
+ ConfigOptions.key("scan.read-changelog-as-append-only.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to convert the changelog data stream to
an append-only data stream");
+
//
----------------------------------------------------------------------------
// experimental options, won't add them to documentation
//
----------------------------------------------------------------------------
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index 110075442..2a1f05194 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -102,6 +102,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
private final boolean useLegacyJsonFormat;
private final boolean assignUnboundedChunkFirst;
+ private final boolean appendOnly;
+
//
--------------------------------------------------------------------------------------------
// Mutable attributes
//
--------------------------------------------------------------------------------------------
@@ -141,7 +143,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
boolean skipSnapshotBackFill,
boolean parseOnlineSchemaChanges,
boolean useLegacyJsonFormat,
- boolean assignUnboundedChunkFirst) {
+ boolean assignUnboundedChunkFirst,
+ boolean appendOnly) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -174,11 +177,16 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
this.skipSnapshotBackFill = skipSnapshotBackFill;
this.useLegacyJsonFormat = useLegacyJsonFormat;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
+ this.appendOnly = appendOnly;
}
@Override
public ChangelogMode getChangelogMode() {
- return ChangelogMode.all();
+ if (appendOnly) {
+ return ChangelogMode.insertOnly();
+ } else {
+ return ChangelogMode.all();
+ }
}
@Override
@@ -197,6 +205,7 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
.setServerTimeZone(serverTimeZone)
.setUserDefinedConverterFactory(
MySqlDeserializationConverterFactory.instance())
+ .setAppendOnly(appendOnly)
.build();
if (enableParallelRead) {
MySqlSource<RowData> parallelSource =
@@ -320,7 +329,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
skipSnapshotBackFill,
parseOnlineSchemaChanges,
useLegacyJsonFormat,
- assignUnboundedChunkFirst);
+ assignUnboundedChunkFirst,
+ appendOnly);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -365,7 +375,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
&& Objects.equals(skipSnapshotBackFill,
that.skipSnapshotBackFill)
&& parseOnlineSchemaChanges == that.parseOnlineSchemaChanges
&& useLegacyJsonFormat == that.useLegacyJsonFormat
- && assignUnboundedChunkFirst == that.assignUnboundedChunkFirst;
+ && assignUnboundedChunkFirst == that.assignUnboundedChunkFirst
+ && Objects.equals(appendOnly, that.appendOnly);
}
@Override
@@ -401,7 +412,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
skipSnapshotBackFill,
parseOnlineSchemaChanges,
useLegacyJsonFormat,
- assignUnboundedChunkFirst);
+ assignUnboundedChunkFirst,
+ appendOnly);
}
@Override
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
index 883929069..7b82aa588 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
@@ -108,6 +108,9 @@ public class MySqlTableSourceFactory implements
DynamicTableSourceFactory {
boolean assignUnboundedChunkFirst =
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
+ boolean appendOnly =
+
config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
+
if (enableParallelRead) {
validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
validateIntegerOption(
@@ -153,7 +156,8 @@ public class MySqlTableSourceFactory implements
DynamicTableSourceFactory {
skipSnapshotBackFill,
parseOnLineSchemaChanges,
useLegacyJsonFormat,
- assignUnboundedChunkFirst);
+ assignUnboundedChunkFirst,
+ appendOnly);
}
@Override
@@ -202,6 +206,7 @@ public class MySqlTableSourceFactory implements
DynamicTableSourceFactory {
options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
+
options.add(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
return options;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index 34cd5803a..7ec84611a 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -2334,4 +2334,106 @@ class MySqlConnectorITCase extends MySqlSourceTestBase {
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator,
expected.length));
result.getJobClient().get().cancel().get();
}
+
+ @ParameterizedTest(name = "incrementalSnapshot = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testReadChangelogAppendOnly(boolean incrementalSnapshot)
throws Exception {
+ setup(incrementalSnapshot);
+ userDatabase1.createAndInitialize();
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE mysql_users ("
+ + " db_name STRING METADATA FROM
'database_name' VIRTUAL,"
+ + " table_name STRING METADATA VIRTUAL,"
+ + " row_kind STRING METADATA FROM 'row_kind'
VIRTUAL,"
+ + " `id` DECIMAL(20, 0) NOT NULL,"
+ + " name STRING,"
+ + " address STRING,"
+ + " phone_number STRING,"
+ + " email STRING,"
+ + " age INT,"
+ + " primary key (`id`) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'mysql-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.incremental.snapshot.enabled' =
'%s',"
+ + " 'server-id' = '%s',"
+ + " 'server-time-zone' = 'UTC',"
+ + " 'scan.incremental.snapshot.chunk.size' =
'%s',"
+ + "
'scan.read-changelog-as-append-only.enabled' = 'true'"
+ + ")",
+ MYSQL_CONTAINER.getHost(),
+ MYSQL_CONTAINER.getDatabasePort(),
+ userDatabase1.getUsername(),
+ userDatabase1.getPassword(),
+ userDatabase1.getDatabaseName(),
+ "user_table_.*",
+ incrementalSnapshot,
+ getServerId(incrementalSnapshot),
+ getSplitSize(incrementalSnapshot));
+
+ String sinkDDL =
+ "CREATE TABLE sink ("
+ + " database_name STRING,"
+ + " table_name STRING,"
+ + " row_kind STRING,"
+ + " `id` DECIMAL(20, 0) NOT NULL,"
+ + " name STRING,"
+ + " address STRING,"
+ + " phone_number STRING,"
+ + " email STRING,"
+ + " age INT,"
+ + " primary key (database_name, table_name, id) not
enforced"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false'"
+ + ")";
+ tEnv.executeSql(sourceDDL);
+ tEnv.executeSql(sinkDDL);
+
+ // async submit job
+ TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM
mysql_users");
+
+ // wait for snapshot finished and begin binlog
+ waitForSinkSize("sink", 2);
+
+ try (Connection connection = userDatabase1.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+
+ statement.execute(
+ "INSERT INTO user_table_1_2 VALUES
(200,'user_200','Wuhan',123567891234);");
+ statement.execute(
+ "INSERT INTO user_table_1_1 VALUES
(300,'user_300','Hangzhou',123567891234, '[email protected]');");
+ statement.execute("UPDATE user_table_1_1 SET address='Beijing'
WHERE id=300;");
+ statement.execute("UPDATE user_table_1_2 SET phone_number=88888888
WHERE id=121;");
+ statement.execute("DELETE FROM user_table_1_1 WHERE id=111;");
+ }
+
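+ // waiting for binlog finished (5 statements; NOTE(review): they emit 7 rows because each UPDATE yields -U and +U, so 9 rows are expected in total - confirm the threshold below)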
+ waitForSinkSize("sink", 7);
+
+ List<String> expected =
+ Stream.of(
+ "+I[%s, user_table_1_1, +I, 111, user_111,
Shanghai, 123567891234, [email protected], null]",
+ "+I[%s, user_table_1_2, +I, 121, user_121,
Shanghai, 123567891234, null, null]",
+ "+I[%s, user_table_1_2, +I, 200, user_200,
Wuhan, 123567891234, null, null]",
+ "+I[%s, user_table_1_1, +I, 300, user_300,
Hangzhou, 123567891234, [email protected], null]",
+ "+I[%s, user_table_1_1, +U, 300, user_300,
Beijing, 123567891234, [email protected], null]",
+ "+I[%s, user_table_1_2, +U, 121, user_121,
Shanghai, 88888888, null, null]",
+ "+I[%s, user_table_1_1, -D, 111, user_111,
Shanghai, 123567891234, [email protected], null]",
+ "+I[%s, user_table_1_1, -U, 300, user_300,
Hangzhou, 123567891234, [email protected], null]",
+ "+I[%s, user_table_1_2, -U, 121, user_121,
Shanghai, 123567891234, null, null]")
+ .map(s -> String.format(s,
userDatabase1.getDatabaseName()))
+ .sorted()
+ .collect(Collectors.toList());
+
+ List<String> actual =
TestValuesTableFactory.getRawResultsAsStrings("sink");
+
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+ result.getJobClient().get().cancel().get();
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index 4dbb6dc21..4f249335f 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -128,7 +128,8 @@ class MySqlTableSourceFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -177,7 +178,8 @@ class MySqlTableSourceFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -222,7 +224,8 @@ class MySqlTableSourceFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -265,7 +268,8 @@ class MySqlTableSourceFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -325,7 +329,8 @@ class MySqlTableSourceFactoryTest {
true,
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
true,
-
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ false);
Assertions.assertThat(actualSource)
.isEqualTo(expectedSource)
.isInstanceOf(MySqlTableSource.class);
@@ -383,7 +388,8 @@ class MySqlTableSourceFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -424,7 +430,8 @@ class MySqlTableSourceFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -466,7 +473,8 @@ class MySqlTableSourceFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -509,7 +517,8 @@ class MySqlTableSourceFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -550,7 +559,8 @@ class MySqlTableSourceFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -596,7 +606,8 @@ class MySqlTableSourceFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
-
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ false);
expectedSource.producedDataType =
SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
@@ -798,7 +809,8 @@ class MySqlTableSourceFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
true,
true,
- true);
+ true,
+ false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}