This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch io_uring_tpc_broadcast
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc_broadcast by this
push:
new 44d74602c fix cg rebalance (#2268)
44d74602c is described below
commit 44d74602ce4183bb5b28d1b5bcc2a8d0135ae8de
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Thu Oct 16 12:18:16 2025 +0200
fix cg rebalance (#2268)
---
core/common/src/utils/crypto.rs | 5 ++-
.../messages/flush_unsaved_buffer_handler.rs | 1 -
.../partitions/create_partitions_handler.rs | 9 ++++-
.../partitions/delete_partitions_handler.rs | 15 +++++++-
core/server/src/http/messages.rs | 1 -
core/server/src/shard/system/messages.rs | 2 --
core/server/src/streaming/topics/helpers.rs | 42 +++++++++++++++++++---
7 files changed, 62 insertions(+), 13 deletions(-)
diff --git a/core/common/src/utils/crypto.rs b/core/common/src/utils/crypto.rs
index 11d51cb7c..fca62dde1 100644
--- a/core/common/src/utils/crypto.rs
+++ b/core/common/src/utils/crypto.rs
@@ -18,7 +18,6 @@
use crate::IggyError;
use crate::text;
-use aes_gcm::aead::generic_array::GenericArray;
use aes_gcm::aead::{Aead, OsRng};
use aes_gcm::{AeadCore, Aes256Gcm, KeyInit};
use std::fmt::Debug;
@@ -63,7 +62,7 @@ impl Aes256GcmEncryptor {
return Err(IggyError::InvalidEncryptionKey);
}
Ok(Self {
- cipher: Aes256Gcm::new(GenericArray::from_slice(key)),
+ cipher: Aes256Gcm::new(key.into()),
})
}
@@ -84,7 +83,7 @@ impl Encryptor for Aes256GcmEncryptor {
}
fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> {
- let nonce = GenericArray::from_slice(&data[0..12]);
+ let nonce = (&data[0..12]).into();
let payload = self.cipher.decrypt(nonce, &data[12..]);
if payload.is_err() {
return Err(IggyError::CannotDecryptData);
diff --git
a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
index 00f1f8aba..ab1da1ce8 100644
--- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
+++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
@@ -20,7 +20,6 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::messages::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
use anyhow::Result;
use error_set::ErrContext;
diff --git
a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index 6589031da..fe05c200d 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
+use crate::slab::traits_ext::EntityMarker;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use crate::streaming::{streams, topics};
@@ -53,13 +54,19 @@ impl ServerCommandHandler for CreatePartitions {
self.partitions_count,
)
.await?;
+ let partition_ids = partitions.iter().map(|p|
p.id()).collect::<Vec<_>>();
let event = ShardEvent::CreatedPartitions2 {
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
partitions,
};
let _responses = shard.broadcast_event_to_all_shards(event).await;
- // TODO: Rebalance the consumer group.
+
+ shard.streams2.with_topic_by_id_mut(
+ &self.stream_id,
+ &self.topic_id,
+ topics::helpers::rebalance_consumer_group(shard.id,
&partition_ids),
+ );
let stream_id = shard
.streams2
diff --git
a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 98b56f76a..9a0451651 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -62,7 +62,20 @@ impl ServerCommandHandler for DeletePartitions {
partition_ids: deleted_partition_ids,
};
let _responses = shard.broadcast_event_to_all_shards(event).await;
- // TODO: Rebalance the consumer group.
+
+ let remaining_partition_ids = shard.streams2.with_topic_by_id(
+ &self.stream_id,
+ &self.topic_id,
+ crate::streaming::topics::helpers::get_partition_ids(),
+ );
+ shard.streams2.with_topic_by_id_mut(
+ &self.stream_id,
+ &self.topic_id,
+ crate::streaming::topics::helpers::rebalance_consumer_group(
+ shard.id,
+ &remaining_partition_ids,
+ ),
+ );
shard
.state
diff --git a/core/server/src/http/messages.rs b/core/server/src/http/messages.rs
index 61d1ee13a..84e1824e1 100644
--- a/core/server/src/http/messages.rs
+++ b/core/server/src/http/messages.rs
@@ -132,7 +132,6 @@ async fn flush_unsaved_buffer(
let stream_id = Identifier::from_str_value(&stream_id)?;
let topic_id = Identifier::from_str_value(&topic_id)?;
let partition_id = partition_id as usize;
- let session = Session::stateless(identity.user_id, identity.ip_address);
let flush_future =
SendWrapper::new(state.shard.shard().flush_unsaved_buffer(
identity.user_id,
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index 74e0add16..967adb058 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -24,9 +24,7 @@ use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{
ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
};
-use crate::streaming::partitions::journal::Journal;
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut,
IggyMessagesBatchSet};
-use crate::streaming::session::Session;
use crate::streaming::traits::MainOps;
use crate::streaming::utils::PooledBuffer;
use crate::streaming::{partitions, streams, topics};
diff --git a/core/server/src/streaming/topics/helpers.rs
b/core/server/src/streaming/topics/helpers.rs
index 87c29c364..3ad1f230b 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -6,7 +6,10 @@ use crate::{
Keyed,
consumer_groups::{self, ConsumerGroups},
topics::{self, Topics},
- traits_ext::{ComponentsById, Delete, DeleteCell, EntityMarker},
+ traits_ext::{
+ ComponentsById, Delete, DeleteCell, EntityComponentSystem,
EntityComponentSystemMut,
+ EntityMarker, IntoComponents,
+ },
},
streaming::{
stats::TopicStats,
@@ -44,6 +47,15 @@ pub fn get_topic_id() -> impl
FnOnce(ComponentsById<TopicRef>) -> topics::Contai
|(root, _, _)| root.id()
}
+pub fn get_partition_ids() -> impl FnOnce(ComponentsById<TopicRef>) ->
Vec<usize> {
+ |(root, ..)| {
+ root.partitions().with_components(|components| {
+ let (roots, ..) = components.into_components();
+ roots.iter().map(|(id, _)| id).collect()
+ })
+ }
+}
+
pub fn get_message_expiry() -> impl FnOnce(ComponentsById<TopicRef>) ->
IggyExpiry {
|(root, _, _)| root.message_expiry()
}
@@ -144,6 +156,28 @@ pub fn leave_consumer_group(
}
}
+pub fn rebalance_consumer_group(
+ shard_id: u16,
+ partition_ids: &[usize],
+) -> impl FnOnce(ComponentsById<TopicRefMut>) {
+ move |(mut root, ..)| {
+ root.consumer_groups_mut()
+ .with_components_mut(|components| {
+ let (all_roots, all_members) = components.into_components();
+ for ((_, consumer_group_root), (_, members)) in
+ all_roots.iter().zip(all_members.iter_mut())
+ {
+ let id = consumer_group_root.id();
+ members.inner_mut().rcu(|existing_members| {
+ let mut new_members = mimic_members(existing_members);
+ assign_partitions_to_members(shard_id, id, &mut
new_members, partition_ids);
+ new_members
+ });
+ }
+ });
+ }
+}
+
pub fn get_consumer_group_member_id(
client_id: u32,
) -> impl FnOnce(ComponentsById<ConsumerGroupRef>) -> usize {
@@ -206,7 +240,7 @@ fn add_member(
members.inner_mut().rcu(move |members| {
let mut members = mimic_members(members);
Member::new(client_id).insert_into(&mut members);
- assign_partitions_to_members(shard_id, id, &mut members,
partitions.to_vec());
+ assign_partitions_to_members(shard_id, id, &mut members, partitions);
members
});
}
@@ -231,7 +265,7 @@ fn delete_member(
entry.id = idx;
true
});
- assign_partitions_to_members(shard_id, id, &mut members,
partitions.to_vec());
+ assign_partitions_to_members(shard_id, id, &mut members, partitions);
members
});
}
@@ -240,7 +274,7 @@ fn assign_partitions_to_members(
shard_id: u16,
id: usize,
members: &mut Slab<Member>,
- partitions: Vec<usize>,
+ partitions: &[usize],
) {
members
.iter_mut()