This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_task_registry
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit aa304b03986f73f19ece0d06c4f317360a8b18f9
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Sep 28 10:53:25 2025 +0200

    builder
---
 core/server/Cargo.toml                             |   1 -
 core/server/src/main.rs                            |   4 -
 core/server/src/quic/listener.rs                   |  81 +++--
 core/server/src/shard/mod.rs                       |   9 +-
 core/server/src/shard/tasks/builder.rs             | 366 +++++++++++++++++++++
 .../src/shard/tasks/continuous/message_pump.rs     |   4 +-
 core/server/src/shard/tasks/mod.rs                 |   1 +
 core/server/src/shard/tasks/supervisor.rs          |  68 +++-
 core/server/src/tcp/tcp_listener.rs                |   4 +-
 9 files changed, 480 insertions(+), 58 deletions(-)

diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 64f71a31..49b6b66c 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -34,7 +34,6 @@ path = "src/main.rs"
 
 [features]
 default = ["mimalloc"]
-tokio-console = ["dep:console-subscriber", "tokio/tracing"]
 disable-mimalloc = []
 mimalloc = ["dep:mimalloc"]
 
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index cee7b9d0..8006b5d2 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -42,10 +42,7 @@ use server::configs::config_provider::{self};
 use server::configs::server::ServerConfig;
 use server::configs::sharding::CpuAllocation;
 use server::io::fs_utils;
-#[cfg(not(feature = "tokio-console"))]
 use server::log::logger::Logging;
-#[cfg(feature = "tokio-console")]
-use server::log::tokio_console::Logging;
 use server::server_error::{ConfigError, ServerError};
 use server::shard::namespace::IggyNamespace;
 use server::shard::system::info::SystemInfo;
