This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch rebase_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 110f8f417fb0649954314920df875fca4e4e3e78
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Jun 24 18:55:03 2025 +0200

    fix issues with RefCell in core/server/src/streaming/topics/consumer_groups* files
---
 core/server/src/streaming/topics/consumer_group.rs |  25 ++--
 .../server/src/streaming/topics/consumer_groups.rs | 152 ++++++++++++---------
 2 files changed, 94 insertions(+), 83 deletions(-)

diff --git a/core/server/src/streaming/topics/consumer_group.rs b/core/server/src/streaming/topics/consumer_group.rs
index 43d9611b..cc28ef6f 100644
--- a/core/server/src/streaming/topics/consumer_group.rs
+++ b/core/server/src/streaming/topics/consumer_group.rs
@@ -18,7 +18,6 @@
 
 use ahash::AHashMap;
 use iggy_common::IggyError;
-use tokio::sync::RwLock;
 use tracing::trace;
 
 #[derive(Debug, Clone)]
@@ -58,10 +57,13 @@ impl ConsumerGroup {
         self.assign_partitions().await;
     }
 
-    pub async fn calculate_partition_id(&self, member_id: u32) -> 
Result<Option<u32>, IggyError> {
-        let member = self.members.get(&member_id);
+    pub async fn calculate_partition_id(
+        &mut self,
+        member_id: u32,
+    ) -> Result<Option<u32>, IggyError> {
+        let member = self.members.get_mut(&member_id);
         if let Some(member) = member {
-            return Ok(member.await.calculate_partition_id());
+            return Ok(member.calculate_partition_id());
         }
         Err(IggyError::ConsumerGroupMemberNotFound(
             member_id,
@@ -73,7 +75,7 @@ impl ConsumerGroup {
     pub async fn get_current_partition_id(&self, member_id: u32) -> 
Result<Option<u32>, IggyError> {
         let member = self.members.get(&member_id);
         if let Some(member) = member {
-            return Ok(member.read().await.current_partition_id);
+            return Ok(member.current_partition_id);
         }
         Err(IggyError::ConsumerGroupMemberNotFound(
             member_id,
@@ -85,12 +87,12 @@ impl ConsumerGroup {
     pub async fn add_member(&mut self, member_id: u32) {
         self.members.insert(
             member_id,
-            RwLock::new(ConsumerGroupMember {
+            ConsumerGroupMember {
                 id: member_id,
                 partitions: AHashMap::new(),
                 current_partition_index: None,
                 current_partition_id: None,
-            }),
+            },
         );
         trace!(
             "Added member with ID: {} to consumer group: {} for topic with ID: 
{}",
@@ -117,7 +119,6 @@ impl ConsumerGroup {
 
         let members_count = members.len() as u32;
         for member in members.iter_mut() {
-            let mut member = member.write().await;
             member.current_partition_index = None;
             member.current_partition_id = None;
             member.partitions.clear();
@@ -126,8 +127,7 @@ impl ConsumerGroup {
         for partition_index in 0..self.partitions_count {
             let partition_id = partition_index + 1;
             let member_index = partition_index % members_count;
-            let member = members.get(member_index as usize).unwrap();
-            let mut member = member.write().await;
+            let member = members.get_mut(member_index as usize).unwrap();
             let member_partition_index = member.partitions.len() as u32;
             member
                 .partitions
@@ -213,7 +213,6 @@ mod tests {
 
         consumer_group.add_member(member_id).await;
         let member = consumer_group.members.get(&member_id).unwrap();
-        let member = member.read().await;
         assert_eq!(
             member.partitions.len() as u32,
             consumer_group.partitions_count
@@ -240,8 +239,6 @@ mod tests {
         consumer_group.add_member(member2_id).await;
         let member1 = consumer_group.members.get(&member1_id).unwrap();
         let member2 = consumer_group.members.get(&member2_id).unwrap();
-        let member1 = member1.read().await;
-        let member2 = member2.read().await;
         assert_eq!(
             member1.partitions.len() + member2.partitions.len(),
             consumer_group.partitions_count as usize
@@ -277,8 +274,6 @@ mod tests {
         consumer_group.add_member(member2_id).await;
         let member1 = consumer_group.members.get(&member1_id).unwrap();
         let member2 = consumer_group.members.get(&member2_id).unwrap();
-        let member1 = member1.read().await;
-        let member2 = member2.read().await;
         if member1.partitions.len() == 1 {
             assert_eq!(member2.partitions.len(), 0);
         } else {
diff --git a/core/server/src/streaming/topics/consumer_groups.rs b/core/server/src/streaming/topics/consumer_groups.rs
index 26505355..1f50a1bb 100644
--- a/core/server/src/streaming/topics/consumer_groups.rs
+++ b/core/server/src/streaming/topics/consumer_groups.rs
@@ -21,37 +21,31 @@ use crate::streaming::topics::consumer_group::ConsumerGroup;
 use crate::streaming::topics::topic::Topic;
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use iggy_common::locking::IggySharedMutFn;
 use iggy_common::{IdKind, Identifier};
 use std::sync::atomic::Ordering;
-use tokio::sync::RwLock;
 use tracing::info;
 
 impl Topic {
-    pub async fn reassign_consumer_groups(&mut self) {
+    pub fn reassign_consumer_groups(&mut self) {
         if self.consumer_groups.is_empty() {
             return;
         }
 
-        let partitions_count = self.partitions.len() as u32;
+        let partitions_count = self.partitions.borrow().len() as u32;
         info!(
             "Reassigning consumer groups for topic with ID: {} for stream with 
ID with {}, partitions count: {}",
             self.topic_id, self.stream_id, partitions_count
         );
         for (_, consumer_group) in self.consumer_groups.iter_mut() {
-            let mut consumer_group = consumer_group.write().await;
-            consumer_group.reassign_partitions(partitions_count).await;
+            consumer_group.reassign_partitions(partitions_count);
         }
     }
 
-    pub fn get_consumer_groups(&self) -> Vec<&RwLock<ConsumerGroup>> {
+    pub fn get_consumer_groups(&self) -> Vec<&ConsumerGroup> {
         self.consumer_groups.values().collect()
     }
 
-    pub fn get_consumer_group(
-        &self,
-        identifier: &Identifier,
-    ) -> Result<&RwLock<ConsumerGroup>, IggyError> {
+    pub fn get_consumer_group(&self, identifier: &Identifier) -> 
Result<&ConsumerGroup, IggyError> {
         match identifier.kind {
             IdKind::Numeric => 
self.get_consumer_group_by_id(identifier.get_u32_value().unwrap()),
             IdKind::String => 
self.get_consumer_group_by_name(&identifier.get_cow_str_value()?),
@@ -61,7 +55,7 @@ impl Topic {
     pub fn try_get_consumer_group(
         &self,
         identifier: &Identifier,
-    ) -> Result<Option<&RwLock<ConsumerGroup>>, IggyError> {
+    ) -> Result<Option<&ConsumerGroup>, IggyError> {
         match identifier.kind {
             IdKind::Numeric => 
Ok(self.consumer_groups.get(&identifier.get_u32_value()?)),
             IdKind::String => {
@@ -70,16 +64,13 @@ impl Topic {
         }
     }
 
-    fn try_get_consumer_group_by_name(&self, name: &str) -> 
Option<&RwLock<ConsumerGroup>> {
+    fn try_get_consumer_group_by_name(&self, name: &str) -> 
Option<&ConsumerGroup> {
         self.consumer_groups_ids
             .get(name)
             .and_then(|id| self.consumer_groups.get(id))
     }
 
-    pub fn get_consumer_group_by_name(
-        &self,
-        name: &str,
-    ) -> Result<&RwLock<ConsumerGroup>, IggyError> {
+    pub fn get_consumer_group_by_name(&self, name: &str) -> 
Result<&ConsumerGroup, IggyError> {
         let group_id = self.consumer_groups_ids.get(name);
         if group_id.is_none() {
             return Err(IggyError::ConsumerGroupNameNotFound(
@@ -91,7 +82,7 @@ impl Topic {
         self.get_consumer_group_by_id(*group_id.unwrap())
     }
 
-    pub fn get_consumer_group_by_id(&self, id: u32) -> Result<ConsumerGroup, 
IggyError> {
+    pub fn get_consumer_group_by_id(&self, id: u32) -> Result<&ConsumerGroup, 
IggyError> {
         let consumer_group = self.consumer_groups.get(&id);
         if consumer_group.is_none() {
             return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id));
@@ -100,7 +91,46 @@ impl Topic {
         Ok(consumer_group.unwrap())
     }
 
-    pub async fn create_consumer_group(
+    pub fn get_consumer_group_mut(
+        &mut self,
+        identifier: &Identifier,
+    ) -> Result<&mut ConsumerGroup, IggyError> {
+        match identifier.kind {
+            IdKind::Numeric => {
+                
self.get_consumer_group_by_id_mut(identifier.get_u32_value().unwrap())
+            }
+            IdKind::String => 
self.get_consumer_group_by_name_mut(&identifier.get_cow_str_value()?),
+        }
+    }
+
+    pub fn get_consumer_group_by_id_mut(
+        &mut self,
+        id: u32,
+    ) -> Result<&mut ConsumerGroup, IggyError> {
+        let consumer_group = self.consumer_groups.get_mut(&id);
+        if consumer_group.is_none() {
+            return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id));
+        }
+
+        Ok(consumer_group.unwrap())
+    }
+
+    pub fn get_consumer_group_by_name_mut(
+        &mut self,
+        name: &str,
+    ) -> Result<&mut ConsumerGroup, IggyError> {
+        let group_id = self.consumer_groups_ids.get(name).copied();
+        if group_id.is_none() {
+            return Err(IggyError::ConsumerGroupNameNotFound(
+                name.to_string(),
+                self.name.to_owned(),
+            ));
+        }
+
+        self.get_consumer_group_by_id_mut(group_id.unwrap())
+    }
+
+    pub fn create_consumer_group(
         &mut self,
         group_id: Option<u32>,
         name: &str,
@@ -137,15 +167,20 @@ impl Topic {
             return Err(IggyError::ConsumerGroupIdAlreadyExists(id, 
self.topic_id));
         }
 
-        let consumer_group =
-            ConsumerGroup::new(self.topic_id, id, name, 
self.partitions.borrow().len() as u32);
-        self.consumer_groups.insert(id, consumer_group.clone());
+        let consumer_group = ConsumerGroup::new(
+            self.topic_id,
+            id,
+            name,
+            self.partitions.borrow().len() as u32,
+        );
         self.consumer_groups_ids.insert(name.to_owned(), id);
+        let cloned_group = consumer_group.clone();
+        self.consumer_groups.insert(id, consumer_group);
         info!(
             "Created consumer group with ID: {} for topic with ID: {} and 
stream with ID: {}.",
             id, self.topic_id, self.stream_id
         );
-        consumer_group
+        Ok(cloned_group)
     }
 
     pub async fn delete_consumer_group(
@@ -157,7 +192,6 @@ impl Topic {
             let consumer_group = 
self.get_consumer_group(id).with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get consumer 
group with id: {id}")
             })?;
-            let consumer_group = consumer_group.read().await;
             group_id = consumer_group.group_id;
         }
 
@@ -167,7 +201,6 @@ impl Topic {
         }
         let consumer_group = consumer_group.unwrap();
         {
-            let group_id = consumer_group.group_id;
             self.consumer_groups_ids.remove(&consumer_group.name);
             let current_group_id = 
self.current_consumer_group_id.load(Ordering::SeqCst);
             if current_group_id > group_id {
@@ -193,16 +226,15 @@ impl Topic {
         Ok(consumer_group)
     }
 
-    pub async fn join_consumer_group(
-        &self,
+    pub fn join_consumer_group(
+        &mut self,
         group_id: &Identifier,
         member_id: u32,
     ) -> Result<(), IggyError> {
-        let consumer_group = 
self.get_consumer_group(group_id).with_error_context(|error| {
+        let consumer_group = 
self.get_consumer_group_mut(group_id).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get consumer 
group with id: {group_id}")
         })?;
-        let mut consumer_group = consumer_group.write().await;
-        consumer_group.add_member(member_id).await;
+        consumer_group.add_member(member_id);
         info!(
             "Member with ID: {} has joined consumer group with ID: {} for 
topic with ID: {} and stream with ID: {}.",
             member_id, group_id, self.topic_id, self.stream_id
@@ -210,16 +242,15 @@ impl Topic {
         Ok(())
     }
 
-    pub async fn leave_consumer_group(
-        &self,
+    pub fn leave_consumer_group(
+        &mut self,
         group_id: &Identifier,
         member_id: u32,
     ) -> Result<(), IggyError> {
-        let consumer_group = 
self.get_consumer_group(group_id).with_error_context(|error| {
+        let consumer_group = 
self.get_consumer_group_mut(group_id).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get consumer 
group with id: {group_id}")
         })?;
-        let mut consumer_group = consumer_group.write().await;
-        consumer_group.delete_member(member_id).await;
+        consumer_group.delete_member(member_id);
         info!(
             "Member with ID: {} has left consumer group with ID: {} for topic 
with ID: {} and stream with ID: {}.",
             member_id, group_id, self.topic_id, self.stream_id
@@ -236,6 +267,7 @@ mod tests {
     use crate::streaming::storage::SystemStorage;
     use crate::streaming::utils::MemoryPool;
     use iggy_common::{CompressionAlgorithm, IggyExpiry, MaxTopicSize};
+    use std::rc::Rc;
     use std::sync::Arc;
     use std::sync::atomic::{AtomicU32, AtomicU64};
 
@@ -245,10 +277,10 @@ mod tests {
         let name = "test";
         let mut topic = get_topic().await;
         let topic_id = topic.topic_id;
-        let result = topic.create_consumer_group(Some(group_id), name).await;
+        let result = topic.create_consumer_group(Some(group_id), name);
         assert!(result.is_ok());
         {
-            let created_consumer_group = result.unwrap().read().await;
+            let created_consumer_group = result.unwrap();
             assert_eq!(created_consumer_group.group_id, group_id);
             assert_eq!(created_consumer_group.name, name);
             assert_eq!(created_consumer_group.topic_id, topic_id);
@@ -258,13 +290,12 @@ mod tests {
         let consumer_group = topic
             .get_consumer_group(&Identifier::numeric(group_id).unwrap())
             .unwrap();
-        let consumer_group = consumer_group.read().await;
         assert_eq!(consumer_group.group_id, group_id);
         assert_eq!(consumer_group.name, name);
         assert_eq!(consumer_group.topic_id, topic_id);
         assert_eq!(
             consumer_group.partitions_count,
-            topic.partitions.len() as u32
+            topic.partitions.borrow().len() as u32
         );
     }
 
@@ -273,10 +304,10 @@ mod tests {
         let group_id = 1;
         let name = "test";
         let mut topic = get_topic().await;
-        let result = topic.create_consumer_group(Some(group_id), name).await;
+        let result = topic.create_consumer_group(Some(group_id), name);
         assert!(result.is_ok());
         assert_eq!(topic.consumer_groups.len(), 1);
-        let result = topic.create_consumer_group(Some(group_id), 
"test2").await;
+        let result = topic.create_consumer_group(Some(group_id), "test2");
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert!(matches!(err, IggyError::ConsumerGroupIdAlreadyExists(_, _)));
@@ -288,11 +319,11 @@ mod tests {
         let group_id = 1;
         let name = "test";
         let mut topic = get_topic().await;
-        let result = topic.create_consumer_group(Some(group_id), name).await;
+        let result = topic.create_consumer_group(Some(group_id), name);
         assert!(result.is_ok());
         assert_eq!(topic.consumer_groups.len(), 1);
         let group_id = group_id + 1;
-        let result = topic.create_consumer_group(Some(group_id), name).await;
+        let result = topic.create_consumer_group(Some(group_id), name);
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert!(matches!(
@@ -307,7 +338,7 @@ mod tests {
         let group_id = 1;
         let name = "test";
         let mut topic = get_topic().await;
-        let result = topic.create_consumer_group(Some(group_id), name).await;
+        let result = topic.create_consumer_group(Some(group_id), name);
         assert!(result.is_ok());
         assert_eq!(topic.consumer_groups.len(), 1);
         let result = topic
@@ -322,7 +353,7 @@ mod tests {
         let group_id = 1;
         let name = "test";
         let mut topic = get_topic().await;
-        let result = topic.create_consumer_group(Some(group_id), name).await;
+        let result = topic.create_consumer_group(Some(group_id), name);
         assert!(result.is_ok());
         assert_eq!(topic.consumer_groups.len(), 1);
         let group_id = group_id + 1;
@@ -339,19 +370,12 @@ mod tests {
         let name = "test";
         let member_id = 1;
         let mut topic = get_topic().await;
-        topic
-            .create_consumer_group(Some(group_id), name)
-            .await
-            .unwrap();
-        let result = topic
-            .join_consumer_group(&Identifier::numeric(group_id).unwrap(), 
member_id)
-            .await;
+        topic.create_consumer_group(Some(group_id), name).unwrap();
+        let result = 
topic.join_consumer_group(&Identifier::numeric(group_id).unwrap(), member_id);
         assert!(result.is_ok());
         let consumer_group = topic
             .get_consumer_group(&Identifier::numeric(group_id).unwrap())
-            .unwrap()
-            .read()
-            .await;
+            .unwrap();
         let members = consumer_group.get_members();
         assert_eq!(members.len(), 1);
     }
@@ -362,34 +386,26 @@ mod tests {
         let name = "test";
         let member_id = 1;
         let mut topic = get_topic().await;
-        topic
-            .create_consumer_group(Some(group_id), name)
-            .await
-            .unwrap();
+        topic.create_consumer_group(Some(group_id), name).unwrap();
         topic
             .join_consumer_group(&Identifier::numeric(group_id).unwrap(), 
member_id)
-            .await
             .unwrap();
-        let result = topic
-            .leave_consumer_group(&Identifier::numeric(group_id).unwrap(), 
member_id)
-            .await;
+        let result = 
topic.leave_consumer_group(&Identifier::numeric(group_id).unwrap(), member_id);
         assert!(result.is_ok());
         let consumer_group = topic
             .get_consumer_group(&Identifier::numeric(group_id).unwrap())
-            .unwrap()
-            .read()
-            .await;
+            .unwrap();
         let members = consumer_group.get_members();
         assert!(members.is_empty())
     }
 
     async fn get_topic() -> Topic {
         let tempdir = tempfile::TempDir::new().unwrap();
-        let config = Arc::new(SystemConfig {
+        let config = Rc::new(SystemConfig {
             path: tempdir.path().to_str().unwrap().to_string(),
             ..Default::default()
         });
-        let storage = Arc::new(SystemStorage::new(
+        let storage = Rc::new(SystemStorage::new(
             config.clone(),
             Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
         ));

Reply via email to