This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 0102b50c3 fix(integration): reduce flakiness in connector integration tests (#2966)
0102b50c3 is described below
commit 0102b50c38710c7a70d4bda8b244e2d543c99c63
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Mar 18 13:15:13 2026 +0100
fix(integration): reduce flakiness in connector integration tests (#2966)
---
.../integration/src/harness/handle/connectors_runtime.rs | 16 ++++++++++------
core/integration/src/harness/handle/mcp.rs | 12 ++++++------
core/integration/tests/connectors/mongodb/mod.rs | 2 +-
3 files changed, 17 insertions(+), 13 deletions(-)
diff --git a/core/integration/src/harness/handle/connectors_runtime.rs b/core/integration/src/harness/handle/connectors_runtime.rs
index 34190bb15..ffe06cd1e 100644
--- a/core/integration/src/harness/handle/connectors_runtime.rs
+++ b/core/integration/src/harness/handle/connectors_runtime.rs
@@ -190,12 +190,6 @@ impl TestBinary for ConnectorsRuntimeHandle {
})?;
self.child_handle = Some(child);
- // Release port reservation immediately after spawn to avoid SO_REUSEPORT
- // load-balancing conflicts during health checks.
- if let Some(reserver) = self.port_reserver.take() {
- reserver.release();
- }
-
Ok(())
}
@@ -251,6 +245,16 @@ impl IggyServerDependent for ConnectorsRuntimeHandle {
}
async fn wait_ready(&mut self) -> Result<(), TestBinaryError> {
+ // Release port reservation just before health-checking. The reservation
+ // holds the port while the child process initializes, preventing another
+ // process from claiming it. By the time we reach here the child has had
+ // enough time to bind (or will bind within the first few health-check
+ // retries). This matches the server handle pattern where the reservation
+ // is held until the process has bound.
+ if let Some(reserver) = self.port_reserver.take() {
+ reserver.release();
+ }
+
let http_address = self.http_url();
let client = reqwest::Client::new();
diff --git a/core/integration/src/harness/handle/mcp.rs b/core/integration/src/harness/handle/mcp.rs
index efacb1e7a..deab71de2 100644
--- a/core/integration/src/harness/handle/mcp.rs
+++ b/core/integration/src/harness/handle/mcp.rs
@@ -206,12 +206,6 @@ impl TestBinary for McpHandle {
})?;
self.child_handle = Some(child);
- // Release port reservation immediately after spawn to avoid SO_REUSEPORT
- // load-balancing conflicts during health checks.
- if let Some(reserver) = self.port_reserver.take() {
- reserver.release();
- }
-
Ok(())
}
@@ -250,6 +244,12 @@ impl IggyServerDependent for McpHandle {
}
async fn wait_ready(&mut self) -> Result<(), TestBinaryError> {
+ // Release port reservation just before health-checking. Holds the port
+ // while the child initializes, matching the server handle pattern.
+ if let Some(reserver) = self.port_reserver.take() {
+ reserver.release();
+ }
+
let http_address = format!(
"http://{}:{}",
self.server_address.ip(),
diff --git a/core/integration/tests/connectors/mongodb/mod.rs b/core/integration/tests/connectors/mongodb/mod.rs
index d44706b49..daaf9b845 100644
--- a/core/integration/tests/connectors/mongodb/mod.rs
+++ b/core/integration/tests/connectors/mongodb/mod.rs
@@ -20,5 +20,5 @@
mod mongodb_sink;
const TEST_MESSAGE_COUNT: usize = 3;
-const POLL_ATTEMPTS: usize = 100;
+const POLL_ATTEMPTS: usize = 400;
const POLL_INTERVAL_MS: u64 = 50;