This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 6e0fdf7f feat(io_uring): fix pat/user/cg events (#1989)
6e0fdf7f is described below

commit 6e0fdf7f1d56cfb99b11618dc9c746f88d55c94a
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Thu Jul 10 08:42:57 2025 +0200

    feat(io_uring): fix pat/user/cg events (#1989)
---
 .../create_consumer_group_handler.rs               |  11 +-
 .../create_personal_access_token_handler.rs        |  11 +-
 .../delete_personal_access_token_handler.rs        |   7 +
 .../handlers/users/change_password_handler.rs      |  10 +-
 .../binary/handlers/users/create_user_handler.rs   |  10 +-
 .../binary/handlers/users/delete_user_handler.rs   |   8 +-
 .../binary/handlers/users/logout_user_handler.rs   |   7 +-
 .../handlers/users/update_permissions_handler.rs   |   8 +-
 .../binary/handlers/users/update_user_handler.rs   |   9 +-
 core/server/src/main.rs                            |   5 +-
 core/server/src/shard/mod.rs                       | 505 +++++++++++----------
 core/server/src/shard/system/consumer_groups.rs    |  67 ++-
 .../src/shard/system/personal_access_tokens.rs     |  49 +-
 core/server/src/shard/system/users.rs              | 137 +++++-
 core/server/src/shard/transmission/event.rs        |  16 +-
 .../server/src/streaming/topics/consumer_groups.rs |  23 +-
 16 files changed, 574 insertions(+), 309 deletions(-)

diff --git 
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index e6e1f239..51d60b0c 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
 use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateConsumerGroupWithId;
 use crate::streaming::session::Session;
@@ -40,12 +41,11 @@ impl ServerCommandHandler for CreateConsumerGroup {
     async fn handle(
         self,
         sender: &mut SenderKind,
-        length: u32,
+        _length: u32,
         session: &Rc<Session>,
         shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-
         let consumer_group_id = shard
                 .create_consumer_group(
                     session,
@@ -60,6 +60,13 @@ impl ServerCommandHandler for CreateConsumerGroup {
                         self.stream_id, self.topic_id, self.group_id
                     )
                 })?;
+        let event = ShardEvent::CreatedConsumerGroup {
+            stream_id: self.stream_id.clone(),
+            topic_id: self.topic_id.clone(),
+            consumer_group_id: self.group_id,
+            name: self.name.clone(),
+        };
+        let _responses = 
shard.broadcast_event_to_all_shards(event.into()).await;
 
         let stream = shard.find_stream(session, &self.stream_id)
             .with_error_context(|error| {
diff --git 
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
 
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index c28ac77b..bfeda164 100644
--- 
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++ 
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::personal_access_tokens::COMPONENT, 
sender::SenderKind};
+use crate::shard::transmission::event::ShardEvent;
 use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreatePersonalAccessTokenWithHash;
@@ -47,7 +48,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let token = shard
+        let (personal_access_token, token) = shard
                 .create_personal_access_token(session, &self.name, self.expiry)
                 .with_error_context(|error| {
                     format!(
@@ -56,7 +57,11 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
                     )
                 })?;
         let bytes = mapper::map_raw_pat(&token);
-        let token_hash = PersonalAccessToken::hash_token(&token);
+        let hash = personal_access_token.token.to_string();
+        let event = ShardEvent::CreatedPersonalAccessToken {
+            personal_access_token: personal_access_token.clone(),
+        };
+        let _responses = 
shard.broadcast_event_to_all_shards(event.into()).await;
 
         shard
             .state
@@ -67,7 +72,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
                         name: self.name.to_owned(),
                         expiry: self.expiry,
                     },
-                    hash: token_hash,
+                    hash,
                 }),
             )
             .await
diff --git 
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
 
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
index bb8dd463..13a06e55 100644
--- 
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
+++ 
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
@@ -51,6 +51,13 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
                     "{COMPONENT} (error: {error}) - failed to delete personal 
access token with name: {token_name}, session: {session}"
                 )})?;
 
+        // Broadcast the event to other shards
+        let event = 
crate::shard::transmission::event::ShardEvent::DeletedPersonalAccessToken {
+            user_id: session.get_user_id(),
+            name: self.name.clone(),
+        };
+        let _responses = 
shard.broadcast_event_to_all_shards(event.into()).await;
+
         shard
             .state
             .apply(
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs 
b/core/server/src/binary/handlers/users/change_password_handler.rs
index 32b376cd..1e89aeb0 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
 use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use crate::streaming::utils::crypto;
@@ -39,7 +40,7 @@ impl ServerCommandHandler for ChangePassword {
     async fn handle(
         self,
         sender: &mut SenderKind,
-        length: u32,
+        _length: u32,
         session: &Rc<Session>,
         shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
@@ -59,6 +60,13 @@ impl ServerCommandHandler for ChangePassword {
                     )
                 })?;
 
+        let event = ShardEvent::ChangedPassword {
+            user_id: self.user_id.clone(),
+            current_password: self.current_password.clone(),
+            new_password: self.new_password.clone(),
+        };
+        let _responses = 
shard.broadcast_event_to_all_shards(event.into()).await;
+
         // For the security of the system, we hash the password before storing 
it in metadata.
         shard
             .state
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs 
b/core/server/src/binary/handlers/users/create_user_handler.rs
index e51b147b..77ce6249 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -17,6 +17,7 @@
  */
 
 use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
 use std::rc::Rc;
 
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
@@ -31,6 +32,7 @@ use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::create_user::CreateUser;
+use tower_http::map_response_body;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for CreateUser {
@@ -56,13 +58,19 @@ impl ServerCommandHandler for CreateUser {
                     self.status,
                     self.permissions.clone(),
                 )
-                .await
                 .with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - failed to create user 
with name: {}, session: {session}",
                         self.username
                     )
                 })?;
+        let event = ShardEvent::CreatedUser {
+            username: self.username.to_owned(),
+            password: self.password.to_owned(),
+            status: self.status,
+            permissions: self.permissions.clone(),
+        };
+        let _responses = 
shard.broadcast_event_to_all_shards(event.into()).await;
         let user_id = user.id;
         let response = mapper::map_user(&user);
 
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs 
b/core/server/src/binary/handlers/users/delete_user_handler.rs
index 10de56a4..cd2f082c 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
 use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use anyhow::Result;