@@ -358,7 +355,6 @@ async fn main() -> Result<(), ServerError> {
         info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful 
shutdown...");
 
         for (shard_id, stop_sender) in &shutdown_handles_for_signal {
-            info!("Sending shutdown signal to shard {}", shard_id);
             if let Err(e) = stop_sender.try_send(()) {
                 error!(
                     "Failed to send shutdown signal to shard {}: {}",
diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs
index 60833e2e..623c00c6 100644
--- a/core/server/src/quic/listener.rs
+++ b/core/server/src/quic/listener.rs
@@ -16,8 +16,6 @@
  * under the License.
  */
 
-use std::rc::Rc;
-
 use crate::binary::command::{ServerCommand, ServerCommandHandler};
 use crate::binary::sender::SenderKind;
 use crate::server_error::ConnectionError;
@@ -31,43 +29,74 @@ use compio_quic::{Connection, Endpoint, RecvStream, 
SendStream};
 use futures::FutureExt;
 use iggy_common::IggyError;
 use iggy_common::TransportProtocol;
+use std::rc::Rc;
 use tracing::{error, info, trace};
 
 const INITIAL_BYTES_LENGTH: usize = 4;
 
 pub async fn start(endpoint: Endpoint, shard: Rc<IggyShard>) -> Result<(), 
IggyError> {
-    info!("Starting QUIC listener for shard {}", shard.id);
-
     // Since the QUIC Endpoint is internally Arc-wrapped and can be shared,
     // we only need one worker per shard rather than multiple workers per 
endpoint.
     // This avoids the N×workers multiplication when multiple shards are used.
-    while let Some(incoming_conn) = endpoint.wait_incoming().await {
-        let remote_addr = incoming_conn.remote_address();
-        trace!("Incoming connection from client: {}", remote_addr);
+    loop {
         let shard_clone = shard.clone();
-        let shard_for_conn = shard_clone.clone();
-
-        // Use TaskSupervisor to track connection handlers for graceful 
shutdown
-        task_supervisor().spawn_tracked(async move {
-            trace!("Accepting connection from {}", remote_addr);
-            match incoming_conn.await {
-                Ok(connection) => {
-                    trace!("Connection established from {}", remote_addr);
-                    if let Err(error) = handle_connection(connection, 
shard_for_conn).await {
-                        error!("QUIC connection from {} has failed: {error}", 
remote_addr);
-                    }
+        let shutdown_check = async {
+            loop {
+                if shard_clone.is_shutting_down() {
+                    return;
                 }
-                Err(error) => {
-                    error!(
-                        "Error when accepting incoming connection from {}: 
{:?}",
-                        remote_addr, error
-                    );
+                
compio::time::sleep(std::time::Duration::from_millis(100)).await;
+            }
+        };
+
+        let accept_future = endpoint.wait_incoming();
+
+        futures::select! {
+            _ = shutdown_check.fuse() => {
+                shard_debug!(shard.id, "QUIC listener detected shutdown flag, 
no longer accepting connections");
+                break;
+            }
+            incoming_conn = accept_future.fuse() => {
+                match incoming_conn {
+                    Some(incoming_conn) => {
+                        let remote_addr = incoming_conn.remote_address();
+
+                        if shard.is_shutting_down() {
+                            shard_info!(shard.id, "Rejecting new QUIC 
connection from {} during shutdown", remote_addr);
+                            continue;
+                        }
+
+                        trace!("Incoming connection from client: {}", 
remote_addr);
+                        let shard_for_conn = shard.clone();
+
+                        // Use TaskSupervisor to track connection handlers for 
graceful shutdown
+                        task_supervisor().spawn_tracked(async move {
+                            trace!("Accepting connection from {}", 
remote_addr);
+                            match incoming_conn.await {
+                                Ok(connection) => {
+                                    trace!("Connection established from {}", 
remote_addr);
+                                    if let Err(error) = 
handle_connection(connection, shard_for_conn).await {
+                                        error!("QUIC connection from {} has 
failed: {error}", remote_addr);
+                                    }
+                                }
+                                Err(error) => {
+                                    error!(
+                                        "Error when accepting incoming 
connection from {}: {:?}",
+                                        remote_addr, error
+                                    );
+                                }
+                            }
+                        });
+                    }
+                    None => {
+                        // Endpoint closed
+                        info!("QUIC endpoint closed for shard {}", shard.id);
+                        break;
+                    }
                 }
             }
-        });
+        }
     }
-
-    info!("QUIC listener for shard {} stopped", shard.id);
     Ok(())
 }
 
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 7cb6c6f7..9f8b4cfc 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -49,7 +49,7 @@ use std::{
     },
     time::{Duration, Instant},
 };
-use tracing::{error, info, instrument, trace, warn};
+use tracing::{debug, error, info, instrument, trace, warn};
 use transmission::connector::{Receiver, ShardConnector, StopReceiver, 
StopSender};
 
 use crate::{
@@ -253,15 +253,10 @@ impl IggyShard {
         // Spawn shutdown handler - only this task consumes the stop signal
         compio::runtime::spawn(async move {
             let _ = stop_receiver.recv().await;
-            info!("Shard {} received shutdown signal", shard_for_shutdown.id);
-
             let shutdown_success = shard_for_shutdown.trigger_shutdown().await;
             if !shutdown_success {
                 shard_error!(shard_for_shutdown.id, "shutdown timed out");
-            } else {
-                shard_info!(shard_for_shutdown.id, "shutdown completed 
successfully");
             }
-
             let _ = shutdown_complete_tx.send(()).await;
         })
         .detach();
@@ -384,7 +379,7 @@ impl IggyShard {
     #[instrument(skip_all, name = "trace_shutdown")]
     pub async fn trigger_shutdown(&self) -> bool {
         self.is_shutting_down.store(true, Ordering::SeqCst);
-        info!("Shard {} shutdown state set", self.id);
+        debug!("Shard {} shutdown state set", self.id);
         task_supervisor().graceful_shutdown(SHUTDOWN_TIMEOUT).await
     }
 
diff --git a/core/server/src/shard/tasks/builder.rs 
b/core/server/src/shard/tasks/builder.rs
new file mode 100644
index 00000000..e1a06326
--- /dev/null
+++ b/core/server/src/shard/tasks/builder.rs
@@ -0,0 +1,366 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::specs::{TaskCtx, TaskKind, TaskScope};
+use super::supervisor::TaskSupervisor;
+use crate::shard::IggyShard;
+use futures::Future;
+use futures::future::LocalBoxFuture;
+use iggy_common::IggyError;
+use std::marker::PhantomData;
+use std::rc::Rc;
+use std::time::Duration;
+use tracing::{debug, info};
+
+/// Type state for builder that hasn't had its task function set yet
+pub struct NoTask;
+
+/// Type state for builder that has its task function set
+pub struct HasTask;
+
+/// Builder for periodic tasks with fluent API
+pub struct PeriodicTaskBuilder<'a, State = NoTask> {
+    supervisor: &'a TaskSupervisor,
+    name: &'static str,
+    period: Option<Duration>,
+    scope: TaskScope,
+    critical: bool,
+    shard: Option<Rc<IggyShard>>,
+    tick: Option<Box<dyn FnMut(&TaskCtx) -> LocalBoxFuture<'static, Result<(), 
IggyError>>>>,
+    _phantom: PhantomData<State>,
+}
+
+impl<'a> PeriodicTaskBuilder<'a, NoTask> {
+    pub(crate) fn new(supervisor: &'a TaskSupervisor, name: &'static str) -> 
Self {
+        Self {
+            supervisor,
+            name,
+            period: None,
+            scope: TaskScope::AllShards,
+            critical: false,
+            shard: None,
+            tick: None,
+            _phantom: PhantomData,
+        }
+    }
+
+    /// Set the period for this periodic task
+    pub fn every(mut self, period: Duration) -> Self {
+        self.period = Some(period);
+        self
+    }
+
+    /// Set the shard scope for this task
+    pub fn on_shard(mut self, scope: TaskScope) -> Self {
+        self.scope = scope;
+        self
+    }
+
+    /// Mark this task as critical
+    pub fn critical(mut self, critical: bool) -> Self {
+        self.critical = critical;
+        self
+    }
+
+    /// Set the shard for this task
+    pub fn with_shard(mut self, shard: Rc<IggyShard>) -> Self {
+        self.shard = Some(shard);
+        self
+    }
+
+    /// Set the tick function for this periodic task
+    pub fn tick<F, Fut>(self, tick: F) -> PeriodicTaskBuilder<'a, HasTask>
+    where
+        F: FnMut(&TaskCtx) -> Fut + 'static,
+        Fut: Future<Output = Result<(), IggyError>> + 'static,
+    {
+        let mut tick = tick;
+        let tick_boxed: Box<dyn FnMut(&TaskCtx) -> LocalBoxFuture<'static, 
Result<(), IggyError>>> =
+            Box::new(move |ctx| Box::pin(tick(ctx)));
+
+        PeriodicTaskBuilder {
+            supervisor: self.supervisor,
+            name: self.name,
+            period: self.period,
+            scope: self.scope,
+            critical: self.critical,
+            shard: self.shard,
+            tick: Some(tick_boxed),
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<'a> PeriodicTaskBuilder<'a, HasTask> {
+    /// Spawn the periodic task
+    pub fn spawn(self) {
+        let period = self.period.expect("Period must be set for periodic 
tasks");
+        let shard = self.shard.expect("Shard must be set for periodic tasks");
+        let mut tick = self.tick.expect("Tick function must be set");
+
+        // Check if task should run on this shard
+        if !self.scope.should_run(&shard) {
+            return;
+        }
+
+        let ctx = TaskCtx {
+            shard: shard.clone(),
+            shutdown: self.supervisor.shutdown_token(),
+        };
+
+        let name = self.name;
+        let shard_id = self.supervisor.shard_id;
+        let shutdown = self.supervisor.shutdown_token();
+        let is_critical = self.critical;
+
+        debug!(
+            "Spawning periodic task '{}' on shard {} with period {:?}",
+            name, shard_id, period
+        );
+
+        let handle = compio::runtime::spawn(async move {
+            loop {
+                // Use shutdown-aware sleep
+                if !shutdown.sleep_or_shutdown(period).await {
+                    tracing::trace!("Periodic task '{}' shutting down", name);
+                    break;
+                }
+
+                // Execute tick
+                match tick(&ctx).await {
+                    Ok(()) => tracing::trace!("Periodic task '{}' tick 
completed", name),
+                    Err(e) => tracing::error!("Periodic task '{}' tick failed: 
{}", name, e),
+                }
+            }
+
+            debug!("Periodic task '{}' completed on shard {}", name, shard_id);
+            Ok(())
+        });
+
+        self.supervisor
+            .register_task(name, TaskKind::Periodic { period }, handle, 
is_critical);
+    }
+}
+
+/// Builder for continuous tasks with fluent API
+pub struct ContinuousTaskBuilder<'a, State = NoTask> {
+    supervisor: &'a TaskSupervisor,
+    name: &'static str,
+    scope: TaskScope,
+    critical: bool,
+    shard: Option<Rc<IggyShard>>,
+    run: Option<Box<dyn FnOnce(&TaskCtx) -> LocalBoxFuture<'static, Result<(), 
IggyError>>>>,
+    _phantom: PhantomData<State>,
+}
+
+impl<'a> ContinuousTaskBuilder<'a, NoTask> {
+    pub(crate) fn new(supervisor: &'a TaskSupervisor, name: &'static str) -> 
Self {
+        Self {
+            supervisor,
+            name,
+            scope: TaskScope::AllShards,
+            critical: false,
+            shard: None,
+            run: None,
+            _phantom: PhantomData,
+        }
+    }
+
+    /// Set the shard scope for this task
+    pub fn on_shard(mut self, scope: TaskScope) -> Self {
+        self.scope = scope;
+        self
+    }
+
+    /// Mark this task as critical
+    pub fn critical(mut self, critical: bool) -> Self {
+        self.critical = critical;
+        self
+    }
+
+    /// Set the shard for this task
+    pub fn with_shard(mut self, shard: Rc<IggyShard>) -> Self {
+        self.shard = Some(shard);
+        self
+    }
+
+    /// Set the run function for this continuous task
+    pub fn run<F, Fut>(self, run: F) -> ContinuousTaskBuilder<'a, HasTask>
+    where
+        F: FnOnce(&TaskCtx) -> Fut + 'static,
+        Fut: Future<Output = Result<(), IggyError>> + 'static,
+    {
+        let run_boxed: Box<dyn FnOnce(&TaskCtx) -> LocalBoxFuture<'static, 
Result<(), IggyError>>> =
+            Box::new(move |ctx| Box::pin(run(ctx)));
+
+        ContinuousTaskBuilder {
+            supervisor: self.supervisor,
+            name: self.name,
+            scope: self.scope,
+            critical: self.critical,
+            shard: self.shard,
+            run: Some(run_boxed),
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<'a> ContinuousTaskBuilder<'a, HasTask> {
+    /// Spawn the continuous task
+    pub fn spawn(self) {
+        let shard = self.shard.expect("Shard must be set for continuous 
tasks");
+        let run = self.run.expect("Run function must be set");
+
+        if !self.scope.should_run(&shard) {
+            return;
+        }
+
+        let ctx = TaskCtx {
+            shard: shard.clone(),
+            shutdown: self.supervisor.shutdown_token(),
+        };
+
+        let name = self.name;
+        let shard_id = self.supervisor.shard_id;
+        let is_critical = self.critical;
+
+        debug!("Spawning continuous task '{}' on shard {}", name, shard_id);
+
+        let handle = compio::runtime::spawn(async move {
+            let result = run(&ctx).await;
+            match &result {
+                Ok(()) => debug!("Continuous task '{}' completed on shard {}", 
name, shard_id),
+                Err(e) => tracing::error!(
+                    "Continuous task '{}' failed on shard {}: {}",
+                    name,
+                    shard_id,
+                    e
+                ),
+            }
+            result
+        });
+
+        self.supervisor
+            .register_task(name, TaskKind::Continuous, handle, is_critical);
+    }
+}
+
+/// Builder for oneshot tasks with fluent API
+pub struct OneshotTaskBuilder<'a, State = NoTask> {
+    supervisor: &'a TaskSupervisor,
+    name: &'static str,
+    scope: TaskScope,
+    critical: bool,
+    timeout: Option<Duration>,
+    shard: Option<Rc<IggyShard>>,
+    run: Option<Box<dyn FnOnce(&TaskCtx) -> LocalBoxFuture<'static, Result<(), 
IggyError>>>>,
+    _phantom: PhantomData<State>,
+}
+
+impl<'a> OneshotTaskBuilder<'a, NoTask> {
+    pub(crate) fn new(supervisor: &'a TaskSupervisor, name: &'static str) -> 
Self {
+        Self {
+            supervisor,
+            name,
+            scope: TaskScope::AllShards,
+            critical: false,
+            timeout: None,
+            shard: None,
+            run: None,
+            _phantom: PhantomData,
+        }
+    }
+
+    /// Set the shard scope for this task
+    pub fn on_shard(mut self, scope: TaskScope) -> Self {
+        self.scope = scope;
+        self
+    }
+
+    /// Mark this task as critical
+    pub fn critical(mut self, critical: bool) -> Self {
+        self.critical = critical;
+        self
+    }
+
+    /// Set a timeout for this oneshot task
+    pub fn timeout(mut self, timeout: Duration) -> Self {
+        self.timeout = Some(timeout);
+        self
+    }
+
+    /// Set the shard for this task
+    pub fn with_shard(mut self, shard: Rc<IggyShard>) -> Self {
+        self.shard = Some(shard);
+        self
+    }
+
+    /// Set the run function for this oneshot task
+    pub fn run<F, Fut>(self, run: F) -> OneshotTaskBuilder<'a, HasTask>
+    where
+        F: FnOnce(&TaskCtx) -> Fut + 'static,
+        Fut: Future<Output = Result<(), IggyError>> + 'static,
+    {
+        let run_boxed: Box<dyn FnOnce(&TaskCtx) -> LocalBoxFuture<'static, 
Result<(), IggyError>>> =
+            Box::new(move |ctx| Box::pin(run(ctx)));
+
+        OneshotTaskBuilder {
+            supervisor: self.supervisor,
+            name: self.name,
+            scope: self.scope,
+            critical: self.critical,
+            timeout: self.timeout,
+            shard: self.shard,
+            run: Some(run_boxed),
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<'a> OneshotTaskBuilder<'a, HasTask> {
+    /// Spawn the oneshot task
+    pub fn spawn(self) {
+        let shard = self.shard.expect("Shard must be set for oneshot tasks");
+        let run = self.run.expect("Run function must be set");
+
+        // Check if task should run on this shard
+        if !self.scope.should_run(&shard) {
+            return;
+        }
+
+        let ctx = TaskCtx {
+            shard: shard.clone(),
+            shutdown: self.supervisor.shutdown_token(),
+        };
+
+        // If timeout is specified, wrap the future with timeout
+        if let Some(timeout) = self.timeout {
+            let name = self.name;
+            self.supervisor
+                .spawn_oneshot(name, self.critical, async move {
+                    match compio::time::timeout(timeout, run(&ctx)).await {
+                        Ok(result) => result,
+                        Err(_) => Err(IggyError::InvalidCommand),
+                    }
+                });
+        } else {
+            self.supervisor
+                .spawn_oneshot(self.name, self.critical, async move { 
run(&ctx).await });
+        }
+    }
+}
diff --git a/core/server/src/shard/tasks/continuous/message_pump.rs 
b/core/server/src/shard/tasks/continuous/message_pump.rs
index cf45a3ff..b50979f1 100644
--- a/core/server/src/shard/tasks/continuous/message_pump.rs
+++ b/core/server/src/shard/tasks/continuous/message_pump.rs
@@ -21,7 +21,7 @@ use crate::shard::tasks::specs::{
     ContinuousSpec, TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec,
 };
 use crate::shard::transmission::frame::ShardFrame;
-use crate::shard_info;
+use crate::{shard_debug, shard_info};
 use futures::{FutureExt, StreamExt};
 use std::fmt::Debug;
 use std::rc::Rc;
@@ -71,7 +71,7 @@ impl TaskSpec for MessagePump {
             loop {
                 futures::select! {
                     _ = ctx.shutdown.wait().fuse() => {
-                        shard_info!(self.shard.id, "Message receiver shutting 
down");
+                        shard_debug!(self.shard.id, "Message receiver shutting 
down");
                         break;
                     }
                     frame = messages_receiver.next().fuse() => {
diff --git a/core/server/src/shard/tasks/mod.rs 
b/core/server/src/shard/tasks/mod.rs
index 448f2f9e..b156d68b 100644
--- a/core/server/src/shard/tasks/mod.rs
+++ b/core/server/src/shard/tasks/mod.rs
@@ -21,6 +21,7 @@
 //! This module provides a unified framework for managing all asynchronous 
tasks
 //! within a shard, including servers, periodic maintenance, and one-shot 
operations.
 
+pub mod builder;
 pub mod continuous;
 pub mod periodic;
 pub mod shutdown;
diff --git a/core/server/src/shard/tasks/supervisor.rs 
b/core/server/src/shard/tasks/supervisor.rs
index 9bd62f07..e1efa04a 100644
--- a/core/server/src/shard/tasks/supervisor.rs
+++ b/core/server/src/shard/tasks/supervisor.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use super::builder::{ContinuousTaskBuilder, OneshotTaskBuilder, 
PeriodicTaskBuilder};
 use super::shutdown::{Shutdown, ShutdownToken};
 use super::specs::{TaskCtx, TaskKind, TaskScope, TaskSpec};
 use crate::shard::IggyShard;
@@ -27,7 +28,7 @@ use std::cell::RefCell;
 use std::collections::HashMap;
 use std::rc::Rc;
 use std::time::Duration;
-use tracing::{error, info, trace, warn};
+use tracing::{debug, error, info, trace, warn};
 
 /// Handle to a spawned task
 struct TaskHandle {
@@ -39,7 +40,7 @@ struct TaskHandle {
 
 /// Supervises the lifecycle of all tasks in a shard
 pub struct TaskSupervisor {
-    shard_id: u16,
+    pub(crate) shard_id: u16,
     shutdown: Shutdown,
     shutdown_token: ShutdownToken,
     tasks: RefCell<Vec<TaskHandle>>,
@@ -67,6 +68,21 @@ impl TaskSupervisor {
         self.shutdown_token.clone()
     }
 
+    /// Create a builder for a periodic task
+    pub fn periodic(&self, name: &'static str) -> PeriodicTaskBuilder {
+        PeriodicTaskBuilder::new(self, name)
+    }
+
+    /// Create a builder for a continuous task
+    pub fn continuous(&self, name: &'static str) -> ContinuousTaskBuilder {
+        ContinuousTaskBuilder::new(self, name)
+    }
+
+    /// Create a builder for a oneshot task
+    pub fn oneshot(&self, name: &'static str) -> OneshotTaskBuilder {
+        OneshotTaskBuilder::new(self, name)
+    }
+
     /// Spawn all tasks according to their specifications
     pub fn spawn(&self, shard: Rc<IggyShard>, specs: Vec<Box<dyn TaskSpec>>) {
         for spec in specs {
@@ -155,7 +171,7 @@ impl TaskSupervisor {
             let result = spec.run(ctx).await;
 
             match &result {
-                Ok(()) => info!("Continuous task '{}' completed on shard {}", 
name, shard_id),
+                Ok(()) => debug!("Continuous task '{}' completed on shard {}", 
name, shard_id),
                 Err(e) => error!(
                     "Continuous task '{}' failed on shard {}: {}",
                     name, shard_id, e
@@ -186,7 +202,7 @@ impl TaskSupervisor {
             let result = spec.run(ctx).await;
 
             match &result {
-                Ok(()) => info!("Periodic task '{}' completed on shard {}", 
name, shard_id),
+                Ok(()) => debug!("Periodic task '{}' completed on shard {}", 
name, shard_id),
                 Err(e) => error!(
                     "Periodic task '{}' failed on shard {}: {}",
                     name, shard_id, e
@@ -211,7 +227,7 @@ impl TaskSupervisor {
             let result = spec.run(ctx).await;
 
             match &result {
-                Ok(()) => info!("OneShot task '{}' completed on shard {}", 
name, shard_id),
+                Ok(()) => debug!("OneShot task '{}' completed on shard {}", 
name, shard_id),
                 Err(e) => error!(
                     "OneShot task '{}' failed on shard {}: {}",
                     name, shard_id, e
@@ -224,7 +240,7 @@ impl TaskSupervisor {
 
     /// Trigger graceful shutdown for all tasks
     pub async fn graceful_shutdown(&self, timeout: Duration) -> bool {
-        info!(
+        debug!(
             "Initiating graceful shutdown for {} tasks on shard {}",
             self.tasks.borrow().len() + self.oneshot_handles.borrow().len(),
             self.shard_id
@@ -248,9 +264,7 @@ impl TaskSupervisor {
 
         let all_complete = continuous_periodic_complete && oneshot_complete;
 
-        if all_complete {
-            info!("All tasks shutdown gracefully on shard {}", self.shard_id);
-        } else {
+        if !all_complete {
             warn!(
                 "Some tasks did not shutdown cleanly on shard {}",
                 self.shard_id
@@ -352,7 +366,7 @@ impl TaskSupervisor {
         let all_complete = results.iter().all(|&r| r);
 
         if all_complete {
-            info!("All oneshot tasks completed on shard {}", self.shard_id);
+            debug!("All oneshot tasks completed on shard {}", self.shard_id);
         } else {
             error!("Some oneshot tasks failed on shard {}", self.shard_id);
         }
@@ -486,7 +500,7 @@ impl TaskSupervisor {
                 }
             }
 
-            info!(
+            trace!(
                 "Periodic tick task '{}' completed on shard {}",
                 name, shard_id
             );
@@ -503,12 +517,13 @@ impl TaskSupervisor {
 
     /// Shutdown all connections gracefully
     fn shutdown_connections(&self) {
-        info!(
-            "Shutting down {} active connections",
-            self.active_connections.borrow().len()
-        );
-
         let connections = self.active_connections.borrow();
+        if connections.is_empty() {
+            return;
+        }
+
+        info!("Shutting down {} active connections", connections.len());
+
         for (client_id, stop_sender) in connections.iter() {
             trace!("Sending shutdown signal to connection {}", client_id);
             if let Err(e) = stop_sender.send_blocking(()) {
@@ -519,4 +534,25 @@ impl TaskSupervisor {
             }
         }
     }
+
+    /// Register a task handle (used by builders)
+    pub(crate) fn register_task(
+        &self,
+        name: &str,
+        kind: TaskKind,
+        handle: JoinHandle<Result<(), IggyError>>,
+        is_critical: bool,
+    ) {
+        let task_handle = TaskHandle {
+            name: name.to_string(),
+            kind: kind.clone(),
+            handle,
+            is_critical,
+        };
+
+        match kind {
+            TaskKind::OneShot => 
self.oneshot_handles.borrow_mut().push(task_handle),
+            _ => self.tasks.borrow_mut().push(task_handle),
+        }
+    }
 }
diff --git a/core/server/src/tcp/tcp_listener.rs 
b/core/server/src/tcp/tcp_listener.rs
index f39dcc94..8cda9310 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -22,7 +22,7 @@ use crate::shard::IggyShard;
 use crate::shard::tasks::task_supervisor;
 use crate::shard::transmission::event::ShardEvent;
 use crate::tcp::connection_handler::{handle_connection, handle_error};
-use crate::{shard_error, shard_info};
+use crate::{shard_debug, shard_error, shard_info};
 use compio::net::{TcpListener, TcpOpts};
 use error_set::ErrContext;
 use futures::FutureExt;
@@ -155,7 +155,7 @@ async fn accept_loop(
         let accept_future = listener.accept();
         futures::select! {
             _ = shutdown_check.fuse() => {
-                shard_info!(shard.id, "{} detected shutdown flag, no longer 
accepting connections", server_name);
+                shard_debug!(shard.id, "{} detected shutdown flag, no longer 
accepting connections", server_name);
                 break;
             }
             result = accept_future.fuse() => {

Reply via email to