This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 95fe4d3d3 [FLINK-36684][cdc-connector/mysql] Support read changelog as 
append only mode
95fe4d3d3 is described below

commit 95fe4d3d36d0363540a3fd83cdf5e00f5dc6bcf0
Author: Runkang He <[email protected]>
AuthorDate: Thu Apr 17 16:09:10 2025 +0800

    [FLINK-36684][cdc-connector/mysql] Support read changelog as append only 
mode
    
    This closes #3708
---
 .../docs/connectors/flink-sources/mysql-cdc.md     |  22 +++++
 .../docs/connectors/flink-sources/mysql-cdc.md     |  13 +++
 .../debezium/table/AppendMetadataCollector.java    |  15 ++-
 .../table/RowDataDebeziumDeserializeSchema.java    |  15 ++-
 .../mysql/source/config/MySqlSourceOptions.java    |   7 ++
 .../connectors/mysql/table/MySqlTableSource.java   |  22 ++++-
 .../mysql/table/MySqlTableSourceFactory.java       |   7 +-
 .../mysql/table/MySqlConnectorITCase.java          | 102 +++++++++++++++++++++
 .../mysql/table/MySqlTableSourceFactoryTest.java   |  36 +++++---
 9 files changed, 217 insertions(+), 22 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md 
b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
index 9df1b129c..7ae1ae449 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -400,6 +400,19 @@ Flink SQL> SELECT * FROM orders;
         这是一项实验特性,默认为 false。
       </td>
     </tr>
+    <tr>
+      <td>scan.read-changelog-as-append-only.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        是否将 changelog 数据流转换为 append-only 数据流。<br>
+        仅在需要保存上游表删除消息等特殊场景下开启使用,比如在逻辑删除场景下,用户不允许物理删除下游消息,此时使用该特性,并配合 row_kind 
元数据字段,下游可以先保存所有明细数据,再通过 row_kind 字段判断是否进行逻辑删除。<br>
+        参数取值如下:<br>
+          <li>true:所有类型的消息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都会转换成 
INSERT 类型的消息。</li>
+          <li>false(默认):所有类型的消息都保持原样下发。</li>
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
@@ -433,6 +446,13 @@ Flink SQL> SELECT * FROM orders;
       <td>TIMESTAMP_LTZ(3) NOT NULL</td>
       <td>当前记录表在数据库中更新的时间。 <br>如果从表的快照而不是 binlog 读取记录,该值将始终为0。</td>
     </tr>
+    <tr>
+      <td>row_kind</td>
+      <td>STRING NOT NULL</td>
+      <td>当前记录的变更类型。<br>
+         注意:如果 Source 算子选择为每条记录输出 row_kind 列,则下游 SQL 
操作符在处理回撤时可能会由于此新添加的列而无法比较,导致出现非确定性更新问题。建议仅在简单的同步作业中使用此元数据列。<br>
+         '+I' 表示 INSERT 消息,'-D' 表示 DELETE 消息,'-U' 表示 UPDATE_BEFORE 消息,'+U' 表示 
UPDATE_AFTER 消息。</td>
+    </tr>
   </tbody>
 </table>
 