@@ -39,7 +40,7 @@ impl ServerCommandHandler for DeleteUser {
     async fn handle(
         self,
         sender: &mut SenderKind,
-        length: u32,
+        _length: u32,
         session: &Rc<Session>,
         shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
@@ -54,6 +55,11 @@ impl ServerCommandHandler for DeleteUser {
                     )
                 })?;
 
+        let event = ShardEvent::DeletedUser {
+            user_id: self.user_id.clone(),
+        };
+        let _responses = 
shard.broadcast_event_to_all_shards(event.into()).await;
+
         let user_id = self.user_id.clone();
         shard
             .state
diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs 
b/core/server/src/binary/handlers/users/logout_user_handler.rs
index d1ef95bf..cfa37cd5 100644
--- a/core/server/src/binary/handlers/users/logout_user_handler.rs
+++ b/core/server/src/binary/handlers/users/logout_user_handler.rs
@@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
 use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::session::Session;
 use anyhow::Result;
 use error_set::ErrContext;
@@ -38,7 +39,7 @@ impl ServerCommandHandler for LogoutUser {
     async fn handle(
         self,
         sender: &mut SenderKind,
-        length: u32,
+        _length: u32,
         session: &Rc<Session>,
         shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
@@ -46,6 +47,10 @@ impl ServerCommandHandler for LogoutUser {
         shard.logout_user(session).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to logout user, 
session: {session}")
         })?;
+        let event = ShardEvent::LogoutUser {
+            client_id: session.client_id,
+        };
+        let _responses = 
shard.broadcast_event_to_all_shards(event.into()).await;
         session.clear_user_id();
         sender.send_empty_ok_response().await?;
         Ok(())
diff --git 
a/core/server/src/binary/handlers/users/update_permissions_handler.rs 
b/core/server/src/binary/handlers/users/update_permissions_handler.rs
index dea49a1c..d28d3812 100644
--- a/core/server/src/binary/handlers/users/update_permissions_handler.rs
+++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::transmission::event::ShardEvent;
 use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
@@ -50,7 +51,12 @@ impl ServerCommandHandler for UpdatePermissions {
                 .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to update permissions for user_id: {}, session: {session}",
                     self.user_id
                 ))?;
-
+        let event = ShardEvent::UpdatedPermissions {
+            user_id: self.user_id.clone(),
+            permissions: self.permissions.clone(),
+        };
+        let _responses = 
shard.broadcast_event_to_all_shards(event.into()).await;
+        
         shard
             .state
             .apply(
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs 
b/core/server/src/binary/handlers/users/update_user_handler.rs
index 982c4ae1..b90cb442 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
 use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use anyhow::Result;
@@ -59,8 +60,14 @@ impl ServerCommandHandler for UpdateUser {
                     )
                 })?;
 
-        let user_id = self.user_id.clone();
+        let event = ShardEvent::UpdatedUser {
+            user_id: self.user_id.clone(),
+            username: self.username.clone(),
+            status: self.status,
+        };
+        let _responses = 
shard.broadcast_event_to_all_shards(event.into()).await;
 
+        let user_id = self.user_id.clone();
         shard
             .state
             .apply(session.get_user_id(), &EntryCommand::UpdateUser(self))
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index e01f03ec..2971347e 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -48,8 +48,7 @@ use server::state::StateKind;
 use server::state::command::EntryCommand;
 use server::state::file::FileState;
 use server::state::models::CreateUserWithId;
-use server::state::system::SystemState;
-use server::streaming::utils::{MemoryPool, crypto};
+use server::streaming::utils::MemoryPool;
 use server::versioning::SemanticVersion;
 use server::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, map_toggle_str};
 use tokio::time::Instant;
