This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch rebase_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 81c29b1b7276d684a88a5d56157a33fb8602df5f
Author: numinex <[email protected]>
AuthorDate: Fri Jun 27 12:26:28 2025 +0200

    finish handlers
---
 Cargo.lock                                         | 16 ---------
 .../create_consumer_group_handler.rs               | 28 +++++++++++-----
 .../delete_consumer_group_handler.rs               | 13 ++++----
 .../consumer_groups/get_consumer_group_handler.rs  | 23 +++++++++----
 .../consumer_groups/get_consumer_groups_handler.rs | 12 +++----
 .../consumer_groups/join_consumer_group_handler.rs | 11 +++----
 .../leave_consumer_group_handler.rs                | 13 ++++----
 .../delete_consumer_offset_handler.rs              | 10 +++---
 .../get_consumer_offset_handler.rs                 | 10 +++---
 .../store_consumer_offset_handler.rs               | 11 ++++---
 .../messages/flush_unsaved_buffer_handler.rs       | 10 +++---
 .../handlers/messages/poll_messages_handler.rs     | 23 +++++++------
 .../handlers/messages/send_messages_handler.rs     | 38 +++++++++++++---------
 .../partitions/create_partitions_handler.rs        | 13 ++++----
 .../partitions/delete_partitions_handler.rs        | 13 ++++----
 .../create_personal_access_token_handler.rs        | 14 ++++----
 .../delete_personal_access_token_handler.rs        | 14 ++++----
 .../get_personal_access_tokens_handler.rs          | 13 ++++----
 .../login_with_personal_access_token_handler.rs    | 11 +++----
 .../handlers/segments/delete_segments_handler.rs   | 13 ++++----
 .../handlers/streams/create_stream_handler.rs      | 21 +++++++-----
 .../handlers/streams/delete_stream_handler.rs      | 13 ++++----
 .../binary/handlers/streams/get_stream_handler.rs  | 12 +++----
 .../binary/handlers/streams/get_streams_handler.rs | 12 +++----
 .../handlers/streams/purge_stream_handler.rs       | 12 +++----
 .../handlers/streams/update_stream_handler.rs      | 13 ++++----
 .../binary/handlers/system/get_client_handler.rs   | 12 +++----
 .../binary/handlers/system/get_clients_handler.rs  | 13 ++++----
 .../src/binary/handlers/system/get_me_handler.rs   | 13 +++-----
 .../src/binary/handlers/system/get_snapshot.rs     | 10 +++---
 .../binary/handlers/system/get_stats_handler.rs    | 10 +++---
 .../src/binary/handlers/system/ping_handler.rs     | 17 +++++-----
 .../binary/handlers/topics/create_topic_handler.rs | 29 ++++++++++++-----
 .../binary/handlers/topics/delete_topic_handler.rs | 13 ++++----
 .../binary/handlers/topics/get_topic_handler.rs    | 19 ++++++++---
 .../binary/handlers/topics/get_topics_handler.rs   | 21 ++++++++----
 .../binary/handlers/topics/purge_topic_handler.rs  | 12 +++----
 .../binary/handlers/topics/update_topic_handler.rs | 26 ++++++++++-----
 .../handlers/users/change_password_handler.rs      | 16 ++++-----
 .../binary/handlers/users/create_user_handler.rs   | 16 ++++-----
 .../binary/handlers/users/delete_user_handler.rs   | 17 +++++-----
 .../src/binary/handlers/users/get_user_handler.rs  | 13 ++++----
 .../src/binary/handlers/users/get_users_handler.rs | 20 +++++-------
 .../binary/handlers/users/login_user_handler.rs    | 14 ++++----
 .../binary/handlers/users/logout_user_handler.rs   | 20 +++++-------
 .../handlers/users/update_permissions_handler.rs   | 15 ++++-----
 .../binary/handlers/users/update_user_handler.rs   | 15 ++++-----
 core/server/src/binary/mapper.rs                   | 22 +++++++------
 core/server/src/binary/sender.rs                   | 23 ++++++++-----
 core/server/src/lib.rs                             |  9 +++++
 core/server/src/quic/quic_sender.rs                |  3 +-
 core/server/src/shard/system/clients.rs            |  4 +--
 core/server/src/shard/system/consumer_groups.rs    | 10 ++----
 core/server/src/shard/system/messages.rs           |  4 ---
 .../src/shard/system/personal_access_tokens.rs     |  8 ++---
 core/server/src/shard/system/snapshot/mod.rs       |  5 +++
 core/server/src/shard/system/streams.rs            |  6 ++--
 core/server/src/shard/system/topics.rs             | 15 +++------
 .../server/src/streaming/clients/client_manager.rs |  8 +++--
 .../server/src/streaming/topics/consumer_groups.rs |  5 ++-
 core/server/src/streaming/utils/pooled_buffer.rs   | 15 +++++++++
 core/server/src/tcp/sender.rs                      |  7 ++--
 core/server/src/tcp/tcp_sender.rs                  |  2 +-
 core/server/src/tcp/tcp_tls_sender.rs              |  3 +-
 64 files changed, 467 insertions(+), 415 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 3e51b287..19be4ecb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -143,7 +143,6 @@ dependencies = [
  "futures-core",
  "futures-util",
  "mio 1.0.4",
- "mio 1.0.4",
  "socket2",
  "tokio",
  "tracing",
@@ -653,7 +652,6 @@ dependencies = [
  "proc-macro2",
  "quote",
  "syn 2.0.104",
- "syn 2.0.104",
 ]
 
 [[package]]
@@ -4806,18 +4804,6 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
-[[package]]
-name = "mio"
-version = "0.8.11"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
-dependencies = [
- "libc",
- "log",
- "wasi 0.11.1+wasi-snapshot-preview1",
- "windows-sys 0.48.0",
-]
-
 [[package]]
 name = "mio"
 version = "1.0.4"
@@ -4916,7 +4902,6 @@ dependencies = [
  "proc-macro2",
  "quote",
  "syn 2.0.104",
- "syn 2.0.104",
 ]
 
 [[package]]
@@ -7643,7 +7628,6 @@ dependencies = [
  "bytes",
  "libc",
  "mio 1.0.4",
- "mio 1.0.4",
  "parking_lot 0.12.4",
  "pin-project-lite",
  "signal-hook-registry",
diff --git 
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index 5d695687..266b46a6 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -16,8 +16,6 @@
  * under the License.
  */
 
-use std::rc::Rc;
-
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -30,6 +28,7 @@ use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::create_consumer_group::CreateConsumerGroup;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for CreateConsumerGroup {
@@ -47,7 +46,7 @@ impl ServerCommandHandler for CreateConsumerGroup {
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let consumer_group = shard
+        let consumer_group_id = shard
                 .create_consumer_group(
                     session,
                     &self.stream_id,
@@ -55,23 +54,34 @@ impl ServerCommandHandler for CreateConsumerGroup {
                     self.group_id,
                     &self.name,
                 )
-                .await
                 .with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - failed to create 
consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: 
{session}",
                         self.stream_id, self.topic_id, self.group_id
                     )
                 })?;
-        let consumer_group = consumer_group.read().await;
+            
+        let stream = shard.find_stream(session, &self.stream_id)
+            .with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - failed to find stream with 
ID: {} for session: {}",
+                    self.stream_id, session
+                )
+            })?;
+        let consumer_group = shard.get_consumer_group(session, &stream, 
&self.topic_id, &consumer_group_id)
+            .with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - failed to get consumer 
group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {session}",
+                    self.stream_id, self.topic_id, consumer_group_id
+                )
+            })?.unwrap();
         let group_id = consumer_group.group_id;
-        let response = mapper::map_consumer_group(&consumer_group).await;
-        drop(consumer_group);
+        let response = mapper::map_consumer_group(&consumer_group);
 
-        let system = system.downgrade();
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
 
