This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch arc-swap in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 058ae50ce4b0a4d785d5d12708e898394e603658 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Dec 29 14:26:56 2025 +0100 commands --- .../create_consumer_group_handler.rs | 8 - .../delete_consumer_group_handler.rs | 8 - .../partitions/create_partitions_handler.rs | 7 - .../partitions/delete_partitions_handler.rs | 10 +- .../create_personal_access_token_handler.rs | 5 - .../delete_personal_access_token_handler.rs | 7 - .../handlers/streams/create_stream_handler.rs | 7 - .../handlers/streams/delete_stream_handler.rs | 7 - .../handlers/streams/purge_stream_handler.rs | 6 +- .../handlers/streams/update_stream_handler.rs | 6 - .../binary/handlers/topics/create_topic_handler.rs | 15 +- .../binary/handlers/topics/delete_topic_handler.rs | 8 - .../binary/handlers/topics/purge_topic_handler.rs | 6 - .../binary/handlers/topics/update_topic_handler.rs | 12 -- .../handlers/users/change_password_handler.rs | 8 - .../binary/handlers/users/create_user_handler.rs | 11 - .../binary/handlers/users/delete_user_handler.rs | 3 - .../handlers/users/update_permissions_handler.rs | 6 - .../binary/handlers/users/update_user_handler.rs | 8 - core/server/src/http/consumer_groups.rs | 37 ---- core/server/src/http/partitions.rs | 41 +--- core/server/src/http/streams.rs | 70 +------ core/server/src/http/topics.rs | 114 ++-------- core/server/src/http/users.rs | 89 -------- core/server/src/shard/handlers.rs | 231 +-------------------- core/server/src/shard/transmission/event.rs | 105 +--------- 26 files changed, 33 insertions(+), 802 deletions(-) diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs index 0c6e9a763..fed1f0a6f 100644 --- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs @@ -23,7 +23,6 @@ use 
crate::binary::handlers::consumer_groups::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::state::models::CreateConsumerGroupWithId; @@ -57,13 +56,6 @@ impl ServerCommandHandler for CreateConsumerGroup { )?; let cg_id = cg.id(); - let event = ShardEvent::CreatedConsumerGroup { - stream_id: self.stream_id.clone(), - topic_id: self.topic_id.clone(), - cg, - }; - shard.broadcast_event_to_all_shards(event).await?; - let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); shard diff --git a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs index 1da98ac44..3c2800fe7 100644 --- a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs @@ -23,7 +23,6 @@ use crate::binary::handlers::consumer_groups::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::streaming::polling_consumer::ConsumerGroupId; @@ -102,13 +101,6 @@ impl ServerCommandHandler for DeleteConsumerGroup { ) })?; - let event = ShardEvent::DeletedConsumerGroup { - id: cg_id, - stream_id: self.stream_id.clone(), - topic_id: self.topic_id.clone(), - group_id: self.group_id.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); shard diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs 
b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs index 0ead0c4ad..3bd65d7bf 100644 --- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs @@ -23,7 +23,6 @@ use crate::binary::handlers::partitions::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::streaming::session::Session; @@ -62,12 +61,6 @@ impl ServerCommandHandler for CreatePartitions { ) .await?; let partition_ids = partitions.iter().map(|p| p.id()).collect::<Vec<_>>(); - let event = ShardEvent::CreatedPartitions { - stream_id: self.stream_id.clone(), - topic_id: self.topic_id.clone(), - partitions, - }; - shard.broadcast_event_to_all_shards(event).await?; shard.streams.with_topic_by_id_mut( &self.stream_id, diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs index fe6740136..6e2045493 100644 --- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs @@ -23,7 +23,6 @@ use crate::binary::handlers::partitions::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use anyhow::Result; @@ -53,7 +52,7 @@ impl ServerCommandHandler for DeletePartitions { // Acquire partition lock to serialize filesystem operations let _partition_guard = shard.fs_locks.partition_lock.lock().await; - let deleted_partition_ids = shard + let _deleted_partition_ids = shard .delete_partitions( session, &self.stream_id, @@ -61,13 +60,6 @@ impl 
ServerCommandHandler for DeletePartitions { self.partitions_count, ) .await?; - let event = ShardEvent::DeletedPartitions { - stream_id: self.stream_id.clone(), - topic_id: self.topic_id.clone(), - partitions_count: self.partitions_count, - partition_ids: deleted_partition_ids, - }; - shard.broadcast_event_to_all_shards(event).await?; let remaining_partition_ids = shard.streams.with_topic_by_id( &self.stream_id, diff --git a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs index c52ddd4eb..6a0ec00ed 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs @@ -24,7 +24,6 @@ use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::state::models::CreatePersonalAccessTokenWithHash; use crate::streaming::session::Session; @@ -60,10 +59,6 @@ impl ServerCommandHandler for CreatePersonalAccessToken { })?; let bytes = mapper::map_raw_pat(&token); let hash = personal_access_token.token.to_string(); - let event = ShardEvent::CreatedPersonalAccessToken { - personal_access_token: personal_access_token.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; shard .state diff --git a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs index 9a588ccbe..19b472fb3 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs @@ -53,13 +53,6 @@ 
impl ServerCommandHandler for DeletePersonalAccessToken { "{COMPONENT} (error: {error}) - failed to delete personal access token with name: {token_name}, session: {session}" )})?; - // Broadcast the event to other shards - let event = crate::shard::transmission::event::ShardEvent::DeletedPersonalAccessToken { - user_id: session.get_user_id(), - name: self.name.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; - shard .state .apply( diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs b/core/server/src/binary/handlers/streams/create_stream_handler.rs index e56f87e64..83990d9eb 100644 --- a/core/server/src/binary/handlers/streams/create_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs @@ -24,7 +24,6 @@ use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult, @@ -77,12 +76,6 @@ impl ServerCommandHandler for CreateStream { let stream = shard.create_stream(session, name).await?; let created_stream_id = stream.id(); - let event = ShardEvent::CreatedStream { - id: created_stream_id, - stream: stream.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; - let response = shard .streams .with_components_by_id(created_stream_id, |(root, stats)| { diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs b/core/server/src/binary/handlers/streams/delete_stream_handler.rs index 76bc9f8ca..c1ae46336 100644 --- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs @@ -22,7 +22,6 @@ use crate::binary::command::{ use crate::binary::handlers::streams::COMPONENT; use 
crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult, @@ -83,12 +82,6 @@ impl ServerCommandHandler for DeleteStream { stream.id() ); - let event = ShardEvent::DeletedStream { - id: stream.id(), - stream_id: stream_id.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; - shard .state .apply(session.get_user_id(), &EntryCommand::DeleteStream(DeleteStream { stream_id: stream_id.clone() })) diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs b/core/server/src/binary/handlers/streams/purge_stream_handler.rs index b8d3cfc0c..fd26111e7 100644 --- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs @@ -23,7 +23,6 @@ use crate::binary::handlers::streams::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use anyhow::Result; @@ -54,10 +53,7 @@ impl ServerCommandHandler for PurgeStream { .with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to purge stream with id: {stream_id}, session: {session}") })?; - let event = ShardEvent::PurgedStream { - stream_id: self.stream_id.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; + shard .state .apply(session.get_user_id(), &EntryCommand::PurgeStream(self)) diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs b/core/server/src/binary/handlers/streams/update_stream_handler.rs index c080412a7..7168cae2d 100644 --- a/core/server/src/binary/handlers/streams/update_stream_handler.rs +++ 
b/core/server/src/binary/handlers/streams/update_stream_handler.rs @@ -23,7 +23,6 @@ use crate::binary::handlers::streams::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use anyhow::Result; @@ -54,11 +53,6 @@ impl ServerCommandHandler for UpdateStream { format!("{COMPONENT} (error: {error}) - failed to update stream with id: {stream_id}, session: {session}") })?; - let event = ShardEvent::UpdatedStream { - stream_id: self.stream_id.clone(), - name: self.name.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; shard .state .apply(session.get_user_id(), &EntryCommand::UpdateStream(self)) diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs index b5b5c96a2..b15b9c244 100644 --- a/core/server/src/binary/handlers/topics/create_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs @@ -24,7 +24,6 @@ use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult, @@ -109,13 +108,7 @@ impl ServerCommandHandler for CreateTopic { .with_stream_by_id(&stream_id, streams::helpers::get_stream_id()); let topic_id = topic.id(); - let event = ShardEvent::CreatedTopic { - stream_id: stream_id.clone(), - topic, - }; - - shard.broadcast_event_to_all_shards(event).await?; - let partitions = shard + shard .create_partitions( session, &stream_id, @@ -123,12 +116,6 @@ impl ServerCommandHandler for CreateTopic { partitions_count, ) .await?; - let event = ShardEvent::CreatedPartitions { - stream_id: 
stream_id.clone(), - topic_id: Identifier::numeric(topic_id as u32).unwrap(), - partitions, - }; - shard.broadcast_event_to_all_shards(event).await?; let response = shard.streams.with_topic_by_id( &stream_id, diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs b/core/server/src/binary/handlers/topics/delete_topic_handler.rs index ea3d21cdd..59b8394b7 100644 --- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs @@ -23,7 +23,6 @@ use crate::binary::handlers::topics::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult, @@ -91,13 +90,6 @@ impl ServerCommandHandler for DeleteTopic { stream_id_num ); - let event = ShardEvent::DeletedTopic { - id: topic_id_num, - stream_id: stream_id.clone(), - topic_id: topic_id.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; - shard .state .apply(session.get_user_id(), &EntryCommand::DeleteTopic(self)) diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs b/core/server/src/binary/handlers/topics/purge_topic_handler.rs index 2cea843ed..8c725f960 100644 --- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs @@ -58,12 +58,6 @@ impl ServerCommandHandler for PurgeTopic { ) })?; - let event = crate::shard::transmission::event::ShardEvent::PurgedTopic { - stream_id: self.stream_id.clone(), - topic_id: self.topic_id.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; - shard .state .apply(session.get_user_id(), &EntryCommand::PurgeTopic(self)) diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs 
b/core/server/src/binary/handlers/topics/update_topic_handler.rs index 6dfa2281d..0f4ae3a8a 100644 --- a/core/server/src/binary/handlers/topics/update_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs @@ -23,7 +23,6 @@ use crate::binary::handlers::topics::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult, @@ -117,17 +116,6 @@ impl ServerCommandHandler for UpdateTopic { .streams .with_stream_by_id(&stream_id, streams::helpers::get_stream_id()); - let event = ShardEvent::UpdatedTopic { - stream_id: stream_id.clone(), - topic_id: topic_id.clone(), - name: name.clone(), - message_expiry: self.message_expiry, - compression_algorithm: self.compression_algorithm, - max_topic_size: self.max_topic_size, - replication_factor: self.replication_factor, - }; - shard.broadcast_event_to_all_shards(event).await?; - shard .state .apply(session.get_user_id(), &EntryCommand::UpdateTopic(self)) diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs b/core/server/src/binary/handlers/users/change_password_handler.rs index dc512e172..77e4ef47d 100644 --- a/core/server/src/binary/handlers/users/change_password_handler.rs +++ b/core/server/src/binary/handlers/users/change_password_handler.rs @@ -23,7 +23,6 @@ use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use crate::streaming::utils::crypto; @@ -67,13 +66,6 @@ impl ServerCommandHandler for ChangePassword { info!("Changed password for user with ID: {}.", self.user_id); - let event = 
ShardEvent::ChangedPassword { - user_id: self.user_id.clone(), - current_password: self.current_password.clone(), - new_password: self.new_password.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; - // For the security of the system, we hash the password before storing it in metadata. shard .state diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs index 0caa873f1..1b5cc190e 100644 --- a/core/server/src/binary/handlers/users/create_user_handler.rs +++ b/core/server/src/binary/handlers/users/create_user_handler.rs @@ -23,7 +23,6 @@ use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult, @@ -91,16 +90,6 @@ impl ServerCommandHandler for CreateUser { })?; let user_id = user.id; - - let event = ShardEvent::CreatedUser { - user_id, - username: username.clone(), - password: password.clone(), - status, - permissions: permissions.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; - let response = mapper::map_user(&user); shard diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs b/core/server/src/binary/handlers/users/delete_user_handler.rs index 55b0f103d..42b74de87 100644 --- a/core/server/src/binary/handlers/users/delete_user_handler.rs +++ b/core/server/src/binary/handlers/users/delete_user_handler.rs @@ -25,7 +25,6 @@ use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ ShardMessage, 
ShardRequest, ShardRequestPayload, ShardSendRequestResult, @@ -81,8 +80,6 @@ impl ServerCommandHandler for DeleteUser { })?; info!("Deleted user: {} with ID: {}.", user.username, user.id); - let event = ShardEvent::DeletedUser { user_id }; - shard.broadcast_event_to_all_shards(event).await?; shard .state diff --git a/core/server/src/binary/handlers/users/update_permissions_handler.rs b/core/server/src/binary/handlers/users/update_permissions_handler.rs index 948ce8e41..4100da70b 100644 --- a/core/server/src/binary/handlers/users/update_permissions_handler.rs +++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs @@ -25,7 +25,6 @@ use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use anyhow::Result; @@ -56,11 +55,6 @@ impl ServerCommandHandler for UpdatePermissions { self.user_id ))?; info!("Updated permissions for user with ID: {}.", self.user_id); - let event = ShardEvent::UpdatedPermissions { - user_id: self.user_id.clone(), - permissions: self.permissions.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; shard .state diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs b/core/server/src/binary/handlers/users/update_user_handler.rs index f4e12a0f5..fae459941 100644 --- a/core/server/src/binary/handlers/users/update_user_handler.rs +++ b/core/server/src/binary/handlers/users/update_user_handler.rs @@ -25,7 +25,6 @@ use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; -use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use anyhow::Result; @@ -66,13 +65,6 @@ impl ServerCommandHandler for UpdateUser { info!("Updated user: {} with ID: {}.", 
user.username, user.id); - let event = ShardEvent::UpdatedUser { - user_id: self.user_id.clone(), - username: self.username.clone(), - status: self.status, - }; - shard.broadcast_event_to_all_shards(event).await?; - let user_id = self.user_id.clone(); shard .state diff --git a/core/server/src/http/consumer_groups.rs b/core/server/src/http/consumer_groups.rs index e1e9b2f7e..278425796 100644 --- a/core/server/src/http/consumer_groups.rs +++ b/core/server/src/http/consumer_groups.rs @@ -180,24 +180,6 @@ async fn create_consumer_group( let group_id = consumer_group.id(); - // Send event for consumer group creation - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::CreatedConsumerGroup { - stream_id: command.stream_id.clone(), - topic_id: command.topic_id.clone(), - cg: consumer_group.clone(), - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - // Get the created consumer group details let group_id_identifier = Identifier::numeric(group_id as u32).unwrap(); let consumer_group_details = state.shard.shard().streams.with_consumer_group_by_id( @@ -296,25 +278,6 @@ async fn delete_consumer_group( ) })?; - // Send event for consumer group deletion - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::DeletedConsumerGroup { - id: cg_id, - stream_id: identifier_stream_id.clone(), - topic_id: identifier_topic_id.clone(), - group_id: identifier_group_id.clone(), - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - // Apply state change let entry_command = EntryCommand::DeleteConsumerGroup(DeleteConsumerGroup { stream_id: identifier_stream_id, diff --git a/core/server/src/http/partitions.rs b/core/server/src/http/partitions.rs index 46c842761..4e5f4e97b 
100644 --- a/core/server/src/http/partitions.rs +++ b/core/server/src/http/partitions.rs @@ -20,7 +20,6 @@ use crate::http::COMPONENT; use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; use crate::http::shared::AppState; -use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use axum::extract::{Path, Query, State}; @@ -59,7 +58,7 @@ async fn create_partitions( let _parititon_guard = state.shard.shard().fs_locks.partition_lock.lock().await; let session = Session::stateless(identity.user_id, identity.ip_address); - let partitions = SendWrapper::new(state.shard.shard().create_partitions( + let _partitions = SendWrapper::new(state.shard.shard().create_partitions( &session, &command.stream_id, &command.topic_id, @@ -67,24 +66,6 @@ async fn create_partitions( )) .await?; - let broadcast_future = SendWrapper::new(async { - let shard = state.shard.shard(); - - let event = ShardEvent::CreatedPartitions { - stream_id: command.stream_id.clone(), - topic_id: command.topic_id.clone(), - partitions, - }; - let _responses = shard.broadcast_event_to_all_shards(event).await; - Ok::<(), CustomError>(()) - }); - - broadcast_future.await - .with_error(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to broadcast partition events, stream ID: {stream_id}, topic ID: {topic_id}" - ) - })?; let command = EntryCommand::CreatePartitions(command); let state_future = SendWrapper::new(state.shard.shard().state.apply(identity.user_id, &command)); @@ -112,7 +93,7 @@ async fn delete_partitions( query.validate()?; let session = Session::stateless(identity.user_id, identity.ip_address); - let deleted_partition_ids = { + { let delete_future = SendWrapper::new(state.shard.shard().delete_partitions( &session, &query.stream_id, @@ -127,24 +108,6 @@ async fn delete_partitions( })? 
}; - // Send event for partition deletion - { - let broadcast_future = SendWrapper::new(async { - let event = ShardEvent::DeletedPartitions { - stream_id: query.stream_id.clone(), - topic_id: query.topic_id.clone(), - partitions_count: query.partitions_count, - partition_ids: deleted_partition_ids, - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - let command = EntryCommand::DeletePartitions(DeletePartitions { stream_id: query.stream_id.clone(), topic_id: query.topic_id.clone(), diff --git a/core/server/src/http/streams.rs b/core/server/src/http/streams.rs index 2a66ea1d1..0c49feab3 100644 --- a/core/server/src/http/streams.rs +++ b/core/server/src/http/streams.rs @@ -117,20 +117,6 @@ async fn create_stream( let created_stream_id = stream.root().id(); - // Send event for stream creation - inlined from wrapper - { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::CreatedStream { - id: created_stream_id, - stream, - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - } - // Apply state change using wrapper method let entry_command = EntryCommand::CreateStream(CreateStreamWithId { stream_id: created_stream_id as u32, @@ -189,23 +175,6 @@ async fn update_stream( ) })?; - // Send event for stream update - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::UpdatedStream { - stream_id: command.stream_id.clone(), - name: command.name.clone(), - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - // Apply state change using wrapper method let entry_command = EntryCommand::UpdateStream(command); state.shard.apply_state(identity.user_id, &entry_command).await.with_error(|error| { @@ -232,8 +201,8 @@ async fn delete_stream( let session = 
Session::stateless(identity.user_id, identity.ip_address); let _stream_guard = state.shard.shard().fs_locks.stream_lock.lock().await; - // Delete stream and get the stream entity - let stream = { + // Delete stream + { let future = SendWrapper::new( state .shard @@ -246,25 +215,6 @@ async fn delete_stream( format!("{COMPONENT} (error: {error}) - failed to delete stream with ID: {stream_id}",) })?; - let stream_id_numeric = stream.root().id(); - - // Send event for stream deletion - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::DeletedStream { - id: stream_id_numeric, - stream_id: identifier_stream_id.clone(), - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - // Apply state change using wrapper method let entry_command = EntryCommand::DeleteStream(DeleteStream { stream_id: identifier_stream_id, @@ -304,22 +254,6 @@ async fn purge_stream( ) })?; - // Send event for stream purge - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::PurgedStream { - stream_id: identifier_stream_id.clone(), - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - // Apply state change using wrapper method let entry_command = EntryCommand::PurgeStream(PurgeStream { stream_id: identifier_stream_id, diff --git a/core/server/src/http/topics.rs b/core/server/src/http/topics.rs index 551eed602..f4586a869 100644 --- a/core/server/src/http/topics.rs +++ b/core/server/src/http/topics.rs @@ -202,44 +202,23 @@ async fn create_topic( let topic_id = topic.id(); - // Send events for topic creation - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - + // Create partitions + { let shard = state.shard.shard(); - - let event 
= ShardEvent::CreatedTopic { - stream_id: command.stream_id.clone(), - topic, - }; - let _responses = shard.broadcast_event_to_all_shards(event).await; - - // Create partitions - let partitions = shard - .create_partitions( - &session, - &command.stream_id, - &Identifier::numeric(topic_id as u32).unwrap(), - command.partitions_count, - ) - .await?; - - let event = ShardEvent::CreatedPartitions { - stream_id: command.stream_id.clone(), - topic_id: Identifier::numeric(topic_id as u32).unwrap(), - partitions, - }; - let _responses = shard.broadcast_event_to_all_shards(event).await; - - Ok::<(), CustomError>(()) - }); - - broadcast_future.await - .with_error(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to broadcast topic events, stream ID: {stream_id}" - ) - })?; + let topic_identifier = Identifier::numeric(topic_id as u32).unwrap(); + let future = SendWrapper::new(shard.create_partitions( + &session, + &command.stream_id, + &topic_identifier, + command.partitions_count, + )); + future.await + } + .with_error(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to create partitions, stream ID: {stream_id}" + ) + })?; // Create response using the same approach as binary handler let response = { @@ -320,28 +299,6 @@ async fn update_topic( |(root, _, _)| (root.message_expiry(), root.max_topic_size()), ); - // Send event for topic update - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::UpdatedTopic { - stream_id: command.stream_id.clone(), - topic_id: command.topic_id.clone(), - name: command.name.clone(), - message_expiry: command.message_expiry, - compression_algorithm: command.compression_algorithm, - max_topic_size: command.max_topic_size, - replication_factor: command.replication_factor, - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - command.message_expiry = 
message_expiry; command.max_topic_size = max_topic_size; @@ -372,7 +329,7 @@ async fn delete_topic( let session = Session::stateless(identity.user_id, identity.ip_address); let _topic_guard = state.shard.shard().fs_locks.topic_lock.lock().await; - let topic = { + { let future = SendWrapper::new(state.shard.shard().delete_topic( &session, &identifier_stream_id, @@ -385,26 +342,6 @@ async fn delete_topic( ) })?; - let topic_id_numeric = topic.root().id(); - - // Send event for topic deletion - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::DeletedTopic { - id: topic_id_numeric, - stream_id: identifier_stream_id.clone(), - topic_id: identifier_topic_id.clone(), - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - { let entry_command = EntryCommand::DeleteTopic(DeleteTopic { stream_id: identifier_stream_id, @@ -450,23 +387,6 @@ async fn purge_topic( ) })?; - // Send event for topic purge - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::PurgedTopic { - stream_id: identifier_stream_id.clone(), - topic_id: identifier_topic_id.clone(), - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - { let entry_command = EntryCommand::PurgeTopic(PurgeTopic { stream_id: identifier_stream_id, diff --git a/core/server/src/http/users.rs b/core/server/src/http/users.rs index 4389c6d87..ff49dae6c 100644 --- a/core/server/src/http/users.rs +++ b/core/server/src/http/users.rs @@ -137,26 +137,6 @@ async fn create_user( let user_id = user.id; let response = Json(mapper::map_user(&user)); - // Send event for user creation - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::CreatedUser 
{ - user_id, - username: command.username.to_owned(), - password: command.password.to_owned(), - status: command.status, - permissions: command.permissions.clone(), - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - { let username = command.username.clone(); let entry_command = EntryCommand::CreateUser(CreateUserWithId { @@ -212,24 +192,6 @@ async fn update_user( format!("{COMPONENT} (error: {error}) - failed to update user, user ID: {user_id}") })?; - // Send event for user update - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::UpdatedUser { - user_id: command.user_id.clone(), - username: command.username.clone(), - status: command.status, - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - { let username = command.username.clone(); let entry_command = EntryCommand::UpdateUser(command); @@ -273,23 +235,6 @@ async fn update_permissions( ) })?; - // Send event for permissions update - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::UpdatedPermissions { - user_id: command.user_id.clone(), - permissions: command.permissions.clone(), - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - { let entry_command = EntryCommand::UpdatePermissions(command); let future = SendWrapper::new( @@ -334,24 +279,6 @@ async fn change_password( format!("{COMPONENT} (error: {error}) - failed to change password, user ID: {user_id}") })?; - // Send event for password change - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::ChangedPassword { - user_id: command.user_id.clone(), - 
current_password: command.current_password.clone(), - new_password: command.new_password.clone(), - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - { let entry_command = EntryCommand::ChangePassword(command); let future = SendWrapper::new( @@ -390,22 +317,6 @@ async fn delete_user( format!("{COMPONENT} (error: {error}) - failed to delete user with ID: {user_id}") })?; - // Send event for user deletion - { - let broadcast_future = SendWrapper::new(async { - use crate::shard::transmission::event::ShardEvent; - let event = ShardEvent::DeletedUser { - user_id: identifier_user_id.clone(), - }; - let _responses = state - .shard - .shard() - .broadcast_event_to_all_shards(event) - .await; - }); - broadcast_future.await; - } - { let entry_command = EntryCommand::DeleteUser(DeleteUser { user_id: identifier_user_id, diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs index 81c54a98a..c1aed5c92 100644 --- a/core/server/src/shard/handlers.rs +++ b/core/server/src/shard/handlers.rs @@ -120,14 +120,6 @@ async fn handle_request( let _stream_guard = shard.fs_locks.stream_lock.lock().await; let stream = shard.create_stream(&session, name.clone()).await?; - let created_stream_id = stream.id(); - - let event = ShardEvent::CreatedStream { - id: created_stream_id, - stream: stream.clone(), - }; - - shard.broadcast_event_to_all_shards(event).await?; Ok(ShardResponse::CreateStreamResponse(stream)) } @@ -164,12 +156,8 @@ async fn handle_request( .await?; let topic_id = topic.id(); - let event = ShardEvent::CreatedTopic { - stream_id: stream_id.clone(), - topic: topic.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; - let partitions = shard + + shard .create_partitions( &session, &stream_id, @@ -178,13 +166,6 @@ async fn handle_request( ) .await?; - let event = ShardEvent::CreatedPartitions { - stream_id: stream_id.clone(), - topic_id: 
Identifier::numeric(topic_id as u32).unwrap(), - partitions, - }; - shard.broadcast_event_to_all_shards(event).await?; - Ok(ShardResponse::CreateTopicResponse(topic)) } ShardRequestPayload::UpdateTopic { @@ -215,17 +196,6 @@ async fn handle_request( replication_factor, )?; - let event = ShardEvent::UpdatedTopic { - stream_id: stream_id.clone(), - topic_id: topic_id.clone(), - name, - message_expiry, - compression_algorithm, - max_topic_size, - replication_factor, - }; - shard.broadcast_event_to_all_shards(event).await?; - Ok(ShardResponse::UpdateTopicResponse) } ShardRequestPayload::DeleteTopic { @@ -242,14 +212,6 @@ async fn handle_request( let _topic_guard = shard.fs_locks.topic_lock.lock().await; let topic = shard.delete_topic(&session, &stream_id, &topic_id).await?; - let topic_id_num = topic.root().id(); - - let event = ShardEvent::DeletedTopic { - id: topic_id_num, - stream_id: stream_id.clone(), - topic_id: topic_id.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; Ok(ShardResponse::DeleteTopicResponse(topic)) } @@ -270,16 +232,6 @@ async fn handle_request( let user = shard.create_user(&session, &username, &password, status, permissions.clone())?; - let created_user_id = user.id; - - let event = ShardEvent::CreatedUser { - user_id: created_user_id, - username: username.clone(), - password: password.clone(), - status, - permissions: permissions.clone(), - }; - shard.broadcast_event_to_all_shards(event).await?; Ok(ShardResponse::CreateUserResponse(user)) } ShardRequestPayload::GetStats { .. 
} => { @@ -299,8 +251,6 @@ async fn handle_request( ); let _user_guard = shard.fs_locks.user_lock.lock().await; let user = shard.delete_user(&session, &user_id)?; - let event = ShardEvent::DeletedUser { user_id }; - shard.broadcast_event_to_all_shards(event).await?; Ok(ShardResponse::DeletedUser(user)) } ShardRequestPayload::DeleteStream { user_id, stream_id } => { @@ -312,11 +262,6 @@ async fn handle_request( ); let _stream_guard = shard.fs_locks.stream_lock.lock().await; let stream = shard.delete_stream(&session, &stream_id).await?; - let event = ShardEvent::DeletedStream { - id: stream.id(), - stream_id, - }; - shard.broadcast_event_to_all_shards(event).await?; Ok(ShardResponse::DeleteStreamResponse(stream)) } ShardRequestPayload::SocketTransfer { @@ -402,86 +347,15 @@ async fn handle_request( pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) -> Result<(), IggyError> { match event { - ShardEvent::DeletedPartitions { - stream_id, - topic_id, - partitions_count, - partition_ids, - } => { - shard.delete_partitions_bypass_auth( - &stream_id, - &topic_id, - partitions_count, - partition_ids, - )?; - Ok(()) - } - ShardEvent::UpdatedStream { stream_id, name } => { - shard.update_stream_bypass_auth(&stream_id, &name)?; - Ok(()) - } - ShardEvent::PurgedStream { stream_id } => { - shard.purge_stream_bypass_auth(&stream_id).await?; - Ok(()) - } - ShardEvent::PurgedTopic { + ShardEvent::FlushUnsavedBuffer { stream_id, topic_id, + partition_id, + fsync, } => { - shard.purge_topic_bypass_auth(&stream_id, &topic_id).await?; - Ok(()) - } - ShardEvent::CreatedUser { - user_id, - username, - password, - status, - permissions, - } => { - shard.create_user_bypass_auth( - user_id, - &username, - &password, - status, - permissions.clone(), - )?; - Ok(()) - } - ShardEvent::DeletedUser { user_id } => { - shard.delete_user_bypass_auth(&user_id)?; - Ok(()) - } - ShardEvent::ChangedPassword { - user_id, - current_password, - new_password, - } => { - 
shard.change_password_bypass_auth(&user_id, &current_password, &new_password)?; - Ok(()) - } - ShardEvent::CreatedPersonalAccessToken { - personal_access_token, - } => { - shard.create_personal_access_token_bypass_auth(personal_access_token.to_owned())?; - Ok(()) - } - ShardEvent::DeletedPersonalAccessToken { user_id, name } => { - shard.delete_personal_access_token_bypass_auth(user_id, &name)?; - Ok(()) - } - ShardEvent::UpdatedUser { - user_id, - username, - status, - } => { - shard.update_user_bypass_auth(&user_id, username.to_owned(), status)?; - Ok(()) - } - ShardEvent::UpdatedPermissions { - user_id, - permissions, - } => { - shard.update_permissions_bypass_auth(&user_id, permissions.to_owned())?; + shard + .flush_unsaved_buffer_base(&stream_id, &topic_id, partition_id, fsync) + .await?; Ok(()) } ShardEvent::AddressBound { protocol, address } => { @@ -509,94 +383,5 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) -> Result<() } Ok(()) } - ShardEvent::CreatedStream { id: _, stream } => { - // Note: Slab::insert() returns the next available slot, not the stream's embedded ID. - // The stream already has the correct ID from shard 0, which is preserved after insert. - shard.create_stream_bypass_auth(stream); - Ok(()) - } - ShardEvent::DeletedStream { id, stream_id } => { - let stream = shard.delete_stream_bypass_auth(&stream_id); - assert_eq!(stream.id(), id); - - Ok(()) - } - ShardEvent::CreatedTopic { stream_id, topic } => { - // Note: Slab::insert() returns the next available slot, not the topic's embedded ID. - // The topic already has the correct ID from shard 0, which is preserved after insert.
- shard.create_topic_bypass_auth(&stream_id, topic.clone()); - Ok(()) - } - ShardEvent::CreatedPartitions { - stream_id, - topic_id, - partitions, - } => { - shard - .create_partitions_bypass_auth(&stream_id, &topic_id, partitions) - .await?; - Ok(()) - } - ShardEvent::DeletedTopic { - id, - stream_id, - topic_id, - } => { - let topic = shard.delete_topic_bypass_auth(&stream_id, &topic_id); - assert_eq!(topic.id(), id); - Ok(()) - } - ShardEvent::UpdatedTopic { - stream_id, - topic_id, - name, - message_expiry, - compression_algorithm, - max_topic_size, - replication_factor, - } => { - shard.update_topic_bypass_auth( - &stream_id, - &topic_id, - name.clone(), - message_expiry, - compression_algorithm, - max_topic_size, - replication_factor, - )?; - Ok(()) - } - ShardEvent::CreatedConsumerGroup { - stream_id, - topic_id, - cg, - } => { - // Note: Slab::insert() returns the next available slot, not the consumer group's embedded ID. - // The consumer group already has the correct ID from shard 0. - shard.create_consumer_group_bypass_auth(&stream_id, &topic_id, cg); - Ok(()) - } - ShardEvent::DeletedConsumerGroup { - id, - stream_id, - topic_id, - group_id, - } => { - let cg = shard.delete_consumer_group_bypass_auth(&stream_id, &topic_id, &group_id); - assert_eq!(cg.id(), id); - - Ok(()) - } - ShardEvent::FlushUnsavedBuffer { - stream_id, - topic_id, - partition_id, - fsync, - } => { - shard - .flush_unsaved_buffer_base(&stream_id, &topic_id, partition_id, fsync) - .await?; - Ok(()) - } } } diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs index eef75d958..3364a55aa 100644 --- a/core/server/src/shard/transmission/event.rs +++ b/core/server/src/shard/transmission/event.rs @@ -15,23 +15,10 @@ // specific language governing permissions and limitations // under the License. 
-use crate::streaming::{ - partitions::partition, - personal_access_tokens::personal_access_token::PersonalAccessToken, - streams::stream, - topics::{ - consumer_group::{self}, - topic, - }, -}; -use iggy_common::{ - CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize, Permissions, TransportProtocol, - UserStatus, -}; +use iggy_common::{Identifier, TransportProtocol}; use std::net::SocketAddr; use strum::Display; -#[allow(clippy::large_enum_variant)] #[derive(Debug, Clone, Display)] #[strum(serialize_all = "PascalCase")] pub enum ShardEvent { @@ -41,96 +28,6 @@ pub enum ShardEvent { partition_id: usize, fsync: bool, }, - CreatedStream { - id: usize, - stream: stream::Stream, - }, - DeletedStream { - id: usize, - stream_id: Identifier, - }, - UpdatedStream { - stream_id: Identifier, - name: String, - }, - PurgedStream { - stream_id: Identifier, - }, - CreatedPartitions { - stream_id: Identifier, - topic_id: Identifier, - partitions: Vec<partition::Partition>, - }, - DeletedPartitions { - stream_id: Identifier, - topic_id: Identifier, - partitions_count: u32, - partition_ids: Vec<usize>, - }, - CreatedTopic { - stream_id: Identifier, - topic: topic::Topic, - }, - CreatedConsumerGroup { - stream_id: Identifier, - topic_id: Identifier, - cg: consumer_group::ConsumerGroup, - }, - DeletedConsumerGroup { - id: usize, - stream_id: Identifier, - topic_id: Identifier, - group_id: Identifier, - }, - UpdatedTopic { - stream_id: Identifier, - topic_id: Identifier, - name: String, - message_expiry: IggyExpiry, - compression_algorithm: CompressionAlgorithm, - max_topic_size: MaxTopicSize, - replication_factor: Option<u8>, - }, - PurgedTopic { - stream_id: Identifier, - topic_id: Identifier, - }, - DeletedTopic { - id: usize, - stream_id: Identifier, - topic_id: Identifier, - }, - CreatedUser { - user_id: u32, - username: String, - password: String, - status: UserStatus, - permissions: Option<Permissions>, - }, - UpdatedPermissions { - user_id: Identifier, - permissions: 
Option<Permissions>, - }, - DeletedUser { - user_id: Identifier, - }, - UpdatedUser { - user_id: Identifier, - username: Option<String>, - status: Option<UserStatus>, - }, - ChangedPassword { - user_id: Identifier, - current_password: String, - new_password: String, - }, - CreatedPersonalAccessToken { - personal_access_token: PersonalAccessToken, - }, - DeletedPersonalAccessToken { - user_id: u32, - name: String, - }, AddressBound { protocol: TransportProtocol, address: SocketAddr,