@@ -442,6 +462,7 @@ CREATE TABLE products (
     db_name STRING METADATA FROM 'database_name' VIRTUAL,
     table_name STRING METADATA  FROM 'table_name' VIRTUAL,
     operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    operation STRING METADATA FROM 'row_kind' VIRTUAL,
     order_id INT,
     order_date TIMESTAMP(0),
     customer_name STRING,
@@ -466,6 +487,7 @@ CREATE TABLE products (
     db_name STRING METADATA FROM 'database_name' VIRTUAL,
     table_name STRING METADATA  FROM 'table_name' VIRTUAL,
     operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    operation STRING METADATA FROM 'row_kind' VIRTUAL,
     order_id INT,
     order_date TIMESTAMP(0),
     customer_name STRING,
diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md 
b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
index 45988afd9..2a7f3ca72 100644
--- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -426,6 +426,19 @@ During a snapshot operation, the connector will query each 
included table to pro
         Experimental option, defaults to false.
       </td>
     </tr>
+    <tr>
+      <td>scan.read-changelog-as-append-only.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to convert the changelog stream to an append-only stream.<br>
+        This feature is only used in special scenarios where you need to save 
upstream table deletion messages. For example, in a logical deletion scenario, 
users are not allowed to physically delete downstream messages. In this case, 
this feature is used in conjunction with the row_kind metadata field. 
Therefore, the downstream can first save all detailed data, and then use the 
row_kind field to determine whether to perform logical deletion.<br>
+        The option values are as follows:<br>
+          <li>true: All types of messages (including INSERT, DELETE, 
UPDATE_BEFORE, and UPDATE_AFTER) will be converted into INSERT messages.</li>
+          <li>false (default): All types of messages are sent as is.</li>
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/AppendMetadataCollector.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/AppendMetadataCollector.java
index 6b8d95ab0..fc3f116c0 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/AppendMetadataCollector.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/AppendMetadataCollector.java
@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 
 import org.apache.kafka.connect.source.SourceRecord;
@@ -37,8 +38,15 @@ public final class AppendMetadataCollector implements 
Collector<RowData>, Serial
     public transient SourceRecord inputRecord;
     public transient Collector<RowData> outputCollector;
 
+    private final boolean appendOnly;
+
     public AppendMetadataCollector(MetadataConverter[] metadataConverters) {
+        this(metadataConverters, false);
+    }
+
+    public AppendMetadataCollector(MetadataConverter[] metadataConverters, 
boolean appendOnly) {
         this.metadataConverters = metadataConverters;
+        this.appendOnly = appendOnly;
     }
 
     @Override
@@ -55,7 +63,12 @@ public final class AppendMetadataCollector implements 
Collector<RowData>, Serial
 
             metaRow.setField(i, meta);
         }
-        RowData outRow = new JoinedRowData(physicalRow.getRowKind(), 
physicalRow, metaRow);
+        RowData outRow;
+        if (appendOnly) {
+            outRow = new JoinedRowData(RowKind.INSERT, physicalRow, metaRow);
+        } else {
+            outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, 
metaRow);
+        }
         outputCollector.collect(outRow);
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index b68dcd171..9530490de 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -104,9 +104,10 @@ public final class RowDataDebeziumDeserializeSchema
             ValueValidator validator,
             ZoneId serverTimeZone,
             DeserializationRuntimeConverterFactory userDefinedConverterFactory,