@@ -183,7 +182,7 @@ fn main() -> Result<(), ServerError> {
 
                     // We can't use std::sync::Once because it doesn't support 
async.
                     // Trait bound on the closure is FnOnce.
-                    // Peak into the state to check if the root user exists.
+                    // Peek into the state to check if the root user exists.
                     // If it does not exist, create it.
                     barrier.with_async::<Result<(), IggyError>>(async 
|barrier_state| {
                         // A thread already initialized state
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index a8158bc1..ecbe2be1 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -64,7 +64,6 @@ use crate::{
     streaming::{
         clients::client_manager::ClientManager,
         diagnostics::metrics::Metrics,
-        partitions::partition,
         personal_access_tokens::personal_access_token::PersonalAccessToken,
         session::Session,
         storage::SystemStorage,
@@ -512,240 +511,271 @@ impl IggyShard {
     async fn handle_event(&self, event: Arc<ShardEvent>) -> Result<(), 
IggyError> {
         match &*event {
             ShardEvent::CreatedStream { stream_id, name } => {
-                self.create_stream_bypass_auth(*stream_id, name)
-            }
+                        self.create_stream_bypass_auth(*stream_id, name)
+                    }
             ShardEvent::CreatedTopic {
-                stream_id,
-                topic_id,
-                name,
-                partitions_count,
-                message_expiry,
-                compression_algorithm,
-                max_topic_size,
-                replication_factor,
-            } => {
-                let topic_id = topic_id.get_u32_value().ok();
-                self.create_topic_bypass_auth(
-                    stream_id,
-                    topic_id,
-                    name,
-                    *partitions_count,
-                    *message_expiry,
-                    *compression_algorithm,
-                    *max_topic_size,
-                    *replication_factor,
-                )
-                .await
-            }
+                        stream_id,
+                        topic_id,
+                        name,
+                        partitions_count,
+                        message_expiry,
+                        compression_algorithm,
+                        max_topic_size,
+                        replication_factor,
+                    } => {
+                        let topic_id = topic_id.get_u32_value().ok();
+                        self.create_topic_bypass_auth(
+                            stream_id,
+                            topic_id,
+                            name,
+                            *partitions_count,
+                            *message_expiry,
+                            *compression_algorithm,
+                            *max_topic_size,
+                            *replication_factor,
+                        )
+                        .await
+                    }
             ShardEvent::LoginUser {
-                client_id,
-                username,
-                password,
-            } => self.login_user_event(*client_id, username, password),
+                        client_id,
+                        username,
+                        password,
+                    } => self.login_user_event(*client_id, username, password),
             ShardEvent::NewSession { address, transport } => {
-                let session = self.add_client(address, *transport);
-                self.add_active_session(session);
-                Ok(())
-            }
+                        let session = self.add_client(address, *transport);
+                        self.add_active_session(session);
+                        Ok(())
+                    }
             ShardEvent::CreatedShardTableRecords {
-                stream_id,
-                topic_id,
-                partition_ids,
-            } => {
-                let records = self
-                    .create_shard_table_records(&partition_ids, *stream_id, 
*topic_id)
-                    .collect::<Vec<_>>();
-                let stream = 
self.get_stream(&Identifier::numeric(*stream_id)?)?;
-                let topic = 
stream.get_topic(&Identifier::numeric(*topic_id)?)?;
-                // Open partition and segments for that particular shard.
-                for (ns, shard_info) in records.iter() {
-                    if shard_info.id() == self.id {
-                        let partition_id = ns.partition_id;
-                        let partition = topic.get_partition(partition_id)?;
-                        let mut partition = partition.write().await;
-                        partition.open().await.with_error_context(|error| {
+                        stream_id,
+                        topic_id,
+                        partition_ids,
+                    } => {
+                        let records = self
+                            .create_shard_table_records(&partition_ids, 
*stream_id, *topic_id)
+                            .collect::<Vec<_>>();
+                        let stream = 
self.get_stream(&Identifier::numeric(*stream_id)?)?;
+                        let topic = 
stream.get_topic(&Identifier::numeric(*topic_id)?)?;
+                        // Open partition and segments for that particular 
shard.
+                        for (ns, shard_info) in records.iter() {
+                            if shard_info.id() == self.id {
+                                let partition_id = ns.partition_id;
+                                let partition = 
topic.get_partition(partition_id)?;
+                                let mut partition = partition.write().await;
+                                
partition.open().await.with_error_context(|error| {
+                                                    format!(
+                                                        "{COMPONENT} (error: 
{error}) - failed to open partition with ID: {partition_id} in topic with ID: 
{topic_id} for stream with ID: {stream_id}"
+                                                    )
+                                                })?;
+                            }
+                        }
+                        self.insert_shard_table_records(records);
+                        Ok(())
+                    }
+            ShardEvent::CreatedPartitions {
+                        stream_id,
+                        topic_id,
+                        partitions_count,
+                    } => {
+                        let mut stream = self.get_stream_mut(stream_id)?;
+                        let topic = stream.get_topic_mut(topic_id)?;
+                        topic.add_persisted_partitions(*partitions_count)
+                                            .with_error_context(|error| {
+                                                format!(
+                                                    "{COMPONENT} (error: 
{error}) - failed to create partitions for topic with ID: {topic_id} in stream 
with ID: {stream_id}"
+                                                )
+                                            })?;
+                        topic.reassign_consumer_groups();
+                        self.metrics.increment_partitions(*partitions_count);
+                        self.metrics.increment_segments(*partitions_count);
+                        Ok(())
+                    }
+            ShardEvent::DeletedPartitions {
+                        stream_id,
+                        topic_id,
+                        partition_ids,
+                    } => {
+                        let mut stream = self.get_stream_mut(stream_id)?;
+                        let topic = stream.get_topic_mut(topic_id)?;
+                        let partitions = topic
+                                    .       
delete_persisted_partitions_by_ids(partition_ids)
+                                            .with_error_context(|error| {
+                                                format!("{COMPONENT} (error: 
{error}) - failed to delete persisted partitions for topic: {topic}")
+                                            })?;
+                        drop(stream);
+
+                        let mut segments_count = 0;
+                        let mut messages_count = 0;
+                        let partitions_count = partitions.len();
+                        for partition in &partitions {
+                            let partition = partition.read().await;
+                            let partition_messages_count = 
partition.get_messages_count();
+                            segments_count += partition.get_segments_count();
+                            messages_count += partition_messages_count;
+                        }
+
+                        let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
+                            format!(
+                                "{COMPONENT} (error: {error}) - failed to get 
stream with ID: {stream_id}"
+                            )
+                        })?;
+                        let topic = 
stream.get_topic_mut(topic_id).with_error_context(|error| {
                             format!(
-                                "{COMPONENT} (error: {error}) - failed to open 
partition with ID: {partition_id} in topic with ID: {topic_id} for stream with 
ID: {stream_id}"
+                                "{COMPONENT} (error: {error}) - failed to get 
topic with ID: {topic_id}"
                             )
                         })?;
+                        topic.reassign_consumer_groups();
+                        if partitions.len() > 0 {
+                            self.metrics.decrement_partitions(partitions_count 
as u32);
+                            self.metrics.decrement_segments(segments_count);
+                            self.metrics.decrement_messages(messages_count);
+                        }
+                        Ok(())
                     }
-                }
-                self.insert_shard_table_records(records);
-                Ok(())
-            }
-            ShardEvent::CreatedPartitions {
-                stream_id,
-                topic_id,
-                partitions_count,
-            } => {
-                let mut stream = self.get_stream_mut(stream_id)?;
-                let topic = stream.get_topic_mut(topic_id)?;
-                topic.add_persisted_partitions(*partitions_count)
-                    .with_error_context(|error| {
-                        format!(
-                            "{COMPONENT} (error: {error}) - failed to create 
partitions for topic with ID: {topic_id} in stream with ID: {stream_id}"
-                        )
-                    })?;
-                topic.reassign_consumer_groups();
-                self.metrics.increment_partitions(*partitions_count);
-                self.metrics.increment_segments(*partitions_count);
-                Ok(())
-            }
-            ShardEvent::DeletedPartitions {
-                stream_id,
-                topic_id,
-                partition_ids,
-            } => {
-                let mut stream = self.get_stream_mut(stream_id)?;
-                let topic = stream.get_topic_mut(topic_id)?;
-                let partitions = topic
-            .       delete_persisted_partitions_by_ids(partition_ids)
-                    .with_error_context(|error| {
-                        format!("{COMPONENT} (error: {error}) - failed to 
delete persisted partitions for topic: {topic}")
-                    })?;
-                drop(stream);
-
-                let mut segments_count = 0;
-                let mut messages_count = 0;
-                let partitions_count = partitions.len();
-                for partition in &partitions {
-                    let partition = partition.read().await;
-                    let partition_messages_count = 
partition.get_messages_count();
-                    segments_count += partition.get_segments_count();
-                    messages_count += partition_messages_count;
-                }
-
-                let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
-                    format!(
-                        "{COMPONENT} (error: {error}) - failed to get stream 
with ID: {stream_id}"
-                    )
-                })?;
-                let topic = 
stream.get_topic_mut(topic_id).with_error_context(|error| {
-                    format!(
-                        "{COMPONENT} (error: {error}) - failed to get topic 
with ID: {topic_id}"
-                    )
-                })?;
-                topic.reassign_consumer_groups();
-                if partitions.len() > 0 {
-                    self.metrics.decrement_partitions(partitions_count as u32);
-                    self.metrics.decrement_segments(segments_count);
-                    self.metrics.decrement_messages(messages_count);
-                }
-                Ok(())
-            }
             ShardEvent::DeletedShardTableRecords { namespaces } => {
-                let (stream_id, topic_id) = namespaces
-                    .first()
-                    .map(|ns| (ns.stream_id, ns.topic_id))
-                    .unwrap();
-                let stream = 
self.get_stream(&Identifier::numeric(stream_id).unwrap())?;
-                let topic = 
stream.get_topic(&Identifier::numeric(topic_id).unwrap())?;
-                let records = self.remove_shard_table_records(&namespaces);
-                for (ns, shard_info) in records.iter() {
-                    if shard_info.id() == self.id {
-                        let partition = topic.get_partition(ns.partition_id)?;
-                        let mut partition = partition.write().await;
-                        partition.delete().await.with_error_context(|error| {
-                        format!(
-                            "{COMPONENT} (error: {error}) - failed to delete 
partition with ID: {} in topic with ID: {}",
-                            ns.partition_id,
-                            topic_id
-                    )
-                    })?;
+                        let (stream_id, topic_id) = namespaces
+                            .first()
+                            .map(|ns| (ns.stream_id, ns.topic_id))
+                            .unwrap();
+                        let stream = 
self.get_stream(&Identifier::numeric(stream_id).unwrap())?;
+                        let topic = 
stream.get_topic(&Identifier::numeric(topic_id).unwrap())?;
+                        let records = 
self.remove_shard_table_records(&namespaces);
+                        for (ns, shard_info) in records.iter() {
+                            if shard_info.id() == self.id {
+                                let partition = 
topic.get_partition(ns.partition_id)?;
+                                let mut partition = partition.write().await;
+                                
partition.delete().await.with_error_context(|error| {
+                                                format!(
+                                                    "{COMPONENT} (error: 
{error}) - failed to delete partition with ID: {} in topic with ID: {}",
+                                                    ns.partition_id,
+                                                    topic_id
+                                            )
+                                            })?;
+                            }
+                        }
+                        Ok(())
                     }
-                }
-                Ok(())
-            }
             ShardEvent::DeletedStream { stream_id } => {
-                let shard_id = self.id;
-                
self.delete_stream_bypass_auth(stream_id).with_error_context(|err| {
-                    format!(
-                        "{COMPONENT} (error: {err}) - failed to delete, when 
handling event on shard: {shard_id} stream with ID: {stream_id}",
-                    )
-                })?;
-                Ok(())
-            }
+                        let shard_id = self.id;
+                        
self.delete_stream_bypass_auth(stream_id).with_error_context(|err| {
+                                            format!(
+                                                "{COMPONENT} (error: {err}) - 
failed to delete, when handling event on shard: {shard_id} stream with ID: 
{stream_id}",
+                                            )
+                                        })?;
+                        Ok(())
+                    }
             ShardEvent::UpdatedStream { stream_id, name } => {
-                self.update_stream_bypass_auth(stream_id, name)?;
-                Ok(())
-            }
+                        self.update_stream_bypass_auth(stream_id, name)?;
+                        Ok(())
+                    }
             ShardEvent::UpdatedTopic {
-                stream_id,
-                topic_id,
-                name,
-                message_expiry,
-                compression_algorithm,
-                max_topic_size,
-                replication_factor,
-            } => {
-                self.update_topic_bypass_auth(
-                    stream_id,
-                    topic_id,
-                    name,
-                    *message_expiry,
-                    *compression_algorithm,
-                    *max_topic_size,
-                    *replication_factor,
-                )
-                .await?;
-                Ok(())
-            }
+                        stream_id,
+                        topic_id,
+                        name,
+                        message_expiry,
+                        compression_algorithm,
+                        max_topic_size,
+                        replication_factor,
+                    } => {
+                        self.update_topic_bypass_auth(
+                            stream_id,
+                            topic_id,
+                            name,
+                            *message_expiry,
+                            *compression_algorithm,
+                            *max_topic_size,
+                            *replication_factor,
+                        )
+                        .await?;
+                        Ok(())
+                    }
             ShardEvent::PurgedStream { stream_id: _ } => todo!(),
