This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

The following commit(s) were added to refs/heads/master by this push:
     new 2e4abdb68  [FLINK-36683][cdc-connector/mongo] Support metadata 'row_kind' virtual column
2e4abdb68 is described below

commit 2e4abdb68ed960f09610cb8be332b770a07ba53e
Author: Runkang He <hrun...@gmail.com>
AuthorDate: Mon Apr 7 17:04:29 2025 +0800

    [FLINK-36683][cdc-connector/mongo] Support metadata 'row_kind' virtual column

    This closes #3705
---
 .../docs/connectors/flink-sources/mongodb-cdc.md    |  41 +++---
 .../docs/connectors/flink-sources/mongodb-cdc.md    |  24 ++--
 .../mongodb/table/MongoDBReadableMetadata.java      |  24 ++++
 .../mongodb/source/MongoDBFullChangelogITCase.java  | 147 +++++++++++++++++++++
 .../mongodb/table/MongoDBTableFactoryTest.java      |   7 +-
 5 files changed, 215 insertions(+), 28 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
index 5f12524a8..f52b3be98 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
@@ -296,6 +296,17 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能
       <td>Boolean</td>
       <td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。</td>
     </tr>
+    <tr>
+      <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        快照读取阶段是否先分配 UnboundedChunk。<br>
+        这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。<br>
+        这是一项实验特性,默认为 false。
+      </td>
+    </tr>
   </tbody>
 </table>
 </div>
@@ -333,15 +344,10 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能
       <td>它指示在数据库中进行更改的时间。 <br>如果记录是从表的快照而不是改变流中读取的,该值将始终为0。</td>
     </tr>
     <tr>
-      <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
-      <td>optional</td>
-      <td style="word-wrap: break-word;">false</td>
-      <td>Boolean</td>
-      <td>
-        快照读取阶段是否先分配 UnboundedChunk。<br>
-        这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。<br>
-        这是一项实验特性,默认为 false。
-      </td>
+      <td>row_kind</td>
+      <td>STRING NOT NULL</td>
+      <td>当前记录对应的 changelog 类型。注意:当 Source 算子选择为每条记录输出 row_kind 字段后,下游 SQL 算子在处理消息撤回时会因为这个字段不同而比对失败,
+建议只在简单的同步作业中引用该元数据列。<br>'+I' 表示 INSERT 数据,'-D' 表示 DELETE 数据,'-U' 表示 UPDATE_BEFORE 数据,'+U' 表示 UPDATE_AFTER 数据。</td>
     </tr>
   </tbody>
 </table>
@@ -349,15 +355,16 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能
 扩展的 CREATE TABLE 示例演示了用于公开这些元数据字段的语法:
 ```sql
 CREATE TABLE products (
-    db_name STRING METADATA FROM 'database_name' VIRTUAL,
+    db_name STRING METADATA FROM 'database_name' VIRTUAL,
     collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
-    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
-    _id STRING, // 必须声明
-    name STRING,
-    weight DECIMAL(10,3),
-    tags ARRAY<STRING>, -- array
-    price ROW<amount DECIMAL(10,2), currency STRING>, -- 嵌入式文档
-    suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
+    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    operation STRING METADATA FROM 'row_kind' VIRTUAL,
+    _id STRING, // 必须声明
+    name STRING,
+    weight DECIMAL(10,3),
+    tags ARRAY<STRING>, -- array
+    price ROW<amount DECIMAL(10,2), currency STRING>, -- 嵌入式文档
+    suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
     PRIMARY KEY(_id) NOT ENFORCED
 ) WITH (
     'connector' = 'mongodb-cdc',
diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
index aebbbf08d..c4fb00af7 100644
--- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
@@ -368,21 +368,29 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
       <td>TIMESTAMP_LTZ(3) NOT NULL</td>
       <td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
     </tr>
+    <tr>
+      <td>row_kind</td>
+      <td>STRING NOT NULL</td>
+      <td>It indicates the row kind of the changelog. Note: the downstream SQL operator may fail to compare retraction messages because of this newly added column if
+the source operator chooses to output the 'row_kind' column for each record. It is recommended to use this metadata column only in simple synchronization jobs.
+<br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
+    </tr>
   </tbody>
 </table>

 The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
 ```sql
 CREATE TABLE products (
-    db_name STRING METADATA FROM 'database_name' VIRTUAL,
+    db_name STRING METADATA FROM 'database_name' VIRTUAL,
     collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
-    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
-    _id STRING, // must be declared
-    name STRING,
-    weight DECIMAL(10,3),
-    tags ARRAY<STRING>, -- array
-    price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
-    suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
+    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    operation STRING METADATA FROM 'row_kind' VIRTUAL,
+    _id STRING, // must be declared
+    name STRING,
+    weight DECIMAL(10,3),
+    tags ARRAY<STRING>, -- array
+    price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
+    suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
     PRIMARY KEY(_id) NOT ENFORCED
 ) WITH (
     'connector' = 'mongodb-cdc',
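Not part of the commit above: a minimal sketch of declaring the same read-only metadata columns through Flink's Table API Schema builder instead of SQL DDL. The metadata keys 'row_kind' and 'op_ts' come from the documentation; the physical column names are illustrative placeholders.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

// Sketch only: maps the documented 'row_kind' and 'op_ts' metadata keys to
// virtual (read-only) columns; physical columns here are illustrative.
public class RowKindMetadataSchemaSketch {
    public static void main(String[] args) {
        Schema schema =
                Schema.newBuilder()
                        .column("_id", DataTypes.STRING().notNull())
                        .column("name", DataTypes.STRING())
                        .columnByMetadata("operation", DataTypes.STRING(), "row_kind", true)
                        .columnByMetadata("operation_ts", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true)
                        .primaryKeyNamed("pk", "_id")
                        .build();
        System.out.println(schema);
    }
}
```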
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java
index c2baf021c..581873d20 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java
@@ -19,7 +19,9 @@ package org.apache.flink.cdc.connectors.mongodb.table;

 import org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope;
 import org.apache.flink.cdc.debezium.table.MetadataConverter;
+import org.apache.flink.cdc.debezium.table.RowDataMetadataConverter;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
@@ -81,6 +83,28 @@ public enum MongoDBReadableMetadata {
                     return TimestampData.fromEpochMillis(
                             (Long) source.get(AbstractSourceInfo.TIMESTAMP_KEY));
                 }
+            }),
+
+    /**
+     * It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
+     * message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.
+     */
+    ROW_KIND(
+            "row_kind",
+            DataTypes.STRING().notNull(),
+            new RowDataMetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(RowData rowData) {
+                    return StringData.fromString(rowData.getRowKind().shortString());
+                }
+
+                @Override
+                public Object read(SourceRecord record) {
+                    throw new UnsupportedOperationException(
+                            "Please call read(RowData rowData) method instead.");
+                }
             });

     private final String key;
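A small standalone sketch, not part of the commit: the converter above returns RowKind#shortString() of each RowData, so the values the documentation promises for 'row_kind' can be listed with only flink-core on the classpath.

```java
import org.apache.flink.types.RowKind;

// Prints the strings the 'row_kind' metadata column carries for each changelog kind:
// INSERT -> +I, UPDATE_BEFORE -> -U, UPDATE_AFTER -> +U, DELETE -> -D
public class RowKindShortStrings {
    public static void main(String[] args) {
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind + " -> " + kind.shortString());
        }
    }
}
```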
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
index 7500a64ae..257f7ce1f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
@@ -56,6 +56,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
 import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@@ -541,6 +542,152 @@ class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
         return records;
     }

+    @ParameterizedTest(name = "parallelismSnapshot: {0}")
+    @ValueSource(booleans = {true, false})
+    public void testMetadataColumns(boolean parallelismSnapshot) throws Exception {
+        testMongoDBParallelSourceWithMetadataColumns(
+                DEFAULT_PARALLELISM, new String[] {"customers"}, true, parallelismSnapshot);
+    }
+
+    private void testMongoDBParallelSourceWithMetadataColumns(
+            int parallelism,
+            String[] captureCustomerCollections,
+            boolean skipSnapshotBackfill,
+            boolean parallelismSnapshot)
+            throws Exception {
+        String customerDatabase =
+                "customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);
+
+        // A - enable system-level fulldoc pre & post image feature
+        MONGO_CONTAINER.executeCommand(
+                "use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
+
+        // B - enable collection-level fulldoc pre & post image for change capture collection
+        for (String collectionName : captureCustomerCollections) {
+            MONGO_CONTAINER.executeCommandInDatabase(
+                    String.format(
+                            "db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
+                            collectionName, collectionName),
+                    customerDatabase);
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE customers ("
+                                + " _id STRING NOT NULL,"
+                                + " cid BIGINT NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " database_name STRING METADATA VIRTUAL,"
+                                + " collection_name STRING METADATA VIRTUAL,"
+                                + " row_kind STRING METADATA VIRTUAL,"
+                                + " primary key (_id) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mongodb-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = '%s',"
+                                + " 'hosts' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database' = '%s',"
+                                + " 'collection' = '%s',"
+                                + " 'heartbeat.interval.ms' = '500',"
+                                + " 'scan.full-changelog' = 'true',"
+                                + " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+                                + ")",
+                        parallelismSnapshot ? "true" : "false",
+                        MONGO_CONTAINER.getHostAndPort(),
+                        FLINK_USER,
+                        FLINK_USER_PASSWORD,
+                        customerDatabase,
+                        getCollectionNameRegex(customerDatabase, captureCustomerCollections),
+                        skipSnapshotBackfill);
+
+        MONGO_CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
+
+        // first step: check the snapshot data
+        List<String> snapshotForSingleTable =
+                Stream.of(
+                                "+I[%s, %s, +I, 101, user_1, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 102, user_2, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 103, user_3, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 109, user_4, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 110, user_5, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 111, user_6, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 118, user_7, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 121, user_8, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 123, user_9, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1009, user_10, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1010, user_11, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1011, user_12, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1012, user_13, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1013, user_14, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1014, user_15, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1015, user_16, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1016, user_17, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1017, user_18, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1018, user_19, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1019, user_20, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 2000, user_21, Shanghai, 123567891234]")
+                        .map(s -> String.format(s, customerDatabase, captureCustomerCollections[0]))
+                        .collect(Collectors.toList());
+
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "select database_name, collection_name, row_kind, "
+                                + "cid, name, address, phone_number from customers");
+        CloseableIterator<Row> iterator = tableResult.collect();
+        JobID jobId = tableResult.getJobClient().get().getJobID();
+        List<String> expectedSnapshotData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerCollections.length; i++) {
+            expectedSnapshotData.addAll(snapshotForSingleTable);
+        }
+
+        assertEqualsInAnyOrder(
+                expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+
+        // second step: check the change stream data
+        for (String collectionName : captureCustomerCollections) {
+            makeFirstPartChangeStreamEvents(
+                    mongodbClient.getDatabase(customerDatabase), collectionName);
+        }
+        for (String collectionName : captureCustomerCollections) {
+            makeSecondPartChangeStreamEvents(
+                    mongodbClient.getDatabase(customerDatabase), collectionName);
+        }
+
+        List<String> changeEventsForSingleTable =
+                Stream.of(
+                                "-U[%s, %s, -U, 101, user_1, Shanghai, 123567891234]",
+                                "+U[%s, %s, +U, 101, user_1, Hangzhou, 123567891234]",
+                                "-D[%s, %s, -D, 102, user_2, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 102, user_2, Shanghai, 123567891234]",
+                                "-U[%s, %s, -U, 103, user_3, Shanghai, 123567891234]",
+                                "+U[%s, %s, +U, 103, user_3, Hangzhou, 123567891234]",
+                                "-U[%s, %s, -U, 1010, user_11, Shanghai, 123567891234]",
+                                "+U[%s, %s, +U, 1010, user_11, Hangzhou, 123567891234]",
+                                "+I[%s, %s, +I, 2001, user_22, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 2002, user_23, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 2003, user_24, Shanghai, 123567891234]")
+                        .map(s -> String.format(s, customerDatabase, captureCustomerCollections[0]))
+                        .collect(Collectors.toList());
+        List<String> expectedChangeStreamData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerCollections.length; i++) {
+            expectedChangeStreamData.addAll(changeEventsForSingleTable);
+        }
+        List<String> actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size());
+        assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData);
+        tableResult.getJobClient().get().cancel().get();
+    }
+
     private void testMongoDBParallelSource(
             MongoDBTestUtils.FailoverType failoverType,
             MongoDBTestUtils.FailoverPhase failoverPhase,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
index c4613351e..5960fed9f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
@@ -87,7 +87,8 @@ class MongoDBTableFactoryTest {
                     Column.physical("eee", DataTypes.TIMESTAMP(3)),
                     Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
                     Column.metadata(
-                            "_database_name", DataTypes.STRING(), "database_name", true)),
+                            "_database_name", DataTypes.STRING(), "database_name", true),
+                    Column.metadata("_row_kind", DataTypes.STRING(), "row_kind", true)),
             Collections.emptyList(),
             UniqueConstraint.primaryKey("pk", Collections.singletonList("_id")));

@@ -222,7 +223,7 @@ class MongoDBTableFactoryTest {
         DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
         MongoDBTableSource mongoDBSource = (MongoDBTableSource) actualSource;
         mongoDBSource.applyReadableMetadata(
-                Arrays.asList("op_ts", "database_name"),
+                Arrays.asList("op_ts", "database_name", "row_kind"),
                 SCHEMA_WITH_METADATA.toSourceRowDataType());
         actualSource = mongoDBSource.copy();

@@ -255,7 +256,7 @@ class MongoDBTableFactoryTest {
                         SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());

         expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
-        expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
+        expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "row_kind");

         Assertions.assertThat(actualSource).isEqualTo(expectedSource);