This is an automated email from the ASF dual-hosted git repository.

xiaochenzhou pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8e27bffff8 [Fix][Connector-v2][MongoDB] There is a problem with using 
Cache for multi-task submission (#10116)
8e27bffff8 is described below

commit 8e27bffff8fdd5ecf6a7d41b5c3a2dafc3fe642b
Author: Jast <[email protected]>
AuthorDate: Fri Jan 30 18:55:02 2026 +0800

    [Fix][Connector-v2][MongoDB] There is a problem with using Cache for 
multi-task submission (#10116)
---
 .../seatunnel/cdc/mongodb/utils/MongodbUtils.java  | 17 ++-----
 .../src/test/java/mongodb/MongodbCDCIT.java        | 48 ++++++++++++++++++
 .../test/resources/mongodbcdc_to_mysql_orders.conf | 59 ++++++++++++++++++++++
 3 files changed, 110 insertions(+), 14 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
index 415424c2a1..8dba927f0b 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
@@ -51,9 +51,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 
 import static com.mongodb.client.model.Aggregates.match;
@@ -87,8 +85,6 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Collec
 @Slf4j
 public class MongodbUtils {
 
-    private static final Map<TableId, MongoCollection<?>> cache = new 
ConcurrentHashMap<>();
-
     public static ChangeStreamDescriptor getChangeStreamDescriptor(
             @Nonnull MongodbSourceConfig sourceConfig,
             List<String> discoveredDatabases,
@@ -364,16 +360,9 @@ public class MongodbUtils {
     @SuppressWarnings("unchecked")
     public static <T> @Nonnull MongoCollection<T> getCollection(
             MongoClient mongoClient, TableId collectionId, Class<T> 
documentClass) {
-        MongoCollection<?> cachedCollection = cache.get(collectionId);
-        if (cachedCollection == null) {
-            MongoCollection<T> collection =
-                    mongoClient
-                            .getDatabase(collectionId.catalog())
-                            .getCollection(collectionId.table(), 
documentClass);
-            cache.put(collectionId, collection);
-            return collection;
-        }
-        return (MongoCollection<T>) cachedCollection;
+        return mongoClient
+                .getDatabase(collectionId.catalog())
+                .getCollection(collectionId.table(), documentClass);
     }
 
     public static MongoClient createMongoClient(MongodbSourceConfig 
sourceConfig) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index 3698556cda..fb937f512d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -251,6 +251,54 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
         assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
     }
 
+    @TestTemplate
+    public void testMongodbCdcMultiTaskConcurrentSubmission(TestContainer 
container)
+            throws InterruptedException {
+        cleanSourceTable();
+
+        // Submit two independent CDC tasks concurrently, each reading from 
different collections
+        CompletableFuture<Void> task1 =
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                
container.executeJob("/mongodbcdc_to_mysql.conf");
+                            } catch (Exception e) {
+                                log.error("Task 1 (products) exception: " + 
e.getMessage());
+                                throw new RuntimeException(e);
+                            }
+                            return null;
+                        });
+
+        CompletableFuture<Void> task2 =
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                
container.executeJob("/mongodbcdc_to_mysql_orders.conf");
+                            } catch (Exception e) {
+                                log.error("Task 2 (orders) exception: " + 
e.getMessage());
+                                throw new RuntimeException(e);
+                            }
+                            return null;
+                        });
+
+        TimeUnit.SECONDS.sleep(20);
+
+        // insert update delete operations
+        upsertDeleteSourceTable();
+
+        TimeUnit.SECONDS.sleep(20);
+
+        // Verify both tasks work correctly without cache interference
+        assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
+        assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);
+
+        // Clean and verify again to ensure CDC continues to work
+        cleanSourceTable();
+        TimeUnit.SECONDS.sleep(20);
+        assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
+        assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);
+    }
+
     @TestTemplate
     @DisabledOnContainer(
             value = {},
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql_orders.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql_orders.conf
new file mode 100644
index 0000000000..202c866624
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql_orders.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  MongoDB-CDC {
+    hosts = "mongo0:27017"
+    database = ["inventory"]
+    collection = ["inventory.orders"]
+    username = superuser
+    password = superpw
+    schema = {
+      primaryKey {
+        name = "id"
+        columnNames = ["_id"]
+      }
+      fields {
+        "_id": string,
+        "order_number": int,
+        "order_date": string,
+        "quantity": int,
+        "product_id": string
+      }
+    }
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc"
+    driver = "com.mysql.cj.jdbc.Driver"
+    username = "st_user"
+    password = "seatunnel"
+    generate_sink_sql = true
+    # You need to configure both database and table
+    database = mongodb_cdc
+    table = orders
+    primary_keys = ["_id"]
+  }
+}

Reply via email to