This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch rebase_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 2bcf4de7809cbeaefe3072a78a3bb2417461d6a3
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jun 30 17:04:45 2025 +0200

    feat(tpc): implement cooperative shutdown (#1941)
    
    This closes #1932.
---
 Cargo.lock                                |  11 +++
 core/server/Cargo.toml                    |   1 +
 core/server/src/bootstrap.rs              |  18 +++--
 core/server/src/main.rs                   |  37 +++++++---
 core/server/src/shard/builder.rs          |  10 ++-
 core/server/src/shard/mod.rs              |  60 +++++++++++------
 core/server/src/shard/task_registry.rs    | 108 ++++++++++++++++++++++++++++++
 core/server/src/shard/tasks/messages.rs   |  66 +++++++++++-------
 core/server/src/tcp/connection_handler.rs |  30 ++++++---
 core/server/src/tcp/tcp_listener.rs       |  97 +++++++++++++++++----------
 core/server/src/tcp/tcp_server.rs         |   8 +--
 11 files changed, 340 insertions(+), 106 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8a3b9e16..6898bb77 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1856,6 +1856,16 @@ dependencies = [
  "cipher",
 ]
 
+[[package]]
+name = "ctrlc"
+version = "3.4.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "46f93780a459b7d656ef7f071fe699c4d3d2cb201c4b24d085b6ddc505276e73"
+dependencies = [
+ "nix 0.30.1",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "cucumber"
 version = "0.21.1"
@@ -7025,6 +7035,7 @@ dependencies = [
  "clap",
  "console-subscriber",
  "crossbeam",
+ "ctrlc",
  "dashmap",
  "derive_more 2.0.1",
  "dotenvy",
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 8da2f495..3e6c801e 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -48,6 +48,7 @@ chrono = { workspace = true }
 clap = { workspace = true }
 console-subscriber = { workspace = true, optional = true }
 crossbeam = { workspace = true }
+ctrlc = { version = "3.4", features = ["termination"] }
 dashmap = { workspace = true }
 derive_more = { workspace = true }
 dotenvy = { workspace = true }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 2d9f7eaa..6f046fa8 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -12,7 +12,10 @@ use crate::{
     IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
     configs::{config_provider::ConfigProviderKind, server::ServerConfig, 
system::SystemConfig},
     server_error::ServerError,
-    shard::transmission::{connector::ShardConnector, frame::ShardFrame},
+    shard::transmission::{
+        connector::{ShardConnector, StopSender},
+        frame::ShardFrame,
+    },
     streaming::{
         persistence::persister::{FilePersister, FileWithSyncPersister, 
PersisterKind},
         users::user::User,
@@ -21,14 +24,21 @@ use crate::{
 };
 use std::{env, fs::remove_dir_all, ops::Range, path::Path, sync::Arc};
 
-pub fn create_shard_connections(shards_set: Range<usize>) -> 
Vec<ShardConnector<ShardFrame>> {
+pub fn create_shard_connections(
+    shards_set: Range<usize>,
+) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) {
     let shards_count = shards_set.len();
-    let connectors = shards_set
+    let connectors: Vec<ShardConnector<ShardFrame>> = shards_set
         .into_iter()
         .map(|id| ShardConnector::new(id as u16, shards_count))
         .collect();
 
-    connectors
+    let shutdown_handles = connectors
+        .iter()
+        .map(|conn| (conn.id, conn.stop_sender.clone()))
+        .collect();
+
+    (connectors, shutdown_handles)
 }
 
 pub async fn load_config(
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 1b9331a1..2cd70c4d 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -127,21 +127,23 @@ fn main() -> Result<(), ServerError> {
     let available_cpus = available_parallelism().expect("Failed to get num of 
cores");
     let shards_count = available_cpus.into();
     let shards_set = 0..shards_count;
-    let connections = create_shard_connections(shards_set.clone());
+    let (connections, shutdown_handles) = 
create_shard_connections(shards_set.clone());
     let gate = Arc::new(Gate::new());
     let mut handles = Vec::with_capacity(shards_set.len());
+
     for shard_id in shards_set {
         let id = shard_id as u16;
         let gate = gate.clone();
         let connections = connections.clone();
         let config = config.clone();
         let state_persister = 
resolve_persister(config.system.state.enforce_fsync);
+
         let handle = std::thread::Builder::new()
             .name(format!("shard-{id}"))
             .spawn(move || {
                 MemoryPool::init_pool(config.system.clone());
                 monoio::utils::bind_to_cpu_set(Some(shard_id))
-                    .expect(format!("Failed to set CPU affinity for 
shard-{id}").as_str());
+                    .unwrap_or_else(|e| panic!("Failed to set CPU affinity for 
shard-{id}: {e}"));
 
                 let mut rt = create_shard_executor();
                 rt.block_on(async move {
@@ -242,21 +244,40 @@ fn main() -> Result<(), ServerError> {
                         .build()
                         .into();
 
-                    //TODO: If one of the shards fails to initialize, we 
should crash the whole program;
                     if let Err(e) = shard.run().await {
                         error!("Failed to run shard-{id}: {e}");
                     }
-                    //TODO: If one of the shards fails to initialize, we 
should crash the whole program;
-                    //shard.assert_init();
+                    info!("Shard {} run completed", id);
                 })
             })
-            .expect(format!("Failed to spawn thread for shard-{id}").as_str());
+            .unwrap_or_else(|e| panic!("Failed to spawn thread for shard-{id}: 
{e}"));
         handles.push(handle);
     }
 
-    handles.into_iter().for_each(|handle| {
+    let shutdown_handles_for_signal = shutdown_handles.clone();
+    ctrlc::set_handler(move || {
+        info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful 
shutdown...");
+
+        for (shard_id, stop_sender) in &shutdown_handles_for_signal {
+            info!("Sending shutdown signal to shard {}", shard_id);
+            if let Err(e) = stop_sender.send_blocking(()) {
+                error!(
+                    "Failed to send shutdown signal to shard {}: {}",
+                    shard_id, e
+                );
+            }
+        }
+    })
+    .expect("Error setting Ctrl-C handler");
+
+    info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown.");
+    for (idx, handle) in handles.into_iter().enumerate() {
+        info!("Waiting for shard thread {} to complete...", idx);
         handle.join().expect("Failed to join shard thread");
-    });
+        info!("Shard thread {} completed", idx);
+    }
+
+    info!("All shards have shut down. Iggy server is exiting.");
 
     /*
     #[cfg(feature = "disable-mimalloc")]
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index beda9ae2..eb851271 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -16,7 +16,11 @@
  * under the License.
  */
 
-use std::{cell::Cell, rc::Rc, sync::Arc};
+use std::{
+    cell::Cell,
+    rc::Rc,
+    sync::{Arc, atomic::AtomicBool},
+};
 
 use iggy_common::{Aes256GcmEncryptor, EncryptorKind};
 use tracing::info;
@@ -25,7 +29,7 @@ use crate::{
     bootstrap::resolve_persister,
     configs::server::ServerConfig,
     map_toggle_str,
-    shard::Shard,
+    shard::{Shard, task_registry::TaskRegistry},
     state::{StateKind, file::FileState},
     streaming::{diagnostics::metrics::Metrics, storage::SystemStorage},
     versioning::SemanticVersion,
@@ -115,6 +119,8 @@ impl IggyShardBuilder {
             stop_sender: stop_sender,
             messages_receiver: Cell::new(Some(frame_receiver)),
             metrics: Metrics::init(),
+            task_registry: TaskRegistry::new(),
+            is_shutting_down: AtomicBool::new(false),
 
             users: Default::default(),
             permissioner: Default::default(),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index cfbd6eac..cc5023e2 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -20,6 +20,7 @@ pub mod builder;
 pub mod gate;
 pub mod namespace;
 pub mod system;
+pub mod task_registry;
 pub mod tasks;
 pub mod transmission;
 
@@ -36,9 +37,9 @@ use std::{
     str::FromStr,
     sync::{
         Arc, RwLock,
-        atomic::{AtomicU32, Ordering},
+        atomic::{AtomicBool, AtomicU32, Ordering},
     },
-    time::Instant,
+    time::{Duration, Instant},
 };
 use tracing::{error, info, instrument, trace, warn};
 use transmission::connector::{Receiver, ShardConnector, StopReceiver, 
StopSender};
@@ -47,6 +48,7 @@ use crate::{
     configs::server::ServerConfig,
     shard::{
         system::info::SystemInfo,
+        task_registry::TaskRegistry,
         tasks::messages::spawn_shard_message_task,
         transmission::{
             event::ShardEvent,
@@ -74,6 +76,8 @@ use crate::{
 };
 
 pub const COMPONENT: &str = "SHARD";
+pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
+
 static USER_ID: AtomicU32 = AtomicU32::new(1);
 
 type Task = Pin<Box<dyn Future<Output = Result<(), IggyError>>>>;
@@ -142,8 +146,10 @@ pub struct IggyShard {
 
     pub(crate) metrics: Metrics,
     pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>,
-    stop_receiver: StopReceiver,
-    stop_sender: StopSender,
+    pub(crate) stop_receiver: StopReceiver,
+    pub(crate) stop_sender: StopSender,
+    pub(crate) task_registry: TaskRegistry,
+    pub(crate) is_shutting_down: AtomicBool,
 }
 
 impl IggyShard {
@@ -181,11 +187,31 @@ impl IggyShard {
         // TODO: Fixme
         //self.assert_init();
         info!("Initiated shard with ID: {}", self.id);
+
         // Create all tasks (tcp listener, http listener, command processor, 
in the future also the background jobs).
         let mut tasks: Vec<Task> = 
vec![Box::pin(spawn_shard_message_task(self.clone()))];
         if self.config.tcp.enabled {
             tasks.push(Box::pin(spawn_tcp_server(self.clone())));
         }
+
+        let stop_receiver = self.get_stop_receiver();
+        let shard_for_shutdown = self.clone();
+
+        monoio::spawn(async move {
+            let _ = stop_receiver.recv().await;
+            info!("Shard {} received shutdown signal", shard_for_shutdown.id);
+
+            let shutdown_success = shard_for_shutdown.trigger_shutdown().await;
+            if !shutdown_success {
+                error!("Shard {} shutdown timed out", shard_for_shutdown.id);
+            } else {
+                info!(
+                    "Shard {} shutdown completed successfully",
+                    shard_for_shutdown.id
+                );
+            }
+        });
+
         let result = try_join_all(tasks).await;
         result?;
 
@@ -430,25 +456,19 @@ impl IggyShard {
         Ok(())
     }
 
-    #[instrument(skip_all, name = "trace_shutdown")]
-    pub async fn shutdown(&mut self) -> Result<(), IggyError> {
-        //TODO: Fixme, impl cooperative shutdown.
-        self.persist_messages().await?;
-        Ok(())
+    pub fn is_shutting_down(&self) -> bool {
+        self.is_shutting_down.load(Ordering::Relaxed)
     }
 
-    #[instrument(skip_all, name = "trace_persist_messages")]
-    pub async fn persist_messages(&self) -> Result<usize, IggyError> {
-        trace!("Saving buffered messages on disk...");
-        let mut saved_messages_number = 0;
-        //TODO: Fixme
-        /*
-        for stream in self.streams.values() {
-            saved_messages_number += stream.persist_messages().await?;
-        }
-        */
+    pub fn get_stop_receiver(&self) -> StopReceiver {
+        self.stop_receiver.clone()
+    }
 
-        Ok(saved_messages_number)
+    #[instrument(skip_all, name = "trace_shutdown")]
+    pub async fn trigger_shutdown(&self) -> bool {
+        self.is_shutting_down.store(true, Ordering::SeqCst);
+        info!("Shard {} shutdown state set", self.id);
+        self.task_registry.shutdown_all(SHUTDOWN_TIMEOUT).await
     }
 
     pub fn get_available_shards_count(&self) -> u32 {
diff --git a/core/server/src/shard/task_registry.rs 
b/core/server/src/shard/task_registry.rs
new file mode 100644
index 00000000..1a201d3b
--- /dev/null
+++ b/core/server/src/shard/task_registry.rs
@@ -0,0 +1,108 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_channel::{Receiver, Sender, bounded};
+use futures::future::join_all;
+use monoio::task::JoinHandle;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::future::Future;
+use std::time::Duration;
+use tracing::{error, info, warn};
+
+pub struct TaskRegistry {
+    tasks: RefCell<Vec<JoinHandle<()>>>,
+    active_connections: RefCell<HashMap<u32, Sender<()>>>,
+}
+
+impl TaskRegistry {
+    pub fn new() -> Self {
+        Self {
+            tasks: RefCell::new(Vec::new()),
+            active_connections: RefCell::new(HashMap::new()),
+        }
+    }
+
+    pub fn spawn_tracked<F>(&self, future: F)
+    where
+        F: Future<Output = ()> + 'static,
+    {
+        let handle = monoio::spawn(future);
+        self.tasks.borrow_mut().push(handle);
+    }
+
+    pub fn add_connection(&self, client_id: u32) -> Receiver<()> {
+        let (stop_sender, stop_receiver) = bounded(1);
+        self.active_connections
+            .borrow_mut()
+            .insert(client_id, stop_sender);
+        stop_receiver
+    }
+
+    pub fn remove_connection(&self, client_id: &u32) {
+        self.active_connections.borrow_mut().remove(client_id);
+    }
+
+    pub async fn shutdown_all(&self, timeout: Duration) -> bool {
+        info!("Initiating task registry shutdown");
+
+        let connections = self.active_connections.borrow();
+        for (client_id, stop_sender) in connections.iter() {
+            info!("Sending shutdown signal to client {}", client_id);
+            if let Err(e) = stop_sender.send(()).await {
+                warn!(
+                    "Failed to send shutdown signal to client {}: {}",
+                    client_id, e
+                );
+            }
+        }
+        drop(connections);
+
+        let tasks = self.tasks.take();
+        let total = tasks.len();
+
+        if total == 0 {
+            info!("No tasks to shut down");
+            return true;
+        }
+
+        let timeout_futures: Vec<_> = tasks
+            .into_iter()
+            .enumerate()
+            .map(|(idx, handle)| async move {
+                match monoio::time::timeout(timeout, handle).await {
+                    Ok(()) => (idx, true),
+                    Err(_) => {
+                        warn!("Task {} did not complete within timeout", idx);
+                        (idx, false)
+                    }
+                }
+            })
+            .collect();
+
+        let results = join_all(timeout_futures).await;
+        let completed = results.iter().filter(|(_, success)| *success).count();
+
+        info!(
+            "Task registry shutdown complete. {} of {} tasks completed",
+            completed, total
+        );
+
+        completed == total
+    }
+}
diff --git a/core/server/src/shard/tasks/messages.rs 
b/core/server/src/shard/tasks/messages.rs
index 87660290..ec4018c0 100644
--- a/core/server/src/shard/tasks/messages.rs
+++ b/core/server/src/shard/tasks/messages.rs
@@ -1,38 +1,56 @@
-use futures::StreamExt;
+use futures::{FutureExt, StreamExt};
 use iggy_common::IggyError;
-use std::rc::Rc;
-use tracing::error;
+use std::{rc::Rc, time::Duration};
+use tracing::{error, info};
 
 use crate::shard::{IggyShard, transmission::frame::ShardFrame};
 
 async fn run_shard_messages_receiver(shard: Rc<IggyShard>) -> Result<(), 
IggyError> {
     let mut messages_receiver = shard.messages_receiver.take().unwrap();
+
     loop {
-        if let Some(frame) = messages_receiver.next().await {
-            let ShardFrame {
-                message,
-                response_sender,
-            } = frame;
-            match (shard.handle_shard_message(message).await, response_sender) 
{
-                (Some(response), Some(response_sender)) => {
-                    response_sender
-                        .send(response)
-                        .await
-                        .expect("Failed to send response back to origin 
shard.");
+        let shutdown_check = async {
+            loop {
+                if shard.is_shutting_down() {
+                    return;
+                }
+                monoio::time::sleep(Duration::from_millis(100)).await;
+            }
+        };
+
+        futures::select! {
+            _ = shutdown_check.fuse() => {
+                info!("Shard {} message receiver shutting down", shard.id);
+                break;
+            }
+            frame = messages_receiver.next().fuse() => {
+                if let Some(frame) = frame {
+                    let ShardFrame {
+                        message,
+                        response_sender,
+                    } = frame;
+                    match (shard.handle_shard_message(message).await, 
response_sender) {
+                        (Some(response), Some(response_sender)) => {
+                            response_sender
+                                .send(response)
+                                .await
+                                .expect("Failed to send response back to 
origin shard.");
+                        }
+                        _ => {}
+                    };
                 }
-                _ => {}
-            };
+            }
         }
     }
+
+    Ok(())
 }
 
 pub async fn spawn_shard_message_task(shard: Rc<IggyShard>) -> Result<(), 
IggyError> {
-    monoio::spawn(async move {
-        let result = run_shard_messages_receiver(shard).await;
-        if let Err(err) = &result {
-            error!("Error running shard: {err}");
-        }
-        result
-    })
-    .await
+    let result = run_shard_messages_receiver(shard).await;
+    if let Err(err) = result {
+        error!("Error running shard message receiver: {err}");
+        return Err(err);
+    }
+    Ok(())
 }
diff --git a/core/server/src/tcp/connection_handler.rs 
b/core/server/src/tcp/connection_handler.rs
index 5622d2a8..418af387 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -22,7 +22,9 @@ use crate::server_error::ConnectionError;
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
 use crate::tcp::connection_handler::command::ServerCommand;
+use async_channel::Receiver;
 use bytes::BytesMut;
+use futures::FutureExt;
 use iggy_common::IggyError;
 use std::io::ErrorKind;
 use std::rc::Rc;
@@ -34,6 +36,7 @@ pub(crate) async fn handle_connection(
     session: &Rc<Session>,
     sender: &mut SenderKind,
     shard: &Rc<IggyShard>,
+    stop_receiver: Receiver<()>,
 ) -> Result<(), ConnectionError> {
     let mut length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
     unsafe {
@@ -44,14 +47,25 @@ pub(crate) async fn handle_connection(
         code_buffer.set_len(INITIAL_BYTES_LENGTH);
     }
     loop {
-        let (read_length, initial_buffer) = match 
sender.read(length_buffer.clone()).await {
-            (Ok(read_length), initial_buffer) => (read_length, initial_buffer),
-            (Err(error), _) => {
-                if error.as_code() == IggyError::ConnectionClosed.as_code() {
-                    return Err(ConnectionError::from(error));
-                } else {
-                    sender.send_error_response(error).await?;
-                    continue;
+        let read_future = sender.read(length_buffer.clone());
+
+        let (read_length, initial_buffer) = futures::select! {
+            _ = stop_receiver.recv().fuse() => {
+                info!("Connection stop signal received for session: {}", 
session);
+                let _ = 
sender.send_error_response(IggyError::Disconnected).await;
+                return Ok(());
+            }
+            result = read_future.fuse() => {
+                match result {
+                    (Ok(read_length), initial_buffer) => (read_length, 
initial_buffer),
+                    (Err(error), _) => {
+                        if error.as_code() == 
IggyError::ConnectionClosed.as_code() {
+                            return Err(ConnectionError::from(error));
+                        } else {
+                            sender.send_error_response(error).await?;
+                            continue;
+                        }
+                    }
                 }
             }
         };
diff --git a/core/server/src/tcp/tcp_listener.rs 
b/core/server/src/tcp/tcp_listener.rs
index c0080edf..90b1eabb 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -22,9 +22,11 @@ use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::clients::client_manager::Transport;
 use crate::tcp::connection_handler::{handle_connection, handle_error};
 use crate::tcp::tcp_socket;
+use futures::FutureExt;
 use iggy_common::IggyError;
 use std::net::SocketAddr;
 use std::rc::Rc;
+use std::time::Duration;
 use tracing::{error, info};
 
 pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) -> 
Result<(), IggyError> {
@@ -38,37 +40,61 @@ pub async fn start(server_name: &'static str, shard: 
Rc<IggyShard>) -> Result<()
         .expect("Failed to parse TCP address");
 
     let socket = tcp_socket::build(ip_v6, socket_config);
-    monoio::spawn(async move {
-        socket
-            .bind(&addr.into())
-            .expect("Failed to bind TCP listener");
-        socket.listen(1024).unwrap();
-        let listener: std::net::TcpListener = socket.into();
-        let listener = monoio::net::TcpListener::from_std(listener).unwrap();
-        info!("{server_name} server has started on: {:?}", addr);
-        loop {
-            match listener.accept().await {
-                Ok((stream, address)) => {
-                    let shard = shard.clone();
-                    info!("Accepted new TCP connection: {address}");
-                    let transport = Transport::Tcp;
-                    let session = shard.add_client(&address, transport);
-                    //TODO: Those can be shared with other shards.
-                    shard.add_active_session(session.clone());
-                    // Broadcast session to all shards.
-                    let event = ShardEvent::NewSession { address, transport };
-                    // TODO: Fixme look inside of 
broadcast_event_to_all_shards method.
-                    let _responses = 
shard.broadcast_event_to_all_shards(event.into());
+    socket
+        .bind(&addr.into())
+        .expect("Failed to bind TCP listener");
+    socket.listen(1024).unwrap();
+    let listener: std::net::TcpListener = socket.into();
+    let listener = monoio::net::TcpListener::from_std(listener).unwrap();
+    info!("{server_name} server has started on: {:?}", addr);
+
+    loop {
+        let shutdown_check = async {
+            loop {
+                if shard.is_shutting_down() {
+                    return;
+                }
+                monoio::time::sleep(Duration::from_millis(100)).await;
+            }
+        };
+
+        let accept_future = listener.accept();
+        futures::select! {
+            _ = shutdown_check.fuse() => {
+                info!("{server_name} detected shutdown flag, no longer 
accepting connections");
+                break;
+            }
+            result = accept_future.fuse() => {
+                match result {
+                    Ok((stream, address)) => {
+                        if shard.is_shutting_down() {
+                            info!("Rejecting new connection from {} during 
shutdown", address);
+                            continue;
+                        }
+                        let shard_clone = shard.clone();
+                        info!("Accepted new TCP connection: {address}");
+                        let transport = Transport::Tcp;
+                        let session = shard_clone.add_client(&address, 
transport);
+                        //TODO: Those can be shared with other shards.
+                        shard_clone.add_active_session(session.clone());
+                        // Broadcast session to all shards.
+                        let event = ShardEvent::NewSession { address, 
transport };
+                        // TODO: Fixme look inside of 
broadcast_event_to_all_shards method.
+                        let _responses = 
shard_clone.broadcast_event_to_all_shards(event.into());
+
+                        let client_id = session.client_id;
+                        info!("Created new session: {session}");
+                        let mut sender = SenderKind::get_tcp_sender(stream);
+
+                        let conn_stop_receiver = 
shard_clone.task_registry.add_connection(client_id);
+
+                        let shard_for_conn = shard_clone.clone();
+                        shard_clone.task_registry.spawn_tracked(async move {
+                            if let Err(error) = handle_connection(&session, 
&mut sender, &shard_for_conn, conn_stop_receiver).await {
+                                handle_error(error);
+                            }
+                            
shard_for_conn.task_registry.remove_connection(&client_id);
 
-                    let _client_id = session.client_id;
-                    info!("Created new session: {session}");
-                    let mut sender = SenderKind::get_tcp_sender(stream);
-                    monoio::spawn(async move {
-                        if let Err(error) = handle_connection(&session, &mut 
sender, &shard).await {
-                            handle_error(error);
-                            //TODO: Fixme
-                            /*
-                            
//system.read().await.delete_client(client_id).await;
                             if let Err(error) = sender.shutdown().await {
                                 error!(
                                     "Failed to shutdown TCP stream for client: 
{client_id}, address: {address}. {error}"
@@ -78,13 +104,12 @@ pub async fn start(server_name: &'static str, shard: 
Rc<IggyShard>) -> Result<()
                                     "Successfully closed TCP stream for 
client: {client_id}, address: {address}."
                                 );
                             }
-                            */
-                        }
-                    });
+                        });
+                    }
+                    Err(error) => error!("Unable to accept TCP socket. 
{error}"),
                 }
-                Err(error) => error!("Unable to accept TCP socket. {error}"),
             }
         }
-    })
-    .await
+    }
+    Ok(())
 }
diff --git a/core/server/src/tcp/tcp_server.rs 
b/core/server/src/tcp/tcp_server.rs
index 0430f773..e558d08f 100644
--- a/core/server/src/tcp/tcp_server.rs
+++ b/core/server/src/tcp/tcp_server.rs
@@ -32,10 +32,10 @@ pub async fn spawn_tcp_server(shard: Rc<IggyShard>) -> 
Result<(), IggyError> {
     };
     info!("Initializing {server_name} server...");
     // TODO: Fixme -- storing addr of the server inside of the config for 
integration tests...
-    let result = match shard.config.tcp.tls.enabled {
+    match shard.config.tcp.tls.enabled {
         true => unimplemented!("TLS support is not implemented yet"),
-        false => tcp_listener::start(server_name, shard).await,
+        false => tcp_listener::start(server_name, shard.clone()).await?,
     };
-    //info!("{server_name} server has started on: {:?}", addr);
-    result
+
+    Ok(())
 }

Reply via email to