This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_task_registry
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 7b50a8bcee008a6fc811136cc97797fb3b1ea431
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Sep 24 13:14:23 2025 +0200

    first version of task supervisor
---
 .../commands/clean_personal_access_tokens.rs       |  75 ----
 core/server/src/channels/commands/print_sysinfo.rs |  70 ----
 core/server/src/channels/commands/save_messages.rs |  86 -----
 .../src/channels/commands/verify_heartbeats.rs     |  80 ----
 core/server/src/lib.rs                             |   1 -
 core/server/src/main.rs                            |  28 +-
 core/server/src/quic/listener.rs                   |  52 ++-
 core/server/src/shard/builder.rs                   |   4 +-
 core/server/src/shard/mod.rs                       |  69 ++--
 core/server/src/shard/task_registry.rs             | 108 ------
 .../src/shard/tasks/continuous/http_server.rs      |  72 ++++
 .../src/shard/tasks/continuous/message_pump.rs     | 103 ++++++
 .../{channels => shard/tasks/continuous}/mod.rs    |  12 +-
 .../src/shard/tasks/continuous/quic_server.rs      |  68 ++++
 .../src/shard/tasks/continuous/tcp_server.rs       |  72 ++++
 core/server/src/shard/tasks/messages.rs            |  59 ---
 core/server/src/shard/tasks/mod.rs                 | 116 +++++-
 core/server/src/shard/tasks/oneshot/fsync.rs       |  87 +++++
 .../src/{channels => shard/tasks/oneshot}/mod.rs   |   6 +-
 .../tasks/periodic/clear_personal_access_tokens.rs | 123 ++++++
 .../src/shard/tasks/periodic/jwt_token_cleaner.rs  | 110 ++++++
 .../commands => shard/tasks/periodic}/mod.rs       |   9 +-
 .../src/shard/tasks/periodic/print_sysinfo.rs      | 127 +++++++
 .../src/shard/tasks/periodic/save_messages.rs      | 137 +++++++
 .../src/shard/tasks/periodic/verify_heartbeats.rs  | 132 +++++++
 core/server/src/shard/tasks/shutdown.rs            | 233 ++++++++++++
 core/server/src/shard/tasks/specs.rs               | 203 ++++++++++
 core/server/src/shard/tasks/supervisor.rs          | 412 +++++++++++++++++++++
 core/server/src/shard/transmission/message.rs      |   5 +-
 core/server/src/slab/streams.rs                    |   3 +
 core/server/src/slab/traits_ext.rs                 |   4 +-
 core/server/src/tcp/tcp_listener.rs                |   6 +-
 core/server/src/tcp/tcp_tls_listener.rs            |   6 +-
 33 files changed, 2103 insertions(+), 575 deletions(-)

diff --git a/core/server/src/channels/commands/clean_personal_access_tokens.rs 
b/core/server/src/channels/commands/clean_personal_access_tokens.rs
deleted file mode 100644
index d846aa27..00000000
--- a/core/server/src/channels/commands/clean_personal_access_tokens.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use std::rc::Rc;
-
-use crate::shard::IggyShard;
-use iggy_common::{IggyError, IggyTimestamp};
-use tracing::{debug, info, trace};
-
-pub async fn clear_personal_access_tokens(shard: Rc<IggyShard>) -> Result<(), 
IggyError> {
-    let config = &shard.config.personal_access_token.cleaner;
-    if !config.enabled {
-        info!("Personal access token cleaner is disabled.");
-        return Ok(());
-    }
-
-    info!(
-        "Personal access token cleaner is enabled, expired tokens will be 
deleted every: {}.",
-        config.interval
-    );
-
-    let interval = config.interval.get_duration();
-    let mut interval_timer = compio::time::interval(interval);
-
-    loop {
-        interval_timer.tick().await;
-        trace!("Cleaning expired personal access tokens...");
-
-        let users = shard.users.borrow();
-        let now = IggyTimestamp::now();
-        let mut deleted_tokens_count = 0;
-
-        for (_, user) in users.iter() {
-            let expired_tokens = user
-                .personal_access_tokens
-                .iter()
-                .filter(|token| token.is_expired(now))
-                .map(|token| token.token.clone())
-                .collect::<Vec<_>>();
-
-            for token in expired_tokens {
-                debug!(
-                    "Personal access token: {} for user with ID: {} is 
expired.",
-                    token, user.id
-                );
-                deleted_tokens_count += 1;
-                user.personal_access_tokens.remove(&token);
-                debug!(
-                    "Deleted personal access token: {} for user with ID: {}.",
-                    token, user.id
-                );
-            }
-        }
-
-        info!(
-            "Deleted {} expired personal access tokens.",
-            deleted_tokens_count
-        );
-    }
-}
diff --git a/core/server/src/channels/commands/print_sysinfo.rs 
b/core/server/src/channels/commands/print_sysinfo.rs
deleted file mode 100644
index 0c3992d2..00000000
--- a/core/server/src/channels/commands/print_sysinfo.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use std::rc::Rc;
-
-use crate::shard::IggyShard;
-use crate::streaming::utils::memory_pool;
-use human_repr::HumanCount;
-use iggy_common::IggyError;
-use tracing::{error, info, trace};
-
-pub async fn print_sys_info(shard: Rc<IggyShard>) -> Result<(), IggyError> {
-    let config = &shard.config.system.logging;
-    let interval = config.sysinfo_print_interval;
-
-    if interval.is_zero() {
-        info!("SysInfoPrinter is disabled.");
-        return Ok(());
-    }
-    info!("SysInfoPrinter is enabled, system information will be printed every 
{interval}.");
-    let mut interval_timer = compio::time::interval(interval.get_duration());
-    loop {
-        interval_timer.tick().await;
-        trace!("Printing system information...");
-
-        let stats = match shard.get_stats().await {
-            Ok(stats) => stats,
-            Err(e) => {
-                error!("Failed to get system information. Error: {e}");
-                continue;
-            }
-        };
-
-        let free_memory_percent = (stats.available_memory.as_bytes_u64() as f64
-            / stats.total_memory.as_bytes_u64() as f64)
-            * 100f64;
-
-        info!(
-            "CPU: {:.2}%/{:.2}% (IggyUsage/Total), Mem: {:.2}%/{}/{}/{} 
(Free/IggyUsage/TotalUsed/Total), Clients: {}, Messages processed: {}, Read: 
{}, Written: {}, Uptime: {}",
-            stats.cpu_usage,
-            stats.total_cpu_usage,
-            free_memory_percent,
-            stats.memory_usage,
-            stats.total_memory - stats.available_memory,
-            stats.total_memory,
-            stats.clients_count,
-            stats.messages_count.human_count_bare(),
-            stats.read_bytes,
-            stats.written_bytes,
-            stats.run_time
-        );
-
-        memory_pool().log_stats();
-    }
-}
diff --git a/core/server/src/channels/commands/save_messages.rs 
b/core/server/src/channels/commands/save_messages.rs
deleted file mode 100644
index 84330563..00000000
--- a/core/server/src/channels/commands/save_messages.rs
+++ /dev/null
@@ -1,86 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::{shard::IggyShard, shard_info};
-use iggy_common::{Identifier, IggyError};
-use std::rc::Rc;
-use tracing::{error, info, trace};
-
-pub async fn save_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
-    let config = &shard.config.message_saver;
-    if !config.enabled {
-        info!("Message saver is disabled.");
-        return Ok(());
-    }
-
-    // TODO: Maybe we should get rid of it in order to not complicate, and use 
the fsync settings per partition from config.
-    let enforce_fsync = config.enforce_fsync;
-    let interval = config.interval;
-    info!(
-        "Message saver is enabled, buffered messages will be automatically 
saved every: {interval}, enforce fsync: {enforce_fsync}."
-    );
-
-    let mut interval_timer = compio::time::interval(interval.get_duration());
-    loop {
-        interval_timer.tick().await;
-        trace!("Saving buffered messages...");
-
-        let namespaces = shard.get_current_shard_namespaces();
-        let mut total_saved_messages = 0u32;
-        let reason = "background saver triggered".to_string();
-
-        for ns in namespaces {
-            let stream_id = Identifier::numeric(ns.stream_id() as 
u32).unwrap();
-            let topic_id = Identifier::numeric(ns.topic_id() as u32).unwrap();
-            let partition_id = ns.partition_id();
-
-            match shard
-                .streams2
-                .persist_messages(
-                    shard.id,
-                    &stream_id,
-                    &topic_id,
-                    partition_id,
-                    reason.clone(),
-                    &shard.config.system,
-                )
-                .await
-            {
-                Ok(batch_count) => {
-                    total_saved_messages += batch_count;
-                }
-                Err(err) => {
-                    error!(
-                        "Failed to save messages for partition {}: {}",
-                        partition_id, err
-                    );
-                }
-            }
-        }
-
-        if total_saved_messages > 0 {
-            shard_info!(
-                shard.id,
-                "Saved {} buffered messages on disk.",
-                total_saved_messages
-            );
-        }
-
-        trace!("Finished saving buffered messages.");
-    }
-}
diff --git a/core/server/src/channels/commands/verify_heartbeats.rs 
b/core/server/src/channels/commands/verify_heartbeats.rs
deleted file mode 100644
index 123c174f..00000000
--- a/core/server/src/channels/commands/verify_heartbeats.rs
+++ /dev/null
@@ -1,80 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::shard::IggyShard;
-use iggy_common::{IggyDuration, IggyError, IggyTimestamp};
-use std::rc::Rc;
-use tracing::{debug, info, trace, warn};
-
-const MAX_THRESHOLD: f64 = 1.2;
-
-pub async fn verify_heartbeats(shard: Rc<IggyShard>) -> Result<(), IggyError> {
-    let config = &shard.config.heartbeat;
-    if !config.enabled {
-        info!("Heartbeats verification is disabled.");
-        return Ok(());
-    }
-
-    let interval = config.interval;
-    let max_interval = IggyDuration::from((MAX_THRESHOLD * 
interval.as_micros() as f64) as u64);
-    info!("Heartbeats will be verified every: {interval}. Max allowed 
interval: {max_interval}.");
-
-    let mut interval_timer = compio::time::interval(interval.get_duration());
-
-    loop {
-        interval_timer.tick().await;
-        trace!("Verifying heartbeats...");
-
-        let clients = {
-            let client_manager = shard.client_manager.borrow();
-            client_manager.get_clients()
-        };
-
-        let now = IggyTimestamp::now();
-        let heartbeat_to = IggyTimestamp::from(now.as_micros() - 
max_interval.as_micros());
-        debug!("Verifying heartbeats at: {now}, max allowed timestamp: 
{heartbeat_to}");
-
-        let mut stale_clients = Vec::new();
-        for client in clients {
-            if client.last_heartbeat.as_micros() < heartbeat_to.as_micros() {
-                warn!(
-                    "Stale client session: {}, last heartbeat at: {}, max 
allowed timestamp: {heartbeat_to}",
-                    client.session, client.last_heartbeat,
-                );
-                client.session.set_stale();
-                stale_clients.push(client.session.client_id);
-            } else {
-                debug!(
-                    "Valid heartbeat at: {} for client session: {}, max 
allowed timestamp: {heartbeat_to}",
-                    client.last_heartbeat, client.session,
-                );
-            }
-        }
-
-        if stale_clients.is_empty() {
-            continue;
-        }
-
-        let count = stale_clients.len();
-        info!("Removing {count} stale clients...");
-        for client_id in stale_clients {
-            shard.delete_client(client_id);
-        }
-        info!("Removed {count} stale clients.");
-    }
-}
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 38263692..735a2e51 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -31,7 +31,6 @@ compile_error!("iggy-server doesn't support windows.");
 pub mod args;
 pub mod binary;
 pub mod bootstrap;
-pub mod channels;
 pub(crate) mod compat;
 pub mod configs;
 pub mod http;
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 05e4ecc8..3cb87561 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -352,22 +352,20 @@ async fn main() -> Result<(), ServerError> {
     }
 
     let shutdown_handles_for_signal = shutdown_handles.clone();
-    /*
-        ::set_handler(move || {
-            info!("Received shutdown signal (SIGTERM/SIGINT), initiating 
graceful shutdown...");
-
-            for (shard_id, stop_sender) in &shutdown_handles_for_signal {
-                info!("Sending shutdown signal to shard {}", shard_id);
-                if let Err(e) = stop_sender.send_blocking(()) {
-                    error!(
-                        "Failed to send shutdown signal to shard {}: {}",
-                        shard_id, e
-                    );
-                }
+    ctrlc::set_handler(move || {
+        info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful 
shutdown...");
+
+        for (shard_id, stop_sender) in &shutdown_handles_for_signal {
+            info!("Sending shutdown signal to shard {}", shard_id);
+            if let Err(e) = stop_sender.try_send(()) {
+                error!(
+                    "Failed to send shutdown signal to shard {}: {}",
+                    shard_id, e
+                );
             }
-        })
-        .expect("Error setting Ctrl-C handler");
-    */
+        }
+    })
+    .expect("Error setting Ctrl-C handler");
 
     info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown.");
     for (idx, handle) in handles.into_iter().enumerate() {
diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs
index 5fce5c84..247b4b73 100644
--- a/core/server/src/quic/listener.rs
+++ b/core/server/src/quic/listener.rs
@@ -27,6 +27,7 @@ use crate::streaming::session::Session;
 use crate::{shard_debug, shard_info};
 use anyhow::anyhow;
 use compio_quic::{Connection, Endpoint, RecvStream, SendStream};
+use futures::FutureExt;
 use iggy_common::IggyError;
 use iggy_common::TransportProtocol;
 use tracing::{error, info, trace};
@@ -42,15 +43,16 @@ pub async fn start(endpoint: Endpoint, shard: 
Rc<IggyShard>) -> Result<(), IggyE
     while let Some(incoming_conn) = endpoint.wait_incoming().await {
         let remote_addr = incoming_conn.remote_address();
         trace!("Incoming connection from client: {}", remote_addr);
-        let shard = shard.clone();
+        let shard_clone = shard.clone();
+        let shard_for_conn = shard_clone.clone();
 
-        // Spawn each connection handler independently to maintain concurrency
-        compio::runtime::spawn(async move {
+        // Use TaskSupervisor to track connection handlers for graceful shutdown
+        shard_clone.task_supervisor.spawn_tracked(async move {
             trace!("Accepting connection from {}", remote_addr);
             match incoming_conn.await {
                 Ok(connection) => {
                     trace!("Connection established from {}", remote_addr);
-                    if let Err(error) = handle_connection(connection, 
shard).await {
+                    if let Err(error) = handle_connection(connection, 
shard_for_conn).await {
                         error!("QUIC connection from {} has failed: {error}", 
remote_addr);
                     }
                 }
@@ -61,8 +63,7 @@ pub async fn start(endpoint: Endpoint, shard: Rc<IggyShard>) 
-> Result<(), IggyE
                     );
                 }
             }
-        })
-        .detach();
+        });
     }
 
     info!("QUIC listener for shard {} stopped", shard.id);
