This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 49cbaeb9b3 [Feature][Mongodb-CDC] Support multi-table read (#8029)
49cbaeb9b3 is described below

commit 49cbaeb9b3cefb91e3cc4ecbe1772642ca3f52ed
Author: zhangdonghao <[email protected]>
AuthorDate: Tue Dec 3 11:41:44 2024 +0800

    [Feature][Mongodb-CDC] Support multi-table read (#8029)
    
    Co-authored-by: hailin0 <[email protected]>
---
 docs/en/connector-v2/source/MongoDB-CDC.md         |  85 +++++++++-
 .../mongodb/MongodbIncrementalSourceFactory.java   |  93 ++++++++---
 .../mongodb/source/offset/ChangeStreamOffset.java  |   5 +-
 .../src/test/java/mongodb/MongodbCDCIT.java        | 182 ++++++++++++---------
 .../src/test/resources/ddl/inventory.js            |   6 +
 .../src/test/resources/ddl/inventoryClean.js       |   2 +
 .../src/test/resources/ddl/inventoryDDL.js         |  15 +-
 .../src/test/resources/ddl/mongodb_cdc.sql         |   7 +
 ....conf => mongodb_multi_table_cdc_to_mysql.conf} |  38 +++--
 .../test/resources/mongodbcdc_metadata_trans.conf  |   5 +
 .../src/test/resources/mongodbcdc_to_mysql.conf    |   6 +-
 11 files changed, 331 insertions(+), 113 deletions(-)

diff --git a/docs/en/connector-v2/source/MongoDB-CDC.md 
b/docs/en/connector-v2/source/MongoDB-CDC.md
index d7e6c7e440..7cee634a71 100644
--- a/docs/en/connector-v2/source/MongoDB-CDC.md
+++ b/docs/en/connector-v2/source/MongoDB-CDC.md
@@ -112,7 +112,8 @@ For specific types in MongoDB, we use Extended JSON format 
to map them to Seatun
 | password                           | String | No       | -       | Password 
to be used when connecting to MongoDB.                                          
                                                                                
                                                                                
                   |
 | database                           | List   | Yes      | -       | Name of 
the database to watch for changes. If not set then all databases will be 
captured. The database also supports regular expressions to monitor multiple 
databases matching the regular expression. eg. `db1,db2`.                       
                              |
 | collection                         | List   | Yes      | -       | Name of 
the collection in the database to watch for changes. If not set then all 
collections will be captured. The collection also supports regular expressions 
to monitor multiple collections matching fully-qualified collection 
identifiers. eg. `db1.coll1,db2.coll2`. |
-| schema                             |        | yes      | -       | The 
structure of the data, including field names and field types.                   
                                                                                
                                                                                
                        |
+| schema                             |        | no       | -       | The 
structure of the data, including field names and field types, used for 
single-table CDC.                                                              
                                                                                
                          |
+| tables_configs                     |        | no       | -       | The 
structure of the data, including field names and field types, used for 
multi-table CDC.                                                               
                                                                                
                          |
 | connection.options                 | String | No       | -       | The 
ampersand-separated connection options of MongoDB.  eg. 
`replicaSet=test&connectTimeoutMS=300000`.                                      
                                                                                
                                                |
 | batch.size                         | Long   | No       | 1024    | The 
cursor batch size.                                                              
                                                                                
                                                                                
                        |
 | poll.max.batch.size                | Enum   | No       | 1024    | Maximum 
number of change stream documents to include in a single batch when polling for 
new data.                                                                       
                                                                                
                    |
@@ -126,6 +127,31 @@ For specific types in MongoDB, we use Extended JSON format 
to map them to Seatun
 > 1.If the collection changes at a slow pace, it is strongly recommended to 
 > set an appropriate value greater than 0 for the heartbeat.interval.ms 
 > parameter. When we recover a Seatunnel job from a checkpoint or savepoint, 
 > the heartbeat events can push the resumeToken forward to avoid its 
 > expiration.<br/>
 > 2.MongoDB has a limit of 16MB for a single document. Change documents 
 > include additional information, so even if the original document is not 
 > larger than 15MB, the change document may exceed the 16MB limit, resulting 
 > in the termination of the Change Stream operation.<br/>
 > 3.It is recommended to use immutable shard keys. In MongoDB, shard keys 
 > allow modifications after transactions are enabled, but changing the shard 
 > key can cause frequent shard migrations, resulting in additional performance 
 > overhead. Additionally, modifying the shard key can also cause the Update 
 > Lookup feature to become ineffective, leading to inconsistent results in CDC 
 > (Change Data Capture) scenarios.<br/>
+> 4.`schema` and `tables_configs` are mutually exclusive; exactly one of them 
must be configured.
+
+## Change Streams
+
+[**Change Stream**](https://www.mongodb.com/docs/v5.0/changeStreams/) is a new 
feature provided by MongoDB 3.6 for replica sets and sharded clusters that 
allows applications to access real-time data changes without the complexity and 
risk of tailing the oplog.
+Applications can use change streams to subscribe to all data changes on a 
single collection, a database, or an entire deployment, and immediately react 
to them.
+
+**Lookup Full Document for Update Operations** is a feature provided by 
**Change Stream** which can configure the change stream to return the most 
current majority-committed version of the updated document. Because of this 
feature, we can easily collect the latest full document and convert the change 
log to Changelog Stream.
+
+The format of the data captured by delete events in change streams: [delete 
event](https://www.mongodb.com/docs/v5.0/reference/change-events/delete/)
+```
+{
+   "_id": { <Resume Token> },
+   "operationType": "delete",
+   "clusterTime": <Timestamp>,
+   "ns": {
+      "db": "engineering",
+      "coll": "users"
+   },
+   "documentKey": {
+      "_id": ObjectId("599af247bb69cd89961c986d")
+   }
+}
+```
+The fullDocument document is omitted as the document no longer exists at the 
time the change stream cursor sends the delete event to the client.
 
 ## How to Create a MongoDB CDC Data Synchronization Jobs
 
@@ -149,6 +175,7 @@ source {
     username = stuser
     password = stpw
     schema = {
+      table = "inventory.products"
       fields {
         "_id" : string,
         "name" : string,
@@ -187,6 +214,7 @@ source {
     username = stuser
     password = stpw
     schema = {
+      table = "inventory.products"
       fields {
         "_id" : string,
         "name" : string,
@@ -213,6 +241,59 @@ sink {
 }
 ```
 
+## Multi-table Synchronization
+
+The following example demonstrates how to create a data synchronization job 
that reads the CDC data of multiple MongoDB collections and prints it on the 
local client:
+
+```hocon
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  MongoDB-CDC {
+    hosts = "mongo0:27017"
+    database = ["inventory"]
+    collection = ["inventory.products", "inventory.orders"]
+    username = superuser
+    password = superpw
+    tables_configs = [
+      {
+        schema {
+          table = "inventory.products"
+          fields {
+            "_id" : string,
+            "name" : string,
+            "description" : string,
+            "weight" : string
+          }
+        }
+      },
+      {
+        schema {
+          table = "inventory.orders"
+          fields {
+            "_id" : string,
+            "order_number" : int,
+            "order_date" : string,
+            "quantity" : int,
+            "product_id" : string
+          }
+        }
+      }
+    ]
+  }
+}
+
+# Console printing of the read Mongodb data
+sink {
+  Console {
+  }
+}
+```
 
 ## Format of real-time streaming data
 
@@ -244,7 +325,7 @@ sink {
    "txnNumber" : <NumberLong>,    // If the change operation is executed in a 
multi-document transaction, this field and value are displayed, representing 
the transaction number
    "lsid" : {          // Represents information related to the Session in 
which the transaction is located
       "id" : <UUID>,  
-      "uid" : <BinData> 
+      "uid" : <BinData>
    }
 }
 ```
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
index 5e3a95145c..e2959a6d7a 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb;
 
+import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
@@ -29,6 +31,7 @@ import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
@@ -37,9 +40,10 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbCo
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT;
 
@@ -56,8 +60,8 @@ public class MongodbIncrementalSourceFactory implements 
TableSourceFactory {
                 .required(
                         MongodbSourceOptions.HOSTS,
                         MongodbSourceOptions.DATABASE,
-                        MongodbSourceOptions.COLLECTION,
-                        TableSchemaOptions.SCHEMA)
+                        MongodbSourceOptions.COLLECTION)
+                .exclusive(TableSchemaOptions.SCHEMA, 
TableSchemaOptions.TABLE_CONFIGS)
                 .optional(
                         MongodbSourceOptions.USERNAME,
                         MongodbSourceOptions.PASSWORD,
@@ -86,30 +90,71 @@ public class MongodbIncrementalSourceFactory implements 
TableSourceFactory {
     public <T, SplitT extends SourceSplit, StateT extends Serializable>
             TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
         return () -> {
-            List<CatalogTable> configCatalog =
-                    CatalogTableUtil.getCatalogTables(
-                            context.getOptions(), context.getClassLoader());
+            List<CatalogTable> catalogTables = 
buildWithConfig(context.getOptions());
             List<String> collections = 
context.getOptions().get(MongodbSourceOptions.COLLECTION);
-            if (collections.size() != configCatalog.size()) {
-                throw new MongodbConnectorException(
-                        ILLEGAL_ARGUMENT,
-                        "The number of collections must be equal to the number 
of schema tables");
-            }
-            List<CatalogTable> catalogTables =
-                    IntStream.range(0, configCatalog.size())
-                            .mapToObj(
-                                    i -> {
-                                        CatalogTable catalogTable = 
configCatalog.get(i);
-                                        String fullName = collections.get(i);
-                                        TableIdentifier tableIdentifier =
-                                                TableIdentifier.of(
-                                                        
catalogTable.getCatalogName(),
-                                                        
TablePath.of(fullName));
-                                        return 
CatalogTable.of(tableIdentifier, catalogTable);
-                                    })
-                            .collect(Collectors.toList());
+            validateCatalogTablesAndCollections(catalogTables, collections);
+            catalogTables = updateAndValidateCatalogTableId(catalogTables, 
collections);
             return (SeaTunnelSource<T, SplitT, StateT>)
                     new MongodbIncrementalSource<>(context.getOptions(), 
catalogTables);
         };
     }
+
+    private List<CatalogTable> updateAndValidateCatalogTableId(
+            List<CatalogTable> catalogTables, List<String> collections) {
+        for (int i = 0; i < catalogTables.size(); i++) {
+            CatalogTable catalogTable = catalogTables.get(i);
+            String collectionName = collections.get(i);
+            String fullName = catalogTable.getTablePath().getFullName();
+            if (fullName.equals(TablePath.DEFAULT.getFullName())) {
+                if (catalogTables.size() == 1) {
+                    TableIdentifier updatedIdentifier =
+                            TableIdentifier.of(
+                                    catalogTable.getCatalogName(), 
TablePath.of(collectionName));
+                    return Collections.singletonList(
+                            CatalogTable.of(updatedIdentifier, catalogTable));
+                } else if (!fullName.equals(collectionName)) {
+                    throw new MongodbConnectorException(
+                            ILLEGAL_ARGUMENT,
+                            String.format(
+                                    "Inconsistent naming found at index %d: 
The collection name '%s' must match the schema table name '%s'.",
+                                    i, collectionName, fullName));
+                }
+            }
+        }
+        return catalogTables;
+    }
+
+    private void validateCatalogTablesAndCollections(
+            List<CatalogTable> catalogTables, List<String> collections) {
+        if (catalogTables.size() != collections.size()) {
+            throw new MongodbConnectorException(
+                    ILLEGAL_ARGUMENT,
+                    "The number of collections must be equal to the number of 
schema tables");
+        }
+    }
+
+    private List<CatalogTable> buildWithConfig(ReadonlyConfig config) {
+        String factoryId = 
config.get(CommonOptions.PLUGIN_NAME).replace("-CDC", "");
+        Map<String, Object> schemaMap = config.get(TableSchemaOptions.SCHEMA);
+        if (schemaMap != null) {
+            if (schemaMap.isEmpty()) {
+                throw new SeaTunnelException("Schema config can not be empty");
+            }
+            CatalogTable catalogTable = 
CatalogTableUtil.buildWithConfig(factoryId, config);
+            return Collections.singletonList(catalogTable);
+        }
+        List<Map<String, Object>> schemaMaps = 
config.get(TableSchemaOptions.TABLE_CONFIGS);
+        if (schemaMaps != null) {
+            if (schemaMaps.isEmpty()) {
+                throw new SeaTunnelException("tables_configs can not be 
empty");
+            }
+            return schemaMaps.stream()
+                    .map(
+                            map ->
+                                    CatalogTableUtil.buildWithConfig(
+                                            factoryId, 
ReadonlyConfig.fromMap(map)))
+                    .collect(Collectors.toList());
+        }
+        return Collections.emptyList();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/offset/ChangeStreamOffset.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/offset/ChangeStreamOffset.java
index 35acf43bba..2096fe7a6b 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/offset/ChangeStreamOffset.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/offset/ChangeStreamOffset.java
@@ -67,7 +67,10 @@ public class ChangeStreamOffset extends Offset {
     }
 
     public BsonTimestamp getTimestamp() {
-        long timestamp = Long.parseLong(offset.get(TIMESTAMP_FIELD));
+        long timestamp = System.currentTimeMillis();
+        if (offset.get(TIMESTAMP_FIELD) != null) {
+            timestamp = Long.parseLong(offset.get(TIMESTAMP_FIELD));
+        }
         return new BsonTimestamp(timestamp);
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index 7cf18c8032..ee60d9e1c0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -30,6 +30,7 @@ import 
org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.bson.Document;
+import org.bson.types.ObjectId;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -47,14 +48,16 @@ import com.mongodb.client.model.Sorts;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -62,7 +65,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.awaitility.Awaitility.await;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.with;
+import static org.testcontainers.shaded.org.awaitility.Durations.TWO_SECONDS;
 
 @Slf4j
 @DisabledOnContainer(
@@ -75,7 +80,8 @@ public class MongodbCDCIT extends TestSuiteBase implements 
TestResource {
     // mongodb
     protected static final String MONGODB_DATABASE = "inventory";
 
-    protected static final String MONGODB_COLLECTION = "products";
+    protected static final String MONGODB_COLLECTION_1 = "products";
+    protected static final String MONGODB_COLLECTION_2 = "orders";
     protected MongoDBContainer mongodbContainer;
 
     protected MongoClient client;
@@ -93,7 +99,10 @@ public class MongodbCDCIT extends TestSuiteBase implements 
TestResource {
     private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer();
 
     // mysql sink table query sql
-    private static final String SINK_SQL = "select name,description,weight 
from products";
+    private static final String SINK_SQL_PRODUCTS = "select 
name,description,weight from products";
+
+    private static final String SINK_SQL_ORDERS =
+            "select order_number,order_date,quantity,product_id from orders 
order by order_number asc";
 
     private static final String MYSQL_DRIVER_JAR =
             
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";;
@@ -163,70 +172,42 @@ public class MongodbCDCIT extends TestSuiteBase 
implements TestResource {
                     }
                     return null;
                 });
-        await().atMost(60000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> {
-                            Assertions.assertIterableEquals(
-                                    readMongodbData().stream()
-                                            .peek(e -> e.remove("_id"))
-                                            .map(Document::entrySet)
-                                            .map(Set::stream)
-                                            .map(
-                                                    entryStream ->
-                                                            entryStream
-                                                                    
.map(Map.Entry::getValue)
-                                                                    .collect(
-                                                                            
Collectors.toCollection(
-                                                                               
     ArrayList
-                                                                               
             ::new)))
-                                            .collect(Collectors.toList()),
-                                    querySql());
-                        });
-
+        TimeUnit.SECONDS.sleep(10);
         // insert update delete
         upsertDeleteSourceTable();
+        TimeUnit.SECONDS.sleep(20);
+        assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
 
-        await().atMost(60000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> {
-                            Assertions.assertIterableEquals(
-                                    readMongodbData().stream()
-                                            .peek(e -> e.remove("_id"))
-                                            .map(Document::entrySet)
-                                            .map(Set::stream)
-                                            .map(
-                                                    entryStream ->
-                                                            entryStream
-                                                                    
.map(Map.Entry::getValue)
-                                                                    .collect(
-                                                                            
Collectors.toCollection(
-                                                                               
     ArrayList
-                                                                               
             ::new)))
-                                            .collect(Collectors.toList()),
-                                    querySql());
-                        });
+        cleanSourceTable();
+        TimeUnit.SECONDS.sleep(20);
+        assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
+    }
 
+    @TestTemplate
+    public void testMongodbCdcMultiTableToMysqlE2e(TestContainer container)
+            throws InterruptedException {
         cleanSourceTable();
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        
container.executeJob("/mongodb_multi_table_cdc_to_mysql.conf");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException();
+                    }
+                    return null;
+                });
+        TimeUnit.SECONDS.sleep(10);
+        // insert update delete
+        upsertDeleteSourceTable();
+        TimeUnit.SECONDS.sleep(30);
+        assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
+        assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);
 
-        await().atMost(60000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> {
-                            Assertions.assertIterableEquals(
-                                    readMongodbData().stream()
-                                            .peek(e -> e.remove("_id"))
-                                            .map(Document::entrySet)
-                                            .map(Set::stream)
-                                            .map(
-                                                    entryStream ->
-                                                            entryStream
-                                                                    
.map(Map.Entry::getValue)
-                                                                    .collect(
-                                                                            
Collectors.toCollection(
-                                                                               
     ArrayList
-                                                                               
             ::new)))
-                                            .collect(Collectors.toList()),
-                                    querySql());
-                        });
+        cleanSourceTable();
+        TimeUnit.SECONDS.sleep(20);
+        assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
+        assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);
     }
 
     @TestTemplate
@@ -268,6 +249,42 @@ public class MongodbCDCIT extends TestSuiteBase implements 
TestResource {
         }
     }
 
+    private void assertionsSourceAndSink(String mongodbCollection, String 
sinkMysqlQuery) {
+        List<List<Object>> expected =
+                readMongodbData(mongodbCollection).stream()
+                        .peek(e -> e.remove("_id"))
+                        .map(Document::entrySet)
+                        .map(Set::stream)
+                        .map(
+                                entryStream ->
+                                        entryStream
+                                                .map(
+                                                        entry -> {
+                                                            Object value = 
entry.getValue();
+                                                            if (value 
instanceof Number) {
+                                                                return new 
BigDecimal(
+                                                                               
 value.toString())
+                                                                        
.intValue();
+                                                            }
+                                                            if (value 
instanceof ObjectId) {
+                                                                return 
((ObjectId) value)
+                                                                        
.toString();
+                                                            }
+                                                            return value;
+                                                        })
+                                                
.collect(Collectors.toCollection(ArrayList::new)))
+                        .collect(Collectors.toList());
+        log.info("Print mongodb source data: \n{}", expected);
+        with().pollInterval(TWO_SECONDS)
+                .pollDelay(500, TimeUnit.MILLISECONDS)
+                .await()
+                .atMost(450, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(expected, 
querySql(sinkMysqlQuery));
+                        });
+    }
+
     private Connection getJdbcConnection() throws SQLException {
         return DriverManager.getConnection(
                 MYSQL_CONTAINER.getJdbcUrl(),
@@ -275,10 +292,9 @@ public class MongodbCDCIT extends TestSuiteBase implements 
TestResource {
                 MYSQL_CONTAINER.getPassword());
     }
 
-    private List<List<Object>> querySql() {
+    private List<List<Object>> querySql(String querySql) {
         try (Connection connection = getJdbcConnection();
-                ResultSet resultSet =
-                        
connection.createStatement().executeQuery(MongodbCDCIT.SINK_SQL)) {
+                ResultSet resultSet = 
connection.createStatement().executeQuery(querySql)) {
             List<List<Object>> result = new ArrayList<>();
             int columnCount = resultSet.getMetaData().getColumnCount();
             while (resultSet.next()) {
@@ -286,21 +302,45 @@ public class MongodbCDCIT extends TestSuiteBase 
implements TestResource {
                 for (int i = 1; i <= columnCount; i++) {
                     objects.add(resultSet.getObject(i));
                 }
-                log.info("Print mysql sink data:" + objects);
+                log.info("Print mysql sink data: {} ", objects);
                 result.add(objects);
             }
+            log.info("============================= mysql data 
================================");
             return result;
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
     }
 
+    private void truncateMysqlTable(String tableName) {
+        String checkTableExistsSql =
+                "SELECT COUNT(*) FROM information_schema.tables WHERE 
table_schema = ? AND table_name = ?";
+        String truncateTableSql = String.format("TRUNCATE TABLE %s", 
tableName);
+
+        try (Connection connection = getJdbcConnection();
+                PreparedStatement checkStmt = 
connection.prepareStatement(checkTableExistsSql)) {
+            checkStmt.setString(1, MYSQL_DATABASE);
+            checkStmt.setString(2, tableName);
+            try (ResultSet rs = checkStmt.executeQuery()) {
+                if (rs.next() && rs.getInt(1) > 0) {
+                    try (Statement truncateStmt = 
connection.createStatement()) {
+                        truncateStmt.executeUpdate(truncateTableSql);
+                    }
+                }
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Error checking if table exists: " + 
tableName, e);
+        }
+    }
+
     private void upsertDeleteSourceTable() {
         mongodbContainer.executeCommandFileInDatabase("inventoryDDL", 
MONGODB_DATABASE);
     }
 
     private void cleanSourceTable() {
         mongodbContainer.executeCommandFileInDatabase("inventoryClean", 
MONGODB_DATABASE);
+        truncateMysqlTable(MONGODB_COLLECTION_1);
+        truncateMysqlTable(MONGODB_COLLECTION_2);
     }
 
     public void initConnection() {
@@ -309,17 +349,13 @@ public class MongodbCDCIT extends TestSuiteBase 
implements TestResource {
         String url =
                 String.format(
                         "mongodb://%s:%s@%s:%d/%s?authSource=admin",
-                        "superuser",
-                        "superpw",
-                        ipAddress,
-                        port,
-                        MONGODB_DATABASE + "." + MONGODB_COLLECTION);
+                        "superuser", "superpw", ipAddress, port, 
MONGODB_DATABASE);
         client = MongoClients.create(url);
     }
 
-    protected List<Document> readMongodbData() {
+    protected List<Document> readMongodbData(String collection) {
         MongoCollection<Document> sinkTable =
-                
client.getDatabase(MONGODB_DATABASE).getCollection(MongodbCDCIT.MONGODB_COLLECTION);
+                client.getDatabase(MONGODB_DATABASE).getCollection(collection);
         // If the cursor has been traversed, it will automatically close 
without explicitly closing.
         MongoCursor<Document> cursor = 
sinkTable.find().sort(Sorts.ascending("_id")).cursor();
         List<Document> documents = new ArrayList<>();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventory.js
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventory.js
index c834ec8a2c..cd4d6cd0ab 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventory.js
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventory.js
@@ -22,3 +22,9 @@ db.getCollection('products').insertOne({"_id": 
ObjectId("10000000000000000000010
 db.getCollection('products').insertOne({"_id": 
ObjectId("100000000000000000000107"), "name": "rocks", "description": "box of 
assorted rocks", "weight": "53"});
 db.getCollection('products').insertOne({"_id": 
ObjectId("100000000000000000000108"), "name": "jacket", "description": "water 
resistent black wind breaker", "weight": "1"});
 
+
+db.getCollection('orders').insertOne({"_id": 
ObjectId("100000000000000000000101"),"order_number": 102482, "order_date": 
"2023-11-12", "quantity": 2 , "product_id": 
ObjectId("100000000000000000000101")});
+db.getCollection('orders').insertOne({"_id": 
ObjectId("100000000000000000000102"),"order_number": 102483, "order_date": 
"2023-11-13", "quantity": 5 , "product_id": 
ObjectId("100000000000000000000102")});
+db.getCollection('orders').insertOne({"_id": 
ObjectId("100000000000000000000103"),"order_number": 102484, "order_date": 
"2023-11-14", "quantity": 6 , "product_id": 
ObjectId("100000000000000000000103")});
+db.getCollection('orders').insertOne({"_id": 
ObjectId("100000000000000000000104"),"order_number": 102485, "order_date": 
"2023-11-15", "quantity": 9 , "product_id": 
ObjectId("100000000000000000000104")});
+db.getCollection('orders').insertOne({"_id": 
ObjectId("100000000000000000000105"),"order_number": 102486, "order_date": 
"2023-11-16", "quantity": 8 , "product_id": 
ObjectId("100000000000000000000105")});
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js
index fbbb0ea0df..2b99f98618 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js
@@ -14,3 +14,5 @@
 // limitations under the License.
 
 db.getCollection('products').deleteMany({})
+
+db.getCollection('orders').deleteMany({})
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryDDL.js
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryDDL.js
index db05f5f59f..75683e086a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryDDL.js
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryDDL.js
@@ -29,4 +29,17 @@ db.getCollection('products').deleteOne({"_id": 
ObjectId("10000000000000000000010
 db.getCollection('products').deleteOne({"name": "car battery"});
 db.getCollection('products').deleteOne({"name": "12-pack drill bits"});
 db.getCollection('products').deleteOne({"name": "hammer", "weight": "875"});
-db.getCollection('products').deleteOne({"name": "jacket"});
\ No newline at end of file
+db.getCollection('products').deleteOne({"name": "jacket"});
+
+
+db.getCollection('orders').insertOne({"_id": 
ObjectId("100000000000000000000106"),"order_number": 102487, "order_date": 
"2023-11-12", "quantity": 2 , "product_id": 
ObjectId("100000000000000000000113")});
+db.getCollection('orders').insertOne({"_id": 
ObjectId("100000000000000000000107"),"order_number": 102488, "order_date": 
"2023-11-13", "quantity": 5 , "product_id": 
ObjectId("100000000000000000000112")});
+db.getCollection('orders').insertOne({"_id": 
ObjectId("100000000000000000000108"),"order_number": 102489, "order_date": 
"2023-11-14", "quantity": 6 , "product_id": 
ObjectId("100000000000000000000111")});
+db.getCollection('orders').insertOne({"_id": 
ObjectId("100000000000000000000109"),"order_number": 102490, "order_date": 
"2023-11-15", "quantity": 9 , "product_id": 
ObjectId("100000000000000000000110")});
+db.getCollection('orders').insertOne({"_id": 
ObjectId("100000000000000000000110"),"order_number": 102491, "order_date": 
"2023-11-16", "quantity": 8 , "product_id": 
ObjectId("100000000000000000000109")});
+
+db.getCollection('orders').updateOne({"order_number": 102490}, {$set: 
{"quantity": 99}});
+
+db.getCollection('orders').deleteOne({"order_number": 102487});
+db.getCollection('orders').deleteOne({"order_number": 102488});
+db.getCollection('orders').deleteOne({"order_number": 102489});
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/mongodb_cdc.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/mongodb_cdc.sql
index cc7a619af6..d81e73b764 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/mongodb_cdc.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/mongodb_cdc.sql
@@ -30,3 +30,10 @@ CREATE TABLE products (
   weight VARCHAR(255)
 );
 
+CREATE TABLE orders (
+  _id VARCHAR(512) NOT NULL PRIMARY KEY,
+  order_number INT NOT NULL,
+  order_date VARCHAR(20) NOT NULL,
+  quantity INT NOT NULL,
+  product_id VARCHAR(512) NOT NULL
+);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_table_cdc_to_mysql.conf
similarity index 66%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_table_cdc_to_mysql.conf
index f4e3c45735..0cfea201de 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_table_cdc_to_mysql.conf
@@ -25,17 +25,34 @@ source {
   MongoDB-CDC {
     hosts = "mongo0:27017"
     database = ["inventory"]
-    collection = ["inventory.products"]
+    collection = ["inventory.products","inventory.orders"]
     username = superuser
     password = superpw
-    schema = {
-      fields {
-        "_id": string,
-        "name": string,
-        "description": string,
-        "weight": string
+    tables_configs = [
+      {
+        schema {
+          table = "inventory.products"
+          fields {
+            "_id" : string,
+            "name" : string,
+            "description" : string,
+            "weight" : string
+          }
+        }
+      },
+      {
+        schema {
+          table = "inventory.orders"
+          fields {
+            "_id" : string,
+            "order_number" : int,
+            "order_date" : string,
+            "quantity" : int,
+            "product_id" : string
+          }
+        }
       }
-    }
+    ]
   }
 }
 
@@ -46,9 +63,8 @@ sink {
     user = "st_user"
     password = "seatunnel"
     generate_sink_sql = true
-    # You need to configure both database and table
     database = mongodb_cdc
-    table = products
+    table = "${table_name}"
     primary_keys = ["_id"]
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf
index e7cacec499..3c62a06c00 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf
@@ -29,6 +29,11 @@ source {
     username = superuser
     password = superpw
     schema = {
+      table = "inventory.products"
+      primaryKey {
+        name = "id"
+        columnNames = ["_id"]
+      }
       fields {
         "_id": string,
         "name": string,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
index f4e3c45735..7e3d41d045 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
@@ -29,6 +29,10 @@ source {
     username = superuser
     password = superpw
     schema = {
+      primaryKey {
+        name = "id"
+        columnNames = ["_id"]
+      }
       fields {
         "_id": string,
         "name": string,
@@ -51,4 +55,4 @@ sink {
     table = products
     primary_keys = ["_id"]
   }
-}
\ No newline at end of file
+}


Reply via email to