-            DebeziumChangelogMode changelogMode) {
+            DebeziumChangelogMode changelogMode,
+            boolean appendOnly) {
         this.hasMetadata = checkNotNull(metadataConverters).length > 0;
-        this.appendMetadataCollector = new 
AppendMetadataCollector(metadataConverters);
+        this.appendMetadataCollector = new 
AppendMetadataCollector(metadataConverters, appendOnly);
         this.physicalConverter =
                 createConverter(
                         checkNotNull(physicalDataType),
@@ -190,6 +191,8 @@ public final class RowDataDebeziumDeserializeSchema
                 DeserializationRuntimeConverterFactory.DEFAULT;
         private DebeziumChangelogMode changelogMode = 
DebeziumChangelogMode.ALL;
 
+        private boolean appendOnly = false;
+
         public Builder setPhysicalRowType(RowType physicalRowType) {
             this.physicalRowType = physicalRowType;
             return this;
@@ -226,6 +229,11 @@ public final class RowDataDebeziumDeserializeSchema
             return this;
         }
 
+        public Builder setAppendOnly(boolean appendOnly) {
+            this.appendOnly = appendOnly;
+            return this;
+        }
+
         public RowDataDebeziumDeserializeSchema build() {
             return new RowDataDebeziumDeserializeSchema(
                     physicalRowType,
@@ -234,7 +242,8 @@ public final class RowDataDebeziumDeserializeSchema
                     validator,
                     serverTimeZone,
                     userDefinedConverterFactory,
-                    changelogMode);
+                    changelogMode,
+                    appendOnly);
         }
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index 71e50d883..a00d6d564 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -191,6 +191,13 @@ public class MySqlSourceOptions {
                     .withDescription(
                             "Optional interval of sending heartbeat event for 
tracing the latest available binlog offsets");
 
+    public static final ConfigOption<Boolean> 
SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED =
+            ConfigOptions.key("scan.read-changelog-as-append-only.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to convert the changelog data stream to 
an append-only data stream");
+
     // 
----------------------------------------------------------------------------
     // experimental options, won't add them to documentation
     // 
----------------------------------------------------------------------------
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index 110075442..2a1f05194 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -102,6 +102,8 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
     private final boolean useLegacyJsonFormat;
     private final boolean assignUnboundedChunkFirst;
 
+    private final boolean appendOnly;
+
     // 
--------------------------------------------------------------------------------------------
     // Mutable attributes
     // 
--------------------------------------------------------------------------------------------
@@ -141,7 +143,8 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
             boolean skipSnapshotBackFill,
             boolean parseOnlineSchemaChanges,
             boolean useLegacyJsonFormat,
-            boolean assignUnboundedChunkFirst) {
+            boolean assignUnboundedChunkFirst,
+            boolean appendOnly) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -174,11 +177,16 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
         this.skipSnapshotBackFill = skipSnapshotBackFill;
         this.useLegacyJsonFormat = useLegacyJsonFormat;
         this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
+        this.appendOnly = appendOnly;
     }
 
     @Override
     public ChangelogMode getChangelogMode() {
-        return ChangelogMode.all();
+        if (appendOnly) {
+            return ChangelogMode.insertOnly();
+        } else {
+            return ChangelogMode.all();
+        }
     }
 
     @Override
@@ -197,6 +205,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                         .setServerTimeZone(serverTimeZone)
                         .setUserDefinedConverterFactory(
                                 
MySqlDeserializationConverterFactory.instance())
+                        .setAppendOnly(appendOnly)
                         .build();
         if (enableParallelRead) {
             MySqlSource<RowData> parallelSource =
@@ -320,7 +329,8 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                         skipSnapshotBackFill,
                         parseOnlineSchemaChanges,
                         useLegacyJsonFormat,
-                        assignUnboundedChunkFirst);
+                        assignUnboundedChunkFirst,
+                        appendOnly);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
@@ -365,7 +375,8 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                 && Objects.equals(skipSnapshotBackFill, 
that.skipSnapshotBackFill)
                 && parseOnlineSchemaChanges == that.parseOnlineSchemaChanges
                 && useLegacyJsonFormat == that.useLegacyJsonFormat
-                && assignUnboundedChunkFirst == that.assignUnboundedChunkFirst;
+                && assignUnboundedChunkFirst == that.assignUnboundedChunkFirst
+                && Objects.equals(appendOnly, that.appendOnly);
     }
 
     @Override
@@ -401,7 +412,8 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                 skipSnapshotBackFill,
                 parseOnlineSchemaChanges,
                 useLegacyJsonFormat,