@@ -77,7 +78,6 @@ async fn handle_connection(
     info!("Client has connected: {address}");
     let session = shard.add_client(&address, TransportProtocol::Quic);
 
-    let session = shard.add_client(&address, TransportProtocol::Quic);
     let client_id = session.client_id;
     shard_debug!(
         shard.id,
@@ -95,17 +95,39 @@ async fn handle_connection(
     };
     let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
 
-    while let Some(stream) = accept_stream(&connection, &shard, 
client_id).await? {
-        let shard = shard.clone();
-        let session = session.clone();
+    // Add connection tracking for graceful shutdown
+    let conn_stop_receiver = shard.task_supervisor.add_connection(client_id);
 
-        let handle_stream_task = async move {
-            if let Err(err) = handle_stream(stream, shard, session).await {
-                error!("Error when handling QUIC stream: {:?}", err)
+    // Handle streams until connection closes or shutdown is triggered
+    loop {
+        // Race the per-connection shutdown signal against the next incoming stream
+        futures::select! {
+            _ = conn_stop_receiver.recv().fuse() => {
+                info!("QUIC connection {} shutting down gracefully", 
client_id);
+                break;
+            }
+            stream_result = accept_stream(&connection, &shard, 
client_id).fuse() => {
+                match stream_result? {
+                    Some(stream) => {
+                        let shard_clone = shard.clone();
+                        let session_clone = session.clone();
+
+                        // Use TaskSupervisor to track stream handlers
+                        shard.task_supervisor.spawn_tracked(async move {
+                            if let Err(err) = handle_stream(stream, 
shard_clone, session_clone).await {
+                                error!("Error when handling QUIC stream: 
{:?}", err)
+                            }
+                        });
+                    }
+                    None => break, // Connection closed
+                }
             }
-        };
-        let _handle = compio::runtime::spawn(handle_stream_task).detach();
+        }
     }
+
+    // Remove connection from tracking. NOTE(review): the `?` on stream_result above returns before this runs.
+    shard.task_supervisor.remove_connection(&client_id);
+    info!("QUIC connection {} closed", client_id);
     Ok(())
 }
 
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index fc203b77..378e4e03 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -30,7 +30,7 @@ use tracing::info;
 use crate::{
     configs::server::ServerConfig,
     io::storage::Storage,
-    shard::{Shard, ShardInfo, namespace::IggyNamespace, 
task_registry::TaskRegistry},
+    shard::{Shard, ShardInfo, namespace::IggyNamespace, tasks::TaskSupervisor},
     slab::streams::Streams,
     state::{StateKind, system::SystemState},
     streaming::{
@@ -160,7 +160,7 @@ impl IggyShardBuilder {
             stop_sender: stop_sender,
             messages_receiver: Cell::new(Some(frame_receiver)),
             metrics: metrics,
-            task_registry: TaskRegistry::new(),
+            task_supervisor: TaskSupervisor::new(self.id.unwrap_or(0)),
             is_shutting_down: AtomicBool::new(false),
             tcp_bound_address: Cell::new(None),
             quic_bound_address: Cell::new(None),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 4771d238..7e2cc1d9 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -21,7 +21,6 @@ pub mod logging;
 pub mod namespace;
 pub mod stats;
 pub mod system;
-pub mod task_registry;
 pub mod tasks;
 pub mod transmission;
 
@@ -61,8 +60,7 @@ use crate::{
     io::fs_utils,
     shard::{
         namespace::{IggyFullNamespace, IggyNamespace},
-        task_registry::TaskRegistry,
-        tasks::messages::spawn_shard_message_task,
+        tasks::{TaskSupervisor, shard_task_specs},
         transmission::{
             event::ShardEvent,
             frame::{ShardFrame, ShardResponse},
@@ -170,7 +168,7 @@ pub struct IggyShard {
     pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>,
     pub(crate) stop_receiver: StopReceiver,
     pub(crate) stop_sender: StopSender,
-    pub(crate) task_registry: TaskRegistry,
+    pub(crate) task_supervisor: TaskSupervisor,
     pub(crate) is_shutting_down: AtomicBool,
     pub(crate) tcp_bound_address: Cell<Option<SocketAddr>>,
     pub(crate) quic_bound_address: Cell<Option<SocketAddr>>,
@@ -220,7 +218,7 @@ impl IggyShard {
             messages_receiver: Cell::new(None),
             stop_receiver,
             stop_sender,
-            task_registry: TaskRegistry::new(),
+            task_supervisor: TaskSupervisor::new(0),
             is_shutting_down: AtomicBool::new(false),
             tcp_bound_address: Cell::new(None),
             quic_bound_address: Cell::new(None),
@@ -254,49 +252,17 @@ impl IggyShard {
         // TODO: Fixme
         //self.assert_init();
 
-        // Create all tasks (tcp listener, http listener, command processor, 
in the future also the background jobs).
-        let mut tasks: Vec<Task> = 
vec![Box::pin(spawn_shard_message_task(self.clone()))];
-        if self.config.tcp.enabled {
-            tasks.push(Box::pin(spawn_tcp_server(self.clone())));
-        }
-
-        if self.config.http.enabled && self.id == 0 {
-            println!("Starting HTTP server on shard: {}", self.id);
-            tasks.push(Box::pin(http_server::start(
-                self.config.http.clone(),
-                self.clone(),
-            )));
-        }
+        // Create and spawn all tasks via the supervisor
+        let task_specs = shard_task_specs(self.clone());
+        self.task_supervisor.spawn(self.clone(), task_specs);
 
-        if self.config.quic.enabled {
-            tasks.push(Box::pin(crate::quic::quic_server::span_quic_server(
-                self.clone(),
-            )));
-        }
-
-        tasks.push(Box::pin(
-            
crate::channels::commands::clean_personal_access_tokens::clear_personal_access_tokens(
-                self.clone(),
-            ),
-        ));
-        // TOOD: Fixme, not always id 0 is the first shard.
-        if self.id == 0 {
-            tasks.push(Box::pin(
-                
crate::channels::commands::print_sysinfo::print_sys_info(self.clone()),
-            ));
-        }
-
-        tasks.push(Box::pin(
-            
crate::channels::commands::verify_heartbeats::verify_heartbeats(self.clone()),
-        ));
-        tasks.push(Box::pin(
-            
crate::channels::commands::save_messages::save_messages(self.clone()),
-        ));
+        // Create a capacity-1 channel, used once to signal that shutdown has completed
+        let (shutdown_complete_tx, shutdown_complete_rx) = 
async_channel::bounded(1);
 
         let stop_receiver = self.get_stop_receiver();
         let shard_for_shutdown = self.clone();
 
-        /*
+        // Spawn shutdown handler - only this task consumes the stop signal
         compio::runtime::spawn(async move {
             let _ = stop_receiver.recv().await;
             info!("Shard {} received shutdown signal", shard_for_shutdown.id);
@@ -307,13 +273,18 @@ impl IggyShard {
             } else {
                 shard_info!(shard_for_shutdown.id, "shutdown completed 
successfully");
             }
-        });
-        */
+
+            // Notify that shutdown is complete
+            let _ = shutdown_complete_tx.send(()).await;
+        })
+        .detach();
 
         let elapsed = now.elapsed();
         shard_info!(self.id, "Initialized in {} ms.", elapsed.as_millis());
-        let result = try_join_all(tasks).await;
-        result?;
+
+        // Wait for the shutdown-complete notification rather than the stop signal itself,
+        // so the stop signal has exactly one consumer: the shutdown handler spawned above
+        shutdown_complete_rx.recv().await.ok();
         Ok(())
     }
 
@@ -421,7 +392,9 @@ impl IggyShard {
     pub async fn trigger_shutdown(&self) -> bool {
         self.is_shutting_down.store(true, Ordering::SeqCst);
         info!("Shard {} shutdown state set", self.id);
-        self.task_registry.shutdown_all(SHUTDOWN_TIMEOUT).await
+        self.task_supervisor
+            .graceful_shutdown(SHUTDOWN_TIMEOUT)
+            .await
     }
 
     pub fn get_available_shards_count(&self) -> u32 {
diff --git a/core/server/src/shard/task_registry.rs 
b/core/server/src/shard/task_registry.rs
deleted file mode 100644
index ff674a8e..00000000
--- a/core/server/src/shard/task_registry.rs
+++ /dev/null
@@ -1,108 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use async_channel::{Receiver, Sender, bounded};
-use compio::runtime::JoinHandle;
-use futures::future::join_all;
-use std::cell::RefCell;
-use std::collections::HashMap;
-use std::future::Future;
-use std::time::Duration;
-use tracing::{error, info, warn};
-
-pub struct TaskRegistry {
-    tasks: RefCell<Vec<JoinHandle<()>>>,
-    active_connections: RefCell<HashMap<u32, Sender<()>>>,
-}
-
-impl TaskRegistry {
-    pub fn new() -> Self {
-        Self {
-            tasks: RefCell::new(Vec::new()),
-            active_connections: RefCell::new(HashMap::new()),
-        }
-    }
-
-    pub fn spawn_tracked<F>(&self, future: F)
-    where
-        F: Future<Output = ()> + 'static,
-    {
-        let handle = compio::runtime::spawn(future);
-        self.tasks.borrow_mut().push(handle);
-    }
-
-    pub fn add_connection(&self, client_id: u32) -> Receiver<()> {
-        let (stop_sender, stop_receiver) = bounded(1);
-        self.active_connections
-            .borrow_mut()
-            .insert(client_id, stop_sender);
-        stop_receiver
-    }
-
-    pub fn remove_connection(&self, client_id: &u32) {
-        self.active_connections.borrow_mut().remove(client_id);
-    }
-
-    pub async fn shutdown_all(&self, timeout: Duration) -> bool {
-        info!("Initiating task registry shutdown");
-
-        let connections = self.active_connections.borrow();
-        for (client_id, stop_sender) in connections.iter() {
-            info!("Sending shutdown signal to client {}", client_id);
-            if let Err(e) = stop_sender.send(()).await {
-                warn!(
-                    "Failed to send shutdown signal to client {}: {}",
-                    client_id, e
-                );
-            }
-        }
-        drop(connections);
-
-        let tasks = self.tasks.take();
-        let total = tasks.len();
-
-        if total == 0 {
-            info!("No tasks to shut down");
-            return true;
-        }
-
-        let timeout_futures: Vec<_> = tasks
-            .into_iter()
-            .enumerate()
-            .map(|(idx, handle)| async move {
-                match compio::time::timeout(timeout, handle).await {
-                    Ok(_) => (idx, true),
-                    Err(_) => {
-                        warn!("Task {} did not complete within timeout", idx);
-                        (idx, false)
-                    }
-                }
-            })
-            .collect();
-
-        let results = join_all(timeout_futures).await;
-        let completed = results.iter().filter(|(_, success)| *success).count();
-
-        info!(
-            "Task registry shutdown complete. {} of {} tasks completed",
-            completed, total
-        );
-
-        completed == total
-    }
-}
diff --git a/core/server/src/shard/tasks/continuous/http_server.rs 
b/core/server/src/shard/tasks/continuous/http_server.rs
new file mode 100644
index 00000000..cd784a1a
--- /dev/null
+++ b/core/server/src/shard/tasks/continuous/http_server.rs
@@ -0,0 +1,72 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::http::http_server;
+use crate::shard::IggyShard;
+use crate::shard::tasks::specs::{
+    ContinuousSpec, TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec,
+};
+use std::fmt::Debug;
+use std::rc::Rc;
+use tracing::info;
+
+/// Continuous task that runs the HTTP REST API server for a shard.
+pub struct HttpServer {
+    /// Shard whose HTTP configuration is read and which is handed to the server.
+    shard: Rc<IggyShard>,
+}
+
+impl Debug for HttpServer {
+    /// Formats the task as `HttpServer { shard_id: .. }`; only the shard id is shown.
+    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut repr = formatter.debug_struct("HttpServer");
+        repr.field("shard_id", &self.shard.id);
+        repr.finish()
+    }
+}
+
+impl HttpServer {
+    /// Creates the task spec for the given shard.
+    pub fn new(shard: Rc<IggyShard>) -> Self {
+        HttpServer { shard }
+    }
+}
+
+impl TaskSpec for HttpServer {
+    /// Static task name reported for this spec.
+    fn name(&self) -> &'static str {
+        "http_server"
+    }
+
+    /// The HTTP server is declared as a continuous task.
+    fn kind(&self) -> TaskKind {
+        TaskKind::Continuous
+    }
+
+    /// The HTTP server is scoped to shard 0 only.
+    fn scope(&self) -> TaskScope {
+        TaskScope::SpecificShard(0)
+    }
+
+    /// The HTTP server is declared non-critical.
+    fn is_critical(&self) -> bool {
+        false
+    }
+
+    /// Logs the start-up and then drives `http_server::start` to completion.
+    ///
+    /// The task context is unused: this body does not observe the shutdown token itself.
+    fn run(self: Box<Self>, _ctx: TaskCtx) -> TaskFuture {
+        let HttpServer { shard } = *self;
+        Box::pin(async move {
+            info!("Starting HTTP server on shard: {}", shard.id);
+            let http_config = shard.config.http.clone();
+            // The server receives its own handle; ours stays alive until it returns.
+            http_server::start(http_config, Rc::clone(&shard)).await
+        })
+    }
+}
+
+/// Marks `HttpServer` as a continuous task; no trait items are overridden.
+impl ContinuousSpec for HttpServer {}
diff --git a/core/server/src/shard/tasks/continuous/message_pump.rs 
b/core/server/src/shard/tasks/continuous/message_pump.rs
new file mode 100644
index 00000000..e442f87b
--- /dev/null
+++ b/core/server/src/shard/tasks/continuous/message_pump.rs
@@ -0,0 +1,103 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::shard::IggyShard;
+use crate::shard::tasks::specs::{
+    ContinuousSpec, TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec,
+};
+use crate::shard::transmission::frame::ShardFrame;
+use crate::shard_info;
+use futures::{FutureExt, StreamExt};
+use std::fmt::Debug;
+use std::rc::Rc;
+
+/// Continuous task that receives and handles inter-shard messages for a shard.
+pub struct MessagePump {
+    /// Shard that provides the message receiver and handles each incoming message.
+    shard: Rc<IggyShard>,
+}
+
+impl Debug for MessagePump {
+    /// Formats the task as `MessagePump { shard_id: .. }`; only the shard id is shown.
+    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut repr = formatter.debug_struct("MessagePump");
+        repr.field("shard_id", &self.shard.id);
+        repr.finish()
+    }
+}
+
+impl MessagePump {
+    /// Creates the task spec for the given shard.
+    pub fn new(shard: Rc<IggyShard>) -> Self {
+        MessagePump { shard }
+    }
+}
+
+impl TaskSpec for MessagePump {
+    /// Static task name reported for this spec.
+    fn name(&self) -> &'static str {
+        "message_pump"
+    }
+
+    /// The pump is declared as a continuous task.
+    fn kind(&self) -> TaskKind {
+        TaskKind::Continuous
+    }
+
+    /// The pump is scoped to every shard.
+    fn scope(&self) -> TaskScope {
+        TaskScope::AllShards
+    }
+
+    /// The pump is declared critical.
+    fn is_critical(&self) -> bool {
+        true
+    }
+
+    /// Receives inter-shard frames and handles them until shutdown is signalled or the
+    /// message channel is closed.
+    ///
+    /// Each frame's message is passed to `handle_shard_message` on the shard. When both a
+    /// response and a response sender are present, the response is sent back. A failed send
+    /// is logged rather than treated as fatal, so it cannot panic this task.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the shard's message receiver has already been taken.
+    fn run(self: Box<Self>, ctx: TaskCtx) -> TaskFuture {
+        Box::pin(async move {
+            let mut messages_receiver = self
+                .shard
+                .messages_receiver
+                .take()
+                .expect("shard message receiver was already taken; message pump started twice?");
+
+            shard_info!(self.shard.id, "Starting message passing task");
+
+            loop {
+                futures::select! {
+                    _ = ctx.shutdown.wait().fuse() => {
+                        shard_info!(self.shard.id, "Message receiver shutting down");
+                        break;
+                    }
+                    frame = messages_receiver.next().fuse() => {
+                        // `None` marks the end of the stream. A finished stream must not be
+                        // polled again, so stop here instead of looping on it.
+                        let Some(frame) = frame else {
+                            shard_info!(
+                                self.shard.id,
+                                "Message channel closed, stopping message receiver"
+                            );
+                            break;
+                        };
+                        let ShardFrame {
+                            message,
+                            response_sender,
+                        } = frame;
+
+                        let response = self.shard.handle_shard_message(message).await;
+                        let (Some(response), Some(response_sender)) = (response, response_sender)
+                        else {
+                            // Nothing to reply with, or nobody asked for a reply.
+                            continue;
+                        };
+
+                        if response_sender.send(response).await.is_err() {
+                            tracing::warn!(
+                                "Shard {}: failed to send response back to origin shard.",
+                                self.shard.id
+                            );
+                        }
+                    }
+                }
+            }
+
+            Ok(())
+        })
+    }
+}
+
+/// Marks `MessagePump` as a continuous task; no trait items are overridden.
+impl ContinuousSpec for MessagePump {}
diff --git a/core/server/src/channels/mod.rs 
b/core/server/src/shard/tasks/continuous/mod.rs
similarity index 74%
copy from core/server/src/channels/mod.rs
copy to core/server/src/shard/tasks/continuous/mod.rs
index d147f7e0..97a3ff75 100644
--- a/core/server/src/channels/mod.rs
+++ b/core/server/src/shard/tasks/continuous/mod.rs
@@ -16,4 +16,14 @@
  * under the License.
  */
 
-pub mod commands;
+//! Continuous task specifications for long-running services
+
+pub mod http_server;
+pub mod message_pump;
+pub mod quic_server;
+pub mod tcp_server;
+
+pub use http_server::HttpServer;
+pub use message_pump::MessagePump;
+pub use quic_server::QuicServer;
+pub use tcp_server::TcpServer;
diff --git a/core/server/src/shard/tasks/continuous/quic_server.rs 
b/core/server/src/shard/tasks/continuous/quic_server.rs
new file mode 100644
index 00000000..c0d451b4
--- /dev/null
+++ b/core/server/src/shard/tasks/continuous/quic_server.rs
@@ -0,0 +1,68 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::quic::quic_server;
+use crate::shard::IggyShard;
+use crate::shard::tasks::specs::{
+    ContinuousSpec, TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec,
+};
+use std::fmt::Debug;
+use std::rc::Rc;
+
+/// Continuous task that runs the QUIC server for a shard.
+pub struct QuicServer {
+    /// Shard handed to the QUIC server when the task runs.
+    shard: Rc<IggyShard>,
+}
+
+impl Debug for QuicServer {
+    /// Formats the task as `QuicServer { shard_id: .. }`; only the shard id is shown.
+    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut repr = formatter.debug_struct("QuicServer");
+        repr.field("shard_id", &self.shard.id);
+        repr.finish()
+    }
+}
+
+impl QuicServer {
+    /// Creates the task spec for the given shard.
+    pub fn new(shard: Rc<IggyShard>) -> Self {
+        QuicServer { shard }
+    }
+}
+
+impl TaskSpec for QuicServer {
+    /// Static task name reported for this spec.
+    fn name(&self) -> &'static str {
+        "quic_server"
+    }
+
+    /// The QUIC server is declared as a continuous task.
+    fn kind(&self) -> TaskKind {
+        TaskKind::Continuous
+    }
+
+    /// The QUIC server is scoped to every shard.
+    fn scope(&self) -> TaskScope {
+        TaskScope::AllShards
+    }
+
+    /// The QUIC server is declared non-critical.
+    fn is_critical(&self) -> bool {
+        false
+    }
+
+    /// Drives `quic_server::span_quic_server` to completion and returns its result.
+    ///
+    /// The task context is unused: this body does not observe the shutdown token itself.
+    fn run(self: Box<Self>, _ctx: TaskCtx) -> TaskFuture {
+        let QuicServer { shard } = *self;
+        Box::pin(async move {
+            // The server receives its own handle; ours stays alive until it returns.
+            quic_server::span_quic_server(Rc::clone(&shard)).await
+        })
+    }
+}
+
+/// Marks `QuicServer` as a continuous task; no trait items are overridden.
+impl ContinuousSpec for QuicServer {}
diff --git a/core/server/src/shard/tasks/continuous/tcp_server.rs 
b/core/server/src/shard/tasks/continuous/tcp_server.rs
new file mode 100644
index 00000000..b269db58
--- /dev/null
+++ b/core/server/src/shard/tasks/continuous/tcp_server.rs
@@ -0,0 +1,72 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::shard::IggyShard;
+use crate::shard::tasks::specs::{
+    ContinuousSpec, TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec,
+};
+use crate::tcp::tcp_server::spawn_tcp_server;
+use std::fmt::Debug;
+use std::rc::Rc;
+
+/// Continuous task that runs the TCP server for a shard.
+pub struct TcpServer {
+    /// Shard handed to the TCP server when the task runs.
+    shard: Rc<IggyShard>,
+}
+
+impl Debug for TcpServer {
+    /// Formats the task as `TcpServer { shard_id: .. }`; only the shard id is shown.
+    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut repr = formatter.debug_struct("TcpServer");
+        repr.field("shard_id", &self.shard.id);
+        repr.finish()
+    }
+}
+
+impl TcpServer {
+    /// Creates the task spec for the given shard.
+    pub fn new(shard: Rc<IggyShard>) -> Self {
+        TcpServer { shard }
+    }
+}
+
+impl TaskSpec for TcpServer {
+    /// Static task name reported for this spec.
+    fn name(&self) -> &'static str {
+        "tcp_server"
+    }
+
+    /// The TCP server is declared as a continuous task.
+    fn kind(&self) -> TaskKind {
+        TaskKind::Continuous
+    }
+
+    /// The TCP server is scoped to every shard.
+    fn scope(&self) -> TaskScope {
+        TaskScope::AllShards
+    }
+
+    /// The TCP server is declared non-critical.
+    fn is_critical(&self) -> bool {
+        false
+    }
+
+    /// Drives `spawn_tcp_server` to completion and returns its result.
+    fn run(self: Box<Self>, _ctx: TaskCtx) -> TaskFuture {
+        let TcpServer { shard } = *self;
+        Box::pin(async move {
+            // The task context is deliberately unused. Per the note left with this code,
+            // spawn_tcp_server handles shutdown internally via is_shutting_down checks;
+            // this is slated for a later refactoring.
+            spawn_tcp_server(Rc::clone(&shard)).await
+        })
+    }
+}
+
+/// Marks `TcpServer` as a continuous task; no trait items are overridden.
+impl ContinuousSpec for TcpServer {}
diff --git a/core/server/src/shard/tasks/messages.rs 
b/core/server/src/shard/tasks/messages.rs
deleted file mode 100644
index 2feadd1c..00000000
--- a/core/server/src/shard/tasks/messages.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-use futures::{FutureExt, StreamExt};
-use iggy_common::IggyError;
-use std::{rc::Rc, time::Duration};
-
-use crate::{
-    shard::{IggyShard, transmission::frame::ShardFrame},
-    shard_error, shard_info,
-};
-
-async fn run_shard_messages_receiver(shard: Rc<IggyShard>) -> Result<(), 
IggyError> {
-    let mut messages_receiver = shard.messages_receiver.take().unwrap();
-
-    shard_info!(shard.id, "Starting message passing task");
-    loop {
-        let shutdown_check = async {
-            loop {
-                if shard.is_shutting_down() {
-                    return;
-                }
-                compio::time::sleep(Duration::from_millis(100)).await;
-            }
-        };
-
-        futures::select! {
-            _ = shutdown_check.fuse() => {
-                shard_info!(shard.id, "Message receiver shutting down");
-                break;
-            }
-            frame = messages_receiver.next().fuse() => {
-                if let Some(frame) = frame {
-                    let ShardFrame {
-                        message,
-                        response_sender,
-                    } = frame;
-                    match (shard.handle_shard_message(message).await, 
response_sender) {
-                        (Some(response), Some(response_sender)) => {
-                            response_sender
-                                .send(response)
-                                .await
-                                .expect("Failed to send response back to 
origin shard.");
-                        }
-                        _ => {}
-                    };
-                }
-            }
-        }
-    }
-
-    Ok(())
-}
-
-pub async fn spawn_shard_message_task(shard: Rc<IggyShard>) -> Result<(), 
IggyError> {
-    let result = run_shard_messages_receiver(shard.clone()).await;
-    if let Err(err) = result {
-        shard_error!(shard.id, "Error running shard message receiver: {err}");
-        return Err(err);
-    }
-    Ok(())
-}
diff --git a/core/server/src/shard/tasks/mod.rs 
b/core/server/src/shard/tasks/mod.rs
index ba63992f..1ad68837 100644
--- a/core/server/src/shard/tasks/mod.rs
+++ b/core/server/src/shard/tasks/mod.rs
@@ -1 +1,115 @@
-pub mod messages;
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Task management system for shard operations
+//!
+//! This module provides a unified framework for managing all asynchronous 
tasks
+//! within a shard, including servers, periodic maintenance, and one-shot 
operations.
+
+pub mod continuous;
+pub mod oneshot;
+pub mod periodic;
+pub mod shutdown;
+pub mod specs;
+pub mod supervisor;
+
+pub use shutdown::{Shutdown, ShutdownToken};
+pub use specs::{TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec};
+pub use supervisor::TaskSupervisor;
+
+use crate::shard::IggyShard;
+use specs::IsNoOp;
+use std::rc::Rc;
+use std::time::Duration;
+
+/// Builds the task specifications for one shard.
+///
+/// This is the single place where tasks are declared. The message pump is always included;
+/// servers and periodic tasks are added according to the shard configuration. Any spec that
+/// reports itself as a no-op is dropped, and declaration order is kept in the result.
+pub fn shard_task_specs(shard: Rc<IggyShard>) -> Vec<Box<dyn TaskSpec>> {
+    // JWT tokens are cleaned every 5 minutes (300 seconds).
+    const JWT_CLEANER_PERIOD: Duration = Duration::from_secs(300);
+
+    let config = &shard.config;
+
+    // Continuous tasks: message processing first, then the enabled servers.
+    let mut task_specs: Vec<Box<dyn TaskSpec>> =
+        vec![Box::new(continuous::MessagePump::new(shard.clone()))];
+
+    if config.tcp.enabled {
+        task_specs.push(Box::new(continuous::TcpServer::new(shard.clone())));
+    }
+
+    if config.http.enabled {
+        task_specs.push(Box::new(continuous::HttpServer::new(shard.clone())));
+    }
+
+    if config.quic.enabled {
+        task_specs.push(Box::new(continuous::QuicServer::new(shard.clone())));
+    }
+
+    // Periodic tasks: maintenance and monitoring.
+    if config.message_saver.enabled {
+        let interval = config.message_saver.interval.get_duration();
+        task_specs.push(Box::new(periodic::SaveMessages::new(shard.clone(), interval)));
+    }
+
+    if config.heartbeat.enabled {
+        let interval = config.heartbeat.interval.get_duration();
+        task_specs.push(Box::new(periodic::VerifyHeartbeats::new(
+            shard.clone(),
+            interval,
+        )));
+    }
+
+    if config.personal_access_token.cleaner.enabled {
+        let interval = config.personal_access_token.cleaner.interval.get_duration();
+        task_specs.push(Box::new(periodic::ClearPersonalAccessTokens::new(
+            shard.clone(),
+            interval,
+        )));
+    }
+
+    // The JWT token cleaner is only declared when HTTP is enabled.
+    if config.http.enabled {
+        task_specs.push(Box::new(periodic::JwtTokenCleaner::new(
+            shard.clone(),
+            JWT_CLEANER_PERIOD,
+        )));
+    }
+
+    // System info printing; a zero interval leaves it undeclared.
+    let sysinfo_interval = config.system.logging.sysinfo_print_interval.get_duration();
+    if !sysinfo_interval.is_zero() {
+        task_specs.push(Box::new(periodic::PrintSysinfo::new(
+            shard.clone(),
+            sysinfo_interval,
+        )));
+    }
+
+    // Drop the specs that report themselves as no-ops.
+    task_specs.retain(|spec| !spec.is_noop());
+    task_specs
+}
diff --git a/core/server/src/shard/tasks/oneshot/fsync.rs 
b/core/server/src/shard/tasks/oneshot/fsync.rs
new file mode 100644
index 00000000..d03c1ad6
--- /dev/null
+++ b/core/server/src/shard/tasks/oneshot/fsync.rs
@@ -0,0 +1,87 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::shard::tasks::{TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec, 
specs::OneShotSpec};
+use compio::fs::File;
+use iggy_common::IggyError;
+use std::{path::PathBuf, time::Duration};
+use tracing::{debug, error, trace};
+
+/// One-shot task that opens a file and syncs it to disk.
+#[derive(Debug)]
+pub struct FsyncTask {
+    /// Path of the file to open and sync.
+    path: PathBuf,
+    /// Free-form text that accompanies the path in the trace log line.
+    description: String,
+}
+
+impl FsyncTask {
+    /// Creates an fsync task for `path`, with `description` used for logging.
+    pub fn new(path: PathBuf, description: String) -> Self {
+        FsyncTask { path, description }
+    }
+}
+
+impl TaskSpec for FsyncTask {
+    /// Static task name reported for this spec.
+    fn name(&self) -> &'static str {
+        "fsync"
+    }
+
+    /// The fsync is declared as a one-shot task.
+    fn kind(&self) -> TaskKind {
+        TaskKind::OneShot
+    }
+
+    /// The fsync is scoped to every shard.
+    fn scope(&self) -> TaskScope {
+        TaskScope::AllShards
+    }
+
+    /// The fsync is declared critical.
+    fn is_critical(&self) -> bool {
+        true
+    }
+
+    /// Opens the file at `path` and calls `sync_all` on it.
+    ///
+    /// # Errors
+    ///
+    /// Returns `IggyError::CannotReadFile` when the file cannot be opened and
+    /// `IggyError::CannotSyncFile` when the sync itself fails; both are logged first.
+    fn run(self: Box<Self>, _ctx: TaskCtx) -> TaskFuture {
+        Box::pin(async move {
+            trace!(
+                "Performing fsync on {} - {}",
+                self.path.display(),
+                self.description
+            );
+
+            let file = match File::open(&self.path).await {
+                Ok(opened) => opened,
+                Err(open_error) => {
+                    error!(
+                        "Failed to open {} for fsync: {}",
+                        self.path.display(),
+                        open_error
+                    );
+                    return Err(IggyError::CannotReadFile);
+                }
+            };
+
+            if let Err(sync_error) = file.sync_all().await {
+                error!("Failed to fsync {}: {}", self.path.display(), sync_error);
+                return Err(IggyError::CannotSyncFile);
+            }
+
+            debug!("Successfully synced {} to disk", self.path.display());
+            Ok(())
+        })
+    }
+}
+
+impl OneShotSpec for FsyncTask {
+    /// Reports a 30 second timeout for the fsync.
+    fn timeout(&self) -> Option<Duration> {
+        const FSYNC_TIMEOUT: Duration = Duration::from_secs(30);
+        Some(FSYNC_TIMEOUT)
+    }
+}
diff --git a/core/server/src/channels/mod.rs 
b/core/server/src/shard/tasks/oneshot/mod.rs
similarity index 86%
rename from core/server/src/channels/mod.rs
rename to core/server/src/shard/tasks/oneshot/mod.rs
index d147f7e0..a07e8582 100644
--- a/core/server/src/channels/mod.rs
+++ b/core/server/src/shard/tasks/oneshot/mod.rs
@@ -16,4 +16,8 @@
  * under the License.
  */
 
-pub mod commands;
+//! One-shot task specifications for durability and administrative operations
+
+pub mod fsync;
+
+pub use fsync::FsyncTask;
diff --git 
a/core/server/src/shard/tasks/periodic/clear_personal_access_tokens.rs 
b/core/server/src/shard/tasks/periodic/clear_personal_access_tokens.rs
new file mode 100644
index 00000000..818385e4
--- /dev/null
+++ b/core/server/src/shard/tasks/periodic/clear_personal_access_tokens.rs
@@ -0,0 +1,123 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::shard::IggyShard;
+use crate::shard::tasks::specs::{
+    PeriodicSpec, TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec,
+};
+use iggy_common::IggyTimestamp;
+use std::fmt::Debug;
+use std::rc::Rc;
+use std::time::Duration;
+use tracing::{debug, info, trace};
+
+/// Periodic task that removes expired personal access tokens.
+pub struct ClearPersonalAccessTokens {
+    /// Shard whose users are scanned for expired tokens.
+    shard: Rc<IggyShard>,
+    /// Time to wait between two cleanup passes.
+    period: Duration,
+}
+
+impl Debug for ClearPersonalAccessTokens {
+    /// Formats the task with the shard id and the period; the shard itself is not printed.
+    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut repr = formatter.debug_struct("ClearPersonalAccessTokens");
+        repr.field("shard_id", &self.shard.id);
+        repr.field("period", &self.period);
+        repr.finish()
+    }
+}
+
+impl ClearPersonalAccessTokens {
+    /// Creates the task spec for the given shard and cleanup period.
+    pub fn new(shard: Rc<IggyShard>, period: Duration) -> Self {
+        ClearPersonalAccessTokens { shard, period }
+    }
+}
+
+impl TaskSpec for ClearPersonalAccessTokens {
+    /// Static task name reported for this spec.
+    fn name(&self) -> &'static str {
+        "clear_personal_access_tokens"
+    }
+
+    /// Periodic task whose period is the one given at construction.
+    fn kind(&self) -> TaskKind {
+        let period = self.period;
+        TaskKind::Periodic { period }
+    }
+
+    /// Scoped to shard 0, so that tokens are cleaned on one shard only and conflicts are avoided.
+    fn scope(&self) -> TaskScope {
+        TaskScope::SpecificShard(0)
+    }
+
+    /// Waits one period, then removes every expired personal access token of every user,
+    /// repeating until shutdown interrupts the wait.
+    fn run(self: Box<Self>, ctx: TaskCtx) -> TaskFuture {
+        Box::pin(async move {
+            info!(
+                "Personal access token cleaner is enabled, expired tokens will be deleted every: {:?}.",
+                self.period
+            );
+
+            // `sleep_or_shutdown` yields `false` once shutdown interrupts the wait.
+            while ctx.shutdown.sleep_or_shutdown(self.period).await {
+                trace!("Cleaning expired personal access tokens...");
+
+                let users = self.shard.users.borrow();
+                let now = IggyTimestamp::now();
+                let mut deleted_tokens_count = 0;
+
+                for (_, user) in users.iter() {
+                    // Snapshot the expired tokens first: the removals below modify the
+                    // very collection that is being scanned here.
+                    let expired: Vec<_> = user
+                        .personal_access_tokens
+                        .iter()
+                        .filter_map(|entry| entry.is_expired(now).then(|| entry.token.clone()))
+                        .collect();
+
+                    for token in expired {
+                        debug!(
+                            "Personal access token: {} for user with ID: {} is expired.",
+                            token, user.id
+                        );
+                        deleted_tokens_count += 1;
+                        user.personal_access_tokens.remove(&token);
+                        debug!(
+                            "Deleted personal access token: {} for user with ID: {}.",
+                            token, user.id
+                        );
+                    }
+                }
+
+                info!(
+                    "Deleted {} expired personal access tokens.",
+                    deleted_tokens_count
+                );
+            }
+
+            trace!("Personal access token cleaner shutting down");
+            Ok(())
+        })
+    }
+}
+
+impl PeriodicSpec for ClearPersonalAccessTokens {
+    /// Time between two cleanup passes, as given at construction.
+    fn period(&self) -> Duration {
+        self.period
+    }
+}
diff --git a/core/server/src/shard/tasks/periodic/jwt_token_cleaner.rs 
b/core/server/src/shard/tasks/periodic/jwt_token_cleaner.rs
new file mode 100644
index 00000000..4e4c8332
--- /dev/null
+++ b/core/server/src/shard/tasks/periodic/jwt_token_cleaner.rs
@@ -0,0 +1,110 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::shard::IggyShard;
+use crate::shard::tasks::specs::{
+    PeriodicSpec, TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec,
+};
+use std::fmt::Debug;
+use std::rc::Rc;
+use std::time::Duration;
+use tracing::{info, trace};
+
+/// Periodic task intended to remove expired JWT tokens.
+pub struct JwtTokenCleaner {
+    /// Shard whose configuration decides whether the cleaner runs.
+    shard: Rc<IggyShard>,
+    /// Time to wait between two cleaning cycles.
+    period: Duration,
+}
+
+impl Debug for JwtTokenCleaner {
+    /// Formats the task with the shard id and the period; the shard itself is not printed.
+    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut repr = formatter.debug_struct("JwtTokenCleaner");
+        repr.field("shard_id", &self.shard.id);
+        repr.field("period", &self.period);
+        repr.finish()
+    }
+}
+
+impl JwtTokenCleaner {
+    /// Creates the task spec for the given shard and cleaning period.
+    pub fn new(shard: Rc<IggyShard>, period: Duration) -> Self {
+        JwtTokenCleaner { shard, period }
+    }
+}
+
+impl TaskSpec for JwtTokenCleaner {
+    /// Static task name reported for this spec.
+    fn name(&self) -> &'static str {
+        "jwt_token_cleaner"
+    }
+
+    /// Periodic task whose period is the one given at construction.
+    fn kind(&self) -> TaskKind {
+        let period = self.period;
+        TaskKind::Periodic { period }
+    }
+
+    /// Scoped to shard 0, so that JWT tokens are cleaned on one shard only.
+    fn scope(&self) -> TaskScope {
+        TaskScope::SpecificShard(0)
+    }
+
+    /// Wakes up once per period until shutdown interrupts the wait.
+    ///
+    /// Returns immediately when HTTP is disabled. The cleaning itself is not implemented
+    /// yet: each cycle currently only emits trace logs.
+    fn run(self: Box<Self>, ctx: TaskCtx) -> TaskFuture {
+        Box::pin(async move {
+            // Nothing to do unless the HTTP transport is enabled.
+            if !self.shard.config.http.enabled {
+                info!("JWT token cleaner skipped - HTTP is disabled");
+                return Ok(());
+            }
+
+            info!(
+                "JWT token cleaner is enabled, expired tokens will be deleted every: {:?}.",
+                self.period
+            );
+
+            // `sleep_or_shutdown` yields `false` once shutdown interrupts the wait.
+            while ctx.shutdown.sleep_or_shutdown(self.period).await {
+                trace!("Deleting expired JWT tokens...");
+
+                // TODO: reach the JWT manager through the shard once it is available there.
+                // Until then this task is a placeholder for the JWT cleaning logic.
+                //
+                // The existing cleaner in http/jwt/cleaner.rs works on app_state, which
+                // the shard cannot reach directly. Options for the refactoring:
+                // 1. Store the JWT manager in the shard
+                // 2. Pass a JWT manager reference to this task
+                // 3. Use a channel to communicate with the HTTP server
+
+                trace!("JWT token cleaning cycle completed");
+            }
+
+            trace!("JWT token cleaner shutting down");
+            Ok(())
+        })
+    }
+}
+
+impl PeriodicSpec for JwtTokenCleaner {
+    /// Time between two cleaning cycles, as given at construction.
+    fn period(&self) -> Duration {
+        self.period
+    }
+}
diff --git a/core/server/src/channels/commands/mod.rs 
b/core/server/src/shard/tasks/periodic/mod.rs
similarity index 74%
rename from core/server/src/channels/commands/mod.rs
rename to core/server/src/shard/tasks/periodic/mod.rs
index 0130170b..5b80e7b1 100644
--- a/core/server/src/channels/commands/mod.rs
+++ b/core/server/src/shard/tasks/periodic/mod.rs
@@ -16,7 +16,14 @@
  * under the License.
  */
 
