This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e4abdb68 [FLINK-36683][cdc-connector/mongo] Support metadata 'row_kind' virtual column
2e4abdb68 is described below

commit 2e4abdb68ed960f09610cb8be332b770a07ba53e
Author: Runkang He <hrun...@gmail.com>
AuthorDate: Mon Apr 7 17:04:29 2025 +0800

    [FLINK-36683][cdc-connector/mongo] Support metadata 'row_kind' virtual column
    
    This closes #3705
---
 .../docs/connectors/flink-sources/mongodb-cdc.md   |  41 +++---
 .../docs/connectors/flink-sources/mongodb-cdc.md   |  24 ++--
 .../mongodb/table/MongoDBReadableMetadata.java     |  24 ++++
 .../mongodb/source/MongoDBFullChangelogITCase.java | 147 +++++++++++++++++++++
 .../mongodb/table/MongoDBTableFactoryTest.java     |   7 +-
 5 files changed, 215 insertions(+), 28 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
index 5f12524a8..f52b3be98 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
@@ -296,6 +296,17 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能
       <td>Boolean</td>
       <td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。</td>
     </tr>
+    <tr>
+      <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        快照读取阶段是否先分配 UnboundedChunk。<br>
+        这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。<br> 
+        这是一项实验特性,默认为 false。
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
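
For readers skimming the diff, here is a minimal Java Table API sketch of turning on the option documented above. The table schema, host, and credentials are illustrative placeholders, not anything this commit ships:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UnboundedChunkFirstSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Placeholder connection values; the last option is the point here.
            tEnv.executeSql(
                    "CREATE TABLE orders ("
                            + "  _id STRING,"
                            + "  PRIMARY KEY (_id) NOT ENFORCED"
                            + ") WITH ("
                            + "  'connector' = 'mongodb-cdc',"
                            + "  'hosts' = 'localhost:27017',"
                            + "  'username' = 'flinkuser',"
                            + "  'password' = 'flinkpw',"
                            + "  'database' = 'mgdb',"
                            + "  'collection' = 'orders',"
                            + "  'scan.incremental.snapshot.enabled' = 'true',"
                            // Assign the unbounded (last) chunk first to lower the risk
                            // of a TaskManager OOM on the final snapshot chunk.
                            + "  'scan.incremental.snapshot.unbounded-chunk-first.enabled' = 'true'"
                            + ")");
        }
    }
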
@@ -333,15 +344,10 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能
       <td>它指示在数据库中进行更改的时间。 <br>如果记录是从表的快照而不是改变流中读取的,该值将始终为0。</td>
     </tr>
     <tr>
-      <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
-      <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
-      <td>Boolean</td>
-      <td>
-        快照读取阶段是否先分配 UnboundedChunk。<br>
-        这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。<br> 
-        这是一项实验特性,默认为 false。
-      </td>
+      <td>row_kind</td>
+      <td>STRING NOT NULL</td>
+      <td>当前记录对应的 changelog 类型。注意:当 Source 算子选择为每条记录输出 row_kind 字段后,下游 SQL 算子在处理消息撤回时会因为这个字段不同而比对失败,
+建议只在简单的同步作业中引用该元数据列。<br>'+I' 表示 INSERT 数据,'-D' 表示 DELETE 数据,'-U' 表示 UPDATE_BEFORE 数据,'+U' 表示 UPDATE_AFTER 数据。</td>
     </tr>
   </tbody>
 </table>
@@ -349,15 +355,16 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能
 扩展的 CREATE TABLE 示例演示了用于公开这些元数据字段的语法:
 ```sql
 CREATE TABLE products (
-    db_name STRING METADATA FROM 'database_name' VIRTUAL,
+    db_name         STRING METADATA FROM 'database_name' VIRTUAL,
     collection_name STRING METADATA  FROM 'collection_name' VIRTUAL,
-    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
-    _id STRING, // 必须声明
-    name STRING,
-    weight DECIMAL(10,3),
-    tags ARRAY<STRING>, -- array
-    price ROW<amount DECIMAL(10,2), currency STRING>, -- 嵌入式文档
-    suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
+    operation_ts    TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    operation       STRING METADATA FROM 'row_kind' VIRTUAL,
+    _id             STRING, -- 必须声明
+    name            STRING,
+    weight          DECIMAL(10,3),
+    tags            ARRAY<STRING>, -- array
+    price           ROW<amount DECIMAL(10,2), currency STRING>, -- 嵌入式文档
+    suppliers       ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
     PRIMARY KEY(_id) NOT ENFORCED
 ) WITH (
     'connector' = 'mongodb-cdc',
diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
index aebbbf08d..c4fb00af7 100644
--- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
@@ -368,21 +368,29 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
       <td>TIMESTAMP_LTZ(3) NOT NULL</td>
       <td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
     </tr>
+    <tr>
+      <td>row_kind</td>
+      <td>STRING NOT NULL</td>
+      <td>It indicates the row kind of the changelog. Note: if the source operator chooses to output the 'row_kind' column for each record, downstream SQL operators may fail to match a retraction against its original row, because this newly added column differs between the two. It is recommended to use this metadata column only in simple synchronization jobs.
+<br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
+    </tr>
   </tbody>
 </table>
 
 The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
 ```sql
 CREATE TABLE products (
-    db_name STRING METADATA FROM 'database_name' VIRTUAL,
+    db_name         STRING METADATA FROM 'database_name' VIRTUAL,
     collection_name STRING METADATA  FROM 'collection_name' VIRTUAL,
-    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
-    _id STRING, // must be declared
-    name STRING,
-    weight DECIMAL(10,3),
-    tags ARRAY<STRING>, -- array
-    price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
-    suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
+    operation_ts    TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    operation       STRING METADATA FROM 'row_kind' VIRTUAL,
+    _id             STRING, -- must be declared
+    name            STRING,
+    weight          DECIMAL(10,3),
+    tags            ARRAY<STRING>, -- array
+    price           ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
+    suppliers       ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
     PRIMARY KEY(_id) NOT ENFORCED
 ) WITH (
     'connector' = 'mongodb-cdc',
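
As a concrete (hypothetical) use of the new metadata column, the following self-contained Java Table API sketch declares 'row_kind' and prints it; connection values and the collection layout are illustrative placeholders:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RowKindMetadataSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE products ("
                            + "  operation STRING METADATA FROM 'row_kind' VIRTUAL,"
                            + "  _id STRING,"
                            + "  name STRING,"
                            + "  PRIMARY KEY (_id) NOT ENFORCED"
                            + ") WITH ("
                            + "  'connector' = 'mongodb-cdc',"
                            + "  'hosts' = 'localhost:27017',"
                            + "  'username' = 'flinkuser',"
                            + "  'password' = 'flinkpw',"
                            + "  'database' = 'inventory',"
                            + "  'collection' = 'products'"
                            + ")");
            // Each row carries its changelog kind as a plain STRING: an insert
            // arrives as '+I', a delete as '-D', an update as a '-U'/'+U' pair.
            tEnv.executeSql("SELECT operation, _id, name FROM products").print();
        }
    }

Because the '-U' and '+U' rows of one update differ in this column, stateful downstream operators can fail to pair a retraction with the row it retracts, which is why the table above recommends restricting the column to simple synchronization jobs.
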
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java
index c2baf021c..581873d20 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java
@@ -19,7 +19,9 @@ package org.apache.flink.cdc.connectors.mongodb.table;
 
 import org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope;
 import org.apache.flink.cdc.debezium.table.MetadataConverter;
+import org.apache.flink.cdc.debezium.table.RowDataMetadataConverter;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
@@ -81,6 +83,28 @@ public enum MongoDBReadableMetadata {
                     return TimestampData.fromEpochMillis(
                             (Long) source.get(AbstractSourceInfo.TIMESTAMP_KEY));
                 }
+            }),
+
+    /**
+     * It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
+     * message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message
+     */
+    ROW_KIND(
+            "row_kind",
+            DataTypes.STRING().notNull(),
+            new RowDataMetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(RowData rowData) {
+                    return StringData.fromString(rowData.getRowKind().shortString());
+                }
+
+                @Override
+                public Object read(SourceRecord record) {
+                    throw new UnsupportedOperationException(
+                            "Please call read(RowData rowData) method instead.");
+                }
             });
 
     private final String key;
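
A note on the converter above: the row kind lives on the emitted RowData rather than on the Debezium SourceRecord, which is why read(SourceRecord) throws. The tiny standalone demo below (illustrative class name, not part of this commit) shows the RowKind-to-short-string mapping the converter relies on:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.types.RowKind;

    public class RowKindShortStringDemo {
        public static void main(String[] args) {
            // Build a row tagged as the "before" side of an update.
            RowData row = GenericRowData.ofKind(
                    RowKind.UPDATE_BEFORE, StringData.fromString("user_1"));
            System.out.println(row.getRowKind().shortString()); // prints -U
            // The four changelog kinds and their short labels.
            for (RowKind kind : RowKind.values()) {
                System.out.println(kind + " -> " + kind.shortString());
            }
        }
    }
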
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
index 7500a64ae..257f7ce1f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
@@ -56,6 +56,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
 import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@@ -541,6 +542,152 @@ class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
         return records;
     }
 
+    @ParameterizedTest(name = "parallelismSnapshot: {0}")
+    @ValueSource(booleans = {true, false})
+    public void testMetadataColumns(boolean parallelismSnapshot) throws Exception {
+        testMongoDBParallelSourceWithMetadataColumns(
+                DEFAULT_PARALLELISM, new String[] {"customers"}, true, parallelismSnapshot);
+    }
+
+    private void testMongoDBParallelSourceWithMetadataColumns(
+            int parallelism,
+            String[] captureCustomerCollections,
+            boolean skipSnapshotBackfill,
+            boolean parallelismSnapshot)
+            throws Exception {
+        String customerDatabase =
+                "customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);
+
+        // A - enable system-level fulldoc pre & post image feature
+        MONGO_CONTAINER.executeCommand(
+                "use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
+
+        // B - enable collection-level fulldoc pre & post image for change capture collection
+        for (String collectionName : captureCustomerCollections) {
+            MONGO_CONTAINER.executeCommandInDatabase(
+                    String.format(
+                            "db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
+                            collectionName, collectionName),
+                    customerDatabase);
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE customers ("
+                                + " _id STRING NOT NULL,"
+                                + " cid BIGINT NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " database_name STRING METADATA VIRTUAL,"
+                                + " collection_name STRING METADATA VIRTUAL,"
+                                + " row_kind STRING METADATA VIRTUAL,"
+                                + " primary key (_id) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mongodb-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = '%s',"
+                                + " 'hosts' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database' = '%s',"
+                                + " 'collection' = '%s',"
+                                + " 'heartbeat.interval.ms' = '500',"
+                                + " 'scan.full-changelog' = 'true',"
+                                + " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+                                + ")",
+                        parallelismSnapshot ? "true" : "false",
+                        MONGO_CONTAINER.getHostAndPort(),
+                        FLINK_USER,
+                        FLINK_USER_PASSWORD,
+                        customerDatabase,
+                        getCollectionNameRegex(customerDatabase, captureCustomerCollections),
+                        skipSnapshotBackfill);
+
+        MONGO_CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
+
+        // first step: check the snapshot data
+        List<String> snapshotForSingleTable =
+                Stream.of(
+                                "+I[%s, %s, +I, 101, user_1, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 102, user_2, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 103, user_3, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 109, user_4, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 110, user_5, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 111, user_6, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 118, user_7, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 121, user_8, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 123, user_9, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1009, user_10, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1010, user_11, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1011, user_12, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1012, user_13, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1013, user_14, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1014, user_15, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1015, user_16, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1016, user_17, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1017, user_18, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1018, user_19, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1019, user_20, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 2000, user_21, Shanghai, 123567891234]")
+                        .map(s -> String.format(s, customerDatabase, captureCustomerCollections[0]))
+                        .collect(Collectors.toList());
+
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "select database_name, collection_name, row_kind, "
+                                + "cid, name, address, phone_number from customers");
+        CloseableIterator<Row> iterator = tableResult.collect();
+        JobID jobId = tableResult.getJobClient().get().getJobID();
+        List<String> expectedSnapshotData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerCollections.length; i++) {
+            expectedSnapshotData.addAll(snapshotForSingleTable);
+        }
+
+        assertEqualsInAnyOrder(
+                expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+
+        // second step: check the change stream data
+        for (String collectionName : captureCustomerCollections) {
+            makeFirstPartChangeStreamEvents(
+                    mongodbClient.getDatabase(customerDatabase), collectionName);
+        }
+        for (String collectionName : captureCustomerCollections) {
+            makeSecondPartChangeStreamEvents(
+                    mongodbClient.getDatabase(customerDatabase), collectionName);
+        }
+
+        List<String> changeEventsForSingleTable =
+                Stream.of(
+                                "-U[%s, %s, -U, 101, user_1, Shanghai, 123567891234]",
+                                "+U[%s, %s, +U, 101, user_1, Hangzhou, 123567891234]",
+                                "-D[%s, %s, -D, 102, user_2, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 102, user_2, Shanghai, 123567891234]",
+                                "-U[%s, %s, -U, 103, user_3, Shanghai, 123567891234]",
+                                "+U[%s, %s, +U, 103, user_3, Hangzhou, 123567891234]",
+                                "-U[%s, %s, -U, 1010, user_11, Shanghai, 123567891234]",
+                                "+U[%s, %s, +U, 1010, user_11, Hangzhou, 123567891234]",
+                                "+I[%s, %s, +I, 2001, user_22, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 2002, user_23, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 2003, user_24, Shanghai, 123567891234]")
+                        .map(s -> String.format(s, customerDatabase, captureCustomerCollections[0]))
+                        .collect(Collectors.toList());
+        List<String> expectedChangeStreamData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerCollections.length; i++) {
+            expectedChangeStreamData.addAll(changeEventsForSingleTable);
+        }
+        List<String> actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size());
+        assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData);
+        tableResult.getJobClient().get().cancel().get();
+    }
+
     private void testMongoDBParallelSource(
             MongoDBTestUtils.FailoverType failoverType,
             MongoDBTestUtils.FailoverPhase failoverPhase,
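
The two mongosh setup commands at the top of the new test (steps A and B) have a driver-side equivalent; below is a rough sketch using the MongoDB Java driver, assuming MongoDB 6.0+ and placeholder connection and collection names:

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoDatabase;
    import org.bson.Document;

    public class PrePostImageSetupSketch {
        public static void main(String[] args) {
            try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
                // A - cluster level: retain pre/post images until explicitly disabled.
                client.getDatabase("admin").runCommand(
                        new Document("setClusterParameter",
                                new Document("changeStreamOptions",
                                        new Document("preAndPostImages",
                                                new Document("expireAfterSeconds", "off")))));
                // B - collection level: record full pre/post images for change streams.
                MongoDatabase db = client.getDatabase("customer_db");
                db.createCollection("customers");
                db.runCommand(
                        new Document("collMod", "customers")
                                .append("changeStreamPreAndPostImages",
                                        new Document("enabled", true)));
            }
        }
    }
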
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
index c4613351e..5960fed9f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
@@ -87,7 +87,8 @@ class MongoDBTableFactoryTest {
                             Column.physical("eee", DataTypes.TIMESTAMP(3)),
                             Column.metadata("time", 
DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
                             Column.metadata(
-                                    "_database_name", DataTypes.STRING(), "database_name", true)),
+                                    "_database_name", DataTypes.STRING(), "database_name", true),
+                            Column.metadata("_row_kind", DataTypes.STRING(), "row_kind", true)),
                     Collections.emptyList(),
                     UniqueConstraint.primaryKey("pk", Collections.singletonList("_id")));
 
@@ -222,7 +223,7 @@ class MongoDBTableFactoryTest {
         DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
         MongoDBTableSource mongoDBSource = (MongoDBTableSource) actualSource;
         mongoDBSource.applyReadableMetadata(
-                Arrays.asList("op_ts", "database_name"),
+                Arrays.asList("op_ts", "database_name", "row_kind"),
                 SCHEMA_WITH_METADATA.toSourceRowDataType());
         actualSource = mongoDBSource.copy();
 
@@ -255,7 +256,7 @@ class MongoDBTableFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
 
         expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
-        expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
+        expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "row_kind");
 
         Assertions.assertThat(actualSource).isEqualTo(expectedSource);
 
