This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 85fa04fa04 [Fix][E2E-MongoDB CDC] Stabilize concurrent submission test and ensure CI trigger (#10474)
85fa04fa04 is described below

commit 85fa04fa043feffb0a668f52d5820936f8413ebb
Author: yzeng1618 <[email protected]>
AuthorDate: Tue Feb 24 17:10:04 2026 +0800

    [Fix][E2E-MongoDB CDC] Stabilize concurrent submission test and ensure CI trigger (#10474)
    
    Co-authored-by: zengyi <[email protected]>
---
 .../src/test/java/mongodb/MongodbCDCIT.java        | 52 +++++++++++++++++++++-
 1 file changed, 50 insertions(+), 2 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index fb937f512d..334f895dcb 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -78,6 +78,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -282,6 +283,8 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
                         });
 
         TimeUnit.SECONDS.sleep(20);
+        assertTaskNotCompletedExceptionally(task1, "products");
+        assertTaskNotCompletedExceptionally(task2, "orders");
 
         // insert update delete operations
         upsertDeleteSourceTable();
@@ -291,12 +294,16 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
         // Verify both tasks work correctly without cache interference
         assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
         assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);
+        assertTaskNotCompletedExceptionally(task1, "products");
+        assertTaskNotCompletedExceptionally(task2, "orders");
 
-        // Clean and verify again to ensure CDC continues to work
-        cleanSourceTable();
+        // Append incremental changes and verify again to ensure CDC continues 
to work
+        appendIncrementalSourceTableData();
         TimeUnit.SECONDS.sleep(20);
         assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
         assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);
+        assertTaskNotCompletedExceptionally(task1, "products");
+        assertTaskNotCompletedExceptionally(task2, "orders");
     }
 
     @TestTemplate
@@ -667,12 +674,53 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
         mongodbContainer.executeCommandFileInDatabase("inventoryDDL", 
MONGODB_DATABASE);
     }
 
+    private void appendIncrementalSourceTableData() {
+        MongoDatabase mongoDatabase = client.getDatabase(MONGODB_DATABASE);
+        MongoCollection<Document> products = 
mongoDatabase.getCollection(MONGODB_COLLECTION_1);
+        MongoCollection<Document> orders = 
mongoDatabase.getCollection(MONGODB_COLLECTION_2);
+
+        ObjectId productId = new ObjectId("100000000000000000000120");
+        Document product = new Document();
+        product.put("_id", productId);
+        product.put("name", "usb-c cable");
+        product.put("description", "durable usb-c charging cable");
+        product.put("weight", "50");
+        products.insertOne(product);
+        products.updateOne(
+                Filters.eq("_id", productId), Updates.set("description", 
"durable usb-c cable 1m"));
+
+        Document order = new Document();
+        order.put("_id", new ObjectId("100000000000000000000121"));
+        order.put("order_number", 102600);
+        order.put("order_date", "2023-11-18");
+        order.put("quantity", 7);
+        order.put("product_id", productId);
+        orders.insertOne(order);
+    }
+
     private void cleanSourceTable() {
         mongodbContainer.executeCommandFileInDatabase("inventoryClean", 
MONGODB_DATABASE);
         truncateMysqlTable(MONGODB_COLLECTION_1);
         truncateMysqlTable(MONGODB_COLLECTION_2);
     }
 
+    private void assertTaskNotCompletedExceptionally(
+            CompletableFuture<Void> task, String taskName) {
+        if (!task.isCompletedExceptionally()) {
+            return;
+        }
+        try {
+            task.join();
+        } catch (CompletionException e) {
+            Throwable cause = e.getCause() == null ? e : e.getCause();
+            throw new AssertionError(
+                    String.format(
+                            "Concurrent MongoDB CDC task for [%s] failed 
during submission",
+                            taskName),
+                    cause);
+        }
+    }
+
     public void initConnection() {
         String ipAddress = mongodbContainer.getHost();
         Integer port = mongodbContainer.getFirstMappedPort();

Reply via email to