-            ShardEvent::CreatedConsumerGroup {
-                stream_id: _,
-                topic_id: _,
-                consumer_group_id: _,
-                name: _,
-            } => todo!(),
-            ShardEvent::DeletedConsumerGroup {
-                stream_id: _,
-                topic_id: _,
-                consumer_group_id: _,
-            } => todo!(),
             ShardEvent::PurgedTopic {
-                stream_id: _,
-                topic_id: _,
-            } => todo!(),
+                        stream_id: _,
+                        topic_id: _,
+                    } => todo!(),
             ShardEvent::DeletedTopic {
-                stream_id,
-                topic_id,
-            } => {
-                self.delete_topic_bypass_auth(stream_id, topic_id)
-                    .await
-                    .with_error_context(|err| {
-                        format!(
-                            "{COMPONENT} (error: {err}) - failed to delete 
topic with ID: {topic_id} in stream with ID: {stream_id}"
-                        )
-                    })?;
-                Ok(())
-            }
+                        stream_id,
+                        topic_id,
+                    } => {
+                        self.delete_topic_bypass_auth(stream_id, topic_id)
+                                            .await
+                                            .with_error_context(|err| {
+                                                format!(
+                                                    "{COMPONENT} (error: 
{err}) - failed to delete topic with ID: {topic_id} in stream with ID: 
{stream_id}"
+                                                )
+                                            })?;
+                        Ok(())
+                    }
+            ShardEvent::CreatedConsumerGroup {
+                        stream_id,
+                        topic_id,
+                        consumer_group_id,
+                        name,
+                    } => self.create_consumer_group_bypass_auth(
+                        stream_id,
+                        topic_id,
+                        *consumer_group_id,
+                        name,
+                    ),
+            ShardEvent::DeletedConsumerGroup {
+                        stream_id,
+                        topic_id,
+                        consumer_group_id,
+                    } => {
+                        self.delete_consumer_group_bypass_auth(stream_id, 
topic_id, consumer_group_id)?;
+                        Ok(())
+                    }
             ShardEvent::CreatedUser {
-                username: _,
-                password: _,
-                status: _,
-                permissions: _,
-            } => todo!(),
-            ShardEvent::DeletedUser { user_id: _ } => todo!(),
-            ShardEvent::LogoutUser { client_id: _ } => todo!(),
-            ShardEvent::UpdatedUser {
-                user_id: _,
-                username: _,
-                status: _,
-            } => todo!(),
+                        username,
+                        password,
+                        status,
+                        permissions,
+                    } => {
+                        self.create_user_bypass_auth(username, password, 
*status, permissions.clone())?;
+                        Ok(())
+                    }
+            ShardEvent::DeletedUser { user_id } => {
+                        self.delete_user_bypass_auth(user_id)?;
+                        Ok(())
+                    }
+            ShardEvent::LogoutUser { client_id } => {
+                        let sessions = self.active_sessions.borrow();
+                        let session = sessions.iter().find(|s| s.client_id == 
*client_id).unwrap();
+                        self.logout_user(session)?;
+                        self.remove_active_session(session.get_user_id());
+
+                        Ok(())
+                    }
             ShardEvent::ChangedPassword {
-                user_id: _,
-                current_password: _,
-                new_password: _,
-            } => todo!(),
-            ShardEvent::CreatedPersonalAccessToken { name: _, expiry: _ } => 
todo!(),
-            ShardEvent::DeletedPersonalAccessToken { name: _ } => todo!(),
+                        user_id,
+                        current_password,
+                        new_password,
+                    } => {
+                        self.change_password_bypass_auth(user_id, 
current_password, new_password)?;
+                        Ok(())
+                    }
+            ShardEvent::CreatedPersonalAccessToken {personal_access_token} => {
+                
self.create_personal_access_token_bypass_auth(personal_access_token.to_owned())?;
+                Ok(())
+            },
+            ShardEvent::DeletedPersonalAccessToken { user_id, name } => {
+                self.delete_personal_access_token_bypass_auth(*user_id, name)?;
+                Ok(())
+            },
             ShardEvent::LoginWithPersonalAccessToken { token: _ } => todo!(),
