This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 96a44e1bf fix(server): propagate panics from connection handlers to trigger shutdown (#2515)
96a44e1bf is described below

commit 96a44e1bfd6d79c0059d3ff1cdc9d7ad3d93d04f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Dec 29 11:25:46 2025 +0100

    fix(server): propagate panics from connection handlers to trigger shutdown (#2515)
---
 core/server/src/shard/task_registry/registry.rs | 122 ++++++++++++++++++++----
 1 file changed, 105 insertions(+), 17 deletions(-)

diff --git a/core/server/src/shard/task_registry/registry.rs 
b/core/server/src/shard/task_registry/registry.rs
index 8e96db6eb..531cdcbdc 100644
--- a/core/server/src/shard/task_registry/registry.rs
+++ b/core/server/src/shard/task_registry/registry.rs
@@ -23,6 +23,7 @@ use futures::future::join_all;
 use iggy_common::IggyError;
 use std::cell::RefCell;
 use std::collections::HashMap;
+use std::future::Future;
 use std::ops::{AsyncFn, AsyncFnOnce};
 use std::panic::AssertUnwindSafe;
 use std::time::{Duration, Instant};
@@ -179,6 +180,7 @@ impl TaskRegistry {
         let shutdown = self.shutdown_token.clone();
         let shutdown_for_task = self.shutdown_token.clone();
         let shard_id = self.shard_id;
+        let all_stop_senders = self.all_stop_senders.clone();
 
         let handle = compio::runtime::spawn(async move {
             trace!(
@@ -191,12 +193,36 @@ impl TaskRegistry {
                     break;
                 }
 
-                let fut = tick_fn(shutdown_for_task.clone());
-                if let Err(e) = fut.await {
-                    error!(
-                        "periodic '{}' tick failed on shard {}: {}",
-                        name, shard_id, e
-                    );
+                let fut = 
AssertUnwindSafe(tick_fn(shutdown_for_task.clone())).catch_unwind();
+                match fut.await {
+                    Ok(Ok(())) => {}
+                    Ok(Err(e)) => {
+                        error!(
+                            "periodic '{}' tick failed on shard {}: {}",
+                            name, shard_id, e
+                        );
+                    }
+                    Err(panic_payload) => {
+                        let panic_msg = panic_payload
+                            .downcast_ref::<&str>()
+                            .map(|s| s.to_string())
+                            .or_else(|| 
panic_payload.downcast_ref::<String>().cloned())
+                            .unwrap_or_else(|| "unknown panic".to_string());
+                        error!(
+                            "periodic '{}' tick panicked on shard {}: {}",
+                            name, shard_id, panic_msg
+                        );
+                        if critical {
+                            error!(
+                                "Critical periodic task '{}' panicked on shard 
{}, triggering shutdown",
+                                name, shard_id
+                            );
+                            for stop_sender in &all_stop_senders {
+                                let _ = stop_sender.try_send(());
+                            }
+                            return Err(IggyError::Error);
+                        }
+                    }
                 }
             }
 
@@ -253,24 +279,63 @@ impl TaskRegistry {
 
         let shutdown = self.shutdown_token.clone();
         let shard_id = self.shard_id;
+        let all_stop_senders = self.all_stop_senders.clone();
 
         let handle = compio::runtime::spawn(async move {
             trace!("oneshot '{}' starting on shard {}", name, shard_id);
-            let fut = f(shutdown);
 
-            let r = if let Some(d) = timeout {
-                match compio::time::timeout(d, fut).await {
-                    Ok(r) => r,
-                    Err(_) => Err(IggyError::TaskTimeout),
+            let fut = if let Some(d) = timeout {
+                let inner_fut = AssertUnwindSafe(f(shutdown)).catch_unwind();
+                match compio::time::timeout(d, inner_fut).await {
+                    Ok(Ok(r)) => Ok(r),
+                    Ok(Err(panic_payload)) => Err(panic_payload),
+                    Err(_) => Ok(Err(IggyError::TaskTimeout)),
                 }
             } else {
-                fut.await
+                AssertUnwindSafe(f(shutdown)).catch_unwind().await
             };
 
-            match &r {
-                Ok(()) => trace!("oneshot '{}' completed on shard {}", name, 
shard_id),
-                Err(e) => error!("oneshot '{}' failed on shard {}: {}", name, 
shard_id, e),
-            }
+            let r = match fut {
+                Ok(r) => {
+                    match &r {
+                        Ok(()) => trace!("oneshot '{}' completed on shard {}", 
name, shard_id),
+                        Err(e) => {
+                            error!("oneshot '{}' failed on shard {}: {}", 
name, shard_id, e);
+                            if critical {
+                                error!(
+                                    "Critical oneshot task '{}' failed on 
shard {}, triggering shutdown",
+                                    name, shard_id
+                                );
+                                for stop_sender in &all_stop_senders {
+                                    let _ = stop_sender.try_send(());
+                                }
+                            }
+                        }
+                    }
+                    r
+                }
+                Err(panic_payload) => {
+                    let panic_msg = panic_payload
+                        .downcast_ref::<&str>()
+                        .map(|s| s.to_string())
+                        .or_else(|| 
panic_payload.downcast_ref::<String>().cloned())
+                        .unwrap_or_else(|| "unknown panic".to_string());
+                    error!(
+                        "oneshot '{}' panicked on shard {}: {}",
+                        name, shard_id, panic_msg
+                    );
+                    if critical {
+                        error!(
+                            "Critical oneshot task '{}' panicked on shard {}, 
triggering shutdown",
+                            name, shard_id
+                        );
+                        for stop_sender in &all_stop_senders {
+                            let _ = stop_sender.try_send(());
+                        }
+                    }
+                    Err(IggyError::Error)
+                }
+            };
 
             if let Some(on_shutdown) = on_shutdown {
                 on_shutdown(r.clone()).await;
@@ -420,11 +485,34 @@ impl TaskRegistry {
 
     /// Spawn a connection handler that doesn't need to be tracked for 
shutdown.
     /// These handlers have their own shutdown mechanism via connection 
channels.
+    /// If the handler panics, shutdown is triggered for all shards.
     pub fn spawn_connection<F>(&self, future: F)
     where
         F: Future<Output = ()> + 'static,
     {
-        compio::runtime::spawn(future).detach();
+        let shard_id = self.shard_id;
+        let all_stop_senders = self.all_stop_senders.clone();
+
+        compio::runtime::spawn(async move {
+            let fut = AssertUnwindSafe(future).catch_unwind();
+            if let Err(panic_payload) = fut.await {
+                let panic_msg = panic_payload
+                    .downcast_ref::<&str>()
+                    .map(|s| s.to_string())
+                    .or_else(|| 
panic_payload.downcast_ref::<String>().cloned())
+                    .unwrap_or_else(|| "unknown panic".to_string());
+
+                error!(
+                    "Connection handler panicked on shard {}: {}, triggering 
shutdown",
+                    shard_id, panic_msg
+                );
+
+                for stop_sender in &all_stop_senders {
+                    let _ = stop_sender.try_send(());
+                }
+            }
+        })
+        .detach();
     }
 }
 

Reply via email to