-pub mod clean_personal_access_tokens;
+pub mod clear_personal_access_tokens;
+pub mod jwt_token_cleaner;
 pub mod print_sysinfo;
 pub mod save_messages;
 pub mod verify_heartbeats;
+
+pub use clear_personal_access_tokens::ClearPersonalAccessTokens;
+pub use jwt_token_cleaner::JwtTokenCleaner;
+pub use print_sysinfo::PrintSysinfo;
+pub use save_messages::SaveMessages;
+pub use verify_heartbeats::VerifyHeartbeats;
diff --git a/core/server/src/shard/tasks/periodic/print_sysinfo.rs 
b/core/server/src/shard/tasks/periodic/print_sysinfo.rs
new file mode 100644
index 00000000..9b71a76d
--- /dev/null
+++ b/core/server/src/shard/tasks/periodic/print_sysinfo.rs
@@ -0,0 +1,127 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::shard::IggyShard;
+use crate::shard::tasks::specs::{
+    PeriodicSpec, TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec,
+};
+use crate::streaming::utils::memory_pool;
+use human_repr::HumanCount;
+use std::fmt::Debug;
+use std::rc::Rc;
+use std::time::Duration;
+use tracing::{error, info, trace};
+
+/// Periodic task for printing system information
+pub struct PrintSysinfo {
+    // Shard handle; `run` calls `get_stats()` on it and `Debug` prints its id.
+    shard: Rc<IggyShard>,
+    // Interval between two reports; `Duration::ZERO` makes `run` exit early.
+    period: Duration,
+}
+
+impl Debug for PrintSysinfo {
+    /// Formats the task as `PrintSysinfo { shard_id, period }`, printing only
+    /// the shard id rather than the whole shard.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut repr = f.debug_struct("PrintSysinfo");
+        repr.field("shard_id", &self.shard.id);
+        repr.field("period", &self.period);
+        repr.finish()
+    }
+}
+
+impl PrintSysinfo {
+    /// Creates the task from a shard handle and the interval between reports.
+    pub fn new(shard: Rc<IggyShard>, period: Duration) -> Self {
+        Self { shard, period }
+    }
+}
+
+impl TaskSpec for PrintSysinfo {
+    /// Static identifier of this task.
+    fn name(&self) -> &'static str {
+        "print_sysinfo"
+    }
+
+    /// Reports the task as periodic with the configured interval.
+    fn kind(&self) -> TaskKind {
+        TaskKind::Periodic {
+            period: self.period,
+        }
+    }
+
+    fn scope(&self) -> TaskScope {
+        // Only print sysinfo from shard 0 to avoid duplicate logs
+        TaskScope::SpecificShard(0)
+    }
+
+    /// Logs a statistics summary every `period` until shutdown is signalled.
+    ///
+    /// Returns `Ok(())` immediately when the period is zero (printer
+    /// disabled) and after the shutdown token interrupts the wait. A failed
+    /// `get_stats()` call is logged and that tick is skipped.
+    fn run(self: Box<Self>, ctx: TaskCtx) -> TaskFuture {
+        Box::pin(async move {
+            if self.period == Duration::ZERO {
+                info!("SysInfoPrinter is disabled.");
+                return Ok(());
+            }
+
+            info!(
+                "SysInfoPrinter is enabled, system information will be printed 
every {:?}.",
+                self.period
+            );
+
+            loop {
+                // Use sleep_or_shutdown to allow clean interruption
+                if !ctx.shutdown.sleep_or_shutdown(self.period).await {
+                    trace!("SysInfoPrinter shutting down");
+                    break;
+                }
+
+                trace!("Printing system information...");
+
+                let stats = match self.shard.get_stats().await {
+                    Ok(stats) => stats,
+                    Err(e) => {
+                        error!("Failed to get system information. Error: {e}");
+                        continue;
+                    }
+                };
+
+                // Guard the division: a zero total would otherwise be logged
+                // as NaN/inf instead of a percentage.
+                let total_bytes = stats.total_memory.as_bytes_u64();
+                let free_memory_percent = if total_bytes == 0 {
+                    0f64
+                } else {
+                    (stats.available_memory.as_bytes_u64() as f64
+                        / total_bytes as f64)
+                        * 100f64
+                };
+
+                info!(
+                    "CPU: {:.2}%/{:.2}% (IggyUsage/Total), Mem: 
{:.2}%/{}/{}/{} (Free/IggyUsage/TotalUsed/Total), Clients: {}, Messages 
processed: {}, Read: {}, Written: {}, Uptime: {}",
+                    stats.cpu_usage,
+                    stats.total_cpu_usage,
+                    free_memory_percent,
+                    stats.memory_usage,
+                    stats.total_memory - stats.available_memory,
+                    stats.total_memory,
+                    stats.clients_count,
+                    stats.messages_count.human_count_bare(),
+                    stats.read_bytes,
+                    stats.written_bytes,
+                    stats.run_time
+                );
+
+                memory_pool().log_stats();
+            }
+
+            Ok(())
+        })
+    }
+}
+
+// Exposes the reporting interval through the periodic-task trait.
+impl PeriodicSpec for PrintSysinfo {
+    /// Returns the interval stored in this task's `period` field.
+    fn period(&self) -> Duration {
+        self.period
+    }
+}
diff --git a/core/server/src/shard/tasks/periodic/save_messages.rs 
b/core/server/src/shard/tasks/periodic/save_messages.rs
new file mode 100644
index 00000000..ea50535f
--- /dev/null
+++ b/core/server/src/shard/tasks/periodic/save_messages.rs
@@ -0,0 +1,137 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::shard::IggyShard;
+use crate::shard::tasks::specs::{
+    PeriodicSpec, TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec,
+};
+use crate::shard_info;
+use iggy_common::Identifier;
+use std::fmt::Debug;
+use std::rc::Rc;
+use std::time::Duration;
+use tracing::{error, info, trace};
+
+/// Periodic task for saving buffered messages to disk
+pub struct SaveMessages {
+    // Shard handle; `run` reads its config, namespaces and `streams2` from it.
+    shard: Rc<IggyShard>,
+    // Interval between two save passes.
+    period: Duration,
+}
+
+impl Debug for SaveMessages {
+    /// Formats the task as `SaveMessages { shard_id, period }`, printing only
+    /// the shard id rather than the whole shard.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut repr = f.debug_struct("SaveMessages");
+        repr.field("shard_id", &self.shard.id);
+        repr.field("period", &self.period);
+        repr.finish()
+    }
+}
+
+impl SaveMessages {
+    /// Creates the task from a shard handle and the interval between saves.
+    pub fn new(shard: Rc<IggyShard>, period: Duration) -> Self {
+        Self { shard, period }
+    }
+}
+
+impl TaskSpec for SaveMessages {
+    /// Static identifier of this task.
+    fn name(&self) -> &'static str {
+        "save_messages"
+    }
+
+    /// Reports the task as periodic with the configured interval.
+    fn kind(&self) -> TaskKind {
+        TaskKind::Periodic {
+            period: self.period,
+        }
+    }
+
+    /// Buffered messages are saved on every shard.
+    fn scope(&self) -> TaskScope {
+        TaskScope::AllShards
+    }
+
+    /// Persists buffered messages of this shard's namespaces every `period`
+    /// until shutdown is signalled, then returns `Ok(())`.
+    ///
+    /// Failures are handled per partition: an identifier that cannot be built
+    /// or a failed `persist_messages` call is logged and the loop moves on,
+    /// so one bad partition never stops the saver.
+    fn run(self: Box<Self>, ctx: TaskCtx) -> TaskFuture {
+        Box::pin(async move {
+            let enforce_fsync = self.shard.config.message_saver.enforce_fsync;
+            info!(
+                "Message saver is enabled, buffered messages will be 
automatically saved every: {:?}, enforce fsync: {enforce_fsync}.",
+                self.period
+            );
+
+            // The reason text never changes, so build it once, not per tick.
+            let reason = "background saver triggered".to_string();
+
+            loop {
+                // Use sleep_or_shutdown to allow clean interruption
+                if !ctx.shutdown.sleep_or_shutdown(self.period).await {
+                    trace!("Message saver shutting down");
+                    break;
+                }
+
+                trace!("Saving buffered messages...");
+
+                let namespaces = self.shard.get_current_shard_namespaces();
+                let mut total_saved_messages = 0u32;
+
+                for ns in namespaces {
+                    let partition_id = ns.partition_id();
+
+                    // Build both identifiers without panicking the task.
+                    let ids = Identifier::numeric(ns.stream_id() as u32)
+                        .and_then(|stream_id| {
+                            Identifier::numeric(ns.topic_id() as u32)
+                                .map(|topic_id| (stream_id, topic_id))
+                        });
+                    let (stream_id, topic_id) = match ids {
+                        Ok(ids) => ids,
+                        Err(err) => {
+                            error!(
+                                "Invalid identifiers for partition {}: {}",
+                                partition_id, err
+                            );
+                            continue;
+                        }
+                    };
+
+                    match self
+                        .shard
+                        .streams2
+                        .persist_messages(
+                            self.shard.id,
+                            &stream_id,
+                            &topic_id,
+                            partition_id,
+                            reason.clone(),
+                            &self.shard.config.system,
+                        )
+                        .await
+                    {
+                        Ok(batch_count) => {
+                            // Saturate: this counter only feeds a log line.
+                            total_saved_messages =
+                                total_saved_messages.saturating_add(batch_count);
+                        }
+                        Err(err) => {
+                            error!(
+                                "Failed to save messages for partition {}: {}",
+                                partition_id, err
+                            );
+                        }
+                    }
+                }
+
+                if total_saved_messages > 0 {
+                    shard_info!(
+                        self.shard.id,
+                        "Saved {} buffered messages on disk.",
+                        total_saved_messages
+                    );
+                }
+
+                trace!("Finished saving buffered messages.");
+            }
+
+            Ok(())
+        })
+    }
+}
+
+// Exposes the save interval through the periodic-task trait.
+impl PeriodicSpec for SaveMessages {
+    /// Returns the interval stored in this task's `period` field.
+    fn period(&self) -> Duration {
+        self.period
+    }
+}
diff --git a/core/server/src/shard/tasks/periodic/verify_heartbeats.rs 
b/core/server/src/shard/tasks/periodic/verify_heartbeats.rs
new file mode 100644
index 00000000..8d5c3ef2
--- /dev/null
+++ b/core/server/src/shard/tasks/periodic/verify_heartbeats.rs
@@ -0,0 +1,132 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::shard::IggyShard;
+use crate::shard::tasks::specs::{
+    PeriodicSpec, TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec,
+};
+use iggy_common::{IggyDuration, IggyTimestamp};
+use std::fmt::Debug;
+use std::rc::Rc;
+use std::time::Duration;
+use tracing::{debug, info, trace, warn};
+
+const MAX_THRESHOLD: f64 = 1.2;
+
+/// Periodic task for verifying client heartbeats and removing stale clients
+pub struct VerifyHeartbeats {
+    // Shard handle; `run` reads `client_manager` and calls `delete_client`.
+    shard: Rc<IggyShard>,
+    // Interval between checks; the allowed heartbeat age is derived from it.
+    period: Duration,
+}
+
+impl Debug for VerifyHeartbeats {
+    /// Formats the task as `VerifyHeartbeats { shard_id, period }`, printing
+    /// only the shard id rather than the whole shard.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut repr = f.debug_struct("VerifyHeartbeats");
+        repr.field("shard_id", &self.shard.id);
+        repr.field("period", &self.period);
+        repr.finish()
+    }
+}
+
+impl VerifyHeartbeats {
+    /// Creates the task from a shard handle and the interval between checks.
+    pub fn new(shard: Rc<IggyShard>, period: Duration) -> Self {
+        Self { shard, period }
+    }
+}
+
+impl TaskSpec for VerifyHeartbeats {
+    /// Static identifier of this task.
+    fn name(&self) -> &'static str {
+        "verify_heartbeats"
+    }
+
+    /// Reports the task as periodic with the configured interval.
+    fn kind(&self) -> TaskKind {
+        TaskKind::Periodic {
+            period: self.period,
+        }
+    }
+
+    /// Heartbeats are verified on every shard.
+    fn scope(&self) -> TaskScope {
+        TaskScope::AllShards
+    }
+
+    /// Checks client heartbeats every `period` until shutdown is signalled,
+    /// then returns `Ok(())`.
+    ///
+    /// A client is stale when its last heartbeat is older than
+    /// `MAX_THRESHOLD` times the period. Stale sessions are flagged with
+    /// `set_stale()` and then removed through `delete_client`.
+    fn run(self: Box<Self>, ctx: TaskCtx) -> TaskFuture {
+        Box::pin(async move {
+            let interval = IggyDuration::from(self.period);
+            let max_interval =
+                IggyDuration::from((MAX_THRESHOLD * interval.as_micros() as 
f64) as u64);
+            info!(
+                "Heartbeats will be verified every: {interval}. Max allowed 
interval: {max_interval}."
+            );
+
+            loop {
+                // Use sleep_or_shutdown to allow clean interruption
+                if !ctx.shutdown.sleep_or_shutdown(self.period).await {
+                    trace!("Heartbeat verifier shutting down");
+                    break;
+                }
+
+                trace!("Verifying heartbeats...");
+
+                // Scope the borrow so it is released before clients are
+                // deleted further down.
+                let clients = {
+                    let client_manager = self.shard.client_manager.borrow();
+                    client_manager.get_clients()
+                };
+
+                let now = IggyTimestamp::now();
+                // Saturate so an allowed interval larger than `now` clamps to
+                // zero instead of underflowing (which would panic in debug
+                // builds and mark every client stale in release builds).
+                let heartbeat_to = IggyTimestamp::from(
+                    now.as_micros().saturating_sub(max_interval.as_micros()),
+                );
+                debug!("Verifying heartbeats at: {now}, max allowed timestamp: 
{heartbeat_to}");
+
+                let mut stale_clients = Vec::new();
+                for client in clients {
+                    let last_seen = client.last_heartbeat.as_micros();
+                    if last_seen < heartbeat_to.as_micros() {
+                        warn!(
+                            "Stale client session: {}, last heartbeat at: {}, 
max allowed timestamp: {heartbeat_to}",
+                            client.session, client.last_heartbeat,
+                        );
+                        client.session.set_stale();
+                        stale_clients.push(client.session.client_id);
+                    } else {
+                        debug!(
+                            "Valid heartbeat at: {} for client session: {}, 
max allowed timestamp: {heartbeat_to}",
+                            client.last_heartbeat, client.session,
+                        );
+                    }
+                }
+
+                if stale_clients.is_empty() {
+                    continue;
+                }
+
+                let count = stale_clients.len();
+                info!("Removing {count} stale clients...");
+                for client_id in stale_clients {
+                    self.shard.delete_client(client_id);
+                }
+                info!("Removed {count} stale clients.");
+            }
+
+            Ok(())
+        })
+    }
+}
+
+// Exposes the verification interval through the periodic-task trait.
+impl PeriodicSpec for VerifyHeartbeats {
+    /// Returns the interval stored in this task's `period` field.
+    fn period(&self) -> Duration {
+        self.period
+    }
+}
diff --git a/core/server/src/shard/tasks/shutdown.rs 
b/core/server/src/shard/tasks/shutdown.rs
new file mode 100644
index 00000000..6bb36d63
--- /dev/null
+++ b/core/server/src/shard/tasks/shutdown.rs
@@ -0,0 +1,233 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_channel::{Receiver, Sender, bounded};
+use futures::FutureExt;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tracing::trace;
+
+/// Coordinates graceful shutdown across multiple tasks
+#[derive(Clone)]
+pub struct Shutdown {
+    // Sending half of the signal channel; `trigger` closes it and never
+    // sends a value through it.
+    sender: Sender<()>,
+    // Flag shared with the paired `ShutdownToken`; set by `trigger`.
+    is_triggered: Arc<AtomicBool>,
+}
+
+impl Shutdown {
+    /// Builds a connected pair: the `Shutdown` handle that raises the signal
+    /// and the `ShutdownToken` that observes it. Both share one flag and one
+    /// channel.
+    pub fn new() -> (Self, ShutdownToken) {
+        let (sender, receiver) = bounded(1);
+        let flag = Arc::new(AtomicBool::new(false));
+
+        let token = ShutdownToken {
+            receiver,
+            is_triggered: flag.clone(),
+        };
+        let handle = Self {
+            sender,
+            is_triggered: flag,
+        };
+
+        (handle, token)
+    }
+
+    /// Raises the shutdown signal. Only the first call has an effect: it
+    /// sets the shared flag and closes the channel; later calls return
+    /// without doing anything.
+    pub async fn trigger(&self) {
+        let first_call = !self.is_triggered.swap(true, Ordering::SeqCst);
+        if first_call {
+            trace!("Triggering shutdown signal");
+            let _ = self.sender.close();
+        }
+    }
+
+    /// Reports whether `trigger` has already been called.
+    pub fn is_triggered(&self) -> bool {
+        self.is_triggered.load(Ordering::Relaxed)
+    }
+}
+
+/// Token held by tasks to receive shutdown signals
+#[derive(Clone)]
+pub struct ShutdownToken {
+    // Receiving half of the signal channel; `wait` awaits `recv()` on it and
+    // ignores the result.
+    receiver: Receiver<()>,
+    // Flag read by `is_triggered`; for tokens from `Shutdown::new` it is the
+    // same `Arc` the paired `Shutdown` sets in `trigger`.
+    is_triggered: Arc<AtomicBool>,
+}
+
+impl ShutdownToken {
+    /// Wait for shutdown signal
+    pub async fn wait(&self) {
+        let _ = self.receiver.recv().await;
+    }
+
+    /// Check if shutdown has been triggered (non-blocking)
+    pub fn is_triggered(&self) -> bool {
+        self.is_triggered.load(Ordering::Relaxed)
+    }
+
+    /// Sleep for the specified duration or until shutdown is triggered
+    /// Returns true if the full duration elapsed, false if shutdown was 
triggered
+    pub async fn sleep_or_shutdown(&self, duration: Duration) -> bool {
+        futures::select! {
+            _ = self.wait().fuse() => false,
+            _ = compio::time::sleep(duration).fuse() => !self.is_triggered(),
+        }
+    }
+
+    /// Create a child token that triggers when either this token or the 
child's own signal triggers
+    pub fn child(&self) -> (Shutdown, ShutdownToken) {
+        let (child_shutdown, child_token) = Shutdown::new();
+        let parent_receiver = self.receiver.clone();
+        let child_receiver = child_token.receiver.clone();
+
+        // Create a new channel for the combined token
+        let (combined_sender, combined_receiver) = bounded(1);
+
+        // Create a combined is_triggered flag
+        let combined_is_triggered = Arc::new(AtomicBool::new(false));
+
+        // Clone references for the async task
+        let parent_triggered = self.is_triggered.clone();
+        let child_triggered = child_token.is_triggered.clone();
+        let combined_flag_for_task = combined_is_triggered.clone();
+
+        // Spawn a single task that waits for either parent or child shutdown
+        compio::runtime::spawn(async move {
+            // Wait for either parent or child to trigger
+            futures::select! {
+                _ = parent_receiver.recv().fuse() => {
+                    trace!("Child token triggered by parent shutdown");
+                },
+                _ = child_receiver.recv().fuse() => {
+                    trace!("Child token triggered by child shutdown");
+                },
+            }
+
+            // Set the combined flag based on which one triggered
+            if parent_triggered.load(Ordering::Relaxed) || 
child_triggered.load(Ordering::Relaxed) {
+                combined_flag_for_task.store(true, Ordering::SeqCst);
+            }
+
+            // Close the combined channel to signal shutdown
+            let _ = combined_sender.close();
+        })
+        .detach();
+
+        let combined_token = ShutdownToken {
+            receiver: combined_receiver,
+            is_triggered: combined_is_triggered,
+        };
+
+        (child_shutdown, combined_token)
+    }
+}
+
+// Unit tests for `Shutdown` / `ShutdownToken`, run on the compio runtime.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Triggering sets the shared flag and lets `wait()` return.
+    #[compio::test]
+    async fn test_shutdown_trigger() {
+        let (shutdown, token) = Shutdown::new();
+
+        assert!(!token.is_triggered());
+
+        shutdown.trigger().await;
+
+        assert!(token.is_triggered());
+
+        // Should complete immediately after trigger
+        token.wait().await;
+    }
+
+    // Without a trigger the sleep runs to completion and reports `true`.
+    #[compio::test]
+    async fn test_sleep_or_shutdown_completes() {
+        let (_shutdown, token) = Shutdown::new();
+
+        // Should complete the full sleep
+        let completed = 
token.sleep_or_shutdown(Duration::from_millis(10)).await;
+        assert!(completed);
+    }
+
+    // A trigger during the sleep cuts it short and reports `false`.
+    #[compio::test]
+    async fn test_sleep_or_shutdown_interrupted() {
+        let (shutdown, token) = Shutdown::new();
+
+        // Trigger shutdown after a short delay
+        let shutdown_clone = shutdown.clone();
+        compio::runtime::spawn(async move {
+            compio::time::sleep(Duration::from_millis(10)).await;
+            shutdown_clone.trigger().await;
+        })
+        .detach();
+
+        // Should be interrupted
+        let completed = token.sleep_or_shutdown(Duration::from_secs(10)).await;
+        assert!(!completed);
+    }
+
+    // The combined token reacts to the parent's trigger.
+    #[compio::test]
+    async fn test_child_token_parent_trigger() {
+        let (parent_shutdown, parent_token) = Shutdown::new();
+        let (_child_shutdown, combined_token) = parent_token.child();
+
+        assert!(!combined_token.is_triggered());
+
+        // Trigger parent shutdown
+        parent_shutdown.trigger().await;
+
+        // Combined token should be triggered
+        combined_token.wait().await;
+        assert!(combined_token.is_triggered());
+    }
+
+    // The combined token reacts to the child's own trigger.
+    #[compio::test]
+    async fn test_child_token_child_trigger() {
+        let (_parent_shutdown, parent_token) = Shutdown::new();
+        let (child_shutdown, combined_token) = parent_token.child();
+
+        assert!(!combined_token.is_triggered());
+
+        // Trigger child shutdown
+        child_shutdown.trigger().await;
+
+        // Combined token should be triggered
+        combined_token.wait().await;
+        assert!(combined_token.is_triggered());
+    }
+
+    // NOTE(review): this asserts on wall-clock time (< 100 ms), so it can
+    // fail on a heavily loaded machine; each `child()` call also spawns a
+    // detached forwarding task.
+    #[compio::test]
+    async fn test_child_token_no_polling_overhead() {
+        let (_parent_shutdown, parent_token) = Shutdown::new();
+        let (_child_shutdown, combined_token) = parent_token.child();
+
+        // Test that we can create many child tokens without performance issues
+        let start = std::time::Instant::now();
+        for _ in 0..100 {
+            let _ = combined_token.child();
+        }
+        let elapsed = start.elapsed();
+
+        // Should complete very quickly since there's no polling
+        assert!(
+            elapsed.as_millis() < 100,
+            "Creating child tokens took too long: {:?}",
+            elapsed
+        );
+    }
+}
diff --git a/core/server/src/shard/tasks/specs.rs 
b/core/server/src/shard/tasks/specs.rs
new file mode 100644
index 00000000..2b3340ea
--- /dev/null
+++ b/core/server/src/shard/tasks/specs.rs
@@ -0,0 +1,203 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::shard::IggyShard;
+use iggy_common::IggyError;
+use std::fmt::Debug;
+use std::future::Future;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::time::Duration;
+use strum::Display;
+
+use super::shutdown::ShutdownToken;
+
+/// Context provided to all tasks when they run
+#[derive(Clone)]
+pub struct TaskCtx {
+    // Shard handle passed to the task; presumably the shard the task runs
+    // on. TODO confirm against the supervisor that builds the context.
+    pub shard: Rc<IggyShard>,
+    // Token the task uses to observe shutdown (e.g. `sleep_or_shutdown`).
+    pub shutdown: ShutdownToken,
+}
+
+/// Future type returned by task run methods
+pub type TaskFuture = Pin<Box<dyn Future<Output = Result<(), IggyError>>>>;
+
+/// Describes the kind of task and its scheduling behavior
+// `Display` here is the derive imported above from `strum`.
+#[derive(Debug, Clone, Display)]
+pub enum TaskKind {
+    /// Runs continuously until shutdown
+    Continuous,
+    /// Runs periodically at specified intervals
+    Periodic { period: Duration },
+    /// Runs once and completes
+    OneShot,
+}
+
+/// Selects the shards on which a task is allowed to run
+#[derive(Clone, Debug)]
+pub enum TaskScope {
+    /// Every shard runs the task
+    AllShards,
+    /// Only the shard with this ID runs the task
+    SpecificShard(u16),
+}
+
+impl TaskScope {
+    /// Returns `true` when `shard` is covered by this scope: always for
+    /// `AllShards`, and only on an ID match for `SpecificShard`.
+    pub fn should_run(&self, shard: &IggyShard) -> bool {
+        if let TaskScope::SpecificShard(target) = self {
+            return shard.id == *target;
+        }
+        true
+    }
+}
+
+/// Contract implemented by every task
+pub trait TaskSpec: Debug {
+    /// Name identifying this task (used in the lifecycle log messages)
+    fn name(&self) -> &'static str;
+
+    /// Scheduling kind: continuous, periodic, or oneshot
+    fn kind(&self) -> TaskKind;
+
+    /// Which shards are supposed to run this task
+    fn scope(&self) -> TaskScope;
+
+    /// Consumes the boxed task and returns the future that performs its work
+    fn run(self: Box<Self>, ctx: TaskCtx) -> TaskFuture;
+
+    /// Hook invoked before the task starts; logs the start by default
+    fn on_start(&self) {
+        let name = self.name();
+        tracing::info!("Starting task: {}", name);
+    }
+
+    /// Hook invoked with the task's result; logs success or failure by
+    /// default
+    fn on_complete(&self, result: &Result<(), IggyError>) {
+        let name = self.name();
+        if let Err(e) = result {
+            tracing::error!("Task {} failed: {}", name, e);
+        } else {
+            tracing::info!("Task {} completed successfully", name);
+        }
+    }
+
+    /// Whether a failure of this task should stop the shard; `false` unless
+    /// overridden
+    fn is_critical(&self) -> bool {
+        false
+    }
+}
+
+/// Trait tagging tasks that run continuously
+pub trait ContinuousSpec: TaskSpec {
+    /// Erases the concrete type, yielding the task as `Box<dyn TaskSpec>`
+    fn as_task_spec(self: Box<Self>) -> Box<dyn TaskSpec>
+    where
+        Self: 'static + Sized,
+    {
+        // The return position coerces `Box<Self>` into the trait object.
+        self
+    }
+}
+
+/// Trait for tasks that run at a fixed interval
+pub trait PeriodicSpec: TaskSpec {
+    /// Interval between two runs of the task
+    fn period(&self) -> Duration;
+
+    /// Whether one extra tick should run on shutdown; `false` unless
+    /// overridden
+    fn last_tick_on_shutdown(&self) -> bool {
+        false
+    }
+
+    /// Erases the concrete type, yielding the task as `Box<dyn TaskSpec>`
+    fn as_task_spec(self: Box<Self>) -> Box<dyn TaskSpec>
+    where
+        Self: 'static + Sized,
+    {
+        // The return position coerces `Box<Self>` into the trait object.
+        self
+    }
+}
+
+/// Trait for tasks that run once and complete
+pub trait OneShotSpec: TaskSpec {
+    /// Optional time limit for the task; `None` unless overridden
+    fn timeout(&self) -> Option<Duration> {
+        None
+    }
+
+    /// Erases the concrete type, yielding the task as `Box<dyn TaskSpec>`
+    fn as_task_spec(self: Box<Self>) -> Box<dyn TaskSpec>
+    where
+        Self: 'static + Sized,
+    {
+        // The return position coerces `Box<Self>` into the trait object.
+        self
+    }
+}
+
+/// A no-op task that does nothing (useful for conditional task creation)
+#[derive(Debug)]
+pub struct NoOpTask;
+
+impl TaskSpec for NoOpTask {
+    // The literal "noop" is also what `IsNoOp::is_noop` compares against.
+    fn name(&self) -> &'static str {
+        "noop"
+    }
+
+    // Declared as a oneshot task.
+    fn kind(&self) -> TaskKind {
+        TaskKind::OneShot
+    }
+
+    // Declared for all shards.
+    fn scope(&self) -> TaskScope {
+        TaskScope::AllShards
+    }
+
+    // Ignores the context and resolves to `Ok(())` without doing any work.
+    fn run(self: Box<Self>, _ctx: TaskCtx) -> TaskFuture {
+        Box::pin(async { Ok(()) })
+    }
+}
+
+/// Helper to create a no-op task
+///
+/// Returns a boxed `NoOpTask` as a `TaskSpec` trait object.
+pub fn noop() -> Box<dyn TaskSpec> {
+    Box::new(NoOpTask)
+}
+
+/// Extension trait answering whether a task is the no-op placeholder
+pub trait IsNoOp {
+    /// Returns `true` for a no-op task
+    fn is_noop(&self) -> bool;
+}
+
+impl IsNoOp for Box<dyn TaskSpec> {
+    /// Detection is by name only: any task whose `name()` is `"noop"` is
+    /// treated as a no-op.
+    fn is_noop(&self) -> bool {
+        matches!(self.name(), "noop")
+    }
+}
+
+// Unit tests for scope matching and the no-op helper.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // `AllShards` matches any shard; `SpecificShard` matches only on the id.
+    #[test]
+    fn test_task_scope() {
+        let shard = IggyShard::default_from_config(Default::default());
+
+        assert!(TaskScope::AllShards.should_run(&shard));
+        // shard.id == 0 by default
+        assert!(TaskScope::SpecificShard(0).should_run(&shard));
+        assert!(!TaskScope::SpecificShard(1).should_run(&shard));
+    }
+
+    // `noop()` yields a task that is recognised by `is_noop` and named "noop".
+    #[test]
+    fn test_noop_task() {
+        let task = noop();
+        assert!(task.is_noop());
+        assert_eq!(task.name(), "noop");
+    }
+}
diff --git a/core/server/src/shard/tasks/supervisor.rs 
b/core/server/src/shard/tasks/supervisor.rs
new file mode 100644
index 00000000..a97b1fd1
--- /dev/null
+++ b/core/server/src/shard/tasks/supervisor.rs
@@ -0,0 +1,412 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::shutdown::{Shutdown, ShutdownToken};
+use super::specs::{TaskCtx, TaskKind, TaskSpec};
+use crate::shard::IggyShard;
+use compio::runtime::JoinHandle;
+use futures::future::join_all;
+use iggy_common::IggyError;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::time::Duration;
+use tracing::{error, info, trace, warn};
+
+/// Handle to a spawned task
+///
+/// Bundles the runtime join handle with the metadata the supervisor uses for
+/// logging and for judging the outcome during shutdown.
+struct TaskHandle {
+    /// Task name, used in log messages.
+    name: String,
+    /// Kind of the task (continuous, periodic or oneshot).
+    kind: TaskKind,
+    /// Join handle returned by `compio::runtime::spawn`.
+    handle: JoinHandle<Result<(), IggyError>>,
+    /// When set, a failure of this task during shutdown is reported as a
+    /// critical failure.
+    is_critical: bool,
+}
+
+/// Supervises the lifecycle of all tasks in a shard
+///
+/// State is kept in `RefCell`s, so the methods take `&self`.
+pub struct TaskSupervisor {
+    /// Id of the owning shard; used in log messages.
+    shard_id: u16,
+    /// Shutdown signal, triggered by `graceful_shutdown`.
+    shutdown: Shutdown,
+    /// Token paired with `shutdown`; cloned into every task context.
+    shutdown_token: ShutdownToken,
+    /// Handles of continuous and periodic tasks, plus tracked connection
+    /// handlers added through `spawn_tracked`.
+    tasks: RefCell<Vec<TaskHandle>>,
+    /// Handles of oneshot tasks; awaited without a timeout on shutdown.
+    oneshot_handles: RefCell<Vec<TaskHandle>>,
+    /// Stop-signal senders for active connections, keyed by client id.
+    active_connections: RefCell<HashMap<u32, async_channel::Sender<()>>>,
+}
+
+impl TaskSupervisor {
+    /// Builds a supervisor for the given shard.
+    ///
+    /// The supervisor starts with no tasks and no connections registered,
+    /// and with a freshly created shutdown signal/token pair.
+    pub fn new(shard_id: u16) -> Self {
+        let (shutdown, shutdown_token) = Shutdown::new();
+        let tasks = RefCell::new(Vec::new());
+        let oneshot_handles = RefCell::new(Vec::new());
+        let active_connections = RefCell::new(HashMap::new());
+
+        Self {
+            shard_id,
+            shutdown,
+            shutdown_token,
+            tasks,
+            oneshot_handles,
+            active_connections,
+        }
+    }
+
+    /// Returns a clone of the supervisor's shutdown token for use by tasks.
+    pub fn shutdown_token(&self) -> ShutdownToken {
+        ShutdownToken::clone(&self.shutdown_token)
+    }
+
+    /// Spawns every spec that applies to this shard.
+    ///
+    /// Specs whose scope excludes the shard are skipped with a trace log;
+    /// specs named "noop" are skipped silently. A summary with the number of
+    /// stored handles is logged afterwards.
+    pub fn spawn(&self, shard: Rc<IggyShard>, specs: Vec<Box<dyn TaskSpec>>) {
+        for spec in specs {
+            if !spec.scope().should_run(&shard) {
+                // Out of scope for this shard.
+                trace!(
+                    "Skipping task {} ({}) on shard {} due to scope",
+                    spec.name(),
+                    spec.kind(),
+                    self.shard_id
+                );
+            } else if spec.name() != "noop" {
+                self.spawn_single(Rc::clone(&shard), spec);
+            }
+        }
+
+        let long_running = self.tasks.borrow().len();
+        let oneshot = self.oneshot_handles.borrow().len();
+        info!(
+            "Shard {} spawned {} tasks ({} oneshot)",
+            self.shard_id, long_running, oneshot
+        );
+    }
+
+    /// Starts one task and records its handle in the list matching its kind.
+    ///
+    /// Oneshot handles are stored in `oneshot_handles`; continuous and
+    /// periodic handles are stored in `tasks`.
+    fn spawn_single(&self, shard: Rc<IggyShard>, spec: Box<dyn TaskSpec>) {
+        let task_name = spec.name();
+        let task_kind = spec.kind();
+        let critical = spec.is_critical();
+
+        let kind_label = match &task_kind {
+            TaskKind::Continuous => "continuous",
+            TaskKind::Periodic { .. } => "periodic",
+            TaskKind::OneShot => "oneshot",
+        };
+        info!(
+            "Spawning {} task '{}' on shard {}",
+            kind_label, task_name, self.shard_id
+        );
+
+        // The start hook runs before the task is handed to the runtime.
+        spec.on_start();
+
+        let ctx = TaskCtx {
+            shard,
+            shutdown: self.shutdown_token(),
+        };
+
+        let (handle, is_oneshot) = match &task_kind {
+            TaskKind::Continuous => (self.spawn_continuous(spec, ctx), false),
+            TaskKind::Periodic { period } => {
+                (self.spawn_periodic(spec, ctx, *period), false)
+            }
+            TaskKind::OneShot => (self.spawn_oneshot(spec, ctx), true),
+        };
+
+        let entry = TaskHandle {
+            name: task_name.to_string(),
+            kind: task_kind,
+            handle,
+            is_critical: critical,
+        };
+
+        // Pick the list that matches the task kind.
+        let registry = if is_oneshot {
+            &self.oneshot_handles
+        } else {
+            &self.tasks
+        };
+        registry.borrow_mut().push(entry);
+    }
+
+    /// Hands a continuous task to the runtime.
+    ///
+    /// The spawned future runs the spec, logs whether it completed or
+    /// failed, and yields the spec's result through the join handle.
+    fn spawn_continuous(
+        &self,
+        spec: Box<dyn TaskSpec>,
+        ctx: TaskCtx,
+    ) -> JoinHandle<Result<(), IggyError>> {
+        let shard_id = self.shard_id;
+        let name = spec.name();
+
+        compio::runtime::spawn(async move {
+            trace!("Continuous task '{}' starting on shard {}", name, shard_id);
+
+            let outcome = spec.run(ctx).await;
+            if let Err(err) = &outcome {
+                error!(
+                    "Continuous task '{}' failed on shard {}: {}",
+                    name, shard_id, err
+                );
+            } else {
+                info!("Continuous task '{}' completed on shard {}", name, shard_id);
+            }
+
+            outcome
+        })
+    }
+
+    /// Hands a periodic task to the runtime.
+    ///
+    /// `period` is only used for the start-up trace log here; the spec's
+    /// `run` is invoked once and is expected to drive its own loop.
+    fn spawn_periodic(
+        &self,
+        spec: Box<dyn TaskSpec>,
+        ctx: TaskCtx,
+        period: Duration,
+    ) -> JoinHandle<Result<(), IggyError>> {
+        let shard_id = self.shard_id;
+        let name = spec.name();
+
+        compio::runtime::spawn(async move {
+            trace!(
+                "Periodic task '{}' starting on shard {} with period {:?}",
+                name, shard_id, period
+            );
+
+            // Periodic tasks handle their own loop internally
+            let outcome = spec.run(ctx).await;
+            if let Err(err) = &outcome {
+                error!(
+                    "Periodic task '{}' failed on shard {}: {}",
+                    name, shard_id, err
+                );
+            } else {
+                info!("Periodic task '{}' completed on shard {}", name, shard_id);
+            }
+
+            outcome
+        })
+    }
+
+    /// Hands a oneshot task to the runtime.
+    ///
+    /// The spawned future runs the spec, logs whether it completed or
+    /// failed, and yields the spec's result through the join handle.
+    fn spawn_oneshot(
+        &self,
+        spec: Box<dyn TaskSpec>,
+        ctx: TaskCtx,
+    ) -> JoinHandle<Result<(), IggyError>> {
+        let shard_id = self.shard_id;
+        let name = spec.name();
+
+        compio::runtime::spawn(async move {
+            trace!("OneShot task '{}' starting on shard {}", name, shard_id);
+
+            let outcome = spec.run(ctx).await;
+            if let Err(err) = &outcome {
+                error!(
+                    "OneShot task '{}' failed on shard {}: {}",
+                    name, shard_id, err
+                );
+            } else {
+                info!("OneShot task '{}' completed on shard {}", name, shard_id);
+            }
+
+            outcome
+        })
+    }
+
+    /// Shuts down everything this supervisor tracks.
+    ///
+    /// Steps, in order: signal active connections to stop, trigger the
+    /// shutdown signal, wait for continuous/periodic tasks (each bounded by
+    /// `timeout`), then wait for oneshot tasks without a timeout.
+    ///
+    /// Returns `true` only when both waiting phases report success.
+    pub async fn graceful_shutdown(&self, timeout: Duration) -> bool {
+        let long_running = self.tasks.borrow().len();
+        let oneshot = self.oneshot_handles.borrow().len();
+        info!(
+            "Initiating graceful shutdown for {} tasks on shard {}",
+            long_running + oneshot,
+            self.shard_id
+        );
+
+        // Connections are told to stop before the tasks are.
+        self.shutdown_connections().await;
+        self.shutdown.trigger().await;
+
+        // Continuous and periodic tasks get a bounded wait.
+        let long_running_ok = {
+            let handles = self.tasks.replace(Vec::new());
+            self.await_tasks_with_timeout(handles, timeout).await
+        };
+
+        // Always wait for oneshot tasks (no timeout for durability)
+        let oneshot_ok = {
+            let handles = self.oneshot_handles.replace(Vec::new());
+            self.await_oneshot_tasks(handles).await
+        };
+
+        let all_complete = long_running_ok && oneshot_ok;
+        if !all_complete {
+            warn!(
+                "Some tasks did not shutdown cleanly on shard {}",
+                self.shard_id
+            );
+        } else {
+            info!("All tasks shutdown gracefully on shard {}", self.shard_id);
+        }
+
+        all_complete
+    }
+
+    /// Awaits all given tasks concurrently, each bounded by `timeout`.
+    ///
+    /// A task counts as completed only when it returns `Ok(())` in time.
+    /// Errors, panics and timeouts are logged; if the affected task is
+    /// critical, a critical failure is reported as well.
+    ///
+    /// Returns `true` when every task completed and no critical task failed
+    /// (trivially `true` for an empty list).
+    async fn await_tasks_with_timeout(&self, tasks: Vec<TaskHandle>, timeout: Duration) -> bool {
+        let task_count = tasks.len();
+        if task_count == 0 {
+            return true;
+        }
+
+        let waiters = tasks.into_iter().map(|task| async move {
+            let TaskHandle {
+                name,
+                kind,
+                handle,
+                is_critical,
+            } = task;
+
+            let succeeded = match compio::time::timeout(timeout, handle).await {
+                Ok(Ok(Ok(()))) => {
+                    trace!("Task '{}' ({}) shutdown gracefully", name, kind);
+                    true
+                }
+                Ok(Ok(Err(e))) => {
+                    error!("Task '{}' ({}) failed during shutdown: {}", name, kind, e);
+                    false
+                }
+                Ok(Err(e)) => {
+                    error!(
+                        "Task '{}' ({}) panicked during shutdown: {:?}",
+                        name, kind, e
+                    );
+                    false
+                }
+                Err(_) => {
+                    warn!("Task '{}' ({}) did not complete within timeout", name, kind);
+                    false
+                }
+            };
+
+            // (completed, critical failure)
+            (succeeded, !succeeded && is_critical)
+        });
+
+        let outcomes = join_all(waiters).await;
+
+        let completed = outcomes.iter().filter(|(ok, _)| *ok).count();
+        let critical_failures = outcomes.iter().any(|(_, critical)| *critical);
+
+        info!(
+            "Shard {} shutdown: {}/{} tasks completed",
+            self.shard_id, completed, task_count
+        );
+
+        if critical_failures {
+            error!("Critical task(s) failed on shard {}", self.shard_id);
+        }
+
+        completed == task_count && !critical_failures
+    }
+
+    /// Awaits all given oneshot tasks concurrently, without any timeout.
+    ///
+    /// Returns `true` when every task returned `Ok(())`; errors and panics
+    /// are logged and make the result `false`. An empty list yields `true`.
+    async fn await_oneshot_tasks(&self, tasks: Vec<TaskHandle>) -> bool {
+        let pending = tasks.len();
+        if pending == 0 {
+            return true;
+        }
+
+        info!(
+            "Waiting for {} oneshot tasks to complete on shard {}",
+            pending, self.shard_id
+        );
+
+        let waiters = tasks.into_iter().map(|task| async move {
+            let TaskHandle { name, handle, .. } = task;
+            match handle.await {
+                Ok(Ok(())) => {
+                    trace!("OneShot task '{}' completed", name);
+                    true
+                }
+                Ok(Err(e)) => {
+                    error!("OneShot task '{}' failed: {}", name, e);
+                    false
+                }
+                Err(e) => {
+                    error!("OneShot task '{}' panicked: {:?}", name, e);
+                    false
+                }
+            }
+        });
+
+        let all_complete = join_all(waiters).await.into_iter().all(|ok| ok);
+
+        if !all_complete {
+            error!("Some oneshot tasks failed on shard {}", self.shard_id);
+        } else {
+            info!("All oneshot tasks completed on shard {}", self.shard_id);
+        }
+
+        all_complete
+    }
+
+    /// Registers a connection and returns the receiving end of its stop
+    /// channel.
+    ///
+    /// The sending end is stored under `client_id`; an existing entry for
+    /// the same id is replaced.
+    pub fn add_connection(&self, client_id: u32) -> async_channel::Receiver<()> {
+        let (tx, rx) = async_channel::bounded(1);
+        let mut connections = self.active_connections.borrow_mut();
+        connections.insert(client_id, tx);
+        rx
+    }
+
+    /// Stops tracking the connection registered under `client_id`, if any.
+    pub fn remove_connection(&self, client_id: &u32) {
+        let mut connections = self.active_connections.borrow_mut();
+        connections.remove(client_id);
+    }
+
+    /// Spawns `future` on the runtime and tracks it as a non-critical
+    /// continuous task named "connection_handler" (used for connection
+    /// handlers).
+    ///
+    /// The handle is appended to `tasks` and stays there until
+    /// `graceful_shutdown` drains that list.
+    pub fn spawn_tracked<F>(&self, future: F)
+    where
+        F: Future<Output = ()> + 'static,
+    {
+        let handle = compio::runtime::spawn(async move {
+            future.await;
+            Ok::<(), IggyError>(())
+        });
+
+        let entry = TaskHandle {
+            name: "connection_handler".to_string(),
+            kind: TaskKind::Continuous,
+            handle,
+            is_critical: false,
+        };
+        self.tasks.borrow_mut().push(entry);
+    }
+
+    /// Shutdown all connections gracefully
+    ///
+    /// Sends a stop signal to every connection registered at the time of the
+    /// call. Failures to deliver the signal are logged as warnings and do
+    /// not abort the loop.
+    async fn shutdown_connections(&self) {
+        // Snapshot the senders first so that no `RefCell` borrow is held
+        // across an `.await`: a connection handler calling
+        // `remove_connection` (or a listener calling `add_connection`) while
+        // we are suspended would otherwise panic on `borrow_mut`.
+        let connections: Vec<(u32, async_channel::Sender<()>)> = self
+            .active_connections
+            .borrow()
+            .iter()
+            .map(|(client_id, stop_sender)| (*client_id, stop_sender.clone()))
+            .collect();
+
+        info!("Shutting down {} active connections", connections.len());
+
+        for (client_id, stop_sender) in connections {
+            trace!("Sending shutdown signal to connection {}", client_id);
+            if let Err(e) = stop_sender.send(()).await {
+                warn!(
+                    "Failed to send shutdown signal to connection {}: {}",
+                    client_id, e
+                );
+            }
+        }
+    }
+}
diff --git a/core/server/src/shard/transmission/message.rs 
b/core/server/src/shard/transmission/message.rs
index a4e4bb3a..14bf075b 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -1,7 +1,3 @@
-use std::{rc::Rc, sync::Arc};
-
-use iggy_common::{Identifier, PollingStrategy};
-
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,6 +23,7 @@ use crate::{
     slab::partitions,
     streaming::{polling_consumer::PollingConsumer, 
segments::IggyMessagesBatchMut},
 };