-                assignUnboundedChunkFirst);
+                assignUnboundedChunkFirst,
+                appendOnly);
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
index 883929069..7b82aa588 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
@@ -108,6 +108,9 @@ public class MySqlTableSourceFactory implements 
DynamicTableSourceFactory {
         boolean assignUnboundedChunkFirst =
                 
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
 
+        boolean appendOnly =
+                
config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
+
         if (enableParallelRead) {
             validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
             validateIntegerOption(
@@ -153,7 +156,8 @@ public class MySqlTableSourceFactory implements 
DynamicTableSourceFactory {
                 skipSnapshotBackFill,
                 parseOnLineSchemaChanges,
                 useLegacyJsonFormat,
-                assignUnboundedChunkFirst);
+                assignUnboundedChunkFirst,
+                appendOnly);
     }
 
     @Override
@@ -202,6 +206,7 @@ public class MySqlTableSourceFactory implements 
DynamicTableSourceFactory {
         options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
         options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
         
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
+        
options.add(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index 34cd5803a..7ec84611a 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -2334,4 +2334,106 @@ class MySqlConnectorITCase extends MySqlSourceTestBase {
         assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, 
expected.length));
         result.getJobClient().get().cancel().get();
     }
+
+    @ParameterizedTest(name = "incrementalSnapshot = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testReadChangelogAppendOnly(boolean incrementalSnapshot) 
throws Exception {
+        setup(incrementalSnapshot);
+        userDatabase1.createAndInitialize();
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE mysql_users ("
+                                + " db_name STRING METADATA FROM 
'database_name' VIRTUAL,"
+                                + " table_name STRING METADATA VIRTUAL,"
+                                + " row_kind STRING METADATA FROM 'row_kind' 
VIRTUAL,"
+                                + " `id` DECIMAL(20, 0) NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " email STRING,"
+                                + " age INT,"
+                                + " primary key (`id`) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mysql-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'server-id' = '%s',"
+                                + " 'server-time-zone' = 'UTC',"
+                                + " 'scan.incremental.snapshot.chunk.size' = 
'%s',"
+                                + " 
'scan.read-changelog-as-append-only.enabled' = 'true'"
+                                + ")",
+                        MYSQL_CONTAINER.getHost(),
+                        MYSQL_CONTAINER.getDatabasePort(),
+                        userDatabase1.getUsername(),
+                        userDatabase1.getPassword(),
+                        userDatabase1.getDatabaseName(),
+                        "user_table_.*",
+                        incrementalSnapshot,
+                        getServerId(incrementalSnapshot),
+                        getSplitSize(incrementalSnapshot));
+
+        String sinkDDL =
+                "CREATE TABLE sink ("
+                        + " database_name STRING,"
+                        + " table_name STRING,"
+                        + " row_kind STRING,"
+                        + " `id` DECIMAL(20, 0) NOT NULL,"
+                        + " name STRING,"
+                        + " address STRING,"
+                        + " phone_number STRING,"
+                        + " email STRING,"
+                        + " age INT,"
+                        + " primary key (database_name, table_name, id) not 
enforced"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false'"
+                        + ")";
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+
+        // async submit job
+        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM 
mysql_users");
+
+        // wait for snapshot finished and begin binlog
+        waitForSinkSize("sink", 2);
+
+        try (Connection connection = userDatabase1.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+
+            statement.execute(
+                    "INSERT INTO user_table_1_2 VALUES 
(200,'user_200','Wuhan',123567891234);");
+            statement.execute(
+                    "INSERT INTO user_table_1_1 VALUES 
(300,'user_300','Hangzhou',123567891234, '[email protected]');");
+            statement.execute("UPDATE user_table_1_1 SET address='Beijing' 
WHERE id=300;");
+            statement.execute("UPDATE user_table_1_2 SET phone_number=88888888 
WHERE id=121;");
+            statement.execute("DELETE FROM user_table_1_1 WHERE id=111;");
+        }
+
+        // waiting for binlog finished (5 more events)
+        waitForSinkSize("sink", 7);
+
+        List<String> expected =
+                Stream.of(
+                                "+I[%s, user_table_1_1, +I, 111, user_111, 
Shanghai, 123567891234, [email protected], null]",
+                                "+I[%s, user_table_1_2, +I, 121, user_121, 
Shanghai, 123567891234, null, null]",
+                                "+I[%s, user_table_1_2, +I, 200, user_200, 
Wuhan, 123567891234, null, null]",
+                                "+I[%s, user_table_1_1, +I, 300, user_300, 
Hangzhou, 123567891234, [email protected], null]",
+                                "+I[%s, user_table_1_1, +U, 300, user_300, 
Beijing, 123567891234, [email protected], null]",
+                                "+I[%s, user_table_1_2, +U, 121, user_121, 
Shanghai, 88888888, null, null]",
+                                "+I[%s, user_table_1_1, -D, 111, user_111, 
Shanghai, 123567891234, [email protected], null]",
+                                "+I[%s, user_table_1_1, -U, 300, user_300, 
Hangzhou, 123567891234, [email protected], null]",
+                                "+I[%s, user_table_1_2, -U, 121, user_121, 
Shanghai, 123567891234, null, null]")
+                        .map(s -> String.format(s, 
userDatabase1.getDatabaseName()))
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        List<String> actual = 
TestValuesTableFactory.getRawResultsAsStrings("sink");
+        
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+        result.getJobClient().get().cancel().get();
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index 4dbb6dc21..4f249335f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -128,7 +128,8 @@ class MySqlTableSourceFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
                         USE_LEGACY_JSON_FORMAT.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+                        false);
         Assertions.assertThat(actualSource).isEqualTo(expectedSource);
     }
 
