This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new e2bd0f565 fix(server): graceful shutdown all shards when one panics (#2439)
e2bd0f565 is described below

commit e2bd0f5650b99c89d2148366ab58fe982997dc1e
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Dec 4 14:10:39 2025 +0100

    fix(server): graceful shutdown all shards when one panics (#2439)
    
    Wrap shard execution in `catch_unwind` and use an `mpsc` channel to
    notify the main thread of each shard's completion status. When any shard
    panics or returns an error, `initiate_shutdown` signals all remaining
    shards to stop gracefully.
---
 core/server/src/main.rs         | 200 ++++++++++++++++++++++++++++++----------
 core/server/src/server_error.rs |   7 +-
 2 files changed, 157 insertions(+), 50 deletions(-)

diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 6f4384d04..d7b32f015 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -50,16 +50,57 @@ use server::streaming::storage::SystemStorage;
 use server::streaming::utils::ptr::EternalPtr;
 use server::versioning::SemanticVersion;
 use std::collections::HashSet;
+use std::panic::AssertUnwindSafe;
 use std::rc::Rc;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
+use std::sync::mpsc;
+use std::thread::JoinHandle;
 use tracing::{error, info, instrument, warn};
 
 const COMPONENT: &str = "MAIN";
 const SHARDS_TABLE_CAPACITY: usize = 16384;
 
 static SHUTDOWN_START_TIME: AtomicU64 = AtomicU64::new(0);
+static SHUTDOWN_INITIATED: AtomicBool = AtomicBool::new(false);
+
+enum ShardExitStatus {
+    Success,
+    Error(String),
+    Panic(String),
+}
+
+fn initiate_shutdown(
+    reason: &str,
+    shutdown_handles: &[(u16, 
server::shard::transmission::connector::StopSender)],
+) {
+    if SHUTDOWN_INITIATED.swap(true, Ordering::SeqCst) {
+        return;
+    }
+
+    let now = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .map(|d| d.as_millis() as u64)
+        .unwrap_or(0);
+    SHUTDOWN_START_TIME.store(now, Ordering::SeqCst);
+
+    info!("{reason}, initiating graceful shutdown...");
+
+    for (shard_id, stop_sender) in shutdown_handles {
+        if let Err(e) = stop_sender.try_send(()) {
+            error!("Failed to send shutdown signal to shard {shard_id}: {e}");
+        }
+    }
+}
+
+fn extract_panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
+    payload
+        .downcast_ref::<&str>()
+        .map(|s| s.to_string())
+        .or_else(|| payload.downcast_ref::<String>().cloned())
+        .unwrap_or_else(|| "unknown panic".to_string())
+}
 
 #[instrument(skip_all, name = "trace_start_server")]
 fn main() -> Result<(), ServerError> {
@@ -266,9 +307,13 @@ fn main() -> Result<(), ServerError> {
         let metrics = Metrics::init();
 
         // TWELFTH DISCRETE LOADING STEP.
-        info!("Starting {} shard(s)", shards_set.len());
+        let shards_count = shards_set.len();
+        info!("Starting {} shard(s)", shards_count);
         let (connections, shutdown_handles) = 
create_shard_connections(&shards_set);
-        let mut handles = Vec::with_capacity(shards_set.len());
+        let mut handles: Vec<JoinHandle<()>> = 
Vec::with_capacity(shards_count);
+
+        // Channel for shard completion notifications
+        let (shard_done_tx, shard_done_rx) = mpsc::channel::<(u16, 
ShardExitStatus)>();
 
         // TODO: Persist the shards table and load it from the disk, so it 
does not have to be
         // THIRTEENTH DISCRETE LOADING STEP.
@@ -356,66 +401,115 @@ fn main() -> Result<(), ServerError> {
                 }
             });
 
+            let shard_done_tx = shard_done_tx.clone();
             let handle = std::thread::Builder::new()
                 .name(format!("shard-{id}"))
                 .spawn(move || {
-                    let affinity_set = HashSet::from([cpu_id]);
-                    let rt = create_shard_executor(affinity_set);
-                    rt.block_on(async move {
-                        let builder = IggyShard::builder();
-                        let shard = builder
-                            .id(id)
-                            .streams(streams)
-                            .state(state)
-                            .users(users)
-                            .shards_table(shards_table)
-                            .connections(connections)
-                            .clients_manager(client_manager)
-                            .config(config)
-                            .encryptor(encryptor)
-                            .version(current_version)
-                            .metrics(metrics)
-                            .is_follower(is_follower)
-                            .build();
-                        let shard = Rc::new(shard);
-
-                        if let Err(e) = shard.run().await {
-                            error!("Failed to run shard-{id}: {e}");
+                    let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
+                        let affinity_set = HashSet::from([cpu_id]);
+                        let rt = create_shard_executor(affinity_set);
+                        rt.block_on(async move {
+                            let builder = IggyShard::builder();
+                            let shard = builder
+                                .id(id)
+                                .streams(streams)
+                                .state(state)
+                                .users(users)
+                                .shards_table(shards_table)
+                                .connections(connections)
+                                .clients_manager(client_manager)
+                                .config(config)
+                                .encryptor(encryptor)
+                                .version(current_version)
+                                .metrics(metrics)
+                                .is_follower(is_follower)
+                                .build();
+
+                            let shard = Rc::new(shard);
+
+                            if let Err(e) = shard.run().await {
+                                error!("Failed to run shard-{id}: {e}");
+                                return Err(e.to_string());
+                            }
+                            info!("Shard {id} run completed");
+                            Ok(())
+                        })
+                    }));
+
+                    let status = match result {
+                        Ok(Ok(())) => ShardExitStatus::Success,
+                        Ok(Err(msg)) => ShardExitStatus::Error(msg),
+                        Err(panic_payload) => {
+                            
ShardExitStatus::Panic(extract_panic_message(panic_payload))
                         }
-                        info!("Run completed");
-                    })
+                    };
+
+                    let _ = shard_done_tx.send((id, status));
                 })
                 .unwrap_or_else(|e| panic!("Failed to spawn thread for 
shard-{id}: {e}"));
             handles.push(handle);
         }
 
+        drop(shard_done_tx);
+
         let shutdown_handles_for_signal = shutdown_handles.clone();
         ctrlc::set_handler(move || {
-            let now = std::time::SystemTime::now()
-                .duration_since(std::time::UNIX_EPOCH)
-                .unwrap()
-                .as_millis() as u64;
-            SHUTDOWN_START_TIME.store(now, Ordering::SeqCst);
-
-            info!("Received shutdown signal (SIGTERM/SIGINT), initiating 
graceful shutdown...");
-
-            for (shard_id, stop_sender) in &shutdown_handles_for_signal {
-                if let Err(e) = stop_sender.try_send(()) {
-                    error!(
-                        "Failed to send shutdown signal to shard {}: {}",
-                        shard_id, e
-                    );
-                }
-            }
+            initiate_shutdown(
+                "Received shutdown signal (SIGTERM/SIGINT)",
+                &shutdown_handles_for_signal,
+            );
         })
         .expect("Error setting Ctrl-C handler");
 
         info!("Iggy server is running. Press Ctrl+C or send SIGTERM to 
shutdown.");
 
+        let mut completed_shards = 0usize;
+        let mut failure_message: Option<String> = None;
+
+        while completed_shards < shards_count {
+            match shard_done_rx.recv() {
+                Ok((shard_id, status)) => {
+                    completed_shards += 1;
+
+                    match status {
+                        ShardExitStatus::Success => {
+                            info!("Shard {shard_id} exited successfully");
+                        }
+                        ShardExitStatus::Error(msg) => {
+                            error!("Shard {shard_id} exited with error: 
{msg}");
+                            if failure_message.is_none() {
+                                failure_message =
+                                    Some(format!("Shard {shard_id} exited with 
error: {msg}"));
+                            }
+                            initiate_shutdown(
+                                &format!("Shard {shard_id} exited with error"),
+                                &shutdown_handles,
+                            );
+                        }
+                        ShardExitStatus::Panic(msg) => {
+                            error!("Shard {shard_id} panicked: {msg}");
+                            if failure_message.is_none() {
+                                failure_message =
+                                    Some(format!("Shard {shard_id} panicked: 
{msg}"));
+                            }
+                            initiate_shutdown(
+                                &format!("Shard {shard_id} panicked"),
+                                &shutdown_handles,
+                            );
+                        }
+                    }
+                }
+                Err(_) => {
+                    error!("Shard completion channel closed unexpectedly");
+                    break;
+                }
+            }
+        }
+
         for (idx, handle) in handles.into_iter().enumerate() {
-            handle
-                .join()
-                .unwrap_or_else(|_| panic!("Failed to join shard thread-{}", 
idx));
+            if let Err(e) = handle.join() {
+                warn!("Shard {idx} thread join returned panic: {e:?}");
+            }
         }
 
         let shutdown_duration_msg = {
@@ -423,15 +517,23 @@ fn main() -> Result<(), ServerError> {
             if start_time > 0 {
                 let now = std::time::SystemTime::now()
                     .duration_since(std::time::UNIX_EPOCH)
-                    .unwrap()
-                    .as_millis() as u64;
-                let elapsed = now - start_time;
+                    .map(|d| d.as_millis() as u64)
+                    .unwrap_or(0);
+                let elapsed = now.saturating_sub(start_time);
                 format!(" (shutdown took {} ms)", elapsed)
             } else {
                 String::new()
             }
         };
 
+        if let Some(msg) = failure_message {
+            error!(
+                "Server shutting down due to shard failure.{}",
+                shutdown_duration_msg
+            );
+            return Err(ServerError::ShardFailure { message: msg });
+        }
+
         info!(
             "All shards have shut down. Iggy server is exiting.{}",
             shutdown_duration_msg
diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs
index 4cc01f414..c7e211465 100644
--- a/core/server/src/server_error.rs
+++ b/core/server/src/server_error.rs
@@ -23,7 +23,7 @@ use std::array::TryFromSliceError;
 use std::io;
 
 error_set!(
-    ServerError := ConfigurationError || ArchiverError || ConnectionError || 
LogError || CompatError || QuicError
+    ServerError := ConfigurationError || ArchiverError || ConnectionError || 
LogError || CompatError || QuicError || ShardError
 
     IoError := {
         #[display("IO error")]
@@ -96,4 +96,9 @@ error_set!(
         #[display("Transport config error")]
         TransportConfigError,
     }
+
+    ShardError := {
+        #[display("Shard failed: {}", message)]
+        ShardFailure { message: String },
+    }
 );

Reply via email to