This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch rebase_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit d0ea2c19eb9e0e2e2314e3510675a2ca808dd11e
Author: numinex <[email protected]>
AuthorDate: Tue Jun 24 18:24:30 2025 +0200

    begin fix cg
---
 Cargo.lock                                         |  1 +
 core/server/Cargo.toml                             |  1 +
 core/server/src/binary/command.rs                  |  7 +-
 core/server/src/binary/sender.rs                   | 28 ++++---
 core/server/src/channels/commands/archive_state.rs |  4 +
 .../commands/clean_personal_access_tokens.rs       |  3 +
 .../src/channels/commands/maintain_messages.rs     |  5 ++
 core/server/src/channels/commands/print_sysinfo.rs |  4 +
 core/server/src/channels/commands/save_messages.rs |  4 +
 .../src/channels/commands/verify_heartbeats.rs     |  3 +
 core/server/src/channels/handler.rs                |  4 +
 core/server/src/channels/server_command.rs         |  4 +
 core/server/src/configs/defaults.rs                |  3 +-
 core/server/src/http/http_server.rs                |  3 +
 core/server/src/quic/listener.rs                   |  9 +-
 core/server/src/quic/quic_sender.rs                | 15 +++-
 core/server/src/quic/quic_server.rs                |  4 +-
 core/server/src/shard/builder.rs                   |  2 +-
 core/server/src/shard/mod.rs                       | 95 +++++++++++++++++-----
 core/server/src/shard/namespace.rs                 |  4 +-
 core/server/src/shard/system/clients.rs            | 10 +--
 core/server/src/shard/system/snapshot/mod.rs       |  1 -
 core/server/src/shard/system/storage.rs            |  1 +
 core/server/src/shard/system/streams.rs            |  1 -
 core/server/src/shard/transmission/message.rs      | 14 ++--
 .../server/src/streaming/clients/client_manager.rs | 65 +++++++--------
 core/server/src/streaming/partitions/messages.rs   |  5 +-
 core/server/src/streaming/partitions/partition.rs  | 20 +++--
 core/server/src/streaming/segments/segment.rs      |  9 +-
 core/server/src/streaming/session.rs               | 21 +++--
 core/server/src/streaming/storage.rs               | 42 +++++-----
 core/server/src/streaming/streams/storage.rs       |  2 +-
 core/server/src/streaming/streams/stream.rs        | 10 +--
 core/server/src/streaming/streams/topics.rs        |  9 +-
 core/server/src/streaming/topics/consumer_group.rs | 12 +--
 .../server/src/streaming/topics/consumer_groups.rs | 16 ++--
 core/server/src/streaming/topics/topic.rs          | 18 ++--
 core/server/src/streaming/utils/memory_pool.rs     |  5 +-
 core/server/src/tcp/connection_handler.rs          | 32 +++++---
 core/server/src/tcp/sender.rs                      | 53 +++++++-----
 core/server/src/tcp/tcp_listener.rs                | 45 ++++++----
 core/server/src/tcp/tcp_sender.rs                  |  9 +-
 core/server/src/tcp/tcp_server.rs                  |  4 +-
 core/server/src/tcp/tcp_socket.rs                  | 24 ++++--
 core/server/src/tcp/tcp_tls_listener.rs            | 11 ++-
 core/server/src/tcp/tcp_tls_sender.rs              |  7 +-
 46 files changed, 408 insertions(+), 241 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 17894833..3e51b287 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7071,6 +7071,7 @@ dependencies = [
  "serde_with",
  "serial_test",
  "sharded_queue",
+ "socket2",
  "static-toml",
  "strum",
  "sysinfo 0.35.2",
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index a493e837..70f6a7fd 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -61,6 +61,7 @@ futures = { workspace = true }
 human-repr = { workspace = true }
 iggy_common = { workspace = true }
 jsonwebtoken = "9.3.1"
+socket2 = "0.5.10"
 lending-iterator = "0.1.7"
 hash32 = "1.0.0"
 mimalloc = { workspace = true, optional = true }
diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs
index 0b997941..aa935e09 100644
--- a/core/server/src/binary/command.rs
+++ b/core/server/src/binary/command.rs
@@ -16,8 +16,11 @@
  * under the License.
  */
 
+use std::rc::Rc;
+
 use crate::binary::sender::SenderKind;
 use crate::define_server_command_enum;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
 use crate::streaming::systems::system::SharedSystem;
 use bytes::{BufMut, Bytes, BytesMut};
@@ -130,8 +133,8 @@ pub trait ServerCommandHandler {
         self,
         sender: &mut SenderKind,
         length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError>;
 }
 
diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs
index 2ccf349b..06e9c51f 100644
--- a/core/server/src/binary/sender.rs
+++ b/core/server/src/binary/sender.rs
@@ -22,10 +22,13 @@ use std::io::IoSlice;
 use crate::tcp::tcp_sender::TcpSender;
 use crate::tcp::tcp_tls_sender::TcpTlsSender;
 use crate::{quic::quic_sender::QuicSender, server_error::ServerError};
+use bytes::BytesMut;
 use iggy_common::IggyError;
-use quinn::{RecvStream, SendStream};
+use monoio::buf::IoBufMut;
 use monoio::net::TcpStream;
-use tokio_native_tls::TlsStream;
+use monoio_native_tls::TlsStream;
+use nix::libc;
+use quinn::{RecvStream, SendStream};
 
 macro_rules! forward_async_methods {
     (
@@ -48,25 +51,24 @@ macro_rules! forward_async_methods {
 }
 
 pub trait Sender {
-    fn read(&mut self, buffer: &mut [u8]) -> impl Future<Output = Result<usize, IggyError>>;
-    fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), IggyError>>; 
-    fn send_ok_response(
+    fn read(
         &mut self,
-        payload: &[u8],
-    ) -> impl Future<Output = Result<(), IggyError>>; 
+        buffer: BytesMut,
+    ) -> impl Future<Output = (Result<usize, IggyError>, BytesMut)>;
+    fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), IggyError>>;
+    fn send_ok_response(&mut self, payload: &[u8]) -> impl Future<Output = Result<(), IggyError>>;
     fn send_ok_response_vectored(
         &mut self,
         length: &[u8],
-        slices: Vec<IoSlice<'_>>,
+        slices: Vec<libc::iovec>,
     ) -> impl Future<Output = Result<(), IggyError>>;
     fn send_error_response(
         &mut self,
         error: IggyError,
-    ) -> impl Future<Output = Result<(), IggyError>>; 
-    fn shutdown(&mut self) -> impl Future<Output = Result<(), ServerError>>; 
+    ) -> impl Future<Output = Result<(), IggyError>>;
+    fn shutdown(&mut self) -> impl Future<Output = Result<(), ServerError>>;
 }
 
