This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 96a44e1bf fix(server): propagate panics from connection handlers to trigger shutdown (#2515)
96a44e1bf is described below
commit 96a44e1bfd6d79c0059d3ff1cdc9d7ad3d93d04f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Dec 29 11:25:46 2025 +0100
fix(server): propagate panics from connection handlers to trigger shutdown (#2515)
---
core/server/src/shard/task_registry/registry.rs | 122 ++++++++++++++++++++----
1 file changed, 105 insertions(+), 17 deletions(-)
diff --git a/core/server/src/shard/task_registry/registry.rs b/core/server/src/shard/task_registry/registry.rs
index 8e96db6eb..531cdcbdc 100644
--- a/core/server/src/shard/task_registry/registry.rs
+++ b/core/server/src/shard/task_registry/registry.rs
@@ -23,6 +23,7 @@ use futures::future::join_all;
use iggy_common::IggyError;
use std::cell::RefCell;
use std::collections::HashMap;
+use std::future::Future;
use std::ops::{AsyncFn, AsyncFnOnce};
use std::panic::AssertUnwindSafe;
use std::time::{Duration, Instant};
@@ -179,6 +180,7 @@ impl TaskRegistry {
let shutdown = self.shutdown_token.clone();
let shutdown_for_task = self.shutdown_token.clone();
let shard_id = self.shard_id;
+ let all_stop_senders = self.all_stop_senders.clone();
let handle = compio::runtime::spawn(async move {
trace!(
@@ -191,12 +193,36 @@ impl TaskRegistry {
break;
}
- let fut = tick_fn(shutdown_for_task.clone());
- if let Err(e) = fut.await {
- error!(
- "periodic '{}' tick failed on shard {}: {}",
- name, shard_id, e
- );
+ let fut = AssertUnwindSafe(tick_fn(shutdown_for_task.clone())).catch_unwind();
+ match fut.await {
+ Ok(Ok(())) => {}
+ Ok(Err(e)) => {
+ error!(
+ "periodic '{}' tick failed on shard {}: {}",
+ name, shard_id, e
+ );
+ }
+ Err(panic_payload) => {
+ let panic_msg = panic_payload
+ .downcast_ref::<&str>()
+ .map(|s| s.to_string())
+ .or_else(|| panic_payload.downcast_ref::<String>().cloned())
+ .unwrap_or_else(|| "unknown panic".to_string());
+ error!(
+ "periodic '{}' tick panicked on shard {}: {}",
+ name, shard_id, panic_msg
+ );
+ if critical {
+ error!(
+ "Critical periodic task '{}' panicked on shard {}, triggering shutdown",
+ name, shard_id
+ );
+ for stop_sender in &all_stop_senders {
+ let _ = stop_sender.try_send(());
+ }
+ return Err(IggyError::Error);
+ }
+ }
}
}
@@ -253,24 +279,63 @@ impl TaskRegistry {
let shutdown = self.shutdown_token.clone();
let shard_id = self.shard_id;
+ let all_stop_senders = self.all_stop_senders.clone();
let handle = compio::runtime::spawn(async move {
trace!("oneshot '{}' starting on shard {}", name, shard_id);
- let fut = f(shutdown);
- let r = if let Some(d) = timeout {
- match compio::time::timeout(d, fut).await {
- Ok(r) => r,
- Err(_) => Err(IggyError::TaskTimeout),
+ let fut = if let Some(d) = timeout {
+ let inner_fut = AssertUnwindSafe(f(shutdown)).catch_unwind();
+ match compio::time::timeout(d, inner_fut).await {
+ Ok(Ok(r)) => Ok(r),
+ Ok(Err(panic_payload)) => Err(panic_payload),
+ Err(_) => Ok(Err(IggyError::TaskTimeout)),
}
} else {
- fut.await
+ AssertUnwindSafe(f(shutdown)).catch_unwind().await
};
- match &r {
- Ok(()) => trace!("oneshot '{}' completed on shard {}", name, shard_id),
- Err(e) => error!("oneshot '{}' failed on shard {}: {}", name, shard_id, e),
- }
+ let r = match fut {
+ Ok(r) => {
+ match &r {
+ Ok(()) => trace!("oneshot '{}' completed on shard {}", name, shard_id),
+ Err(e) => {
+ error!("oneshot '{}' failed on shard {}: {}", name, shard_id, e);
+ if critical {
+ error!(
+ "Critical oneshot task '{}' failed on shard {}, triggering shutdown",
+ name, shard_id
+ );
+ for stop_sender in &all_stop_senders {
+ let _ = stop_sender.try_send(());
+ }
+ }
+ }
+ }
+ r
+ }
+ Err(panic_payload) => {
+ let panic_msg = panic_payload
+ .downcast_ref::<&str>()
+ .map(|s| s.to_string())
+ .or_else(|| panic_payload.downcast_ref::<String>().cloned())
+ .unwrap_or_else(|| "unknown panic".to_string());
+ error!(
+ "oneshot '{}' panicked on shard {}: {}",
+ name, shard_id, panic_msg
+ );
+ if critical {
+ error!(
+ "Critical oneshot task '{}' panicked on shard {}, triggering shutdown",
+ name, shard_id
+ );
+ for stop_sender in &all_stop_senders {
+ let _ = stop_sender.try_send(());
+ }
+ }
+ Err(IggyError::Error)
+ }
+ };
if let Some(on_shutdown) = on_shutdown {
on_shutdown(r.clone()).await;
@@ -420,11 +485,34 @@ impl TaskRegistry {
/// Spawn a connection handler that doesn't need to be tracked for shutdown.
/// These handlers have their own shutdown mechanism via connection channels.
+ /// If the handler panics, shutdown is triggered for all shards.
pub fn spawn_connection<F>(&self, future: F)
where
F: Future<Output = ()> + 'static,
{
- compio::runtime::spawn(future).detach();
+ let shard_id = self.shard_id;
+ let all_stop_senders = self.all_stop_senders.clone();
+
+ compio::runtime::spawn(async move {
+ let fut = AssertUnwindSafe(future).catch_unwind();
+ if let Err(panic_payload) = fut.await {
+ let panic_msg = panic_payload
+ .downcast_ref::<&str>()
+ .map(|s| s.to_string())
+ .or_else(|| panic_payload.downcast_ref::<String>().cloned())
+ .unwrap_or_else(|| "unknown panic".to_string());
+
+ error!(
+ "Connection handler panicked on shard {}: {}, triggering shutdown",
+ shard_id, panic_msg
+ );
+
+ for stop_sender in &all_stop_senders {
+ let _ = stop_sender.try_send(());
+ }
+ }
+ })
+ .detach();
}
}