This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch rebase_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 8566672cc5b26dcd2b0d98781d59dd7acde8dbc0
Author: numinex <[email protected]>
AuthorDate: Wed Jun 25 14:32:26 2025 +0200

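    fixes: rename IggySharedMut to IggyRwLock, remove shard/system/system.rs, make ConsumerGroup methods synchronous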
---
 core/common/src/locking/mod.rs                     |  10 +-
 core/common/src/locking/tokio_lock.rs              |   2 -
 core/server/src/shard/system/clients.rs            |   2 +-
 core/server/src/shard/system/consumer_groups.rs    |   3 +-
 core/server/src/shard/system/consumer_offsets.rs   |   2 -
 core/server/src/shard/system/mod.rs                |   3 +-
 .../src/shard/system/personal_access_tokens.rs     |   2 -
 core/server/src/shard/system/segments.rs           |   3 +-
 core/server/src/shard/system/stats.rs              |   2 +-
 core/server/src/shard/system/system.rs             | 291 ---------------------
 core/server/src/shard/system/topics.rs             |   5 +-
 .../src/streaming/partitions/consumer_offsets.rs   |   2 +-
 core/server/src/streaming/storage.rs               |   2 +-
 core/server/src/streaming/streams/topics.rs        |   3 +-
 core/server/src/streaming/topics/consumer_group.rs |  31 ++-
 .../server/src/streaming/topics/consumer_groups.rs | 136 ++++++----
 .../src/streaming/topics/consumer_offsets.rs       |   6 +-
 core/server/src/streaming/topics/messages.rs       |   5 +-
 core/server/src/streaming/topics/partitions.rs     |   4 +-
 core/server/src/streaming/topics/storage.rs        |   9 +-
 core/server/src/streaming/topics/topic.rs          |  25 +-
 21 files changed, 142 insertions(+), 406 deletions(-)

diff --git a/core/common/src/locking/mod.rs b/core/common/src/locking/mod.rs
index 42d72ebb..c2e542c7 100644
--- a/core/common/src/locking/mod.rs
+++ b/core/common/src/locking/mod.rs
@@ -29,19 +29,19 @@ mod fast_async_lock;
 
 #[cfg(feature = "tokio_lock")]
 #[cfg(not(any(feature = "fast_async_lock")))]
-pub type IggySharedMut<T> = tokio_lock::IggyTokioRwLock<T>;
+pub type IggyRwLock<T> = tokio_lock::IggyTokioRwLock<T>;
 
 //this can be used in the future to provide different locking mechanisms
 #[cfg(feature = "fast_async_lock")]
-pub type IggySharedMut<T> = fast_async_lock::IggyFastAsyncRwLock<T>;
+pub type IggyRwLock<T> = fast_async_lock::IggyFastAsyncRwLock<T>;
 
 #[allow(async_fn_in_trait)]
-pub trait IggySharedMutFn<T>: Send + Sync {
-    type ReadGuard<'a>: Deref<Target = T> + Send
+pub trait IggySharedMutFn<T> {
+    type ReadGuard<'a>: Deref<Target = T> 
     where
         T: 'a,
         Self: 'a;
-    type WriteGuard<'a>: DerefMut<Target = T> + Send
+    type WriteGuard<'a>: DerefMut<Target = T> 
     where
         T: 'a,
         Self: 'a;
diff --git a/core/common/src/locking/tokio_lock.rs 
b/core/common/src/locking/tokio_lock.rs
index 2e26b367..ef5baaad 100644
--- a/core/common/src/locking/tokio_lock.rs
+++ b/core/common/src/locking/tokio_lock.rs
@@ -25,8 +25,6 @@ use tokio::sync::{RwLock as TokioRwLock, RwLockReadGuard, 
RwLockWriteGuard};
 pub struct IggyTokioRwLock<T>(Arc<TokioRwLock<T>>);
 
 impl<T> IggySharedMutFn<T> for IggyTokioRwLock<T>
-where
-    T: Send + Sync,
 {
     type ReadGuard<'a>
         = RwLockReadGuard<'a, T>
diff --git a/core/server/src/shard/system/clients.rs 
b/core/server/src/shard/system/clients.rs
index fde182ce..a2debc42 100644
--- a/core/server/src/shard/system/clients.rs
+++ b/core/server/src/shard/system/clients.rs
@@ -22,7 +22,7 @@ use crate::streaming::session::Session;
 use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
-use iggy_common::locking::IggySharedMut;
+use iggy_common::locking::IggyRwLock;
 use iggy_common::locking::IggySharedMutFn;
 use std::net::SocketAddr;
 use std::rc::Rc;
diff --git a/core/server/src/shard/system/consumer_groups.rs 
b/core/server/src/shard/system/consumer_groups.rs
index 4975a92d..d8d8818e 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -16,9 +16,8 @@
  * under the License.
  */
 
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::COMPONENT;
-use crate::streaming::systems::system::System;
 use crate::streaming::topics::consumer_group::ConsumerGroup;
 use error_set::ErrContext;
 use iggy_common::Identifier;
diff --git a/core/server/src/shard/system/consumer_offsets.rs 
b/core/server/src/shard/system/consumer_offsets.rs
index e63815ff..0c7438a6 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -17,8 +17,6 @@
  */
 
 use crate::streaming::session::Session;
-use crate::streaming::systems::COMPONENT;
-use crate::streaming::systems::system::System;
 use error_set::ErrContext;
 use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError};
 
diff --git a/core/server/src/shard/system/mod.rs 
b/core/server/src/shard/system/mod.rs
index 1419abb9..fae8ad2b 100644
--- a/core/server/src/shard/system/mod.rs
+++ b/core/server/src/shard/system/mod.rs
@@ -28,8 +28,7 @@ pub mod snapshot;
 pub mod stats;
 pub mod storage;
 pub mod streams;
-pub mod system;
 pub mod topics;
 pub mod users;
 
-pub const COMPONENT: &str = "SYSTEM";
+pub const COMPONENT: &str = "SHARD_SYSTEM";
diff --git a/core/server/src/shard/system/personal_access_tokens.rs 
b/core/server/src/shard/system/personal_access_tokens.rs
index 18125bed..b5452e9a 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -18,8 +18,6 @@
 
 use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use crate::streaming::session::Session;
-use crate::streaming::systems::COMPONENT;
-use crate::streaming::systems::system::System;
 use crate::streaming::users::user::User;
 use error_set::ErrContext;
 use iggy_common::IggyError;
diff --git a/core/server/src/shard/system/segments.rs 
b/core/server/src/shard/system/segments.rs
index bd4043a2..27abb76b 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -1,3 +1,4 @@
+use crate::shard::IggyShard;
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,8 +17,6 @@
  * under the License.
  */
 use crate::streaming::session::Session;
-use crate::streaming::systems::COMPONENT;
-use crate::streaming::systems::system::System;
 use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
diff --git a/core/server/src/shard/system/stats.rs 
b/core/server/src/shard/system/stats.rs
index 2d2d2ea7..8410cac2 100644
--- a/core/server/src/shard/system/stats.rs
+++ b/core/server/src/shard/system/stats.rs
@@ -16,8 +16,8 @@
  * under the License.
  */
 
+use crate::shard::IggyShard;
 use crate::VERSION;
-use crate::streaming::systems::system::System;
 use crate::versioning::SemanticVersion;
 use iggy_common::locking::IggySharedMutFn;
 use iggy_common::{IggyDuration, IggyError, Stats};
diff --git a/core/server/src/shard/system/system.rs 
b/core/server/src/shard/system/system.rs
deleted file mode 100644
index 5d830d71..00000000
--- a/core/server/src/shard/system/system.rs
+++ /dev/null
@@ -1,291 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::archiver::{ArchiverKind, ArchiverKindType};
-use crate::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig};
-use crate::configs::system::SystemConfig;
-use crate::map_toggle_str;
-use crate::state::StateKind;
-use crate::state::file::FileState;
-use crate::state::system::SystemState;
-use crate::streaming::clients::client_manager::ClientManager;
-use crate::streaming::diagnostics::metrics::Metrics;
-use crate::streaming::persistence::persister::*;
-use crate::streaming::session::Session;
-use crate::streaming::storage::SystemStorage;
-use crate::streaming::streams::stream::Stream;
-use crate::streaming::systems::COMPONENT;
-use crate::streaming::users::permissioner::Permissioner;
-use crate::streaming::users::user::User;
-use crate::versioning::SemanticVersion;
-use ahash::AHashMap;
-use error_set::ErrContext;
-use iggy_common::locking::IggySharedMut;
-use iggy_common::locking::IggySharedMutFn;
-use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError, UserId};
-use std::path::Path;
-use std::sync::Arc;
-use tokio::fs::{create_dir_all, remove_dir_all};
-use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
-use tokio::time::Instant;
-use tracing::{error, info, instrument, trace};
-
-#[derive(Debug)]
-pub struct SharedSystem {
-    system: Arc<RwLock<System>>,
-}
-
-impl SharedSystem {
-    pub fn new(system: System) -> SharedSystem {
-        SharedSystem {
-            system: Arc::new(RwLock::new(system)),
-        }
-    }
-
-    pub async fn read(&self) -> RwLockReadGuard<System> {
-        self.system.read().await
-    }
-
-    pub async fn write(&self) -> RwLockWriteGuard<System> {
-        self.system.write().await
-    }
-}
-
-impl Clone for SharedSystem {
-    fn clone(&self) -> Self {
-        SharedSystem {
-            system: self.system.clone(),
-        }
-    }
-}
-
-#[derive(Debug)]
-pub struct System {
-    pub permissioner: Permissioner,
-    pub(crate) storage: Arc<SystemStorage>,
-    pub(crate) streams: AHashMap<u32, Stream>,
-    pub(crate) streams_ids: AHashMap<String, u32>,
-    pub(crate) users: AHashMap<UserId, User>,
-    pub(crate) config: Arc<SystemConfig>,
-    pub(crate) client_manager: IggySharedMut<ClientManager>,
-    pub(crate) encryptor: Option<Arc<EncryptorKind>>,
-    pub(crate) metrics: Metrics,
-    pub(crate) state: Arc<StateKind>,
-    pub(crate) archiver: Option<Arc<ArchiverKind>>,
-    pub personal_access_token: PersonalAccessTokenConfig,
-}
-
-impl System {
-    pub fn new(
-        config: Arc<SystemConfig>,
-        data_maintenance_config: DataMaintenanceConfig,
-        pat_config: PersonalAccessTokenConfig,
-    ) -> System {
-        let version = SemanticVersion::current().expect("Invalid version");
-        info!(
-            "Server-side encryption is {}.",
-            map_toggle_str(config.encryption.enabled)
-        );
-
-        let encryptor: Option<EncryptorKind> = match config.encryption.enabled 
{
-            true => Some(EncryptorKind::Aes256Gcm(
-                
Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(),
-            )),
-            false => None,
-        };
-
-        let state_persister = 
Self::resolve_persister(config.state.enforce_fsync);
-        let partition_persister = 
Self::resolve_persister(config.partition.enforce_fsync);
-
-        let state = Arc::new(StateKind::File(FileState::new(
-            &config.get_state_messages_file_path(),
-            &version,
-            state_persister,
-            encryptor,
-        )));
-
-        //TODO: Just shut the fuck up rust-analyzer.
-        let encryptor: Option<EncryptorKind> = match config.encryption.enabled 
{
-            true => Some(EncryptorKind::Aes256Gcm(
-                
Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(),
-            )),
-            false => None,
-        };
-        Self::create(
-            config.clone(),
-            SystemStorage::new(config, partition_persister),
-            state,
-            encryptor.map(Arc::new),
-            data_maintenance_config,
-            pat_config,
-        )
-    }
-
-    fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> {
-        match enforce_fsync {
-            true => 
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)),
-            false => Arc::new(PersisterKind::File(FilePersister)),
-        }
-    }
-
-    pub fn create(
-        system_config: Arc<SystemConfig>,
-        storage: SystemStorage,
-        state: Arc<StateKind>,
-        encryptor: Option<Arc<EncryptorKind>>,
-        data_maintenance_config: DataMaintenanceConfig,
-        pat_config: PersonalAccessTokenConfig,
-    ) -> System {
-        let archiver_config = data_maintenance_config.archiver;
-        let archiver: Option<Arc<ArchiverKind>> = if archiver_config.enabled {
-            info!("Archiving is enabled, kind: {}", archiver_config.kind);
-            match archiver_config.kind {
-                ArchiverKindType::Disk => 
Some(Arc::new(ArchiverKind::get_disk_archiver(
-                    archiver_config
-                        .disk
-                        .clone()
-                        .expect("Disk archiver config is missing"),
-                ))),
-                ArchiverKindType::S3 => Some(Arc::new(
-                    ArchiverKind::get_s3_archiver(
-                        archiver_config
-                            .s3
-                            .clone()
-                            .expect("S3 archiver config is missing"),
-                    )
-                    .expect("Failed to create S3 archiver"),
-                )),
-            }
-        } else {
-            info!("Archiving is disabled.");
-            None
-        };
-
-        System {
-            config: system_config,
-            streams: AHashMap::new(),
-            streams_ids: AHashMap::new(),
-            storage: Arc::new(storage),
-            encryptor,
-            client_manager: IggySharedMut::new(ClientManager::default()),
-            permissioner: Permissioner::default(),
-            metrics: Metrics::init(),
-            users: AHashMap::new(),
-            state,
-            personal_access_token: pat_config,
-            archiver,
-        }
-    }
-
-    #[instrument(skip_all, name = "trace_system_init")]
-    pub async fn init(&mut self) -> Result<(), IggyError> {
-        let system_path = self.config.get_system_path();
-        if !Path::new(&system_path).exists() && 
create_dir_all(&system_path).await.is_err() {
-            return Err(IggyError::CannotCreateBaseDirectory(system_path));
-        }
-
-        let state_path = self.config.get_state_path();
-        if !Path::new(&state_path).exists() && 
create_dir_all(&state_path).await.is_err() {
-            return Err(IggyError::CannotCreateStateDirectory(state_path));
-        }
-
-        let streams_path = self.config.get_streams_path();
-        if !Path::new(&streams_path).exists() && 
create_dir_all(&streams_path).await.is_err() {
-            return Err(IggyError::CannotCreateStreamsDirectory(streams_path));
-        }
-
-        let runtime_path = self.config.get_runtime_path();
-        if Path::new(&runtime_path).exists() && 
remove_dir_all(&runtime_path).await.is_err() {
-            return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path));
-        }
-
-        if create_dir_all(&runtime_path).await.is_err() {
-            return Err(IggyError::CannotCreateRuntimeDirectory(runtime_path));
-        }
-
-        info!(
-            "Initializing system, data will be stored at: {}",
-            self.config.get_system_path()
-        );
-
-        let state_entries = self.state.init().await.with_error_context(|error| 
{
-            format!("{COMPONENT} (error: {error}) - failed to initialize state 
entries")
-        })?;
-        let system_state = SystemState::init(state_entries)
-            .await
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to initialize 
system state")
-            })?;
-        let now = Instant::now();
-        //DONE
-        /*
-        self.load_version().await.with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to load version")
-        })?;
-        */
-        self.load_users(system_state.users.into_values().collect())
-            .await
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to load users")
-            })?;
-        self.load_streams(system_state.streams.into_values().collect())
-            .await
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to load 
streams")
-            })?;
-        if let Some(archiver) = self.archiver.as_ref() {
-            archiver
-                .init()
-                .await
-                .expect("Failed to initialize archiver");
-        }
-        info!("Initialized system in {} ms.", now.elapsed().as_millis());
-        Ok(())
-    }
-
-    #[instrument(skip_all, name = "trace_shutdown")]
-    pub async fn shutdown(&mut self) -> Result<(), IggyError> {
-        self.persist_messages().await?;
-        Ok(())
-    }
-
-    #[instrument(skip_all, name = "trace_persist_messages")]
-    pub async fn persist_messages(&self) -> Result<usize, IggyError> {
-        trace!("Saving buffered messages on disk...");
-        let mut saved_messages_number = 0;
-        for stream in self.streams.values() {
-            saved_messages_number += stream.persist_messages().await?;
-        }
-
-        Ok(saved_messages_number)
-    }
-
-    pub fn ensure_authenticated(&self, session: &Session) -> Result<(), 
IggyError> {
-        if !session.is_active() {
-            error!("{COMPONENT} - session is inactive, session: {session}");
-            return Err(IggyError::StaleClient);
-        }
-
-        if session.is_authenticated() {
-            Ok(())
-        } else {
-            error!("{COMPONENT} - unauthenticated access attempt, session: 
{session}");
-            Err(IggyError::Unauthenticated)
-        }
-    }
-}
diff --git a/core/server/src/shard/system/topics.rs 
b/core/server/src/shard/system/topics.rs
index a61f5464..145689af 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -16,15 +16,14 @@
  * under the License.
  */
 
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use crate::streaming::systems::COMPONENT;
-use crate::streaming::systems::system::System;
 use crate::streaming::topics::topic::Topic;
 use error_set::ErrContext;
 use iggy_common::locking::IggySharedMutFn;
 use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, 
MaxTopicSize};
 
-impl System {
+impl IggyShard {
     pub fn find_topic(
         &self,
         session: &Session,
diff --git a/core/server/src/streaming/partitions/consumer_offsets.rs 
b/core/server/src/streaming/partitions/consumer_offsets.rs
index 177793c9..96080c03 100644
--- a/core/server/src/streaming/partitions/consumer_offsets.rs
+++ b/core/server/src/streaming/partitions/consumer_offsets.rs
@@ -26,7 +26,7 @@ use iggy_common::IggyError;
 use tracing::trace;
 
 impl Partition {
-    pub async fn get_consumer_offset(
+    pub fn get_consumer_offset(
         &self,
         consumer: PollingConsumer,
     ) -> Result<Option<u64>, IggyError> {
diff --git a/core/server/src/streaming/storage.rs 
b/core/server/src/streaming/storage.rs
index 51e2565e..978b97aa 100644
--- a/core/server/src/streaming/storage.rs
+++ b/core/server/src/streaming/storage.rs
@@ -154,7 +154,7 @@ pub struct SystemStorage {
 impl SystemStorage {
     pub fn new(config: Rc<SystemConfig>, persister: Arc<PersisterKind>) -> 
Self {
         Self {
-            info: 
Arc::new(SystemInfoStorageKind::File(FileSystemInfoStorage::new(
+            info: 
Rc::new(SystemInfoStorageKind::File(FileSystemInfoStorage::new(
                 config.get_state_info_path(),
                 persister.clone(),
             ))),
diff --git a/core/server/src/streaming/streams/topics.rs 
b/core/server/src/streaming/streams/topics.rs
index b975ee83..ed350bee 100644
--- a/core/server/src/streaming/streams/topics.rs
+++ b/core/server/src/streaming/streams/topics.rs
@@ -146,7 +146,8 @@ impl Stream {
             topic.name = name.to_owned();
             topic.message_expiry = message_expiry;
             topic.compression_algorithm = compression_algorithm;
-            for partition in topic.partitions.borrow_mut().values_mut() {
+            for partition in topic.partitions.values_mut() {
+                let mut partition = partition.write().await;
                 partition.message_expiry = message_expiry;
                 for segment in partition.segments.iter_mut() {
                     segment.update_message_expiry(message_expiry);
diff --git a/core/server/src/streaming/topics/consumer_group.rs 
b/core/server/src/streaming/topics/consumer_group.rs
index cc28ef6f..f000ce79 100644
--- a/core/server/src/streaming/topics/consumer_group.rs
+++ b/core/server/src/streaming/topics/consumer_group.rs
@@ -52,12 +52,12 @@ impl ConsumerGroup {
         self.members.values().cloned().collect()
     }
 
-    pub async fn reassign_partitions(&mut self, partitions_count: u32) {
+    pub fn reassign_partitions(&mut self, partitions_count: u32) {
         self.partitions_count = partitions_count;
-        self.assign_partitions().await;
+        self.assign_partitions();
     }
 
-    pub async fn calculate_partition_id(
+    pub fn calculate_partition_id(
         &mut self,
         member_id: u32,
     ) -> Result<Option<u32>, IggyError> {
@@ -72,7 +72,7 @@ impl ConsumerGroup {
         ))
     }
 
-    pub async fn get_current_partition_id(&self, member_id: u32) -> 
Result<Option<u32>, IggyError> {
+    pub fn get_current_partition_id(&self, member_id: u32) -> 
Result<Option<u32>, IggyError> {
         let member = self.members.get(&member_id);
         if let Some(member) = member {
             return Ok(member.current_partition_id);
@@ -84,7 +84,7 @@ impl ConsumerGroup {
         ))
     }
 
-    pub async fn add_member(&mut self, member_id: u32) {
+    pub fn add_member(&mut self, member_id: u32) {
         self.members.insert(
             member_id,
             ConsumerGroupMember {
@@ -98,20 +98,20 @@ impl ConsumerGroup {
             "Added member with ID: {} to consumer group: {} for topic with ID: 
{}",
             member_id, self.group_id, self.topic_id
         );
-        self.assign_partitions().await;
+        self.assign_partitions();
     }
 
-    pub async fn delete_member(&mut self, member_id: u32) {
+    pub fn delete_member(&mut self, member_id: u32) {
         if self.members.remove(&member_id).is_some() {
             trace!(
                 "Deleted member with ID: {} in consumer group: {} for topic 
with ID: {}",
                 member_id, self.group_id, self.topic_id
             );
-            self.assign_partitions().await;
+            self.assign_partitions();
         }
     }
 
-    async fn assign_partitions(&mut self) {
+    fn assign_partitions(&mut self) {
         let mut members = self.members.values_mut().collect::<Vec<_>>();
         if members.is_empty() {
             return;
@@ -189,11 +189,10 @@ mod tests {
             members: AHashMap::new(),
         };
 
-        consumer_group.add_member(member_id).await;
+        consumer_group.add_member(member_id);
         for i in 0..1000 {
             let partition_id = consumer_group
                 .calculate_partition_id(member_id)
-                .await
                 .unwrap()
                 .expect("Partition ID not found");
             assert_eq!(partition_id, (i % consumer_group.partitions_count) + 
1);
@@ -211,7 +210,7 @@ mod tests {
             members: AHashMap::new(),
         };
 
-        consumer_group.add_member(member_id).await;
+        consumer_group.add_member(member_id);
         let member = consumer_group.members.get(&member_id).unwrap();
         assert_eq!(
             member.partitions.len() as u32,
@@ -235,8 +234,8 @@ mod tests {
             members: AHashMap::new(),
         };
 
-        consumer_group.add_member(member1_id).await;
-        consumer_group.add_member(member2_id).await;
+        consumer_group.add_member(member1_id);
+        consumer_group.add_member(member2_id);
         let member1 = consumer_group.members.get(&member1_id).unwrap();
         let member2 = consumer_group.members.get(&member2_id).unwrap();
         assert_eq!(
@@ -270,8 +269,8 @@ mod tests {
             members: AHashMap::new(),
         };
 
-        consumer_group.add_member(member1_id).await;
-        consumer_group.add_member(member2_id).await;
+        consumer_group.add_member(member1_id);
+        consumer_group.add_member(member2_id);
         let member1 = consumer_group.members.get(&member1_id).unwrap();
         let member2 = consumer_group.members.get(&member2_id).unwrap();
         if member1.partitions.len() == 1 {
diff --git a/core/server/src/streaming/topics/consumer_groups.rs 
b/core/server/src/streaming/topics/consumer_groups.rs
index 1f50a1bb..c1371096 100644
--- a/core/server/src/streaming/topics/consumer_groups.rs
+++ b/core/server/src/streaming/topics/consumer_groups.rs
@@ -16,36 +16,39 @@
  * under the License.
  */
 
-use crate::streaming::topics::COMPONENT;
+use crate::binary::handlers::topics::get_topic_handler;
+use crate::streaming::topics::{consumer_group, COMPONENT};
 use crate::streaming::topics::consumer_group::ConsumerGroup;
 use crate::streaming::topics::topic::Topic;
 use error_set::ErrContext;
+use iggy_common::locking::IggySharedMutFn;
 use iggy_common::IggyError;
 use iggy_common::{IdKind, Identifier};
+use std::cell::{Ref, RefMut};
 use std::sync::atomic::Ordering;
 use tracing::info;
 
 impl Topic {
     pub fn reassign_consumer_groups(&mut self) {
-        if self.consumer_groups.is_empty() {
+        if self.consumer_groups.borrow().is_empty() {
             return;
         }
 
-        let partitions_count = self.partitions.borrow().len() as u32;
+        let partitions_count = self.partitions.len() as u32;
         info!(
             "Reassigning consumer groups for topic with ID: {} for stream with 
ID with {}, partitions count: {}",
             self.topic_id, self.stream_id, partitions_count
         );
-        for (_, consumer_group) in self.consumer_groups.iter_mut() {
+        for (_, consumer_group) in 
self.consumer_groups.borrow_mut().iter_mut() {
             consumer_group.reassign_partitions(partitions_count);
         }
     }
 
-    pub fn get_consumer_groups(&self) -> Vec<&ConsumerGroup> {
-        self.consumer_groups.values().collect()
+    pub fn get_consumer_groups(&self) -> Vec<ConsumerGroup> {
+        self.consumer_groups.borrow().values().cloned().collect()
     }
 
-    pub fn get_consumer_group(&self, identifier: &Identifier) -> 
Result<&ConsumerGroup, IggyError> {
+    pub fn get_consumer_group(&self, identifier: &Identifier) -> 
Result<Ref<'_, ConsumerGroup>, IggyError> {
         match identifier.kind {
             IdKind::Numeric => 
self.get_consumer_group_by_id(identifier.get_u32_value().unwrap()),
             IdKind::String => 
self.get_consumer_group_by_name(&identifier.get_cow_str_value()?),
@@ -55,22 +58,42 @@ impl Topic {
     pub fn try_get_consumer_group(
         &self,
         identifier: &Identifier,
-    ) -> Result<Option<&ConsumerGroup>, IggyError> {
+    ) -> Result<Option<Ref<'_, ConsumerGroup>>, IggyError> {
         match identifier.kind {
-            IdKind::Numeric => 
Ok(self.consumer_groups.get(&identifier.get_u32_value()?)),
+            IdKind::Numeric => 
Ok(self.try_get_consumer_group_by_id(&identifier.get_u32_value()?)),
             IdKind::String => {
                 
Ok(self.try_get_consumer_group_by_name(&identifier.get_cow_str_value()?))
             }
         }
     }
+    /// Looks up a consumer group by its numeric ID.
+    ///
+    /// Returns `None` when no group is stored under `id`. The returned
+    /// guard keeps `consumer_groups` immutably borrowed while it is alive,
+    /// so drop it before calling anything that borrows the map mutably.
+    fn try_get_consumer_group_by_id(&self, id: &u32) -> Option<Ref<'_, 
ConsumerGroup>> {
+        let consumer_groups = self.consumer_groups.borrow();
+        // `Ref::filter_map` narrows the guard to the entry with a single
+        // lookup; on a miss it hands the original guard back as `Err`,
+        // which `ok()` drops, releasing the borrow.
+        Ref::filter_map(consumer_groups, |groups| groups.get(id)).ok()
+    }
+
+    /// Looks up a consumer group by name; `None` when it does not exist.
+    ///
+    /// An unknown name returns `None` through `?` instead of unwrapping
+    /// the missing index entry, so the lookup cannot panic.
+    fn try_get_consumer_group_by_name(&self, name: &str) -> Option<Ref<'_, 
ConsumerGroup>> {
+        let id = self.consumer_groups_ids.get(name)?;
+        let consumer_groups = self.consumer_groups.borrow();
 
-    fn try_get_consumer_group_by_name(&self, name: &str) -> 
Option<&ConsumerGroup> {
-        self.consumer_groups_ids
-            .get(name)
-            .and_then(|id| self.consumer_groups.get(id))
+        // A name that maps to an ID missing from `consumer_groups` also
+        // yields `None`: `filter_map` returns `Err` on a failed lookup
+        // and `ok()` discards it together with the borrow.
+        Ref::filter_map(consumer_groups, |groups| groups.get(id)).ok()
     }
 
-    pub fn get_consumer_group_by_name(&self, name: &str) -> 
Result<&ConsumerGroup, IggyError> {
+    pub fn get_consumer_group_by_name(&self, name: &str) -> Result<Ref<'_, 
ConsumerGroup>, IggyError> {
         let group_id = self.consumer_groups_ids.get(name);
         if group_id.is_none() {
             return Err(IggyError::ConsumerGroupNameNotFound(
@@ -82,19 +105,24 @@ impl Topic {
         self.get_consumer_group_by_id(*group_id.unwrap())
     }
 
-    pub fn get_consumer_group_by_id(&self, id: u32) -> Result<&ConsumerGroup, 
IggyError> {
-        let consumer_group = self.consumer_groups.get(&id);
-        if consumer_group.is_none() {
+    pub fn get_consumer_group_by_id(&self, id: u32) -> Result<Ref<'_, 
ConsumerGroup>, IggyError> {
+        let consumer_groups = self.consumer_groups.borrow();
+        let exists = consumer_groups.contains_key(&id);
+        if !exists {
             return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id));
         }
+        let consumer_group = Ref::map(consumer_groups, |cg| {
+            let consumer_group = cg.get(&id);
+            consumer_group.unwrap()
+        });
 
-        Ok(consumer_group.unwrap())
+        Ok(consumer_group)
     }
 
     pub fn get_consumer_group_mut(
-        &mut self,
+        &self,
         identifier: &Identifier,
-    ) -> Result<&mut ConsumerGroup, IggyError> {
+    ) -> Result<RefMut<'_, ConsumerGroup>, IggyError> {
         match identifier.kind {
             IdKind::Numeric => {
                 
self.get_consumer_group_by_id_mut(identifier.get_u32_value().unwrap())
@@ -104,21 +132,25 @@ impl Topic {
     }
 
     pub fn get_consumer_group_by_id_mut(
-        &mut self,
+        &self,
         id: u32,
-    ) -> Result<&mut ConsumerGroup, IggyError> {
-        let consumer_group = self.consumer_groups.get_mut(&id);
-        if consumer_group.is_none() {
+    ) -> Result<RefMut<'_, ConsumerGroup>, IggyError> {
+        let consumer_groups = self.consumer_groups.borrow_mut();
+        let exists = consumer_groups.contains_key(&id);
+        if !exists {
             return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id));
         }
-
-        Ok(consumer_group.unwrap())
+        let consumer_group = RefMut::map(consumer_groups, |cg| {
+            let consumer_group = cg.get_mut(&id);
+            consumer_group.unwrap()
+        });
+        Ok(consumer_group)
     }
 
     pub fn get_consumer_group_by_name_mut(
-        &mut self,
+        &self,
         name: &str,
-    ) -> Result<&mut ConsumerGroup, IggyError> {
+    ) -> Result<RefMut<'_, ConsumerGroup>, IggyError> {
         let group_id = self.consumer_groups_ids.get(name).copied();
         if group_id.is_none() {
             return Err(IggyError::ConsumerGroupNameNotFound(
@@ -148,7 +180,7 @@ impl Topic {
                 .current_consumer_group_id
                 .fetch_add(1, Ordering::SeqCst);
             loop {
-                if self.consumer_groups.contains_key(&id) {
+                if self.consumer_groups.borrow().contains_key(&id) {
                     if id == u32::MAX {
                         return Err(IggyError::ConsumerGroupIdAlreadyExists(id, 
self.topic_id));
                     }
@@ -163,7 +195,7 @@ impl Topic {
             id = group_id.unwrap();
         }
 
-        if self.consumer_groups.contains_key(&id) {
+        if self.consumer_groups.borrow().contains_key(&id) {
             return Err(IggyError::ConsumerGroupIdAlreadyExists(id, 
self.topic_id));
         }
 
@@ -171,11 +203,11 @@ impl Topic {
             self.topic_id,
             id,
             name,
-            self.partitions.borrow().len() as u32,
+            self.partitions.len() as u32,
         );
         self.consumer_groups_ids.insert(name.to_owned(), id);
         let cloned_group = consumer_group.clone();
-        self.consumer_groups.insert(id, consumer_group);
+        self.consumer_groups.borrow_mut().insert(id, consumer_group);
         info!(
             "Created consumer group with ID: {} for topic with ID: {} and 
stream with ID: {}.",
             id, self.topic_id, self.stream_id
@@ -195,11 +227,14 @@ impl Topic {
             group_id = consumer_group.group_id;
         }
 
-        let consumer_group = self.consumer_groups.remove(&group_id);
+        let mut consumer_groups = self.consumer_groups.borrow_mut();
+        let consumer_group = consumer_groups.remove(&group_id);
         if consumer_group.is_none() {
             return Err(IggyError::ConsumerGroupIdNotFound(group_id, 
self.topic_id));
         }
         let consumer_group = consumer_group.unwrap();
+        let consumer_group = consumer_group.clone();
+        drop(consumer_groups);
         {
             self.consumer_groups_ids.remove(&consumer_group.name);
             let current_group_id = 
self.current_consumer_group_id.load(Ordering::SeqCst);
@@ -208,7 +243,8 @@ impl Topic {
                     .store(group_id, Ordering::SeqCst);
             }
 
-            for (_, partition) in self.partitions.borrow().iter() {
+            for (_, partition) in self.partitions.iter() {
+                let partition = partition.read().await;
                 if let Some((_, offset)) = 
partition.consumer_group_offsets.remove(&group_id) {
                     self.storage
                         .partition
@@ -231,13 +267,15 @@ impl Topic {
         group_id: &Identifier,
         member_id: u32,
     ) -> Result<(), IggyError> {
-        let consumer_group = 
self.get_consumer_group_mut(group_id).with_error_context(|error| {
+        let topic_id = self.topic_id;
+        let stream_id = self.stream_id;
+        let mut consumer_group = 
self.get_consumer_group_mut(group_id).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get consumer 
group with id: {group_id}")
         })?;
         consumer_group.add_member(member_id);
         info!(
             "Member with ID: {} has joined consumer group with ID: {} for 
topic with ID: {} and stream with ID: {}.",
-            member_id, group_id, self.topic_id, self.stream_id
+            member_id, group_id, topic_id, stream_id
         );
         Ok(())
     }
@@ -247,13 +285,15 @@ impl Topic {
         group_id: &Identifier,
         member_id: u32,
     ) -> Result<(), IggyError> {
-        let consumer_group = 
self.get_consumer_group_mut(group_id).with_error_context(|error| {
+        let topic_id = self.topic_id;
+        let stream_id = self.stream_id;
+        let mut consumer_group = 
self.get_consumer_group_mut(group_id).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get consumer 
group with id: {group_id}")
         })?;
         consumer_group.delete_member(member_id);
         info!(
             "Member with ID: {} has left consumer group with ID: {} for topic 
with ID: {} and stream with ID: {}.",
-            member_id, group_id, self.topic_id, self.stream_id
+            member_id, group_id, topic_id, stream_id
         );
         Ok(())
     }
@@ -286,7 +326,7 @@ mod tests {
             assert_eq!(created_consumer_group.topic_id, topic_id);
         }
 
-        assert_eq!(topic.consumer_groups.len(), 1);
+        assert_eq!(topic.consumer_groups.borrow().len(), 1);
         let consumer_group = topic
             .get_consumer_group(&Identifier::numeric(group_id).unwrap())
             .unwrap();
@@ -295,7 +335,7 @@ mod tests {
         assert_eq!(consumer_group.topic_id, topic_id);
         assert_eq!(
             consumer_group.partitions_count,
-            topic.partitions.borrow().len() as u32
+            topic.partitions.len() as u32
         );
     }
 
@@ -306,12 +346,12 @@ mod tests {
         let mut topic = get_topic().await;
         let result = topic.create_consumer_group(Some(group_id), name);
         assert!(result.is_ok());
-        assert_eq!(topic.consumer_groups.len(), 1);
+        assert_eq!(topic.consumer_groups.borrow().len(), 1);
         let result = topic.create_consumer_group(Some(group_id), "test2");
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert!(matches!(err, IggyError::ConsumerGroupIdAlreadyExists(_, _)));
-        assert_eq!(topic.consumer_groups.len(), 1);
+        assert_eq!(topic.consumer_groups.borrow().len(), 1);
     }
 
     #[tokio::test]
@@ -321,7 +361,7 @@ mod tests {
         let mut topic = get_topic().await;
         let result = topic.create_consumer_group(Some(group_id), name);
         assert!(result.is_ok());
-        assert_eq!(topic.consumer_groups.len(), 1);
+        assert_eq!(topic.consumer_groups.borrow().len(), 1);
         let group_id = group_id + 1;
         let result = topic.create_consumer_group(Some(group_id), name);
         assert!(result.is_err());
@@ -330,7 +370,7 @@ mod tests {
             err,
             IggyError::ConsumerGroupNameAlreadyExists(_, _)
         ));
-        assert_eq!(topic.consumer_groups.len(), 1);
+        assert_eq!(topic.consumer_groups.borrow().len(), 1);
     }
 
     #[tokio::test]
@@ -340,12 +380,12 @@ mod tests {
         let mut topic = get_topic().await;
         let result = topic.create_consumer_group(Some(group_id), name);
         assert!(result.is_ok());
-        assert_eq!(topic.consumer_groups.len(), 1);
+        assert_eq!(topic.consumer_groups.borrow().len(), 1);
         let result = topic
             .delete_consumer_group(&Identifier::numeric(group_id).unwrap())
             .await;
         assert!(result.is_ok());
-        assert!(topic.consumer_groups.is_empty());
+        assert!(topic.consumer_groups.borrow().is_empty());
     }
 
     #[tokio::test]
@@ -355,13 +395,13 @@ mod tests {
         let mut topic = get_topic().await;
         let result = topic.create_consumer_group(Some(group_id), name);
         assert!(result.is_ok());
-        assert_eq!(topic.consumer_groups.len(), 1);
+        assert_eq!(topic.consumer_groups.borrow().len(), 1);
         let group_id = group_id + 1;
         let result = topic
             .delete_consumer_group(&Identifier::numeric(group_id).unwrap())
             .await;
         assert!(result.is_err());
-        assert_eq!(topic.consumer_groups.len(), 1);
+        assert_eq!(topic.consumer_groups.borrow().len(), 1);
     }
 
     #[tokio::test]
diff --git a/core/server/src/streaming/topics/consumer_offsets.rs 
b/core/server/src/streaming/topics/consumer_offsets.rs
index f59e9276..8748576b 100644
--- a/core/server/src/streaming/topics/consumer_offsets.rs
+++ b/core/server/src/streaming/topics/consumer_offsets.rs
@@ -34,7 +34,6 @@ impl Topic {
     ) -> Result<(), IggyError> {
         let Some((polling_consumer, partition_id)) = self
             .resolve_consumer_with_partition_id(&consumer, client_id, 
partition_id, false)
-            .await
             .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to resolve consumer with partition id, consumer ID: {}, client ID: {}, 
partition ID: {:?}", consumer.id, client_id, partition_id))? else {
             return Err(IggyError::ConsumerOffsetNotFound(client_id));
         };
@@ -78,8 +77,7 @@ impl Topic {
     ) -> Result<Option<ConsumerOffsetInfo>, IggyError> {
         let Some((polling_consumer, partition_id)) = self
             .resolve_consumer_with_partition_id(consumer, client_id, 
partition_id, false)
-            .await
-            .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to resolve consumer offset for consumer: {consumer}, client ID: 
{client_id}, partition ID: {partition_id:#?}"))? else {
+            .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to resolve consumer offset for consumer: {consumer}, client ID: 
{client_id}, partition ID: {:#?}", partition_id))? else {
             return Ok(None);
         };
 
@@ -93,7 +91,6 @@ impl Topic {
         let partition = partition.read().await;
         let offset = partition
             .get_consumer_offset(polling_consumer)
-            .await
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to get consumer 
offset for consumer: {polling_consumer}"
@@ -118,7 +115,6 @@ impl Topic {
     ) -> Result<(), IggyError> {
         let Some((polling_consumer, partition_id)) = self
             .resolve_consumer_with_partition_id(&consumer, client_id, 
partition_id, false)
-            .await
             .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to resolve consumer with partition id, consumer ID: {}, client ID: {}, 
partition ID: {:?}", consumer.id, client_id, partition_id))? else {
             return Err(IggyError::ConsumerOffsetNotFound(client_id));
         };
diff --git a/core/server/src/streaming/topics/messages.rs 
b/core/server/src/streaming/topics/messages.rs
index 56d7c1c7..e8c52385 100644
--- a/core/server/src/streaming/topics/messages.rs
+++ b/core/server/src/streaming/topics/messages.rs
@@ -209,6 +209,7 @@ mod tests {
     use bytes::Bytes;
     use iggy_common::CompressionAlgorithm;
     use iggy_common::{IggyMessage, MaxTopicSize};
+    use std::rc::Rc;
     use std::sync::Arc;
     use std::sync::atomic::AtomicU32;
     use std::sync::atomic::AtomicU64;
@@ -321,11 +322,11 @@ mod tests {
 
     async fn init_topic(partitions_count: u32) -> Topic {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Arc::new(SystemConfig {
+        let config = Rc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
-        let storage = Arc::new(SystemStorage::new(
+        let storage = Rc::new(SystemStorage::new(
             config.clone(),
             Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
         ));
diff --git a/core/server/src/streaming/topics/partitions.rs 
b/core/server/src/streaming/topics/partitions.rs
index eb6b4280..dfedf0a9 100644
--- a/core/server/src/streaming/topics/partitions.rs
+++ b/core/server/src/streaming/topics/partitions.rs
@@ -20,7 +20,7 @@ use crate::streaming::partitions::partition::Partition;
 use crate::streaming::topics::COMPONENT;
 use crate::streaming::topics::topic::Topic;
 use error_set::ErrContext;
-use iggy_common::locking::{IggySharedMut, IggySharedMutFn};
+use iggy_common::locking::{IggyRwLock, IggySharedMutFn};
 use iggy_common::{IggyError, IggyTimestamp};
 
 const MAX_PARTITIONS_COUNT: u32 = 100_000;
@@ -63,7 +63,7 @@ impl Topic {
             )
             .await;
             self.partitions
-                .insert(partition_id, IggySharedMut::new(partition));
+                .insert(partition_id, IggyRwLock::new(partition));
             partition_ids.push(partition_id)
         }
 
diff --git a/core/server/src/streaming/topics/storage.rs 
b/core/server/src/streaming/topics/storage.rs
index 1cc33310..ae9435e5 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -27,7 +27,7 @@ use anyhow::Context;
 use error_set::ErrContext;
 use futures::future::join_all;
 use iggy_common::IggyError;
-use iggy_common::locking::IggySharedMut;
+use iggy_common::locking::IggyRwLock;
 use iggy_common::locking::IggySharedMutFn;
 use serde::{Deserialize, Serialize};
 use std::path::Path;
@@ -189,7 +189,7 @@ impl TopicStorage for FileTopicStorage {
         for mut partition in unloaded_partitions {
             let loaded_partitions = loaded_partitions.clone();
             let partition_state = 
state.partitions.remove(&partition.partition_id).unwrap();
-            let load_partition = tokio::spawn(async move {
+            let load_partition = monoio::spawn(async move {
                 match partition.load(partition_state).await {
                     Ok(_) => {
                         loaded_partitions.lock().await.push(partition);
@@ -209,7 +209,7 @@ impl TopicStorage for FileTopicStorage {
         for partition in loaded_partitions.lock().await.drain(..) {
             topic
                 .partitions
-                .insert(partition.partition_id, IggySharedMut::new(partition));
+                .insert(partition.partition_id, IggyRwLock::new(partition));
         }
 
         for consumer_group in state.consumer_groups.into_values() {
@@ -224,7 +224,8 @@ impl TopicStorage for FileTopicStorage {
                 .insert(consumer_group.name.to_owned(), 
consumer_group.group_id);
             topic
                 .consumer_groups
-                .insert(consumer_group.group_id, RwLock::new(consumer_group));
+                .borrow_mut()
+                .insert(consumer_group.group_id, consumer_group);
         }
 
         info!("Loaded topic {topic}");
diff --git a/core/server/src/streaming/topics/topic.rs 
b/core/server/src/streaming/topics/topic.rs
index ad3e25ef..16aa324c 100644
--- a/core/server/src/streaming/topics/topic.rs
+++ b/core/server/src/streaming/topics/topic.rs
@@ -25,7 +25,7 @@ use ahash::AHashMap;
 use core::fmt;
 use std::cell::RefCell;
 use std::rc::Rc;
-use iggy_common::locking::IggySharedMut;
+use iggy_common::locking::IggyRwLock;
 use iggy_common::{
     CompressionAlgorithm, Consumer, ConsumerKind, IggyByteSize, IggyError, 
IggyExpiry,
     IggyTimestamp, MaxTopicSize, Sizeable,
@@ -33,7 +33,6 @@ use iggy_common::{
 
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
-use tokio::sync::RwLock;
 use tracing::info;
 
 const ALMOST_FULL_THRESHOLD: f64 = 0.9;
@@ -51,9 +50,9 @@ pub struct Topic {
     pub(crate) messages_count: Arc<AtomicU64>,
     pub(crate) segments_count_of_parent_stream: Arc<AtomicU32>,
     pub(crate) config: Rc<SystemConfig>,
-    pub(crate) partitions: RefCell<AHashMap<u32, Partition>>,
+    pub(crate) partitions: AHashMap<u32, IggyRwLock<Partition>>,
     pub(crate) storage: Rc<SystemStorage>,
-    pub(crate) consumer_groups: AHashMap<u32, ConsumerGroup>,
+    pub(crate) consumer_groups: RefCell<AHashMap<u32, ConsumerGroup>>,
     pub(crate) consumer_groups_ids: AHashMap<String, u32>,
     pub(crate) current_consumer_group_id: AtomicU32,
     pub(crate) current_partition_id: AtomicU32,
@@ -126,7 +125,7 @@ impl Topic {
             messages_count_of_parent_stream,
             messages_count: Arc::new(AtomicU64::new(0)),
             segments_count_of_parent_stream,
-            consumer_groups: AHashMap::new(),
+            consumer_groups: RefCell::new(AHashMap::new()),
             consumer_groups_ids: AHashMap::new(),
             current_consumer_group_id: AtomicU32::new(1),
             current_partition_id: AtomicU32::new(1),
@@ -172,11 +171,11 @@ impl Topic {
         matches!(self.max_topic_size, MaxTopicSize::Unlimited)
     }
 
-    pub fn get_partitions(&self) -> Vec<IggySharedMut<Partition>> {
+    pub fn get_partitions(&self) -> Vec<IggyRwLock<Partition>> {
         self.partitions.values().cloned().collect()
     }
 
-    pub fn get_partition(&self, partition_id: u32) -> 
Result<IggySharedMut<Partition>, IggyError> {
+    pub fn get_partition(&self, partition_id: u32) -> 
Result<IggyRwLock<Partition>, IggyError> {
         match self.partitions.get(&partition_id) {
             Some(partition_arc) => Ok(partition_arc.clone()),
             None => Err(IggyError::PartitionNotFound(
@@ -187,7 +186,7 @@ impl Topic {
         }
     }
 
-    pub async fn resolve_consumer_with_partition_id(
+    pub fn resolve_consumer_with_partition_id(
         &self,
         consumer: &Consumer,
         client_id: u32,
@@ -203,7 +202,7 @@ impl Topic {
                 )))
             }
             ConsumerKind::ConsumerGroup => {
-                let consumer_group = 
self.get_consumer_group(&consumer.id)?.read().await;
+                let mut consumer_group = 
self.get_consumer_group_mut(&consumer.id)?;
                 if let Some(partition_id) = partition_id {
                     return Ok(Some((
                         
PollingConsumer::consumer_group(consumer_group.group_id, client_id),
@@ -212,9 +211,9 @@ impl Topic {
                 }
 
                 let partition_id = if calculate_partition_id {
-                    consumer_group.calculate_partition_id(client_id).await?
+                    consumer_group.calculate_partition_id(client_id)?
                 } else {
-                    consumer_group.get_current_partition_id(client_id).await?
+                    consumer_group.get_current_partition_id(client_id)?
                 };
                 let Some(partition_id) = partition_id else {
                     return Ok(None);
@@ -291,11 +290,11 @@ mod tests {
     #[tokio::test]
     async fn should_be_created_given_valid_parameters() {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Arc::new(SystemConfig {
+        let config = Rc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
-        let storage = Arc::new(SystemStorage::new(
+        let storage = Rc::new(SystemStorage::new(
             config.clone(),
             Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
         ));

Reply via email to