This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch arc-swap in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0d9fab57e0c824633cd1b9b435a3faa938d23b59 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Dec 29 14:12:24 2025 +0100 cg, users, partitions --- core/server/src/shard/system/consumer_groups.rs | 17 +++++++++- core/server/src/shard/system/partitions.rs | 10 ++++++ core/server/src/shard/system/users.rs | 43 ++++++++++++++++++++++--- 3 files changed, 64 insertions(+), 6 deletions(-) diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs index 11a9296a8..39168ece9 100644 --- a/core/server/src/shard/system/consumer_groups.rs +++ b/core/server/src/shard/system/consumer_groups.rs @@ -69,7 +69,16 @@ impl IggyShard { topic_id, ).with_error(|error| format!("{COMPONENT} (error: {error}) - permission denied to create consumer group for user {} on stream ID: {}, topic ID: {}", session.get_user_id(), stream_id, topic_id))?; } - let cg = self.create_and_insert_consumer_group_mem(stream_id, topic_id, name); + let cg = self.create_and_insert_consumer_group_mem(stream_id, topic_id, name.clone()); + + // Dual-write: also update SharedMetadata + let partition_ids = self.streams.with_topics(stream_id, |topics| { + topics.with_partitions(topic_id, partitions::helpers::get_partition_ids()) + }); + let _ = + self.shared_metadata + .create_consumer_group(stream_id, topic_id, name, partition_ids); + Ok(cg) } @@ -131,6 +140,12 @@ impl IggyShard { ).with_error(|error| format!("{COMPONENT} (error: {error}) - permission denied to delete consumer group for user {} on stream ID: {}, topic ID: {}", session.get_user_id(), stream_id, topic_id))?; } let cg = self.delete_consumer_group_base(stream_id, topic_id, group_id); + + // Dual-write: also update SharedMetadata + let _ = self + .shared_metadata + .delete_consumer_group(stream_id, topic_id, group_id); + Ok(cg) } diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs index b95847303..10177bab3 100644 --- a/core/server/src/shard/system/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -100,6 +100,11 @@ impl IggyShard { self.metrics.increment_partitions(partitions_count); self.metrics.increment_segments(partitions_count); + // Dual-write: also update SharedMetadata + let _ = self + .shared_metadata + .create_partitions(stream_id, topic_id, partitions_count); + let shards_count = self.get_available_shards_count(); for (partition_id, stats) in partitions.iter().map(|p| (p.id(), p.stats())) { let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, partition_id); @@ -276,6 +281,11 @@ impl IggyShard { parent.decrement_size_bytes(total_size_bytes); parent.decrement_segments_count(total_segments_count); + // Dual-write: also update SharedMetadata + let _ = self + .shared_metadata + .delete_partitions(stream_id, topic_id, &deleted_ids); + Ok(deleted_ids) } diff --git a/core/server/src/shard/system/users.rs b/core/server/src/shard/system/users.rs index 634551e39..548aaf320 100644 --- a/core/server/src/shard/system/users.rs +++ b/core/server/src/shard/system/users.rs @@ -105,7 +105,16 @@ impl IggyShard { return Err(IggyError::UsersLimitReached); } - let user_id = self.create_user_base(username, password, status, permissions)?; + let user_id = self.create_user_base(username, password, status, permissions.clone())?; + + // Dual-write: also update SharedMetadata + let _ = self.shared_metadata.create_user( + username.to_string(), + crypto::hash_password(password), + status, + permissions, + ); + self.get_user(&(user_id as u32).try_into()?) .with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to get user with id: {user_id}") @@ -159,7 +168,12 @@ impl IggyShard { ) })?; - self.delete_user_base(user_id) + let user = self.delete_user_base(user_id)?; + + // Dual-write: also update SharedMetadata + let _ = self.shared_metadata.delete_user(user_id); + + Ok(user) } pub fn delete_user_bypass_auth(&self, user_id: &Identifier) -> Result<User, IggyError> { @@ -215,7 +229,12 @@ impl IggyShard { ) })?; - self.update_user_base(user_id, username, status) + let user = self.update_user_base(user_id, username.clone(), status)?; + + // Dual-write: also update SharedMetadata + let _ = self.shared_metadata.update_user(user_id, username, status); + + Ok(user) } pub fn update_user_bypass_auth( @@ -286,7 +305,14 @@ impl IggyShard { } } - self.update_permissions_base(user_id, permissions) + self.update_permissions_base(user_id, permissions.clone())?; + + // Dual-write: also update SharedMetadata + let _ = self + .shared_metadata + .update_permissions(user_id, permissions); + + Ok(()) } pub fn update_permissions_bypass_auth( @@ -340,7 +366,14 @@ impl IggyShard { } } - self.change_password_base(user_id, current_password, new_password) + self.change_password_base(user_id, current_password, new_password)?; + + // Dual-write: also update SharedMetadata + let _ = self + .shared_metadata + .change_password(user_id, crypto::hash_password(new_password)); + + Ok(()) } pub fn change_password_bypass_auth(
