This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new ac6b30bb8 fix(integration): add watchdog to detect test server crashes (#2576)
ac6b30bb8 is described below

commit ac6b30bb860b426ede16181a1d9ca4665763c0e5
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 16 13:16:55 2026 +0100

    fix(integration): add watchdog to detect test server crashes (#2576)
    
    If the server dies unexpectedly (crash, panic), the watchdog
    prints stdout/stderr logs and aborts the test process, making
    failures immediately visible instead of hanging.
---
 core/integration/src/test_server.rs  | 107 ++++++++++++++++++++++++++++++++++-
 core/integration/tests/server/mod.rs |  84 +++++++--------------------
 2 files changed, 127 insertions(+), 64 deletions(-)

diff --git a/core/integration/src/test_server.rs b/core/integration/src/test_server.rs
index fd718178e..99846d4ad 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -33,7 +33,9 @@ use std::io::Write;
 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
 use std::path::{Path, PathBuf};
 use std::process::{Child, Command, Stdio};
-use std::thread::{available_parallelism, panicking, sleep};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::thread::{self, JoinHandle, available_parallelism, panicking, sleep};
 use std::time::Duration;
 use uuid::Uuid;
 
@@ -75,7 +77,6 @@ enum ServerProtocolAddr {
     WebSocket(SocketAddr),
 }
 
-#[derive(Debug)]
 pub struct TestServer {
     local_data_path: String,
     envs: HashMap<String, String>,
@@ -85,6 +86,18 @@ pub struct TestServer {
     stderr_file_path: Option<PathBuf>,
     cleanup: bool,
     server_executable_path: Option<String>,
+    watchdog_handle: Option<JoinHandle<()>>,
+    watchdog_stop: Arc<AtomicBool>,
+}
+
+impl std::fmt::Debug for TestServer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TestServer")
+            .field("local_data_path", &self.local_data_path)
+            .field("server_addrs", &self.server_addrs)
+            .field("cleanup", &self.cleanup)
+            .finish_non_exhaustive()
+    }
 }
 
 impl TestServer {
@@ -194,6 +207,8 @@ impl TestServer {
             stderr_file_path: None,
             cleanup,
             server_executable_path,
+            watchdog_handle: None,
+            watchdog_stop: Arc::new(AtomicBool::new(false)),
         }
     }
 
@@ -234,11 +249,99 @@ impl TestServer {
         }

         let child = command.spawn().unwrap();
+        let pid = child.id();
         self.child_handle = Some(child);
         self.wait_until_server_has_bound();
+
+        let watchdog_stop = self.watchdog_stop.clone();
+        let stdout_path = self.stdout_file_path.clone();
+        let stderr_path = self.stderr_file_path.clone();
+        let watchdog_handle = thread::Builder::new()
+            .name("test-server-watchdog".to_string())
+            .spawn(move || {
+                Self::watchdog_loop(pid, watchdog_stop, stdout_path, stderr_path);
+            })
+            .expect("Failed to spawn watchdog thread");
+        self.watchdog_handle = Some(watchdog_handle);
+    }
+
+    /// Watchdog loop that monitors the server process.
+    /// Panics if the server exits while the watchdog is still running (i.e., not gracefully stopped).
+    fn watchdog_loop(
+        pid: u32,
+        stop_signal: Arc<AtomicBool>,
+        stdout_path: Option<PathBuf>,
+        stderr_path: Option<PathBuf>,
+    ) {
+        const CHECK_INTERVAL: Duration = Duration::from_millis(100);
+
+        loop {
+            if stop_signal.load(Ordering::SeqCst) {
+                return;
+            }
+
+            // Check if process is alive AND not a zombie via /proc/{pid}/stat
+            // A zombie process still exists in the process table (kill returns 0)
+            // but has state 'Z' in /proc/pid/stat
+            if !Self::is_process_alive(pid) {
+                let stdout_content = stdout_path
+                    .as_ref()
+                    .and_then(|p| fs::read_to_string(p).ok())
+                    .unwrap_or_else(|| "[No stdout log]".to_string());
+
+                let stderr_content = stderr_path
+                    .as_ref()
+                    .and_then(|p| fs::read_to_string(p).ok())
+                    .unwrap_or_else(|| "[No stderr log]".to_string());
+
+                eprintln!(
+                    "\n\n=== SERVER CRASHED ===\n\
+                     The iggy-server process (PID {}) has died unexpectedly!\n\
+                     This usually indicates a bug in the server.\n\n\
+                     === STDOUT ===\n{}\n\n\
+                     === STDERR ===\n{}\n",
+                    pid, stdout_content, stderr_content
+                );
+                std::process::abort();
+            }
+
+            thread::sleep(CHECK_INTERVAL);
+        }
+    }
+
+    #[cfg(target_os = "linux")]
+    fn is_process_alive(pid: u32) -> bool {
+        // /proc is the only reliable way to distinguish zombies from live processes.
+        // A zombie still responds to kill(pid, 0) but we need to detect it as dead.
+        let stat_path = format!("/proc/{}/stat", pid);
+        match fs::read_to_string(&stat_path) {
+            Ok(content) => {
+                // State char is after the command name in parentheses: "pid (comm) S ..."
+                if let Some(state_start) = content.rfind(')') {
+                    let state = content[state_start + 1..].trim().chars().next();
+                    !matches!(state, Some('Z') | Some('X'))
+                } else {
+                    false
+                }
+            }
+            Err(_) => false,
+        }
+    }
+
+    #[cfg(all(unix, not(target_os = "linux")))]
+    fn is_process_alive(pid: u32) -> bool {
+        // macOS lacks /proc, so we use signal 0 which checks process existence
+        // without actually sending anything. Zombies are less of a concern here
+        // since our child will be reaped when we call wait() in stop().
+        unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
     }

     pub fn stop(&mut self) {
+        self.watchdog_stop.store(true, Ordering::SeqCst);
+        if let Some(watchdog) = self.watchdog_handle.take() {
+            let _ = watchdog.join();
+        }
+
         #[allow(unused_mut)]
         if let Some(mut child_handle) = self.child_handle.take() {
             #[cfg(unix)]
diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs
index 0b8fd502b..fd799ad1f 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -22,7 +22,6 @@ mod general;
 mod scenarios;
 mod specific;
 
-use futures::FutureExt;
 use iggy_common::TransportProtocol;
 use integration::{
     http_client::HttpClientFactory,
@@ -40,8 +39,6 @@ use scenarios::{
     stream_size_validation_scenario, system_scenario, user_scenario,
 };
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-use std::time::Duration;
 use std::{collections::HashMap, future::Future};

 type ScenarioFn = fn(&dyn ClientFactory) -> Pin<Box<dyn Future<Output = ()> + '_>>;
@@ -116,67 +113,30 @@ async fn run_scenario(transport: TransportProtocol, scenario: ScenarioFn) {
         "IGGY_QUIC_KEEP_ALIVE_INTERVAL".to_string(),
         "15s".to_string(),
     );
-    let test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
-    let test_server = Arc::new(Mutex::new(test_server));
-
-    test_server.lock().unwrap().start();
-
-    let client_factory: Box<dyn ClientFactory> = {
-        let server = test_server.lock().unwrap();
-        match transport {
-            TransportProtocol::Tcp => {
-                let server_addr = server.get_raw_tcp_addr().unwrap();
-                Box::new(TcpClientFactory {
-                    server_addr,
-                    ..Default::default()
-                })
-            }
-            TransportProtocol::Quic => {
-                let server_addr = server.get_quic_udp_addr().unwrap();
-                Box::new(QuicClientFactory { server_addr })
-            }
-            TransportProtocol::Http => {
-                let server_addr = server.get_http_api_addr().unwrap();
-                Box::new(HttpClientFactory { server_addr })
-            }
-            TransportProtocol::WebSocket => {
-                let server_addr = server.get_websocket_addr().unwrap();
-                Box::new(WebSocketClientFactory { server_addr })
-            }
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    let client_factory: Box<dyn ClientFactory> = match transport {
+        TransportProtocol::Tcp => {
+            let server_addr = test_server.get_raw_tcp_addr().unwrap();
+            Box::new(TcpClientFactory {
+                server_addr,
+                ..Default::default()
+            })
         }
-    };
-
-    let monitor_server = test_server.clone();
-    let crash_monitor = async move {
-        loop {
-            tokio::time::sleep(Duration::from_millis(100)).await;
-            let mut server = monitor_server.lock().unwrap();
-            if !server.is_running() {
-                let (stdout, stderr) = server.collect_logs();
-                return (stdout, stderr);
-            }
+        TransportProtocol::Quic => {
+            let server_addr = test_server.get_quic_udp_addr().unwrap();
+            Box::new(QuicClientFactory { server_addr })
         }
-    };
-
-    tokio::select! {
-        biased;
-
-        (stdout, stderr) = crash_monitor => {
-            panic!(
-                "Server crashed during test!\n\n\
-                 === STDOUT ===\n{}\n\n\
-                 === STDERR ===\n{}",
-                stdout, stderr
-            );
+        TransportProtocol::Http => {
+            let server_addr = test_server.get_http_api_addr().unwrap();
+            Box::new(HttpClientFactory { server_addr })
         }
-
-        result = std::panic::AssertUnwindSafe(scenario(&*client_factory)).catch_unwind() => {
-            test_server.lock().unwrap().assert_running();
-
-            // Re-raise any panic from the scenario
-            if let Err(panic_payload) = result {
-                std::panic::resume_unwind(panic_payload);
-            }
+        TransportProtocol::WebSocket => {
+            let server_addr = test_server.get_websocket_addr().unwrap();
+            Box::new(WebSocketClientFactory { server_addr })
         }
-    }
+    };
+
+    scenario(&*client_factory).await;
 }

Reply via email to