This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 2bcf4de7809cbeaefe3072a78a3bb2417461d6a3 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Jun 30 17:04:45 2025 +0200 feat(tpc): implement cooperative shutdown (#1941) This closes #1932. --- Cargo.lock | 11 +++ core/server/Cargo.toml | 1 + core/server/src/bootstrap.rs | 18 +++-- core/server/src/main.rs | 37 +++++++--- core/server/src/shard/builder.rs | 10 ++- core/server/src/shard/mod.rs | 60 +++++++++++------ core/server/src/shard/task_registry.rs | 108 ++++++++++++++++++++++++++++++ core/server/src/shard/tasks/messages.rs | 66 +++++++++++------- core/server/src/tcp/connection_handler.rs | 30 ++++++--- core/server/src/tcp/tcp_listener.rs | 97 +++++++++++++++++---------- core/server/src/tcp/tcp_server.rs | 8 +-- 11 files changed, 340 insertions(+), 106 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8a3b9e16..6898bb77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1856,6 +1856,16 @@ dependencies = [ "cipher", ] +[[package]] +name = "ctrlc" +version = "3.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46f93780a459b7d656ef7f071fe699c4d3d2cb201c4b24d085b6ddc505276e73" +dependencies = [ + "nix 0.30.1", + "windows-sys 0.59.0", +] + [[package]] name = "cucumber" version = "0.21.1" @@ -7025,6 +7035,7 @@ dependencies = [ "clap", "console-subscriber", "crossbeam", + "ctrlc", "dashmap", "derive_more 2.0.1", "dotenvy", diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 8da2f495..3e6c801e 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -48,6 +48,7 @@ chrono = { workspace = true } clap = { workspace = true } console-subscriber = { workspace = true, optional = true } crossbeam = { workspace = true } +ctrlc = { version = "3.4", features = ["termination"] } dashmap = { workspace = true } derive_more = { workspace = true } dotenvy = { workspace = true } diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index 2d9f7eaa..6f046fa8 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -12,7 +12,10 @@ use crate::{ IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, configs::{config_provider::ConfigProviderKind, server::ServerConfig, system::SystemConfig}, server_error::ServerError, - shard::transmission::{connector::ShardConnector, frame::ShardFrame}, + shard::transmission::{ + connector::{ShardConnector, StopSender}, + frame::ShardFrame, + }, streaming::{ persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind}, users::user::User, @@ -21,14 +24,21 @@ use crate::{ }; use std::{env, fs::remove_dir_all, ops::Range, path::Path, sync::Arc}; -pub fn create_shard_connections(shards_set: Range<usize>) -> Vec<ShardConnector<ShardFrame>> { +pub fn create_shard_connections( + shards_set: Range<usize>, +) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) { let shards_count = shards_set.len(); - let connectors = shards_set + let connectors: Vec<ShardConnector<ShardFrame>> = shards_set .into_iter() .map(|id| ShardConnector::new(id as u16, shards_count)) .collect(); - connectors + let shutdown_handles = connectors + .iter() + .map(|conn| (conn.id, conn.stop_sender.clone())) + .collect(); + + (connectors, shutdown_handles) } pub async fn load_config( diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 1b9331a1..2cd70c4d 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -127,21 +127,23 @@ fn main() -> Result<(), ServerError> { let available_cpus = available_parallelism().expect("Failed to get num of cores"); let shards_count = available_cpus.into(); let shards_set = 0..shards_count; - let connections = create_shard_connections(shards_set.clone()); + let (connections, shutdown_handles) = create_shard_connections(shards_set.clone()); let gate = Arc::new(Gate::new()); let mut handles = Vec::with_capacity(shards_set.len()); + for shard_id in shards_set { let id = shard_id as u16; let gate = gate.clone(); let connections = connections.clone(); let config = config.clone(); let state_persister = resolve_persister(config.system.state.enforce_fsync); + let handle = std::thread::Builder::new() .name(format!("shard-{id}")) .spawn(move || { MemoryPool::init_pool(config.system.clone()); monoio::utils::bind_to_cpu_set(Some(shard_id)) - .expect(format!("Failed to set CPU affinity for shard-{id}").as_str()); + .unwrap_or_else(|e| panic!("Failed to set CPU affinity for shard-{id}: {e}")); let mut rt = create_shard_executor(); rt.block_on(async move { @@ -242,21 +244,40 @@ fn main() -> Result<(), ServerError> { .build() .into(); - //TODO: If one of the shards fails to initialize, we should crash the whole program; if let Err(e) = shard.run().await { error!("Failed to run shard-{id}: {e}"); } - //TODO: If one of the shards fails to initialize, we should crash the whole program; - //shard.assert_init(); + info!("Shard {} run completed", id); }) }) - .expect(format!("Failed to spawn thread for shard-{id}").as_str()); + .unwrap_or_else(|e| panic!("Failed to spawn thread for shard-{id}: {e}")); handles.push(handle); } - handles.into_iter().for_each(|handle| { + let shutdown_handles_for_signal = shutdown_handles.clone(); + ctrlc::set_handler(move || { + info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful shutdown..."); + + for (shard_id, stop_sender) in &shutdown_handles_for_signal { + info!("Sending shutdown signal to shard {}", shard_id); + if let Err(e) = stop_sender.send_blocking(()) { + error!( + "Failed to send shutdown signal to shard {}: {}", + shard_id, e + ); + } + } + }) + .expect("Error setting Ctrl-C handler"); + + info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown."); + for (idx, handle) in handles.into_iter().enumerate() { + info!("Waiting for shard thread {} to complete...", idx); handle.join().expect("Failed to join shard thread"); - }); + info!("Shard thread {} completed", idx); + } + + info!("All shards have shut down. Iggy server is exiting."); /* #[cfg(feature = "disable-mimalloc")] diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index beda9ae2..eb851271 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -16,7 +16,11 @@ * under the License. */ -use std::{cell::Cell, rc::Rc, sync::Arc}; +use std::{ + cell::Cell, + rc::Rc, + sync::{Arc, atomic::AtomicBool}, +}; use iggy_common::{Aes256GcmEncryptor, EncryptorKind}; use tracing::info; @@ -25,7 +29,7 @@ use crate::{ bootstrap::resolve_persister, configs::server::ServerConfig, map_toggle_str, - shard::Shard, + shard::{Shard, task_registry::TaskRegistry}, state::{StateKind, file::FileState}, streaming::{diagnostics::metrics::Metrics, storage::SystemStorage}, versioning::SemanticVersion, @@ -115,6 +119,8 @@ impl IggyShardBuilder { stop_sender: stop_sender, messages_receiver: Cell::new(Some(frame_receiver)), metrics: Metrics::init(), + task_registry: TaskRegistry::new(), + is_shutting_down: AtomicBool::new(false), users: Default::default(), permissioner: Default::default(), diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index cfbd6eac..cc5023e2 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -20,6 +20,7 @@ pub mod builder; pub mod gate; pub mod namespace; pub mod system; +pub mod task_registry; pub mod tasks; pub mod transmission; @@ -36,9 +37,9 @@ use std::{ str::FromStr, sync::{ Arc, RwLock, - atomic::{AtomicU32, Ordering}, + atomic::{AtomicBool, AtomicU32, Ordering}, }, - time::Instant, + time::{Duration, Instant}, }; use tracing::{error, info, instrument, trace, warn}; use transmission::connector::{Receiver, ShardConnector, StopReceiver, StopSender}; @@ -47,6 +48,7 @@ use crate::{ configs::server::ServerConfig, shard::{ system::info::SystemInfo, + task_registry::TaskRegistry, tasks::messages::spawn_shard_message_task, transmission::{ event::ShardEvent, @@ -74,6 +76,8 @@ use crate::{ }; pub const COMPONENT: &str = "SHARD"; +pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); + static USER_ID: AtomicU32 = AtomicU32::new(1); type Task = Pin<Box<dyn Future<Output = Result<(), IggyError>>>>; @@ -142,8 +146,10 @@ pub struct IggyShard { pub(crate) metrics: Metrics, pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>, - stop_receiver: StopReceiver, - stop_sender: StopSender, + pub(crate) stop_receiver: StopReceiver, + pub(crate) stop_sender: StopSender, + pub(crate) task_registry: TaskRegistry, + pub(crate) is_shutting_down: AtomicBool, } impl IggyShard { @@ -181,11 +187,31 @@ impl IggyShard { // TODO: Fixme //self.assert_init(); info!("Initiated shard with ID: {}", self.id); + // Create all tasks (tcp listener, http listener, command processor, in the future also the background jobs). let mut tasks: Vec<Task> = vec![Box::pin(spawn_shard_message_task(self.clone()))]; if self.config.tcp.enabled { tasks.push(Box::pin(spawn_tcp_server(self.clone()))); } + + let stop_receiver = self.get_stop_receiver(); + let shard_for_shutdown = self.clone(); + + monoio::spawn(async move { + let _ = stop_receiver.recv().await; + info!("Shard {} received shutdown signal", shard_for_shutdown.id); + + let shutdown_success = shard_for_shutdown.trigger_shutdown().await; + if !shutdown_success { + error!("Shard {} shutdown timed out", shard_for_shutdown.id); + } else { + info!( + "Shard {} shutdown completed successfully", + shard_for_shutdown.id + ); + } + }); + let result = try_join_all(tasks).await; result?; @@ -430,25 +456,19 @@ impl IggyShard { Ok(()) } - #[instrument(skip_all, name = "trace_shutdown")] - pub async fn shutdown(&mut self) -> Result<(), IggyError> { - //TODO: Fixme, impl cooperative shutdown. - self.persist_messages().await?; - Ok(()) + pub fn is_shutting_down(&self) -> bool { + self.is_shutting_down.load(Ordering::Relaxed) } - #[instrument(skip_all, name = "trace_persist_messages")] - pub async fn persist_messages(&self) -> Result<usize, IggyError> { - trace!("Saving buffered messages on disk..."); - let mut saved_messages_number = 0; - //TODO: Fixme - /* - for stream in self.streams.values() { - saved_messages_number += stream.persist_messages().await?; - } - */ + pub fn get_stop_receiver(&self) -> StopReceiver { + self.stop_receiver.clone() + } - Ok(saved_messages_number) + #[instrument(skip_all, name = "trace_shutdown")] + pub async fn trigger_shutdown(&self) -> bool { + self.is_shutting_down.store(true, Ordering::SeqCst); + info!("Shard {} shutdown state set", self.id); + self.task_registry.shutdown_all(SHUTDOWN_TIMEOUT).await } pub fn get_available_shards_count(&self) -> u32 { diff --git a/core/server/src/shard/task_registry.rs b/core/server/src/shard/task_registry.rs new file mode 100644 index 00000000..1a201d3b --- /dev/null +++ b/core/server/src/shard/task_registry.rs @@ -0,0 +1,108 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use async_channel::{Receiver, Sender, bounded}; +use futures::future::join_all; +use monoio::task::JoinHandle; +use std::cell::RefCell; +use std::collections::HashMap; +use std::future::Future; +use std::time::Duration; +use tracing::{error, info, warn}; + +pub struct TaskRegistry { + tasks: RefCell<Vec<JoinHandle<()>>>, + active_connections: RefCell<HashMap<u32, Sender<()>>>, +} + +impl TaskRegistry { + pub fn new() -> Self { + Self { + tasks: RefCell::new(Vec::new()), + active_connections: RefCell::new(HashMap::new()), + } + } + + pub fn spawn_tracked<F>(&self, future: F) + where + F: Future<Output = ()> + 'static, + { + let handle = monoio::spawn(future); + self.tasks.borrow_mut().push(handle); + } + + pub fn add_connection(&self, client_id: u32) -> Receiver<()> { + let (stop_sender, stop_receiver) = bounded(1); + self.active_connections + .borrow_mut() + .insert(client_id, stop_sender); + stop_receiver + } + + pub fn remove_connection(&self, client_id: &u32) { + self.active_connections.borrow_mut().remove(client_id); + } + + pub async fn shutdown_all(&self, timeout: Duration) -> bool { + info!("Initiating task registry shutdown"); + + let connections = self.active_connections.borrow(); + for (client_id, stop_sender) in connections.iter() { + info!("Sending shutdown signal to client {}", client_id); + if let Err(e) = stop_sender.send(()).await { + warn!( + "Failed to send shutdown signal to client {}: {}", + client_id, e + ); + } + } + drop(connections); + + let tasks = self.tasks.take(); + let total = tasks.len(); + + if total == 0 { + info!("No tasks to shut down"); + return true; + } + + let timeout_futures: Vec<_> = tasks + .into_iter() + .enumerate() + .map(|(idx, handle)| async move { + match monoio::time::timeout(timeout, handle).await { + Ok(()) => (idx, true), + Err(_) => { + warn!("Task {} did not complete within timeout", idx); + (idx, false) + } + } + }) + .collect(); + + let results = join_all(timeout_futures).await; + let completed = results.iter().filter(|(_, success)| *success).count(); + + info!( + "Task registry shutdown complete. {} of {} tasks completed", + completed, total + ); + + completed == total + } +} diff --git a/core/server/src/shard/tasks/messages.rs b/core/server/src/shard/tasks/messages.rs index 87660290..ec4018c0 100644 --- a/core/server/src/shard/tasks/messages.rs +++ b/core/server/src/shard/tasks/messages.rs @@ -1,38 +1,56 @@ -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use iggy_common::IggyError; -use std::rc::Rc; -use tracing::error; +use std::{rc::Rc, time::Duration}; +use tracing::{error, info}; use crate::shard::{IggyShard, transmission::frame::ShardFrame}; async fn run_shard_messages_receiver(shard: Rc<IggyShard>) -> Result<(), IggyError> { let mut messages_receiver = shard.messages_receiver.take().unwrap(); + loop { - if let Some(frame) = messages_receiver.next().await { - let ShardFrame { - message, - response_sender, - } = frame; - match (shard.handle_shard_message(message).await, response_sender) { - (Some(response), Some(response_sender)) => { - response_sender - .send(response) - .await - .expect("Failed to send response back to origin shard."); + let shutdown_check = async { + loop { + if shard.is_shutting_down() { + return; + } + monoio::time::sleep(Duration::from_millis(100)).await; + } + }; + + futures::select! { + _ = shutdown_check.fuse() => { + info!("Shard {} message receiver shutting down", shard.id); + break; + } + frame = messages_receiver.next().fuse() => { + if let Some(frame) = frame { + let ShardFrame { + message, + response_sender, + } = frame; + match (shard.handle_shard_message(message).await, response_sender) { + (Some(response), Some(response_sender)) => { + response_sender + .send(response) + .await + .expect("Failed to send response back to origin shard."); + } + _ => {} + }; } - _ => {} - }; + } } } + + Ok(()) } pub async fn spawn_shard_message_task(shard: Rc<IggyShard>) -> Result<(), IggyError> { - monoio::spawn(async move { - let result = run_shard_messages_receiver(shard).await; - if let Err(err) = &result { - error!("Error running shard: {err}"); - } - result - }) - .await + let result = run_shard_messages_receiver(shard).await; + if let Err(err) = result { + error!("Error running shard message receiver: {err}"); + return Err(err); + } + Ok(()) } diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs index 5622d2a8..418af387 100644 --- a/core/server/src/tcp/connection_handler.rs +++ b/core/server/src/tcp/connection_handler.rs @@ -22,7 +22,9 @@ use crate::server_error::ConnectionError; use crate::shard::IggyShard; use crate::streaming::session::Session; use crate::tcp::connection_handler::command::ServerCommand; +use async_channel::Receiver; use bytes::BytesMut; +use futures::FutureExt; use iggy_common::IggyError; use std::io::ErrorKind; use std::rc::Rc; @@ -34,6 +36,7 @@ pub(crate) async fn handle_connection( session: &Rc<Session>, sender: &mut SenderKind, shard: &Rc<IggyShard>, + stop_receiver: Receiver<()>, ) -> Result<(), ConnectionError> { let mut length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH); unsafe { @@ -44,14 +47,25 @@ pub(crate) async fn handle_connection( code_buffer.set_len(INITIAL_BYTES_LENGTH); } loop { - let (read_length, initial_buffer) = match sender.read(length_buffer.clone()).await { - (Ok(read_length), initial_buffer) => (read_length, initial_buffer), - (Err(error), _) => { - if error.as_code() == IggyError::ConnectionClosed.as_code() { - return Err(ConnectionError::from(error)); - } else { - sender.send_error_response(error).await?; - continue; + let read_future = sender.read(length_buffer.clone()); + + let (read_length, initial_buffer) = futures::select! { + _ = stop_receiver.recv().fuse() => { + info!("Connection stop signal received for session: {}", session); + let _ = sender.send_error_response(IggyError::Disconnected).await; + return Ok(()); + } + result = read_future.fuse() => { + match result { + (Ok(read_length), initial_buffer) => (read_length, initial_buffer), + (Err(error), _) => { + if error.as_code() == IggyError::ConnectionClosed.as_code() { + return Err(ConnectionError::from(error)); + } else { + sender.send_error_response(error).await?; + continue; + } + } } } }; diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index c0080edf..90b1eabb 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -22,9 +22,11 @@ use crate::shard::transmission::event::ShardEvent; use crate::streaming::clients::client_manager::Transport; use crate::tcp::connection_handler::{handle_connection, handle_error}; use crate::tcp::tcp_socket; +use futures::FutureExt; use iggy_common::IggyError; use std::net::SocketAddr; use std::rc::Rc; +use std::time::Duration; use tracing::{error, info}; pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) -> Result<(), IggyError> { @@ -38,37 +40,61 @@ pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) -> Result<() .expect("Failed to parse TCP address"); let socket = tcp_socket::build(ip_v6, socket_config); - monoio::spawn(async move { - socket - .bind(&addr.into()) - .expect("Failed to bind TCP listener"); - socket.listen(1024).unwrap(); - let listener: std::net::TcpListener = socket.into(); - let listener = monoio::net::TcpListener::from_std(listener).unwrap(); - info!("{server_name} server has started on: {:?}", addr); - loop { - match listener.accept().await { - Ok((stream, address)) => { - let shard = shard.clone(); - info!("Accepted new TCP connection: {address}"); - let transport = Transport::Tcp; - let session = shard.add_client(&address, transport); - //TODO: Those can be shared with other shards. - shard.add_active_session(session.clone()); - // Broadcast session to all shards. - let event = ShardEvent::NewSession { address, transport }; - // TODO: Fixme look inside of broadcast_event_to_all_shards method. - let _responses = shard.broadcast_event_to_all_shards(event.into()); + socket + .bind(&addr.into()) + .expect("Failed to bind TCP listener"); + socket.listen(1024).unwrap(); + let listener: std::net::TcpListener = socket.into(); + let listener = monoio::net::TcpListener::from_std(listener).unwrap(); + info!("{server_name} server has started on: {:?}", addr); + + loop { + let shutdown_check = async { + loop { + if shard.is_shutting_down() { + return; + } + monoio::time::sleep(Duration::from_millis(100)).await; + } + }; + + let accept_future = listener.accept(); + futures::select! { + _ = shutdown_check.fuse() => { + info!("{server_name} detected shutdown flag, no longer accepting connections"); + break; + } + result = accept_future.fuse() => { + match result { + Ok((stream, address)) => { + if shard.is_shutting_down() { + info!("Rejecting new connection from {} during shutdown", address); + continue; + } + let shard_clone = shard.clone(); + info!("Accepted new TCP connection: {address}"); + let transport = Transport::Tcp; + let session = shard_clone.add_client(&address, transport); + //TODO: Those can be shared with other shards. + shard_clone.add_active_session(session.clone()); + // Broadcast session to all shards. + let event = ShardEvent::NewSession { address, transport }; + // TODO: Fixme look inside of broadcast_event_to_all_shards method. + let _responses = shard_clone.broadcast_event_to_all_shards(event.into()); + + let client_id = session.client_id; + info!("Created new session: {session}"); + let mut sender = SenderKind::get_tcp_sender(stream); + + let conn_stop_receiver = shard_clone.task_registry.add_connection(client_id); + + let shard_for_conn = shard_clone.clone(); + shard_clone.task_registry.spawn_tracked(async move { + if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await { + handle_error(error); + } + shard_for_conn.task_registry.remove_connection(&client_id); - let _client_id = session.client_id; - info!("Created new session: {session}"); - let mut sender = SenderKind::get_tcp_sender(stream); - monoio::spawn(async move { - if let Err(error) = handle_connection(&session, &mut sender, &shard).await { - handle_error(error); - //TODO: Fixme - /* - //system.read().await.delete_client(client_id).await; if let Err(error) = sender.shutdown().await { error!( "Failed to shutdown TCP stream for client: {client_id}, address: {address}. {error}" @@ -78,13 +104,12 @@ pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) -> Result<() "Successfully closed TCP stream for client: {client_id}, address: {address}." ); } - */ - } - }); + }); + } + Err(error) => error!("Unable to accept TCP socket. {error}"), } - Err(error) => error!("Unable to accept TCP socket. {error}"), } } - }) - .await + } + Ok(()) } diff --git a/core/server/src/tcp/tcp_server.rs b/core/server/src/tcp/tcp_server.rs index 0430f773..e558d08f 100644 --- a/core/server/src/tcp/tcp_server.rs +++ b/core/server/src/tcp/tcp_server.rs @@ -32,10 +32,10 @@ pub async fn spawn_tcp_server(shard: Rc<IggyShard>) -> Result<(), IggyError> { }; info!("Initializing {server_name} server..."); // TODO: Fixme -- storing addr of the server inside of the config for integration tests... - let result = match shard.config.tcp.tls.enabled { + match shard.config.tcp.tls.enabled { true => unimplemented!("TLS support is not implemented yet"), - false => tcp_listener::start(server_name, shard).await, + false => tcp_listener::start(server_name, shard.clone()).await?, }; - //info!("{server_name} server has started on: {:?}", addr); - result + + Ok(()) }