-        system
+        shard
             .state
         .apply(
             session.get_user_id(),
diff --git 
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
index 828d999f..b674baf1 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
@@ -19,13 +19,14 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::delete_consumer_group::DeleteConsumerGroup;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for DeleteConsumerGroup {
@@ -38,13 +39,12 @@ impl ServerCommandHandler for DeleteConsumerGroup {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let mut system = system.write().await;
-        system
+        shard
                 .delete_consumer_group(
                     session,
                     &self.stream_id,
@@ -56,12 +56,11 @@ impl ServerCommandHandler for DeleteConsumerGroup {
                     self.group_id, self.topic_id, self.stream_id, session
                 ))?;
 
-        let system = system.downgrade();
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
         let group_id = self.group_id.clone();
 
-        system
+        shard
             .state
             .apply(
                 session.get_user_id(),
diff --git 
a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs 
b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
index 694ac38c..8f67192b 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
@@ -20,13 +20,17 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
+use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::get_consumer_group::GetConsumerGroup;
+use std::rc::Rc;
 use tracing::debug;
 
+use super::COMPONENT;
+
 impl ServerCommandHandler for GetConsumerGroup {
     fn code(&self) -> u32 {
         iggy_common::GET_CONSUMER_GROUP_CODE
@@ -36,14 +40,20 @@ impl ServerCommandHandler for GetConsumerGroup {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let system = system.read().await;
+        let stream_id = &self.stream_id;
+        let stream = shard.find_stream(session, &self.stream_id)
+            .with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - failed to get stream for 
stream_id: {stream_id}"
+                )
+            })?;
         let Ok(consumer_group) =
-            system.get_consumer_group(session, &self.stream_id, 
&self.topic_id, &self.group_id)
+            shard.get_consumer_group(session, &stream, &self.topic_id, 
&self.group_id)
         else {
             sender.send_empty_ok_response().await?;
             return Ok(());
@@ -53,8 +63,7 @@ impl ServerCommandHandler for GetConsumerGroup {
             return Ok(());
         };
 
-        let consumer_group = consumer_group.read().await;
-        let consumer_group = mapper::map_consumer_group(&consumer_group).await;
+        let consumer_group = mapper::map_consumer_group(&consumer_group);
         sender.send_ok_response(&consumer_group).await?;
         Ok(())
     }
diff --git 
a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
index 7ff6389f..bc6b60b4 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
@@ -21,12 +21,13 @@ use crate::binary::handlers::consumer_groups::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::get_consumer_groups::GetConsumerGroups;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for GetConsumerGroups {
@@ -38,12 +39,11 @@ impl ServerCommandHandler for GetConsumerGroups {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        let consumer_groups = system
+        let consumer_groups = shard
             .get_consumer_groups(session, &self.stream_id, &self.topic_id)
             .with_error_context(|error| {
                 format!(
@@ -51,7 +51,7 @@ impl ServerCommandHandler for GetConsumerGroups {
                     self.stream_id, self.topic_id, session
                 )
             })?;
-        let consumer_groups = 
mapper::map_consumer_groups(&consumer_groups).await;
+        let consumer_groups = mapper::map_consumer_groups(consumer_groups);
         sender.send_ok_response(&consumer_groups).await?;
         Ok(())
     }
diff --git 
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
index 3d01d41a..f72609c3 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
@@ -19,12 +19,13 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::join_consumer_group::JoinConsumerGroup;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for JoinConsumerGroup {
@@ -37,19 +38,17 @@ impl ServerCommandHandler for JoinConsumerGroup {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        system
+        shard
             .join_consumer_group(
                 session,
                 &self.stream_id,
                 &self.topic_id,
                 &self.group_id,
             )
-            .await
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to join consumer 
group for stream_id: {}, topic_id: {}, group_id: {}, session: {}",
diff --git 
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
index e094531d..ba352bf9 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
@@ -18,14 +18,16 @@
 
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
-use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::leave_consumer_group::LeaveConsumerGroup;
+use std::rc::Rc;
 use tracing::{debug, instrument};
+use super::COMPONENT;
 
 impl ServerCommandHandler for LeaveConsumerGroup {
     fn code(&self) -> u32 {
@@ -37,13 +39,12 @@ impl ServerCommandHandler for LeaveConsumerGroup {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let system = system.read().await;
-        system
+        shard
             .leave_consumer_group(
                 session,
                 &self.stream_id,
diff --git 
a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
 
b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
index f639223d..96c1e43c 100644
--- 
a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
@@ -20,12 +20,13 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::consumer_offsets::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::delete_consumer_offset::DeleteConsumerOffset;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for DeleteConsumerOffset {
@@ -37,12 +38,11 @@ impl ServerCommandHandler for DeleteConsumerOffset {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        system
+        shard
             .delete_consumer_offset(
                 session,
                 self.consumer,
diff --git 
a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
 
b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
index dbe119ff..04c89852 100644
--- 
a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
@@ -20,11 +20,12 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use iggy_common::IggyError;
 use iggy_common::get_consumer_offset::GetConsumerOffset;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for GetConsumerOffset {
@@ -36,12 +37,11 @@ impl ServerCommandHandler for GetConsumerOffset {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        let Ok(offset) = system
+        let Ok(offset) = shard
             .get_consumer_offset(
                 session,
                 &self.consumer,
diff --git 
a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
 
b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
index c0343bcb..db770e4a 100644
--- 
a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
@@ -16,12 +16,14 @@
  * under the License.
  */
 
+use std::rc::Rc;
+
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::consumer_offsets::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
@@ -37,12 +39,11 @@ impl ServerCommandHandler for StoreConsumerOffset {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        system
+        shard
             .store_consumer_offset(
                 session,
                 self.consumer,
diff --git 
a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs 
b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
index 5a50a50d..45bc9db6 100644
--- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
+++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
@@ -19,11 +19,12 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::messages::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::{FlushUnsavedBuffer, IggyError};
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for FlushUnsavedBuffer {
@@ -36,17 +37,16 @@ impl ServerCommandHandler for FlushUnsavedBuffer {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let system = system.read().await;
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
         let partition_id = self.partition_id;
         let fsync = self.fsync;
-        system
+        shard
             .flush_unsaved_buffer(session, stream_id, topic_id, partition_id, 
fsync)
             .await
             .with_error_context(|error| {
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs 
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index b0a2ccea..a591f691 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -20,13 +20,15 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::messages::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
+use crate::shard::system::messages::PollingArgs;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::messages::PollingArgs;
-use crate::streaming::systems::system::SharedSystem;
+use crate::to_iovec;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::{IggyError, PollMessages};
 use std::io::IoSlice;
+use std::rc::Rc;
 use tracing::{debug, trace};
 
 #[derive(Debug)]
@@ -53,13 +55,12 @@ impl ServerCommandHandler for PollMessages {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let system = system.read().await;
-        let (metadata, messages) = system
+        let (metadata, messages) = shard
             .poll_messages(
                 session,
                 &self.consumer,
@@ -73,7 +74,6 @@ impl ServerCommandHandler for PollMessages {
                 "{COMPONENT} (error: {error}) - failed to poll messages for 
consumer: {}, stream_id: {}, topic_id: {}, partition_id: {:?}, session: 
{session}.",
                 self.consumer, self.stream_id, self.topic_id, self.partition_id
             ))?;
-        drop(system);
 
         // Collect all chunks first into a Vec to extend their lifetimes.
         // This ensures the Bytes (in reality Arc<[u8]>) references from each 
IggyMessagesBatch stay alive
@@ -89,12 +89,11 @@ impl ServerCommandHandler for PollMessages {
         let count = messages.count().to_le_bytes();
 
         let mut io_slices = Vec::with_capacity(messages.containers_count() + 
3);
-        io_slices.push(IoSlice::new(&partition_id));
-        io_slices.push(IoSlice::new(&current_offset));
-        io_slices.push(IoSlice::new(&count));
-
-        io_slices.extend(messages.iter().map(|m| IoSlice::new(m)));
+        io_slices.push(to_iovec(&partition_id));
+        io_slices.push(to_iovec(&current_offset));
+        io_slices.push(to_iovec(&count));
 
+        io_slices.extend(messages.iter().map(|m| to_iovec(&m)));
         trace!(
             "Sending {} messages to client ({} bytes) to client",
             messages.count(),
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs 
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 17821a16..5603dfe9 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -18,15 +18,17 @@
 
 use crate::binary::command::{BinaryServerCommand, ServerCommandHandler};
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use crate::streaming::utils::PooledBuffer;
 use anyhow::Result;
+use bytes::BytesMut;
 use iggy_common::INDEX_SIZE;
 use iggy_common::Identifier;
 use iggy_common::Sizeable;
 use iggy_common::{IggyError, Partitioning, SendMessages, Validatable};
+use std::rc::Rc;
 use tracing::instrument;
 
 impl ServerCommandHandler for SendMessages {
@@ -45,36 +47,40 @@ impl ServerCommandHandler for SendMessages {
         mut self,
         sender: &mut SenderKind,
         length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         let total_payload_size = length as usize - std::mem::size_of::<u32>();
         let metadata_len_field_size = std::mem::size_of::<u32>();
 
-        let mut metadata_length_buffer = [0u8; 4];
-        sender.read(&mut metadata_length_buffer).await?;
-        let metadata_size = u32::from_le_bytes(metadata_length_buffer);
+        let mut metadata_length_buffer = BytesMut::with_capacity(4);
+        unsafe { metadata_length_buffer.set_len(4) };
+        let (result, metadata_len_buf) = 
sender.read(metadata_length_buffer).await;
+        result?;
+        let metadata_len_buf = metadata_len_buf.freeze();
+        let metadata_size = 
u32::from_le_bytes(metadata_len_buf[..].try_into().unwrap());
 
         let mut metadata_buffer = PooledBuffer::with_capacity(metadata_size as 
usize);
         unsafe { metadata_buffer.set_len(metadata_size as usize) };
-        sender.read(&mut metadata_buffer).await?;
+        let (result, metadata_buf) = sender.read(metadata_buffer).await;
+        result?;
 
         let mut element_size = 0;
 
-        let stream_id = Identifier::from_raw_bytes(&metadata_buffer)?;
+        let stream_id = Identifier::from_raw_bytes(&metadata_buf)?;
         element_size += stream_id.get_size_bytes().as_bytes_usize();
         self.stream_id = stream_id;
 
-        let topic_id = 
Identifier::from_raw_bytes(&metadata_buffer[element_size..])?;
+        let topic_id = 
Identifier::from_raw_bytes(&metadata_buf[element_size..])?;
         element_size += topic_id.get_size_bytes().as_bytes_usize();
         self.topic_id = topic_id;
 
-        let partitioning = 
Partitioning::from_raw_bytes(&metadata_buffer[element_size..])?;
+        let partitioning = 
Partitioning::from_raw_bytes(&metadata_buf[element_size..])?;
         element_size += partitioning.get_size_bytes().as_bytes_usize();
         self.partitioning = partitioning;
 
         let messages_count = u32::from_le_bytes(
-            metadata_buffer[element_size..element_size + 4]
+            metadata_buf[element_size..element_size + 4]
                 .try_into()
                 .unwrap(),
         );
@@ -82,13 +88,15 @@ impl ServerCommandHandler for SendMessages {
 
         let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size);
         unsafe { indexes_buffer.set_len(indexes_size) };
-        sender.read(&mut indexes_buffer).await?;
+        let (result, indexes_buffer) = sender.read(indexes_buffer).await;
+        result?;
 
         let messages_size =
             total_payload_size - metadata_size as usize - indexes_size - 
metadata_len_field_size;
         let mut messages_buffer = PooledBuffer::with_capacity(messages_size);
         unsafe { messages_buffer.set_len(messages_size) };
-        sender.read(&mut messages_buffer).await?;
+        let (result, messages_buffer) = sender.read(messages_buffer).await;
+        result?;
 
         let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0);
         let batch = IggyMessagesBatchMut::from_indexes_and_messages(
@@ -99,8 +107,7 @@ impl ServerCommandHandler for SendMessages {
 
         batch.validate()?;
 
-        let system = system.read().await;
-        system
+        shard
             .append_messages(
                 session,
                 &self.stream_id,
@@ -110,7 +117,6 @@ impl ServerCommandHandler for SendMessages {
                 None,
             )
             .await?;
-        drop(system);
 
         sender.send_empty_ok_response().await?;
         Ok(())
diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index 895c83b2..5045dbfb 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -19,13 +19,14 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::create_partitions::CreatePartitions;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for CreatePartitions {
@@ -38,13 +39,12 @@ impl ServerCommandHandler for CreatePartitions {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let mut system = system.write().await;
-        system
+        shard
             .create_partitions(
                 session,
                 &self.stream_id,
@@ -59,11 +59,10 @@ impl ServerCommandHandler for CreatePartitions {
                 )
             })?;
 
-        let system = system.downgrade();
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
 
-        system
+        shard
         .state
         .apply(
             session.get_user_id(),
diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index c6351403..952b6b64 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -19,13 +19,14 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::delete_partitions::DeletePartitions;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for DeletePartitions {
@@ -38,15 +39,14 @@ impl ServerCommandHandler for DeletePartitions {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
 
-        let mut system = system.write().await;
-        system
+        shard
             .delete_partitions(
                 session,
                 &self.stream_id,
@@ -60,8 +60,7 @@ impl ServerCommandHandler for DeletePartitions {
                 )
             })?;
 
-        let system = system.downgrade();
-        system
+        shard
         .state
         .apply(
             session.get_user_id(),
diff --git a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index b43103f3..c28ac77b 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -20,15 +20,16 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::personal_access_tokens::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreatePersonalAccessTokenWithHash;
 use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for CreatePersonalAccessToken {
@@ -41,15 +42,13 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let system = system.write().await;
-        let token = system
+        let token = shard
                 .create_personal_access_token(session, &self.name, self.expiry)
-                .await
                 .with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - failed to create personal access token with name: {}, session: {session}",
@@ -59,8 +58,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
         let bytes = mapper::map_raw_pat(&token);
         let token_hash = PersonalAccessToken::hash_token(&token);
 
-        let system = system.downgrade();
-        system
+        shard
             .state
             .apply(
                 session.get_user_id(),
diff --git a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
index 4ba2de04..bb8dd463 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
@@ -19,13 +19,14 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::personal_access_tokens::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::delete_personal_access_token::DeletePersonalAccessToken;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for DeletePersonalAccessToken {
@@ -38,22 +39,19 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
         let token_name = self.name.clone();
 
-        let mut system = system.write().await;
-        system
+        shard
                 .delete_personal_access_token(session, &self.name)
-                .await
                 .with_error_context(|error| {format!(
                     "{COMPONENT} (error: {error}) - failed to delete personal access token with name: {token_name}, session: {session}"
                 )})?;
 
-        let system = system.downgrade();
-        system
+        shard
             .state
             .apply(
                 session.get_user_id(),
diff --git a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
index 08d7c6e6..973af58d 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
@@ -21,11 +21,12 @@ use crate::binary::handlers::personal_access_tokens::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::get_personal_access_tokens::GetPersonalAccessTokens;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for GetPersonalAccessTokens {
@@ -37,18 +38,16 @@ impl ServerCommandHandler for GetPersonalAccessTokens {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        let personal_access_tokens = system
+        let personal_access_tokens = shard
             .get_personal_access_tokens(session)
-            .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get personal access tokens with session: {session}")
             })?;
-        let personal_access_tokens = mapper::map_personal_access_tokens(&personal_access_tokens);
+        let personal_access_tokens = mapper::map_personal_access_tokens(personal_access_tokens);
         sender.send_ok_response(&personal_access_tokens).await?;
         Ok(())
     }
diff --git a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
index 33a4dee2..f7ca5628 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
@@ -15,13 +15,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::shard::IggyShard;
+use std::rc::Rc;
 
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::personal_access_tokens::COMPONENT, sender::SenderKind};
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
@@ -38,14 +39,12 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        let user = system
+        let user = shard
             .login_with_personal_access_token(&self.token, Some(session))
-            .await
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to login with personal access token: {}, session: {session}",
diff --git a/core/server/src/binary/handlers/segments/delete_segments_handler.rs b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index 7a2f106c..b3eef13b 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -19,13 +19,14 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::delete_segments::DeleteSegments;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for DeleteSegments {
@@ -38,16 +39,15 @@ impl ServerCommandHandler for DeleteSegments {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
         let partition_id = self.partition_id;
 
-        let mut system = system.write().await;
-        system
+        shard
             .delete_segments(
                 session,
                 &self.stream_id,
@@ -62,8 +62,7 @@ impl ServerCommandHandler for DeleteSegments {
                 )
             })?;
 
-        let system = system.downgrade();
-        system
+        shard
             .state
             .apply(
                 session.get_user_id(),
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index 86a91d0c..a00afde7 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -20,14 +20,15 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateStreamWithId;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::create_stream::CreateStream;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for CreateStream {
@@ -40,14 +41,13 @@ impl ServerCommandHandler for CreateStream {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id;
 
-        let mut system = system.write().await;
-        let stream = system
+        let created_stream_id = shard
                 .create_stream(session, self.stream_id, &self.name)
                 .await
                 .with_error_context(|error| {
@@ -55,11 +55,16 @@ impl ServerCommandHandler for CreateStream {
                         "{COMPONENT} (error: {error}) - failed to create stream with id: {stream_id:?}, session: {session}"
                     )
                 })?;
+        let stream = shard.find_stream(session, &created_stream_id)
+            .with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - failed to find created stream with id: {created_stream_id:?}, session: {session}"
+                )
+            })?;
         let stream_id = stream.stream_id;
-        let response = mapper::map_stream(stream);
+        let response = mapper::map_stream(&stream);
 
-        let system = system.downgrade();
-        system
+        shard
             .state
         .apply(session.get_user_id(), &EntryCommand::CreateStream(CreateStreamWithId {
             stream_id,
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index f480c0ea..a0e52a43 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -19,13 +19,14 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::delete_stream::DeleteStream;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for DeleteStream {
@@ -38,22 +39,20 @@ impl ServerCommandHandler for DeleteStream {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
 
-        let mut system = system.write().await;
-        system
+        shard
                 .delete_stream(session, &self.stream_id)
                 .await
                 .with_error_context(|error| {
                     format!("{COMPONENT} (error: {error}) - failed to delete stream with ID: {stream_id}, session: {session}")
                 })?;
 
-        let system = system.downgrade();
-        system
+        shard
             .state
             .apply(session.get_user_id(), &EntryCommand::DeleteStream(self))
             .await
diff --git a/core/server/src/binary/handlers/streams/get_stream_handler.rs b/core/server/src/binary/handlers/streams/get_stream_handler.rs
index 12e015af..fef6458c 100644
--- a/core/server/src/binary/handlers/streams/get_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/get_stream_handler.rs
@@ -20,11 +20,12 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use iggy_common::IggyError;
 use iggy_common::get_stream::GetStream;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for GetStream {
@@ -36,12 +37,11 @@ impl ServerCommandHandler for GetStream {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        let Ok(stream) = system.try_find_stream(session, &self.stream_id) else {
+        let Ok(stream) = shard.try_find_stream(session, &self.stream_id) else {
             sender.send_empty_ok_response().await?;
             return Ok(());
         };
@@ -51,7 +51,7 @@ impl ServerCommandHandler for GetStream {
             return Ok(());
         };
 
-        let response = mapper::map_stream(stream);
+        let response = mapper::map_stream(&stream);
         sender.send_ok_response(&response).await?;
         Ok(())
     }
diff --git a/core/server/src/binary/handlers/streams/get_streams_handler.rs b/core/server/src/binary/handlers/streams/get_streams_handler.rs
index fa08bc13..0f83a9cb 100644
--- a/core/server/src/binary/handlers/streams/get_streams_handler.rs
+++ b/core/server/src/binary/handlers/streams/get_streams_handler.rs
@@ -21,12 +21,13 @@ use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::get_streams::GetStreams;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for GetStreams {
@@ -38,15 +39,14 @@ impl ServerCommandHandler for GetStreams {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        let streams = system.find_streams(session).with_error_context(|error| {
+        let streams = shard.find_streams(session).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to find streams for session: {session}")
         })?;
-        let response = mapper::map_streams(&streams);
+        let response = mapper::map_streams(streams);
         sender.send_ok_response(&response).await?;
         Ok(())
     }
diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
index dd37e155..3a387ce9 100644
--- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
@@ -19,13 +19,14 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::purge_stream::PurgeStream;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for PurgeStream {
@@ -38,21 +39,20 @@ impl ServerCommandHandler for PurgeStream {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
         let stream_id = self.stream_id.clone();
 
-        system
+        shard
             .purge_stream(session, &self.stream_id)
             .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to purge stream with id: {stream_id}, session: {session}")
             })?;
 
-        system
+        shard
             .state
             .apply(session.get_user_id(), &EntryCommand::PurgeStream(self))
             .await
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index cdb02c91..2aa4ddc9 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -19,13 +19,14 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::update_stream::UpdateStream;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for UpdateStream {
@@ -38,22 +39,20 @@ impl ServerCommandHandler for UpdateStream {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
 
-        let mut system = system.write().await;
-        system
+        shard
                 .update_stream(session, &self.stream_id, &self.name)
                 .await
                 .with_error_context(|error| {
                     format!("{COMPONENT} (error: {error}) - failed to update stream with id: {stream_id}, session: {session}")
                 })?;
 
-        let system = system.downgrade();
-        system
+        shard
             .state
             .apply(session.get_user_id(), &EntryCommand::UpdateStream(self))
             .await
diff --git a/core/server/src/binary/handlers/system/get_client_handler.rs b/core/server/src/binary/handlers/system/get_client_handler.rs
index 532eddf4..9ca18a38 100644
--- a/core/server/src/binary/handlers/system/get_client_handler.rs
+++ b/core/server/src/binary/handlers/system/get_client_handler.rs
@@ -20,11 +20,11 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use iggy_common::IggyError;
 use iggy_common::get_client::GetClient;
-use iggy_common::locking::IggySharedMutFn;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for GetClient {
@@ -36,13 +36,12 @@ impl ServerCommandHandler for GetClient {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let system = system.read().await;
-        let Ok(client) = system.get_client(session, self.client_id).await else {
+        let Ok(client) = shard.get_client(session, self.client_id) else {
             sender.send_empty_ok_response().await?;
             return Ok(());
         };
@@ -52,7 +51,6 @@ impl ServerCommandHandler for GetClient {
             return Ok(());
         };
 
-        let client = client.read().await;
         let bytes = mapper::map_client(&client);
 
         sender.send_ok_response(&bytes).await?;
diff --git a/core/server/src/binary/handlers/system/get_clients_handler.rs b/core/server/src/binary/handlers/system/get_clients_handler.rs
index bf05f7f1..c73ac987 100644
--- a/core/server/src/binary/handlers/system/get_clients_handler.rs
+++ b/core/server/src/binary/handlers/system/get_clients_handler.rs
@@ -21,11 +21,12 @@ use crate::binary::handlers::system::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::get_clients::GetClients;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for GetClients {
@@ -37,19 +38,17 @@ impl ServerCommandHandler for GetClients {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let system = system.read().await;
-        let clients = system
+        let clients = shard
             .get_clients(session)
-            .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get clients, session: {session}")
             })?;
-        let clients = mapper::map_clients(&clients).await;
+        let clients = mapper::map_clients(clients).await;
         sender.send_ok_response(&clients).await?;
         Ok(())
     }
diff --git a/core/server/src/binary/handlers/system/get_me_handler.rs b/core/server/src/binary/handlers/system/get_me_handler.rs
index b29972a9..44fdd651 100644
--- a/core/server/src/binary/handlers/system/get_me_handler.rs
+++ b/core/server/src/binary/handlers/system/get_me_handler.rs
@@ -21,12 +21,12 @@ use crate::binary::handlers::system::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::get_me::GetMe;
-use iggy_common::locking::IggySharedMutFn;
+use std::rc::Rc;
 
 impl ServerCommandHandler for GetMe {
     fn code(&self) -> u32 {
@@ -37,13 +37,11 @@ impl ServerCommandHandler for GetMe {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
-        let system = system.read().await;
-        let Some(client) = system
+        let Some(client) = shard
             .get_client(session, session.client_id)
-            .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get current client for session: {session}")
             })?
@@ -51,7 +49,6 @@ impl ServerCommandHandler for GetMe {
             return Err(IggyError::ClientNotFound(session.client_id));
         };
 
-        let client = client.read().await;
         let bytes = mapper::map_client(&client);
 
         sender.send_ok_response(&bytes).await?;
diff --git a/core/server/src/binary/handlers/system/get_snapshot.rs b/core/server/src/binary/handlers/system/get_snapshot.rs
index ff850388..27b2a3a7 100644
--- a/core/server/src/binary/handlers/system/get_snapshot.rs
+++ b/core/server/src/binary/handlers/system/get_snapshot.rs
@@ -19,11 +19,12 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use bytes::Bytes;
 use iggy_common::IggyError;
 use iggy_common::get_snapshot::GetSnapshot;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for GetSnapshot {
@@ -35,13 +36,12 @@ impl ServerCommandHandler for GetSnapshot {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let system = system.read().await;
-        let snapshot = system
+        let snapshot = shard
             .get_snapshot(session, self.compression, &self.snapshot_types)
             .await?;
         let bytes = Bytes::copy_from_slice(&snapshot.0);
diff --git a/core/server/src/binary/handlers/system/get_stats_handler.rs b/core/server/src/binary/handlers/system/get_stats_handler.rs
index 098ace33..17c87191 100644
--- a/core/server/src/binary/handlers/system/get_stats_handler.rs
+++ b/core/server/src/binary/handlers/system/get_stats_handler.rs
@@ -21,11 +21,12 @@ use crate::binary::handlers::system::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::get_stats::GetStats;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for GetStats {
@@ -37,13 +38,12 @@ impl ServerCommandHandler for GetStats {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let system = system.read().await;
-        let stats = system.get_stats().await.with_error_context(|error| {
+        let stats = shard.get_stats().await.with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get stats, session: {session}")
         })?;
         let bytes = mapper::map_stats(&stats);
diff --git a/core/server/src/binary/handlers/system/ping_handler.rs b/core/server/src/binary/handlers/system/ping_handler.rs
index 396719c5..a66d91db 100644
--- a/core/server/src/binary/handlers/system/ping_handler.rs
+++ b/core/server/src/binary/handlers/system/ping_handler.rs
@@ -18,13 +18,13 @@
 
 use crate::binary::command::{BinaryServerCommand, ServerCommandHandler};
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use iggy_common::IggyError;
 use iggy_common::IggyTimestamp;
-use iggy_common::locking::IggySharedMutFn;
 use iggy_common::ping::Ping;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for Ping {
@@ -36,14 +36,15 @@ impl ServerCommandHandler for Ping {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        let client_manager = system.client_manager.read().await;
-        if let Some(client) = client_manager.try_get_client(session.client_id) {
-            let mut client = client.write().await;
+        if let Some(client) = shard
+            .client_manager
+            .borrow_mut()
+            .try_get_client_mut(session.client_id)
+        {
             let now = IggyTimestamp::now();
             client.last_heartbeat = now;
             debug!("Updated last heartbeat to: {now} for session: {session}");
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 0f42616f..db09863e 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -20,14 +20,15 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateTopicWithId;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::create_topic::CreateTopic;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for CreateTopic {
@@ -40,14 +41,13 @@ impl ServerCommandHandler for CreateTopic {
         mut self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id;
-        let mut system = system.write().await;
-        let topic = system
+        let created_topic_id = shard
                 .create_topic(
                     session,
                     &self.stream_id,
@@ -62,18 +62,31 @@ impl ServerCommandHandler for CreateTopic {
                 .await
                 .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to create topic for stream_id: {stream_id}, topic_id: {topic_id:?}"
                 ))?;
+        let stream = shard
+            .find_stream(session, &self.stream_id)
+            .with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - failed to get stream for stream_id: {stream_id}"
+                )
+            })?;
+        let topic = shard.find_topic(session, &stream, &created_topic_id)
+            .with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - failed to get topic with ID: {created_topic_id} in stream with ID: {stream_id}"
+                )
+            })?;
         self.message_expiry = topic.message_expiry;
         self.max_topic_size = topic.max_topic_size;
         let topic_id = topic.topic_id;
         let response = mapper::map_topic(topic).await;
 
-        let system = system.downgrade();
-        system
+        shard
             .state
             .apply(session.get_user_id(), &EntryCommand::CreateTopic(CreateTopicWithId {
                 topic_id,
                 command: self
-            }))            .await
+            }))
+            .await
             .with_error_context(|error| {
                 format!(
                 "{COMPONENT} (error: {error}) - failed to apply create topic for stream_id: {stream_id}, topic_id: {topic_id:?}"
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index e83115f4..a8978511 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -19,13 +19,14 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::delete_topic::DeleteTopic;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for DeleteTopic {
@@ -38,23 +39,21 @@ impl ServerCommandHandler for DeleteTopic {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
 
-        let mut system = system.write().await;
-        system
+        shard
                 .delete_topic(session, &self.stream_id, &self.topic_id)
                 .await
                 .with_error_context(|error| format!(
                     "{COMPONENT} (error: {error}) - failed to delete topic with ID: {topic_id} in stream with ID: {stream_id}, session: {session}",
                 ))?;
 
-        let system = system.downgrade();
-        system
+        shard
             .state
             .apply(session.get_user_id(), &EntryCommand::DeleteTopic(self))
             .await
diff --git a/core/server/src/binary/handlers/topics/get_topic_handler.rs b/core/server/src/binary/handlers/topics/get_topic_handler.rs
index 8c652eb3..597c13bf 100644
--- a/core/server/src/binary/handlers/topics/get_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topic_handler.rs
@@ -20,11 +20,13 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
+use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::get_topic::GetTopic;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for GetTopic {
@@ -36,12 +38,19 @@ impl ServerCommandHandler for GetTopic {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        let Ok(topic) = system.try_find_topic(session, &self.stream_id, &self.topic_id) else {
+        let stream = shard
+            .find_stream(session, &self.stream_id)
+            .with_error_context(|error| {
+                format!(
+                    "Failed to find stream with ID: {}, session: {session} (error: {error})",
+                    self.stream_id
+                )
+            })?;
+        let Ok(topic) = shard.try_find_topic(session, &stream, &self.topic_id) else {
             sender.send_empty_ok_response().await?;
             return Ok(());
         };
diff --git a/core/server/src/binary/handlers/topics/get_topics_handler.rs b/core/server/src/binary/handlers/topics/get_topics_handler.rs
index 8d5c61b1..1606245e 100644
--- a/core/server/src/binary/handlers/topics/get_topics_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topics_handler.rs
@@ -21,12 +21,13 @@ use crate::binary::handlers::topics::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::get_topics::GetTopics;
+use std::rc::Rc;
 use tracing::debug;
 
 impl ServerCommandHandler for GetTopics {
@@ -38,20 +39,26 @@ impl ServerCommandHandler for GetTopics {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        let topics = system
-            .find_topics(session, &self.stream_id)
+        let stream = shard.find_stream(session, &self.stream_id)
+        .with_error_context(|error| {
+            format!(
+                "{COMPONENT} (error: {error}) - failed to get stream, stream_id: {}, session: {session}",
+                self.stream_id
+            )
+        })?;
+        let topics = shard
+            .find_topics(session, &stream)
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to find topics, stream_id: {}, session: {session}",
                     self.stream_id
                 )
             })?;
-        let response = mapper::map_topics(&topics);
+        let response = mapper::map_topics(topics);
         sender.send_ok_response(&response).await?;
         Ok(())
     }
diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
index d9c91470..ab7e392c 100644
--- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
@@ -19,13 +19,14 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::purge_topic::PurgeTopic;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for PurgeTopic {
@@ -38,12 +39,11 @@ impl ServerCommandHandler for PurgeTopic {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        system
+        shard
             .purge_topic(session, &self.stream_id, &self.topic_id)
             .await
             .with_error_context(|error| {
@@ -55,7 +55,7 @@ impl ServerCommandHandler for PurgeTopic {
 
         let topic_id = self.topic_id.clone();
         let stream_id = self.stream_id.clone();
-        system
+        shard
             .state
             .apply(session.get_user_id(), &EntryCommand::PurgeTopic(self))
             .await
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index f460cc24..9d75a255 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -19,13 +19,14 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::update_topic::UpdateTopic;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for UpdateTopic {
@@ -38,14 +39,12 @@ impl ServerCommandHandler for UpdateTopic {
         mut self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let mut system = system.write().await;
-
-        let topic = system
+        shard
                 .update_topic(
                     session,
                     &self.stream_id,
@@ -61,14 +60,25 @@ impl ServerCommandHandler for UpdateTopic {
                     "{COMPONENT} (error: {error}) - failed to update topic with id: {}, stream_id: {}, session: {session}",
                     self.topic_id, self.stream_id
                 ))?;
+
+        let stream = shard.find_stream(session, &self.stream_id)
+            .with_error_context(|error| format!(
+                "{COMPONENT} (error: {error}) - failed to find stream, stream_id: {}, session: {session}",
+                self.stream_id
+            ))?;
+        let topic = stream
+            .get_topic(&self.topic_id)
+            .with_error_context(|error| format!(
+                "{COMPONENT} (error: {error}) - failed to find topic, topic_id: {}, stream_id: {}, session: {session}",
+                self.topic_id, self.stream_id
+            ))?;
         self.message_expiry = topic.message_expiry;
         self.max_topic_size = topic.max_topic_size;
 
         let topic_id = self.topic_id.clone();
         let stream_id = self.stream_id.clone();
-        let system = system.downgrade();
 
-        system
+        shard
             .state
             .apply(session.get_user_id(), &EntryCommand::UpdateTopic(self))
             .await
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs b/core/server/src/binary/handlers/users/change_password_handler.rs
index 935e8f87..32b376cd 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -19,14 +19,15 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use crate::streaming::utils::crypto;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::change_password::ChangePassword;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for ChangePassword {
@@ -38,21 +39,19 @@ impl ServerCommandHandler for ChangePassword {
     async fn handle(
         self,
         sender: &mut SenderKind,
-        _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        length: u32,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let mut system = system.write().await;
-        system
+        shard
                 .change_password(
                     session,
                     &self.user_id,
                     &self.current_password,
                     &self.new_password,
                 )
-                .await
                 .with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - failed to change password for user_id: {}, session: {session}",
@@ -61,8 +60,7 @@ impl ServerCommandHandler for ChangePassword {
                 })?;
 
         // For the security of the system, we hash the password before storing it in metadata.
-        let system = system.downgrade();
-        system
+        shard
             .state
             .apply(
                 session.get_user_id(),
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs
index 17d17fdf..e51b147b 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -16,6 +16,9 @@
  * under the License.
  */
 
+use crate::shard::IggyShard;
+use std::rc::Rc;
+
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -23,7 +26,6 @@ use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateUserWithId;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use crate::streaming::utils::crypto;
 use anyhow::Result;
 use error_set::ErrContext;
@@ -41,13 +43,12 @@ impl ServerCommandHandler for CreateUser {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let mut system = system.write().await;
-        let user = system
+        let user = shard
                 .create_user(
                     session,
                     &self.username,
@@ -63,11 +64,10 @@ impl ServerCommandHandler for CreateUser {
                     )
                 })?;
         let user_id = user.id;
-        let response = mapper::map_user(user);
+        let response = mapper::map_user(&user);
 
         // For the security of the system, we hash the password before storing it in metadata.
-        let system = system.downgrade();
-        system
+        shard
             .state
         .apply(
             session.get_user_id(),
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs b/core/server/src/binary/handlers/users/delete_user_handler.rs
index b9fddfd7..10de56a4 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -16,12 +16,14 @@
  * under the License.
  */
 
+use std::rc::Rc;
+
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
@@ -37,16 +39,14 @@ impl ServerCommandHandler for DeleteUser {
     async fn handle(
         self,
         sender: &mut SenderKind,
-        _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        length: u32,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let mut system = system.write().await;
-        system
+        shard
                 .delete_user(session, &self.user_id)
-                .await
                 .with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - failed to delete user with ID: {}, session: {session}",
@@ -54,9 +54,8 @@ impl ServerCommandHandler for DeleteUser {
                     )
                 })?;
 
-        let system = system.downgrade();
         let user_id = self.user_id.clone();
-        system
+        shard
             .state
             .apply(session.get_user_id(), &EntryCommand::DeleteUser(self))
             .await
diff --git a/core/server/src/binary/handlers/users/get_user_handler.rs b/core/server/src/binary/handlers/users/get_user_handler.rs
index 5523c4cc..e276c9a4 100644
--- a/core/server/src/binary/handlers/users/get_user_handler.rs
+++ b/core/server/src/binary/handlers/users/get_user_handler.rs
@@ -16,12 +16,14 @@
  * under the License.
  */
 
+use std::rc::Rc;
+
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use iggy_common::IggyError;
 use iggy_common::get_user::GetUser;
 use tracing::debug;
@@ -35,12 +37,11 @@ impl ServerCommandHandler for GetUser {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        let Ok(user) = system.find_user(session, &self.user_id) else {
+        let Ok(user) = shard.find_user(session, &self.user_id) else {
             sender.send_empty_ok_response().await?;
             return Ok(());
         };
@@ -49,7 +50,7 @@ impl ServerCommandHandler for GetUser {
             return Ok(());
         };
 
-        let bytes = mapper::map_user(user);
+        let bytes = mapper::map_user(&user);
         sender.send_ok_response(&bytes).await?;
         Ok(())
     }
diff --git a/core/server/src/binary/handlers/users/get_users_handler.rs b/core/server/src/binary/handlers/users/get_users_handler.rs
index 7471c591..72a0ca1e 100644
--- a/core/server/src/binary/handlers/users/get_users_handler.rs
+++ b/core/server/src/binary/handlers/users/get_users_handler.rs
@@ -16,13 +16,15 @@
  * under the License.
  */
 
+use std::rc::Rc;
+
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::get_users::GetUsers;
@@ -37,18 +39,14 @@ impl ServerCommandHandler for GetUsers {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        let users = system
-            .get_users(session)
-            .await
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to get users, session: {session}")
-            })?;
-        let users = mapper::map_users(&users);
+        let users = shard.get_users(session).await.with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get users, session: {session}")
+        })?;
+        let users = mapper::map_users(users);
         sender.send_ok_response(&users).await?;
         Ok(())
     }
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs b/core/server/src/binary/handlers/users/login_user_handler.rs
index 7fa67be7..21cbba49 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -16,12 +16,14 @@
  * under the License.
  */
 
+use std::rc::Rc;
+
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
@@ -37,15 +39,13 @@ impl ServerCommandHandler for LoginUser {
     async fn handle(
         self,
         sender: &mut SenderKind,
-        _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        length: u32,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        let user = system
+        let user = shard
             .login_user(&self.username, &self.password, Some(session))
-            .await
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to login user with name: {}, session: {session}",
diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs b/core/server/src/binary/handlers/users/logout_user_handler.rs
index 77fa33f4..d1ef95bf 100644
--- a/core/server/src/binary/handlers/users/logout_user_handler.rs
+++ b/core/server/src/binary/handlers/users/logout_user_handler.rs
@@ -16,11 +16,13 @@
  * under the License.
  */
 
+use std::rc::Rc;
+
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
@@ -36,18 +38,14 @@ impl ServerCommandHandler for LogoutUser {
     async fn handle(
         self,
         sender: &mut SenderKind,
-        _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        length: u32,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let system = system.read().await;
-        system
-            .logout_user(session)
-            .await
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to logout user, session: {session}")
-            })?;
+        shard.logout_user(session).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to logout user, session: {session}")
+        })?;
         session.clear_user_id();
         sender.send_empty_ok_response().await?;
         Ok(())
diff --git a/core/server/src/binary/handlers/users/update_permissions_handler.rs b/core/server/src/binary/handlers/users/update_permissions_handler.rs
index 3606c308..dea49a1c 100644
--- a/core/server/src/binary/handlers/users/update_permissions_handler.rs
+++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs
@@ -16,12 +16,14 @@
  * under the License.
  */
 
+use std::rc::Rc;
+
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
@@ -38,21 +40,18 @@ impl ServerCommandHandler for UpdatePermissions {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let mut system = system.write().await;
-        system
+        shard
                 .update_permissions(session, &self.user_id, self.permissions.clone())
-                .await
                 .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to update permissions for user_id: {}, session: {session}",
                     self.user_id
                 ))?;
 
-        let system = system.downgrade();
-        system
+        shard
             .state
             .apply(
                 session.get_user_id(),
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs b/core/server/src/binary/handlers/users/update_user_handler.rs
index 873a3c6f..982c4ae1 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -16,12 +16,14 @@
  * under the License.
  */
 
+use std::rc::Rc;
+
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
@@ -38,20 +40,18 @@ impl ServerCommandHandler for UpdateUser {
         self,
         sender: &mut SenderKind,
         _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let mut system = system.write().await;
-        system
+        shard
                 .update_user(
                     session,
                     &self.user_id,
                     self.username.clone(),
                     self.status,
                 )
-                .await
                 .with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - failed to update user 
with user_id: {}, session: {session}",
@@ -59,10 +59,9 @@ impl ServerCommandHandler for UpdateUser {
                     )
                 })?;
 
-        let system = system.downgrade();
         let user_id = self.user_id.clone();
 
-        system
+        shard
             .state
             .apply(session.get_user_id(), &EntryCommand::UpdateUser(self))
             .await
diff --git a/core/server/src/binary/mapper.rs b/core/server/src/binary/mapper.rs
index 219ceed9..1ecce265 100644
--- a/core/server/src/binary/mapper.rs
+++ b/core/server/src/binary/mapper.rs
@@ -16,6 +16,8 @@
  * under the License.
  */
 
+use std::cell::Ref;
+
 use crate::streaming::clients::client_manager::{Client, Transport};
 use crate::streaming::partitions::partition::Partition;
 use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
@@ -94,9 +96,9 @@ pub fn map_client(client: &Client) -> Bytes {
     bytes.freeze()
 }
 
-pub async fn map_clients(clients: &[Client]) -> Bytes {
+pub async fn map_clients(clients: Vec<Client>) -> Bytes {
     let mut bytes = BytesMut::new();
-    for client in clients {
+    for client in clients.iter() {
         extend_client(client, &mut bytes);
     }
     bytes.freeze()
@@ -117,9 +119,9 @@ pub fn map_user(user: &User) -> Bytes {
     bytes.freeze()
 }
 
-pub fn map_users(users: &[&User]) -> Bytes {
+pub fn map_users(users: Vec<User>) -> Bytes {
     let mut bytes = BytesMut::new();
-    for user in users {
+    for user in users.iter() {
         extend_user(user, &mut bytes);
     }
     bytes.freeze()
@@ -138,9 +140,9 @@ pub fn map_raw_pat(token: &str) -> Bytes {
     bytes.freeze()
 }
 
-pub fn map_personal_access_tokens(personal_access_tokens: 
&[PersonalAccessToken]) -> Bytes {
+pub fn map_personal_access_tokens(personal_access_tokens: 
Vec<PersonalAccessToken>) -> Bytes {
     let mut bytes = BytesMut::new();
-    for personal_access_token in personal_access_tokens {
+    for personal_access_token in personal_access_tokens.iter() {
         extend_pat(personal_access_token, &mut bytes);
     }
     bytes.freeze()
@@ -155,15 +157,15 @@ pub fn map_stream(stream: &Stream) -> Bytes {
     bytes.freeze()
 }
 
-pub fn map_streams(streams: &[&Stream]) -> Bytes {
+pub fn map_streams(streams: Vec<Ref<'_, Stream>>) -> Bytes {
     let mut bytes = BytesMut::new();
-    for stream in streams {
+    for stream in streams.iter() {
         extend_stream(stream, &mut bytes);
     }
     bytes.freeze()
 }
 
-pub fn map_topics(topics: &[&Topic]) -> Bytes {
+pub fn map_topics(topics: Vec<&Topic>) -> Bytes {
     let mut bytes = BytesMut::new();
     for topic in topics {
         extend_topic(topic, &mut bytes);
@@ -196,7 +198,7 @@ pub fn map_consumer_group(consumer_group: &ConsumerGroup) 
-> Bytes {
     bytes.freeze()
 }
 
-pub fn map_consumer_groups(consumer_groups: &[ConsumerGroup]) -> Bytes {
+pub fn map_consumer_groups(consumer_groups: Vec<ConsumerGroup>) -> Bytes {
     let mut bytes = BytesMut::new();
     for consumer_group in consumer_groups {
         extend_consumer_group(&consumer_group, &mut bytes);
diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs
index 06e9c51f..ff405eb6 100644
--- a/core/server/src/binary/sender.rs
+++ b/core/server/src/binary/sender.rs
@@ -33,28 +33,33 @@ use quinn::{RecvStream, SendStream};
 macro_rules! forward_async_methods {
     (
         $(
-            async fn $method_name:ident(
+            async fn $method_name:ident
+            $(<$($generic:ident $(: $bound:path)?),+>)?
+            (
                 &mut self $(, $arg:ident : $arg_ty:ty )*
             ) -> $ret:ty ;
         )*
     ) => {
         $(
-            pub async fn $method_name(&mut self, $( $arg: $arg_ty ),* ) -> 
$ret {
+            pub async fn $method_name
+            $(<$($generic $(: $bound)?),+>)?
+            (&mut self, $( $arg: $arg_ty ),* ) -> $ret {
                 match self {
-                    Self::Tcp(d) => d.$method_name($( $arg ),*).await,
-                    Self::TcpTls(s) => s.$method_name($( $arg ),*).await,
-                    Self::Quic(s) => s.$method_name($( $arg ),*).await,
+                    Self::Tcp(d) => d.$method_name$(::<$($generic),+>)?($( 
$arg ),*).await,
+                    Self::TcpTls(s) => s.$method_name$(::<$($generic),+>)?($( 
$arg ),*).await,
+                    Self::Quic(s) => s.$method_name$(::<$($generic),+>)?($( 
$arg ),*).await,
                 }
             }
         )*
     }
 }
 
+
 pub trait Sender {
-    fn read(
+    fn read<B: IoBufMut>(
         &mut self,
-        buffer: BytesMut,
-    ) -> impl Future<Output = (Result<usize, IggyError>, BytesMut)>;
+        buffer: B
+    ) -> impl Future<Output = (Result<usize, IggyError>, B)>;
     fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), 
IggyError>>;
     fn send_ok_response(&mut self, payload: &[u8]) -> impl Future<Output = 
Result<(), IggyError>>;
     fn send_ok_response_vectored(
@@ -92,7 +97,7 @@ impl SenderKind {
     }
 
     forward_async_methods! {
-        async fn read(&mut self, buffer: BytesMut) -> (Result<usize, 
IggyError>, BytesMut);
+        async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, 
IggyError>, B);
         async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>;
         async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), 
IggyError>;
         async fn send_ok_response_vectored(&mut self, length: &[u8], slices: 
Vec<libc::iovec>) -> Result<(), IggyError>;
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 3dcf64b2..4a5dbe53 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -18,6 +18,8 @@
 
 #[cfg(not(feature = "disable-mimalloc"))]
 use mimalloc::MiMalloc;
+use nix::libc::iovec;
+use nix::libc::c_void;
 
 #[cfg(not(feature = "disable-mimalloc"))]
 #[global_allocator]
@@ -53,3 +55,10 @@ pub fn map_toggle_str<'a>(enabled: bool) -> &'a str {
         false => "disabled",
     }
 }
+
+/// Builds an `iovec` describing the memory occupied by `data`.
+///
+/// `iov_base` is the slice's start address and `iov_len` is its total size in
+/// bytes. The returned value holds a raw pointer only: it does not borrow
+/// `data`, so the caller must keep the slice alive and unmoved for as long as
+/// the `iovec` is in use. The pointer is cast to `*mut` because that is the
+/// type of the `iov_base` field; `data` itself is only borrowed immutably here.
+pub fn to_iovec<T>(data: &[T]) -> iovec {
+    iovec {
+        iov_base: data.as_ptr() as *mut c_void,
+        // `size_of_val` yields the byte length of the whole slice directly,
+        // replacing the manual `len * size_of::<T>()` computation.
+        iov_len: std::mem::size_of_val(data),
+    }
+}
\ No newline at end of file
diff --git a/core/server/src/quic/quic_sender.rs 
b/core/server/src/quic/quic_sender.rs
index 17b60818..c6403349 100644
--- a/core/server/src/quic/quic_sender.rs
+++ b/core/server/src/quic/quic_sender.rs
@@ -36,7 +36,8 @@ pub struct QuicSender {
 }
 
 impl Sender for QuicSender {
-    async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, 
BytesMut) {
+    async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, 
IggyError>, B) {
+        //TODO: Fixme
         // Not-so-nice code because quinn recv stream has different API for 
read_exact
         /*
         let read_bytes = buffer.len();
diff --git a/core/server/src/shard/system/clients.rs 
b/core/server/src/shard/system/clients.rs
index ee9e1634..4668b337 100644
--- a/core/server/src/shard/system/clients.rs
+++ b/core/server/src/shard/system/clients.rs
@@ -72,7 +72,7 @@ impl IggyShard {
         }
     }
 
-    pub async fn get_client(
+    pub fn get_client(
         &self,
         session: &Session,
         client_id: u32,
@@ -91,7 +91,7 @@ impl IggyShard {
         Ok(self.client_manager.borrow().try_get_client(client_id))
     }
 
-    pub async fn get_clients(&self, session: &Session) -> Result<Vec<Client>, 
IggyError> {
+    pub fn get_clients(&self, session: &Session) -> Result<Vec<Client>, 
IggyError> {
         self.ensure_authenticated(session)?;
         self.permissioner
             .borrow()
diff --git a/core/server/src/shard/system/consumer_groups.rs 
b/core/server/src/shard/system/consumer_groups.rs
index 768d4e5c..7b98205f 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -26,7 +26,6 @@ use crate::streaming::topics::consumer_group::ConsumerGroup;
 use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
-use iggy_common::locking::IggySharedMutFn;
 
 impl IggyShard {
     pub fn get_consumer_group<'cg, 'stream>(
@@ -93,7 +92,7 @@ impl IggyShard {
         topic_id: &Identifier,
         group_id: Option<u32>,
         name: &str,
-    ) -> Result<u32, IggyError> {
+    ) -> Result<Identifier, IggyError> {
         self.ensure_authenticated(session)?;
         {
             let stream = self.get_stream(stream_id).with_error_context(|error| 
{
@@ -120,7 +119,7 @@ impl IggyShard {
             .create_consumer_group(group_id, name)
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to create 
consumer group with name: {name}")
-            }).map(|cg| cg.group_id)
+            })
     }
 
     pub async fn delete_consumer_group(
@@ -272,12 +271,7 @@ impl IggyShard {
                 .find_topic(session, &stream, topic_id)
                 .with_error_context(|error| {
                     format!(
-<<<<<<< HEAD
                         "{COMPONENT} (error: {error}) - topic not found for 
stream ID: {stream_id:?}, topic_id: {topic_id:?}"
-=======
-                        "{COMPONENT} (error: {error}) - topic not found for 
stream ID: {:?}, topic_id: {:?}",
-                        stream.stream_id, topic_id
->>>>>>> 48107890 (fix iggy shard errors)
                     )
                 })?;
 
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 06499bd3..7668566a 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -86,11 +86,7 @@ impl IggyShard {
             topic
                 .store_consumer_offset_internal(polling_consumer, offset, 
partition_id)
                 .await
-<<<<<<< HEAD
-                .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to store consumer offset internal, polling consumer: 
{polling_consumer}, offset: {offset}, partition ID: {partition_id}")) ?;
-=======
                 .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to store consumer offset internal, polling consumer: {}, 
offset: {}, partition ID: {}", polling_consumer, offset, partition_id))?;
->>>>>>> 48107890 (fix iggy shard errors)
         }
 
         let batch_set = if let Some(encryptor) = &self.encryptor {
diff --git a/core/server/src/shard/system/personal_access_tokens.rs 
b/core/server/src/shard/system/personal_access_tokens.rs
index 082fcd93..8cc89cdc 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -28,7 +28,7 @@ use iggy_common::IggyTimestamp;
 use tracing::{error, info};
 
 impl IggyShard {
-    pub async fn get_personal_access_tokens(
+    pub fn get_personal_access_tokens(
         &self,
         session: &Session,
     ) -> Result<Vec<PersonalAccessToken>, IggyError> {
@@ -53,7 +53,7 @@ impl IggyShard {
         Ok(personal_access_tokens)
     }
 
-    pub async fn create_personal_access_token(
+    pub fn create_personal_access_token(
         &self,
         session: &Session,
         name: &str,
@@ -103,7 +103,7 @@ impl IggyShard {
         Ok(token)
     }
 
-    pub async fn delete_personal_access_token(
+    pub fn delete_personal_access_token(
         &self,
         session: &Session,
         name: &str,
@@ -135,7 +135,7 @@ impl IggyShard {
         Ok(())
     }
 
-    pub async fn login_with_personal_access_token(
+    pub fn login_with_personal_access_token(
         &self,
         token: &str,
         session: Option<&Session>,
diff --git a/core/server/src/shard/system/snapshot/mod.rs 
b/core/server/src/shard/system/snapshot/mod.rs
index 78c5d393..b4de2992 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -54,6 +54,11 @@ impl IggyShard {
             snapshot_types
         };
 
+        // TODO: Replace this with
+        // https://github.com/bearcove/rc-zip
+        // and impl the monoio async writer, based on this example:
+        // https://youtu.be/RYHYiXMJdZI?si=d2roKeHn5lJrw2ri&t=1140
+        // and rc-zip-tokio crate.
         let cursor = Cursor::new(Vec::new());
         let mut zip_writer = ZipFileWriter::new(cursor.compat_write());
 
diff --git a/core/server/src/shard/system/streams.rs 
b/core/server/src/shard/system/streams.rs
index 43b45246..3cb0da5b 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -57,7 +57,7 @@ impl IggyShard {
         Ok(self.get_streams())
     }
 
-    pub fn find_stream<'a>(
+    pub fn find_stream(
         &self,
         session: &Session,
         identifier: &Identifier,
@@ -182,7 +182,7 @@ impl IggyShard {
         session: &Session,
         stream_id: Option<u32>,
         name: &str,
-    ) -> Result<u32, IggyError> {
+    ) -> Result<Identifier, IggyError> {
         self.ensure_authenticated(session)?;
         self.permissioner
             .borrow()
@@ -220,7 +220,7 @@ impl IggyShard {
             .insert(name.to_owned(), stream.stream_id);
         self.streams.borrow_mut().insert(stream.stream_id, stream);
         self.metrics.increment_streams(1);
-        Ok(id)
+        Ok(Identifier::numeric(id)?)
     }
 
     pub async fn update_stream(
diff --git a/core/server/src/shard/system/topics.rs 
b/core/server/src/shard/system/topics.rs
index d118090f..477308bc 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -112,7 +112,7 @@ impl IggyShard {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
-    ) -> Result<u32, IggyError> {
+    ) -> Result<Identifier, IggyError> {
         self.ensure_authenticated(session)?;
         {
             let stream = self.get_stream(stream_id).with_error_context(|error| 
{
@@ -151,7 +151,7 @@ impl IggyShard {
         self.metrics.increment_partitions(partitions_count);
         self.metrics.increment_segments(partitions_count);
 
-        Ok(created_topic_id)
+        Ok(Identifier::numeric(created_topic_id)?)
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -165,7 +165,7 @@ impl IggyShard {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
-    ) -> Result<u32, IggyError> {
+    ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
         {
             let stream = self.get_stream(stream_id).with_error_context(|error| 
{
@@ -213,14 +213,7 @@ impl IggyShard {
         // TODO: if message_expiry is changed, we need to check if we need to 
purge messages based on the new expiry
         // TODO: if max_size_bytes is changed, we need to check if we need to 
purge messages based on the new size
         // TODO: if replication_factor is changed, we need to do `something`
-        self.get_stream(stream_id)
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to get stream 
with ID: {stream_id}")
-            })?
-            .get_topic(topic_id)
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to get topic 
with ID: {topic_id} in stream with ID: {stream_id}")
-            }).map(|topic| topic.topic_id)
+        Ok(())
     }
 
     pub async fn delete_topic(
diff --git a/core/server/src/streaming/clients/client_manager.rs 
b/core/server/src/streaming/clients/client_manager.rs
index fc5c00cc..922f4f11 100644
--- a/core/server/src/streaming/clients/client_manager.rs
+++ b/core/server/src/streaming/clients/client_manager.rs
@@ -84,7 +84,7 @@ impl ClientManager {
             return Err(IggyError::ClientNotFound(client_id));
         }
 
-        let mut client = client.unwrap();
+        let client = client.unwrap();
         client.user_id = Some(user_id);
         Ok(())
     }
@@ -95,7 +95,7 @@ impl ClientManager {
             return Err(IggyError::ClientNotFound(client_id));
         }
 
-        let mut client = client.unwrap();
+        let client = client.unwrap();
         client.user_id = None;
         Ok(())
     }
@@ -104,6 +104,10 @@ impl ClientManager {
         self.clients.get(&client_id).cloned()
     }
 
+    pub fn try_get_client_mut(&mut self, client_id: u32) -> Option<&mut 
Client> {
+        self.clients.get_mut(&client_id) // mutable lookup in `self.clients`; `None` when `client_id` is absent
+    }
+
     pub fn get_clients(&self) -> Vec<Client> {
         self.clients.values().cloned().collect()
     }
diff --git a/core/server/src/streaming/topics/consumer_groups.rs 
b/core/server/src/streaming/topics/consumer_groups.rs
index 92168738..a2efba95 100644
--- a/core/server/src/streaming/topics/consumer_groups.rs
+++ b/core/server/src/streaming/topics/consumer_groups.rs
@@ -172,7 +172,7 @@ impl Topic {
         &mut self,
         group_id: Option<u32>,
         name: &str,
-    ) -> Result<ConsumerGroup, IggyError> {
+    ) -> Result<Identifier, IggyError> {
         if self.consumer_groups_ids.contains_key(name) {
             return Err(IggyError::ConsumerGroupNameAlreadyExists(
                 name.to_owned(),
@@ -208,13 +208,12 @@ impl Topic {
         let consumer_group =
             ConsumerGroup::new(self.topic_id, id, name, self.partitions.len() 
as u32);
         self.consumer_groups_ids.insert(name.to_owned(), id);
-        let cloned_group = consumer_group.clone();
         self.consumer_groups.borrow_mut().insert(id, consumer_group);
         info!(
             "Created consumer group with ID: {} for topic with ID: {} and 
stream with ID: {}.",
             id, self.topic_id, self.stream_id
         );
-        Ok(cloned_group)
+        Ok(Identifier::numeric(id)?)
     }
 
     pub async fn delete_consumer_group(
diff --git a/core/server/src/streaming/utils/pooled_buffer.rs 
b/core/server/src/streaming/utils/pooled_buffer.rs
index 9c4375d2..0aa1dd3a 100644
--- a/core/server/src/streaming/utils/pooled_buffer.rs
+++ b/core/server/src/streaming/utils/pooled_buffer.rs
@@ -18,6 +18,7 @@
 
 use super::memory_pool::{BytesMutExt, memory_pool};
 use bytes::{Buf, BufMut, BytesMut};
+use monoio::buf::IoBufMut;
 use std::ops::{Deref, DerefMut};
 
 #[derive(Debug)]
@@ -215,3 +216,17 @@ impl Buf for PooledBuffer {
         self.inner.chunks_vectored(dst)
     }
 }
+
+unsafe impl IoBufMut for PooledBuffer { // SAFETY(review): soundness rests on `inner`'s own `IoBufMut` impl (presumably `BytesMut`) — confirm
+    fn write_ptr(&mut self) -> *mut u8 { // forwards to the inner buffer
+        self.inner.write_ptr()
+    }
+
+    fn bytes_total(&mut self) -> usize { // forwards to the inner buffer
+        self.inner.bytes_total()
+    }
+
+    unsafe fn set_init(&mut self, pos: usize) { // `pos` is passed through to the inner buffer unchanged
+        unsafe { self.inner.set_init(pos) } // SAFETY: this method is itself `unsafe`; the caller's guarantees are handed to `inner` as-is
+    }
+}
diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs
index 6a10adb2..22796812 100644
--- a/core/server/src/tcp/sender.rs
+++ b/core/server/src/tcp/sender.rs
@@ -28,12 +28,13 @@ use tracing::debug;
 
 const STATUS_OK: &[u8] = &[0; 4];
 
-pub(crate) async fn read<T>(
+pub(crate) async fn read<T, B>(
     stream: &mut T,
-    buffer: BytesMut,
-) -> (Result<usize, IggyError>, BytesMut)
+    buffer: B,
+) -> (Result<usize, IggyError>, B)
 where
     T: AsyncReadRent + AsyncWriteRent + Unpin,
+    B: IoBufMut,
 {
     match stream.read_exact(buffer).await {
         (Ok(0), buffer) => (Err(IggyError::ConnectionClosed), buffer),
diff --git a/core/server/src/tcp/tcp_sender.rs 
b/core/server/src/tcp/tcp_sender.rs
index 81089c85..cfa25aad 100644
--- a/core/server/src/tcp/tcp_sender.rs
+++ b/core/server/src/tcp/tcp_sender.rs
@@ -33,7 +33,7 @@ pub struct TcpSender {
 }
 
 impl Sender for TcpSender {
-    async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, 
BytesMut) {
+    async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, 
IggyError>, B) {
         sender::read(&mut self.stream, buffer).await
     }
 
diff --git a/core/server/src/tcp/tcp_tls_sender.rs 
b/core/server/src/tcp/tcp_tls_sender.rs
index 7213ad64..167427c1 100644
--- a/core/server/src/tcp/tcp_tls_sender.rs
+++ b/core/server/src/tcp/tcp_tls_sender.rs
@@ -22,6 +22,7 @@ use crate::{server_error::ServerError, tcp::sender};
 use bytes::BytesMut;
 use error_set::ErrContext;
 use iggy_common::IggyError;
+use monoio::buf::IoBufMut;
 use monoio::io::AsyncWriteRent;
 use monoio::net::TcpStream;
 use monoio_native_tls::TlsStream;
@@ -33,7 +34,7 @@ pub struct TcpTlsSender {
 }
 
 impl Sender for TcpTlsSender {
-    async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, 
BytesMut) {
+    async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, 
IggyError>, B) {
         sender::read(&mut self.stream, buffer).await
     }
 

Reply via email to