This is an automated email from the ASF dual-hosted git repository.
xiaochenzhou pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8e27bffff8 [Fix][Connector-v2][MongoDB] There is a problem with using
Cache for multi-task submission (#10116)
8e27bffff8 is described below
commit 8e27bffff8fdd5ecf6a7d41b5c3a2dafc3fe642b
Author: Jast <[email protected]>
AuthorDate: Fri Jan 30 18:55:02 2026 +0800
[Fix][Connector-v2][MongoDB] There is a problem with using Cache for
multi-task submission (#10116)
---
.../seatunnel/cdc/mongodb/utils/MongodbUtils.java | 17 ++-----
.../src/test/java/mongodb/MongodbCDCIT.java | 48 ++++++++++++++++++
.../test/resources/mongodbcdc_to_mysql_orders.conf | 59 ++++++++++++++++++++++
3 files changed, 110 insertions(+), 14 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
index 415424c2a1..8dba927f0b 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
@@ -51,9 +51,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
-import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import static com.mongodb.client.model.Aggregates.match;
@@ -87,8 +85,6 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Collec
@Slf4j
public class MongodbUtils {
- private static final Map<TableId, MongoCollection<?>> cache = new ConcurrentHashMap<>();
-
public static ChangeStreamDescriptor getChangeStreamDescriptor(
@Nonnull MongodbSourceConfig sourceConfig,
List<String> discoveredDatabases,
@@ -364,16 +360,9 @@ public class MongodbUtils {
@SuppressWarnings("unchecked")
public static <T> @Nonnull MongoCollection<T> getCollection(
 MongoClient mongoClient, TableId collectionId, Class<T> documentClass) {
- MongoCollection<?> cachedCollection = cache.get(collectionId);
- if (cachedCollection == null) {
- MongoCollection<T> collection =
- mongoClient
- .getDatabase(collectionId.catalog())
- .getCollection(collectionId.table(), documentClass);
- cache.put(collectionId, collection);
- return collection;
- }
- return (MongoCollection<T>) cachedCollection;
+ return mongoClient
+ .getDatabase(collectionId.catalog())
+ .getCollection(collectionId.table(), documentClass);
}
 public static MongoClient createMongoClient(MongodbSourceConfig sourceConfig) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index 3698556cda..fb937f512d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -251,6 +251,54 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
}
+ @TestTemplate
+ public void testMongodbCdcMultiTaskConcurrentSubmission(TestContainer container)
+ throws InterruptedException {
+ cleanSourceTable();
+
+ // Submit two independent CDC tasks concurrently, each reading from different collections
+ CompletableFuture<Void> task1 =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob("/mongodbcdc_to_mysql.conf");
+ } catch (Exception e) {
+ log.error("Task 1 (products) exception: " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ CompletableFuture<Void> task2 =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob("/mongodbcdc_to_mysql_orders.conf");
+ } catch (Exception e) {
+ log.error("Task 2 (orders) exception: " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ TimeUnit.SECONDS.sleep(20);
+
+ // insert update delete operations
+ upsertDeleteSourceTable();
+
+ TimeUnit.SECONDS.sleep(20);
+
+ // Verify both tasks work correctly without cache interference
+ assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
+ assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);
+
+ // Clean and verify again to ensure CDC continues to work
+ cleanSourceTable();
+ TimeUnit.SECONDS.sleep(20);
+ assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
+ assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);
+ }
+
@TestTemplate
@DisabledOnContainer(
value = {},
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql_orders.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql_orders.conf
new file mode 100644
index 0000000000..202c866624
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql_orders.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ MongoDB-CDC {
+ hosts = "mongo0:27017"
+ database = ["inventory"]
+ collection = ["inventory.orders"]
+ username = superuser
+ password = superpw
+ schema = {
+ primaryKey {
+ name = "id"
+ columnNames = ["_id"]
+ }
+ fields {
+ "_id": string,
+ "order_number": int,
+ "order_date": string,
+ "quantity": int,
+ "product_id": string
+ }
+ }
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc"
+ driver = "com.mysql.cj.jdbc.Driver"
+ username = "st_user"
+ password = "seatunnel"
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = mongodb_cdc
+ table = orders
+ primary_keys = ["_id"]
+ }
+}