This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch fix_cg_rebalance in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 7574364dd603391bf37c6d86d705b466017c9f75 Author: numminex <[email protected]> AuthorDate: Thu Oct 16 11:38:57 2025 +0200 fix cg rebalance --- .../partitions/create_partitions_handler.rs | 9 ++++- .../partitions/delete_partitions_handler.rs | 15 +++++++- core/server/src/streaming/topics/helpers.rs | 42 +++++++++++++++++++--- 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs index 6589031da..fe05c200d 100644 --- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs @@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind}; use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; +use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::{streams, topics}; @@ -53,13 +54,19 @@ impl ServerCommandHandler for CreatePartitions { self.partitions_count, ) .await?; + let partition_ids = partitions.iter().map(|p| p.id()).collect::<Vec<_>>(); let event = ShardEvent::CreatedPartitions2 { stream_id: self.stream_id.clone(), topic_id: self.topic_id.clone(), partitions, }; let _responses = shard.broadcast_event_to_all_shards(event).await; - // TODO: Rebalance the consumer group. + + shard.streams2.with_topic_by_id_mut( + &self.stream_id, + &self.topic_id, + topics::helpers::rebalance_consumer_group(shard.id, &partition_ids), + ); let stream_id = shard .streams2 diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs index 98b56f76a..9a0451651 100644 --- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs @@ -62,7 +62,20 @@ impl ServerCommandHandler for DeletePartitions { partition_ids: deleted_partition_ids, }; let _responses = shard.broadcast_event_to_all_shards(event).await; - // TODO: Rebalance the consumer group. + + let remaining_partition_ids = shard.streams2.with_topic_by_id( + &self.stream_id, + &self.topic_id, + crate::streaming::topics::helpers::get_partition_ids(), + ); + shard.streams2.with_topic_by_id_mut( + &self.stream_id, + &self.topic_id, + crate::streaming::topics::helpers::rebalance_consumer_group( + shard.id, + &remaining_partition_ids, + ), + ); shard .state diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs index 87c29c364..3ad1f230b 100644 --- a/core/server/src/streaming/topics/helpers.rs +++ b/core/server/src/streaming/topics/helpers.rs @@ -6,7 +6,10 @@ use crate::{ Keyed, consumer_groups::{self, ConsumerGroups}, topics::{self, Topics}, - traits_ext::{ComponentsById, Delete, DeleteCell, EntityMarker}, + traits_ext::{ + ComponentsById, Delete, DeleteCell, EntityComponentSystem, EntityComponentSystemMut, + EntityMarker, IntoComponents, + }, }, streaming::{ stats::TopicStats, @@ -44,6 +47,15 @@ pub fn get_topic_id() -> impl FnOnce(ComponentsById<TopicRef>) -> topics::Contai |(root, _, _)| root.id() } +pub fn get_partition_ids() -> impl FnOnce(ComponentsById<TopicRef>) -> Vec<usize> { + |(root, ..)| { + root.partitions().with_components(|components| { + let (roots, ..) = components.into_components(); + roots.iter().map(|(id, _)| id).collect() + }) + } +} + pub fn get_message_expiry() -> impl FnOnce(ComponentsById<TopicRef>) -> IggyExpiry { |(root, _, _)| root.message_expiry() } @@ -144,6 +156,28 @@ pub fn leave_consumer_group( } } +pub fn rebalance_consumer_group( + shard_id: u16, + partition_ids: &[usize], +) -> impl FnOnce(ComponentsById<TopicRefMut>) { + move |(mut root, ..)| { + root.consumer_groups_mut() + .with_components_mut(|components| { + let (all_roots, all_members) = components.into_components(); + for ((_, consumer_group_root), (_, members)) in + all_roots.iter().zip(all_members.iter_mut()) + { + let id = consumer_group_root.id(); + members.inner_mut().rcu(|existing_members| { + let mut new_members = mimic_members(existing_members); + assign_partitions_to_members(shard_id, id, &mut new_members, partition_ids); + new_members + }); + } + }); + } +} + pub fn get_consumer_group_member_id( client_id: u32, ) -> impl FnOnce(ComponentsById<ConsumerGroupRef>) -> usize { @@ -206,7 +240,7 @@ fn add_member( members.inner_mut().rcu(move |members| { let mut members = mimic_members(members); Member::new(client_id).insert_into(&mut members); - assign_partitions_to_members(shard_id, id, &mut members, partitions.to_vec()); + assign_partitions_to_members(shard_id, id, &mut members, partitions); members }); } @@ -231,7 +265,7 @@ fn delete_member( entry.id = idx; true }); - assign_partitions_to_members(shard_id, id, &mut members, partitions.to_vec()); + assign_partitions_to_members(shard_id, id, &mut members, partitions); members }); } @@ -240,7 +274,7 @@ fn assign_partitions_to_members( shard_id: u16, id: usize, members: &mut Slab<Member>, - partitions: Vec<usize>, + partitions: &[usize], ) { members .iter_mut()