+use iggy_common::Identifier;
 
 pub enum ShardSendRequestResult {
     // TODO: In the future we can add other variants, for example backpressure 
from the destination shard,
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 02ab2850..6fc652c6 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -721,6 +721,9 @@ impl Streams {
                 log.active_storage_mut().shutdown()
             });
 
+        // TODO: These fsync operations should use 
TaskSupervisor::spawn_oneshot with FsyncTask
+        // for proper tracking and graceful shutdown. However, this requires 
passing the shard
+        // reference through multiple layers. For now, we spawn directly to 
ensure durability.
         compio::runtime::spawn(async move {
             let _ = log_writer.fsync().await;
         })
diff --git a/core/server/src/slab/traits_ext.rs 
b/core/server/src/slab/traits_ext.rs
index 0db207cf..949971fa 100644
--- a/core/server/src/slab/traits_ext.rs
+++ b/core/server/src/slab/traits_ext.rs
@@ -158,7 +158,7 @@ type MappingByIdMut<'a, E, T> =
 pub type Components<T> = <T as IntoComponents>::Components;
 pub type ComponentsById<'a, T> = <T as IntoComponentsById>::Output;
 
-// TODO: 
+// TODO:
 // I've figured there is actually and ergonomic improvement that can be made 
here.
 // Observe that the chain of constraints put on the `EntityRef` type is 
