This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch panic-handler in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 90ac0506d49b53dab8a9cccfd21bee42110f9fbd Author: Hubert Gruszecki <[email protected]> AuthorDate: Thu Dec 4 12:24:55 2025 +0100 fix(server): gracefully shut down all shards when one panics Wrap shard execution in catch_unwind and use an mpsc channel to notify the main thread of shard completion status. When any shard panics or errors, initiate_shutdown signals all remaining shards to stop gracefully. --- core/server/src/main.rs | 200 ++++++++++++++++++++++++++++++---------- core/server/src/server_error.rs | 7 +- 2 files changed, 157 insertions(+), 50 deletions(-) diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 6f4384d04..d7b32f015 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -50,16 +50,57 @@ use server::streaming::storage::SystemStorage; use server::streaming::utils::ptr::EternalPtr; use server::versioning::SemanticVersion; use std::collections::HashSet; +use std::panic::AssertUnwindSafe; use std::rc::Rc; use std::str::FromStr; use std::sync::Arc; -use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; +use std::sync::mpsc; +use std::thread::JoinHandle; use tracing::{error, info, instrument, warn}; const COMPONENT: &str = "MAIN"; const SHARDS_TABLE_CAPACITY: usize = 16384; static SHUTDOWN_START_TIME: AtomicU64 = AtomicU64::new(0); +static SHUTDOWN_INITIATED: AtomicBool = AtomicBool::new(false); + +enum ShardExitStatus { + Success, + Error(String), + Panic(String), +} + +fn initiate_shutdown( + reason: &str, + shutdown_handles: &[(u16, server::shard::transmission::connector::StopSender)], +) { + if SHUTDOWN_INITIATED.swap(true, Ordering::SeqCst) { + return; + } + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + SHUTDOWN_START_TIME.store(now, Ordering::SeqCst); + + info!("{reason}, initiating graceful shutdown..."); + + for (shard_id, stop_sender) in shutdown_handles { + 
if let Err(e) = stop_sender.try_send(()) { + error!("Failed to send shutdown signal to shard {shard_id}: {e}"); + } + } +} + +fn extract_panic_message(payload: Box<dyn std::any::Any + Send>) -> String { + payload + .downcast_ref::<&str>() + .map(|s| s.to_string()) + .or_else(|| payload.downcast_ref::<String>().cloned()) + .unwrap_or_else(|| "unknown panic".to_string()) +} #[instrument(skip_all, name = "trace_start_server")] fn main() -> Result<(), ServerError> { @@ -266,9 +307,13 @@ fn main() -> Result<(), ServerError> { let metrics = Metrics::init(); // TWELFTH DISCRETE LOADING STEP. - info!("Starting {} shard(s)", shards_set.len()); + let shards_count = shards_set.len(); + info!("Starting {} shard(s)", shards_count); let (connections, shutdown_handles) = create_shard_connections(&shards_set); - let mut handles = Vec::with_capacity(shards_set.len()); + let mut handles: Vec<JoinHandle<()>> = Vec::with_capacity(shards_count); + + // Channel for shard completion notifications + let (shard_done_tx, shard_done_rx) = mpsc::channel::<(u16, ShardExitStatus)>(); // TODO: Persist the shards table and load it from the disk, so it does not have to be // THIRTEENTH DISCRETE LOADING STEP. 
@@ -356,66 +401,115 @@ fn main() -> Result<(), ServerError> { } }); + let shard_done_tx = shard_done_tx.clone(); let handle = std::thread::Builder::new() .name(format!("shard-{id}")) .spawn(move || { - let affinity_set = HashSet::from([cpu_id]); - let rt = create_shard_executor(affinity_set); - rt.block_on(async move { - let builder = IggyShard::builder(); - let shard = builder - .id(id) - .streams(streams) - .state(state) - .users(users) - .shards_table(shards_table) - .connections(connections) - .clients_manager(client_manager) - .config(config) - .encryptor(encryptor) - .version(current_version) - .metrics(metrics) - .is_follower(is_follower) - .build(); - let shard = Rc::new(shard); - - if let Err(e) = shard.run().await { - error!("Failed to run shard-{id}: {e}"); + let result = std::panic::catch_unwind(AssertUnwindSafe(|| { + let affinity_set = HashSet::from([cpu_id]); + let rt = create_shard_executor(affinity_set); + rt.block_on(async move { + let builder = IggyShard::builder(); + let shard = builder + .id(id) + .streams(streams) + .state(state) + .users(users) + .shards_table(shards_table) + .connections(connections) + .clients_manager(client_manager) + .config(config) + .encryptor(encryptor) + .version(current_version) + .metrics(metrics) + .is_follower(is_follower) + .build(); + + let shard = Rc::new(shard); + + if let Err(e) = shard.run().await { + error!("Failed to run shard-{id}: {e}"); + return Err(e.to_string()); + } + info!("Shard {id} run completed"); + Ok(()) + }) + })); + + let status = match result { + Ok(Ok(())) => ShardExitStatus::Success, + Ok(Err(msg)) => ShardExitStatus::Error(msg), + Err(panic_payload) => { + ShardExitStatus::Panic(extract_panic_message(panic_payload)) } - info!("Run completed"); - }) + }; + + let _ = shard_done_tx.send((id, status)); }) .unwrap_or_else(|e| panic!("Failed to spawn thread for shard-{id}: {e}")); handles.push(handle); } + drop(shard_done_tx); + let shutdown_handles_for_signal = shutdown_handles.clone(); 
ctrlc::set_handler(move || { - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - SHUTDOWN_START_TIME.store(now, Ordering::SeqCst); - - info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful shutdown..."); - - for (shard_id, stop_sender) in &shutdown_handles_for_signal { - if let Err(e) = stop_sender.try_send(()) { - error!( - "Failed to send shutdown signal to shard {}: {}", - shard_id, e - ); - } - } + initiate_shutdown( + "Received shutdown signal (SIGTERM/SIGINT)", + &shutdown_handles_for_signal, + ); }) .expect("Error setting Ctrl-C handler"); info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown."); + let mut completed_shards = 0usize; + let mut failure_message: Option<String> = None; + + while completed_shards < shards_count { + match shard_done_rx.recv() { + Ok((shard_id, status)) => { + completed_shards += 1; + + match status { + ShardExitStatus::Success => { + info!("Shard {shard_id} exited successfully"); + } + ShardExitStatus::Error(msg) => { + error!("Shard {shard_id} exited with error: {msg}"); + if failure_message.is_none() { + failure_message = + Some(format!("Shard {shard_id} exited with error: {msg}")); + } + initiate_shutdown( + &format!("Shard {shard_id} exited with error"), + &shutdown_handles, + ); + } + ShardExitStatus::Panic(msg) => { + error!("Shard {shard_id} panicked: {msg}"); + if failure_message.is_none() { + failure_message = + Some(format!("Shard {shard_id} panicked: {msg}")); + } + initiate_shutdown( + &format!("Shard {shard_id} panicked"), + &shutdown_handles, + ); + } + } + } + Err(_) => { + error!("Shard completion channel closed unexpectedly"); + break; + } + } + } + for (idx, handle) in handles.into_iter().enumerate() { - handle - .join() - .unwrap_or_else(|_| panic!("Failed to join shard thread-{}", idx)); + if let Err(e) = handle.join() { + warn!("Shard {idx} thread join returned panic: {e:?}"); + } } let shutdown_duration_msg = 
{ @@ -423,15 +517,23 @@ fn main() -> Result<(), ServerError> { if start_time > 0 { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - let elapsed = now - start_time; + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + let elapsed = now.saturating_sub(start_time); format!(" (shutdown took {} ms)", elapsed) } else { String::new() } }; + if let Some(msg) = failure_message { + error!( + "Server shutting down due to shard failure.{}", + shutdown_duration_msg + ); + return Err(ServerError::ShardFailure { message: msg }); + } + info!( "All shards have shut down. Iggy server is exiting.{}", shutdown_duration_msg diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs index 4cc01f414..c7e211465 100644 --- a/core/server/src/server_error.rs +++ b/core/server/src/server_error.rs @@ -23,7 +23,7 @@ use std::array::TryFromSliceError; use std::io; error_set!( - ServerError := ConfigurationError || ArchiverError || ConnectionError || LogError || CompatError || QuicError + ServerError := ConfigurationError || ArchiverError || ConnectionError || LogError || CompatError || QuicError || ShardError IoError := { #[display("IO error")] @@ -96,4 +96,9 @@ error_set!( #[display("Transport config error")] TransportConfigError, } + + ShardError := { + #[display("Shard failed: {}", message)] + ShardFailure { message: String }, + } );
