This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch rebase_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 2ace7e8e53baf35ffaa1349bf56e07f7d0f0d9ff
Author: numinex <[email protected]>
AuthorDate: Thu Jun 26 16:34:52 2025 +0200

    fix iggy shard errors
---
 core/common/src/locking/mod.rs                     |   4 +-
 core/common/src/locking/tokio_lock.rs              |   3 +-
 core/server/src/binary/command.rs                  |   1 -
 .../create_consumer_group_handler.rs               |  13 +-
 core/server/src/binary/handlers/utils.rs           |  15 +-
 core/server/src/binary/mapper.rs                   |  14 +-
 core/server/src/http/consumer_groups.rs            |   6 +-
 core/server/src/http/jwt/cleaner.rs                |   8 +-
 core/server/src/http/jwt/jwt_manager.rs            |   6 +-
 core/server/src/http/mapper.rs                     |  14 +-
 core/server/src/http/messages.rs                   |   2 +-
 core/server/src/http/mod.rs                        |   2 +
 core/server/src/http/shared.rs                     |   5 +-
 core/server/src/http/system.rs                     |   2 +-
 core/server/src/shard/mod.rs                       |   2 +-
 core/server/src/shard/system/clients.rs            |  35 ++---
 core/server/src/shard/system/consumer_groups.rs    | 139 +++++++++++-------
 core/server/src/shard/system/consumer_offsets.rs   |  24 +++-
 core/server/src/shard/system/messages.rs           |  34 +++--
 core/server/src/shard/system/partitions.rs         |  48 +++++--
 .../src/shard/system/personal_access_tokens.rs     |  14 +-
 core/server/src/shard/system/segments.rs           |  40 +++++-
 core/server/src/shard/system/snapshot/mod.rs       |  62 ++++++---
 core/server/src/shard/system/stats.rs              |   8 +-
 core/server/src/shard/system/storage.rs            |   1 +
 core/server/src/shard/system/streams.rs            | 155 +++++++++++++--------
 core/server/src/shard/system/topics.rs             | 122 ++++++++--------
 core/server/src/shard/system/users.rs              | 154 ++++++++++++--------
 .../src/streaming/partitions/consumer_offsets.rs   |   5 +-
 core/server/src/streaming/storage.rs               |  29 ++--
 core/server/src/streaming/topics/consumer_group.rs |   5 +-
 .../server/src/streaming/topics/consumer_groups.rs |  22 +--
 core/server/src/streaming/topics/topic.rs          |   4 +-
 core/server/src/streaming/users/user.rs            |   2 +-
 34 files changed, 595 insertions(+), 405 deletions(-)

