This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 0102b50c3 fix(integration): reduce flakiness in connector integration tests (#2966)
0102b50c3 is described below

commit 0102b50c38710c7a70d4bda8b244e2d543c99c63
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Mar 18 13:15:13 2026 +0100

    fix(integration): reduce flakiness in connector integration tests (#2966)
---
 .../integration/src/harness/handle/connectors_runtime.rs | 16 ++++++++++------
 core/integration/src/harness/handle/mcp.rs               | 12 ++++++------
 core/integration/tests/connectors/mongodb/mod.rs         |  2 +-
 3 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/core/integration/src/harness/handle/connectors_runtime.rs b/core/integration/src/harness/handle/connectors_runtime.rs
index 34190bb15..ffe06cd1e 100644
--- a/core/integration/src/harness/handle/connectors_runtime.rs
+++ b/core/integration/src/harness/handle/connectors_runtime.rs
@@ -190,12 +190,6 @@ impl TestBinary for ConnectorsRuntimeHandle {
         })?;
         self.child_handle = Some(child);
 
-        // Release port reservation immediately after spawn to avoid SO_REUSEPORT
-        // load-balancing conflicts during health checks.
-        if let Some(reserver) = self.port_reserver.take() {
-            reserver.release();
-        }
-
         Ok(())
     }
 
@@ -251,6 +245,16 @@ impl IggyServerDependent for ConnectorsRuntimeHandle {
     }
 
     async fn wait_ready(&mut self) -> Result<(), TestBinaryError> {
+        // Release port reservation just before health-checking. The reservation
+        // holds the port while the child process initializes, preventing another
+        // process from claiming it. By the time we reach here the child has had
+        // enough time to bind (or will bind within the first few health-check
+        // retries). This matches the server handle pattern where the reservation
+        // is held until the process has bound.
+        if let Some(reserver) = self.port_reserver.take() {
+            reserver.release();
+        }
+
         let http_address = self.http_url();
         let client = reqwest::Client::new();
 
diff --git a/core/integration/src/harness/handle/mcp.rs b/core/integration/src/harness/handle/mcp.rs
index efacb1e7a..deab71de2 100644
--- a/core/integration/src/harness/handle/mcp.rs
+++ b/core/integration/src/harness/handle/mcp.rs
@@ -206,12 +206,6 @@ impl TestBinary for McpHandle {
         })?;
         self.child_handle = Some(child);
 
-        // Release port reservation immediately after spawn to avoid SO_REUSEPORT
-        // load-balancing conflicts during health checks.
-        if let Some(reserver) = self.port_reserver.take() {
-            reserver.release();
-        }
-
         Ok(())
     }
 
@@ -250,6 +244,12 @@ impl IggyServerDependent for McpHandle {
     }
 
     async fn wait_ready(&mut self) -> Result<(), TestBinaryError> {
+        // Release port reservation just before health-checking. Holds the port
+        // while the child initializes, matching the server handle pattern.
+        if let Some(reserver) = self.port_reserver.take() {
+            reserver.release();
+        }
+
         let http_address = format!(
             "http://{}:{}",
             self.server_address.ip(),
diff --git a/core/integration/tests/connectors/mongodb/mod.rs b/core/integration/tests/connectors/mongodb/mod.rs
index d44706b49..daaf9b845 100644
--- a/core/integration/tests/connectors/mongodb/mod.rs
+++ b/core/integration/tests/connectors/mongodb/mod.rs
@@ -20,5 +20,5 @@
 mod mongodb_sink;
 
 const TEST_MESSAGE_COUNT: usize = 3;
-const POLL_ATTEMPTS: usize = 100;
+const POLL_ATTEMPTS: usize = 400;
 const POLL_INTERVAL_MS: u64 = 50;

Reply via email to