This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new c1ea81f39 fix(server): fix consumer group leave race and
append_messages (#2521)
c1ea81f39 is described below
commit c1ea81f39048bbbe898de312a55550331d86a40b
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Dec 29 20:32:02 2025 +0100
fix(server): fix consumer group leave race and append_messages (#2521)
This PR fixes a race condition that occurred when a client left a consumer
group: the members collection was in an inconsistent state, leading to a panic
on the `remove` method call.
Additionally, it fixes a concurrency issue affecting the `current_position`
pointer in the active segment, which led to incorrect indexes being written to
disk. The scenario only happens in cases where every `append_messages` request
from the client triggers a commit of the journal and
disk store.
---
core/server/src/shard/system/clients.rs | 3 +-
core/server/src/slab/streams.rs | 2 ++
.../streaming/segments/messages/messages_writer.rs | 2 ++
core/server/src/streaming/topics/helpers.rs | 42 +++++++++++-----------
4 files changed, 26 insertions(+), 23 deletions(-)
diff --git a/core/server/src/shard/system/clients.rs
b/core/server/src/shard/system/clients.rs
index 3c0b3b444..112ba97c5 100644
--- a/core/server/src/shard/system/clients.rs
+++ b/core/server/src/shard/system/clients.rs
@@ -37,7 +37,7 @@ impl IggyShard {
let consumer_groups: Vec<(u32, u32, u32)>;
{
- let client = self.client_manager.delete_client(client_id);
+ let client = self.client_manager.try_get_client(client_id);
if client.is_none() {
error!("Client with ID: {client_id} was not found in the
client manager.",);
return;
@@ -65,6 +65,7 @@ impl IggyShard {
client_id,
)
}
+ self.client_manager.delete_client(client_id);
}
pub fn get_client(
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index ad7d5368d..1517c6195 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1307,6 +1307,7 @@ impl Streams {
.clone(),
)
});
+ let guard = messages_writer.lock.lock().await;
let saved = messages_writer
.as_ref()
@@ -1350,6 +1351,7 @@ impl Streams {
streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
);
+ drop(guard);
Ok(batch_count)
}
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs
b/core/server/src/streaming/segments/messages/messages_writer.rs
index b0e48badc..ddd0c5197 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -33,6 +33,7 @@ pub struct MessagesWriter {
file: File,
messages_size_bytes: Rc<AtomicU64>,
fsync: bool,
+ pub lock: tokio::sync::Mutex<()>,
}
// Safety: We are guaranteeing that MessagesWriter will never be used from
multiple threads
@@ -86,6 +87,7 @@ impl MessagesWriter {
file,
messages_size_bytes,
fsync,
+ lock: tokio::sync::Mutex::new(()),
})
}
diff --git a/core/server/src/streaming/topics/helpers.rs
b/core/server/src/streaming/topics/helpers.rs
index 0e0e66344..0972596b8 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -281,31 +281,29 @@ fn delete_member(
members: &mut ConsumerGroupMembers,
partitions: &[usize],
) -> Option<usize> {
- let member_ids: Vec<usize> = members
- .inner()
- .shared_get()
- .iter()
- .filter_map(|(_, member)| (member.client_id ==
client_id).then_some(member.id))
- .collect();
-
- if member_ids.is_empty() {
- return None;
- }
-
- members.inner_mut().rcu(|members| {
- let mut members = mimic_members(members);
- for member_id in &member_ids {
- members.remove(*member_id);
+ let mut member_id = None;
+ members.inner_mut().rcu(|inner| {
+ let member_ids: Vec<usize> = inner
+ .iter()
+ .filter_map(|(_, member)| (member.client_id ==
client_id).then_some(member.id))
+ .collect();
+
+ let mut members = mimic_members(inner);
+ if !member_ids.is_empty() {
+ member_id = member_ids.first().cloned();
+ for member_id in &member_ids {
+ members.remove(*member_id);
+ }
+ members.compact(|entry, _, idx| {
+ entry.id = idx;
+ true
+ });
+ assign_partitions_to_members(id, &mut members, partitions);
+ return members;
}
- members.compact(|entry, _, idx| {
- entry.id = idx;
- true
- });
- assign_partitions_to_members(id, &mut members, partitions);
members
});
-
- Some(member_ids[0])
+ member_id
}
fn assign_partitions_to_members(id: usize, members: &mut Slab<Member>,
partitions: &[usize]) {