-#[allow(clippy::large_enum_variant)]
 pub enum SenderKind {
     Tcp(TcpSender),
     TcpTls(TcpTlsSender),
@@ -90,10 +92,10 @@ impl SenderKind {
     }
 
     forward_async_methods! {
-        async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, IggyError>;
+        async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, BytesMut);
         async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>;
         async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError>;
-        async fn send_ok_response_vectored(&mut self, length: &[u8], slices: Vec<IoSlice<'_>>) -> Result<(), IggyError>;
+        async fn send_ok_response_vectored(&mut self, length: &[u8], slices: Vec<libc::iovec>) -> Result<(), IggyError>;
         async fn send_error_response(&mut self, error: IggyError) -> Result<(), IggyError>;
         async fn shutdown(&mut self) -> Result<(), ServerError>;
     }
diff --git a/core/server/src/channels/commands/archive_state.rs b/core/server/src/channels/commands/archive_state.rs
index e80d17d3..f42393ae 100644
--- a/core/server/src/channels/commands/archive_state.rs
+++ b/core/server/src/channels/commands/archive_state.rs
@@ -16,6 +16,8 @@
  * under the License.
  */
 
+//TODO: Fixme
+/*
 use crate::channels::server_command::BackgroundServerCommand;
 use crate::configs::server::StateMaintenanceConfig;
 use crate::streaming::systems::system::SharedSystem;
@@ -137,3 +139,5 @@ impl BackgroundServerCommand<ArchiveStateCommand> for ArchiveStateExecutor {
         });
     }
 }
+
+*/
diff --git a/core/server/src/channels/commands/clean_personal_access_tokens.rs b/core/server/src/channels/commands/clean_personal_access_tokens.rs
index c815a18f..fd2eaf11 100644
--- a/core/server/src/channels/commands/clean_personal_access_tokens.rs
+++ b/core/server/src/channels/commands/clean_personal_access_tokens.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+/*
 use crate::channels::server_command::BackgroundServerCommand;
 use crate::configs::server::PersonalAccessTokenCleanerConfig;
 use crate::streaming::systems::system::SharedSystem;
@@ -135,3 +136,5 @@ impl BackgroundServerCommand<CleanPersonalAccessTokensCommand>
         });
     }
 }
+
+*/
diff --git a/core/server/src/channels/commands/maintain_messages.rs b/core/server/src/channels/commands/maintain_messages.rs
index 16ef3647..2dc526e2 100644
--- a/core/server/src/channels/commands/maintain_messages.rs
+++ b/core/server/src/channels/commands/maintain_messages.rs
@@ -16,6 +16,9 @@
  * under the License.
  */
 
+//TODO: Fixme
+/*
+
 use crate::archiver::ArchiverKind;
 use crate::channels::server_command::BackgroundServerCommand;
 use crate::configs::server::MessagesMaintenanceConfig;
@@ -553,3 +556,5 @@ async fn delete_segments(
         messages_count,
     })
 }
+
+*/
diff --git a/core/server/src/channels/commands/print_sysinfo.rs b/core/server/src/channels/commands/print_sysinfo.rs
index c130b58c..c0f2b1a2 100644
--- a/core/server/src/channels/commands/print_sysinfo.rs
+++ b/core/server/src/channels/commands/print_sysinfo.rs
@@ -16,6 +16,8 @@
  * under the License.
  */
 
+// TODO: Fixme
+/*
 use crate::{
     channels::server_command::BackgroundServerCommand,
     configs::server::ServerConfig,
@@ -121,3 +123,5 @@ impl BackgroundServerCommand<SysInfoPrintCommand> for SysInfoPrintExecutor {
         });
     }
 }
+
+*/
diff --git a/core/server/src/channels/commands/save_messages.rs b/core/server/src/channels/commands/save_messages.rs
index 586fb8b0..bf832762 100644
--- a/core/server/src/channels/commands/save_messages.rs
+++ b/core/server/src/channels/commands/save_messages.rs
@@ -16,6 +16,8 @@
  * under the License.
  */
 
+//Todo: Fixme
+/*
 use crate::channels::server_command::BackgroundServerCommand;
 use crate::configs::server::MessageSaverConfig;
 use crate::configs::server::ServerConfig;
@@ -116,3 +118,5 @@ impl BackgroundServerCommand<SaveMessagesCommand> for SaveMessagesExecutor {
         });
     }
 }
+
+*/
diff --git a/core/server/src/channels/commands/verify_heartbeats.rs b/core/server/src/channels/commands/verify_heartbeats.rs
index ba48c69e..b2b01c49 100644
--- a/core/server/src/channels/commands/verify_heartbeats.rs
+++ b/core/server/src/channels/commands/verify_heartbeats.rs
@@ -16,6 +16,8 @@
  * under the License.
  */
 
+//TODO: Fixme
+/*
 use crate::channels::server_command::BackgroundServerCommand;
 use crate::configs::server::HeartbeatConfig;
 use crate::streaming::systems::system::SharedSystem;
@@ -148,3 +150,4 @@ impl BackgroundServerCommand<VerifyHeartbeatsCommand> for VerifyHeartbeatsExecut
         });
     }
 }
+*/
diff --git a/core/server/src/channels/handler.rs b/core/server/src/channels/handler.rs
index 8bd82e0d..7612a259 100644
--- a/core/server/src/channels/handler.rs
+++ b/core/server/src/channels/handler.rs
@@ -16,6 +16,8 @@
  * under the License.
  */
 
+//TODO: Fixme
+/*
 use super::server_command::BackgroundServerCommand;
 use crate::configs::server::ServerConfig;
 use crate::streaming::systems::system::SharedSystem;
@@ -44,3 +46,5 @@ impl<'a> BackgroundServerCommandHandler<'a> {
         }
     }
 }
+
+*/
diff --git a/core/server/src/channels/server_command.rs b/core/server/src/channels/server_command.rs
index 326b3d77..16b0dc2f 100644
--- a/core/server/src/channels/server_command.rs
+++ b/core/server/src/channels/server_command.rs
@@ -16,6 +16,8 @@
  * under the License.
  */
 
+//TODO: Fixme
+/*
 use crate::configs::server::ServerConfig;
 use crate::streaming::systems::system::SharedSystem;
 use flume::{Receiver, Sender};
@@ -38,3 +40,5 @@ pub trait BackgroundServerCommand<C> {
         receiver: Receiver<C>,
     );
 }
+
+*/
diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs
index 82b560cf..fc333ec7 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -36,6 +36,7 @@ use crate::configs::system::{
 use crate::configs::tcp::{TcpConfig, TcpTlsConfig};
 use iggy_common::IggyByteSize;
 use iggy_common::IggyDuration;
+use std::rc::Rc;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -51,7 +52,7 @@ impl Default for ServerConfig {
             heartbeat: HeartbeatConfig::default(),
             message_saver: MessageSaverConfig::default(),
             personal_access_token: PersonalAccessTokenConfig::default(),
-            system: Arc::new(SystemConfig::default()),
+            system: Rc::new(SystemConfig::default()),
             quic: QuicConfig::default(),
             tcp: TcpConfig::default(),
             http: HttpConfig::default(),
diff --git a/core/server/src/http/http_server.rs b/core/server/src/http/http_server.rs
index b91857f5..7dec1a7f 100644
--- a/core/server/src/http/http_server.rs
+++ b/core/server/src/http/http_server.rs
@@ -16,6 +16,8 @@
  * under the License.
  */
 
+//TODO: Fixme
+/*
 use crate::configs::http::{HttpConfig, HttpCorsConfig};
 use crate::http::diagnostics::request_diagnostics;
 use crate::http::jwt::cleaner::start_expired_tokens_cleaner;
@@ -191,3 +193,4 @@ fn configure_cors(config: HttpCorsConfig) -> CorsLayer {
         .allow_credentials(config.allow_credentials)
         .allow_private_network(config.allow_private_network)
 }
+*/
diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs
index e9b329de..f5cfc429 100644
--- a/core/server/src/quic/listener.rs
+++ b/core/server/src/quic/listener.rs
@@ -16,12 +16,14 @@
  * under the License.
  */
 
+use std::rc::Rc;
+
 use crate::binary::command::{ServerCommand, ServerCommandHandler};
 use crate::binary::sender::SenderKind;
 use crate::server_error::ConnectionError;
+use crate::shard::IggyShard;
 use crate::streaming::clients::client_manager::Transport;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::anyhow;
 use iggy_common::IggyError;
 use quinn::{Connection, Endpoint, RecvStream, SendStream};
@@ -29,6 +31,8 @@ use tracing::{error, info, trace};
 
 const LISTENERS_COUNT: u32 = 10;
 const INITIAL_BYTES_LENGTH: usize = 4;
+//TODO: Fixme
+/*
 
 pub fn start(endpoint: Endpoint, system: SharedSystem) {
     for _ in 0..LISTENERS_COUNT {
@@ -65,7 +69,7 @@ pub fn start(endpoint: Endpoint, system: SharedSystem) {
 
 async fn handle_connection(
     incoming_connection: quinn::Connecting,
-    system: SharedSystem,
+    shard: Rc<IggyShard>,
 ) -> Result<(), ConnectionError> {
     let connection = incoming_connection.await?;
     let address = connection.remote_address();
@@ -179,3 +183,4 @@ async fn handle_stream(
         }
     }
 }
+*/
diff --git a/core/server/src/quic/quic_sender.rs b/core/server/src/quic/quic_sender.rs
index f1631cb2..17b60818 100644
--- a/core/server/src/quic/quic_sender.rs
+++ b/core/server/src/quic/quic_sender.rs
@@ -18,8 +18,11 @@
 
 use crate::quic::COMPONENT;
 use crate::{binary::sender::Sender, server_error::ServerError};
+use bytes::BytesMut;
 use error_set::ErrContext;
 use iggy_common::IggyError;
+use monoio::buf::IoBufMut;
+use nix::libc;
 use quinn::{RecvStream, SendStream};
 use std::io::IoSlice;
 use tracing::{debug, error};
@@ -33,15 +36,18 @@ pub struct QuicSender {
 }
 
 impl Sender for QuicSender {
-    async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, IggyError> {
+    async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, BytesMut) {
         // Not-so-nice code because quinn recv stream has different API for read_exact
+        /*
         let read_bytes = buffer.len();
         self.recv.read_exact(buffer).await.map_err(|error| {
             error!("Failed to read from the stream: {:?}", error);
             IggyError::QuicError
         })?;
+        */
 
-        Ok(read_bytes)
+        //Ok(read_bytes)
+        todo!();
     }
 
     async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
@@ -64,7 +70,7 @@ impl Sender for QuicSender {
     async fn send_ok_response_vectored(
         &mut self,
         length: &[u8],
-        slices: Vec<IoSlice<'_>>,
+        slices: Vec<libc::iovec>,
     ) -> Result<(), IggyError> {
         debug!("Sending vectored response with status: {:?}...", STATUS_OK);
 
@@ -79,6 +85,8 @@ impl Sender for QuicSender {
 
         let mut total_bytes_written = 0;
 
+        //TODO: Fixme
+        /*
         for slice in slices {
             let slice_data = &*slice;
             if !slice_data.is_empty() {
@@ -93,6 +101,7 @@ impl Sender for QuicSender {
                 total_bytes_written += slice_data.len();
             }
         }
+        */
 
         debug!(
             "Sent vectored response: {} bytes of payload",
diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs
index 7b8edf21..0a8020b8 100644
--- a/core/server/src/quic/quic_server.rs
+++ b/core/server/src/quic/quic_server.rs
@@ -31,8 +31,9 @@ use crate::configs::quic::QuicConfig;
 use crate::quic::COMPONENT;
 use crate::quic::listener;
 use crate::server_error::QuicError;
-use crate::streaming::systems::system::SharedSystem;
 
+//TODO: Fixme
+/*
 /// Starts the QUIC server.
 /// Returns the address the server is listening on.
 pub fn start(config: QuicConfig, system: SharedSystem) -> SocketAddr {
@@ -136,3 +137,4 @@ fn load_certificates(
     let key = keys.remove(0);
     Ok((certs, key))
 }
+    */
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 0428a483..a2a3fdfb 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -26,7 +26,7 @@ use crate::{
     configs::server::ServerConfig,
     map_toggle_str,
     shard::Shard,
-    state::{file::FileState, StateKind},
+    state::{StateKind, file::FileState},
     streaming::{diagnostics::metrics::Metrics, storage::SystemStorage},
     versioning::SemanticVersion,
 };
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 3e7e10f1..12ba64bb 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -17,9 +17,9 @@
  */
 
 pub mod builder;
-pub mod system;
 pub mod gate;
 pub mod namespace;
+pub mod system;
 pub mod transmission;
 
 use ahash::{AHashMap, AHashSet, HashMap};
@@ -29,18 +29,42 @@ use futures::future::try_join_all;
 use iggy_common::{EncryptorKind, IggyError, UserId};
 use namespace::IggyNamespace;
 use std::{
-    cell::{Cell, RefCell}, pin::Pin, rc::Rc, str::FromStr, sync::{atomic::{AtomicU32, Ordering}, Arc, RwLock}, time::Instant
+    cell::{Cell, RefCell},
+    pin::Pin,
+    rc::Rc,
+    str::FromStr,
+    sync::{
+        Arc, RwLock,
+        atomic::{AtomicU32, Ordering},
+    },
+    time::Instant,
 };
 use tracing::{error, info, instrument, trace, warn};
 use transmission::connector::{Receiver, ShardConnector, StopReceiver, StopSender};
 
 use crate::{
     configs::server::ServerConfig,
-    shard::{system::info::SystemInfo, transmission::frame::ShardFrame},
+    shard::{
+        system::info::SystemInfo,
+        transmission::{
+            frame::ShardFrame,
+            message::{ShardEvent, ShardMessage},
+        },
+    },
     state::{
-        file::FileState, system::{StreamState, SystemState, UserState}, StateKind
+        StateKind,
+        file::FileState,
+        system::{StreamState, SystemState, UserState},
+    },
+    streaming::{
+        clients::client_manager::ClientManager,
+        diagnostics::metrics::Metrics,
+        personal_access_tokens::personal_access_token::PersonalAccessToken,
+        session::Session,
+        storage::SystemStorage,
+        streams::stream::Stream,
+        users::{permissioner::Permissioner, user::User},
     },
-    streaming::{clients::client_manager::ClientManager, diagnostics::metrics::Metrics, personal_access_tokens::personal_access_token::PersonalAccessToken, session::Session, storage::SystemStorage, streams::stream::Stream, users::{permissioner::Permissioner, user::User}},
     versioning::SemanticVersion,
 };
 
@@ -83,7 +107,7 @@ pub struct IggyShard {
     pub(crate) config: ServerConfig,
     //TODO: This could be shared.
     pub(crate) client_manager: RefCell<ClientManager>,
-    pub(crate) active_sessions: RefCell<Vec<Session>>,
+    pub(crate) active_sessions: RefCell<Vec<Rc<Session>>>,
     pub(crate) permissioner: RefCell<Permissioner>,
     pub(crate) users: RefCell<HashMap<UserId, User>>,
 
@@ -244,8 +268,8 @@ impl IggyShard {
         info!("Loading streams from disk...");
         let mut unloaded_streams = Vec::new();
         // Does mononio has api for that ?
-        let mut dir_entries = std::fs::read_dir(&self.config.system.get_streams_path())
-            .map_err(|error| {
+        let mut dir_entries =
+            std::fs::read_dir(&self.config.system.get_streams_path()).map_err(|error| {
                 error!("Cannot read streams directory: {error}");
                 IggyError::CannotReadStreams
             })?;
@@ -362,16 +386,21 @@ impl IggyShard {
             self.metrics.increment_messages(stream.get_messages_count());
 
             self.streams_ids
-            .borrow_mut()
+                .borrow_mut()
                 .insert(stream.name.clone(), stream.stream_id);
             self.streams.borrow_mut().insert(stream.stream_id, stream);
         }
 
-        info!("Loaded {} stream(s) from disk.", self.streams.borrow().len());
+        info!(
+            "Loaded {} stream(s) from disk.",
+            self.streams.borrow().len()
+        );
         Ok(())
     }
 
-    pub fn assert_init(&self) -> Result<(), IggyError> { Ok(())}
+    pub fn assert_init(&self) -> Result<(), IggyError> {
+        Ok(())
+    }
 
     #[instrument(skip_all, name = "trace_shutdown")]
     pub async fn shutdown(&mut self) -> Result<(), IggyError> {
@@ -397,17 +426,41 @@ impl IggyShard {
         self.shards.len() as u32
     }
 
-    pub fn ensure_authenticated(&self, client_id: u32) -> Result<u32, IggyError> {
+    pub fn broadcast_event_to_all_shards(&self, client_id: u32, event: Rc<ShardEvent>) {
+        self.shards
+            .iter()
+            .filter_map(|shard| {
+                if shard.id != self.id {
+                    Some(shard.connection.clone())
+                } else {
+                    None
+                }
+            })
+            .map(|conn| {
+                let message = ShardMessage::Event(event.clone());
+                conn.send(ShardFrame::new(client_id, message, None));
+            })
+            .collect::<Vec<_>>();
+    }
+
+    pub fn add_active_session(&self, session: Rc<Session>) {
+        self.active_sessions.borrow_mut().push(session);
+    }
+
+    pub fn ensure_authenticated(&self, session: &Session) -> Result<u32, IggyError> {
         let active_sessions = self.active_sessions.borrow();
-        let session = active_sessions
+        let user_id = active_sessions
             .iter()
-            .find(|s| s.client_id == client_id)
-            .ok_or_else(|| IggyError::Unauthenticated)?;
-        if session.is_authenticated() {
-            Ok(session.get_user_id())
-        } else {
-            error!("{COMPONENT} - unauthenticated access attempt, session: {session}");
-            Err(IggyError::Unauthenticated)
-        }
+            .find(|s| s.get_user_id() == session.get_user_id())
+            .ok_or_else(|| IggyError::Unauthenticated)
+            .and_then(|session| {
+                if session.is_authenticated() {
+                    Ok(session.get_user_id())
+                } else {
+                    error!("{COMPONENT} - unauthenticated access attempt, session: {session}");
+                    Err(IggyError::Unauthenticated)
+                }
+            })?;
+            Ok(user_id)
     }
 }
diff --git a/core/server/src/shard/namespace.rs b/core/server/src/shard/namespace.rs
index d9d68ec9..a7c899aa 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -16,9 +16,9 @@
  * under the License.
  */
 
-use std::hash::Hasher as _;
-use iggy_common::Identifier;
 use hash32::{Hasher, Murmur3Hasher};
+use iggy_common::Identifier;
+use std::hash::Hasher as _;
 
 //TODO: Will probably want to move it to separate crate so we can share it with sdk.
 #[derive(Debug, Clone, Eq, PartialEq, Hash)]
diff --git a/core/server/src/shard/system/clients.rs b/core/server/src/shard/system/clients.rs
index 2cb0fbe8..fde182ce 100644
--- a/core/server/src/shard/system/clients.rs
+++ b/core/server/src/shard/system/clients.rs
@@ -16,22 +16,22 @@
  * under the License.
  */
 
+use crate::shard::IggyShard;
 use crate::streaming::clients::client_manager::{Client, Transport};
 use crate::streaming::session::Session;
-use crate::streaming::systems::COMPONENT;
-use crate::streaming::systems::system::System;
 use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::locking::IggySharedMutFn;
 use std::net::SocketAddr;
+use std::rc::Rc;
 use std::sync::Arc;
 use tracing::{error, info};
 
 impl IggyShard {
-    pub async fn add_client(&self, address: &SocketAddr, transport: Transport) -> Arc<Session> {
-        let mut client_manager = self.client_manager.write().await;
+    pub fn add_client(&self, address: &SocketAddr, transport: Transport) -> Rc<Session> {
+        let mut client_manager = self.client_manager.borrow_mut();
         let session = client_manager.add_client(address, transport);
         info!("Added {transport} client with session: {session} for IP address: {address}");
         self.metrics.increment_clients(1);
@@ -42,7 +42,7 @@ impl IggyShard {
         let consumer_groups: Vec<(u32, u32, u32)>;
 
         {
-            let mut client_manager = self.client_manager.write().await;
+            let mut client_manager = self.client_manager.borrow_mut();
             let client = client_manager.delete_client(client_id).await;
             if client.is_none() {
                 error!("Client with ID: {client_id} was not found in the client manager.",);
diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs
index 92b0c400..f6e07785 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -20,7 +20,6 @@ mod procdump;
 
 use crate::configs::system::SystemConfig;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::System;
 use async_zip::tokio::write::ZipFileWriter;
 use async_zip::{Compression, ZipEntryBuilder};
 use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, SystemSnapshotType};
diff --git a/core/server/src/shard/system/storage.rs b/core/server/src/shard/system/storage.rs
index 9a493751..4cb41120 100644
--- a/core/server/src/shard/system/storage.rs
+++ b/core/server/src/shard/system/storage.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use crate::shard::system::info::SystemInfo;
 use crate::streaming::persistence::persister::PersisterKind;
 use crate::streaming::storage::SystemInfoStorage;
 use crate::streaming::utils::PooledBuffer;
diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs
index efcfff2b..8a0772a9 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -26,7 +26,6 @@ use std::sync::atomic::{AtomicU32, Ordering};
 use tokio::fs;
 use tracing::{error, info, warn};
 
-
 impl IggyShard {
     pub fn get_streams(&self) -> Vec<&Stream> {
         self.streams.values().collect()
diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs
index 3572c0ec..8d1e66eb 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -1,3 +1,5 @@
+use std::rc::Rc;
+
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,16 +17,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use crate::binary::command::ServerCommand;
+use crate::{binary::command::ServerCommand, streaming::session::Session};
 
 #[derive(Debug)]
 pub enum ShardMessage {
     Command(ServerCommand),
-    Event(ShardEvent),
+    Event(Rc<ShardEvent>),
 }
 
 #[derive(Debug)]
-pub enum ShardEvent {}
+pub enum ShardEvent {
+    NewSession(Rc<Session>),
+}
 
 impl From<ServerCommand> for ShardMessage {
     fn from(command: ServerCommand) -> Self {
@@ -32,8 +36,8 @@ impl From<ServerCommand> for ShardMessage {
     }
 }
 
-impl From<ShardEvent> for ShardMessage {
-    fn from(event: ShardEvent) -> Self {
+impl From<Rc<ShardEvent>> for ShardMessage {
+    fn from(event: Rc<ShardEvent>) -> Self {
         ShardMessage::Event(event)
     }
 }
diff --git a/core/server/src/streaming/clients/client_manager.rs b/core/server/src/streaming/clients/client_manager.rs
index 4a84a58c..fc5c00cc 100644
--- a/core/server/src/streaming/clients/client_manager.rs
+++ b/core/server/src/streaming/clients/client_manager.rs
@@ -22,27 +22,26 @@ use ahash::AHashMap;
 use iggy_common::IggyError;
 use iggy_common::IggyTimestamp;
 use iggy_common::UserId;
-use iggy_common::locking::IggySharedMut;
 use iggy_common::locking::IggySharedMutFn;
 use std::fmt::{Display, Formatter};
 use std::net::SocketAddr;
-use std::sync::Arc;
+use std::rc::Rc;
 
 #[derive(Debug, Default)]
 pub struct ClientManager {
-    clients: AHashMap<u32, IggySharedMut<Client>>,
+    clients: AHashMap<u32, Client>,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Client {
     pub user_id: Option<u32>,
-    pub session: Arc<Session>,
+    pub session: Rc<Session>,
     pub transport: Transport,
     pub consumer_groups: Vec<ConsumerGroup>,
     pub last_heartbeat: IggyTimestamp,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ConsumerGroup {
     pub stream_id: u32,
     pub topic_id: u32,
@@ -65,9 +64,9 @@ impl Display for Transport {
 }
 
 impl ClientManager {
-    pub fn add_client(&mut self, address: &SocketAddr, transport: Transport) -> Arc<Session> {
+    pub fn add_client(&mut self, address: &SocketAddr, transport: Transport) -> Rc<Session> {
         let client_id = hash::calculate_32(address.to_string().as_bytes());
-        let session = Arc::new(Session::from_client_id(client_id, *address));
+        let session = Rc::new(Session::from_client_id(client_id, *address));
         let client = Client {
             user_id: None,
             session: session.clone(),
@@ -75,44 +74,43 @@ impl ClientManager {
             consumer_groups: Vec::new(),
             last_heartbeat: IggyTimestamp::now(),
         };
-        self.clients.insert(client_id, IggySharedMut::new(client));
+        self.clients.insert(client_id, client);
         session
     }
 
-    pub async fn set_user_id(&mut self, client_id: u32, user_id: UserId) -> 
Result<(), IggyError> {
-        let client = self.clients.get(&client_id);
+    pub fn set_user_id(&mut self, client_id: u32, user_id: UserId) -> 
Result<(), IggyError> {
+        let client = self.clients.get_mut(&client_id);
         if client.is_none() {
             return Err(IggyError::ClientNotFound(client_id));
         }
 
-        let mut client = client.unwrap().write().await;
+        let mut client = client.unwrap();
         client.user_id = Some(user_id);
         Ok(())
     }
 
-    pub async fn clear_user_id(&mut self, client_id: u32) -> Result<(), 
IggyError> {
-        let client = self.clients.get(&client_id);
+    pub fn clear_user_id(&mut self, client_id: u32) -> Result<(), IggyError> {
+        let client = self.clients.get_mut(&client_id);
         if client.is_none() {
             return Err(IggyError::ClientNotFound(client_id));
         }
 
-        let mut client = client.unwrap().write().await;
+        let mut client = client.unwrap();
         client.user_id = None;
         Ok(())
     }
 
-    pub fn try_get_client(&self, client_id: u32) -> 
Option<IggySharedMut<Client>> {
+    pub fn try_get_client(&self, client_id: u32) -> Option<Client> {
         self.clients.get(&client_id).cloned()
     }
 
-    pub fn get_clients(&self) -> Vec<IggySharedMut<Client>> {
+    pub fn get_clients(&self) -> Vec<Client> {
         self.clients.values().cloned().collect()
     }
 
-    pub async fn delete_clients_for_user(&mut self, user_id: UserId) -> 
Result<(), IggyError> {
+    pub fn delete_clients_for_user(&mut self, user_id: UserId) -> Result<(), 
IggyError> {
         let mut clients_to_remove = Vec::new();
         for client in self.clients.values() {
-            let client = client.read().await;
             if let Some(client_user_id) = client.user_id {
                 if client_user_id == user_id {
                     clients_to_remove.push(client.session.client_id);
@@ -127,28 +125,27 @@ impl ClientManager {
         Ok(())
     }
 
-    pub async fn delete_client(&mut self, client_id: u32) -> 
Option<IggySharedMut<Client>> {
+    pub fn delete_client(&mut self, client_id: u32) -> Option<Client> {
         let client = self.clients.remove(&client_id);
         if let Some(client) = client.as_ref() {
-            let client = client.read().await;
             client.session.clear_user_id();
         }
         client
     }
 
-    pub async fn join_consumer_group(
-        &self,
+    pub fn join_consumer_group(
+        &mut self,
         client_id: u32,
         stream_id: u32,
         topic_id: u32,
         group_id: u32,
     ) -> Result<(), IggyError> {
-        let client = self.clients.get(&client_id);
+        let client = self.clients.get_mut(&client_id);
         if client.is_none() {
             return Err(IggyError::ClientNotFound(client_id));
         }
 
-        let mut client = client.unwrap().write().await;
+        let client = client.unwrap();
         if client.consumer_groups.iter().any(|consumer_group| {
             consumer_group.group_id == group_id
                 && consumer_group.topic_id == topic_id
@@ -165,18 +162,18 @@ impl ClientManager {
         Ok(())
     }
 
-    pub async fn leave_consumer_group(
-        &self,
+    pub fn leave_consumer_group(
+        &mut self,
         client_id: u32,
         stream_id: u32,
         topic_id: u32,
         consumer_group_id: u32,
     ) -> Result<(), IggyError> {
-        let client = self.clients.get(&client_id);
+        let client = self.clients.get_mut(&client_id);
         if client.is_none() {
             return Err(IggyError::ClientNotFound(client_id));
         }
-        let mut client = client.unwrap().write().await;
+        let mut client = client.unwrap();
         for (index, consumer_group) in 
client.consumer_groups.iter().enumerate() {
             if consumer_group.stream_id == stream_id
                 && consumer_group.topic_id == topic_id
@@ -189,9 +186,8 @@ impl ClientManager {
         Ok(())
     }
 
-    pub async fn delete_consumer_groups_for_stream(&self, stream_id: u32) {
-        for client in self.clients.values() {
-            let mut client = client.write().await;
+    pub fn delete_consumer_groups_for_stream(&mut self, stream_id: u32) {
+        for client in self.clients.values_mut() {
             let indexes_to_remove = client
                 .consumer_groups
                 .iter()
@@ -210,9 +206,8 @@ impl ClientManager {
         }
     }
 
-    pub async fn delete_consumer_groups_for_topic(&self, stream_id: u32, 
topic_id: u32) {
-        for client in self.clients.values() {
-            let mut client = client.write().await;
+    pub fn delete_consumer_groups_for_topic(&mut self, stream_id: u32, 
topic_id: u32) {
+        for client in self.clients.values_mut() {
             let indexes_to_remove = client
                 .consumer_groups
                 .iter()
diff --git a/core/server/src/streaming/partitions/messages.rs 
b/core/server/src/streaming/partitions/messages.rs
index a00b371f..a2d71c8e 100644
--- a/core/server/src/streaming/partitions/messages.rs
+++ b/core/server/src/streaming/partitions/messages.rs
@@ -368,6 +368,7 @@ mod tests {
     use crate::streaming::utils::MemoryPool;
     use bytes::Bytes;
     use iggy_common::{IggyExpiry, IggyMessage};
+    use std::rc::Rc;
     use std::sync::Arc;
     use std::sync::atomic::{AtomicU32, AtomicU64};
     use tempfile::TempDir;
@@ -710,7 +711,7 @@ mod tests {
         let partition_id = 3;
         let with_segment = true;
         let temp_dir = TempDir::new().unwrap();
-        let config = Arc::new(SystemConfig {
+        let config = Rc::new(SystemConfig {
             path: temp_dir.path().to_path_buf().to_str().unwrap().to_string(),
             message_deduplication: MessageDeduplicationConfig {
                 enabled: deduplication_enabled,
@@ -718,7 +719,7 @@ mod tests {
             },
             ..Default::default()
         });
-        let storage = Arc::new(SystemStorage::new(
+        let storage = Rc::new(SystemStorage::new(
             config.clone(),
             Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
         ));
diff --git a/core/server/src/streaming/partitions/partition.rs 
b/core/server/src/streaming/partitions/partition.rs
index 73c190cc..4a00ca85 100644
--- a/core/server/src/streaming/partitions/partition.rs
+++ b/core/server/src/streaming/partitions/partition.rs
@@ -28,6 +28,7 @@ use iggy_common::IggyExpiry;
 use iggy_common::IggyTimestamp;
 use iggy_common::Sizeable;
 use std::fmt;
+use std::rc::Rc;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
 
@@ -58,8 +59,8 @@ pub struct Partition {
     pub(crate) consumer_offsets: DashMap<u32, ConsumerOffset>,
     pub(crate) consumer_group_offsets: DashMap<u32, ConsumerOffset>,
     pub(crate) segments: Vec<Segment>,
-    pub(crate) config: Arc<SystemConfig>,
-    pub(crate) storage: Arc<SystemStorage>,
+    pub(crate) config: Rc<SystemConfig>,
+    pub(crate) storage: Rc<SystemStorage>,
 }
 
 #[derive(Debug, PartialEq, Clone)]
@@ -88,8 +89,8 @@ impl Partition {
         topic_id: u32,
         partition_id: u32,
         with_segment: bool,
-        config: Arc<SystemConfig>,
-        storage: Arc<SystemStorage>,
+        config: Rc<SystemConfig>,
+        storage: Rc<SystemStorage>,
         message_expiry: IggyExpiry,
         messages_count_of_parent_stream: Arc<AtomicU64>,
         messages_count_of_parent_topic: Arc<AtomicU64>,
@@ -209,17 +210,18 @@ mod tests {
     use iggy_common::IggyDuration;
     use iggy_common::IggyExpiry;
     use iggy_common::IggyTimestamp;
+    use std::rc::Rc;
     use std::sync::Arc;
     use std::sync::atomic::{AtomicU32, AtomicU64};
 
     #[tokio::test]
     async fn should_be_created_with_a_single_segment_given_valid_parameters() {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Arc::new(SystemConfig {
+        let config = Rc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
-        let storage = Arc::new(SystemStorage::new(
+        let storage = Rc::new(SystemStorage::new(
             config.clone(),
             Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
         ));
@@ -264,11 +266,11 @@ mod tests {
     #[tokio::test]
     async fn 
should_not_initialize_segments_given_false_with_segment_parameter() {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Arc::new(SystemConfig {
+        let config = Rc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
-        let storage = Arc::new(SystemStorage::new(
+        let storage = Rc::new(SystemStorage::new(
             config.clone(),
             Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
         ));
@@ -279,7 +281,7 @@ mod tests {
             topic_id,
             1,
             false,
-            Arc::new(SystemConfig::default()),
+            config,
             storage,
             IggyExpiry::NeverExpire,
             Arc::new(AtomicU64::new(0)),
diff --git a/core/server/src/streaming/segments/segment.rs 
b/core/server/src/streaming/segments/segment.rs
index acfc98f4..252fe7f1 100644
--- a/core/server/src/streaming/segments/segment.rs
+++ b/core/server/src/streaming/segments/segment.rs
@@ -27,6 +27,7 @@ use iggy_common::IggyByteSize;
 use iggy_common::IggyError;
 use iggy_common::IggyExpiry;
 use iggy_common::IggyTimestamp;
+use std::rc::Rc;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU64, Ordering};
 use tokio::fs::remove_file;
@@ -60,7 +61,7 @@ pub struct Segment {
     pub(super) index_reader: Option<IndexReader>,
     pub(super) message_expiry: IggyExpiry,
     pub(super) accumulator: MessagesAccumulator,
-    pub(super) config: Arc<SystemConfig>,
+    pub(super) config: Rc<SystemConfig>,
     pub(super) indexes: IggyIndexesMut,
     pub(super) messages_size: Arc<AtomicU64>,
     pub(super) indexes_size: Arc<AtomicU64>,
@@ -73,7 +74,7 @@ impl Segment {
         topic_id: u32,
         partition_id: u32,
         start_offset: u64,
-        config: Arc<SystemConfig>,
+        config: Rc<SystemConfig>,
         message_expiry: IggyExpiry,
         size_of_parent_stream: Arc<AtomicU64>,
         size_of_parent_topic: Arc<AtomicU64>,
@@ -450,7 +451,7 @@ mod tests {
         let topic_id = 2;
         let partition_id = 3;
         let start_offset = 0;
-        let config = Arc::new(SystemConfig::default());
+        let config = Rc::new(SystemConfig::default());
         let path = config.get_segment_path(stream_id, topic_id, partition_id, 
start_offset);
         let messages_file_path = Segment::get_messages_file_path(&path);
         let index_path = Segment::get_index_path(&path);
@@ -499,7 +500,7 @@ mod tests {
         let topic_id = 2;
         let partition_id = 3;
         let start_offset = 0;
-        let config = Arc::new(SystemConfig {
+        let config = Rc::new(SystemConfig {
             segment: SegmentConfig {
                 cache_indexes: CacheIndexesConfig::None,
                 ..Default::default()
diff --git a/core/server/src/streaming/session.rs 
b/core/server/src/streaming/session.rs
index 1eae4eb0..3f412bce 100644
--- a/core/server/src/streaming/session.rs
+++ b/core/server/src/streaming/session.rs
@@ -19,13 +19,16 @@
 use iggy_common::{AtomicUserId, UserId};
 use std::fmt::Display;
 use std::net::SocketAddr;
+use std::rc::Rc;
 use std::sync::atomic::{AtomicBool, Ordering};
 
 // This might be extended with more fields in the future e.g. custom name, 
permissions etc.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Session {
-    user_id: AtomicUserId,
-    active: AtomicBool,
+    // TODO: Fixme, those don't have to be atomics anymore, simple integers 
will suffice
+    // Once the atomics are removed, we can implement the Copy trait as well.
+    user_id: Rc<AtomicUserId>,
+    active: Rc<AtomicBool>,
     pub client_id: u32,
     pub ip_address: SocketAddr,
 }
@@ -34,8 +37,8 @@ impl Session {
     pub fn new(client_id: u32, user_id: UserId, ip_address: SocketAddr) -> 
Self {
         Self {
             client_id,
-            active: AtomicBool::new(true),
-            user_id: AtomicUserId::new(user_id),
+            active: Rc::new(AtomicBool::new(true)),
+            user_id: Rc::new(AtomicUserId::new(user_id)),
             ip_address,
         }
     }
@@ -49,15 +52,15 @@ impl Session {
     }
 
     pub fn get_user_id(&self) -> UserId {
-        self.user_id.load(Ordering::Acquire)
+        self.user_id.load(Ordering::Relaxed)
     }
 
     pub fn set_user_id(&self, user_id: UserId) {
-        self.user_id.store(user_id, Ordering::Release)
+        self.user_id.store(user_id, Ordering::Relaxed)
     }
 
     pub fn set_stale(&self) {
-        self.active.store(false, Ordering::Release)
+        self.active.store(false, Ordering::Relaxed)
     }
 
     pub fn clear_user_id(&self) {
@@ -65,7 +68,7 @@ impl Session {
     }
 
     pub fn is_active(&self) -> bool {
-        self.active.load(Ordering::Acquire)
+        self.active.load(Ordering::Relaxed)
     }
 
     pub fn is_authenticated(&self) -> bool {
diff --git a/core/server/src/streaming/storage.rs 
b/core/server/src/streaming/storage.rs
index f423b842..51e2565e 100644
--- a/core/server/src/streaming/storage.rs
+++ b/core/server/src/streaming/storage.rs
@@ -18,13 +18,13 @@
 
 use super::persistence::persister::PersisterKind;
 use crate::configs::system::SystemConfig;
+use crate::shard::system::info::SystemInfo;
+use crate::shard::system::storage::FileSystemInfoStorage;
 use crate::state::system::{PartitionState, StreamState, TopicState};
 use crate::streaming::partitions::partition::{ConsumerOffset, Partition};
 use crate::streaming::partitions::storage::FilePartitionStorage;
 use crate::streaming::streams::storage::FileStreamStorage;
 use crate::streaming::streams::stream::Stream;
-use crate::streaming::systems::info::SystemInfo;
-use crate::streaming::systems::storage::FileSystemInfoStorage;
 use crate::streaming::topics::storage::FileTopicStorage;
 use crate::streaming::topics::topic::Topic;
 use iggy_common::ConsumerKind;
@@ -85,61 +85,61 @@ pub enum PartitionStorageKind {
 }
 
 #[cfg_attr(test, automock)]
-pub trait SystemInfoStorage: Send {
-    fn load(&self) -> impl Future<Output = Result<SystemInfo, IggyError>> + 
Send;
-    fn save(&self, system_info: &SystemInfo) -> impl Future<Output = 
Result<(), IggyError>> + Send;
+pub trait SystemInfoStorage {
+    fn load(&self) -> impl Future<Output = Result<SystemInfo, IggyError>>;
+    fn save(&self, system_info: &SystemInfo) -> impl Future<Output = 
Result<(), IggyError>>; 
 }
 
 #[cfg_attr(test, automock)]
-pub trait StreamStorage: Send {
+pub trait StreamStorage {
     fn load(
         &self,
         stream: &mut Stream,
         state: StreamState,
-    ) -> impl Future<Output = Result<(), IggyError>> + Send;
-    fn save(&self, stream: &Stream) -> impl Future<Output = Result<(), 
IggyError>> + Send;
-    fn delete(&self, stream: &Stream) -> impl Future<Output = Result<(), 
IggyError>> + Send;
+    ) -> impl Future<Output = Result<(), IggyError>>; 
+    fn save(&self, stream: &Stream) -> impl Future<Output = Result<(), 
IggyError>>; 
+    fn delete(&self, stream: &Stream) -> impl Future<Output = Result<(), 
IggyError>>;
 }
 
 #[cfg_attr(test, automock)]
-pub trait TopicStorage: Send {
+pub trait TopicStorage {
     fn load(
         &self,
         topic: &mut Topic,
         state: TopicState,
-    ) -> impl Future<Output = Result<(), IggyError>> + Send;
-    fn save(&self, topic: &Topic) -> impl Future<Output = Result<(), 
IggyError>> + Send;
-    fn delete(&self, topic: &Topic) -> impl Future<Output = Result<(), 
IggyError>> + Send;
+    ) -> impl Future<Output = Result<(), IggyError>>; 
+    fn save(&self, topic: &Topic) -> impl Future<Output = Result<(), 
IggyError>>; 
+    fn delete(&self, topic: &Topic) -> impl Future<Output = Result<(), 
IggyError>>; 
 }
 
 #[cfg_attr(test, automock)]
-pub trait PartitionStorage: Send {
+pub trait PartitionStorage {
     fn load(
         &self,
         partition: &mut Partition,
         state: PartitionState,
-    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+    ) -> impl Future<Output = Result<(), IggyError>>;
     fn save(&self, partition: &mut Partition)
-    -> impl Future<Output = Result<(), IggyError>> + Send;
-    fn delete(&self, partition: &Partition) -> impl Future<Output = Result<(), 
IggyError>> + Send;
+    -> impl Future<Output = Result<(), IggyError>>;
+    fn delete(&self, partition: &Partition) -> impl Future<Output = Result<(), 
IggyError>>; 
     fn save_consumer_offset(
         &self,
         offset: u64,
         path: &str,
-    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+    ) -> impl Future<Output = Result<(), IggyError>>;
     fn load_consumer_offsets(
         &self,
         kind: ConsumerKind,
         path: &str,
-    ) -> impl Future<Output = Result<Vec<ConsumerOffset>, IggyError>> + Send;
+    ) -> impl Future<Output = Result<Vec<ConsumerOffset>, IggyError>>; 
     fn delete_consumer_offsets(
         &self,
         path: &str,
-    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+    ) -> impl Future<Output = Result<(), IggyError>>; 
     fn delete_consumer_offset(
         &self,
         path: &str,
-    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+    ) -> impl Future<Output = Result<(), IggyError>>;
 }
 
 #[derive(Debug)]
diff --git a/core/server/src/streaming/streams/storage.rs 
b/core/server/src/streaming/streams/storage.rs
index 3e3e5e76..53639de9 100644
--- a/core/server/src/streaming/streams/storage.rs
+++ b/core/server/src/streaming/streams/storage.rs
@@ -156,7 +156,7 @@ impl StreamStorage for FileStreamStorage {
         for mut topic in unloaded_topics {
             let loaded_topics = loaded_topics.clone();
             let topic_state = state.topics.remove(&topic.topic_id).unwrap();
-            let load_topic = tokio::spawn(async move {
+            let load_topic = monoio::spawn(async move {
                 match topic.load(topic_state).await {
                     Ok(_) => loaded_topics.lock().await.push(topic),
                     Err(error) => error!(
diff --git a/core/server/src/streaming/streams/stream.rs 
b/core/server/src/streaming/streams/stream.rs
index 339fc08a..03681b75 100644
--- a/core/server/src/streaming/streams/stream.rs
+++ b/core/server/src/streaming/streams/stream.rs
@@ -40,8 +40,8 @@ pub struct Stream {
     pub segments_count: Arc<AtomicU32>,
     pub(crate) topics: AHashMap<u32, Topic>,
     pub(crate) topics_ids: AHashMap<String, u32>,
-    pub(crate) config: Arc<SystemConfig>,
-    pub(crate) storage: Arc<SystemStorage>,
+    pub(crate) config: Rc<SystemConfig>,
+    pub(crate) storage: Rc<SystemStorage>,
 }
 
 impl Stream {
@@ -106,18 +106,18 @@ mod tests {
     #[test]
     fn should_be_created_given_valid_parameters() {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Arc::new(SystemConfig {
+        let config = Rc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
-        let storage = Arc::new(SystemStorage::new(
+        let storage = Rc::new(SystemStorage::new(
             config.clone(),
             Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
         ));
         MemoryPool::init_pool(config.clone());
         let id = 1;
         let name = "test";
-        let config = Arc::new(SystemConfig::default());
+        let config = Rc::new(SystemConfig::default());
         let path = config.get_stream_path(id);
         let topics_path = config.get_topics_path(id);
 
diff --git a/core/server/src/streaming/streams/topics.rs 
b/core/server/src/streaming/streams/topics.rs
index f8c742be..b975ee83 100644
--- a/core/server/src/streaming/streams/topics.rs
+++ b/core/server/src/streaming/streams/topics.rs
@@ -146,8 +146,7 @@ impl Stream {
             topic.name = name.to_owned();
             topic.message_expiry = message_expiry;
             topic.compression_algorithm = compression_algorithm;
-            for partition in topic.partitions.values_mut() {
-                let mut partition = partition.write().await;
+            for partition in topic.partitions.borrow_mut().values_mut() {
                 partition.message_expiry = message_expiry;
                 for segment in partition.segments.iter_mut() {
                     segment.update_message_expiry(message_expiry);
@@ -282,16 +281,16 @@ mod tests {
         },
     };
     use iggy_common::IggyByteSize;
-    use std::sync::Arc;
+    use std::{rc::Rc, sync::Arc};
 
     #[tokio::test]
     async fn should_get_topic_by_id_and_name() {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Arc::new(SystemConfig {
+        let config = Rc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
-        let storage = Arc::new(SystemStorage::new(
+        let storage = Rc::new(SystemStorage::new(
             config.clone(),
             Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
         ));
diff --git a/core/server/src/streaming/topics/consumer_group.rs 
b/core/server/src/streaming/topics/consumer_group.rs
index dedb29c7..43d9611b 100644
--- a/core/server/src/streaming/topics/consumer_group.rs
+++ b/core/server/src/streaming/topics/consumer_group.rs
@@ -21,16 +21,16 @@ use iggy_common::IggyError;
 use tokio::sync::RwLock;
 use tracing::trace;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ConsumerGroup {
     pub topic_id: u32,
     pub group_id: u32,
     pub name: String,
     pub partitions_count: u32,
-    members: AHashMap<u32, RwLock<ConsumerGroupMember>>,
+    members: AHashMap<u32, ConsumerGroupMember>,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ConsumerGroupMember {
     pub id: u32,
     partitions: AHashMap<u32, u32>,
@@ -49,8 +49,8 @@ impl ConsumerGroup {
         }
     }
 
-    pub fn get_members(&self) -> Vec<&RwLock<ConsumerGroupMember>> {
-        self.members.values().collect()
+    pub fn get_members(&self) -> Vec<ConsumerGroupMember> {
+        self.members.values().cloned().collect()
     }
 
     pub async fn reassign_partitions(&mut self, partitions_count: u32) {
@@ -61,7 +61,7 @@ impl ConsumerGroup {
     pub async fn calculate_partition_id(&self, member_id: u32) -> 
Result<Option<u32>, IggyError> {
         let member = self.members.get(&member_id);
         if let Some(member) = member {
-            return Ok(member.write().await.calculate_partition_id());
+            return Ok(member.await.calculate_partition_id());
         }
         Err(IggyError::ConsumerGroupMemberNotFound(
             member_id,
diff --git a/core/server/src/streaming/topics/consumer_groups.rs 
b/core/server/src/streaming/topics/consumer_groups.rs
index dec2f163..26505355 100644
--- a/core/server/src/streaming/topics/consumer_groups.rs
+++ b/core/server/src/streaming/topics/consumer_groups.rs
@@ -91,7 +91,7 @@ impl Topic {
         self.get_consumer_group_by_id(*group_id.unwrap())
     }
 
-    pub fn get_consumer_group_by_id(&self, id: u32) -> 
Result<&RwLock<ConsumerGroup>, IggyError> {
+    pub fn get_consumer_group_by_id(&self, id: u32) -> Result<ConsumerGroup, 
IggyError> {
         let consumer_group = self.consumer_groups.get(&id);
         if consumer_group.is_none() {
             return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id));
@@ -104,7 +104,7 @@ impl Topic {
         &mut self,
         group_id: Option<u32>,
         name: &str,
-    ) -> Result<&RwLock<ConsumerGroup>, IggyError> {
+    ) -> Result<ConsumerGroup, IggyError> {
         if self.consumer_groups_ids.contains_key(name) {
             return Err(IggyError::ConsumerGroupNameAlreadyExists(
                 name.to_owned(),
@@ -138,20 +138,20 @@ impl Topic {
         }
 
         let consumer_group =
-            ConsumerGroup::new(self.topic_id, id, name, self.partitions.len() 
as u32);
-        self.consumer_groups.insert(id, RwLock::new(consumer_group));
+            ConsumerGroup::new(self.topic_id, id, name, 
self.partitions.borrow().len() as u32);
+        self.consumer_groups.insert(id, consumer_group.clone());
         self.consumer_groups_ids.insert(name.to_owned(), id);
         info!(
             "Created consumer group with ID: {} for topic with ID: {} and 
stream with ID: {}.",
             id, self.topic_id, self.stream_id
         );
-        self.get_consumer_group_by_id(id)
+        consumer_group
     }
 
     pub async fn delete_consumer_group(
         &mut self,
         id: &Identifier,
-    ) -> Result<RwLock<ConsumerGroup>, IggyError> {
+    ) -> Result<ConsumerGroup, IggyError> {
         let group_id;
         {
             let consumer_group = 
self.get_consumer_group(id).with_error_context(|error| {
@@ -167,7 +167,6 @@ impl Topic {
         }
         let consumer_group = consumer_group.unwrap();
         {
-            let consumer_group = consumer_group.read().await;
             let group_id = consumer_group.group_id;
             self.consumer_groups_ids.remove(&consumer_group.name);
             let current_group_id = 
self.current_consumer_group_id.load(Ordering::SeqCst);
@@ -176,8 +175,7 @@ impl Topic {
                     .store(group_id, Ordering::SeqCst);
             }
 
-            for (_, partition) in self.partitions.iter() {
-                let partition = partition.read().await;
+            for (_, partition) in self.partitions.borrow().iter() {
                 if let Some((_, offset)) = 
partition.consumer_group_offsets.remove(&group_id) {
                     self.storage
                         .partition
diff --git a/core/server/src/streaming/topics/topic.rs 
b/core/server/src/streaming/topics/topic.rs
index b0913856..ad3e25ef 100644
--- a/core/server/src/streaming/topics/topic.rs
+++ b/core/server/src/streaming/topics/topic.rs
@@ -23,6 +23,8 @@ use crate::streaming::storage::SystemStorage;
 use crate::streaming::topics::consumer_group::ConsumerGroup;
 use ahash::AHashMap;
 use core::fmt;
+use std::cell::RefCell;
+use std::rc::Rc;
 use iggy_common::locking::IggySharedMut;
 use iggy_common::{
     CompressionAlgorithm, Consumer, ConsumerKind, IggyByteSize, IggyError, 
IggyExpiry,
@@ -48,10 +50,10 @@ pub struct Topic {
     pub(crate) messages_count_of_parent_stream: Arc<AtomicU64>,
     pub(crate) messages_count: Arc<AtomicU64>,
     pub(crate) segments_count_of_parent_stream: Arc<AtomicU32>,
-    pub(crate) config: Arc<SystemConfig>,
-    pub(crate) partitions: AHashMap<u32, IggySharedMut<Partition>>,
-    pub(crate) storage: Arc<SystemStorage>,
-    pub(crate) consumer_groups: AHashMap<u32, RwLock<ConsumerGroup>>,
+    pub(crate) config: Rc<SystemConfig>,
+    pub(crate) partitions: RefCell<AHashMap<u32, Partition>>,
+    pub(crate) storage: Rc<SystemStorage>,
+    pub(crate) consumer_groups: AHashMap<u32, ConsumerGroup>,
     pub(crate) consumer_groups_ids: AHashMap<String, u32>,
     pub(crate) current_consumer_group_id: AtomicU32,
     pub(crate) current_partition_id: AtomicU32,
@@ -71,8 +73,8 @@ impl Topic {
         size_of_parent_stream: Arc<AtomicU64>,
         messages_count_of_parent_stream: Arc<AtomicU64>,
         segments_count_of_parent_stream: Arc<AtomicU32>,
-        config: Arc<SystemConfig>,
-        storage: Arc<SystemStorage>,
+        config: Rc<SystemConfig>,
+        storage: Rc<SystemStorage>,
     ) -> Topic {
         Topic::create(
             stream_id,
@@ -99,8 +101,8 @@ impl Topic {
         topic_id: u32,
         name: &str,
         partitions_count: u32,
-        config: Arc<SystemConfig>,
-        storage: Arc<SystemStorage>,
+        config: Rc<SystemConfig>,
+        storage: Rc<SystemStorage>,
         size_of_parent_stream: Arc<AtomicU64>,
         messages_count_of_parent_stream: Arc<AtomicU64>,
         segments_count_of_parent_stream: Arc<AtomicU32>,
diff --git a/core/server/src/streaming/utils/memory_pool.rs 
b/core/server/src/streaming/utils/memory_pool.rs
index 61962f0a..e05b7359 100644
--- a/core/server/src/streaming/utils/memory_pool.rs
+++ b/core/server/src/streaming/utils/memory_pool.rs
@@ -21,6 +21,7 @@ use bytes::BytesMut;
 use crossbeam::queue::ArrayQueue;
 use human_repr::HumanCount;
 use once_cell::sync::OnceCell;
+use std::rc::Rc;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use tracing::{info, trace, warn};
@@ -153,7 +154,7 @@ impl MemoryPool {
     }
 
     /// Initialize the global pool from the given config.
-    pub fn init_pool(config: Arc<SystemConfig>) {
+    pub fn init_pool(config: Rc<SystemConfig>) {
         let is_enabled = config.memory_pool.enabled;
         let memory_limit = config.memory_pool.size.as_bytes_usize();
         let bucket_capacity = config.memory_pool.bucket_capacity as usize;
@@ -467,7 +468,7 @@ mod tests {
 
     fn initialize_pool_for_tests() {
         TEST_INIT.call_once(|| {
-            let config = Arc::new(SystemConfig {
+            let config = Rc::new(SystemConfig {
                 memory_pool: MemoryPoolConfig {
                     enabled: true,
                     size: IggyByteSize::from_str("4GiB").unwrap(),
diff --git a/core/server/src/tcp/connection_handler.rs 
b/core/server/src/tcp/connection_handler.rs
index ec61b23b..9c92d27d 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -19,27 +19,28 @@
 use crate::binary::command::ServerCommandHandler;
 use crate::binary::{command, sender::SenderKind};
 use crate::server_error::ConnectionError;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use crate::tcp::connection_handler::command::ServerCommand;
+use bytes::BytesMut;
 use iggy_common::IggyError;
 use std::io::ErrorKind;
-use std::sync::Arc;
+use std::rc::Rc;
 use tracing::{debug, error, info};
 
 const INITIAL_BYTES_LENGTH: usize = 4;
 
 pub(crate) async fn handle_connection(
-    session: Arc<Session>,
+    session: &Rc<Session>,
     sender: &mut SenderKind,
-    system: SharedSystem,
+    shard: &Rc<IggyShard>,
 ) -> Result<(), ConnectionError> {
-    let mut length_buffer = [0u8; INITIAL_BYTES_LENGTH];
-    let mut code_buffer = [0u8; INITIAL_BYTES_LENGTH];
+    let length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
+    let code_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
     loop {
-        let read_length = match sender.read(&mut length_buffer).await {
-            Ok(read_length) => read_length,
-            Err(error) => {
+        let (read_length, initial_buffer) = match 
sender.read(length_buffer.clone()).await {
+            (Ok(read_length), initial_buffer) => (read_length, initial_buffer),
+            (Err(error), _) => {
                 if error.as_code() == IggyError::ConnectionClosed.as_code() {
                     return Err(ConnectionError::from(error));
                 } else {
@@ -56,13 +57,18 @@ pub(crate) async fn handle_connection(
             continue;
         }
 
-        let length = u32::from_le_bytes(length_buffer);
-        sender.read(&mut code_buffer).await?;
-        let code = u32::from_le_bytes(code_buffer);
+        let initial_buffer = initial_buffer.freeze();
+        let length =
+            
u32::from_le_bytes(initial_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap());
+        let (res, code_buffer) = sender.read(code_buffer.clone()).await;
+        let _ = res?;
+        let code_buffer = code_buffer.freeze();
+        let code: u32 =
+            
u32::from_le_bytes(code_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap());
         debug!("Received a TCP request, length: {length}, code: {code}");
         let command = ServerCommand::from_code_and_reader(code, sender, length 
- 4).await?;
         debug!("Received a TCP command: {command}, payload size: {length}");
-        match command.handle(sender, length, &session, &system).await {
+        match command.handle(sender, length, session, shard).await {
             Ok(_) => {
                 debug!(
                     "Command was handled successfully, session: {session}. TCP 
response was sent."
diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs
index 4dd11725..6a10adb2 100644
--- a/core/server/src/tcp/sender.rs
+++ b/core/server/src/tcp/sender.rs
@@ -16,25 +16,33 @@
  * under the License.
  */
 
+use bytes::BytesMut;
 use iggy_common::IggyError;
-use monoio::io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, 
AsyncWriteRentExt};
+use monoio::{
+    buf::IoBufMut,
+    io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt},
+};
+use nix::libc;
 use std::io::IoSlice;
 use tracing::debug;
 
 const STATUS_OK: &[u8] = &[0; 4];
 
-pub(crate) async fn read<T>(stream: &mut T, buffer: &mut [u8]) -> 
Result<usize, IggyError>
+pub(crate) async fn read<T>(
+    stream: &mut T,
+    buffer: BytesMut,
+) -> (Result<usize, IggyError>, BytesMut)
 where
     T: AsyncReadRent + AsyncWriteRent + Unpin,
 {
     match stream.read_exact(buffer).await {
-        Ok(0) => Err(IggyError::ConnectionClosed),
-        Ok(read_bytes) => Ok(read_bytes),
-        Err(error) => {
+        (Ok(0), buffer) => (Err(IggyError::ConnectionClosed), buffer),
+        (Ok(read_bytes), buffer) => (Ok(read_bytes), buffer),
+        (Err(error), buffer) => {
             if error.kind() == std::io::ErrorKind::UnexpectedEof {
-                Err(IggyError::ConnectionClosed)
+                (Err(IggyError::ConnectionClosed), buffer)
             } else {
-                Err(IggyError::TcpError)
+                (Err(IggyError::TcpError), buffer)
             }
         }
     }
@@ -57,7 +65,7 @@ where
 pub(crate) async fn send_ok_response_vectored<T>(
     stream: &mut T,
     length: &[u8],
-    slices: Vec<IoSlice<'_>>,
+    slices: Vec<libc::iovec>,
 ) -> Result<(), IggyError>
 where
     T: AsyncReadRentExt + AsyncWriteRentExt + Unpin,
@@ -90,8 +98,9 @@ where
     );
     let length = (payload.len() as u32).to_le_bytes();
     stream
-        .write_all(&[status, &length, payload].as_slice().concat())
+        .write_all([status, &length, payload].concat())
         .await
+        .0
         .map_err(|_| IggyError::TcpError)?;
     debug!("Sent response with status: {:?}", status);
     Ok(())
@@ -101,7 +110,7 @@ pub(crate) async fn send_response_vectored<T>(
     stream: &mut T,
     status: &[u8],
     length: &[u8],
-    mut slices: Vec<IoSlice<'_>>,
+    mut slices: Vec<libc::iovec>,
 ) -> Result<(), IggyError>
 where
     T: AsyncReadRentExt + AsyncWriteRentExt + Unpin,
@@ -111,16 +120,22 @@ where
         slices.len(),
         status
     );
-    let prefix = [IoSlice::new(status), IoSlice::new(length)];
+    let prefix = [
+        libc::iovec {
+            iov_base: status.as_ptr() as _, // status goes first, as in `send_response`
+            iov_len: status.len(),
+        },
+        libc::iovec {
+            iov_base: length.as_ptr() as _, // then the payload length
+            iov_len: length.len(),
+        },
+    ];
     slices.splice(0..0, prefix);
-    let mut slice_refs = slices.as_mut_slice();
-    while !slice_refs.is_empty() {
-        let bytes_written = stream
-            .write_vectored(slice_refs)
-            .await
-            .map_err(|_| IggyError::TcpError)?;
-        IoSlice::advance_slices(&mut slice_refs, bytes_written);
-    }
+    stream
+        .write_vectored_all(slices)
+        .await
+        .0
+        .map_err(|_| IggyError::TcpError)?;
     debug!("Sent response with status: {:?}", status);
     Ok(())
 }
diff --git a/core/server/src/tcp/tcp_listener.rs 
b/core/server/src/tcp/tcp_listener.rs
index 2e4f25be..ffa01eb1 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -18,39 +18,50 @@
 
 use crate::binary::sender::SenderKind;
 use crate::shard::IggyShard;
+use crate::shard::transmission::message::ShardEvent;
 use crate::streaming::clients::client_manager::Transport;
 use crate::tcp::connection_handler::{handle_connection, handle_error};
+use crate::tcp::tcp_socket;
 use std::net::SocketAddr;
 use std::rc::Rc;
-use monoio::net::TcpListener;
-use rustls::pki_types::Ipv4Addr;
-use tokio::net::TcpSocket;
-use tokio::sync::oneshot;
 use tracing::{error, info};
 
-pub async fn start(server_name: &str, shard: Rc<IggyShard>) {
-    let addr: SocketAddr = if shard.config.tcp.ipv6 {
-        shard.config.tcp.address.parse().expect("Unable to parse IPv6 address")
-    } else {
-        shard.config.tcp.address.parse().expect("Unable to parse IPv4 address")
-    };
+pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) {
+    let ip_v6 = shard.config.tcp.ipv6;
+    let socket_config = &shard.config.tcp.socket;
+    let addr: SocketAddr = shard
+        .config
+        .tcp
+        .address
+        .parse()
+        .expect("Failed to parse TCP address");
+
+    let socket = tcp_socket::build(ip_v6, socket_config);
     monoio::spawn(async move {
-        let listener = TcpListener::bind(addr).expect(format!("Unable to start 
{server_name}.").as_ref());
+        socket
+            .bind(&addr.into())
+            .expect("Failed to bind TCP listener");
+        socket.listen(1024).expect("Failed to listen on TCP socket"); // don't drop the io::Result
+        let listener: std::net::TcpListener = socket.into();
+        let listener = monoio::net::TcpListener::from_std(listener).unwrap();
+        info!("{server_name} server has started on: {:?}", addr);
         loop {
             match listener.accept().await {
                 Ok((stream, address)) => {
                     let shard = shard.clone();
                     info!("Accepted new TCP connection: {address}");
-                    let session = shard
-                        .add_client(&address, Transport::Tcp);
+                    let session = shard.add_client(&address, Transport::Tcp);
+                    //TODO: Those can be shared with other shards.
+                    shard.add_active_session(session.clone());
+                    // Broadcast session to all shards.
+                    let event = 
Rc::new(ShardEvent::NewSession(session.clone()));
+                    shard.broadcast_event_to_all_shards(session.client_id, 
event);
 
-                    let client_id = session.client_id;
+                    let _client_id = session.client_id;
                     info!("Created new session: {session}");
                     let mut sender = SenderKind::get_tcp_sender(stream);
                     monoio::spawn(async move {
-                        if let Err(error) =
-                            handle_connection(session, &mut sender, 
shard.clone()).await
-                        {
+                        if let Err(error) = handle_connection(&session, &mut 
sender, &shard).await {
                             handle_error(error);
                             //TODO: Fixme
                             /*
diff --git a/core/server/src/tcp/tcp_sender.rs 
b/core/server/src/tcp/tcp_sender.rs
index a157dead..81089c85 100644
--- a/core/server/src/tcp/tcp_sender.rs
+++ b/core/server/src/tcp/tcp_sender.rs
@@ -19,10 +19,13 @@
 use crate::binary::sender::Sender;
 use crate::tcp::COMPONENT;
 use crate::{server_error::ServerError, tcp::sender};
+use bytes::BytesMut;
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use tokio::{io::AsyncWriteExt};
+use monoio::buf::IoBufMut;
+use monoio::io::AsyncWriteRent;
 use monoio::net::TcpStream;
+use nix::libc;
 
 #[derive(Debug)]
 pub struct TcpSender {
@@ -30,7 +33,7 @@ pub struct TcpSender {
 }
 
 impl Sender for TcpSender {
-    async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, IggyError> {
+    async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, 
BytesMut) {
         sender::read(&mut self.stream, buffer).await
     }
 
@@ -59,7 +62,7 @@ impl Sender for TcpSender {
     async fn send_ok_response_vectored(
         &mut self,
         length: &[u8],
-        slices: Vec<std::io::IoSlice<'_>>,
+        slices: Vec<libc::iovec>,
     ) -> Result<(), IggyError> {
         sender::send_ok_response_vectored(&mut self.stream, length, 
slices).await
     }
diff --git a/core/server/src/tcp/tcp_server.rs 
b/core/server/src/tcp/tcp_server.rs
index 82bbe0c1..9b8e1c04 100644
--- a/core/server/src/tcp/tcp_server.rs
+++ b/core/server/src/tcp/tcp_server.rs
@@ -17,9 +17,9 @@
  */
 
 use crate::shard::IggyShard;
-use crate::tcp::{tcp_listener};
-use std::rc::Rc;
+use crate::tcp::tcp_listener;
 use iggy_common::IggyError;
+use std::rc::Rc;
 use tracing::info;
 
 /// Starts the TCP server.
diff --git a/core/server/src/tcp/tcp_socket.rs 
b/core/server/src/tcp/tcp_socket.rs
index 69c1d408..747540fa 100644
--- a/core/server/src/tcp/tcp_socket.rs
+++ b/core/server/src/tcp/tcp_socket.rs
@@ -16,16 +16,18 @@
  * under the License.
  */
 
+use socket2::{Domain, Protocol, Socket, Type};
 use std::num::TryFromIntError;
 
-
 use crate::configs::tcp::TcpSocketConfig;
 
-pub fn build(ipv6: bool, config: TcpSocketConfig) -> TcpSocket {
+pub fn build(ipv6: bool, config: &TcpSocketConfig) -> Socket {
     let socket = if ipv6 {
-        TcpSocket::new_v6().expect("Unable to create an ipv6 socket")
+        Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))
+            .expect("Unable to create an ipv6 socket")
     } else {
-        TcpSocket::new_v4().expect("Unable to create an ipv4 socket")
+        Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))
+            .expect("Unable to create an ipv4 socket")
     };
 
     if config.override_defaults {
@@ -34,7 +36,7 @@ pub fn build(ipv6: bool, config: TcpSocketConfig) -> 
TcpSocket {
             .as_bytes_u64()
             .try_into()
             .map_err(|e: TryFromIntError| std::io::Error::other(e.to_string()))
-            .and_then(|size: u32| socket.set_recv_buffer_size(size))
+            .and_then(|size| socket.set_recv_buffer_size(size))
             .expect("Unable to set SO_RCVBUF on socket");
         config
             .send_buffer_size
@@ -52,6 +54,12 @@ pub fn build(ipv6: bool, config: TcpSocketConfig) -> 
TcpSocket {
         socket
             .set_linger(Some(config.linger.get_duration()))
             .expect("Unable to set SO_LINGER on socket");
+        socket
+            .set_reuse_address(true)
+            .expect("Unable to set SO_REUSEADDR on socket");
+        socket
+            .set_reuse_port(true)
+            .expect("Unable to set SO_REUSEPORT on socket");
     }
 
     socket
@@ -77,9 +85,9 @@ mod tests {
             nodelay: true,
             linger: IggyDuration::new(linger_dur),
         };
-        let socket = build(false, config);
-        assert!(socket.recv_buffer_size().unwrap() >= buffer_size as u32);
-        assert!(socket.send_buffer_size().unwrap() >= buffer_size as u32);
+        let socket = build(false, &config);
+        assert!(socket.recv_buffer_size().unwrap() >= buffer_size as usize);
+        assert!(socket.send_buffer_size().unwrap() >= buffer_size as usize);
         assert!(socket.keepalive().unwrap());
         assert!(socket.nodelay().unwrap());
         assert_eq!(socket.linger().unwrap(), Some(linger_dur));
diff --git a/core/server/src/tcp/tcp_tls_listener.rs 
b/core/server/src/tcp/tcp_tls_listener.rs
index d2bbf8da..53df4b0b 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -18,26 +18,30 @@
 
 use crate::binary::sender::SenderKind;
 use crate::configs::tcp::TcpTlsConfig;
+use crate::shard::IggyShard;
 use crate::streaming::clients::client_manager::Transport;
-use crate::streaming::systems::system::SharedSystem;
 use crate::tcp::connection_handler::{handle_connection, handle_error};
+use monoio_native_tls::TlsAcceptor;
 use rustls::ServerConfig;
 use rustls::pki_types::{CertificateDer, PrivateKeyDer};
 use rustls_pemfile::{certs, private_key};
 use std::io::BufReader;
 use std::net::SocketAddr;
+use std::rc::Rc;
 use std::sync::Arc;
 use tokio::net::TcpSocket;
 use tokio::sync::oneshot;
-use tokio_rustls::{TlsAcceptor, rustls};
 use tracing::{error, info};
 
 pub(crate) async fn start(
     address: &str,
     config: TcpTlsConfig,
     socket: TcpSocket,
-    system: SharedSystem,
+    system: Rc<IggyShard>,
 ) -> SocketAddr {
+    //TODO: Fixme
+    todo!();
+    /*
     let address = address.to_string();
     let (tx, rx) = oneshot::channel();
     tokio::spawn(async move {
@@ -131,6 +135,7 @@ pub(crate) async fn start(
         Ok(addr) => addr,
         Err(_) => panic!("Failed to get the local address for TCP TLS 
listener."),
     }
+    */
 }
 
 fn generate_self_signed_cert()
diff --git a/core/server/src/tcp/tcp_tls_sender.rs 
b/core/server/src/tcp/tcp_tls_sender.rs
index 6ba62315..7213ad64 100644
--- a/core/server/src/tcp/tcp_tls_sender.rs
+++ b/core/server/src/tcp/tcp_tls_sender.rs
@@ -19,10 +19,13 @@
 use crate::binary::sender::Sender;
 use crate::tcp::COMPONENT;
 use crate::{server_error::ServerError, tcp::sender};
+use bytes::BytesMut;
 use error_set::ErrContext;
 use iggy_common::IggyError;
+use monoio::io::AsyncWriteRent;
 use monoio::net::TcpStream;
 use monoio_native_tls::TlsStream;
+use nix::libc;
 
 #[derive(Debug)]
 pub struct TcpTlsSender {
@@ -30,7 +33,7 @@ pub struct TcpTlsSender {
 }
 
 impl Sender for TcpTlsSender {
-    async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, IggyError> {
+    async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, 
BytesMut) {
         sender::read(&mut self.stream, buffer).await
     }
 
@@ -59,7 +62,7 @@ impl Sender for TcpTlsSender {
     async fn send_ok_response_vectored(
         &mut self,
         length: &[u8],
-        slices: Vec<std::io::IoSlice<'_>>,
+        slices: Vec<libc::iovec>,
     ) -> Result<(), IggyError> {
         sender::send_ok_response_vectored(&mut self.stream, length, 
slices).await
     }

Reply via email to