This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_broadcast
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc_broadcast by this push:
     new 44d74602c fix cg rebalance (#2268)
44d74602c is described below

commit 44d74602ce4183bb5b28d1b5bcc2a8d0135ae8de
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Thu Oct 16 12:18:16 2025 +0200

    fix cg rebalance (#2268)
---
 core/common/src/utils/crypto.rs                    |  5 ++-
 .../messages/flush_unsaved_buffer_handler.rs       |  1 -
 .../partitions/create_partitions_handler.rs        |  9 ++++-
 .../partitions/delete_partitions_handler.rs        | 15 +++++++-
 core/server/src/http/messages.rs                   |  1 -
 core/server/src/shard/system/messages.rs           |  2 --
 core/server/src/streaming/topics/helpers.rs        | 42 +++++++++++++++++++---
 7 files changed, 62 insertions(+), 13 deletions(-)

diff --git a/core/common/src/utils/crypto.rs b/core/common/src/utils/crypto.rs
index 11d51cb7c..fca62dde1 100644
--- a/core/common/src/utils/crypto.rs
+++ b/core/common/src/utils/crypto.rs
@@ -18,7 +18,6 @@
 
 use crate::IggyError;
 use crate::text;
-use aes_gcm::aead::generic_array::GenericArray;
 use aes_gcm::aead::{Aead, OsRng};
 use aes_gcm::{AeadCore, Aes256Gcm, KeyInit};
 use std::fmt::Debug;
@@ -63,7 +62,7 @@ impl Aes256GcmEncryptor {
             return Err(IggyError::InvalidEncryptionKey);
         }
         Ok(Self {
-            cipher: Aes256Gcm::new(GenericArray::from_slice(key)),
+            cipher: Aes256Gcm::new(key.into()),
         })
     }
 
@@ -84,7 +83,7 @@ impl Encryptor for Aes256GcmEncryptor {
     }
 
     fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> {
-        let nonce = GenericArray::from_slice(&data[0..12]);
+        let nonce = (&data[0..12]).into();
         let payload = self.cipher.decrypt(nonce, &data[12..]);
         if payload.is_err() {
             return Err(IggyError::CannotDecryptData);
diff --git a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
index 00f1f8aba..ab1da1ce8 100644
--- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
+++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
@@ -20,7 +20,6 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::messages::COMPONENT, sender::SenderKind};
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::session::Session;
 use anyhow::Result;
 use error_set::ErrContext;
diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index 6589031da..fe05c200d 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
+use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use crate::streaming::{streams, topics};
@@ -53,13 +54,19 @@ impl ServerCommandHandler for CreatePartitions {
                 self.partitions_count,
             )
             .await?;
+        let partition_ids = partitions.iter().map(|p| p.id()).collect::<Vec<_>>();
         let event = ShardEvent::CreatedPartitions2 {
             stream_id: self.stream_id.clone(),
             topic_id: self.topic_id.clone(),
             partitions,
         };
         let _responses = shard.broadcast_event_to_all_shards(event).await;
-        // TODO: Rebalance the consumer group.
+
+        shard.streams2.with_topic_by_id_mut(
+            &self.stream_id,
+            &self.topic_id,
+            topics::helpers::rebalance_consumer_group(shard.id, &partition_ids),
+        );
 
         let stream_id = shard
             .streams2
diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 98b56f76a..9a0451651 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -62,7 +62,20 @@ impl ServerCommandHandler for DeletePartitions {
             partition_ids: deleted_partition_ids,
         };
         let _responses = shard.broadcast_event_to_all_shards(event).await;
-        // TODO: Rebalance the consumer group.
+
+        let remaining_partition_ids = shard.streams2.with_topic_by_id(
+            &self.stream_id,
+            &self.topic_id,
+            crate::streaming::topics::helpers::get_partition_ids(),
+        );
+        shard.streams2.with_topic_by_id_mut(
+            &self.stream_id,
+            &self.topic_id,
+            crate::streaming::topics::helpers::rebalance_consumer_group(
+                shard.id,
+                &remaining_partition_ids,
+            ),
+        );
 
         shard
         .state
diff --git a/core/server/src/http/messages.rs b/core/server/src/http/messages.rs
index 61d1ee13a..84e1824e1 100644
--- a/core/server/src/http/messages.rs
+++ b/core/server/src/http/messages.rs
@@ -132,7 +132,6 @@ async fn flush_unsaved_buffer(
     let stream_id = Identifier::from_str_value(&stream_id)?;
     let topic_id = Identifier::from_str_value(&topic_id)?;
     let partition_id = partition_id as usize;
-    let session = Session::stateless(identity.user_id, identity.ip_address);
 
     let flush_future = SendWrapper::new(state.shard.shard().flush_unsaved_buffer(
         identity.user_id,
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 74e0add16..967adb058 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -24,9 +24,7 @@ use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
 };
-use crate::streaming::partitions::journal::Journal;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet};
-use crate::streaming::session::Session;
 use crate::streaming::traits::MainOps;
 use crate::streaming::utils::PooledBuffer;
 use crate::streaming::{partitions, streams, topics};
diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs
index 87c29c364..3ad1f230b 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -6,7 +6,10 @@ use crate::{
         Keyed,
         consumer_groups::{self, ConsumerGroups},
         topics::{self, Topics},
-        traits_ext::{ComponentsById, Delete, DeleteCell, EntityMarker},
+        traits_ext::{
+            ComponentsById, Delete, DeleteCell, EntityComponentSystem, EntityComponentSystemMut,
+            EntityMarker, IntoComponents,
+        },
     },
     streaming::{
         stats::TopicStats,
@@ -44,6 +47,15 @@ pub fn get_topic_id() -> impl FnOnce(ComponentsById<TopicRef>) -> topics::Contai
     |(root, _, _)| root.id()
 }
 
+pub fn get_partition_ids() -> impl FnOnce(ComponentsById<TopicRef>) -> Vec<usize> {
+    |(root, ..)| {
+        root.partitions().with_components(|components| {
+            let (roots, ..) = components.into_components();
+            roots.iter().map(|(id, _)| id).collect()
+        })
+    }
+}
+
 pub fn get_message_expiry() -> impl FnOnce(ComponentsById<TopicRef>) -> IggyExpiry {
     |(root, _, _)| root.message_expiry()
 }
@@ -144,6 +156,28 @@ pub fn leave_consumer_group(
     }
 }
 
+pub fn rebalance_consumer_group(
+    shard_id: u16,
+    partition_ids: &[usize],
+) -> impl FnOnce(ComponentsById<TopicRefMut>) {
+    move |(mut root, ..)| {
+        root.consumer_groups_mut()
+            .with_components_mut(|components| {
+                let (all_roots, all_members) = components.into_components();
+                for ((_, consumer_group_root), (_, members)) in
+                    all_roots.iter().zip(all_members.iter_mut())
+                {
+                    let id = consumer_group_root.id();
+                    members.inner_mut().rcu(|existing_members| {
+                        let mut new_members = mimic_members(existing_members);
+                        assign_partitions_to_members(shard_id, id, &mut new_members, partition_ids);
+                        new_members
+                    });
+                }
+            });
+    }
+}
+
 pub fn get_consumer_group_member_id(
     client_id: u32,
 ) -> impl FnOnce(ComponentsById<ConsumerGroupRef>) -> usize {
@@ -206,7 +240,7 @@ fn add_member(
     members.inner_mut().rcu(move |members| {
         let mut members = mimic_members(members);
         Member::new(client_id).insert_into(&mut members);
-        assign_partitions_to_members(shard_id, id, &mut members, partitions.to_vec());
+        assign_partitions_to_members(shard_id, id, &mut members, partitions);
         members
     });
 }
@@ -231,7 +265,7 @@ fn delete_member(
             entry.id = idx;
             true
         });
-        assign_partitions_to_members(shard_id, id, &mut members, partitions.to_vec());
+        assign_partitions_to_members(shard_id, id, &mut members, partitions);
         members
     });
 }
@@ -240,7 +274,7 @@ fn assign_partitions_to_members(
     shard_id: u16,
     id: usize,
     members: &mut Slab<Member>,
-    partitions: Vec<usize>,
+    partitions: &[usize],
 ) {
     members
         .iter_mut()

Reply via email to