This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new b3ea3c8b feat(tpc): implement cooperative shutdown (#1941)
b3ea3c8b is described below
commit b3ea3c8b5b0545dd1654c02c33b2a507b420f52b
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jun 30 17:04:45 2025 +0200
feat(tpc): implement cooperative shutdown (#1941)
This closes #1932.
---
Cargo.lock | 11 +++
core/server/Cargo.toml | 1 +
core/server/src/bootstrap.rs | 18 +++--
core/server/src/main.rs | 37 +++++++---
core/server/src/shard/builder.rs | 10 ++-
core/server/src/shard/mod.rs | 60 +++++++++++------
core/server/src/shard/task_registry.rs | 108 ++++++++++++++++++++++++++++++
core/server/src/shard/tasks/messages.rs | 66 +++++++++++-------
core/server/src/tcp/connection_handler.rs | 30 ++++++---
core/server/src/tcp/tcp_listener.rs | 97 +++++++++++++++++----------
core/server/src/tcp/tcp_server.rs | 8 +--
11 files changed, 340 insertions(+), 106 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index c386d838..a3b0ee92 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1850,6 +1850,16 @@ dependencies = [
"cipher",
]
+[[package]]
+name = "ctrlc"
+version = "3.4.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "46f93780a459b7d656ef7f071fe699c4d3d2cb201c4b24d085b6ddc505276e73"
+dependencies = [
+ "nix 0.30.1",
+ "windows-sys 0.59.0",
+]
+
[[package]]
name = "cucumber"
version = "0.21.1"
@@ -6906,6 +6916,7 @@ dependencies = [
"clap",
"console-subscriber",
"crossbeam",
+ "ctrlc",
"dashmap",
"derive_more 2.0.1",
"dotenvy",
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 26345967..9836797f 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -48,6 +48,7 @@ chrono = { workspace = true }
clap = { workspace = true }
console-subscriber = { workspace = true, optional = true }
crossbeam = { workspace = true }
+ctrlc = { version = "3.4", features = ["termination"] }
dashmap = { workspace = true }
derive_more = { workspace = true }
dotenvy = "0.15.7"
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 2d9f7eaa..6f046fa8 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -12,7 +12,10 @@ use crate::{
IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
configs::{config_provider::ConfigProviderKind, server::ServerConfig,
system::SystemConfig},
server_error::ServerError,
- shard::transmission::{connector::ShardConnector, frame::ShardFrame},
+ shard::transmission::{
+ connector::{ShardConnector, StopSender},
+ frame::ShardFrame,
+ },
streaming::{
persistence::persister::{FilePersister, FileWithSyncPersister,
PersisterKind},
users::user::User,
@@ -21,14 +24,21 @@ use crate::{
};
use std::{env, fs::remove_dir_all, ops::Range, path::Path, sync::Arc};
-pub fn create_shard_connections(shards_set: Range<usize>) ->
Vec<ShardConnector<ShardFrame>> {
+pub fn create_shard_connections(
+ shards_set: Range<usize>,
+) -> (Vec<ShardConnector<ShardFrame>>, Vec<(u16, StopSender)>) {
let shards_count = shards_set.len();
- let connectors = shards_set
+ let connectors: Vec<ShardConnector<ShardFrame>> = shards_set
.into_iter()
.map(|id| ShardConnector::new(id as u16, shards_count))
.collect();
- connectors
+ let shutdown_handles = connectors
+ .iter()
+ .map(|conn| (conn.id, conn.stop_sender.clone()))
+ .collect();
+
+ (connectors, shutdown_handles)
}
pub async fn load_config(
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 1b9331a1..2cd70c4d 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -127,21 +127,23 @@ fn main() -> Result<(), ServerError> {
let available_cpus = available_parallelism().expect("Failed to get num of
cores");
let shards_count = available_cpus.into();
let shards_set = 0..shards_count;
- let connections = create_shard_connections(shards_set.clone());
+ let (connections, shutdown_handles) =
create_shard_connections(shards_set.clone());
let gate = Arc::new(Gate::new());
let mut handles = Vec::with_capacity(shards_set.len());
+
for shard_id in shards_set {
let id = shard_id as u16;
let gate = gate.clone();
let connections = connections.clone();
let config = config.clone();
let state_persister =
resolve_persister(config.system.state.enforce_fsync);
+
let handle = std::thread::Builder::new()
.name(format!("shard-{id}"))
.spawn(move || {
MemoryPool::init_pool(config.system.clone());
monoio::utils::bind_to_cpu_set(Some(shard_id))
- .expect(format!("Failed to set CPU affinity for
shard-{id}").as_str());
+ .unwrap_or_else(|e| panic!("Failed to set CPU affinity for
shard-{id}: {e}"));
let mut rt = create_shard_executor();
rt.block_on(async move {
@@ -242,21 +244,40 @@ fn main() -> Result<(), ServerError> {
.build()
.into();
- //TODO: If one of the shards fails to initialize, we
should crash the whole program;
if let Err(e) = shard.run().await {
error!("Failed to run shard-{id}: {e}");
}
- //TODO: If one of the shards fails to initialize, we
should crash the whole program;
- //shard.assert_init();
+ info!("Shard {} run completed", id);
})
})
- .expect(format!("Failed to spawn thread for shard-{id}").as_str());
+ .unwrap_or_else(|e| panic!("Failed to spawn thread for shard-{id}:
{e}"));
handles.push(handle);
}
- handles.into_iter().for_each(|handle| {
+ let shutdown_handles_for_signal = shutdown_handles.clone();
+ ctrlc::set_handler(move || {
+ info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful
shutdown...");
+
+ for (shard_id, stop_sender) in &shutdown_handles_for_signal {
+ info!("Sending shutdown signal to shard {}", shard_id);
+ if let Err(e) = stop_sender.send_blocking(()) {
+ error!(
+ "Failed to send shutdown signal to shard {}: {}",
+ shard_id, e
+ );
+ }
+ }
+ })
+ .expect("Error setting Ctrl-C handler");
+
+ info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown.");
+ for (idx, handle) in handles.into_iter().enumerate() {
+ info!("Waiting for shard thread {} to complete...", idx);
handle.join().expect("Failed to join shard thread");
- });
+ info!("Shard thread {} completed", idx);
+ }
+
+ info!("All shards have shut down. Iggy server is exiting.");
/*
#[cfg(feature = "disable-mimalloc")]
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index beda9ae2..eb851271 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -16,7 +16,11 @@
* under the License.
*/
-use std::{cell::Cell, rc::Rc, sync::Arc};
+use std::{
+ cell::Cell,
+ rc::Rc,
+ sync::{Arc, atomic::AtomicBool},
+};
use iggy_common::{Aes256GcmEncryptor, EncryptorKind};
use tracing::info;
@@ -25,7 +29,7 @@ use crate::{
bootstrap::resolve_persister,
configs::server::ServerConfig,
map_toggle_str,
- shard::Shard,
+ shard::{Shard, task_registry::TaskRegistry},
state::{StateKind, file::FileState},
streaming::{diagnostics::metrics::Metrics, storage::SystemStorage},
versioning::SemanticVersion,
@@ -115,6 +119,8 @@ impl IggyShardBuilder {
stop_sender: stop_sender,
messages_receiver: Cell::new(Some(frame_receiver)),
metrics: Metrics::init(),
+ task_registry: TaskRegistry::new(),
+ is_shutting_down: AtomicBool::new(false),
users: Default::default(),
permissioner: Default::default(),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index cfbd6eac..cc5023e2 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -20,6 +20,7 @@ pub mod builder;
pub mod gate;
pub mod namespace;
pub mod system;
+pub mod task_registry;
pub mod tasks;
pub mod transmission;
@@ -36,9 +37,9 @@ use std::{
str::FromStr,
sync::{
Arc, RwLock,
- atomic::{AtomicU32, Ordering},
+ atomic::{AtomicBool, AtomicU32, Ordering},
},
- time::Instant,
+ time::{Duration, Instant},
};
use tracing::{error, info, instrument, trace, warn};
use transmission::connector::{Receiver, ShardConnector, StopReceiver,
StopSender};
@@ -47,6 +48,7 @@ use crate::{
configs::server::ServerConfig,
shard::{
system::info::SystemInfo,
+ task_registry::TaskRegistry,
tasks::messages::spawn_shard_message_task,
transmission::{
event::ShardEvent,
@@ -74,6 +76,8 @@ use crate::{
};
pub const COMPONENT: &str = "SHARD";
+pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
+
static USER_ID: AtomicU32 = AtomicU32::new(1);
type Task = Pin<Box<dyn Future<Output = Result<(), IggyError>>>>;
@@ -142,8 +146,10 @@ pub struct IggyShard {
pub(crate) metrics: Metrics,
pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>,
- stop_receiver: StopReceiver,
- stop_sender: StopSender,
+ pub(crate) stop_receiver: StopReceiver,
+ pub(crate) stop_sender: StopSender,
+ pub(crate) task_registry: TaskRegistry,
+ pub(crate) is_shutting_down: AtomicBool,
}
impl IggyShard {
@@ -181,11 +187,31 @@ impl IggyShard {
// TODO: Fixme
//self.assert_init();
info!("Initiated shard with ID: {}", self.id);
+
// Create all tasks (tcp listener, http listener, command processor,
in the future also the background jobs).
let mut tasks: Vec<Task> =
vec![Box::pin(spawn_shard_message_task(self.clone()))];
if self.config.tcp.enabled {
tasks.push(Box::pin(spawn_tcp_server(self.clone())));
}
+
+ let stop_receiver = self.get_stop_receiver();
+ let shard_for_shutdown = self.clone();
+
+ monoio::spawn(async move {
+ let _ = stop_receiver.recv().await;
+ info!("Shard {} received shutdown signal", shard_for_shutdown.id);
+
+ let shutdown_success = shard_for_shutdown.trigger_shutdown().await;
+ if !shutdown_success {
+ error!("Shard {} shutdown timed out", shard_for_shutdown.id);
+ } else {
+ info!(
+ "Shard {} shutdown completed successfully",
+ shard_for_shutdown.id
+ );
+ }
+ });
+
let result = try_join_all(tasks).await;
result?;
@@ -430,25 +456,19 @@ impl IggyShard {
Ok(())
}
- #[instrument(skip_all, name = "trace_shutdown")]
- pub async fn shutdown(&mut self) -> Result<(), IggyError> {
- //TODO: Fixme, impl cooperative shutdown.
- self.persist_messages().await?;
- Ok(())
+ pub fn is_shutting_down(&self) -> bool {
+ self.is_shutting_down.load(Ordering::Relaxed)
}
- #[instrument(skip_all, name = "trace_persist_messages")]
- pub async fn persist_messages(&self) -> Result<usize, IggyError> {
- trace!("Saving buffered messages on disk...");
- let mut saved_messages_number = 0;
- //TODO: Fixme
- /*
- for stream in self.streams.values() {
- saved_messages_number += stream.persist_messages().await?;
- }
- */
+ pub fn get_stop_receiver(&self) -> StopReceiver {
+ self.stop_receiver.clone()
+ }
- Ok(saved_messages_number)
+ #[instrument(skip_all, name = "trace_shutdown")]
+ pub async fn trigger_shutdown(&self) -> bool {
+ self.is_shutting_down.store(true, Ordering::SeqCst);
+ info!("Shard {} shutdown state set", self.id);
+ self.task_registry.shutdown_all(SHUTDOWN_TIMEOUT).await
}
pub fn get_available_shards_count(&self) -> u32 {
diff --git a/core/server/src/shard/task_registry.rs
b/core/server/src/shard/task_registry.rs
new file mode 100644
index 00000000..1a201d3b
--- /dev/null
+++ b/core/server/src/shard/task_registry.rs
@@ -0,0 +1,108 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_channel::{Receiver, Sender, bounded};
+use futures::future::join_all;
+use monoio::task::JoinHandle;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::future::Future;
+use std::time::Duration;
+use tracing::{error, info, warn};
+
+pub struct TaskRegistry {
+ tasks: RefCell<Vec<JoinHandle<()>>>,
+ active_connections: RefCell<HashMap<u32, Sender<()>>>,
+}
+
+impl TaskRegistry {
+ pub fn new() -> Self {
+ Self {
+ tasks: RefCell::new(Vec::new()),
+ active_connections: RefCell::new(HashMap::new()),
+ }
+ }
+
+ pub fn spawn_tracked<F>(&self, future: F)
+ where
+ F: Future<Output = ()> + 'static,
+ {
+ let handle = monoio::spawn(future);
+ self.tasks.borrow_mut().push(handle);
+ }
+
+ pub fn add_connection(&self, client_id: u32) -> Receiver<()> {
+ let (stop_sender, stop_receiver) = bounded(1);
+ self.active_connections
+ .borrow_mut()
+ .insert(client_id, stop_sender);
+ stop_receiver
+ }
+
+ pub fn remove_connection(&self, client_id: &u32) {
+ self.active_connections.borrow_mut().remove(client_id);
+ }
+
+ pub async fn shutdown_all(&self, timeout: Duration) -> bool {
+ info!("Initiating task registry shutdown");
+
+ let connections = self.active_connections.borrow();
+ for (client_id, stop_sender) in connections.iter() {
+ info!("Sending shutdown signal to client {}", client_id);
+ if let Err(e) = stop_sender.send(()).await {
+ warn!(
+ "Failed to send shutdown signal to client {}: {}",
+ client_id, e
+ );
+ }
+ }
+ drop(connections);
+
+ let tasks = self.tasks.take();
+ let total = tasks.len();
+
+ if total == 0 {
+ info!("No tasks to shut down");
+ return true;
+ }
+
+ let timeout_futures: Vec<_> = tasks
+ .into_iter()
+ .enumerate()
+ .map(|(idx, handle)| async move {
+ match monoio::time::timeout(timeout, handle).await {
+ Ok(()) => (idx, true),
+ Err(_) => {
+ warn!("Task {} did not complete within timeout", idx);
+ (idx, false)
+ }
+ }
+ })
+ .collect();
+
+ let results = join_all(timeout_futures).await;
+ let completed = results.iter().filter(|(_, success)| *success).count();
+
+ info!(
+ "Task registry shutdown complete. {} of {} tasks completed",
+ completed, total
+ );
+
+ completed == total
+ }
+}
diff --git a/core/server/src/shard/tasks/messages.rs
b/core/server/src/shard/tasks/messages.rs
index 87660290..ec4018c0 100644
--- a/core/server/src/shard/tasks/messages.rs
+++ b/core/server/src/shard/tasks/messages.rs
@@ -1,38 +1,56 @@
-use futures::StreamExt;
+use futures::{FutureExt, StreamExt};
use iggy_common::IggyError;
-use std::rc::Rc;
-use tracing::error;
+use std::{rc::Rc, time::Duration};
+use tracing::{error, info};
use crate::shard::{IggyShard, transmission::frame::ShardFrame};
async fn run_shard_messages_receiver(shard: Rc<IggyShard>) -> Result<(),
IggyError> {
let mut messages_receiver = shard.messages_receiver.take().unwrap();
+
loop {
- if let Some(frame) = messages_receiver.next().await {
- let ShardFrame {
- message,
- response_sender,
- } = frame;
- match (shard.handle_shard_message(message).await, response_sender)
{
- (Some(response), Some(response_sender)) => {
- response_sender
- .send(response)
- .await
- .expect("Failed to send response back to origin
shard.");
+ let shutdown_check = async {
+ loop {
+ if shard.is_shutting_down() {
+ return;
+ }
+ monoio::time::sleep(Duration::from_millis(100)).await;
+ }
+ };
+
+ futures::select! {
+ _ = shutdown_check.fuse() => {
+ info!("Shard {} message receiver shutting down", shard.id);
+ break;
+ }
+ frame = messages_receiver.next().fuse() => {
+ if let Some(frame) = frame {
+ let ShardFrame {
+ message,
+ response_sender,
+ } = frame;
+ match (shard.handle_shard_message(message).await,
response_sender) {
+ (Some(response), Some(response_sender)) => {
+ response_sender
+ .send(response)
+ .await
+ .expect("Failed to send response back to
origin shard.");
+ }
+ _ => {}
+ };
}
- _ => {}
- };
+ }
}
}
+
+ Ok(())
}
pub async fn spawn_shard_message_task(shard: Rc<IggyShard>) -> Result<(),
IggyError> {
- monoio::spawn(async move {
- let result = run_shard_messages_receiver(shard).await;
- if let Err(err) = &result {
- error!("Error running shard: {err}");
- }
- result
- })
- .await
+ let result = run_shard_messages_receiver(shard).await;
+ if let Err(err) = result {
+ error!("Error running shard message receiver: {err}");
+ return Err(err);
+ }
+ Ok(())
}
diff --git a/core/server/src/tcp/connection_handler.rs
b/core/server/src/tcp/connection_handler.rs
index 5622d2a8..418af387 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -22,7 +22,9 @@ use crate::server_error::ConnectionError;
use crate::shard::IggyShard;
use crate::streaming::session::Session;
use crate::tcp::connection_handler::command::ServerCommand;
+use async_channel::Receiver;
use bytes::BytesMut;
+use futures::FutureExt;
use iggy_common::IggyError;
use std::io::ErrorKind;
use std::rc::Rc;
@@ -34,6 +36,7 @@ pub(crate) async fn handle_connection(
session: &Rc<Session>,
sender: &mut SenderKind,
shard: &Rc<IggyShard>,
+ stop_receiver: Receiver<()>,
) -> Result<(), ConnectionError> {
let mut length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
unsafe {
@@ -44,14 +47,25 @@ pub(crate) async fn handle_connection(
code_buffer.set_len(INITIAL_BYTES_LENGTH);
}
loop {
- let (read_length, initial_buffer) = match
sender.read(length_buffer.clone()).await {
- (Ok(read_length), initial_buffer) => (read_length, initial_buffer),
- (Err(error), _) => {
- if error.as_code() == IggyError::ConnectionClosed.as_code() {
- return Err(ConnectionError::from(error));
- } else {
- sender.send_error_response(error).await?;
- continue;
+ let read_future = sender.read(length_buffer.clone());
+
+ let (read_length, initial_buffer) = futures::select! {
+ _ = stop_receiver.recv().fuse() => {
+ info!("Connection stop signal received for session: {}",
session);
+ let _ =
sender.send_error_response(IggyError::Disconnected).await;
+ return Ok(());
+ }
+ result = read_future.fuse() => {
+ match result {
+ (Ok(read_length), initial_buffer) => (read_length,
initial_buffer),
+ (Err(error), _) => {
+ if error.as_code() ==
IggyError::ConnectionClosed.as_code() {
+ return Err(ConnectionError::from(error));
+ } else {
+ sender.send_error_response(error).await?;
+ continue;
+ }
+ }
}
}
};
diff --git a/core/server/src/tcp/tcp_listener.rs
b/core/server/src/tcp/tcp_listener.rs
index c0080edf..90b1eabb 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -22,9 +22,11 @@ use crate::shard::transmission::event::ShardEvent;
use crate::streaming::clients::client_manager::Transport;
use crate::tcp::connection_handler::{handle_connection, handle_error};
use crate::tcp::tcp_socket;
+use futures::FutureExt;
use iggy_common::IggyError;
use std::net::SocketAddr;
use std::rc::Rc;
+use std::time::Duration;
use tracing::{error, info};
pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) ->
Result<(), IggyError> {
@@ -38,37 +40,61 @@ pub async fn start(server_name: &'static str, shard:
Rc<IggyShard>) -> Result<()
.expect("Failed to parse TCP address");
let socket = tcp_socket::build(ip_v6, socket_config);
- monoio::spawn(async move {
- socket
- .bind(&addr.into())
- .expect("Failed to bind TCP listener");
- socket.listen(1024).unwrap();
- let listener: std::net::TcpListener = socket.into();
- let listener = monoio::net::TcpListener::from_std(listener).unwrap();
- info!("{server_name} server has started on: {:?}", addr);
- loop {
- match listener.accept().await {
- Ok((stream, address)) => {
- let shard = shard.clone();
- info!("Accepted new TCP connection: {address}");
- let transport = Transport::Tcp;
- let session = shard.add_client(&address, transport);
- //TODO: Those can be shared with other shards.
- shard.add_active_session(session.clone());
- // Broadcast session to all shards.
- let event = ShardEvent::NewSession { address, transport };
- // TODO: Fixme look inside of
broadcast_event_to_all_shards method.
- let _responses =
shard.broadcast_event_to_all_shards(event.into());
+ socket
+ .bind(&addr.into())
+ .expect("Failed to bind TCP listener");
+ socket.listen(1024).unwrap();
+ let listener: std::net::TcpListener = socket.into();
+ let listener = monoio::net::TcpListener::from_std(listener).unwrap();
+ info!("{server_name} server has started on: {:?}", addr);
+
+ loop {
+ let shutdown_check = async {
+ loop {
+ if shard.is_shutting_down() {
+ return;
+ }
+ monoio::time::sleep(Duration::from_millis(100)).await;
+ }
+ };
+
+ let accept_future = listener.accept();
+ futures::select! {
+ _ = shutdown_check.fuse() => {
+ info!("{server_name} detected shutdown flag, no longer
accepting connections");
+ break;
+ }
+ result = accept_future.fuse() => {
+ match result {
+ Ok((stream, address)) => {
+ if shard.is_shutting_down() {
+ info!("Rejecting new connection from {} during
shutdown", address);
+ continue;
+ }
+ let shard_clone = shard.clone();
+ info!("Accepted new TCP connection: {address}");
+ let transport = Transport::Tcp;
+ let session = shard_clone.add_client(&address,
transport);
+ //TODO: Those can be shared with other shards.
+ shard_clone.add_active_session(session.clone());
+ // Broadcast session to all shards.
+ let event = ShardEvent::NewSession { address,
transport };
+ // TODO: Fixme look inside of
broadcast_event_to_all_shards method.
+ let _responses =
shard_clone.broadcast_event_to_all_shards(event.into());
+
+ let client_id = session.client_id;
+ info!("Created new session: {session}");
+ let mut sender = SenderKind::get_tcp_sender(stream);
+
+ let conn_stop_receiver =
shard_clone.task_registry.add_connection(client_id);
+
+ let shard_for_conn = shard_clone.clone();
+ shard_clone.task_registry.spawn_tracked(async move {
+ if let Err(error) = handle_connection(&session,
&mut sender, &shard_for_conn, conn_stop_receiver).await {
+ handle_error(error);
+ }
+
shard_for_conn.task_registry.remove_connection(&client_id);
- let _client_id = session.client_id;
- info!("Created new session: {session}");
- let mut sender = SenderKind::get_tcp_sender(stream);
- monoio::spawn(async move {
- if let Err(error) = handle_connection(&session, &mut
sender, &shard).await {
- handle_error(error);
- //TODO: Fixme
- /*
-
//system.read().await.delete_client(client_id).await;
if let Err(error) = sender.shutdown().await {
error!(
"Failed to shutdown TCP stream for client:
{client_id}, address: {address}. {error}"
@@ -78,13 +104,12 @@ pub async fn start(server_name: &'static str, shard:
Rc<IggyShard>) -> Result<()
"Successfully closed TCP stream for
client: {client_id}, address: {address}."
);
}
- */
- }
- });
+ });
+ }
+ Err(error) => error!("Unable to accept TCP socket.
{error}"),
}
- Err(error) => error!("Unable to accept TCP socket. {error}"),
}
}
- })
- .await
+ }
+ Ok(())
}
diff --git a/core/server/src/tcp/tcp_server.rs
b/core/server/src/tcp/tcp_server.rs
index 0430f773..e558d08f 100644
--- a/core/server/src/tcp/tcp_server.rs
+++ b/core/server/src/tcp/tcp_server.rs
@@ -32,10 +32,10 @@ pub async fn spawn_tcp_server(shard: Rc<IggyShard>) ->
Result<(), IggyError> {
};
info!("Initializing {server_name} server...");
// TODO: Fixme -- storing addr of the server inside of the config for
integration tests...
- let result = match shard.config.tcp.tls.enabled {
+ match shard.config.tcp.tls.enabled {
true => unimplemented!("TLS support is not implemented yet"),
- false => tcp_listener::start(server_name, shard).await,
+ false => tcp_listener::start(server_name, shard.clone()).await?,
};
- //info!("{server_name} server has started on: {:?}", addr);
- result
+
+ Ok(())
}