This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f3970d6188 [Bug][Connector-v2] MongoDB CDC Set SeatunnelRow's tableId (#7935)
f3970d6188 is described below

commit f3970d6188780d70ba17f0321dc2c6e092820969
Author: zhangdonghao <[email protected]>
AuthorDate: Wed Nov 6 10:52:49 2024 +0800

    [Bug][Connector-v2] MongoDB CDC Set SeatunnelRow's tableId (#7935)
---
 docs/en/connector-v2/source/MongoDB-CDC.md         |  82 ++-------------
 .../mongodb/MongodbIncrementalSourceFactory.java   |  32 +++++-
 .../MongoDBConnectorDeserializationSchema.java     |  18 +++-
 .../MongoDBConnectorDeserializationSchemaTest.java | 113 +++++++++++++++++++++
 4 files changed, 169 insertions(+), 76 deletions(-)

diff --git a/docs/en/connector-v2/source/MongoDB-CDC.md b/docs/en/connector-v2/source/MongoDB-CDC.md
index 301d707573..d7e6c7e440 100644
--- a/docs/en/connector-v2/source/MongoDB-CDC.md
+++ b/docs/en/connector-v2/source/MongoDB-CDC.md
@@ -105,13 +105,14 @@ For specific types in MongoDB, we use Extended JSON format to map them to Seatun
 
 ## Source Options
 
-|                Name                |  Type  | Required | Default | Description |
+| Name                               | Type   | Required | Default | Description |
 |------------------------------------|--------|----------|---------|-------------|
 | hosts                              | String | Yes      | -       | The comma-separated list of hostname and port pairs of the MongoDB servers, e.g. `localhost:27017,localhost:27018`. |
 | username                           | String | No       | -       | Name of the database user to be used when connecting to MongoDB. |
 | password                           | String | No       | -       | Password to be used when connecting to MongoDB. |
 | database                           | List   | Yes      | -       | Name of the database to watch for changes. If not set, all databases will be captured. The database also supports regular expressions to monitor multiple databases matching the regular expression, e.g. `db1,db2`. |
 | collection                         | List   | Yes      | -       | Name of the collection in the database to watch for changes. If not set, all collections will be captured. The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers, e.g. `db1.coll1,db2.coll2`. |
+| schema                             |        | Yes      | -       | The structure of the data, including field names and field types. The number of schema tables must equal the number of entries in `collection`. |
 | connection.options                 | String | No       | -       | The ampersand-separated connection options of MongoDB, e.g. `replicaSet=test&connectTimeoutMS=300000`. |
 | batch.size                         | Long   | No       | 1024    | The cursor batch size. |
 | poll.max.batch.size                | Enum   | No       | 1024    | Maximum number of change stream documents to include in a single batch when polling for new data. |
@@ -185,6 +186,14 @@ source {
     collection = ["inventory.products"]
     username = stuser
     password = stpw
+    schema = {
+      fields {
+        "_id" : string,
+        "name" : string,
+        "description" : string,
+        "weight" : string
+      }
+    }
   }
 }
 
@@ -204,76 +213,6 @@ sink {
 }
 ```
 
-## Multi-table Synchronization
-
-The following example demonstrates how to create a data synchronization job that read the cdc data of multiple library tables mongodb and prints it on the local client:
-
-```hocon
-env {
-  # You can set engine configuration here
-  parallelism = 1
-  job.mode = "STREAMING"
-  checkpoint.interval = 5000
-}
-
-source {
-  MongoDB-CDC {
-    hosts = "mongo0:27017"
-    database = ["inventory","crm"]
-    collection = ["inventory.products","crm.test"]
-    username = stuser
-    password = stpw
-  }
-}
-
-# Console printing of the read Mongodb data
-sink {
-  Console {
-    parallelism = 1
-  }
-}
-```
-
-### Tips:
-
-> 1.The cdc synchronization of multiple library tables cannot specify the schema, and can only output json data downstream.
-> This is because MongoDB does not provide metadata information for querying, so if you want to support multiple tables, all tables can only be read as one structure.
-
-## Regular Expression Matching for Multiple Tables
-
-The following example demonstrates how to create a data synchronization job that through regular expression read the data of multiple library tables mongodb and prints it on the local client:
-
-| Matching example | Expressions |   |                                        Describe                                        |
-|------------------|-------------|---|----------------------------------------------------------------------------------------|
-| Prefix matching  | ^(test).*   |   | Match the database name or table name with the prefix test, such as test1, test2, etc. |
-| Suffix matching  | .*[p$]      |   | Match the database name or table name with the suffix p, such as cdcp, edcp, etc.      |
-
-```hocon
-env {
-  # You can set engine configuration here
-  parallelism = 1
-  job.mode = "STREAMING"
-  checkpoint.interval = 5000
-}
-
-source {
-  MongoDB-CDC {
-    hosts = "mongo0:27017"
-    # So this example is used (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt),matching txc.tt, test2.test5.
-    database = ["(^(test).*|^(tpc).*|txc|.*[p$]|t{2})"]
-    collection = ["(t[5-8]|tt)"]
-    username = stuser
-    password = stpw
-  }
-}
-
-# Console printing of the read Mongodb data
-sink {
-  Console {
-    parallelism = 1
-  }
-}
-```
 
 ## Format of real-time streaming data
 
@@ -309,4 +248,3 @@ sink {
    }
 }
 ```
-
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
index ede71f0f79..03b3e1c9ba 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
@@ -22,6 +22,9 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
@@ -31,11 +34,16 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
 
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT;
 
 @AutoService(Factory.class)
 public class MongodbIncrementalSourceFactory implements TableSourceFactory {
@@ -50,7 +58,8 @@ public class MongodbIncrementalSourceFactory implements TableSourceFactory {
                 .required(
                         MongodbSourceOptions.HOSTS,
                         MongodbSourceOptions.DATABASE,
-                        MongodbSourceOptions.COLLECTION)
+                        MongodbSourceOptions.COLLECTION,
+                        TableSchemaOptions.SCHEMA)
                 .optional(
                         MongodbSourceOptions.USERNAME,
                         MongodbSourceOptions.PASSWORD,
@@ -79,9 +88,28 @@ public class MongodbIncrementalSourceFactory implements TableSourceFactory {
     public <T, SplitT extends SourceSplit, StateT extends Serializable>
             TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
         return () -> {
-            List<CatalogTable> catalogTables =
+            List<CatalogTable> configCatalog =
                     CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
+            List<String> collections = context.getOptions().get(MongodbSourceOptions.COLLECTION);
+            if (collections.size() != configCatalog.size()) {
+                throw new MongodbConnectorException(
+                        ILLEGAL_ARGUMENT,
+                        "The number of collections must be equal to the number of schema tables");
+            }
+            List<CatalogTable> catalogTables =
+                    IntStream.range(0, configCatalog.size())
+                            .mapToObj(
+                                    i -> {
+                                        CatalogTable catalogTable = configCatalog.get(i);
+                                        String fullName = collections.get(i);
+                                        TableIdentifier tableIdentifier =
+                                                TableIdentifier.of(
+                                                        catalogTable.getCatalogName(),
+                                                        TablePath.of(fullName));
+                                        return CatalogTable.of(tableIdentifier, catalogTable);
+                                    })
+                            .collect(Collectors.toList());
             SeaTunnelDataType<SeaTunnelRow> dataType =
                     CatalogTableUtil.convertToMultipleRowType(catalogTables);
             return (SeaTunnelSource<T, SplitT, StateT>)
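The factory change above pairs the i-th entry of `collection` with the i-th configured schema table and renames the table to the collection's fully-qualified name. As a minimal sketch, assuming plain Java lists in place of SeaTunnel's `CatalogTable`/`TableIdentifier` (the `TableDef` class and `pair` method are hypothetical names for illustration), the rule looks like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CollectionSchemaPairing {

    /** Hypothetical stand-in for a CatalogTable: a table id plus its field names. */
    static final class TableDef {
        final String tableId;
        final List<String> fields;

        TableDef(String tableId, List<String> fields) {
            this.tableId = tableId;
            this.fields = fields;
        }
    }

    /**
     * Pairs collections and schemas by position, like the IntStream.range(...) mapping
     * in the factory. The collection's "db.coll" name becomes the table id.
     */
    static List<TableDef> pair(List<String> collections, List<List<String>> schemas) {
        if (collections.size() != schemas.size()) {
            throw new IllegalArgumentException(
                    "The number of collections must be equal to the number of schema tables");
        }
        List<TableDef> tables = new ArrayList<>();
        for (int i = 0; i < collections.size(); i++) {
            tables.add(new TableDef(collections.get(i), schemas.get(i)));
        }
        return tables;
    }

    public static void main(String[] args) {
        List<TableDef> tables =
                pair(
                        Arrays.asList("inventory.products"),
                        Arrays.asList(Arrays.asList("_id", "name", "description", "weight")));
        System.out.println(tables.get(0).tableId + " " + tables.get(0).fields);
    }
}
```

Because the pairing is positional, the order of the `collection` entries has to follow the order of the configured schemas; the sketch, like the factory code in the diff, only checks that the counts match and does not cross-check names.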
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
index 8ce920e841..4811217cf4 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
@@ -17,7 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender;
 
+import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.MapType;
@@ -62,10 +65,13 @@ import java.util.Objects;
 import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT;
 import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE;
 import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
 import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DEFAULT_JSON_WRITER_SETTINGS;
 import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
 import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ENCODE_VALUE_FIELD;
 import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
 import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
 import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
 
@@ -169,8 +175,16 @@ public class MongoDBConnectorDeserializationSchema
     }
 
     private String extractTableId(SourceRecord record) {
-        // TODO extract table id from record
-        return null;
+        Struct messageStruct = (Struct) record.value();
+        Struct nsStruct = (Struct) messageStruct.get(NS_FIELD);
+        String databaseName = nsStruct.getString(DB_FIELD);
+        String tableName = nsStruct.getString(COLL_FIELD);
+        return TablePath.of(databaseName, null, tableName).toString();
+    }
+
+    @VisibleForTesting
+    public String extractTableIdForTest(SourceRecord record) {
+        return extractTableId(record);
     }
 
     // -------------------------------------------------------------------------------------
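`extractTableId` now reads the database and collection names from the change event's `ns` struct and renders them as a `TablePath` with no schema, i.e. `database.collection`. The sketch below illustrates the same lookup with a `Map` standing in for Kafka Connect's `Struct`; the literal keys `ns`, `db` and `coll` are assumed to be the values of `NS_FIELD`, `DB_FIELD` and `COLL_FIELD`, and the class name is hypothetical. Unlike the diff, which casts the field directly, the sketch rejects an event without `ns` explicitly.

```java
import java.util.HashMap;
import java.util.Map;

public class NamespaceTableId {

    /**
     * Builds a "database.collection" table id from an event's "ns" sub-document.
     * Assumed layout: { "ns": { "db": ..., "coll": ... } }.
     */
    @SuppressWarnings("unchecked")
    static String extractTableId(Map<String, Object> event) {
        Map<String, Object> ns = (Map<String, Object>) event.get("ns");
        if (ns == null) {
            throw new IllegalArgumentException("change event has no 'ns' field");
        }
        String db = (String) ns.get("db");
        String coll = (String) ns.get("coll");
        // Mirrors TablePath.of(db, null, coll).toString(): with no schema name, "db.coll".
        return db + "." + coll;
    }

    public static void main(String[] args) {
        Map<String, Object> ns = new HashMap<>();
        ns.put("db", "inventory");
        ns.put("coll", "products");
        Map<String, Object> event = new HashMap<>();
        event.put("ns", ns);
        System.out.println(extractTableId(event)); // prints inventory.products
    }
}
```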
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java
new file mode 100644
index 0000000000..7fba3213f6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongodb.sender;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import org.bson.BsonDocument;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE_INSERT;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_FIELD;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_TRUE;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SOURCE_FIELD;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.TS_MS_FIELD;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.createSourceOffsetMap;
+
+public class MongoDBConnectorDeserializationSchemaTest {
+
+    @Test
+    public void extractTableId() {
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("catalog", "database", "table"),
+                        TableSchema.builder()
+                                .column(
+                                        PhysicalColumn.of(
+                                                "name1", BasicType.STRING_TYPE, 1L, true, null, ""))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "name2", BasicType.STRING_TYPE, 1L, true, null, ""))
+                                .build(),
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        "comment");
+        SeaTunnelDataType<SeaTunnelRow> dataType =
+                CatalogTableUtil.convertToMultipleRowType(Collections.singletonList(catalogTable));
+        MongoDBConnectorDeserializationSchema schema =
+                new MongoDBConnectorDeserializationSchema(dataType, dataType);
+
+        // Build SourceRecord
+        Map<String, String> partitionMap =
+                MongodbRecordUtils.createPartitionMap("localhost:27017", "inventory", "products");
+
+        BsonDocument valueDocument =
+                new BsonDocument()
+                        .append(
+                                ID_FIELD,
+                                new BsonDocument(ID_FIELD, new BsonInt64(10000000000001L)))
+                        .append(OPERATION_TYPE, new BsonString(OPERATION_TYPE_INSERT))
+                        .append(
+                                NS_FIELD,
+                                new BsonDocument(DB_FIELD, new BsonString("inventory"))
+                                        .append(COLL_FIELD, new BsonString("products")))
+                        .append(
+                                DOCUMENT_KEY,
+                                new BsonDocument(ID_FIELD, new BsonInt64(10000000000001L)))
+                        .append(FULL_DOCUMENT, new BsonDocument())
+                        .append(TS_MS_FIELD, new BsonInt64(System.currentTimeMillis()))
+                        .append(
+                                SOURCE_FIELD,
+                                new BsonDocument(SNAPSHOT_FIELD, new BsonString(SNAPSHOT_TRUE))
+                                        .append(TS_MS_FIELD, new BsonInt64(0L)));
+        BsonDocument keyDocument = new BsonDocument(ID_FIELD, valueDocument.get(ID_FIELD));
+        SourceRecord sourceRecord =
+                MongodbRecordUtils.buildSourceRecord(
+                        partitionMap,
+                        createSourceOffsetMap(keyDocument.getDocument(ID_FIELD), true),
+                        "inventory.products",
+                        keyDocument,
+                        valueDocument);
+        Object tableId = schema.extractTableIdForTest(sourceRecord);
+        Assertions.assertEquals("inventory.products", tableId);
+    }
+}
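The id returned by `extractTableId` (for example `inventory.products`) has the same `database.collection` form as the id the factory now builds with `TablePath.of(fullName)` from the `collection` option, which is what allows each row to be matched to its configured schema. A minimal sketch of that matching, assuming a hypothetical `Row` class in place of `SeaTunnelRow` and a plain map in place of the multi-table row type (`TableIdRouting` and `route` are illustrative names, not connector APIs):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TableIdRouting {

    /** Hypothetical minimal row: a table id plus field values (stands in for SeaTunnelRow). */
    static final class Row {
        final String tableId;
        final Object[] fields;

        Row(String tableId, Object... fields) {
            this.tableId = tableId;
            this.fields = fields;
        }
    }

    /**
     * Groups rows by table id. Only ids registered from the configuration are accepted;
     * a row with a null or unknown id is rejected because no schema can be matched to it.
     */
    static Map<String, List<Row>> route(List<String> configuredIds, List<Row> rows) {
        Map<String, List<Row>> byTable = new LinkedHashMap<>();
        for (String id : configuredIds) {
            byTable.put(id, new ArrayList<>());
        }
        for (Row row : rows) {
            List<Row> bucket = byTable.get(row.tableId);
            if (bucket == null) {
                throw new IllegalStateException("No schema registered for table id: " + row.tableId);
            }
            bucket.add(row);
        }
        return byTable;
    }

    public static void main(String[] args) {
        // Ids in the "database.collection" form produced by both the factory and extractTableId.
        List<String> ids = Arrays.asList("inventory.products", "crm.test");
        List<Row> rows =
                Arrays.asList(
                        new Row("inventory.products", "1", "scooter"),
                        new Row("crm.test", "2", "alice"),
                        new Row("inventory.products", "3", "drill"));
        route(ids, rows)
                .forEach((id, list) -> System.out.println(id + " -> " + list.size() + " row(s)"));
    }
}
```

Before this commit `extractTableId` returned `null`, so rows carried no id to match on. Rejecting such rows outright is a choice made for this sketch, not a description of how SeaTunnel handles them.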