@@ -177,7 +178,8 @@ class MySqlTableSourceFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
                         USE_LEGACY_JSON_FORMAT.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+                        false);
         Assertions.assertThat(actualSource).isEqualTo(expectedSource);
     }
 
@@ -222,7 +224,8 @@ class MySqlTableSourceFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
                         USE_LEGACY_JSON_FORMAT.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+                        false);
         Assertions.assertThat(actualSource).isEqualTo(expectedSource);
     }
 
@@ -265,7 +268,8 @@ class MySqlTableSourceFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
                         USE_LEGACY_JSON_FORMAT.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+                        false);
         Assertions.assertThat(actualSource).isEqualTo(expectedSource);
     }
 
@@ -325,7 +329,8 @@ class MySqlTableSourceFactoryTest {
                         true,
                         PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
                         true,
-                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+                        false);
         Assertions.assertThat(actualSource)
                 .isEqualTo(expectedSource)
                 .isInstanceOf(MySqlTableSource.class);
@@ -383,7 +388,8 @@ class MySqlTableSourceFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
                         USE_LEGACY_JSON_FORMAT.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+                        false);
         Assertions.assertThat(actualSource).isEqualTo(expectedSource);
     }
 
@@ -424,7 +430,8 @@ class MySqlTableSourceFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
                         USE_LEGACY_JSON_FORMAT.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+                        false);
         Assertions.assertThat(actualSource).isEqualTo(expectedSource);
     }
 
@@ -466,7 +473,8 @@ class MySqlTableSourceFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
                         USE_LEGACY_JSON_FORMAT.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+                        false);
         Assertions.assertThat(actualSource).isEqualTo(expectedSource);
     }
 
@@ -509,7 +517,8 @@ class MySqlTableSourceFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
                         USE_LEGACY_JSON_FORMAT.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+                        false);
         Assertions.assertThat(actualSource).isEqualTo(expectedSource);
     }
 
@@ -550,7 +559,8 @@ class MySqlTableSourceFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
                         USE_LEGACY_JSON_FORMAT.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+                        false);
         Assertions.assertThat(actualSource).isEqualTo(expectedSource);
     }
 
@@ -596,7 +606,8 @@ class MySqlTableSourceFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
                         USE_LEGACY_JSON_FORMAT.defaultValue(),
-                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue());
+                        
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+                        false);
         expectedSource.producedDataType = 
SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
 
@@ -798,7 +809,8 @@ class MySqlTableSourceFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
                         true,
                         true,
-                        true);
+                        true,
+                        false);
         Assertions.assertThat(actualSource).isEqualTo(expectedSource);
     }
 

Reply via email to