This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch fix_consumer_group_leave_and_append_messages in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 3dfe2ba2d53959dfc0a80baa585faeade5b77a7a Author: numinex <[email protected]> AuthorDate: Mon Dec 29 14:51:36 2025 +0100 fix(server): fix consumer group leave race and append_messages --- core/server/src/shard/system/clients.rs | 2 +- core/server/src/slab/streams.rs | 2 ++ .../streaming/segments/messages/messages_writer.rs | 2 ++ core/server/src/streaming/topics/helpers.rs | 42 +++++++++++----------- 4 files changed, 25 insertions(+), 23 deletions(-) diff --git a/core/server/src/shard/system/clients.rs b/core/server/src/shard/system/clients.rs index 3c0b3b444..16420acb2 100644 --- a/core/server/src/shard/system/clients.rs +++ b/core/server/src/shard/system/clients.rs @@ -37,7 +37,7 @@ impl IggyShard { let consumer_groups: Vec<(u32, u32, u32)>; { - let client = self.client_manager.delete_client(client_id); + let client = self.client_manager.try_get_client(client_id); if client.is_none() { error!("Client with ID: {client_id} was not found in the client manager.",); return; diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index ad7d5368d..1517c6195 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -1307,6 +1307,7 @@ impl Streams { .clone(), ) }); + let guard = messages_writer.lock.lock().await; let saved = messages_writer .as_ref() @@ -1350,6 +1351,7 @@ impl Streams { streaming_partitions::helpers::update_index_and_increment_stats(saved, config), ); + drop(guard); Ok(batch_count) } diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs index b0e48badc..ddd0c5197 100644 --- a/core/server/src/streaming/segments/messages/messages_writer.rs +++ b/core/server/src/streaming/segments/messages/messages_writer.rs @@ -33,6 +33,7 @@ pub struct MessagesWriter { file: File, messages_size_bytes: Rc<AtomicU64>, fsync: bool, + pub lock: tokio::sync::Mutex<()>, } // Safety: We are guaranteeing that MessagesWriter will never be used from multiple threads @@ -86,6 +87,7 @@ impl MessagesWriter { file, messages_size_bytes, fsync, + lock: tokio::sync::Mutex::new(()), }) } diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs index 0e0e66344..0972596b8 100644 --- a/core/server/src/streaming/topics/helpers.rs +++ b/core/server/src/streaming/topics/helpers.rs @@ -281,31 +281,29 @@ fn delete_member( members: &mut ConsumerGroupMembers, partitions: &[usize], ) -> Option<usize> { - let member_ids: Vec<usize> = members - .inner() - .shared_get() - .iter() - .filter_map(|(_, member)| (member.client_id == client_id).then_some(member.id)) - .collect(); - - if member_ids.is_empty() { - return None; - } - - members.inner_mut().rcu(|members| { - let mut members = mimic_members(members); - for member_id in &member_ids { - members.remove(*member_id); + let mut member_id = None; + members.inner_mut().rcu(|inner| { + let member_ids: Vec<usize> = inner + .iter() + .filter_map(|(_, member)| (member.client_id == client_id).then_some(member.id)) + .collect(); + + let mut members = mimic_members(inner); + if !member_ids.is_empty() { + member_id = member_ids.first().cloned(); + for member_id in &member_ids { + members.remove(*member_id); + } + members.compact(|entry, _, idx| { + entry.id = idx; + true + }); + assign_partitions_to_members(id, &mut members, partitions); + return members; } - members.compact(|entry, _, idx| { - entry.id = idx; - true - }); - assign_partitions_to_members(id, &mut members, partitions); members }); - - Some(member_ids[0]) + member_id } fn assign_partitions_to_members(id: usize, members: &mut Slab<Member>, partitions: &[usize]) {
