This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new ac6b30bb8 fix(integration): add watchdog to detect test server crashes (#2576)
ac6b30bb8 is described below
commit ac6b30bb860b426ede16181a1d9ca4665763c0e5
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 16 13:16:55 2026 +0100
fix(integration): add watchdog to detect test server crashes (#2576)
If the server dies unexpectedly (crash, panic), the watchdog
prints stdout/stderr logs and aborts the test process, making
failures immediately visible instead of hanging.
---
core/integration/src/test_server.rs | 107 ++++++++++++++++++++++++++++++++++-
core/integration/tests/server/mod.rs | 84 +++++++--------------------
2 files changed, 127 insertions(+), 64 deletions(-)
diff --git a/core/integration/src/test_server.rs b/core/integration/src/test_server.rs
index fd718178e..99846d4ad 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -33,7 +33,9 @@ use std::io::Write;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
-use std::thread::{available_parallelism, panicking, sleep};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::thread::{self, JoinHandle, available_parallelism, panicking, sleep};
use std::time::Duration;
use uuid::Uuid;
@@ -75,7 +77,6 @@ enum ServerProtocolAddr {
WebSocket(SocketAddr),
}
-#[derive(Debug)]
pub struct TestServer {
local_data_path: String,
envs: HashMap<String, String>,
@@ -85,6 +86,18 @@ pub struct TestServer {
stderr_file_path: Option<PathBuf>,
cleanup: bool,
server_executable_path: Option<String>,
+ watchdog_handle: Option<JoinHandle<()>>,
+ watchdog_stop: Arc<AtomicBool>,
+}
+
+impl std::fmt::Debug for TestServer {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("TestServer")
+ .field("local_data_path", &self.local_data_path)
+ .field("server_addrs", &self.server_addrs)
+ .field("cleanup", &self.cleanup)
+ .finish_non_exhaustive()
+ }
}
impl TestServer {
@@ -194,6 +207,8 @@ impl TestServer {
stderr_file_path: None,
cleanup,
server_executable_path,
+ watchdog_handle: None,
+ watchdog_stop: Arc::new(AtomicBool::new(false)),
}
}
@@ -234,11 +249,99 @@ impl TestServer {
}
let child = command.spawn().unwrap();
+ let pid = child.id();
self.child_handle = Some(child);
self.wait_until_server_has_bound();
+
+ let watchdog_stop = self.watchdog_stop.clone();
+ let stdout_path = self.stdout_file_path.clone();
+ let stderr_path = self.stderr_file_path.clone();
+ let watchdog_handle = thread::Builder::new()
+ .name("test-server-watchdog".to_string())
+ .spawn(move || {
+ Self::watchdog_loop(pid, watchdog_stop, stdout_path, stderr_path);
+ })
+ .expect("Failed to spawn watchdog thread");
+ self.watchdog_handle = Some(watchdog_handle);
+ }
+
+ /// Watchdog loop that monitors the server process.
+ /// Prints the server's stdout/stderr and aborts the whole test process (no panic/unwind) if the server exits before `stop()` sets `stop_signal`.
+ fn watchdog_loop(
+ pid: u32,
+ stop_signal: Arc<AtomicBool>,
+ stdout_path: Option<PathBuf>,
+ stderr_path: Option<PathBuf>,
+ ) {
+ const CHECK_INTERVAL: Duration = Duration::from_millis(100);
+
+ loop {
+ if stop_signal.load(Ordering::SeqCst) {
+ return;
+ }
+
+ // A crashed child is not reaped until stop() calls wait(), so it lingers as a zombie,
+ // and kill(pid, 0) still returns 0 for zombies; is_process_alive() must report them
+ // as dead (on Linux via the 'Z'/'X' state field of /proc/{pid}/stat).
+ if !Self::is_process_alive(pid) {
+ let stdout_content = stdout_path
+ .as_ref()
+ .and_then(|p| fs::read_to_string(p).ok())
+ .unwrap_or_else(|| "[No stdout log]".to_string());
+
+ let stderr_content = stderr_path
+ .as_ref()
+ .and_then(|p| fs::read_to_string(p).ok())
+ .unwrap_or_else(|| "[No stderr log]".to_string());
+
+ eprintln!(
+ "\n\n=== SERVER CRASHED ===\n\
+ The iggy-server process (PID {}) has died unexpectedly!\n\
+ This usually indicates a bug in the server.\n\n\
+ === STDOUT ===\n{}\n\n\
+ === STDERR ===\n{}\n",
+ pid, stdout_content, stderr_content
+ );
+ std::process::abort();
+ }
+
+ thread::sleep(CHECK_INTERVAL);
+ }
+ }
+
+ #[cfg(target_os = "linux")]
+ fn is_process_alive(pid: u32) -> bool {
+ // /proc is the only reliable way to distinguish zombies from live processes.
+ // A zombie still responds to kill(pid, 0) but we need to detect it as dead.
+ let stat_path = format!("/proc/{}/stat", pid);
+ match fs::read_to_string(&stat_path) {
+ Ok(content) => {
+ // State char follows the command name: "pid (comm) S ..."; rfind because comm may itself contain ')'.
+ if let Some(state_start) = content.rfind(')') {
+ let state = content[state_start + 1..].trim().chars().next();
+ !matches!(state, Some('Z') | Some('X'))
+ } else {
+ false
+ }
+ }
+ Err(_) => false,
+ }
+ }
+
+ #[cfg(all(unix, not(target_os = "linux")))]
+ fn is_process_alive(pid: u32) -> bool {
+ // No /proc here (e.g. macOS). SAFETY: kill(2) takes only integers; signal 0 sends nothing.
+ // NOTE(review): kill(pid, 0) also succeeds for a zombie, and a crashed child is not reaped
+ // until stop() calls wait(), so a crash is likely NOT detected here. Consider waitid(WNOWAIT).
+ unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
}
pub fn stop(&mut self) {
+ self.watchdog_stop.store(true, Ordering::SeqCst);
+ if let Some(watchdog) = self.watchdog_handle.take() {
+ let _ = watchdog.join();
+ }
+
#[allow(unused_mut)]
if let Some(mut child_handle) = self.child_handle.take() {
#[cfg(unix)]
diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs
index 0b8fd502b..fd799ad1f 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -22,7 +22,6 @@ mod general;
mod scenarios;
mod specific;
-use futures::FutureExt;
use iggy_common::TransportProtocol;
use integration::{
http_client::HttpClientFactory,
@@ -40,8 +39,6 @@ use scenarios::{
stream_size_validation_scenario, system_scenario, user_scenario,
};
use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-use std::time::Duration;
use std::{collections::HashMap, future::Future};
type ScenarioFn = fn(&dyn ClientFactory) -> Pin<Box<dyn Future<Output = ()> + '_>>;
@@ -116,67 +113,30 @@ async fn run_scenario(transport: TransportProtocol, scenario: ScenarioFn) {
"IGGY_QUIC_KEEP_ALIVE_INTERVAL".to_string(),
"15s".to_string(),
);
- let test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
- let test_server = Arc::new(Mutex::new(test_server));
-
- test_server.lock().unwrap().start();
-
- let client_factory: Box<dyn ClientFactory> = {
- let server = test_server.lock().unwrap();
- match transport {
- TransportProtocol::Tcp => {
- let server_addr = server.get_raw_tcp_addr().unwrap();
- Box::new(TcpClientFactory {
- server_addr,
- ..Default::default()
- })
- }
- TransportProtocol::Quic => {
- let server_addr = server.get_quic_udp_addr().unwrap();
- Box::new(QuicClientFactory { server_addr })
- }
- TransportProtocol::Http => {
- let server_addr = server.get_http_api_addr().unwrap();
- Box::new(HttpClientFactory { server_addr })
- }
- TransportProtocol::WebSocket => {
- let server_addr = server.get_websocket_addr().unwrap();
- Box::new(WebSocketClientFactory { server_addr })
- }
+ let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+ test_server.start();
+
+ let client_factory: Box<dyn ClientFactory> = match transport {
+ TransportProtocol::Tcp => {
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+ Box::new(TcpClientFactory {
+ server_addr,
+ ..Default::default()
+ })
}
- };
-
- let monitor_server = test_server.clone();
- let crash_monitor = async move {
- loop {
- tokio::time::sleep(Duration::from_millis(100)).await;
- let mut server = monitor_server.lock().unwrap();
- if !server.is_running() {
- let (stdout, stderr) = server.collect_logs();
- return (stdout, stderr);
- }
+ TransportProtocol::Quic => {
+ let server_addr = test_server.get_quic_udp_addr().unwrap();
+ Box::new(QuicClientFactory { server_addr })
}
- };
-
- tokio::select! {
- biased;
-
- (stdout, stderr) = crash_monitor => {
- panic!(
- "Server crashed during test!\n\n\
- === STDOUT ===\n{}\n\n\
- === STDERR ===\n{}",
- stdout, stderr
- );
+ TransportProtocol::Http => {
+ let server_addr = test_server.get_http_api_addr().unwrap();
+ Box::new(HttpClientFactory { server_addr })
}
-
- result = std::panic::AssertUnwindSafe(scenario(&*client_factory)).catch_unwind() => {
- test_server.lock().unwrap().assert_running();
-
- // Re-raise any panic from the scenario
- if let Err(panic_payload) = result {
- std::panic::resume_unwind(panic_payload);
- }
+ TransportProtocol::WebSocket => {
+ let server_addr = test_server.get_websocket_addr().unwrap();
+ Box::new(WebSocketClientFactory { server_addr })
}
- }
+ };
+
+ scenario(&*client_factory).await;
}