actually wrong.
 // We constraint the `EntityRef` to be IntoComponents + IntoComponentsById,
@@ -173,7 +173,7 @@ pub type ComponentsById<'a, T> = <T as 
IntoComponentsById>::Output;
 
 // Maybe lets not go this way with the tuple mapping madness, it already is 
pretty difficult to distinguish between all of the different components,
 // and everytime we add a new component to an entity, we need to update the 
tuple type everywhere.
-// Better idea would be to use the `EntityRef` type directly inside of the 
`with_components_by_id` closure 
+// Better idea would be to use the `EntityRef` type directly inside of the 
`with_components_by_id` closure
 // -- f(components.into_components_by_id(id)) -> 
components.into_components_by_id(id) would return `EntityRef`, rather than the 
tuple.
 pub trait EntityComponentSystem<T>
 where
diff --git a/core/server/src/tcp/tcp_listener.rs 
b/core/server/src/tcp/tcp_listener.rs
index 18565ae6..c7b165b9 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -180,14 +180,14 @@ async fn accept_loop(
                         shard_info!(shard.id, "Created new session: {}", 
session);
                         let mut sender = SenderKind::get_tcp_sender(stream);
 
-                        let conn_stop_receiver = 
shard_clone.task_registry.add_connection(client_id);
+                        let conn_stop_receiver = 
shard_clone.task_supervisor.add_connection(client_id);
 
                         let shard_for_conn = shard_clone.clone();
-                        shard_clone.task_registry.spawn_tracked(async move {
+                        shard_clone.task_supervisor.spawn_tracked(async move {
                             if let Err(error) = handle_connection(&session, 
&mut sender, &shard_for_conn, conn_stop_receiver).await {
                                 handle_error(error);
                             }
-                            
shard_for_conn.task_registry.remove_connection(&client_id);
+                            
shard_for_conn.task_supervisor.remove_connection(&client_id);
 
                             if let Err(error) = sender.shutdown().await {
                                 shard_error!(shard.id, "Failed to shutdown TCP 
stream for client: {}, address: {}. {}", client_id, address, error);
diff --git a/core/server/src/tcp/tcp_tls_listener.rs 
b/core/server/src/tcp/tcp_tls_listener.rs
index ec69dbcd..0a5f7f8b 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -219,7 +219,7 @@ async fn accept_loop(
 
                         // Perform TLS handshake in a separate task to avoid 
blocking the accept loop
                         let task_shard = shard_clone.clone();
-                        task_shard.task_registry.spawn_tracked(async move {
+                        task_shard.task_supervisor.spawn_tracked(async move {
                             match acceptor.accept(stream).await {
                                 Ok(tls_stream) => {
                                     // TLS handshake successful, now create 
session
@@ -237,13 +237,13 @@ async fn accept_loop(
                                     let client_id = session.client_id;
                                     shard_info!(shard_clone.id, "Created new 
session: {}", session);
 
-                                    let conn_stop_receiver = 
shard_clone.task_registry.add_connection(client_id);
+                                    let conn_stop_receiver = 
shard_clone.task_supervisor.add_connection(client_id);
                                     let shard_for_conn = shard_clone.clone();
                                     let mut sender = 
SenderKind::get_tcp_tls_sender(tls_stream);
                                     if let Err(error) = 
handle_connection(&session, &mut sender, &shard_for_conn, 
conn_stop_receiver).await {
                                         handle_error(error);
                                     }
-                                    
shard_for_conn.task_registry.remove_connection(&client_id);
+                                    
shard_for_conn.task_supervisor.remove_connection(&client_id);
 
                                     if let Err(error) = 
sender.shutdown().await {
                                         shard_error!(shard.id, "Failed to 
shutdown TCP TLS stream for client: {}, address: {}. {}", client_id, address, 
error);

Reply via email to