This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 85fa04fa04 [Fix][E2E-MongoDB CDC] Stabilize concurrent submission test
and ensure CI trigger (#10474)
85fa04fa04 is described below
commit 85fa04fa043feffb0a668f52d5820936f8413ebb
Author: yzeng1618 <[email protected]>
AuthorDate: Tue Feb 24 17:10:04 2026 +0800
[Fix][E2E-MongoDB CDC] Stabilize concurrent submission test and ensure CI
trigger (#10474)
Co-authored-by: zengyi <[email protected]>
---
.../src/test/java/mongodb/MongodbCDCIT.java | 52 +++++++++++++++++++++-
1 file changed, 50 insertions(+), 2 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index fb937f512d..334f895dcb 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -78,6 +78,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -282,6 +283,8 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
});
TimeUnit.SECONDS.sleep(20);
+ assertTaskNotCompletedExceptionally(task1, "products");
+ assertTaskNotCompletedExceptionally(task2, "orders");
// insert update delete operations
upsertDeleteSourceTable();
@@ -291,12 +294,16 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
// Verify both tasks work correctly without cache interference
assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);
+ assertTaskNotCompletedExceptionally(task1, "products");
+ assertTaskNotCompletedExceptionally(task2, "orders");
- // Clean and verify again to ensure CDC continues to work
- cleanSourceTable();
+ // Append incremental changes and verify again to ensure CDC continues to work
+ appendIncrementalSourceTableData();
TimeUnit.SECONDS.sleep(20);
assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);
+ assertTaskNotCompletedExceptionally(task1, "products");
+ assertTaskNotCompletedExceptionally(task2, "orders");
}
@TestTemplate
@@ -667,12 +674,53 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
mongodbContainer.executeCommandFileInDatabase("inventoryDDL", MONGODB_DATABASE);
}
+ private void appendIncrementalSourceTableData() {
+ MongoDatabase mongoDatabase = client.getDatabase(MONGODB_DATABASE);
+ MongoCollection<Document> products = mongoDatabase.getCollection(MONGODB_COLLECTION_1);
+ MongoCollection<Document> orders = mongoDatabase.getCollection(MONGODB_COLLECTION_2);
+
+ ObjectId productId = new ObjectId("100000000000000000000120");
+ Document product = new Document();
+ product.put("_id", productId);
+ product.put("name", "usb-c cable");
+ product.put("description", "durable usb-c charging cable");
+ product.put("weight", "50");
+ products.insertOne(product);
+ products.updateOne(
+ Filters.eq("_id", productId), Updates.set("description", "durable usb-c cable 1m"));
+
+ Document order = new Document();
+ order.put("_id", new ObjectId("100000000000000000000121"));
+ order.put("order_number", 102600);
+ order.put("order_date", "2023-11-18");
+ order.put("quantity", 7);
+ order.put("product_id", productId);
+ orders.insertOne(order);
+ }
+
private void cleanSourceTable() {
mongodbContainer.executeCommandFileInDatabase("inventoryClean", MONGODB_DATABASE);
truncateMysqlTable(MONGODB_COLLECTION_1);
truncateMysqlTable(MONGODB_COLLECTION_2);
}
+ private void assertTaskNotCompletedExceptionally(
+ CompletableFuture<Void> task, String taskName) {
+ if (!task.isCompletedExceptionally()) {
+ return;
+ }
+ try {
+ task.join();
+ } catch (CompletionException e) {
+ Throwable cause = e.getCause() == null ? e : e.getCause();
+ throw new AssertionError(
+ String.format(
+ "Concurrent MongoDB CDC task for [%s] failed during submission",
+ taskName),
+ cause);
+ }
+ }
+
public void initConnection() {
String ipAddress = mongodbContainer.getHost();
Integer port = mongodbContainer.getFirstMappedPort();