diff --git a/core/common/src/locking/mod.rs b/core/common/src/locking/mod.rs
index c2e542c7..429b4f59 100644
--- a/core/common/src/locking/mod.rs
+++ b/core/common/src/locking/mod.rs
@@ -37,11 +37,11 @@ pub type IggyRwLock<T> = fast_async_lock::IggyFastAsyncRwLock<T>;
 
 #[allow(async_fn_in_trait)]
 pub trait IggySharedMutFn<T> {
-    type ReadGuard<'a>: Deref<Target = T> 
+    type ReadGuard<'a>: Deref<Target = T>
     where
         T: 'a,
         Self: 'a;
-    type WriteGuard<'a>: DerefMut<Target = T> 
+    type WriteGuard<'a>: DerefMut<Target = T>
     where
         T: 'a,
         Self: 'a;
diff --git a/core/common/src/locking/tokio_lock.rs b/core/common/src/locking/tokio_lock.rs
index ef5baaad..d6a96f06 100644
--- a/core/common/src/locking/tokio_lock.rs
+++ b/core/common/src/locking/tokio_lock.rs
@@ -24,8 +24,7 @@ use tokio::sync::{RwLock as TokioRwLock, RwLockReadGuard, RwLockWriteGuard};
 #[derive(Debug)]
 pub struct IggyTokioRwLock<T>(Arc<TokioRwLock<T>>);
 
-impl<T> IggySharedMutFn<T> for IggyTokioRwLock<T>
-{
+impl<T> IggySharedMutFn<T> for IggyTokioRwLock<T> {
     type ReadGuard<'a>
         = RwLockReadGuard<'a, T>
     where
diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs
index aa935e09..1ebac395 100644
--- a/core/server/src/binary/command.rs
+++ b/core/server/src/binary/command.rs
@@ -22,7 +22,6 @@ use crate::binary::sender::SenderKind;
 use crate::define_server_command_enum;
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use bytes::{BufMut, Bytes, BytesMut};
 use enum_dispatch::enum_dispatch;
 use iggy_common::change_password::ChangePassword;
diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index 3d87fa17..5d695687 100644
--- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -16,14 +16,16 @@
  * under the License.
  */
 
+use std::rc::Rc;
+
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateConsumerGroupWithId;
 use crate::streaming::session::Session;
-use crate::streaming::systems::system::SharedSystem;
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
@@ -39,14 +41,13 @@ impl ServerCommandHandler for CreateConsumerGroup {
     async fn handle(
         self,
         sender: &mut SenderKind,
-        _length: u32,
-        session: &Session,
-        system: &SharedSystem,
+        length: u32,
+        session: &Rc<Session>,
+        shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let mut system = system.write().await;
-        let consumer_group = system
+        let consumer_group = shard
                 .create_consumer_group(
                     session,
                     &self.stream_id,
diff --git a/core/server/src/binary/handlers/utils.rs b/core/server/src/binary/handlers/utils.rs
index 351f6297..80bcede4 100644
--- a/core/server/src/binary/handlers/utils.rs
+++ b/core/server/src/binary/handlers/utils.rs
@@ -26,12 +26,17 @@ pub async fn receive_and_validate(
     length: u32,
 ) -> Result<ServerCommand, IggyError> {
     let mut buffer = BytesMut::with_capacity(length as usize);
-    if length > 0 {
-        unsafe {
-            buffer.set_len(length as usize);
-        }
-        sender.read(&mut buffer).await?;
+    unsafe {
+        buffer.set_len(length as usize);
     }
+    let buffer = if length == 0 {
+        buffer
+    } else {
+        let (result, buffer) = sender.read(buffer).await;
+        result?;
+        buffer
+    };
+
     let command = ServerCommand::from_code_and_payload(code, buffer.freeze())?;
     command.validate()?;
     Ok(command)
diff --git a/core/server/src/binary/mapper.rs b/core/server/src/binary/mapper.rs
index 61c65235..219ceed9 100644
--- a/core/server/src/binary/mapper.rs
+++ b/core/server/src/binary/mapper.rs
@@ -24,9 +24,8 @@ use crate::streaming::topics::consumer_group::ConsumerGroup;
 use crate::streaming::topics::topic::Topic;
 use crate::streaming::users::user::User;
 use bytes::{BufMut, Bytes, BytesMut};
-use iggy_common::locking::{IggySharedMut, IggySharedMutFn};
+use iggy_common::locking::IggySharedMutFn;
 use iggy_common::{BytesSerializable, ConsumerOffsetInfo, Sizeable, Stats, 
UserId};
-use tokio::sync::RwLock;
 
 pub fn map_stats(stats: &Stats) -> Bytes {
     let mut bytes = BytesMut::with_capacity(104);
@@ -95,11 +94,10 @@ pub fn map_client(client: &Client) -> Bytes {
     bytes.freeze()
 }
 
-pub async fn map_clients(clients: &[IggySharedMut<Client>]) -> Bytes {
+pub async fn map_clients(clients: &[Client]) -> Bytes {
     let mut bytes = BytesMut::new();
     for client in clients {
-        let client = client.read().await;
-        extend_client(&client, &mut bytes);
+        extend_client(client, &mut bytes);
     }
     bytes.freeze()
 }
@@ -183,12 +181,11 @@ pub async fn map_topic(topic: &Topic) -> Bytes {
     bytes.freeze()
 }
 
-pub async fn map_consumer_group(consumer_group: &ConsumerGroup) -> Bytes {
+pub fn map_consumer_group(consumer_group: &ConsumerGroup) -> Bytes {
     let mut bytes = BytesMut::new();
     extend_consumer_group(consumer_group, &mut bytes);
     let members = consumer_group.get_members();
     for member in members {
-        let member = member.read().await;
         bytes.put_u32_le(member.id);
         let partitions = member.get_partitions();
         bytes.put_u32_le(partitions.len() as u32);
@@ -199,10 +196,9 @@ pub async fn map_consumer_group(consumer_group: &ConsumerGroup) -> Bytes {
     bytes.freeze()
 }
 
-pub async fn map_consumer_groups(consumer_groups: &[&RwLock<ConsumerGroup>]) 
-> Bytes {
+pub fn map_consumer_groups(consumer_groups: &[ConsumerGroup]) -> Bytes {
     let mut bytes = BytesMut::new();
     for consumer_group in consumer_groups {
-        let consumer_group = consumer_group.read().await;
         extend_consumer_group(&consumer_group, &mut bytes);
     }
     bytes.freeze()
diff --git a/core/server/src/http/consumer_groups.rs b/core/server/src/http/consumer_groups.rs
index dd48c8c2..d58faef3 100644
--- a/core/server/src/http/consumer_groups.rs
+++ b/core/server/src/http/consumer_groups.rs
@@ -72,7 +72,7 @@ async fn get_consumer_group(
     };
 
     let consumer_group = consumer_group.read().await;
-    let consumer_group = mapper::map_consumer_group(&consumer_group).await;
+    let consumer_group = mapper::map_consumer_group(&consumer_group);
     Ok(Json(consumer_group))
 }
 
@@ -89,7 +89,7 @@ async fn get_consumer_groups(
         &stream_id,
         &topic_id,
     )?;
-    let consumer_groups = mapper::map_consumer_groups(&consumer_groups).await;
+    let consumer_groups = mapper::map_consumer_groups(&consumer_groups);
     Ok(Json(consumer_groups))
 }
 
@@ -116,7 +116,7 @@ async fn create_consumer_group(
             .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to create consumer group, stream ID: {}, topic ID: {}, group ID: 
{:?}", stream_id, topic_id, command.group_id))?;
     let consumer_group = consumer_group.read().await;
     let group_id = consumer_group.group_id;
-    let consumer_group_details = 
mapper::map_consumer_group(&consumer_group).await;
+    let consumer_group_details = mapper::map_consumer_group(&consumer_group);
     drop(consumer_group);
 
     let system = system.downgrade();
diff --git a/core/server/src/http/jwt/cleaner.rs b/core/server/src/http/jwt/cleaner.rs
index 40003eca..6d4329e7 100644
--- a/core/server/src/http/jwt/cleaner.rs
+++ b/core/server/src/http/jwt/cleaner.rs
@@ -19,12 +19,12 @@
 use crate::http::shared::AppState;
 use iggy_common::IggyTimestamp;
 use std::sync::Arc;
-use std::time::Duration;
+use monoio::time::Duration;
 use tracing::{error, trace};
 
 pub fn start_expired_tokens_cleaner(app_state: Arc<AppState>) {
-    tokio::spawn(async move {
-        let mut interval_timer = 
tokio::time::interval(Duration::from_secs(300));
+    monoio::spawn(async move {
+        let mut interval_timer = 
monoio::time::interval(Duration::from_secs(300));
         loop {
             interval_timer.tick().await;
             trace!("Deleting expired tokens...");
@@ -34,7 +34,7 @@ pub fn start_expired_tokens_cleaner(app_state: Arc<AppState>) {
                 .delete_expired_revoked_tokens(now)
                 .await
                 .unwrap_or_else(|err| {
-                    error!("Failed to delete expired revoked access tokens. 
Error: {err}",);
+                    error!("Failed to delete expired revoked access tokens. 
Error: {err}");
                 });
         }
     });
diff --git a/core/server/src/http/jwt/jwt_manager.rs b/core/server/src/http/jwt/jwt_manager.rs
index 56e2ce9b..dfe0f66b 100644
--- a/core/server/src/http/jwt/jwt_manager.rs
+++ b/core/server/src/http/jwt/jwt_manager.rs
@@ -28,7 +28,7 @@ use iggy_common::IggyError;
 use iggy_common::IggyExpiry;
 use iggy_common::IggyTimestamp;
 use iggy_common::UserId;
-use iggy_common::locking::IggySharedMut;
+use iggy_common::locking::IggyRwLock;
 use iggy_common::locking::IggySharedMutFn;
 use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, TokenData, 
Validation, encode};
 use std::sync::Arc;
@@ -54,7 +54,7 @@ pub struct JwtManager {
     issuer: IssuerOptions,
     validator: ValidatorOptions,
     tokens_storage: TokenStorage,
-    revoked_tokens: IggySharedMut<AHashMap<String, u64>>,
+    revoked_tokens: IggyRwLock<AHashMap<String, u64>>,
     validations: AHashMap<Algorithm, Validation>,
 }
 
@@ -77,7 +77,7 @@ impl JwtManager {
             issuer,
             validator,
             tokens_storage: TokenStorage::new(persister, path),
-            revoked_tokens: IggySharedMut::new(AHashMap::new()),
+            revoked_tokens: IggyRwLock::new(AHashMap::new()),
         })
     }
 
diff --git a/core/server/src/http/mapper.rs b/core/server/src/http/mapper.rs
index ed9ebba3..d839b9d9 100644
--- a/core/server/src/http/mapper.rs
+++ b/core/server/src/http/mapper.rs
@@ -28,12 +28,11 @@ use iggy_common::PersonalAccessTokenInfo;
 use iggy_common::Sizeable;
 use iggy_common::StreamDetails;
 use iggy_common::TopicDetails;
-use iggy_common::locking::IggySharedMut;
 use iggy_common::locking::IggySharedMutFn;
 use iggy_common::{ConsumerGroupDetails, ConsumerGroupMember};
 use iggy_common::{IdentityInfo, TokenInfo};
 use iggy_common::{UserInfo, UserInfoDetails};
-use tokio::sync::RwLock;
+
 
 pub fn map_stream(stream: &Stream) -> StreamDetails {
     let topics = map_topics(&stream.get_topics());
@@ -177,10 +176,9 @@ pub fn map_client(client: &Client) -> iggy_common::ClientInfoDetails {
     }
 }
 
-pub async fn map_clients(clients: &[IggySharedMut<Client>]) -> 
Vec<iggy_common::ClientInfo> {
+pub fn map_clients(clients: &[Client]) -> Vec<iggy_common::ClientInfo> {
     let mut all_clients = Vec::new();
     for client in clients {
-        let client = client.read().await;
         let client = iggy_common::ClientInfo {
             client_id: client.session.client_id,
             user_id: client.user_id,
@@ -195,12 +193,11 @@ pub async fn map_clients(clients: &[IggySharedMut<Client>]) -> Vec<iggy_common::
     all_clients
 }
 
-pub async fn map_consumer_groups(
-    consumer_groups: &[&RwLock<ConsumerGroup>],
+pub fn map_consumer_groups(
+    consumer_groups: &[ConsumerGroup],
 ) -> Vec<iggy_common::ConsumerGroup> {
     let mut groups = Vec::new();
     for consumer_group in consumer_groups {
-        let consumer_group = consumer_group.read().await;
         let consumer_group = iggy_common::ConsumerGroup {
             id: consumer_group.group_id,
             name: consumer_group.name.clone(),
@@ -213,7 +210,7 @@ pub async fn map_consumer_groups(
     groups
 }
 
-pub async fn map_consumer_group(consumer_group: &ConsumerGroup) -> 
ConsumerGroupDetails {
+pub fn map_consumer_group(consumer_group: &ConsumerGroup) -> 
ConsumerGroupDetails {
     let mut consumer_group_details = ConsumerGroupDetails {
         id: consumer_group.group_id,
         name: consumer_group.name.clone(),
@@ -223,7 +220,6 @@ pub async fn map_consumer_group(consumer_group: &ConsumerGroup) -> ConsumerGroup
     };
     let members = consumer_group.get_members();
     for member in members {
-        let member = member.read().await;
         let partitions = member.get_partitions();
         consumer_group_details.members.push(ConsumerGroupMember {
             id: member.id,
diff --git a/core/server/src/http/messages.rs b/core/server/src/http/messages.rs
index 251c503b..e289adb9 100644
--- a/core/server/src/http/messages.rs
+++ b/core/server/src/http/messages.rs
@@ -20,9 +20,9 @@ use crate::http::COMPONENT;
 use crate::http::error::CustomError;
 use crate::http::jwt::json_web_token::Identity;
 use crate::http::shared::AppState;
+use crate::shard::system::messages::PollingArgs;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
-use crate::streaming::systems::messages::PollingArgs;
 use crate::streaming::utils::PooledBuffer;
 use axum::extract::{Path, Query, State};
 use axum::http::StatusCode;
diff --git a/core/server/src/http/mod.rs b/core/server/src/http/mod.rs
index 194ed74a..7b24bfff 100644
--- a/core/server/src/http/mod.rs
+++ b/core/server/src/http/mod.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+/*
 pub mod consumer_groups;
 pub mod consumer_offsets;
 pub mod diagnostics;
@@ -32,5 +33,6 @@ pub mod streams;
 pub mod system;
 pub mod topics;
 pub mod users;
+*/
 
 pub const COMPONENT: &str = "HTTP";
diff --git a/core/server/src/http/shared.rs b/core/server/src/http/shared.rs
index 50703357..a21ddb1c 100644
--- a/core/server/src/http/shared.rs
+++ b/core/server/src/http/shared.rs
@@ -16,14 +16,13 @@
  * under the License.
  */
 
-use crate::http::jwt::jwt_manager::JwtManager;
-use crate::streaming::systems::system::SharedSystem;
+use crate::{http::jwt::jwt_manager::JwtManager, shard::IggyShard};
 use std::net::SocketAddr;
 use ulid::Ulid;
 
 pub struct AppState {
     pub jwt_manager: JwtManager,
-    pub system: SharedSystem,
+    pub shard: IggyShard,
 }
 
 #[derive(Debug, Copy, Clone)]
diff --git a/core/server/src/http/system.rs b/core/server/src/http/system.rs
index 1c7166e3..4428d36c 100644
--- a/core/server/src/http/system.rs
+++ b/core/server/src/http/system.rs
@@ -114,7 +114,7 @@ async fn get_clients(
                 identity.user_id
             )
         })?;
-    let clients = mapper::map_clients(&clients).await;
+    let clients = mapper::map_clients(&clients);
     Ok(Json(clients))
 }
 
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 12ba64bb..b5e48525 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -461,6 +461,6 @@ impl IggyShard {
                     Err(IggyError::Unauthenticated)
                 }
             })?;
-            Ok(user_id)
+        Ok(user_id)
     }
 }
diff --git a/core/server/src/shard/system/clients.rs b/core/server/src/shard/system/clients.rs
index a2debc42..ee9e1634 100644
--- a/core/server/src/shard/system/clients.rs
+++ b/core/server/src/shard/system/clients.rs
@@ -16,17 +16,16 @@
  * under the License.
  */
 
+use super::COMPONENT;
 use crate::shard::IggyShard;
 use crate::streaming::clients::client_manager::{Client, Transport};
 use crate::streaming::session::Session;
 use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
-use iggy_common::locking::IggyRwLock;
 use iggy_common::locking::IggySharedMutFn;
 use std::net::SocketAddr;
 use std::rc::Rc;
-use std::sync::Arc;
 use tracing::{error, info};
 
 impl IggyShard {
@@ -43,7 +42,7 @@ impl IggyShard {
 
         {
             let mut client_manager = self.client_manager.borrow_mut();
-            let client = client_manager.delete_client(client_id).await;
+            let client = client_manager.delete_client(client_id);
             if client.is_none() {
                 error!("Client with ID: {client_id} was not found in the 
client manager.",);
                 return;
@@ -51,7 +50,6 @@ impl IggyShard {
 
             self.metrics.decrement_clients(1);
             let client = client.unwrap();
-            let client = client.read().await;
             consumer_groups = client
                 .consumer_groups
                 .iter()
@@ -65,14 +63,12 @@ impl IggyShard {
         }
 
         for (stream_id, topic_id, consumer_group_id) in 
consumer_groups.into_iter() {
-            _ = self
-                .leave_consumer_group_by_client(
-                    &Identifier::numeric(stream_id).unwrap(),
-                    &Identifier::numeric(topic_id).unwrap(),
-                    &Identifier::numeric(consumer_group_id).unwrap(),
-                    client_id,
-                )
-                .await
+            _ = self.leave_consumer_group_by_client(
+                &Identifier::numeric(stream_id).unwrap(),
+                &Identifier::numeric(topic_id).unwrap(),
+                &Identifier::numeric(consumer_group_id).unwrap(),
+                client_id,
+            )
         }
     }
 
@@ -80,9 +76,10 @@ impl IggyShard {
         &self,
         session: &Session,
         client_id: u32,
-    ) -> Result<Option<IggySharedMut<Client>>, IggyError> {
+    ) -> Result<Option<Client>, IggyError> {
         self.ensure_authenticated(session)?;
         self.permissioner
+        .borrow()
             .get_client(session.get_user_id())
             .with_error_context(|error| {
                 format!(
@@ -91,16 +88,13 @@ impl IggyShard {
                 )
             })?;
 
-        let client_manager = self.client_manager.read().await;
-        Ok(client_manager.try_get_client(client_id))
+        Ok(self.client_manager.borrow().try_get_client(client_id))
     }
 
-    pub async fn get_clients(
-        &self,
-        session: &Session,
-    ) -> Result<Vec<IggySharedMut<Client>>, IggyError> {
+    pub async fn get_clients(&self, session: &Session) -> Result<Vec<Client>, 
IggyError> {
         self.ensure_authenticated(session)?;
         self.permissioner
+            .borrow()
             .get_clients(session.get_user_id())
             .with_error_context(|error| {
                 format!(
@@ -109,7 +103,6 @@ impl IggyShard {
                 )
             })?;
 
-        let client_manager = self.client_manager.read().await;
-        Ok(client_manager.get_clients())
+        Ok(self.client_manager.borrow().get_clients())
     }
 }
diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs
index d8d8818e..768d4e5c 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -16,29 +16,39 @@
  * under the License.
  */
 
+use std::cell::Ref;
+
+use super::COMPONENT;
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
+use crate::streaming::streams::stream::Stream;
 use crate::streaming::topics::consumer_group::ConsumerGroup;
 use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
 use iggy_common::locking::IggySharedMutFn;
-use tokio::sync::RwLock;
 
 impl IggyShard {
-    pub fn get_consumer_group(
+    pub fn get_consumer_group<'cg, 'stream>(
         &self,
         session: &Session,
-        stream_id: &Identifier,
+        stream: &'stream Stream,
         topic_id: &Identifier,
         group_id: &Identifier,
-    ) -> Result<Option<&RwLock<ConsumerGroup>>, IggyError> {
+    ) -> Result<Option<Ref<'cg, ConsumerGroup>>, IggyError>
+    where
+        'stream: 'cg,
+    {
         self.ensure_authenticated(session)?;
-        let Some(topic) = self.try_find_topic(session, stream_id, topic_id)? 
else {
-            return Ok(None);
-        };
+        let stream_id = stream.stream_id;
+        let topic = stream.get_topic(topic_id).with_error_context(|error| {
+            format!(
+                "{COMPONENT} (error: {error}) - topic with ID: {topic_id} was 
not found in stream with ID: {stream_id}",
+            )
+        })?;
 
         self.permissioner
+        .borrow()
             .get_consumer_group(session.get_user_id(), topic.stream_id, 
topic.topic_id)
             .with_error_context(|error| {
                 format!(
@@ -55,12 +65,16 @@ impl IggyShard {
         session: &Session,
         stream_id: &Identifier,
         topic_id: &Identifier,
-    ) -> Result<Vec<&RwLock<ConsumerGroup>>, IggyError> {
+    ) -> Result<Vec<ConsumerGroup>, IggyError> {
         self.ensure_authenticated(session)?;
-        let topic = self.find_topic(session, stream_id, topic_id)
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - stream with ID: 
{stream_id} was not found")
+        })?;
+        let topic = self.find_topic(session, &stream, topic_id)
             .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- topic with ID: {topic_id} was not found in stream with ID: {stream_id}"))?;
 
         self.permissioner
+        .borrow()
             .get_consumer_groups(session.get_user_id(), topic.stream_id, 
topic.topic_id)
             .with_error_context(|error| {
                 format!(
@@ -72,40 +86,45 @@ impl IggyShard {
         Ok(topic.get_consumer_groups())
     }
 
-    pub async fn create_consumer_group(
-        &mut self,
+    pub fn create_consumer_group(
+        &self,
         session: &Session,
         stream_id: &Identifier,
         topic_id: &Identifier,
         group_id: Option<u32>,
         name: &str,
-    ) -> Result<&RwLock<ConsumerGroup>, IggyError> {
+    ) -> Result<u32, IggyError> {
         self.ensure_authenticated(session)?;
         {
-            let topic = self.find_topic(session, stream_id, topic_id)
+            let stream = self.get_stream(stream_id).with_error_context(|error| 
{
+                format!(
+                    "{COMPONENT} (error: {error}) - stream not found for 
stream ID: {stream_id}"
+                )
+            })?;
+            let topic = self.find_topic(session, &stream, topic_id)
                 .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?;
 
-            self.permissioner.create_consumer_group(
+            self.permissioner.borrow().create_consumer_group(
                 session.get_user_id(),
                 topic.stream_id,
                 topic.topic_id,
             ).with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- permission denied to create consumer group for user {} on stream ID: {}, 
topic ID: {}", session.get_user_id(), topic.stream_id, topic.topic_id))?;
         }
 
-        let topic = self.get_stream_mut(stream_id)?
-            .get_topic_mut(topic_id)
+        let mut stream = self.get_stream_mut(stream_id)
+            .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to get mutable reference to stream with ID: {stream_id}"))?;
+        let topic = stream.get_topic_mut(topic_id)
             .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?;
 
         topic
             .create_consumer_group(group_id, name)
-            .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to create 
consumer group with name: {name}")
-            })
+            }).map(|cg| cg.group_id)
     }
 
     pub async fn delete_consumer_group(
-        &mut self,
+        &self,
         session: &Session,
         stream_id: &Identifier,
         topic_id: &Identifier,
@@ -115,10 +134,15 @@ impl IggyShard {
         let stream_id_value;
         let topic_id_value;
         {
-            let topic = self.find_topic(session, stream_id, topic_id)
+            let stream = self.get_stream(stream_id).with_error_context(|error| 
{
+                format!(
+                    "{COMPONENT} (error: {error}) - stream not found for 
stream ID: {stream_id}"
+                )
+            })?;
+            let topic = self.find_topic(session, &stream, topic_id)
                 .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?;
 
-            self.permissioner.delete_consumer_group(
+            self.permissioner.borrow().delete_consumer_group(
                 session.get_user_id(),
                 topic.stream_id,
                 topic.topic_id,
@@ -130,7 +154,7 @@ impl IggyShard {
 
         let consumer_group;
         {
-            let stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
+            let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with id: {stream_id}"
                 )
@@ -142,25 +166,20 @@ impl IggyShard {
                 .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to delete consumer group with ID: {consumer_group_id}"))?
         }
 
-        let client_manager = self.client_manager.read().await;
-        let consumer_group = consumer_group.read().await;
         for member in consumer_group.get_members() {
-            let member = member.read().await;
-            client_manager
-                .leave_consumer_group(
+            self.client_manager.borrow_mut().leave_consumer_group(
                     member.id,
                     stream_id_value,
                     topic_id_value,
                     consumer_group.group_id,
                 )
-                .await
                 .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to make client leave consumer group for client ID: {}, group 
ID: {}", member.id, consumer_group.group_id))?;
         }
 
         Ok(())
     }
 
-    pub async fn join_consumer_group(
+    pub fn join_consumer_group(
         &self,
         session: &Session,
         stream_id: &Identifier,
@@ -171,15 +190,20 @@ impl IggyShard {
         let stream_id_value;
         let topic_id_value;
         {
+            let stream = self.get_stream(stream_id).with_error_context(|error| 
{
+                format!(
+                    "{COMPONENT} (error: {error}) - stream not found for 
stream ID: {stream_id}"
+                )
+            })?;
             let topic = self
-                .find_topic(session, stream_id, topic_id)
+                .find_topic(session, &stream, topic_id)
                 .with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - topic not found for 
stream ID: {stream_id}, topic_id: {topic_id}",
                     )
                 })?;
 
-            self.permissioner.join_consumer_group(
+            self.permissioner.borrow().join_consumer_group(
                 session.get_user_id(),
                 topic.stream_id,
                 topic.topic_id,
@@ -191,7 +215,14 @@ impl IggyShard {
 
         let group_id;
         {
-            let topic = self.find_topic(session, stream_id, topic_id)?;
+            let mut stream = self.get_stream_mut(stream_id)
+                .with_error_context(|error| {
+                    format!("{COMPONENT} (error: {error}) - failed to get 
mutable reference to stream with ID: {stream_id}")
+                })?;
+            let topic = stream.get_topic_mut(topic_id)
+                .with_error_context(|error| {
+                    format!("{COMPONENT} (error: {error}) - failed to get 
mutable reference to topic with ID: {topic_id}")
+                })?;
 
             {
                 let consumer_group = topic
@@ -202,13 +233,11 @@ impl IggyShard {
                         )
                     })?;
 
-                let consumer_group = consumer_group.read().await;
                 group_id = consumer_group.group_id;
             }
 
             topic
                 .join_consumer_group(consumer_group_id, session.client_id)
-                .await
                 .with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - failed to join 
consumer group for group ID: {group_id}"
@@ -216,10 +245,7 @@ impl IggyShard {
                 })?;
         }
 
-        let client_manager = self.client_manager.read().await;
-        client_manager
-            .join_consumer_group(session.client_id, stream_id_value, 
topic_id_value, group_id)
-            .await
+        
self.client_manager.borrow_mut().join_consumer_group(session.client_id, 
stream_id_value, topic_id_value, group_id)
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to make client join 
consumer group for client ID: {}",
@@ -239,15 +265,23 @@ impl IggyShard {
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
         {
+            let stream = self.get_stream(stream_id).with_error_context(|error| 
{
+                format!("{COMPONENT} (error: {error}) - failed to get stream 
with ID: {stream_id}")
+            })?;
             let topic = self
-                .find_topic(session, stream_id, topic_id)
+                .find_topic(session, &stream, topic_id)
                 .with_error_context(|error| {
                     format!(
+<<<<<<< HEAD
                         "{COMPONENT} (error: {error}) - topic not found for 
stream ID: {stream_id:?}, topic_id: {topic_id:?}"
+=======
+                        "{COMPONENT} (error: {error}) - topic not found for 
stream ID: {:?}, topic_id: {:?}",
+                        stream.stream_id, topic_id
+>>>>>>> 48107890 (fix iggy shard errors)
                     )
                 })?;
 
-            self.permissioner.leave_consumer_group(
+            self.permissioner.borrow().leave_consumer_group(
                 session.get_user_id(),
                 topic.stream_id,
                 topic.topic_id,
@@ -260,7 +294,6 @@ impl IggyShard {
             consumer_group_id,
             session.client_id,
         )
-        .await
         .with_error_context(|error| {
             format!(
                 "{COMPONENT} (error: {error}) - failed to leave consumer group 
for client ID: {}",
@@ -269,7 +302,7 @@ impl IggyShard {
         })
     }
 
-    pub async fn leave_consumer_group_by_client(
+    pub fn leave_consumer_group_by_client(
         &self,
         stream_id: &Identifier,
         topic_id: &Identifier,
@@ -281,15 +314,17 @@ impl IggyShard {
         let group_id;
 
         {
-            let stream = self.get_stream(stream_id).with_error_context(|error| 
{
+            let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get stream 
with ID: {stream_id}")
             })?;
-            let topic = stream.get_topic(topic_id)
+            let stream_id = stream.stream_id;
+            let topic = stream.get_topic_mut(topic_id)
                 .with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - topic not found for 
stream ID: {stream_id}, topic_id: {topic_id}",
                     )
                 })?;
+            let topic_id = topic.topic_id;
             {
                 let consumer_group = topic
                     .get_consumer_group(consumer_group_id)
@@ -298,23 +333,23 @@ impl IggyShard {
                         "{COMPONENT} (error: {error}) - consumer group not 
found for group_id: {consumer_group_id}",
                     )
                     })?;
-                let consumer_group = consumer_group.read().await;
                 group_id = consumer_group.group_id;
             }
 
-            stream_id_value = stream.stream_id;
-            topic_id_value = topic.topic_id;
+            stream_id_value = stream_id;
+            topic_id_value = topic_id;
             topic
                 .leave_consumer_group(consumer_group_id, client_id)
-                .await
                 .with_error_context(|error| {
                     format!("{COMPONENT} (error: {error}) - failed leave 
consumer group, client ID {client_id}",)
                 })?;
         }
 
-        let client_manager = self.client_manager.read().await;
-        client_manager
-            .leave_consumer_group(client_id, stream_id_value, topic_id_value, 
group_id)
-            .await
+        self.client_manager.borrow_mut().leave_consumer_group(
+            client_id,
+            stream_id_value,
+            topic_id_value,
+            group_id,
+        )
     }
 }
diff --git a/core/server/src/shard/system/consumer_offsets.rs 
b/core/server/src/shard/system/consumer_offsets.rs
index 0c7438a6..e5504173 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -16,7 +16,8 @@
  * under the License.
  */
 
-use crate::streaming::session::Session;
+use super::COMPONENT;
+use crate::{shard::IggyShard, streaming::session::Session};
 use error_set::ErrContext;
 use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError};
 
@@ -31,9 +32,12 @@ impl IggyShard {
         offset: u64,
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
-        let topic = self.find_topic(session, stream_id, topic_id)
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - stream with ID: 
{stream_id} was not found")
+        })?;
+        let topic = self.find_topic(session, &stream, topic_id)
             .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- topic with ID: {topic_id} was not found in stream with ID: {stream_id}"))?;
-        self.permissioner.store_consumer_offset(
+        self.permissioner.borrow().store_consumer_offset(
             session.get_user_id(),
             topic.stream_id,
             topic.topic_id,
@@ -53,11 +57,14 @@ impl IggyShard {
         partition_id: Option<u32>,
     ) -> Result<Option<ConsumerOffsetInfo>, IggyError> {
         self.ensure_authenticated(session)?;
-        let Some(topic) = self.try_find_topic(session, stream_id, topic_id)? 
else {
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - stream with ID: 
{stream_id} was not found")
+        })?;
+        let Some(topic) = self.try_find_topic(session, &stream, topic_id)? 
else {
             return Ok(None);
         };
 
-        self.permissioner.get_consumer_offset(
+        self.permissioner.borrow().get_consumer_offset(
             session.get_user_id(),
             topic.stream_id,
             topic.topic_id,
@@ -82,9 +89,12 @@ impl IggyShard {
         partition_id: Option<u32>,
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
-        let topic = self.find_topic(session, stream_id, topic_id)
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - stream with ID: 
{stream_id} was not found")
+        })?;
+        let topic = self.find_topic(session, &stream, topic_id)
             .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- topic with ID: {topic_id} was not found in stream with ID: {stream_id}"))?;
-        self.permissioner.delete_consumer_offset(
+        self.permissioner.borrow().delete_consumer_offset(
             session.get_user_id(),
             topic.stream_id,
             topic.topic_id,
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 54046dba..06499bd3 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -16,11 +16,13 @@
  * under the License.
  */
 
+use super::COMPONENT;
 use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
 use crate::shard::IggyShard;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, 
IggyMessagesBatchSet};
 use crate::streaming::session::Session;
 use crate::streaming::utils::PooledBuffer;
+use async_zip::tokio::read::stream;
 use error_set::ErrContext;
 use iggy_common::{
     BytesSerializable, Confirmation, Consumer, EncryptorKind, 
IGGY_MESSAGE_HEADER_SIZE, Identifier,
@@ -43,8 +45,13 @@ impl IggyShard {
             return Err(IggyError::InvalidMessagesCount);
         }
 
-        let topic = self.find_topic(session, stream_id, 
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?;
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - stream not found for 
stream ID: {stream_id}")
+        })?;
+        let stream_id = stream.stream_id;
+        let topic = self.find_topic(session, &stream, 
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?;
         self.permissioner
+            .borrow()
             .poll_messages(session.get_user_id(), topic.stream_id, 
topic.topic_id)
             .with_error_context(|error| format!(
                 "{COMPONENT} (error: {error}) - permission denied to poll 
messages for user {} on stream ID: {}, topic ID: {}",
@@ -60,7 +67,6 @@ impl IggyShard {
         // There might be no partition assigned, if it's the consumer group 
member without any partitions.
         let Some((polling_consumer, partition_id)) = topic
             .resolve_consumer_with_partition_id(consumer, session.client_id, 
partition_id, true)
-            .await
             .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to resolve consumer with partition id, consumer: {consumer}, client 
ID: {}, partition ID: {:?}", session.client_id, partition_id))? else {
             return Ok((IggyPollMetadata::new(0, 0), 
IggyMessagesBatchSet::empty()));
         };
@@ -80,11 +86,15 @@ impl IggyShard {
             topic
                 .store_consumer_offset_internal(polling_consumer, offset, 
partition_id)
                 .await
+<<<<<<< HEAD
                 .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to store consumer offset internal, polling consumer: 
{polling_consumer}, offset: {offset}, partition ID: {partition_id}")) ?;
+=======
+                .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to store consumer offset internal, polling consumer: {}, 
offset: {}, partition ID: {}", polling_consumer, offset, partition_id))?;
+>>>>>>> 48107890 (fix iggy shard errors)
         }
 
         let batch_set = if let Some(encryptor) = &self.encryptor {
-            self.decrypt_messages(batch_set, encryptor.as_ref()).await?
+            self.decrypt_messages(batch_set, encryptor).await?
         } else {
             batch_set
         };
@@ -102,8 +112,12 @@ impl IggyShard {
         confirmation: Option<Confirmation>,
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
-        let topic = self.find_topic(session, stream_id, 
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?;
-        self.permissioner.append_messages(
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - stream not found for 
stream_id: {stream_id}")
+        })?;
+        let stream_id = stream.stream_id;
+        let topic = self.find_topic(session, &stream, 
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?;
+        self.permissioner.borrow().append_messages(
             session.get_user_id(),
             topic.stream_id,
             topic.topic_id
@@ -117,7 +131,7 @@ impl IggyShard {
 
         // Encrypt messages if encryptor is configured
         let messages = if let Some(encryptor) = &self.encryptor {
-            self.encrypt_messages(messages, encryptor.as_ref())?
+            self.encrypt_messages(messages, encryptor)?
         } else {
             messages
         };
@@ -139,8 +153,12 @@ impl IggyShard {
         fsync: bool,
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
-        let topic = self.find_topic(session, &stream_id, 
&topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?;
-        self.permissioner.append_messages(
+        let stream = self.get_stream(&stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - stream not found for 
stream ID: {stream_id}")
+        })?;
+        let stream_id = stream.stream_id;
+        let topic = self.find_topic(session, &stream, 
&topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream ID: {stream_id}, topic ID: {topic_id}"))?;
+        self.permissioner.borrow().append_messages(
             session.get_user_id(),
             topic.stream_id,
             topic.topic_id
diff --git a/core/server/src/shard/system/partitions.rs 
b/core/server/src/shard/system/partitions.rs
index 7712f498..01e02968 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -16,16 +16,16 @@
  * under the License.
  */
 
+use super::COMPONENT;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::COMPONENT;
-use crate::streaming::systems::system::System;
 use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
 
 impl IggyShard {
     pub async fn create_partitions(
-        &mut self,
+        &self,
         session: &Session,
         stream_id: &Identifier,
         topic_id: &Identifier,
@@ -33,8 +33,13 @@ impl IggyShard {
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
         {
-            let topic = self.find_topic(session, stream_id, 
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?;
-            self.permissioner.create_partitions(
+            let stream = self.get_stream(stream_id).with_error_context(|error| 
{
+                format!(
+                    "{COMPONENT} (error: {error}) - stream not found for 
stream ID: {stream_id}"
+                )
+            })?;
+            let topic = self.find_topic(session, &stream, 
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream ID: {stream_id}, topic ID: {topic_id}"))?;
+            self.permissioner.borrow().create_partitions(
                 session.get_user_id(),
                 topic.stream_id,
                 topic.topic_id,
@@ -46,28 +51,33 @@ impl IggyShard {
             ))?;
         }
 
-        let topic = self
-            .get_stream_mut(stream_id)?
+        let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
+        })?;
+        let topic = stream
             .get_topic_mut(topic_id)
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with id: {stream_id}"
                 )
             })?;
+
+        // TODO: Make `add_persisted_partitions` on the topic synchronous and extract the storage persister
+        // out of it, so that disk I/O is performed outside of the `borrow_mut` of the stream.
         topic
             .add_persisted_partitions(partitions_count)
             .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to add 
persisted partitions, topic: {topic}")
             })?;
-        topic.reassign_consumer_groups().await;
+        topic.reassign_consumer_groups();
         self.metrics.increment_partitions(partitions_count);
         self.metrics.increment_segments(partitions_count);
         Ok(())
     }
 
     pub async fn delete_partitions(
-        &mut self,
+        &self,
         session: &Session,
         stream_id: &Identifier,
         topic_id: &Identifier,
@@ -75,8 +85,13 @@ impl IggyShard {
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
         {
-            let topic = self.find_topic(session, stream_id, 
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?;
-            self.permissioner.delete_partitions(
+            let stream = self.get_stream(stream_id).with_error_context(|error| 
{
+                format!(
+                    "{COMPONENT} (error: {error}) - stream not found for 
stream ID: {stream_id}"
+                )
+            })?;
+            let topic = self.find_topic(session, &stream, 
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?;
+            self.permissioner.borrow().delete_partitions(
                 session.get_user_id(),
                 topic.stream_id,
                 topic.topic_id,
@@ -88,21 +103,26 @@ impl IggyShard {
             ))?;
         }
 
-        let topic = self
-            .get_stream_mut(stream_id)?
+        let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
+        })?;
+        let topic = stream
             .get_topic_mut(topic_id)
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with id: {stream_id}"
                 )
             })?;
+
+        // TODO: Make `delete_persisted_partitions` on the topic synchronous and extract the storage persister
+        // out of it, so that disk I/O is performed outside of the `borrow_mut` of the stream.
         let partitions = topic
             .delete_persisted_partitions(partitions_count)
             .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to delete 
persisted partitions for topic: {topic}")
             })?;
-        topic.reassign_consumer_groups().await;
+        topic.reassign_consumer_groups();
         if let Some(partitions) = partitions {
             self.metrics.decrement_partitions(partitions_count);
             self.metrics.decrement_segments(partitions.segments_count);
diff --git a/core/server/src/shard/system/personal_access_tokens.rs 
b/core/server/src/shard/system/personal_access_tokens.rs
index b5452e9a..082fcd93 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -16,6 +16,8 @@
  * under the License.
  */
 
+use super::COMPONENT;
+use crate::shard::IggyShard;
 use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use crate::streaming::session::Session;
 use crate::streaming::users::user::User;
@@ -64,7 +66,7 @@ impl IggyShard {
             let user = self.get_user(&identifier).with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get user 
with id: {user_id}")
             })?;
-            let max_token_per_user = 
self.personal_access_token.max_tokens_per_user;
+            let max_token_per_user = 
self.config.personal_access_token.max_tokens_per_user;
             if user.personal_access_tokens.len() as u32 >= max_token_per_user {
                 error!(
                     "User with ID: {user_id} has reached the maximum number of 
personal access tokens: {max_token_per_user}.",
@@ -76,7 +78,7 @@ impl IggyShard {
             }
         }
 
-        let user = self.get_user(&identifier).with_error_context(|error| {
+        let user = self.get_user_mut(&identifier).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get mutable 
reference to the user with id: {user_id}")
         })?;
 
@@ -102,7 +104,7 @@ impl IggyShard {
     }
 
     pub async fn delete_personal_access_token(
-        &mut self,
+        &self,
         session: &Session,
         name: &str,
     ) -> Result<(), IggyError> {
@@ -137,10 +139,11 @@ impl IggyShard {
         &self,
         token: &str,
         session: Option<&Session>,
-    ) -> Result<&User, IggyError> {
+    ) -> Result<User, IggyError> {
         let token_hash = PersonalAccessToken::hash_token(token);
+        let users = self.users.borrow();
         let mut personal_access_token = None;
-        for user in self.users.values() {
+        for user in users.values() {
             if let Some(pat) = user.personal_access_tokens.get(&token_hash) {
                 personal_access_token = Some(pat);
                 break;
@@ -173,6 +176,5 @@ impl IggyShard {
                 )
             })?;
         self.login_user_with_credentials(&user.username, None, session)
-            .await
     }
 }
diff --git a/core/server/src/shard/system/segments.rs 
b/core/server/src/shard/system/segments.rs
index 27abb76b..6a8ad68e 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -16,6 +16,7 @@ use crate::shard::IggyShard;
  * specific language governing permissions and limitations
  * under the License.
  */
+use super::COMPONENT;
 use crate::streaming::session::Session;
 use error_set::ErrContext;
 use iggy_common::Identifier;
@@ -24,7 +25,7 @@ use iggy_common::locking::IggySharedMutFn;
 
 impl IggyShard {
     pub async fn delete_segments(
-        &mut self,
+        &self,
         session: &Session,
         stream_id: &Identifier,
         topic_id: &Identifier,
@@ -35,9 +36,14 @@ impl IggyShard {
         self.ensure_authenticated(session)?;
 
         {
-            let topic = self.find_topic(session, stream_id, 
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?;
+            let stream = self.get_stream(stream_id).with_error_context(|error| 
{
+                format!(
+                    "{COMPONENT} (error: {error}) - stream not found for 
stream_id: {stream_id}"
+                )
+            })?;
+            let topic = self.find_topic(session, &stream, 
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?;
 
-            self.permissioner.delete_segments(
+            self.permissioner.borrow().delete_segments(
                 session.get_user_id(),
                 topic.stream_id,
                 topic.topic_id,
@@ -49,9 +55,11 @@ impl IggyShard {
             ))?;
         }
 
-        let topic = self
-            .get_stream_mut(stream_id)?
-            .get_topic_mut(topic_id)
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
+        })?;
+        let topic = stream
+            .get_topic(topic_id)
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with ID: {stream_id}"
@@ -95,7 +103,25 @@ impl IggyShard {
 
             (segments_count, messages_count)
         };
-        topic.reassign_consumer_groups().await;
+        drop(partition);
+        drop(topic);
+        drop(stream);
+
+        let mut stream = self
+            .get_stream_mut(stream_id)
+            .with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with ID: {stream_id}"
+                )
+            })?;
+        let topic = stream
+            .get_topic_mut(topic_id)
+            .with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - failed to get mutable 
reference to topic with ID: {topic_id} in stream with ID: {stream_id}"
+                )
+            })?;
+        topic.reassign_consumer_groups();
 
         self.metrics.decrement_segments(deleted_segments_count);
         self.metrics.decrement_messages(deleted_messages_count);
diff --git a/core/server/src/shard/system/snapshot/mod.rs 
b/core/server/src/shard/system/snapshot/mod.rs
index f6e07785..78c5d393 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -19,22 +19,23 @@
 mod procdump;
 
 use crate::configs::system::SystemConfig;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
 use async_zip::tokio::write::ZipFileWriter;
 use async_zip::{Compression, ZipEntryBuilder};
 use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, 
SystemSnapshotType};
+use monoio::fs::{File, OpenOptions};
 use std::io::Cursor;
 use std::path::PathBuf;
-use std::sync::Arc;
+use std::rc::Rc;
 use std::time::Instant;
 use tempfile::NamedTempFile;
-use tokio::fs::File;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::process::Command;
 use tokio_util::compat::TokioAsyncWriteCompatExt;
 use tracing::{error, info};
 
-impl System {
+impl IggyShard {
     pub async fn get_snapshot(
         &self,
         session: &Session,
@@ -69,18 +70,19 @@ impl System {
         let now = Instant::now();
 
         for snapshot_type in snapshot_types {
-            match get_command_result(snapshot_type, self.config.clone()).await 
{
+            match get_command_result(snapshot_type, 
self.config.system.clone()).await {
                 Ok(temp_file) => {
                     let filename = format!("{snapshot_type}.txt");
                     let entry = ZipEntryBuilder::new(filename.clone().into(), 
compression);
 
-                    let mut file = 
File::open(temp_file.path()).await.map_err(|e| {
+                    let file = File::open(temp_file.path()).await.map_err(|e| {
                         error!("Failed to open temporary file: {}", e);
                         IggyError::SnapshotFileCompletionFailed
                     })?;
 
-                    let mut content = Vec::new();
-                    if let Err(e) = file.read_to_end(&mut content).await {
+                    let content = Vec::new();
+                    let (result, content) = file.read_exact_at(content, 
0).await;
+                    if let Err(e) = result {
                         error!("Failed to read temporary file: {}", e);
                         continue;
                     }
@@ -130,9 +132,10 @@ async fn write_command_output_to_temp_file(
 ) -> Result<NamedTempFile, std::io::Error> {
     let output = command.output().await?;
     let temp_file = NamedTempFile::new()?;
-    let mut file = File::from_std(temp_file.as_file().try_clone()?);
-    file.write_all(&output.stdout).await?;
-    file.flush().await?;
+    let file = File::from_std(temp_file.as_file().try_clone()?).unwrap();
+    let (result, _) = file.write_all_at(output.stdout, 0).await;
+    result?;
+    file.sync_all().await?;
     Ok(temp_file)
 }
 
@@ -142,18 +145,35 @@ async fn get_filesystem_overview() -> 
Result<NamedTempFile, std::io::Error> {
 
 async fn get_process_info() -> Result<NamedTempFile, std::io::Error> {
     let temp_file = NamedTempFile::new()?;
-    let mut file = File::from_std(temp_file.as_file().try_clone()?);
+    let file = File::from_std(temp_file.as_file().try_clone()?).unwrap();
 
+    let mut position = 0;
     let ps_output = Command::new("ps").arg("aux").output().await?;
-    file.write_all(b"=== Process List (ps aux) ===\n").await?;
-    file.write_all(&ps_output.stdout).await?;
-    file.write_all(b"\n\n").await?;
+    let (result, written) = file
+        .write_all_at(b"=== Process List (ps aux) ===\n", 0)
+        .await;
+    result?;
+    position += written.len() as u64;
+
+    let (result, written) = file.write_all_at(ps_output.stdout, 
position).await;
+    result?;
+    position += written.len() as u64;
+
+    let (result, written) = file.write_all_at(b"\n\n", position).await;
+    result?;
+    position += written.len() as u64;
+
+    let (result, written) = file
+        .write_all_at(b"=== Detailed Process Information ===\n", position)
+        .await;
+    result?;
+    position += written.len() as u64;
 
-    file.write_all(b"=== Detailed Process Information ===\n")
-        .await?;
     let proc_info = procdump::get_proc_info().await?;
-    file.write_all(proc_info.as_bytes()).await?;
-    file.flush().await?;
+    let bytes = proc_info.as_bytes().to_owned();
+    let (result, _) = file.write_all_at(bytes, position).await;
+    result?;
+    file.sync_all().await?;
 
     Ok(temp_file)
 }
@@ -166,7 +186,7 @@ async fn get_test_snapshot() -> Result<NamedTempFile, 
std::io::Error> {
     write_command_output_to_temp_file(Command::new("echo").arg("test")).await
 }
 
-async fn get_server_logs(config: Arc<SystemConfig>) -> Result<NamedTempFile, 
std::io::Error> {
+async fn get_server_logs(config: Rc<SystemConfig>) -> Result<NamedTempFile, 
std::io::Error> {
     let base_directory = PathBuf::from(config.get_system_path());
     let logs_subdirectory = PathBuf::from(&config.logging.path);
     let logs_path = base_directory.join(logs_subdirectory);
@@ -179,7 +199,7 @@ async fn get_server_logs(config: Arc<SystemConfig>) -> 
Result<NamedTempFile, std
     write_command_output_to_temp_file(Command::new("sh").args(["-c", 
&list_and_cat])).await
 }
 
-async fn get_server_config(config: Arc<SystemConfig>) -> Result<NamedTempFile, 
std::io::Error> {
+async fn get_server_config(config: Rc<SystemConfig>) -> Result<NamedTempFile, 
std::io::Error> {
     let base_directory = PathBuf::from(config.get_system_path());
     let config_path = 
base_directory.join("runtime").join("current_config.toml");
 
@@ -188,7 +208,7 @@ async fn get_server_config(config: Arc<SystemConfig>) -> 
Result<NamedTempFile, s
 
 async fn get_command_result(
     snapshot_type: &SystemSnapshotType,
-    config: Arc<SystemConfig>,
+    config: Rc<SystemConfig>,
 ) -> Result<NamedTempFile, std::io::Error> {
     match snapshot_type {
         SystemSnapshotType::FilesystemOverview => 
get_filesystem_overview().await,
diff --git a/core/server/src/shard/system/stats.rs 
b/core/server/src/shard/system/stats.rs
index 8410cac2..53e31322 100644
--- a/core/server/src/shard/system/stats.rs
+++ b/core/server/src/shard/system/stats.rs
@@ -16,8 +16,8 @@
  * under the License.
  */
 
-use crate::shard::IggyShard;
 use crate::VERSION;
+use crate::shard::IggyShard;
 use crate::versioning::SemanticVersion;
 use iggy_common::locking::IggySharedMutFn;
 use iggy_common::{IggyDuration, IggyError, Stats};
@@ -45,7 +45,7 @@ impl IggyShard {
         let total_cpu_usage = sys.global_cpu_usage();
         let total_memory = sys.total_memory().into();
         let available_memory = sys.available_memory().into();
-        let clients_count = 
self.client_manager.read().await.get_clients().len() as u32;
+        let clients_count = self.client_manager.borrow().get_clients().len() 
as u32;
         let hostname = 
sysinfo::System::host_name().unwrap_or("unknown_hostname".to_string());
         let os_name = 
sysinfo::System::name().unwrap_or("unknown_os_name".to_string());
         let os_version =
@@ -90,7 +90,7 @@ impl IggyShard {
 
         drop(sys);
 
-        for stream in self.streams.values() {
+        for stream in self.streams.borrow().values() {
             stats.messages_count += stream.get_messages_count();
             stats.segments_count += stream.get_segments_count();
             stats.messages_size_bytes += stream.get_size();
@@ -104,7 +104,7 @@ impl IggyShard {
             stats.consumer_groups_count += stream
                 .topics
                 .values()
-                .map(|t| t.consumer_groups.len() as u32)
+                .map(|t| t.consumer_groups.borrow().len() as u32)
                 .sum::<u32>();
         }
 
diff --git a/core/server/src/shard/system/storage.rs 
b/core/server/src/shard/system/storage.rs
index 4cb41120..cc413583 100644
--- a/core/server/src/shard/system/storage.rs
+++ b/core/server/src/shard/system/storage.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use super::COMPONENT;
 use crate::shard::system::info::SystemInfo;
 use crate::streaming::persistence::persister::PersisterKind;
 use crate::streaming::storage::SystemInfoStorage;
diff --git a/core/server/src/shard/system/streams.rs 
b/core/server/src/shard/system/streams.rs
index 8a0772a9..43b45246 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -16,24 +16,37 @@
  * under the License.
  */
 
+use super::COMPONENT;
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
 use crate::streaming::streams::stream::Stream;
 use error_set::ErrContext;
 use futures::future::try_join_all;
-use std::cell::RefCell;
+use iggy_common::{IdKind, Identifier, IggyError};
+use std::cell::{Ref, RefCell, RefMut};
 use std::sync::atomic::{AtomicU32, Ordering};
 use tokio::fs;
 use tracing::{error, info, warn};
 
+static CURRENT_STREAM_ID: AtomicU32 = AtomicU32::new(1);
+
 impl IggyShard {
-    pub fn get_streams(&self) -> Vec<&Stream> {
-        self.streams.values().collect()
+    /// Returns a shared borrow of every stream held by this shard.
+    ///
+    /// Every returned `Ref` keeps `self.streams` immutably borrowed, so all of
+    /// them must be dropped before the map is borrowed mutably.
+    pub fn get_streams(&self) -> Vec<Ref<'_, Stream>> {
+        // Snapshot the IDs up front so that each `Ref` comes from a direct
+        // lookup instead of an O(n) `values().nth(i)` walk per element,
+        // which made the original quadratic in the number of streams.
+        let ids: Vec<u32> = self.streams.borrow().keys().copied().collect();
+        ids.into_iter().map(|id| self.get_stream_ref(id)).collect()
     }
 
-    pub fn find_streams(&self, session: &Session) -> Result<Vec<&Stream>, 
IggyError> {
+    pub fn find_streams(&self, session: &Session) -> Result<Vec<Ref<'_, 
Stream>>, IggyError> {
         self.ensure_authenticated(session)?;
         self.permissioner
+            .borrow()
             .get_streams(session.get_user_id())
             .with_error_context(|error| {
                 format!(
@@ -44,15 +57,16 @@ impl IggyShard {
         Ok(self.get_streams())
     }
 
-    pub fn find_stream(
+    pub fn find_stream<'a>(
         &self,
         session: &Session,
         identifier: &Identifier,
-    ) -> Result<&Stream, IggyError> {
+    ) -> Result<Ref<'_, Stream>, IggyError> {
         self.ensure_authenticated(session)?;
         let stream = self.get_stream(identifier);
         if let Ok(stream) = stream {
             self.permissioner
+            .borrow()
                 .get_stream(session.get_user_id(), stream.stream_id)
                 .with_error_context(|error| {
                     format!(
@@ -70,13 +84,14 @@ impl IggyShard {
         &self,
         session: &Session,
         identifier: &Identifier,
-    ) -> Result<Option<&Stream>, IggyError> {
+    ) -> Result<Option<Ref<'_, Stream>>, IggyError> {
         self.ensure_authenticated(session)?;
         let Some(stream) = self.try_get_stream(identifier)? else {
             return Ok(None);
         };
 
         self.permissioner
+        .borrow()
             .get_stream(session.get_user_id(), stream.stream_id)
             .with_error_context(|error| {
                 format!(
@@ -87,83 +102,92 @@ impl IggyShard {
         Ok(Some(stream))
     }
 
-    pub fn try_get_stream(&self, identifier: &Identifier) -> 
Result<Option<&Stream>, IggyError> {
+    pub fn try_get_stream(
+        &self,
+        identifier: &Identifier,
+    ) -> Result<Option<Ref<'_, Stream>>, IggyError> {
         match identifier.kind {
-            IdKind::Numeric => 
Ok(self.streams.get(&identifier.get_u32_value()?)),
+            IdKind::Numeric => 
Ok(self.get_stream_by_id(identifier.get_u32_value()?).ok()),
             IdKind::String => 
Ok(self.try_get_stream_by_name(&identifier.get_cow_str_value()?)),
         }
     }
 
-    fn try_get_stream_by_name(&self, name: &str) -> Option<&Stream> {
+    fn try_get_stream_by_name(&self, name: &str) -> Option<Ref<'_, Stream>> {
         self.streams_ids
+            .borrow()
             .get(name)
-            .and_then(|id| self.streams.get(id))
+            .and_then(|id| self.get_stream_by_id(*id).ok())
     }
 
-    pub fn get_stream(&self, identifier: &Identifier) -> Result<&Stream, 
IggyError> {
+    pub fn get_stream(&self, identifier: &Identifier) -> Result<Ref<'_, 
Stream>, IggyError> {
         match identifier.kind {
             IdKind::Numeric => 
self.get_stream_by_id(identifier.get_u32_value()?),
             IdKind::String => 
self.get_stream_by_name(&identifier.get_cow_str_value()?),
         }
     }
 
-    pub fn get_stream_mut(&mut self, identifier: &Identifier) -> Result<&mut 
Stream, IggyError> {
+    pub fn get_stream_mut(&self, identifier: &Identifier) -> Result<RefMut<'_, 
Stream>, IggyError> {
         match identifier.kind {
             IdKind::Numeric => 
self.get_stream_by_id_mut(identifier.get_u32_value()?),
             IdKind::String => 
self.get_stream_by_name_mut(&identifier.get_cow_str_value()?),
         }
     }
 
-    fn get_stream_by_name(&self, name: &str) -> Result<&Stream, IggyError> {
-        let stream_id = self.streams_ids.get(name);
-        if stream_id.is_none() {
+    fn get_stream_by_name(&self, name: &str) -> Result<Ref<'_, Stream>, 
IggyError> {
+        let exists = self.streams_ids.borrow().contains_key(name);
+        if !exists {
             return Err(IggyError::StreamNameNotFound(name.to_string()));
         }
-
-        self.get_stream_by_id(*stream_id.unwrap())
+        let stream_id = self.streams_ids.borrow().get(name).copied().unwrap();
+        self.get_stream_by_id(stream_id)
     }
 
-    fn get_stream_by_id(&self, stream_id: u32) -> Result<&Stream, IggyError> {
-        let stream = self.streams.get(&stream_id);
-        if stream.is_none() {
+    fn get_stream_by_id(&self, stream_id: u32) -> Result<Ref<'_, Stream>, 
IggyError> {
+        let exists = self.streams.borrow().contains_key(&stream_id);
+        if !exists {
             return Err(IggyError::StreamIdNotFound(stream_id));
         }
-
-        Ok(stream.unwrap())
+        Ok(self.get_stream_ref(stream_id))
     }
 
-    fn get_stream_by_name_mut(&mut self, name: &str) -> Result<&mut Stream, 
IggyError> {
-        let stream_id;
-        {
-            let id = self.streams_ids.get_mut(name);
-            if id.is_none() {
-                return Err(IggyError::StreamNameNotFound(name.to_string()));
-            }
+    fn get_stream_ref(&self, stream_id: u32) -> Ref<'_, Stream> {
+        Ref::map(self.streams.borrow(), |streams| {
+            streams.get(&stream_id).expect("Stream ID not found")
+        })
+    }
 
-            stream_id = *id.unwrap();
+    fn get_stream_by_name_mut(&self, name: &str) -> Result<RefMut<'_, Stream>, 
IggyError> {
+        let exists = self.streams_ids.borrow().contains_key(name);
+        if !exists {
+            return Err(IggyError::StreamNameNotFound(name.to_string()));
         }
-
-        self.get_stream_by_id_mut(stream_id)
+        let streams_ids = self.streams_ids.borrow();
+        let id = streams_ids.get(name).cloned();
+        drop(streams_ids);
+        self.get_stream_by_id_mut(id.unwrap())
     }
 
-    fn get_stream_by_id_mut(&mut self, stream_id: u32) -> Result<&mut Stream, 
IggyError> {
-        let stream = self.streams.get_mut(&stream_id);
-        if stream.is_none() {
+    fn get_stream_by_id_mut(&self, stream_id: u32) -> Result<RefMut<'_, 
Stream>, IggyError> {
+        let exists = self.streams.borrow().contains_key(&stream_id);
+        if !exists {
             return Err(IggyError::StreamIdNotFound(stream_id));
         }
-
-        Ok(stream.unwrap())
+        Ok(RefMut::map(self.streams.borrow_mut(), |s| {
+            s.get_mut(&stream_id).unwrap()
+        }))
     }
 
     pub async fn create_stream(
-        &mut self,
+        &self,
         session: &Session,
         stream_id: Option<u32>,
         name: &str,
-    ) -> Result<&Stream, IggyError> {
+    ) -> Result<u32, IggyError> {
         self.ensure_authenticated(session)?;
-        self.permissioner.create_stream(session.get_user_id())?;
-        if self.streams_ids.contains_key(name) {
+        self.permissioner
+            .borrow()
+            .create_stream(session.get_user_id())?;
+        if self.streams_ids.borrow().contains_key(name) {
             return Err(IggyError::StreamNameAlreadyExists(name.to_owned()));
         }
 
@@ -171,7 +195,7 @@ impl IggyShard {
         if stream_id.is_none() {
             id = CURRENT_STREAM_ID.fetch_add(1, Ordering::SeqCst);
             loop {
-                if self.streams.contains_key(&id) {
+                if self.streams.borrow().contains_key(&id) {
                     if id == u32::MAX {
                         return Err(IggyError::StreamIdAlreadyExists(id));
                     }
@@ -184,21 +208,23 @@ impl IggyShard {
             id = stream_id.unwrap();
         }
 
-        if self.streams.contains_key(&id) {
+        if self.streams.borrow().contains_key(&id) {
             return Err(IggyError::StreamIdAlreadyExists(id));
         }
 
-        let stream = Stream::create(id, name, self.config.clone(), 
self.storage.clone());
+        let stream = Stream::create(id, name, self.config.system.clone(), 
self.storage.clone());
         stream.persist().await?;
         info!("Created stream with ID: {id}, name: '{name}'.");
-        self.streams_ids.insert(name.to_owned(), stream.stream_id);
-        self.streams.insert(stream.stream_id, stream);
+        self.streams_ids
+            .borrow_mut()
+            .insert(name.to_owned(), stream.stream_id);
+        self.streams.borrow_mut().insert(stream.stream_id, stream);
         self.metrics.increment_streams(1);
-        self.get_stream_by_id(id)
+        Ok(id)
     }
 
     pub async fn update_stream(
-        &mut self,
+        &self,
         session: &Session,
         id: &Identifier,
         name: &str,
@@ -213,6 +239,7 @@ impl IggyShard {
         }
 
         self.permissioner
+        .borrow()
             .update_stream(session.get_user_id(), stream_id)
             .with_error_context(|error| {
                 format!(
@@ -223,7 +250,7 @@ impl IggyShard {
             })?;
 
         {
-            if let Some(stream_id_by_name) = self.streams_ids.get(name) {
+            if let Some(stream_id_by_name) = 
self.streams_ids.borrow().get(name) {
                 if *stream_id_by_name != stream_id {
                     return 
Err(IggyError::StreamNameAlreadyExists(name.to_owned()));
                 }
@@ -232,7 +259,7 @@ impl IggyShard {
 
         let old_name;
         {
-            let stream = self.get_stream_mut(id).with_error_context(|error| {
+            let mut stream = 
self.get_stream_mut(id).with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with id: {id}")
             })?;
             old_name = stream.name.clone();
@@ -241,8 +268,10 @@ impl IggyShard {
         }
 
         {
-            self.streams_ids.remove(&old_name);
-            self.streams_ids.insert(name.to_owned(), stream_id);
+            self.streams_ids.borrow_mut().remove(&old_name);
+            self.streams_ids
+                .borrow_mut()
+                .insert(name.to_owned(), stream_id);
         }
 
         info!("Stream with ID '{id}' updated. Old name: '{old_name}' changed 
to: '{name}'.");
@@ -250,7 +279,7 @@ impl IggyShard {
     }
 
     pub async fn delete_stream(
-        &mut self,
+        &self,
         session: &Session,
         id: &Identifier,
     ) -> Result<u32, IggyError> {
@@ -260,6 +289,7 @@ impl IggyShard {
         })?;
         let stream_id = stream.stream_id;
         self.permissioner
+            .borrow()
             .delete_stream(session.get_user_id(), stream_id)
             .with_error_context(|error| {
                 format!(
@@ -279,17 +309,16 @@ impl IggyShard {
             .decrement_partitions(stream.get_partitions_count());
         self.metrics.decrement_messages(stream.get_messages_count());
         self.metrics.decrement_segments(stream.get_segments_count());
-        self.streams.remove(&stream_id);
-        self.streams_ids.remove(&stream_name);
+        self.streams.borrow_mut().remove(&stream_id);
+        self.streams_ids.borrow_mut().remove(&stream_name);
         let current_stream_id = CURRENT_STREAM_ID.load(Ordering::SeqCst);
         if current_stream_id > stream_id {
             CURRENT_STREAM_ID.store(stream_id, Ordering::SeqCst);
         }
 
-        let client_manager = self.client_manager.read().await;
-        client_manager
-            .delete_consumer_groups_for_stream(stream_id)
-            .await;
+        self.client_manager
+            .borrow_mut()
+            .delete_consumer_groups_for_stream(stream_id);
         Ok(stream_id)
     }
 
@@ -302,6 +331,7 @@ impl IggyShard {
             format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
         })?;
         self.permissioner
+            .borrow()
             .purge_stream(session.get_user_id(), stream.stream_id)
             .with_error_context(|error| {
                 format!(
@@ -329,10 +359,12 @@ mod tests {
         sync::Arc,
     };
 
+    //TODO: Fixme
+    /*
     #[tokio::test]
     async fn should_get_stream_by_id_and_name() {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Arc::new(SystemConfig {
+        let config = Rc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
@@ -378,4 +410,5 @@ mod tests {
         assert_eq!(stream.stream_id, stream_id);
         assert_eq!(stream.name, stream_name);
     }
+    */
 }
diff --git a/core/server/src/shard/system/topics.rs 
b/core/server/src/shard/system/topics.rs
index 145689af..d118090f 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -16,29 +16,31 @@
  * under the License.
  */
 
+use super::COMPONENT;
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
+use crate::streaming::streams::stream::Stream;
 use crate::streaming::topics::topic::Topic;
 use error_set::ErrContext;
 use iggy_common::locking::IggySharedMutFn;
 use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, 
MaxTopicSize};
+use tokio_util::io::StreamReader;
 
 impl IggyShard {
-    pub fn find_topic(
+    pub fn find_topic<'topic, 'stream>(
         &self,
         session: &Session,
-        stream_id: &Identifier,
+        stream: &'stream Stream,
         topic_id: &Identifier,
-    ) -> Result<&Topic, IggyError> {
+    ) -> Result<&'topic Topic, IggyError>
+    where
+        'stream: 'topic,
+    {
         self.ensure_authenticated(session)?;
-        let stream = self
-            .find_stream(session, stream_id)
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to find stream 
with ID: {stream_id}")
-            })?;
-        let topic = stream.get_topic(topic_id);
-        if let Ok(topic) = topic {
-            self.permissioner
+        let stream_id = stream.stream_id;
+        let topic = stream.get_topic(topic_id)?;
+        self.permissioner
+            .borrow()
                 .get_topic(session.get_user_id(), stream.stream_id, 
topic.topic_id)
                 .with_error_context(|error| {
                     format!(
@@ -46,22 +48,21 @@ impl IggyShard {
                         session.get_user_id(),
                     )
                 })?;
-            return Ok(topic);
-        }
-
-        topic
+        Ok(topic)
     }
 
-    pub fn find_topics(
+    pub fn find_topics<'stream, 'topic>(
         &self,
         session: &Session,
-        stream_id: &Identifier,
-    ) -> Result<Vec<&Topic>, IggyError> {
+        stream: &'stream Stream,
+    ) -> Result<Vec<&'topic Topic>, IggyError>
+    where
+        'stream: 'topic,
+    {
         self.ensure_authenticated(session)?;
-        let stream = self.get_stream(stream_id).with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
-        })?;
+        let stream_id = stream.stream_id;
         self.permissioner
+        .borrow()
             .get_topics(session.get_user_id(), stream.stream_id)
             .with_error_context(|error| {
                 format!(
@@ -72,28 +73,24 @@ impl IggyShard {
         Ok(stream.get_topics())
     }
 
-    pub fn try_find_topic(
+    pub fn try_find_topic<'stream, 'topic>(
         &self,
         session: &Session,
-        stream_id: &Identifier,
+        stream: &'stream Stream,
         topic_id: &Identifier,
-    ) -> Result<Option<&Topic>, IggyError> {
+    ) -> Result<Option<&'topic Topic>, IggyError>
+    where
+        'stream: 'topic,
+    {
         self.ensure_authenticated(session)?;
-        let Some(stream) = self
-            .try_find_stream(session, stream_id)
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to find stream 
with ID: {stream_id}")
-            })?
-        else {
-            return Ok(None);
-        };
-
+        let stream_id = stream.stream_id;
         let Some(topic) = stream.try_get_topic(topic_id)? else {
             return Ok(None);
         };
 
         self.permissioner
-            .get_topic(session.get_user_id(), stream.stream_id, topic.topic_id)
+        .borrow()
+            .get_topic(session.get_user_id(), stream_id, topic.topic_id)
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - permission denied to get 
topic with ID: {topic_id} in stream with ID: {stream_id} for user with ID: {}",
@@ -105,7 +102,7 @@ impl IggyShard {
 
     #[allow(clippy::too_many_arguments)]
     pub async fn create_topic(
-        &mut self,
+        &self,
         session: &Session,
         stream_id: &Identifier,
         topic_id: Option<u32>,
@@ -115,13 +112,14 @@ impl IggyShard {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
-    ) -> Result<&Topic, IggyError> {
+    ) -> Result<u32, IggyError> {
         self.ensure_authenticated(session)?;
         {
             let stream = self.get_stream(stream_id).with_error_context(|error| 
{
                 format!("{COMPONENT} (error: {error}) - failed to get stream 
with ID: {stream_id}")
             })?;
             self.permissioner
+            .borrow()
                 .create_topic(session.get_user_id(), stream.stream_id)
                 .with_error_context(|error| {
                     format!(
@@ -131,6 +129,8 @@ impl IggyShard {
                 })?;
         }
 
+        // TODO: Make create topic sync, and extract the storage persister out 
of it
+        // perform disk i/o outside of the borrow_mut of the stream.
         let created_topic_id = self
             .get_stream_mut(stream_id)?
             .create_topic(
@@ -151,21 +151,12 @@ impl IggyShard {
         self.metrics.increment_partitions(partitions_count);
         self.metrics.increment_segments(partitions_count);
 
-        self.get_stream(stream_id)
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to get stream 
with ID: {stream_id}")
-            })?
-            .get_topic(&created_topic_id.try_into()?)
-            .with_error_context(|error| {
-                format!(
-                    "{COMPONENT} (error: {error}) - failed to get created 
topic with ID: {created_topic_id} in stream with ID: {stream_id}",
-                )
-            })
+        Ok(created_topic_id)
     }
 
     #[allow(clippy::too_many_arguments)]
     pub async fn update_topic(
-        &mut self,
+        &self,
         session: &Session,
         stream_id: &Identifier,
         topic_id: &Identifier,
@@ -174,17 +165,20 @@ impl IggyShard {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
-    ) -> Result<&Topic, IggyError> {
+    ) -> Result<u32, IggyError> {
         self.ensure_authenticated(session)?;
         {
+            let stream = self.get_stream(stream_id).with_error_context(|error| 
{
+                format!("{COMPONENT} (error: {error}) - failed to get stream 
with ID: {stream_id}")
+            })?;
             let topic = self
-                .find_topic(session, stream_id, topic_id)
+                .find_topic(session, &stream, topic_id)
                 .with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - failed to find topic 
with ID: {topic_id}"
                     )
                 })?;
-            self.permissioner.update_topic(
+            self.permissioner.borrow().update_topic(
                 session.get_user_id(),
                 topic.stream_id,
                 topic.topic_id,
@@ -198,6 +192,8 @@ impl IggyShard {
             })?;
         }
 
+        // TODO: Make update topic sync, and extract the storage persister out 
of it
+        // perform disk i/o outside of the borrow_mut of the stream.
         self.get_stream_mut(stream_id)?
             .update_topic(
                 topic_id,
@@ -224,11 +220,11 @@ impl IggyShard {
             .get_topic(topic_id)
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get topic 
with ID: {topic_id} in stream with ID: {stream_id}")
-            })
+            }).map(|topic| topic.topic_id)
     }
 
     pub async fn delete_topic(
-        &mut self,
+        &self,
         session: &Session,
         stream_id: &Identifier,
         topic_id: &Identifier,
@@ -236,12 +232,15 @@ impl IggyShard {
         self.ensure_authenticated(session)?;
         let stream_id_value;
         {
+            let stream = self.get_stream(stream_id).with_error_context(|error| 
{
+                format!("{COMPONENT} (error: {error}) - failed to get stream 
with ID: {stream_id}")
+            })?;
             let topic = self
-                .find_topic(session, stream_id, topic_id)
+                .find_topic(session, &stream, topic_id)
                 .with_error_context(|error| {
                     format!("{COMPONENT} (error: {error}) - failed to find 
topic with ID: {topic_id} in stream with ID: {stream_id}")
                 })?;
-            self.permissioner.delete_topic(
+            self.permissioner.borrow().delete_topic(
                 session.get_user_id(),
                 topic.stream_id,
                 topic.topic_id,
@@ -254,6 +253,8 @@ impl IggyShard {
             stream_id_value = topic.stream_id;
         }
 
+        // TODO: Make delete topic sync, and extract the storage persister out 
of it
+        // perform disk i/o outside of the borrow_mut of the stream.
         let topic = self
             .get_stream_mut(stream_id)?
             .delete_topic(topic_id)
@@ -266,10 +267,9 @@ impl IggyShard {
         self.metrics.decrement_messages(topic.get_messages_count());
         self.metrics
             .decrement_segments(topic.get_segments_count().await);
-        let client_manager = self.client_manager.read().await;
-        client_manager
-            .delete_consumer_groups_for_topic(stream_id_value, topic.topic_id)
-            .await;
+        self.client_manager
+            .borrow_mut()
+            .delete_consumer_groups_for_topic(stream_id_value, topic.topic_id);
         Ok(())
     }
 
@@ -279,12 +279,16 @@ impl IggyShard {
         stream_id: &Identifier,
         topic_id: &Identifier,
     ) -> Result<(), IggyError> {
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
+        })?;
         let topic = self
-            .find_topic(session, stream_id, topic_id)
+            .find_topic(session, &stream, topic_id)
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to find topic 
with ID: {topic_id} in stream with ID: {stream_id}")
             })?;
         self.permissioner
+            .borrow()
             .purge_topic(session.get_user_id(), topic.stream_id, 
topic.topic_id)
             .with_error_context(|error| {
                 format!(
diff --git a/core/server/src/shard/system/users.rs 
b/core/server/src/shard/system/users.rs
index f2405f1f..f3b271a7 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use super::COMPONENT;
 use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateUserWithId;
@@ -33,6 +34,7 @@ use iggy_common::create_user::CreateUser;
 use iggy_common::defaults::*;
 use iggy_common::locking::IggySharedMutFn;
 use iggy_common::{IdKind, Identifier};
+use std::cell::RefMut;
 use std::env;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU32, Ordering};
@@ -46,7 +48,7 @@ impl IggyShard {
         &self,
         session: &Session,
         user_id: &Identifier,
-    ) -> Result<Option<&User>, IggyError> {
+    ) -> Result<Option<User>, IggyError> {
         self.ensure_authenticated(session)?;
         let Some(user) = self.try_get_user(user_id)? else {
             return Ok(None);
@@ -54,7 +56,7 @@ impl IggyShard {
 
         let session_user_id = session.get_user_id();
         if user.id != session_user_id {
-            
self.permissioner.get_user(session_user_id).with_error_context(|error| {
+            
self.permissioner.borrow().get_user(session_user_id).with_error_context(|error| 
{
                 format!(
                     "{COMPONENT} (error: {error}) - permission denied to get 
user with ID: {user_id} for current user with ID: {session_user_id}"
                 )
@@ -64,45 +66,73 @@ impl IggyShard {
         Ok(Some(user))
     }
 
-    pub fn get_user(&self, user_id: &Identifier) -> Result<&User, IggyError> {
+    pub fn get_user(&self, user_id: &Identifier) -> Result<User, IggyError> {
         self.try_get_user(user_id)?
             .ok_or(IggyError::ResourceNotFound(user_id.to_string()))
     }
 
-    pub fn try_get_user(&self, user_id: &Identifier) -> Result<Option<&User>, 
IggyError> {
+    pub fn try_get_user(&self, user_id: &Identifier) -> Result<Option<User>, 
IggyError> {
         match user_id.kind {
-            IdKind::Numeric => Ok(self.users.get(&user_id.get_u32_value()?)),
+            IdKind::Numeric => {
+                let user = self
+                    .users
+                    .borrow()
+                    .get(&user_id.get_u32_value()?)
+                    .map(|user| user.clone());
+                Ok(user)
+            }
             IdKind::String => {
                 let username = user_id.get_cow_str_value()?;
-                Ok(self
+                let user = self
                     .users
+                    .borrow()
                     .iter()
                     .find(|(_, user)| user.username == username)
-                    .map(|(_, user)| user))
+                    .map(|(_, user)| user.clone());
+                Ok(user)
             }
         }
     }
 
-    pub fn get_user_mut(&mut self, user_id: &Identifier) -> Result<&mut User, 
IggyError> {
+    pub fn get_user_mut(&self, user_id: &Identifier) -> Result<RefMut<'_, 
User>, IggyError> {
         match user_id.kind {
-            IdKind::Numeric => self
-                .users
-                .get_mut(&user_id.get_u32_value()?)
-                .ok_or(IggyError::ResourceNotFound(user_id.to_string())),
+            IdKind::Numeric => {
+                let user_id = user_id.get_u32_value()?;
+                let users = self.users.borrow_mut();
+                let exists = users.contains_key(&user_id);
+                if !exists {
+                    return 
Err(IggyError::ResourceNotFound(user_id.to_string()));
+                }
+                Ok(RefMut::map(users, |u| {
+                    let user = u.get_mut(&user_id);
+                    user.unwrap()
+                }))
+            }
             IdKind::String => {
                 let username = user_id.get_cow_str_value()?;
-                self.users
-                    .iter_mut()
+                let users = self.users.borrow_mut();
+                let exists = users
+                    .iter()
                     .find(|(_, user)| user.username == username)
-                    .map(|(_, user)| user)
-                    .ok_or(IggyError::ResourceNotFound(user_id.to_string()))
+                    .is_some();
+                if !exists {
+                    return 
Err(IggyError::ResourceNotFound(user_id.to_string()));
+                }
+                Ok(RefMut::map(users, |u| {
+                    let user = u
+                        .iter_mut()
+                        .find(|(_, user)| user.username == username)
+                        .map(|(_, user)| user);
+                    user.unwrap()
+                }))
             }
         }
     }
 
-    pub async fn get_users(&self, session: &Session) -> Result<Vec<&User>, 
IggyError> {
+    pub async fn get_users(&self, session: &Session) -> Result<Vec<User>, 
IggyError> {
         self.ensure_authenticated(session)?;
         self.permissioner
+        .borrow()
             .get_users(session.get_user_id())
             .with_error_context(|error| {
                 format!(
@@ -110,19 +140,20 @@ impl IggyShard {
                     session.get_user_id()
                 )
             })?;
-        Ok(self.users.values().collect())
+        Ok(self.users.borrow().values().cloned().collect())
     }
 
     pub async fn create_user(
-        &mut self,
+        &self,
         session: &Session,
         username: &str,
         password: &str,
         status: UserStatus,
         permissions: Option<Permissions>,
-    ) -> Result<&User, IggyError> {
+    ) -> Result<User, IggyError> {
         self.ensure_authenticated(session)?;
         self.permissioner
+        .borrow()
             .create_user(session.get_user_id())
             .with_error_context(|error| {
                 format!(
@@ -131,12 +162,17 @@ impl IggyShard {
                 )
             })?;
 
-        if self.users.iter().any(|(_, user)| user.username == username) {
+        if self
+            .users
+            .borrow()
+            .iter()
+            .any(|(_, user)| user.username == username)
+        {
             error!("User: {username} already exists.");
             return Err(IggyError::UserAlreadyExists);
         }
 
-        if self.users.len() >= MAX_USERS {
+        if self.users.borrow().len() >= MAX_USERS {
             error!("Available users limit reached.");
             return Err(IggyError::UsersLimitReached);
         }
@@ -145,8 +181,9 @@ impl IggyShard {
         info!("Creating user: {username} with ID: {user_id}...");
         let user = User::new(user_id, username, password, status, 
permissions.clone());
         self.permissioner
+            .borrow_mut()
             .init_permissions_for_user(user_id, permissions);
-        self.users.insert(user.id, user);
+        self.users.borrow_mut().insert(user.id, user);
         info!("Created user: {username} with ID: {user_id}.");
         self.metrics.increment_users(1);
         self.get_user(&user_id.try_into()?)
@@ -155,16 +192,13 @@ impl IggyShard {
             })
     }
 
-    pub async fn delete_user(
-        &mut self,
-        session: &Session,
-        user_id: &Identifier,
-    ) -> Result<User, IggyError> {
+    pub fn delete_user(&self, session: &Session, user_id: &Identifier) -> 
Result<User, IggyError> {
         self.ensure_authenticated(session)?;
         let existing_user_id;
         let existing_username;
         {
             self.permissioner
+                .borrow()
                 .delete_user(session.get_user_id())
                 .with_error_context(|error| {
                     format!(
@@ -187,14 +221,14 @@ impl IggyShard {
         info!("Deleting user: {existing_username} with ID: {user_id}...");
         let user = self
             .users
+            .borrow_mut()
             .remove(&existing_user_id)
             .ok_or(IggyError::ResourceNotFound(user_id.to_string()))?;
         self.permissioner
+            .borrow_mut()
             .delete_permissions_for_user(existing_user_id);
-        let mut client_manager = self.client_manager.write().await;
-        client_manager
+        self.client_manager.borrow_mut()
             .delete_clients_for_user(existing_user_id)
-            .await
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to delete clients 
for user with ID: {existing_user_id}"
@@ -205,15 +239,16 @@ impl IggyShard {
         Ok(user)
     }
 
-    pub async fn update_user(
-        &mut self,
+    pub fn update_user(
+        &self,
         session: &Session,
         user_id: &Identifier,
         username: Option<String>,
         status: Option<UserStatus>,
-    ) -> Result<&User, IggyError> {
+    ) -> Result<User, IggyError> {
         self.ensure_authenticated(session)?;
         self.permissioner
+        .borrow()
             .update_user(session.get_user_id())
             .with_error_context(|error| {
                 format!(
@@ -231,7 +266,7 @@ impl IggyShard {
             }
         }
 
-        let user = self.get_user_mut(user_id).with_error_context(|error| {
+        let mut user = self.get_user_mut(user_id).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get mutable 
reference to the user with id: {user_id}")
         })?;
         if let Some(username) = username {
@@ -241,13 +276,18 @@ impl IggyShard {
         if let Some(status) = status {
             user.status = status;
         }
+        let cloned_user = user.clone();
+        drop(user);
 
-        info!("Updated user: {} with ID: {}.", user.username, user.id);
-        Ok(user)
+        info!(
+            "Updated user: {} with ID: {}.",
+            cloned_user.username, cloned_user.id
+        );
+        Ok(cloned_user)
     }
 
-    pub async fn update_permissions(
-        &mut self,
+    pub fn update_permissions(
+        &self,
         session: &Session,
         user_id: &Identifier,
         permissions: Option<Permissions>,
@@ -256,13 +296,14 @@ impl IggyShard {
 
         {
             self.permissioner
+            .borrow()
                 .update_permissions(session.get_user_id())
                 .with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - permission denied to 
update permissions for user with id: {}", session.get_user_id()
                     )
                 })?;
-            let user = self.get_user(user_id).with_error_context(|error| {
+            let user: User = self.get_user(user_id).with_error_context(|error| 
{
                 format!("{COMPONENT} (error: {error}) - failed to get user 
with id: {user_id}")
             })?;
             if user.is_root() {
@@ -271,11 +312,12 @@ impl IggyShard {
             }
 
             self.permissioner
+                .borrow_mut()
                 .update_permissions_for_user(user.id, permissions.clone());
         }
 
         {
-            let user = self.get_user_mut(user_id).with_error_context(|error| {
+            let mut user = 
self.get_user_mut(user_id).with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to get mutable 
reference to the user with id: {user_id}"
                 )
@@ -290,8 +332,8 @@ impl IggyShard {
         Ok(())
     }
 
-    pub async fn change_password(
-        &mut self,
+    pub fn change_password(
+        &self,
         session: &Session,
         user_id: &Identifier,
         current_password: &str,
@@ -305,11 +347,13 @@ impl IggyShard {
             })?;
             let session_user_id = session.get_user_id();
             if user.id != session_user_id {
-                self.permissioner.change_password(session_user_id)?;
+                self.permissioner
+                    .borrow()
+                    .change_password(session_user_id)?;
             }
         }
 
-        let user = self.get_user_mut(user_id).with_error_context(|error| {
+        let mut user = self.get_user_mut(user_id).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get mutable 
reference to the user with id: {user_id}")
         })?;
         if !crypto::verify_password(current_password, &user.password) {
@@ -328,22 +372,21 @@ impl IggyShard {
         Ok(())
     }
 
-    pub async fn login_user(
+    pub fn login_user(
         &self,
         username: &str,
         password: &str,
         session: Option<&Session>,
-    ) -> Result<&User, IggyError> {
+    ) -> Result<User, IggyError> {
         self.login_user_with_credentials(username, Some(password), session)
-            .await
     }
 
-    pub async fn login_user_with_credentials(
+    pub fn login_user_with_credentials(
         &self,
         username: &str,
         password: Option<&str>,
         session: Option<&Session>,
-    ) -> Result<&User, IggyError> {
+    ) -> Result<User, IggyError> {
         let user = match self.get_user(&username.try_into()?) {
             Ok(user) => user,
             Err(_) => {
@@ -380,14 +423,13 @@ impl IggyShard {
                 user.username,
                 session.get_user_id()
             );
-            self.logout_user(session).await?;
+            self.logout_user(session)?;
         }
 
         session.set_user_id(user.id);
-        let mut client_manager = self.client_manager.write().await;
+        let mut client_manager = self.client_manager.borrow_mut();
         client_manager
             .set_user_id(session.client_id, user.id)
-            .await
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to set user_id to 
client, client ID: {}, user ID: {}",
@@ -397,7 +439,7 @@ impl IggyShard {
         Ok(user)
     }
 
-    pub async fn logout_user(&self, session: &Session) -> Result<(), 
IggyError> {
+    pub fn logout_user(&self, session: &Session) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
         let user = self
             .get_user(&Identifier::numeric(session.get_user_id())?)
@@ -412,8 +454,8 @@ impl IggyShard {
             user.username, user.id
         );
         if session.client_id > 0 {
-            let mut client_manager = self.client_manager.write().await;
-            client_manager.clear_user_id(session.client_id).await?;
+            let mut client_manager = self.client_manager.borrow_mut();
+            client_manager.clear_user_id(session.client_id)?;
             info!(
                 "Cleared user ID: {} for client: {}.",
                 user.id, session.client_id
diff --git a/core/server/src/streaming/partitions/consumer_offsets.rs 
b/core/server/src/streaming/partitions/consumer_offsets.rs
index 96080c03..e9f5c57d 100644
--- a/core/server/src/streaming/partitions/consumer_offsets.rs
+++ b/core/server/src/streaming/partitions/consumer_offsets.rs
@@ -26,10 +26,7 @@ use iggy_common::IggyError;
 use tracing::trace;
 
 impl Partition {
-    pub fn get_consumer_offset(
-        &self,
-        consumer: PollingConsumer,
-    ) -> Result<Option<u64>, IggyError> {
+    pub fn get_consumer_offset(&self, consumer: PollingConsumer) -> 
Result<Option<u64>, IggyError> {
         trace!(
             "Getting consumer offset for {}, partition: {}, current: {}...",
             consumer, self.partition_id, self.current_offset
diff --git a/core/server/src/streaming/storage.rs 
b/core/server/src/streaming/storage.rs
index 978b97aa..eaf96dc3 100644
--- a/core/server/src/streaming/storage.rs
+++ b/core/server/src/streaming/storage.rs
@@ -87,7 +87,7 @@ pub enum PartitionStorageKind {
 #[cfg_attr(test, automock)]
 pub trait SystemInfoStorage {
     fn load(&self) -> impl Future<Output = Result<SystemInfo, IggyError>>;
-    fn save(&self, system_info: &SystemInfo) -> impl Future<Output = 
Result<(), IggyError>>; 
+    fn save(&self, system_info: &SystemInfo) -> impl Future<Output = 
Result<(), IggyError>>;
 }
 
 #[cfg_attr(test, automock)]
@@ -96,8 +96,8 @@ pub trait StreamStorage {
         &self,
         stream: &mut Stream,
         state: StreamState,
-    ) -> impl Future<Output = Result<(), IggyError>>; 
-    fn save(&self, stream: &Stream) -> impl Future<Output = Result<(), 
IggyError>>; 
+    ) -> impl Future<Output = Result<(), IggyError>>;
+    fn save(&self, stream: &Stream) -> impl Future<Output = Result<(), 
IggyError>>;
     fn delete(&self, stream: &Stream) -> impl Future<Output = Result<(), 
IggyError>>;
 }
 
@@ -107,9 +107,9 @@ pub trait TopicStorage {
         &self,
         topic: &mut Topic,
         state: TopicState,
-    ) -> impl Future<Output = Result<(), IggyError>>; 
-    fn save(&self, topic: &Topic) -> impl Future<Output = Result<(), 
IggyError>>; 
-    fn delete(&self, topic: &Topic) -> impl Future<Output = Result<(), 
IggyError>>; 
+    ) -> impl Future<Output = Result<(), IggyError>>;
+    fn save(&self, topic: &Topic) -> impl Future<Output = Result<(), 
IggyError>>;
+    fn delete(&self, topic: &Topic) -> impl Future<Output = Result<(), 
IggyError>>;
 }
 
 #[cfg_attr(test, automock)]
@@ -119,9 +119,8 @@ pub trait PartitionStorage {
         partition: &mut Partition,
         state: PartitionState,
     ) -> impl Future<Output = Result<(), IggyError>>;
-    fn save(&self, partition: &mut Partition)
-    -> impl Future<Output = Result<(), IggyError>>;
-    fn delete(&self, partition: &Partition) -> impl Future<Output = Result<(), 
IggyError>>; 
+    fn save(&self, partition: &mut Partition) -> impl Future<Output = 
Result<(), IggyError>>;
+    fn delete(&self, partition: &Partition) -> impl Future<Output = Result<(), 
IggyError>>;
     fn save_consumer_offset(
         &self,
         offset: u64,
@@ -131,15 +130,9 @@ pub trait PartitionStorage {
         &self,
         kind: ConsumerKind,
         path: &str,
-    ) -> impl Future<Output = Result<Vec<ConsumerOffset>, IggyError>>; 
-    fn delete_consumer_offsets(
-        &self,
-        path: &str,
-    ) -> impl Future<Output = Result<(), IggyError>>; 
-    fn delete_consumer_offset(
-        &self,
-        path: &str,
-    ) -> impl Future<Output = Result<(), IggyError>>;
+    ) -> impl Future<Output = Result<Vec<ConsumerOffset>, IggyError>>;
+    fn delete_consumer_offsets(&self, path: &str) -> impl Future<Output = 
Result<(), IggyError>>;
+    fn delete_consumer_offset(&self, path: &str) -> impl Future<Output = 
Result<(), IggyError>>;
 }
 
 #[derive(Debug)]
diff --git a/core/server/src/streaming/topics/consumer_group.rs 
b/core/server/src/streaming/topics/consumer_group.rs
index f000ce79..c810159c 100644
--- a/core/server/src/streaming/topics/consumer_group.rs
+++ b/core/server/src/streaming/topics/consumer_group.rs
@@ -57,10 +57,7 @@ impl ConsumerGroup {
         self.assign_partitions();
     }
 
-    pub fn calculate_partition_id(
-        &mut self,
-        member_id: u32,
-    ) -> Result<Option<u32>, IggyError> {
+    pub fn calculate_partition_id(&mut self, member_id: u32) -> 
Result<Option<u32>, IggyError> {
         let member = self.members.get_mut(&member_id);
         if let Some(member) = member {
             return Ok(member.calculate_partition_id());
diff --git a/core/server/src/streaming/topics/consumer_groups.rs 
b/core/server/src/streaming/topics/consumer_groups.rs
index c1371096..92168738 100644
--- a/core/server/src/streaming/topics/consumer_groups.rs
+++ b/core/server/src/streaming/topics/consumer_groups.rs
@@ -17,12 +17,12 @@
  */
 
 use crate::binary::handlers::topics::get_topic_handler;
-use crate::streaming::topics::{consumer_group, COMPONENT};
 use crate::streaming::topics::consumer_group::ConsumerGroup;
 use crate::streaming::topics::topic::Topic;
+use crate::streaming::topics::{COMPONENT, consumer_group};
 use error_set::ErrContext;
-use iggy_common::locking::IggySharedMutFn;
 use iggy_common::IggyError;
+use iggy_common::locking::IggySharedMutFn;
 use iggy_common::{IdKind, Identifier};
 use std::cell::{Ref, RefMut};
 use std::sync::atomic::Ordering;
@@ -48,7 +48,10 @@ impl Topic {
         self.consumer_groups.borrow().values().cloned().collect()
     }
 
-    pub fn get_consumer_group(&self, identifier: &Identifier) -> 
Result<Ref<'_, ConsumerGroup>, IggyError> {
+    pub fn get_consumer_group(
+        &self,
+        identifier: &Identifier,
+    ) -> Result<Ref<'_, ConsumerGroup>, IggyError> {
         match identifier.kind {
             IdKind::Numeric => 
self.get_consumer_group_by_id(identifier.get_u32_value().unwrap()),
             IdKind::String => 
self.get_consumer_group_by_name(&identifier.get_cow_str_value()?),
@@ -93,7 +96,10 @@ impl Topic {
         }))
     }
 
-    pub fn get_consumer_group_by_name(&self, name: &str) -> Result<Ref<'_, 
ConsumerGroup>, IggyError> {
+    pub fn get_consumer_group_by_name(
+        &self,
+        name: &str,
+    ) -> Result<Ref<'_, ConsumerGroup>, IggyError> {
         let group_id = self.consumer_groups_ids.get(name);
         if group_id.is_none() {
             return Err(IggyError::ConsumerGroupNameNotFound(
@@ -199,12 +205,8 @@ impl Topic {
             return Err(IggyError::ConsumerGroupIdAlreadyExists(id, 
self.topic_id));
         }
 
-        let consumer_group = ConsumerGroup::new(
-            self.topic_id,
-            id,
-            name,
-            self.partitions.len() as u32,
-        );
+        let consumer_group =
+            ConsumerGroup::new(self.topic_id, id, name, self.partitions.len() 
as u32);
         self.consumer_groups_ids.insert(name.to_owned(), id);
         let cloned_group = consumer_group.clone();
         self.consumer_groups.borrow_mut().insert(id, consumer_group);
diff --git a/core/server/src/streaming/topics/topic.rs 
b/core/server/src/streaming/topics/topic.rs
index 16aa324c..7e8ca4ac 100644
--- a/core/server/src/streaming/topics/topic.rs
+++ b/core/server/src/streaming/topics/topic.rs
@@ -23,13 +23,13 @@ use crate::streaming::storage::SystemStorage;
 use crate::streaming::topics::consumer_group::ConsumerGroup;
 use ahash::AHashMap;
 use core::fmt;
-use std::cell::RefCell;
-use std::rc::Rc;
 use iggy_common::locking::IggyRwLock;
 use iggy_common::{
     CompressionAlgorithm, Consumer, ConsumerKind, IggyByteSize, IggyError, 
IggyExpiry,
     IggyTimestamp, MaxTopicSize, Sizeable,
 };
+use std::cell::RefCell;
+use std::rc::Rc;
 
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
diff --git a/core/server/src/streaming/users/user.rs 
b/core/server/src/streaming/users/user.rs
index 01a9faa0..7e68d98f 100644
--- a/core/server/src/streaming/users/user.rs
+++ b/core/server/src/streaming/users/user.rs
@@ -24,7 +24,7 @@ use iggy_common::defaults::*;
 use iggy_common::{Permissions, UserId};
 use std::sync::Arc;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct User {
     pub id: UserId,
     pub status: UserStatus,

Reply via email to