This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch fix_consumer_group_leave_and_append_messages
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 3dfe2ba2d53959dfc0a80baa585faeade5b77a7a
Author: numinex <[email protected]>
AuthorDate: Mon Dec 29 14:51:36 2025 +0100

    fix(server): fix consumer group leave race and append_messages
---
 core/server/src/shard/system/clients.rs            |  2 +-
 core/server/src/slab/streams.rs                    |  2 ++
 .../streaming/segments/messages/messages_writer.rs |  2 ++
 core/server/src/streaming/topics/helpers.rs        | 42 +++++++++++-----------
 4 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/core/server/src/shard/system/clients.rs 
b/core/server/src/shard/system/clients.rs
index 3c0b3b444..16420acb2 100644
--- a/core/server/src/shard/system/clients.rs
+++ b/core/server/src/shard/system/clients.rs
@@ -37,7 +37,7 @@ impl IggyShard {
         let consumer_groups: Vec<(u32, u32, u32)>;
 
         {
-            let client = self.client_manager.delete_client(client_id);
+            let client = self.client_manager.try_get_client(client_id);
             if client.is_none() {
                 error!("Client with ID: {client_id} was not found in the 
client manager.",);
                 return;
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index ad7d5368d..1517c6195 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1307,6 +1307,7 @@ impl Streams {
                         .clone(),
                 )
             });
+        let guard = messages_writer.lock.lock().await;
 
         let saved = messages_writer
             .as_ref()
@@ -1350,6 +1351,7 @@ impl Streams {
             
streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
         );
 
+        drop(guard);
         Ok(batch_count)
     }
 
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs 
b/core/server/src/streaming/segments/messages/messages_writer.rs
index b0e48badc..ddd0c5197 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -33,6 +33,7 @@ pub struct MessagesWriter {
     file: File,
     messages_size_bytes: Rc<AtomicU64>,
     fsync: bool,
+    pub lock: tokio::sync::Mutex<()>,
 }
 
 // Safety: We are guaranteeing that MessagesWriter will never be used from 
multiple threads
@@ -86,6 +87,7 @@ impl MessagesWriter {
             file,
             messages_size_bytes,
             fsync,
+            lock: tokio::sync::Mutex::new(()),
         })
     }
 
diff --git a/core/server/src/streaming/topics/helpers.rs 
b/core/server/src/streaming/topics/helpers.rs
index 0e0e66344..0972596b8 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -281,31 +281,29 @@ fn delete_member(
     members: &mut ConsumerGroupMembers,
     partitions: &[usize],
 ) -> Option<usize> {
-    let member_ids: Vec<usize> = members
-        .inner()
-        .shared_get()
-        .iter()
-        .filter_map(|(_, member)| (member.client_id == 
client_id).then_some(member.id))
-        .collect();
-
-    if member_ids.is_empty() {
-        return None;
-    }
-
-    members.inner_mut().rcu(|members| {
-        let mut members = mimic_members(members);
-        for member_id in &member_ids {
-            members.remove(*member_id);
+    let mut member_id = None;
+    members.inner_mut().rcu(|inner| {
+        let member_ids: Vec<usize> = inner
+            .iter()
+            .filter_map(|(_, member)| (member.client_id == 
client_id).then_some(member.id))
+            .collect();
+
+        let mut members = mimic_members(inner);
+        if !member_ids.is_empty() {
+            member_id = member_ids.first().cloned();
+            for member_id in &member_ids {
+                members.remove(*member_id);
+            }
+            members.compact(|entry, _, idx| {
+                entry.id = idx;
+                true
+            });
+            assign_partitions_to_members(id, &mut members, partitions);
+            return members;
         }
-        members.compact(|entry, _, idx| {
-            entry.id = idx;
-            true
-        });
-        assign_partitions_to_members(id, &mut members, partitions);
         members
     });
-
-    Some(member_ids[0])
+    member_id
 }
 
 fn assign_partitions_to_members(id: usize, members: &mut Slab<Member>, 
partitions: &[usize]) {

Reply via email to