This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new e2bd0f565 fix(server): graceful shutdown all shards when one panics (#2439)
e2bd0f565 is described below
commit e2bd0f5650b99c89d2148366ab58fe982997dc1e
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Dec 4 14:10:39 2025 +0100
fix(server): graceful shutdown all shards when one panics (#2439)
Wrap shard execution in `catch_unwind` and use an `mpsc` channel to notify
the main thread of each shard's completion status. When any shard panics or
returns an error, `initiate_shutdown` signals all remaining shards to stop gracefully.
---
core/server/src/main.rs | 200 ++++++++++++++++++++++++++++++----------
core/server/src/server_error.rs | 7 +-
2 files changed, 157 insertions(+), 50 deletions(-)
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 6f4384d04..d7b32f015 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -50,16 +50,57 @@ use server::streaming::storage::SystemStorage;
use server::streaming::utils::ptr::EternalPtr;
use server::versioning::SemanticVersion;
use std::collections::HashSet;
+use std::panic::AssertUnwindSafe;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
-use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
+use std::sync::mpsc;
+use std::thread::JoinHandle;
use tracing::{error, info, instrument, warn};
const COMPONENT: &str = "MAIN";
const SHARDS_TABLE_CAPACITY: usize = 16384;
static SHUTDOWN_START_TIME: AtomicU64 = AtomicU64::new(0);
+static SHUTDOWN_INITIATED: AtomicBool = AtomicBool::new(false);
+
/// Final outcome reported by a shard thread to the main thread over the
/// shard-completion channel.
///
/// Each shard thread sends exactly one status, after its executor has either
/// returned or unwound inside `catch_unwind`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ShardExitStatus {
    /// `shard.run()` returned `Ok(())`.
    Success,
    /// `shard.run()` returned an error; holds the error's `Display` text.
    Error(String),
    /// The shard thread panicked; holds the extracted panic message
    /// (`"unknown panic"` when the payload was not a string).
    Panic(String),
}
+
+fn initiate_shutdown(
+ reason: &str,
+ shutdown_handles: &[(u16,
server::shard::transmission::connector::StopSender)],
+) {
+ if SHUTDOWN_INITIATED.swap(true, Ordering::SeqCst) {
+ return;
+ }
+
+ let now = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .map(|d| d.as_millis() as u64)
+ .unwrap_or(0);
+ SHUTDOWN_START_TIME.store(now, Ordering::SeqCst);
+
+ info!("{reason}, initiating graceful shutdown...");
+
+ for (shard_id, stop_sender) in shutdown_handles {
+ if let Err(e) = stop_sender.try_send(()) {
+ error!("Failed to send shutdown signal to shard {shard_id}: {e}");
+ }
+ }
+}
+
/// Converts a panic payload, as returned in the `Err` of `catch_unwind`, into a
/// printable message.
///
/// The payload's text is returned verbatim when the payload is a `&str` or a
/// `String`. Any other payload type yields `"unknown panic"`.
fn extract_panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
    // `panic!("literal")` typically carries a `&'static str` payload.
    if let Some(text) = payload.downcast_ref::<&str>() {
        return (*text).to_owned();
    }
    // A formatted `panic!("{}", x)` typically carries an owned `String` payload.
    match payload.downcast_ref::<String>() {
        Some(text) => text.clone(),
        None => String::from("unknown panic"),
    }
}
#[instrument(skip_all, name = "trace_start_server")]
fn main() -> Result<(), ServerError> {
@@ -266,9 +307,13 @@ fn main() -> Result<(), ServerError> {
let metrics = Metrics::init();
// TWELFTH DISCRETE LOADING STEP.
- info!("Starting {} shard(s)", shards_set.len());
+ let shards_count = shards_set.len();
+ info!("Starting {} shard(s)", shards_count);
let (connections, shutdown_handles) =
create_shard_connections(&shards_set);
- let mut handles = Vec::with_capacity(shards_set.len());
+ let mut handles: Vec<JoinHandle<()>> =
Vec::with_capacity(shards_count);
+
+ // Channel for shard completion notifications
+ let (shard_done_tx, shard_done_rx) = mpsc::channel::<(u16,
ShardExitStatus)>();
// TODO: Persist the shards table and load it from the disk, so it
does not have to be
// THIRTEENTH DISCRETE LOADING STEP.
@@ -356,66 +401,115 @@ fn main() -> Result<(), ServerError> {
}
});
+ let shard_done_tx = shard_done_tx.clone();
let handle = std::thread::Builder::new()
.name(format!("shard-{id}"))
.spawn(move || {
- let affinity_set = HashSet::from([cpu_id]);
- let rt = create_shard_executor(affinity_set);
- rt.block_on(async move {
- let builder = IggyShard::builder();
- let shard = builder
- .id(id)
- .streams(streams)
- .state(state)
- .users(users)
- .shards_table(shards_table)
- .connections(connections)
- .clients_manager(client_manager)
- .config(config)
- .encryptor(encryptor)
- .version(current_version)
- .metrics(metrics)
- .is_follower(is_follower)
- .build();
- let shard = Rc::new(shard);
-
- if let Err(e) = shard.run().await {
- error!("Failed to run shard-{id}: {e}");
+ let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
+ let affinity_set = HashSet::from([cpu_id]);
+ let rt = create_shard_executor(affinity_set);
+ rt.block_on(async move {
+ let builder = IggyShard::builder();
+ let shard = builder
+ .id(id)
+ .streams(streams)
+ .state(state)
+ .users(users)
+ .shards_table(shards_table)
+ .connections(connections)
+ .clients_manager(client_manager)
+ .config(config)
+ .encryptor(encryptor)
+ .version(current_version)
+ .metrics(metrics)
+ .is_follower(is_follower)
+ .build();
+
+ let shard = Rc::new(shard);
+
+ if let Err(e) = shard.run().await {
+ error!("Failed to run shard-{id}: {e}");
+ return Err(e.to_string());
+ }
+ info!("Shard {id} run completed");
+ Ok(())
+ })
+ }));
+
+ let status = match result {
+ Ok(Ok(())) => ShardExitStatus::Success,
+ Ok(Err(msg)) => ShardExitStatus::Error(msg),
+ Err(panic_payload) => {
+
ShardExitStatus::Panic(extract_panic_message(panic_payload))
}
- info!("Run completed");
- })
+ };
+
+ let _ = shard_done_tx.send((id, status));
})
.unwrap_or_else(|e| panic!("Failed to spawn thread for
shard-{id}: {e}"));
handles.push(handle);
}
+ drop(shard_done_tx);
+
let shutdown_handles_for_signal = shutdown_handles.clone();
ctrlc::set_handler(move || {
- let now = std::time::SystemTime::now()
- .duration_since(std::time::UNIX_EPOCH)
- .unwrap()
- .as_millis() as u64;
- SHUTDOWN_START_TIME.store(now, Ordering::SeqCst);
-
- info!("Received shutdown signal (SIGTERM/SIGINT), initiating
graceful shutdown...");
-
- for (shard_id, stop_sender) in &shutdown_handles_for_signal {
- if let Err(e) = stop_sender.try_send(()) {
- error!(
- "Failed to send shutdown signal to shard {}: {}",
- shard_id, e
- );
- }
- }
+ initiate_shutdown(
+ "Received shutdown signal (SIGTERM/SIGINT)",
+ &shutdown_handles_for_signal,
+ );
})
.expect("Error setting Ctrl-C handler");
info!("Iggy server is running. Press Ctrl+C or send SIGTERM to
shutdown.");
+ let mut completed_shards = 0usize;
+ let mut failure_message: Option<String> = None;
+
+ while completed_shards < shards_count {
+ match shard_done_rx.recv() {
+ Ok((shard_id, status)) => {
+ completed_shards += 1;
+
+ match status {
+ ShardExitStatus::Success => {
+ info!("Shard {shard_id} exited successfully");
+ }
+ ShardExitStatus::Error(msg) => {
+ error!("Shard {shard_id} exited with error:
{msg}");
+ if failure_message.is_none() {
+ failure_message =
+ Some(format!("Shard {shard_id} exited with
error: {msg}"));
+ }
+ initiate_shutdown(
+ &format!("Shard {shard_id} exited with error"),
+ &shutdown_handles,
+ );
+ }
+ ShardExitStatus::Panic(msg) => {
+ error!("Shard {shard_id} panicked: {msg}");
+ if failure_message.is_none() {
+ failure_message =
+ Some(format!("Shard {shard_id} panicked:
{msg}"));
+ }
+ initiate_shutdown(
+ &format!("Shard {shard_id} panicked"),
+ &shutdown_handles,
+ );
+ }
+ }
+ }
+ Err(_) => {
+ error!("Shard completion channel closed unexpectedly");
+ break;
+ }
+ }
+ }
+
for (idx, handle) in handles.into_iter().enumerate() {
- handle
- .join()
- .unwrap_or_else(|_| panic!("Failed to join shard thread-{}",
idx));
+ if let Err(e) = handle.join() {
+ warn!("Shard {idx} thread join returned panic: {e:?}");
+ }
}
let shutdown_duration_msg = {
@@ -423,15 +517,23 @@ fn main() -> Result<(), ServerError> {
if start_time > 0 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
- .unwrap()
- .as_millis() as u64;
- let elapsed = now - start_time;
+ .map(|d| d.as_millis() as u64)
+ .unwrap_or(0);
+ let elapsed = now.saturating_sub(start_time);
format!(" (shutdown took {} ms)", elapsed)
} else {
String::new()
}
};
+ if let Some(msg) = failure_message {
+ error!(
+ "Server shutting down due to shard failure.{}",
+ shutdown_duration_msg
+ );
+ return Err(ServerError::ShardFailure { message: msg });
+ }
+
info!(
"All shards have shut down. Iggy server is exiting.{}",
shutdown_duration_msg
diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs
index 4cc01f414..c7e211465 100644
--- a/core/server/src/server_error.rs
+++ b/core/server/src/server_error.rs
@@ -23,7 +23,7 @@ use std::array::TryFromSliceError;
use std::io;
error_set!(
- ServerError := ConfigurationError || ArchiverError || ConnectionError ||
LogError || CompatError || QuicError
+ ServerError := ConfigurationError || ArchiverError || ConnectionError ||
LogError || CompatError || QuicError || ShardError
IoError := {
#[display("IO error")]
@@ -96,4 +96,9 @@ error_set!(
#[display("Transport config error")]
TransportConfigError,
}
+
+ ShardError := {
+ #[display("Shard failed: {}", message)]
+ ShardFailure { message: String },
+ }
);