-            ShardEvent::StoredConsumerOffset {
-                stream_id: _,
-                topic_id: _,
-                consumer: _,
-                offset: _,
-            } => todo!(),
+            ShardEvent::UpdatedUser {
+                        user_id,
+                        username,
+                        status,
+                    } => {
+                        self.update_user_bypass_auth(user_id, 
username.to_owned(), *status)?;
+                        Ok(())
+                    },
+            ShardEvent::UpdatedPermissions { user_id, permissions } => {
+                self.update_permissions_bypass_auth(user_id, 
permissions.to_owned())?;
+                Ok(())
+            }
         }
     }
 
@@ -779,9 +809,13 @@ impl IggyShard {
         }
     }
 
-    pub async fn broadcast_event_to_all_shards(&self, event: Arc<ShardEvent>) 
-> Vec<ShardResponse> {
+    pub async fn broadcast_event_to_all_shards(
+        &self,
+        event: Arc<ShardEvent>,
+    ) -> Vec<ShardResponse> {
         let mut responses = 
Vec::with_capacity(self.get_available_shards_count() as usize);
-        for maybe_receiver in self.shards
+        for maybe_receiver in self
+            .shards
             .iter()
             .filter_map(|shard| {
                 if shard.id != self.id {
@@ -801,18 +835,19 @@ impl IggyShard {
                     conn.send(ShardFrame::new(event.clone().into(), None));
                     None
                 }
-            }) {
-                match maybe_receiver {
-                    Some(receiver) => {
-                        let response = receiver.recv().await.unwrap();
-                        responses.push(response);
-                    }
-                    None => {
-                        responses.push(ShardResponse::Event);
-                    }
+            })
+        {
+            match maybe_receiver {
+                Some(receiver) => {
+                    let response = receiver.recv().await.unwrap();
+                    responses.push(response);
+                }
+                None => {
+                    responses.push(ShardResponse::Event);
                 }
             }
-            responses
+        }
+        responses
     }
 
     fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> {
@@ -866,6 +901,20 @@ impl IggyShard {
         self.active_sessions.borrow_mut().push(session);
     }
 
+    pub fn remove_active_session(&self, user_id: u32) {
+        let mut active_sessions = self.active_sessions.borrow_mut();
+        let pos = active_sessions
+            .iter()
+            .position(|s| s.get_user_id() == user_id);
+        if let Some(pos) = pos {
+            active_sessions.remove(pos);
+        } else {
+            error!(
+                "{COMPONENT} - failed to remove active session for user ID: 
{user_id}, session not found."
+            );
+        }
+    }
+
     pub fn ensure_authenticated(&self, session: &Session) -> Result<u32, 
IggyError> {
         let active_sessions = self.active_sessions.borrow();
         let user_id = active_sessions
diff --git a/core/server/src/shard/system/consumer_groups.rs 
b/core/server/src/shard/system/consumer_groups.rs
index 7b98205f..96109b85 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -26,6 +26,7 @@ use crate::streaming::topics::consumer_group::ConsumerGroup;
 use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
+use iggy_common::locking::IggySharedMutFn;
 
 impl IggyShard {
     pub fn get_consumer_group<'cg, 'stream>(
@@ -85,6 +86,17 @@ impl IggyShard {
         Ok(topic.get_consumer_groups())
     }
 
+    pub fn create_consumer_group_bypass_auth(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        group_id: Option<u32>,
+        name: &str,
+    ) -> Result<(), IggyError> {
+        self.create_consumer_group_base(stream_id, topic_id, group_id, name)?;
+        Ok(())
+    }
+
     pub fn create_consumer_group(
         &self,
         session: &Session,
@@ -109,7 +121,16 @@ impl IggyShard {
                 topic.topic_id,
             ).with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- permission denied to create consumer group for user {} on stream ID: {}, 
topic ID: {}", session.get_user_id(), topic.stream_id, topic.topic_id))?;
         }
+        self.create_consumer_group_base(stream_id, topic_id, group_id, name)
+    }
 
+    fn create_consumer_group_base(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        group_id: Option<u32>,
+        name: &str,
+    ) -> Result<Identifier, IggyError> {
         let mut stream = self.get_stream_mut(stream_id)
             .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to get mutable reference to stream with ID: {stream_id}"))?;
         let topic = stream.get_topic_mut(topic_id)
@@ -122,6 +143,15 @@ impl IggyShard {
             })
     }
 
+    pub fn delete_consumer_group_bypass_auth(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        consumer_group_id: &Identifier,
+    ) -> Result<ConsumerGroup, IggyError> {
+        self.delete_consumer_group_base(stream_id, topic_id, consumer_group_id)
+    }
+
     pub async fn delete_consumer_group(
         &self,
         session: &Session,
@@ -130,8 +160,6 @@ impl IggyShard {
         consumer_group_id: &Identifier,
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
-        let stream_id_value;
-        let topic_id_value;
         {
             let stream = self.get_stream(stream_id).with_error_context(|error| 
{
                 format!(
@@ -146,11 +174,32 @@ impl IggyShard {
                 topic.stream_id,
                 topic.topic_id,
             ).with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- permission denied to delete consumer group for user {} on stream ID: {}, 
topic ID: {}", session.get_user_id(), topic.stream_id, topic.topic_id))?;
-
-            stream_id_value = topic.stream_id;
-            topic_id_value = topic.topic_id;
         }
+        let cg = self.delete_consumer_group_base(stream_id, topic_id, 
consumer_group_id)?;
+        let stream = self.get_stream(stream_id)?;
+        let topic = stream.get_topic(topic_id)?;
+
+        for (_, partition) in topic.partitions.iter() {
+            let partition = partition.read().await;
+            if let Some((_, offset)) = 
partition.consumer_group_offsets.remove(&cg.group_id) {
+                self.storage
+                    .partition
+                    .delete_consumer_offset(&offset.path)
+                    .await?;
+            }
+        }
+
+        Ok(())
+    }
 
+    fn delete_consumer_group_base(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        consumer_group_id: &Identifier,
+    ) -> Result<ConsumerGroup, IggyError> {
+        let stream_id_value;
+        let topic_id_value;
         let consumer_group;
         {
             let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
@@ -161,8 +210,9 @@ impl IggyShard {
             let topic = 
stream.get_topic_mut(topic_id).with_error_context(|error| format!("{COMPONENT} 
(error: {error}) - topic not found for stream ID: {stream_id}, topic_id: 
{topic_id}"))?;
 
             consumer_group = topic.delete_consumer_group(consumer_group_id)
-                .await
-                .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to delete consumer group with ID: {consumer_group_id}"))?
+                .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to delete consumer group with ID: {consumer_group_id}"))?;
+            stream_id_value = topic.stream_id;
+            topic_id_value = topic.topic_id;
         }
 
         for member in consumer_group.get_members() {
@@ -174,8 +224,7 @@ impl IggyShard {
                 )
                 .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to make client leave consumer group for client ID: {}, group 
ID: {}", member.id, consumer_group.group_id))?;
         }
-
-        Ok(())
+        Ok(consumer_group)
     }
 
     pub fn join_consumer_group(
diff --git a/core/server/src/shard/system/personal_access_tokens.rs 
b/core/server/src/shard/system/personal_access_tokens.rs
index 8cc89cdc..8448cc9d 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -58,7 +58,7 @@ impl IggyShard {
         session: &Session,
         name: &str,
         expiry: IggyExpiry,
-    ) -> Result<String, IggyError> {
+    ) -> Result<(PersonalAccessToken, String), IggyError> {
         self.ensure_authenticated(session)?;
         let user_id = session.get_user_id();
         let identifier = user_id.try_into()?;
@@ -78,6 +78,27 @@ impl IggyShard {
             }
         }
 
+        let (personal_access_token, token) =
+            PersonalAccessToken::new(user_id, name, IggyTimestamp::now(), 
expiry);
+        self.create_personal_access_token_base(personal_access_token.clone())?;
+        Ok((personal_access_token, token))
+    }
+
+    pub fn create_personal_access_token_bypass_auth(
+        &self,
+        personal_access_token: PersonalAccessToken,
+    ) -> Result<(), IggyError> {
+        self.create_personal_access_token_base(personal_access_token)
+    }
+
+    fn create_personal_access_token_base(
+        &self,
+        personal_access_token: PersonalAccessToken,
+    ) -> Result<(), IggyError> {
+        let user_id = personal_access_token.user_id;
+        let name = personal_access_token.name.clone();
+        let token_hash = personal_access_token.token.clone();
+        let identifier = user_id.try_into()?;
         let user = self.get_user_mut(&identifier).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get mutable 
reference to the user with id: {user_id}")
         })?;
@@ -85,22 +106,20 @@ impl IggyShard {
         if user
             .personal_access_tokens
             .iter()
-            .any(|pat| pat.name.as_str() == name)
+            .any(|pat| pat.name.as_str() == name.as_str())
         {
             error!("Personal access token: {name} for user with ID: {user_id} 
already exists.");
             return Err(IggyError::PersonalAccessTokenAlreadyExists(
-                name.to_owned(),
+                name.to_string(),
                 user_id,
             ));
         }
 
         info!("Creating personal access token: {name} for user with ID: 
{user_id}...");
-        let (personal_access_token, token) =
-            PersonalAccessToken::new(user_id, name, IggyTimestamp::now(), 
expiry);
         user.personal_access_tokens
-            .insert(personal_access_token.token.clone(), 
personal_access_token);
+            .insert(token_hash, personal_access_token);
         info!("Created personal access token: {name} for user with ID: 
{user_id}.");
-        Ok(token)
+        Ok(())
     }
 
     pub fn delete_personal_access_token(
@@ -110,6 +129,22 @@ impl IggyShard {
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
         let user_id = session.get_user_id();
+        self.delete_personal_access_token_base(user_id, name)
+    }
+
+    pub fn delete_personal_access_token_bypass_auth(
+        &self,
+        user_id: u32,
+        name: &str,
+    ) -> Result<(), IggyError> {
+        self.delete_personal_access_token_base(user_id, name)
+    }
+
+    fn delete_personal_access_token_base(
+        &self,
+        user_id: u32,
+        name: &str,
+    ) -> Result<(), IggyError> {
         let user = self
             .get_user_mut(&user_id.try_into()?)
             .with_error_context(|error| {
diff --git a/core/server/src/shard/system/users.rs 
b/core/server/src/shard/system/users.rs
index f0e87fc0..0f92a912 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -143,7 +143,7 @@ impl IggyShard {
         Ok(self.users.borrow().values().cloned().collect())
     }
 
-    pub async fn create_user(
+    pub fn create_user(
         &self,
         session: &Session,
         username: &str,
@@ -162,6 +162,31 @@ impl IggyShard {
                 )
             })?;
 
+        let user_id = self.create_user_base(username, password, status, 
permissions)?;
+        self.get_user(&user_id.try_into()?)
+            .with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to get user 
with id: {user_id}")
+            })
+    }
+
+    pub fn create_user_bypass_auth(
+        &self,
+        username: &str,
+        password: &str,
+        status: UserStatus,
+        permissions: Option<Permissions>,
+    ) -> Result<u32, IggyError> {
+        let user_id = self.create_user_base(username, password, status, 
permissions)?;
+        Ok(user_id)
+    }
+
+    fn create_user_base(
+        &self,
+        username: &str,
+        password: &str,
+        status: UserStatus,
+        permissions: Option<Permissions>,
+    ) -> Result<u32, IggyError> {
         if self
             .users
             .borrow()
@@ -186,26 +211,32 @@ impl IggyShard {
         self.users.borrow_mut().insert(user.id, user);
         info!("Created user: {username} with ID: {user_id}.");
         self.metrics.increment_users(1);
-        self.get_user(&user_id.try_into()?)
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to get user 
with id: {user_id}")
-            })
+        Ok(user_id)
     }
 
     pub fn delete_user(&self, session: &Session, user_id: &Identifier) -> 
Result<User, IggyError> {
         self.ensure_authenticated(session)?;
+        self.permissioner
+            .borrow()
+            .delete_user(session.get_user_id())
+            .with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - permission denied to 
delete user for user with id: {}",
+                    session.get_user_id()
+                )
+            })?;
+
+        self.delete_user_base(user_id)
+    }
+
+    pub fn delete_user_bypass_auth(&self, user_id: &Identifier) -> 
Result<User, IggyError> {
+        self.delete_user_base(user_id)
+    }
+
+    fn delete_user_base(&self, user_id: &Identifier) -> Result<User, 
IggyError> {
         let existing_user_id;
         let existing_username;
         {
-            self.permissioner
-                .borrow()
-                .delete_user(session.get_user_id())
-                .with_error_context(|error| {
-                    format!(
-                        "{COMPONENT} (error: {error}) - permission denied to 
delete user for user with id: {}",
-                        session.get_user_id()
-                    )
-                })?;
             let user = self.get_user(user_id).with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get user 
with id: {user_id}")
             })?;
@@ -257,6 +288,24 @@ impl IggyShard {
                 )
             })?;
 
+        self.update_user_base(user_id, username, status)
+    }
+
+    pub fn update_user_bypass_auth(
+        &self,
+        user_id: &Identifier,
+        username: Option<String>,
+        status: Option<UserStatus>,
+    ) -> Result<User, IggyError> {
+        self.update_user_base(user_id, username, status)
+    }
+
+    fn update_user_base(
+        &self,
+        user_id: &Identifier,
+        username: Option<String>,
+        status: Option<UserStatus>,
+    ) -> Result<User, IggyError> {
         if let Some(username) = username.to_owned() {
             let user = self.get_user(user_id)?;
             let existing_user = 
self.get_user(&username.to_owned().try_into()?);
@@ -310,7 +359,29 @@ impl IggyShard {
                 error!("Cannot change the root user permissions.");
                 return Err(IggyError::CannotChangePermissions(user.id));
             }
+        }
 
+        self.update_permissions_base(user_id, permissions)
+    }
+
+    pub fn update_permissions_bypass_auth(
+        &self,
+        user_id: &Identifier,
+        permissions: Option<Permissions>,
+    ) -> Result<(), IggyError> {
+        self.update_permissions_base(user_id, permissions)
+    }
+
+    fn update_permissions_base(
+        &self,
+        user_id: &Identifier,
+        permissions: Option<Permissions>,
+    ) -> Result<(), IggyError> {
+        {
+            let user: User = self.get_user(user_id).with_error_context(|error| 
{
+                format!("{COMPONENT} (error: {error}) - failed to get user 
with id: {user_id}")
+            })?;
+            
             self.permissioner
                 .borrow_mut()
                 .update_permissions_for_user(user.id, permissions.clone());
@@ -353,6 +424,24 @@ impl IggyShard {
             }
         }
 
+        self.change_password_base(user_id, current_password, new_password)
+    }
+
+    pub fn change_password_bypass_auth(
+        &self,
+        user_id: &Identifier,
+        current_password: &str,
+        new_password: &str,
+    ) -> Result<(), IggyError> {
+        self.change_password_base(user_id, current_password, new_password)
+    }
+
+    fn change_password_base(
+        &self,
+        user_id: &Identifier,
+        current_password: &str,
+        new_password: &str,
+    ) -> Result<(), IggyError> {
         let mut user = self.get_user_mut(user_id).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get mutable 
reference to the user with id: {user_id}")
         })?;
@@ -456,25 +545,29 @@ impl IggyShard {
 
     pub fn logout_user(&self, session: &Session) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
+        let client_id = session.client_id;
+        let user_id = session.get_user_id();
+        self.logout_user_base(user_id, client_id)?;
+        Ok(())
+    }
+
+    fn logout_user_base(&self, user_id: u32, client_id: u32) -> Result<(), 
IggyError> {
         let user = self
-            .get_user(&Identifier::numeric(session.get_user_id())?)
+            .get_user(&Identifier::numeric(user_id)?)
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to get user with 
id: {}",
-                    session.get_user_id()
+                    user_id,
                 )
             })?;
         info!(
             "Logging out user: {} with ID: {}...",
             user.username, user.id
         );
-        if session.client_id > 0 {
+        if client_id > 0 {
             let mut client_manager = self.client_manager.borrow_mut();
-            client_manager.clear_user_id(session.client_id)?;
-            info!(
-                "Cleared user ID: {} for client: {}.",
-                user.id, session.client_id
-            );
+            client_manager.clear_user_id(client_id)?;
+            info!("Cleared user ID: {} for client: {}.", user.id, client_id);
         }
         info!("Logged out user: {} with ID: {}.", user.username, user.id);
         Ok(())
diff --git a/core/server/src/shard/transmission/event.rs 
b/core/server/src/shard/transmission/event.rs
index 02daba5e..3b62d3d2 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -6,7 +6,7 @@ use iggy_common::{
 
 use crate::{
     shard::namespace::IggyNamespace,
-    streaming::{clients::client_manager::Transport, 
polling_consumer::PollingConsumer},
+    streaming::{clients::client_manager::Transport, 
polling_consumer::PollingConsumer, 
personal_access_tokens::personal_access_token::PersonalAccessToken},
 };
 
 #[derive(Debug)]
@@ -87,6 +87,10 @@ pub enum ShardEvent {
         status: UserStatus,
         permissions: Option<Permissions>,
     },
+    UpdatedPermissions {
+        user_id: Identifier,
+        permissions: Option<Permissions>,
+    },
     DeletedUser {
         user_id: Identifier,
     },
@@ -109,21 +113,15 @@ pub enum ShardEvent {
         new_password: String,
     },
     CreatedPersonalAccessToken {
-        name: String,
-        expiry: IggyExpiry,
+        personal_access_token: PersonalAccessToken,
     },
     DeletedPersonalAccessToken {
+        user_id: u32,
         name: String,
     },
     LoginWithPersonalAccessToken {
         token: String,
     },
-    StoredConsumerOffset {
-        stream_id: Identifier,
-        topic_id: Identifier,
-        consumer: PollingConsumer,
-        offset: u64,
-    },
     NewSession {
         address: SocketAddr,
         transport: Transport,
diff --git a/core/server/src/streaming/topics/consumer_groups.rs 
b/core/server/src/streaming/topics/consumer_groups.rs
index 79c7f9f3..ccb8b69f 100644
--- a/core/server/src/streaming/topics/consumer_groups.rs
+++ b/core/server/src/streaming/topics/consumer_groups.rs
@@ -216,10 +216,7 @@ impl Topic {
         Ok(Identifier::numeric(id)?)
     }
 
-    pub async fn delete_consumer_group(
-        &mut self,
-        id: &Identifier,
-    ) -> Result<ConsumerGroup, IggyError> {
+    pub fn delete_consumer_group(&mut self, id: &Identifier) -> 
Result<ConsumerGroup, IggyError> {
         let group_id;
         {
             let consumer_group = 
self.get_consumer_group(id).with_error_context(|error| {
@@ -244,16 +241,6 @@ impl Topic {
                     .store(group_id, Ordering::SeqCst);
             }
 
-            for (_, partition) in self.partitions.iter() {
-                let partition = partition.read().await;
-                if let Some((_, offset)) = 
partition.consumer_group_offsets.remove(&group_id) {
-                    self.storage
-                        .partition
-                        .delete_consumer_offset(&offset.path)
-                        .await?;
-                }
-            }
-
             info!(
                 "Deleted consumer group with ID: {} from topic with ID: {} and 
stream with ID: {}.",
                 id, self.topic_id, self.stream_id
@@ -380,9 +367,7 @@ mod tests {
         let result = topic.create_consumer_group(Some(group_id), name);
         assert!(result.is_ok());
         assert_eq!(topic.consumer_groups.borrow().len(), 1);
-        let result = topic
-            .delete_consumer_group(&Identifier::numeric(group_id).unwrap())
-            .await;
+        let result = 
topic.delete_consumer_group(&Identifier::numeric(group_id).unwrap());
         assert!(result.is_ok());
         assert!(topic.consumer_groups.borrow().is_empty());
     }
@@ -396,9 +381,7 @@ mod tests {
         assert!(result.is_ok());
         assert_eq!(topic.consumer_groups.borrow().len(), 1);
         let group_id = group_id + 1;
-        let result = topic
-            .delete_consumer_group(&Identifier::numeric(group_id).unwrap())
-            .await;
+        let result = 
topic.delete_consumer_group(&Identifier::numeric(group_id).unwrap());
         assert!(result.is_err());
         assert_eq!(topic.consumer_groups.borrow().len(), 1);
     }

Reply via email to