This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch arc-swap
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 0d9fab57e0c824633cd1b9b435a3faa938d23b59
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Dec 29 14:12:24 2025 +0100

    cg, users, partitions
---
 core/server/src/shard/system/consumer_groups.rs | 17 +++++++++-
 core/server/src/shard/system/partitions.rs      | 10 ++++++
 core/server/src/shard/system/users.rs           | 43 ++++++++++++++++++++++---
 3 files changed, 64 insertions(+), 6 deletions(-)

diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs
index 11a9296a8..39168ece9 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -69,7 +69,16 @@ impl IggyShard {
                 topic_id,
             ).with_error(|error| format!("{COMPONENT} (error: {error}) - 
permission denied to create consumer group for user {} on stream ID: {}, topic 
ID: {}", session.get_user_id(), stream_id, topic_id))?;
         }
-        let cg = self.create_and_insert_consumer_group_mem(stream_id, 
topic_id, name);
+        let cg = self.create_and_insert_consumer_group_mem(stream_id, 
topic_id, name.clone());
+
+        // Dual-write: also update SharedMetadata
+        let partition_ids = self.streams.with_topics(stream_id, |topics| {
+            topics.with_partitions(topic_id, 
partitions::helpers::get_partition_ids())
+        });
+        let _ =
+            self.shared_metadata
+                .create_consumer_group(stream_id, topic_id, name, 
partition_ids);
+
         Ok(cg)
     }
 
@@ -131,6 +140,12 @@ impl IggyShard {
             ).with_error(|error| format!("{COMPONENT} (error: {error}) - 
permission denied to delete consumer group for user {} on stream ID: {}, topic 
ID: {}", session.get_user_id(), stream_id, topic_id))?;
         }
         let cg = self.delete_consumer_group_base(stream_id, topic_id, 
group_id);
+
+        // Dual-write: also update SharedMetadata
+        let _ = self
+            .shared_metadata
+            .delete_consumer_group(stream_id, topic_id, group_id);
+
         Ok(cg)
     }
 
diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs
index b95847303..10177bab3 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -100,6 +100,11 @@ impl IggyShard {
         self.metrics.increment_partitions(partitions_count);
         self.metrics.increment_segments(partitions_count);
 
+        // Dual-write: also update SharedMetadata
+        let _ = self
+            .shared_metadata
+            .create_partitions(stream_id, topic_id, partitions_count);
+
         let shards_count = self.get_available_shards_count();
         for (partition_id, stats) in partitions.iter().map(|p| (p.id(), 
p.stats())) {
             let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, 
partition_id);
@@ -276,6 +281,11 @@ impl IggyShard {
         parent.decrement_size_bytes(total_size_bytes);
         parent.decrement_segments_count(total_segments_count);
 
+        // Dual-write: also update SharedMetadata
+        let _ = self
+            .shared_metadata
+            .delete_partitions(stream_id, topic_id, &deleted_ids);
+
         Ok(deleted_ids)
     }
 
diff --git a/core/server/src/shard/system/users.rs b/core/server/src/shard/system/users.rs
index 634551e39..548aaf320 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -105,7 +105,16 @@ impl IggyShard {
             return Err(IggyError::UsersLimitReached);
         }
 
-        let user_id = self.create_user_base(username, password, status, 
permissions)?;
+        let user_id = self.create_user_base(username, password, status, 
permissions.clone())?;
+
+        // Dual-write: also update SharedMetadata
+        let _ = self.shared_metadata.create_user(
+            username.to_string(),
+            crypto::hash_password(password),
+            status,
+            permissions,
+        );
+
         self.get_user(&(user_id as u32).try_into()?)
             .with_error(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get user 
with id: {user_id}")
@@ -159,7 +168,12 @@ impl IggyShard {
                 )
             })?;
 
-        self.delete_user_base(user_id)
+        let user = self.delete_user_base(user_id)?;
+
+        // Dual-write: also update SharedMetadata
+        let _ = self.shared_metadata.delete_user(user_id);
+
+        Ok(user)
     }
 
     pub fn delete_user_bypass_auth(&self, user_id: &Identifier) -> 
Result<User, IggyError> {
@@ -215,7 +229,12 @@ impl IggyShard {
                 )
             })?;
 
-        self.update_user_base(user_id, username, status)
+        let user = self.update_user_base(user_id, username.clone(), status)?;
+
+        // Dual-write: also update SharedMetadata
+        let _ = self.shared_metadata.update_user(user_id, username, status);
+
+        Ok(user)
     }
 
     pub fn update_user_bypass_auth(
@@ -286,7 +305,14 @@ impl IggyShard {
             }
         }
 
-        self.update_permissions_base(user_id, permissions)
+        self.update_permissions_base(user_id, permissions.clone())?;
+
+        // Dual-write: also update SharedMetadata
+        let _ = self
+            .shared_metadata
+            .update_permissions(user_id, permissions);
+
+        Ok(())
     }
 
     pub fn update_permissions_bypass_auth(
@@ -340,7 +366,14 @@ impl IggyShard {
             }
         }
 
-        self.change_password_base(user_id, current_password, new_password)
+        self.change_password_base(user_id, current_password, new_password)?;
+
+        // Dual-write: also update SharedMetadata
+        let _ = self
+            .shared_metadata
+            .change_password(user_id, crypto::hash_password(new_password));
+
+        Ok(())
     }
 
     pub fn change_password_bypass_auth(

Reply via email to