This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_task_registry in repository https://gitbox.apache.org/repos/asf/iggy.git
commit aa304b03986f73f19ece0d06c4f317360a8b18f9 Author: Hubert Gruszecki <[email protected]> AuthorDate: Sun Sep 28 10:53:25 2025 +0200 builder --- core/server/Cargo.toml | 1 - core/server/src/main.rs | 4 - core/server/src/quic/listener.rs | 81 +++-- core/server/src/shard/mod.rs | 9 +- core/server/src/shard/tasks/builder.rs | 366 +++++++++++++++++++++ .../src/shard/tasks/continuous/message_pump.rs | 4 +- core/server/src/shard/tasks/mod.rs | 1 + core/server/src/shard/tasks/supervisor.rs | 68 +++- core/server/src/tcp/tcp_listener.rs | 4 +- 9 files changed, 480 insertions(+), 58 deletions(-) diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 64f71a31..49b6b66c 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -34,7 +34,6 @@ path = "src/main.rs" [features] default = ["mimalloc"] -tokio-console = ["dep:console-subscriber", "tokio/tracing"] disable-mimalloc = [] mimalloc = ["dep:mimalloc"] diff --git a/core/server/src/main.rs b/core/server/src/main.rs index cee7b9d0..8006b5d2 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -42,10 +42,7 @@ use server::configs::config_provider::{self}; use server::configs::server::ServerConfig; use server::configs::sharding::CpuAllocation; use server::io::fs_utils; -#[cfg(not(feature = "tokio-console"))] use server::log::logger::Logging; -#[cfg(feature = "tokio-console")] -use server::log::tokio_console::Logging; use server::server_error::{ConfigError, ServerError}; use server::shard::namespace::IggyNamespace; use server::shard::system::info::SystemInfo; @@ -358,7 +355,6 @@ async fn main() -> Result<(), ServerError> { info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful shutdown..."); for (shard_id, stop_sender) in &shutdown_handles_for_signal { - info!("Sending shutdown signal to shard {}", shard_id); if let Err(e) = stop_sender.try_send(()) { error!( "Failed to send shutdown signal to shard {}: {}", diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index 60833e2e..623c00c6 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -16,8 +16,6 @@ * under the License. */ -use std::rc::Rc; - use crate::binary::command::{ServerCommand, ServerCommandHandler}; use crate::binary::sender::SenderKind; use crate::server_error::ConnectionError; @@ -31,43 +29,74 @@ use compio_quic::{Connection, Endpoint, RecvStream, SendStream}; use futures::FutureExt; use iggy_common::IggyError; use iggy_common::TransportProtocol; +use std::rc::Rc; use tracing::{error, info, trace}; const INITIAL_BYTES_LENGTH: usize = 4; pub async fn start(endpoint: Endpoint, shard: Rc<IggyShard>) -> Result<(), IggyError> { - info!("Starting QUIC listener for shard {}", shard.id); - // Since the QUIC Endpoint is internally Arc-wrapped and can be shared, // we only need one worker per shard rather than multiple workers per endpoint. // This avoids the N×workers multiplication when multiple shards are used. - while let Some(incoming_conn) = endpoint.wait_incoming().await { - let remote_addr = incoming_conn.remote_address(); - trace!("Incoming connection from client: {}", remote_addr); + loop { let shard_clone = shard.clone(); - let shard_for_conn = shard_clone.clone(); - - // Use TaskSupervisor to track connection handlers for graceful shutdown - task_supervisor().spawn_tracked(async move { - trace!("Accepting connection from {}", remote_addr); - match incoming_conn.await { - Ok(connection) => { - trace!("Connection established from {}", remote_addr); - if let Err(error) = handle_connection(connection, shard_for_conn).await { - error!("QUIC connection from {} has failed: {error}", remote_addr); - } + let shutdown_check = async { + loop { + if shard_clone.is_shutting_down() { + return; } - Err(error) => { - error!( - "Error when accepting incoming connection from {}: {:?}", - remote_addr, error - ); + compio::time::sleep(std::time::Duration::from_millis(100)).await; + } + }; + + let accept_future = endpoint.wait_incoming(); + + futures::select! { + _ = shutdown_check.fuse() => { + shard_debug!(shard.id, "QUIC listener detected shutdown flag, no longer accepting connections"); + break; + } + incoming_conn = accept_future.fuse() => { + match incoming_conn { + Some(incoming_conn) => { + let remote_addr = incoming_conn.remote_address(); + + if shard.is_shutting_down() { + shard_info!(shard.id, "Rejecting new QUIC connection from {} during shutdown", remote_addr); + continue; + } + + trace!("Incoming connection from client: {}", remote_addr); + let shard_for_conn = shard.clone(); + + // Use TaskSupervisor to track connection handlers for graceful shutdown + task_supervisor().spawn_tracked(async move { + trace!("Accepting connection from {}", remote_addr); + match incoming_conn.await { + Ok(connection) => { + trace!("Connection established from {}", remote_addr); + if let Err(error) = handle_connection(connection, shard_for_conn).await { + error!("QUIC connection from {} has failed: {error}", remote_addr); + } + } + Err(error) => { + error!( + "Error when accepting incoming connection from {}: {:?}", + remote_addr, error + ); + } + } + }); + } + None => { + // Endpoint closed + info!("QUIC endpoint closed for shard {}", shard.id); + break; + } } } - }); + } } - - info!("QUIC listener for shard {} stopped", shard.id); Ok(()) } diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 7cb6c6f7..9f8b4cfc 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -49,7 +49,7 @@ use std::{ }, time::{Duration, Instant}, }; -use tracing::{error, info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; use transmission::connector::{Receiver, ShardConnector, StopReceiver, StopSender}; use crate::{ @@ -253,15 +253,10 @@ impl IggyShard { // Spawn shutdown handler - only this task consumes the stop signal compio::runtime::spawn(async move { let _ = stop_receiver.recv().await; - info!("Shard {} received shutdown signal", shard_for_shutdown.id); - let shutdown_success = shard_for_shutdown.trigger_shutdown().await; if !shutdown_success { shard_error!(shard_for_shutdown.id, "shutdown timed out"); - } else { - shard_info!(shard_for_shutdown.id, "shutdown completed successfully"); } - let _ = shutdown_complete_tx.send(()).await; }) .detach(); @@ -384,7 +379,7 @@ impl IggyShard { #[instrument(skip_all, name = "trace_shutdown")] pub async fn trigger_shutdown(&self) -> bool { self.is_shutting_down.store(true, Ordering::SeqCst); - info!("Shard {} shutdown state set", self.id); + debug!("Shard {} shutdown state set", self.id); task_supervisor().graceful_shutdown(SHUTDOWN_TIMEOUT).await } diff --git a/core/server/src/shard/tasks/builder.rs b/core/server/src/shard/tasks/builder.rs new file mode 100644 index 00000000..e1a06326 --- /dev/null +++ b/core/server/src/shard/tasks/builder.rs @@ -0,0 +1,366 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::specs::{TaskCtx, TaskKind, TaskScope}; +use super::supervisor::TaskSupervisor; +use crate::shard::IggyShard; +use futures::Future; +use futures::future::LocalBoxFuture; +use iggy_common::IggyError; +use std::marker::PhantomData; +use std::rc::Rc; +use std::time::Duration; +use tracing::{debug, info}; + +/// Type state for builder that hasn't had its task function set yet +pub struct NoTask; + +/// Type state for builder that has its task function set +pub struct HasTask; + +/// Builder for periodic tasks with fluent API +pub struct PeriodicTaskBuilder<'a, State = NoTask> { + supervisor: &'a TaskSupervisor, + name: &'static str, + period: Option<Duration>, + scope: TaskScope, + critical: bool, + shard: Option<Rc<IggyShard>>, + tick: Option<Box<dyn FnMut(&TaskCtx) -> LocalBoxFuture<'static, Result<(), IggyError>>>>, + _phantom: PhantomData<State>, +} + +impl<'a> PeriodicTaskBuilder<'a, NoTask> { + pub(crate) fn new(supervisor: &'a TaskSupervisor, name: &'static str) -> Self { + Self { + supervisor, + name, + period: None, + scope: TaskScope::AllShards, + critical: false, + shard: None, + tick: None, + _phantom: PhantomData, + } + } + + /// Set the period for this periodic task + pub fn every(mut self, period: Duration) -> Self { + self.period = Some(period); + self + } + + /// Set the shard scope for this task + pub fn on_shard(mut self, scope: TaskScope) -> Self { + self.scope = scope; + self + } + + /// Mark this task as critical + pub fn critical(mut self, critical: bool) -> Self { + self.critical = critical; + self + } + + /// Set the shard for this task + pub fn with_shard(mut self, shard: Rc<IggyShard>) -> Self { + self.shard = Some(shard); + self + } + + /// Set the tick function for this periodic task + pub fn tick<F, Fut>(self, tick: F) -> PeriodicTaskBuilder<'a, HasTask> + where + F: FnMut(&TaskCtx) -> Fut + 'static, + Fut: Future<Output = Result<(), IggyError>> + 'static, + { + let mut tick = tick; + let tick_boxed: Box<dyn FnMut(&TaskCtx) -> LocalBoxFuture<'static, Result<(), IggyError>>> = + Box::new(move |ctx| Box::pin(tick(ctx))); + + PeriodicTaskBuilder { + supervisor: self.supervisor, + name: self.name, + period: self.period, + scope: self.scope, + critical: self.critical, + shard: self.shard, + tick: Some(tick_boxed), + _phantom: PhantomData, + } + } +} + +impl<'a> PeriodicTaskBuilder<'a, HasTask> { + /// Spawn the periodic task + pub fn spawn(self) { + let period = self.period.expect("Period must be set for periodic tasks"); + let shard = self.shard.expect("Shard must be set for periodic tasks"); + let mut tick = self.tick.expect("Tick function must be set"); + + // Check if task should run on this shard + if !self.scope.should_run(&shard) { + return; + } + + let ctx = TaskCtx { + shard: shard.clone(), + shutdown: self.supervisor.shutdown_token(), + }; + + let name = self.name; + let shard_id = self.supervisor.shard_id; + let shutdown = self.supervisor.shutdown_token(); + let is_critical = self.critical; + + debug!( + "Spawning periodic task '{}' on shard {} with period {:?}", + name, shard_id, period + ); + + let handle = compio::runtime::spawn(async move { + loop { + // Use shutdown-aware sleep + if !shutdown.sleep_or_shutdown(period).await { + tracing::trace!("Periodic task '{}' shutting down", name); + break; + } + + // Execute tick + match tick(&ctx).await { + Ok(()) => tracing::trace!("Periodic task '{}' tick completed", name), + Err(e) => tracing::error!("Periodic task '{}' tick failed: {}", name, e), + } + } + + debug!("Periodic task '{}' completed on shard {}", name, shard_id); + Ok(()) + }); + + self.supervisor + .register_task(name, TaskKind::Periodic { period }, handle, is_critical); + } +} + +/// Builder for continuous tasks with fluent API +pub struct ContinuousTaskBuilder<'a, State = NoTask> { + supervisor: &'a TaskSupervisor, + name: &'static str, + scope: TaskScope, + critical: bool, + shard: Option<Rc<IggyShard>>, + run: Option<Box<dyn FnOnce(&TaskCtx) -> LocalBoxFuture<'static, Result<(), IggyError>>>>, + _phantom: PhantomData<State>, +} + +impl<'a> ContinuousTaskBuilder<'a, NoTask> { + pub(crate) fn new(supervisor: &'a TaskSupervisor, name: &'static str) -> Self { + Self { + supervisor, + name, + scope: TaskScope::AllShards, + critical: false, + shard: None, + run: None, + _phantom: PhantomData, + } + } + + /// Set the shard scope for this task + pub fn on_shard(mut self, scope: TaskScope) -> Self { + self.scope = scope; + self + } + + /// Mark this task as critical + pub fn critical(mut self, critical: bool) -> Self { + self.critical = critical; + self + } + + /// Set the shard for this task + pub fn with_shard(mut self, shard: Rc<IggyShard>) -> Self { + self.shard = Some(shard); + self + } + + /// Set the run function for this continuous task + pub fn run<F, Fut>(self, run: F) -> ContinuousTaskBuilder<'a, HasTask> + where + F: FnOnce(&TaskCtx) -> Fut + 'static, + Fut: Future<Output = Result<(), IggyError>> + 'static, + { + let run_boxed: Box<dyn FnOnce(&TaskCtx) -> LocalBoxFuture<'static, Result<(), IggyError>>> = + Box::new(move |ctx| Box::pin(run(ctx))); + + ContinuousTaskBuilder { + supervisor: self.supervisor, + name: self.name, + scope: self.scope, + critical: self.critical, + shard: self.shard, + run: Some(run_boxed), + _phantom: PhantomData, + } + } +} + +impl<'a> ContinuousTaskBuilder<'a, HasTask> { + /// Spawn the continuous task + pub fn spawn(self) { + let shard = self.shard.expect("Shard must be set for continuous tasks"); + let run = self.run.expect("Run function must be set"); + + if !self.scope.should_run(&shard) { + return; + } + + let ctx = TaskCtx { + shard: shard.clone(), + shutdown: self.supervisor.shutdown_token(), + }; + + let name = self.name; + let shard_id = self.supervisor.shard_id; + let is_critical = self.critical; + + debug!("Spawning continuous task '{}' on shard {}", name, shard_id); + + let handle = compio::runtime::spawn(async move { + let result = run(&ctx).await; + match &result { + Ok(()) => debug!("Continuous task '{}' completed on shard {}", name, shard_id), + Err(e) => tracing::error!( + "Continuous task '{}' failed on shard {}: {}", + name, + shard_id, + e + ), + } + result + }); + + self.supervisor + .register_task(name, TaskKind::Continuous, handle, is_critical); + } +} + +/// Builder for oneshot tasks with fluent API +pub struct OneshotTaskBuilder<'a, State = NoTask> { + supervisor: &'a TaskSupervisor, + name: &'static str, + scope: TaskScope, + critical: bool, + timeout: Option<Duration>, + shard: Option<Rc<IggyShard>>, + run: Option<Box<dyn FnOnce(&TaskCtx) -> LocalBoxFuture<'static, Result<(), IggyError>>>>, + _phantom: PhantomData<State>, +} + +impl<'a> OneshotTaskBuilder<'a, NoTask> { + pub(crate) fn new(supervisor: &'a TaskSupervisor, name: &'static str) -> Self { + Self { + supervisor, + name, + scope: TaskScope::AllShards, + critical: false, + timeout: None, + shard: None, + run: None, + _phantom: PhantomData, + } + } + + /// Set the shard scope for this task + pub fn on_shard(mut self, scope: TaskScope) -> Self { + self.scope = scope; + self + } + + /// Mark this task as critical + pub fn critical(mut self, critical: bool) -> Self { + self.critical = critical; + self + } + + /// Set a timeout for this oneshot task + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + self + } + + /// Set the shard for this task + pub fn with_shard(mut self, shard: Rc<IggyShard>) -> Self { + self.shard = Some(shard); + self + } + + /// Set the run function for this oneshot task + pub fn run<F, Fut>(self, run: F) -> OneshotTaskBuilder<'a, HasTask> + where + F: FnOnce(&TaskCtx) -> Fut + 'static, + Fut: Future<Output = Result<(), IggyError>> + 'static, + { + let run_boxed: Box<dyn FnOnce(&TaskCtx) -> LocalBoxFuture<'static, Result<(), IggyError>>> = + Box::new(move |ctx| Box::pin(run(ctx))); + + OneshotTaskBuilder { + supervisor: self.supervisor, + name: self.name, + scope: self.scope, + critical: self.critical, + timeout: self.timeout, + shard: self.shard, + run: Some(run_boxed), + _phantom: PhantomData, + } + } +} + +impl<'a> OneshotTaskBuilder<'a, HasTask> { + /// Spawn the oneshot task + pub fn spawn(self) { + let shard = self.shard.expect("Shard must be set for oneshot tasks"); + let run = self.run.expect("Run function must be set"); + + // Check if task should run on this shard + if !self.scope.should_run(&shard) { + return; + } + + let ctx = TaskCtx { + shard: shard.clone(), + shutdown: self.supervisor.shutdown_token(), + }; + + // If timeout is specified, wrap the future with timeout + if let Some(timeout) = self.timeout { + let name = self.name; + self.supervisor + .spawn_oneshot(name, self.critical, async move { + match compio::time::timeout(timeout, run(&ctx)).await { + Ok(result) => result, + Err(_) => Err(IggyError::InvalidCommand), + } + }); + } else { + self.supervisor + .spawn_oneshot(self.name, self.critical, async move { run(&ctx).await }); + } + } +} diff --git a/core/server/src/shard/tasks/continuous/message_pump.rs b/core/server/src/shard/tasks/continuous/message_pump.rs index cf45a3ff..b50979f1 100644 --- a/core/server/src/shard/tasks/continuous/message_pump.rs +++ b/core/server/src/shard/tasks/continuous/message_pump.rs @@ -21,7 +21,7 @@ use crate::shard::tasks::specs::{ ContinuousSpec, TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec, }; use crate::shard::transmission::frame::ShardFrame; -use crate::shard_info; +use crate::{shard_debug, shard_info}; use futures::{FutureExt, StreamExt}; use std::fmt::Debug; use std::rc::Rc; @@ -71,7 +71,7 @@ impl TaskSpec for MessagePump { loop { futures::select! { _ = ctx.shutdown.wait().fuse() => { - shard_info!(self.shard.id, "Message receiver shutting down"); + shard_debug!(self.shard.id, "Message receiver shutting down"); break; } frame = messages_receiver.next().fuse() => { diff --git a/core/server/src/shard/tasks/mod.rs b/core/server/src/shard/tasks/mod.rs index 448f2f9e..b156d68b 100644 --- a/core/server/src/shard/tasks/mod.rs +++ b/core/server/src/shard/tasks/mod.rs @@ -21,6 +21,7 @@ //! This module provides a unified framework for managing all asynchronous tasks //! within a shard, including servers, periodic maintenance, and one-shot operations. +pub mod builder; pub mod continuous; pub mod periodic; pub mod shutdown; diff --git a/core/server/src/shard/tasks/supervisor.rs b/core/server/src/shard/tasks/supervisor.rs index 9bd62f07..e1efa04a 100644 --- a/core/server/src/shard/tasks/supervisor.rs +++ b/core/server/src/shard/tasks/supervisor.rs @@ -16,6 +16,7 @@ * under the License. */ +use super::builder::{ContinuousTaskBuilder, OneshotTaskBuilder, PeriodicTaskBuilder}; use super::shutdown::{Shutdown, ShutdownToken}; use super::specs::{TaskCtx, TaskKind, TaskScope, TaskSpec}; use crate::shard::IggyShard; @@ -27,7 +28,7 @@ use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; use std::time::Duration; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; /// Handle to a spawned task struct TaskHandle { @@ -39,7 +40,7 @@ struct TaskHandle { /// Supervises the lifecycle of all tasks in a shard pub struct TaskSupervisor { - shard_id: u16, + pub(crate) shard_id: u16, shutdown: Shutdown, shutdown_token: ShutdownToken, tasks: RefCell<Vec<TaskHandle>>, @@ -67,6 +68,21 @@ impl TaskSupervisor { self.shutdown_token.clone() } + /// Create a builder for a periodic task + pub fn periodic(&self, name: &'static str) -> PeriodicTaskBuilder { + PeriodicTaskBuilder::new(self, name) + } + + /// Create a builder for a continuous task + pub fn continuous(&self, name: &'static str) -> ContinuousTaskBuilder { + ContinuousTaskBuilder::new(self, name) + } + + /// Create a builder for a oneshot task + pub fn oneshot(&self, name: &'static str) -> OneshotTaskBuilder { + OneshotTaskBuilder::new(self, name) + } + /// Spawn all tasks according to their specifications pub fn spawn(&self, shard: Rc<IggyShard>, specs: Vec<Box<dyn TaskSpec>>) { for spec in specs { @@ -155,7 +171,7 @@ impl TaskSupervisor { let result = spec.run(ctx).await; match &result { - Ok(()) => info!("Continuous task '{}' completed on shard {}", name, shard_id), + Ok(()) => debug!("Continuous task '{}' completed on shard {}", name, shard_id), Err(e) => error!( "Continuous task '{}' failed on shard {}: {}", name, shard_id, e @@ -186,7 +202,7 @@ impl TaskSupervisor { let result = spec.run(ctx).await; match &result { - Ok(()) => info!("Periodic task '{}' completed on shard {}", name, shard_id), + Ok(()) => debug!("Periodic task '{}' completed on shard {}", name, shard_id), Err(e) => error!( "Periodic task '{}' failed on shard {}: {}", name, shard_id, e @@ -211,7 +227,7 @@ impl TaskSupervisor { let result = spec.run(ctx).await; match &result { - Ok(()) => info!("OneShot task '{}' completed on shard {}", name, shard_id), + Ok(()) => debug!("OneShot task '{}' completed on shard {}", name, shard_id), Err(e) => error!( "OneShot task '{}' failed on shard {}: {}", name, shard_id, e @@ -224,7 +240,7 @@ impl TaskSupervisor { /// Trigger graceful shutdown for all tasks pub async fn graceful_shutdown(&self, timeout: Duration) -> bool { - info!( + debug!( "Initiating graceful shutdown for {} tasks on shard {}", self.tasks.borrow().len() + self.oneshot_handles.borrow().len(), self.shard_id @@ -248,9 +264,7 @@ impl TaskSupervisor { let all_complete = continuous_periodic_complete && oneshot_complete; - if all_complete { - info!("All tasks shutdown gracefully on shard {}", self.shard_id); - } else { + if !all_complete { warn!( "Some tasks did not shutdown cleanly on shard {}", self.shard_id @@ -352,7 +366,7 @@ impl TaskSupervisor { let all_complete = results.iter().all(|&r| r); if all_complete { - info!("All oneshot tasks completed on shard {}", self.shard_id); + debug!("All oneshot tasks completed on shard {}", self.shard_id); } else { error!("Some oneshot tasks failed on shard {}", self.shard_id); } @@ -486,7 +500,7 @@ impl TaskSupervisor { } } - info!( + trace!( "Periodic tick task '{}' completed on shard {}", name, shard_id ); @@ -503,12 +517,13 @@ impl TaskSupervisor { /// Shutdown all connections gracefully fn shutdown_connections(&self) { - info!( - "Shutting down {} active connections", - self.active_connections.borrow().len() - ); - let connections = self.active_connections.borrow(); + if connections.is_empty() { + return; + } + + info!("Shutting down {} active connections", connections.len()); + for (client_id, stop_sender) in connections.iter() { trace!("Sending shutdown signal to connection {}", client_id); if let Err(e) = stop_sender.send_blocking(()) { @@ -519,4 +534,25 @@ impl TaskSupervisor { } } } + + /// Register a task handle (used by builders) + pub(crate) fn register_task( + &self, + name: &str, + kind: TaskKind, + handle: JoinHandle<Result<(), IggyError>>, + is_critical: bool, + ) { + let task_handle = TaskHandle { + name: name.to_string(), + kind: kind.clone(), + handle, + is_critical, + }; + + match kind { + TaskKind::OneShot => self.oneshot_handles.borrow_mut().push(task_handle), + _ => self.tasks.borrow_mut().push(task_handle), + } + } } diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index f39dcc94..8cda9310 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -22,7 +22,7 @@ use crate::shard::IggyShard; use crate::shard::tasks::task_supervisor; use crate::shard::transmission::event::ShardEvent; use crate::tcp::connection_handler::{handle_connection, handle_error}; -use crate::{shard_error, shard_info}; +use crate::{shard_debug, shard_error, shard_info}; use compio::net::{TcpListener, TcpOpts}; use error_set::ErrContext; use futures::FutureExt; @@ -155,7 +155,7 @@ async fn accept_loop( let accept_future = listener.accept(); futures::select! { _ = shutdown_check.fuse() => { - shard_info!(shard.id, "{} detected shutdown flag, no longer accepting connections", server_name); + shard_debug!(shard.id, "{} detected shutdown flag, no longer accepting connections", server_name); break; } result = accept_future.fuse() => {
