This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch fix_cg_rebalance
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 7574364dd603391bf37c6d86d705b466017c9f75
Author: numminex <[email protected]>
AuthorDate: Thu Oct 16 11:38:57 2025 +0200

    fix cg rebalance
---
 .../partitions/create_partitions_handler.rs        |  9 ++++-
 .../partitions/delete_partitions_handler.rs        | 15 +++++++-
 core/server/src/streaming/topics/helpers.rs        | 42 +++++++++++++++++++---
 3 files changed, 60 insertions(+), 6 deletions(-)

diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index 6589031da..fe05c200d 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
+use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use crate::streaming::{streams, topics};
@@ -53,13 +54,19 @@ impl ServerCommandHandler for CreatePartitions {
                 self.partitions_count,
             )
             .await?;
+        let partition_ids = partitions.iter().map(|p| p.id()).collect::<Vec<_>>();
         let event = ShardEvent::CreatedPartitions2 {
             stream_id: self.stream_id.clone(),
             topic_id: self.topic_id.clone(),
             partitions,
         };
         let _responses = shard.broadcast_event_to_all_shards(event).await;
-        // TODO: Rebalance the consumer group.
+
+        shard.streams2.with_topic_by_id_mut(
+            &self.stream_id,
+            &self.topic_id,
+            topics::helpers::rebalance_consumer_group(shard.id, &partition_ids),
+        );
 
         let stream_id = shard
             .streams2
diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 98b56f76a..9a0451651 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -62,7 +62,20 @@ impl ServerCommandHandler for DeletePartitions {
             partition_ids: deleted_partition_ids,
         };
         let _responses = shard.broadcast_event_to_all_shards(event).await;
-        // TODO: Rebalance the consumer group.
+
+        let remaining_partition_ids = shard.streams2.with_topic_by_id(
+            &self.stream_id,
+            &self.topic_id,
+            crate::streaming::topics::helpers::get_partition_ids(),
+        );
+        shard.streams2.with_topic_by_id_mut(
+            &self.stream_id,
+            &self.topic_id,
+            crate::streaming::topics::helpers::rebalance_consumer_group(
+                shard.id,
+                &remaining_partition_ids,
+            ),
+        );
 
         shard
         .state
diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs
index 87c29c364..3ad1f230b 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -6,7 +6,10 @@ use crate::{
         Keyed,
         consumer_groups::{self, ConsumerGroups},
         topics::{self, Topics},
-        traits_ext::{ComponentsById, Delete, DeleteCell, EntityMarker},
+        traits_ext::{
+            ComponentsById, Delete, DeleteCell, EntityComponentSystem, EntityComponentSystemMut,
+            EntityMarker, IntoComponents,
+        },
     },
     streaming::{
         stats::TopicStats,
@@ -44,6 +47,15 @@ pub fn get_topic_id() -> impl FnOnce(ComponentsById<TopicRef>) -> topics::Contai
     |(root, _, _)| root.id()
 }
 
+pub fn get_partition_ids() -> impl FnOnce(ComponentsById<TopicRef>) -> Vec<usize> {
+    |(root, ..)| {
+        root.partitions().with_components(|components| {
+            let (roots, ..) = components.into_components();
+            roots.iter().map(|(id, _)| id).collect()
+        })
+    }
+}
+
 pub fn get_message_expiry() -> impl FnOnce(ComponentsById<TopicRef>) -> IggyExpiry {
     |(root, _, _)| root.message_expiry()
 }
@@ -144,6 +156,28 @@ pub fn leave_consumer_group(
     }
 }
 
+pub fn rebalance_consumer_group(
+    shard_id: u16,
+    partition_ids: &[usize],
+) -> impl FnOnce(ComponentsById<TopicRefMut>) {
+    move |(mut root, ..)| {
+        root.consumer_groups_mut()
+            .with_components_mut(|components| {
+                let (all_roots, all_members) = components.into_components();
+                for ((_, consumer_group_root), (_, members)) in
+                    all_roots.iter().zip(all_members.iter_mut())
+                {
+                    let id = consumer_group_root.id();
+                    members.inner_mut().rcu(|existing_members| {
+                        let mut new_members = mimic_members(existing_members);
+                        assign_partitions_to_members(shard_id, id, &mut new_members, partition_ids);
+                        new_members
+                    });
+                }
+            });
+    }
+}
+
 pub fn get_consumer_group_member_id(
     client_id: u32,
 ) -> impl FnOnce(ComponentsById<ConsumerGroupRef>) -> usize {
@@ -206,7 +240,7 @@ fn add_member(
     members.inner_mut().rcu(move |members| {
         let mut members = mimic_members(members);
         Member::new(client_id).insert_into(&mut members);
-        assign_partitions_to_members(shard_id, id, &mut members, partitions.to_vec());
+        assign_partitions_to_members(shard_id, id, &mut members, partitions);
         members
     });
 }
@@ -231,7 +265,7 @@ fn delete_member(
             entry.id = idx;
             true
         });
-        assign_partitions_to_members(shard_id, id, &mut members, partitions.to_vec());
+        assign_partitions_to_members(shard_id, id, &mut members, partitions);
         members
     });
 }
@@ -240,7 +274,7 @@ fn assign_partitions_to_members(
     shard_id: u16,
     id: usize,
     members: &mut Slab<Member>,
-    partitions: Vec<usize>,
+    partitions: &[usize],
 ) {
     members
         .iter_mut()

Reply via email to