This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch arc-swap
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 058ae50ce4b0a4d785d5d12708e898394e603658
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Dec 29 14:26:56 2025 +0100

    commands
---
 .../create_consumer_group_handler.rs               |   8 -
 .../delete_consumer_group_handler.rs               |   8 -
 .../partitions/create_partitions_handler.rs        |   7 -
 .../partitions/delete_partitions_handler.rs        |  10 +-
 .../create_personal_access_token_handler.rs        |   5 -
 .../delete_personal_access_token_handler.rs        |   7 -
 .../handlers/streams/create_stream_handler.rs      |   7 -
 .../handlers/streams/delete_stream_handler.rs      |   7 -
 .../handlers/streams/purge_stream_handler.rs       |   6 +-
 .../handlers/streams/update_stream_handler.rs      |   6 -
 .../binary/handlers/topics/create_topic_handler.rs |  15 +-
 .../binary/handlers/topics/delete_topic_handler.rs |   8 -
 .../binary/handlers/topics/purge_topic_handler.rs  |   6 -
 .../binary/handlers/topics/update_topic_handler.rs |  12 --
 .../handlers/users/change_password_handler.rs      |   8 -
 .../binary/handlers/users/create_user_handler.rs   |  11 -
 .../binary/handlers/users/delete_user_handler.rs   |   3 -
 .../handlers/users/update_permissions_handler.rs   |   6 -
 .../binary/handlers/users/update_user_handler.rs   |   8 -
 core/server/src/http/consumer_groups.rs            |  37 ----
 core/server/src/http/partitions.rs                 |  41 +---
 core/server/src/http/streams.rs                    |  70 +------
 core/server/src/http/topics.rs                     | 114 ++--------
 core/server/src/http/users.rs                      |  89 --------
 core/server/src/shard/handlers.rs                  | 231 +--------------------
 core/server/src/shard/transmission/event.rs        | 105 +---------
 26 files changed, 33 insertions(+), 802 deletions(-)

diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index 0c6e9a763..fed1f0a6f 100644
--- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -23,7 +23,6 @@ use crate::binary::handlers::consumer_groups::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateConsumerGroupWithId;
@@ -57,13 +56,6 @@ impl ServerCommandHandler for CreateConsumerGroup {
         )?;
         let cg_id = cg.id();
 
-        let event = ShardEvent::CreatedConsumerGroup {
-            stream_id: self.stream_id.clone(),
-            topic_id: self.topic_id.clone(),
-            cg,
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
-
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
         shard
diff --git a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
index 1da98ac44..3c2800fe7 100644
--- a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
@@ -23,7 +23,6 @@ use crate::binary::handlers::consumer_groups::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
 use crate::streaming::polling_consumer::ConsumerGroupId;
@@ -102,13 +101,6 @@ impl ServerCommandHandler for DeleteConsumerGroup {
             )
         })?;
 
-        let event = ShardEvent::DeletedConsumerGroup {
-            id: cg_id,
-            stream_id: self.stream_id.clone(),
-            topic_id: self.topic_id.clone(),
-            group_id: self.group_id.clone(),
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
         shard
diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index 0ead0c4ad..3bd65d7bf 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -23,7 +23,6 @@ use crate::binary::handlers::partitions::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
@@ -62,12 +61,6 @@ impl ServerCommandHandler for CreatePartitions {
             )
             .await?;
         let partition_ids = partitions.iter().map(|p| p.id()).collect::<Vec<_>>();
-        let event = ShardEvent::CreatedPartitions {
-            stream_id: self.stream_id.clone(),
-            topic_id: self.topic_id.clone(),
-            partitions,
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
 
         shard.streams.with_topic_by_id_mut(
             &self.stream_id,
diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index fe6740136..6e2045493 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -23,7 +23,6 @@ use crate::binary::handlers::partitions::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use anyhow::Result;
@@ -53,7 +52,7 @@ impl ServerCommandHandler for DeletePartitions {
         // Acquire partition lock to serialize filesystem operations
         let _partition_guard = shard.fs_locks.partition_lock.lock().await;
 
-        let deleted_partition_ids = shard
+        let _deleted_partition_ids = shard
             .delete_partitions(
                 session,
                 &self.stream_id,
@@ -61,13 +60,6 @@ impl ServerCommandHandler for DeletePartitions {
                 self.partitions_count,
             )
             .await?;
-        let event = ShardEvent::DeletedPartitions {
-            stream_id: self.stream_id.clone(),
-            topic_id: self.topic_id.clone(),
-            partitions_count: self.partitions_count,
-            partition_ids: deleted_partition_ids,
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
 
         let remaining_partition_ids = shard.streams.with_topic_by_id(
             &self.stream_id,
diff --git a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index c52ddd4eb..6a0ec00ed 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -24,7 +24,6 @@ use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreatePersonalAccessTokenWithHash;
 use crate::streaming::session::Session;
@@ -60,10 +59,6 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
                 })?;
         let bytes = mapper::map_raw_pat(&token);
         let hash = personal_access_token.token.to_string();
-        let event = ShardEvent::CreatedPersonalAccessToken {
-            personal_access_token: personal_access_token.clone(),
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
index 9a588ccbe..19b472fb3 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
@@ -53,13 +53,6 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
                     "{COMPONENT} (error: {error}) - failed to delete personal access token with name: {token_name}, session: {session}"
                 )})?;
 
-        // Broadcast the event to other shards
-        let event = crate::shard::transmission::event::ShardEvent::DeletedPersonalAccessToken {
-            user_id: session.get_user_id(),
-            name: self.name.clone(),
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
-
         shard
             .state
             .apply(
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index e56f87e64..83990d9eb 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -24,7 +24,6 @@ use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
@@ -77,12 +76,6 @@ impl ServerCommandHandler for CreateStream {
                     let stream = shard.create_stream(session, name).await?;
                     let created_stream_id = stream.id();
 
-                    let event = ShardEvent::CreatedStream {
-                        id: created_stream_id,
-                        stream: stream.clone(),
-                    };
-                    shard.broadcast_event_to_all_shards(event).await?;
-
                     let response = shard
                         .streams
                         .with_components_by_id(created_stream_id, |(root, stats)| {
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index 76bc9f8ca..c1ae46336 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -22,7 +22,6 @@ use crate::binary::command::{
 use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
@@ -83,12 +82,6 @@ impl ServerCommandHandler for DeleteStream {
                         stream.id()
                     );
 
-                    let event = ShardEvent::DeletedStream {
-                        id: stream.id(),
-                        stream_id: stream_id.clone(),
-                    };
-                    shard.broadcast_event_to_all_shards(event).await?;
-
                     shard
                         .state
                         .apply(session.get_user_id(), &EntryCommand::DeleteStream(DeleteStream { stream_id: stream_id.clone() }))
diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
index b8d3cfc0c..fd26111e7 100644
--- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
@@ -23,7 +23,6 @@ use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use anyhow::Result;
@@ -54,10 +53,7 @@ impl ServerCommandHandler for PurgeStream {
             .with_error(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to purge stream with id: {stream_id}, session: {session}")
             })?;
-        let event = ShardEvent::PurgedStream {
-            stream_id: self.stream_id.clone(),
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
+
         shard
             .state
             .apply(session.get_user_id(), &EntryCommand::PurgeStream(self))
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index c080412a7..7168cae2d 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -23,7 +23,6 @@ use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use anyhow::Result;
@@ -54,11 +53,6 @@ impl ServerCommandHandler for UpdateStream {
             format!("{COMPONENT} (error: {error}) - failed to update stream with id: {stream_id}, session: {session}")
         })?;
 
-        let event = ShardEvent::UpdatedStream {
-            stream_id: self.stream_id.clone(),
-            name: self.name.clone(),
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
         shard
             .state
             .apply(session.get_user_id(), &EntryCommand::UpdateStream(self))
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index b5b5c96a2..b15b9c244 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -24,7 +24,6 @@ use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
@@ -109,13 +108,7 @@ impl ServerCommandHandler for CreateTopic {
                         .with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
                     let topic_id = topic.id();
 
-                    let event = ShardEvent::CreatedTopic {
-                        stream_id: stream_id.clone(),
-                        topic,
-                    };
-
-                    shard.broadcast_event_to_all_shards(event).await?;
-                    let partitions = shard
+                    shard
                         .create_partitions(
                             session,
                             &stream_id,
@@ -123,12 +116,6 @@ impl ServerCommandHandler for CreateTopic {
                             partitions_count,
                         )
                         .await?;
-                    let event = ShardEvent::CreatedPartitions {
-                        stream_id: stream_id.clone(),
-                        topic_id: Identifier::numeric(topic_id as u32).unwrap(),
-                        partitions,
-                    };
-                    shard.broadcast_event_to_all_shards(event).await?;
 
                     let response = shard.streams.with_topic_by_id(
                         &stream_id,
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index ea3d21cdd..59b8394b7 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -23,7 +23,6 @@ use crate::binary::handlers::topics::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
@@ -91,13 +90,6 @@ impl ServerCommandHandler for DeleteTopic {
                         stream_id_num
                     );
 
-                    let event = ShardEvent::DeletedTopic {
-                        id: topic_id_num,
-                        stream_id: stream_id.clone(),
-                        topic_id: topic_id.clone(),
-                    };
-                    shard.broadcast_event_to_all_shards(event).await?;
-
                     shard
                         .state
                         .apply(session.get_user_id(), &EntryCommand::DeleteTopic(self))
diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
index 2cea843ed..8c725f960 100644
--- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
@@ -58,12 +58,6 @@ impl ServerCommandHandler for PurgeTopic {
                 )
             })?;
 
-        let event = crate::shard::transmission::event::ShardEvent::PurgedTopic {
-            stream_id: self.stream_id.clone(),
-            topic_id: self.topic_id.clone(),
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
-
         shard
             .state
             .apply(session.get_user_id(), &EntryCommand::PurgeTopic(self))
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index 6dfa2281d..0f4ae3a8a 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -23,7 +23,6 @@ use crate::binary::handlers::topics::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
@@ -117,17 +116,6 @@ impl ServerCommandHandler for UpdateTopic {
                         .streams
                         .with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
 
-                    let event = ShardEvent::UpdatedTopic {
-                        stream_id: stream_id.clone(),
-                        topic_id: topic_id.clone(),
-                        name: name.clone(),
-                        message_expiry: self.message_expiry,
-                        compression_algorithm: self.compression_algorithm,
-                        max_topic_size: self.max_topic_size,
-                        replication_factor: self.replication_factor,
-                    };
-                    shard.broadcast_event_to_all_shards(event).await?;
-
                     shard
                         .state
                         .apply(session.get_user_id(), &EntryCommand::UpdateTopic(self))
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs b/core/server/src/binary/handlers/users/change_password_handler.rs
index dc512e172..77e4ef47d 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -23,7 +23,6 @@ use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use crate::streaming::utils::crypto;
@@ -67,13 +66,6 @@ impl ServerCommandHandler for ChangePassword {
 
         info!("Changed password for user with ID: {}.", self.user_id);
 
-        let event = ShardEvent::ChangedPassword {
-            user_id: self.user_id.clone(),
-            current_password: self.current_password.clone(),
-            new_password: self.new_password.clone(),
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
-
         // For the security of the system, we hash the password before storing it in metadata.
         shard
             .state
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs
index 0caa873f1..1b5cc190e 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -23,7 +23,6 @@ use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
@@ -91,16 +90,6 @@ impl ServerCommandHandler for CreateUser {
                         })?;
 
                     let user_id = user.id;
-
-                    let event = ShardEvent::CreatedUser {
-                        user_id,
-                        username: username.clone(),
-                        password: password.clone(),
-                        status,
-                        permissions: permissions.clone(),
-                    };
-                    shard.broadcast_event_to_all_shards(event).await?;
-
                     let response = mapper::map_user(&user);
 
                     shard
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs b/core/server/src/binary/handlers/users/delete_user_handler.rs
index 55b0f103d..42b74de87 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -25,7 +25,6 @@ use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
@@ -81,8 +80,6 @@ impl ServerCommandHandler for DeleteUser {
                     })?;
 
                     info!("Deleted user: {} with ID: {}.", user.username, user.id);
-                    let event = ShardEvent::DeletedUser { user_id };
-                    shard.broadcast_event_to_all_shards(event).await?;
 
                     shard
                         .state
diff --git a/core/server/src/binary/handlers/users/update_permissions_handler.rs b/core/server/src/binary/handlers/users/update_permissions_handler.rs
index 948ce8e41..4100da70b 100644
--- a/core/server/src/binary/handlers/users/update_permissions_handler.rs
+++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs
@@ -25,7 +25,6 @@ use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use anyhow::Result;
@@ -56,11 +55,6 @@ impl ServerCommandHandler for UpdatePermissions {
                     self.user_id
                 ))?;
         info!("Updated permissions for user with ID: {}.", self.user_id);
-        let event = ShardEvent::UpdatedPermissions {
-            user_id: self.user_id.clone(),
-            permissions: self.permissions.clone(),
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs b/core/server/src/binary/handlers/users/update_user_handler.rs
index f4e12a0f5..fae459941 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -25,7 +25,6 @@ use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use anyhow::Result;
@@ -66,13 +65,6 @@ impl ServerCommandHandler for UpdateUser {
 
         info!("Updated user: {} with ID: {}.", user.username, user.id);
 
-        let event = ShardEvent::UpdatedUser {
-            user_id: self.user_id.clone(),
-            username: self.username.clone(),
-            status: self.status,
-        };
-        shard.broadcast_event_to_all_shards(event).await?;
-
         let user_id = self.user_id.clone();
         shard
             .state
diff --git a/core/server/src/http/consumer_groups.rs b/core/server/src/http/consumer_groups.rs
index e1e9b2f7e..278425796 100644
--- a/core/server/src/http/consumer_groups.rs
+++ b/core/server/src/http/consumer_groups.rs
@@ -180,24 +180,6 @@ async fn create_consumer_group(
 
     let group_id = consumer_group.id();
 
-    // Send event for consumer group creation
-    {
-        let broadcast_future = SendWrapper::new(async {
-            use crate::shard::transmission::event::ShardEvent;
-            let event = ShardEvent::CreatedConsumerGroup {
-                stream_id: command.stream_id.clone(),
-                topic_id: command.topic_id.clone(),
-                cg: consumer_group.clone(),
-            };
-            let _responses = state
-                .shard
-                .shard()
-                .broadcast_event_to_all_shards(event)
-                .await;
-        });
-        broadcast_future.await;
-    }
-
     // Get the created consumer group details
     let group_id_identifier = Identifier::numeric(group_id as u32).unwrap();
     let consumer_group_details = state.shard.shard().streams.with_consumer_group_by_id(
@@ -296,25 +278,6 @@ async fn delete_consumer_group(
             )
         })?;
 
-        // Send event for consumer group deletion
-        {
-            let broadcast_future = SendWrapper::new(async {
-                use crate::shard::transmission::event::ShardEvent;
-                let event = ShardEvent::DeletedConsumerGroup {
-                    id: cg_id,
-                    stream_id: identifier_stream_id.clone(),
-                    topic_id: identifier_topic_id.clone(),
-                    group_id: identifier_group_id.clone(),
-                };
-                let _responses = state
-                    .shard
-                    .shard()
-                    .broadcast_event_to_all_shards(event)
-                    .await;
-            });
-            broadcast_future.await;
-        }
-
         // Apply state change
         let entry_command = EntryCommand::DeleteConsumerGroup(DeleteConsumerGroup {
             stream_id: identifier_stream_id,
diff --git a/core/server/src/http/partitions.rs b/core/server/src/http/partitions.rs
index 46c842761..4e5f4e97b 100644
--- a/core/server/src/http/partitions.rs
+++ b/core/server/src/http/partitions.rs
@@ -20,7 +20,6 @@ use crate::http::COMPONENT;
 use crate::http::error::CustomError;
 use crate::http::jwt::json_web_token::Identity;
 use crate::http::shared::AppState;
-use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use axum::extract::{Path, Query, State};
@@ -59,7 +58,7 @@ async fn create_partitions(
 
     let _parititon_guard = state.shard.shard().fs_locks.partition_lock.lock().await;
     let session = Session::stateless(identity.user_id, identity.ip_address);
-    let partitions = SendWrapper::new(state.shard.shard().create_partitions(
+    let _partitions = SendWrapper::new(state.shard.shard().create_partitions(
         &session,
         &command.stream_id,
         &command.topic_id,
@@ -67,24 +66,6 @@ async fn create_partitions(
     ))
     .await?;
 
-    let broadcast_future = SendWrapper::new(async {
-        let shard = state.shard.shard();
-
-        let event = ShardEvent::CreatedPartitions {
-            stream_id: command.stream_id.clone(),
-            topic_id: command.topic_id.clone(),
-            partitions,
-        };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
-        Ok::<(), CustomError>(())
-    });
-
-    broadcast_future.await
-            .with_error(|error| {
-                format!(
-                    "{COMPONENT} (error: {error}) - failed to broadcast partition events, stream ID: {stream_id}, topic ID: {topic_id}"
-                )
-            })?;
     let command = EntryCommand::CreatePartitions(command);
     let state_future =
         SendWrapper::new(state.shard.shard().state.apply(identity.user_id, &command));
@@ -112,7 +93,7 @@ async fn delete_partitions(
     query.validate()?;
 
     let session = Session::stateless(identity.user_id, identity.ip_address);
-    let deleted_partition_ids = {
+    {
         let delete_future = SendWrapper::new(state.shard.shard().delete_partitions(
             &session,
             &query.stream_id,
@@ -127,24 +108,6 @@ async fn delete_partitions(
         })?
     };
 
-    // Send event for partition deletion
-    {
-        let broadcast_future = SendWrapper::new(async {
-            let event = ShardEvent::DeletedPartitions {
-                stream_id: query.stream_id.clone(),
-                topic_id: query.topic_id.clone(),
-                partitions_count: query.partitions_count,
-                partition_ids: deleted_partition_ids,
-            };
-            let _responses = state
-                .shard
-                .shard()
-                .broadcast_event_to_all_shards(event)
-                .await;
-        });
-        broadcast_future.await;
-    }
-
     let command = EntryCommand::DeletePartitions(DeletePartitions {
         stream_id: query.stream_id.clone(),
         topic_id: query.topic_id.clone(),
diff --git a/core/server/src/http/streams.rs b/core/server/src/http/streams.rs
index 2a66ea1d1..0c49feab3 100644
--- a/core/server/src/http/streams.rs
+++ b/core/server/src/http/streams.rs
@@ -117,20 +117,6 @@ async fn create_stream(
 
         let created_stream_id = stream.root().id();
 
-        // Send event for stream creation - inlined from wrapper
-        {
-            use crate::shard::transmission::event::ShardEvent;
-            let event = ShardEvent::CreatedStream {
-                id: created_stream_id,
-                stream,
-            };
-            let _responses = state
-                .shard
-                .shard()
-                .broadcast_event_to_all_shards(event)
-                .await;
-        }
-
         // Apply state change using wrapper method
         let entry_command = EntryCommand::CreateStream(CreateStreamWithId {
             stream_id: created_stream_id as u32,
@@ -189,23 +175,6 @@ async fn update_stream(
                 )
             })?;
 
-        // Send event for stream update
-        {
-            let broadcast_future = SendWrapper::new(async {
-                use crate::shard::transmission::event::ShardEvent;
-                let event = ShardEvent::UpdatedStream {
-                    stream_id: command.stream_id.clone(),
-                    name: command.name.clone(),
-                };
-                let _responses = state
-                    .shard
-                    .shard()
-                    .broadcast_event_to_all_shards(event)
-                    .await;
-            });
-            broadcast_future.await;
-        }
-
         // Apply state change using wrapper method
         let entry_command = EntryCommand::UpdateStream(command);
         state.shard.apply_state(identity.user_id, &entry_command).await.with_error(|error| {
@@ -232,8 +201,8 @@ async fn delete_stream(
         let session = Session::stateless(identity.user_id, identity.ip_address);
 
         let _stream_guard = state.shard.shard().fs_locks.stream_lock.lock().await;
-        // Delete stream and get the stream entity
-        let stream = {
+        // Delete stream
+        {
             let future = SendWrapper::new(
                 state
                     .shard
@@ -246,25 +215,6 @@ async fn delete_stream(
             format!("{COMPONENT} (error: {error}) - failed to delete stream with ID: {stream_id}",)
         })?;
 
-        let stream_id_numeric = stream.root().id();
-
-        // Send event for stream deletion
-        {
-            let broadcast_future = SendWrapper::new(async {
-                use crate::shard::transmission::event::ShardEvent;
-                let event = ShardEvent::DeletedStream {
-                    id: stream_id_numeric,
-                    stream_id: identifier_stream_id.clone(),
-                };
-                let _responses = state
-                    .shard
-                    .shard()
-                    .broadcast_event_to_all_shards(event)
-                    .await;
-            });
-            broadcast_future.await;
-        }
-
         // Apply state change using wrapper method
         let entry_command = EntryCommand::DeleteStream(DeleteStream {
             stream_id: identifier_stream_id,
@@ -304,22 +254,6 @@ async fn purge_stream(
                 )
             })?;
 
-        // Send event for stream purge
-        {
-            let broadcast_future = SendWrapper::new(async {
-                use crate::shard::transmission::event::ShardEvent;
-                let event = ShardEvent::PurgedStream {
-                    stream_id: identifier_stream_id.clone(),
-                };
-                let _responses = state
-                    .shard
-                    .shard()
-                    .broadcast_event_to_all_shards(event)
-                    .await;
-            });
-            broadcast_future.await;
-        }
-
         // Apply state change using wrapper method
         let entry_command = EntryCommand::PurgeStream(PurgeStream {
             stream_id: identifier_stream_id,
diff --git a/core/server/src/http/topics.rs b/core/server/src/http/topics.rs
index 551eed602..f4586a869 100644
--- a/core/server/src/http/topics.rs
+++ b/core/server/src/http/topics.rs
@@ -202,44 +202,23 @@ async fn create_topic(
 
     let topic_id = topic.id();
 
-    // Send events for topic creation
-    let broadcast_future = SendWrapper::new(async {
-        use crate::shard::transmission::event::ShardEvent;
-
+    // Create partitions
+    {
         let shard = state.shard.shard();
-
-        let event = ShardEvent::CreatedTopic {
-            stream_id: command.stream_id.clone(),
-            topic,
-        };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
-
-        // Create partitions
-        let partitions = shard
-            .create_partitions(
-                &session,
-                &command.stream_id,
-                &Identifier::numeric(topic_id as u32).unwrap(),
-                command.partitions_count,
-            )
-            .await?;
-
-        let event = ShardEvent::CreatedPartitions {
-            stream_id: command.stream_id.clone(),
-            topic_id: Identifier::numeric(topic_id as u32).unwrap(),
-            partitions,
-        };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
-
-        Ok::<(), CustomError>(())
-    });
-
-    broadcast_future.await
-        .with_error(|error| {
-            format!(
-                "{COMPONENT} (error: {error}) - failed to broadcast topic events, stream ID: {stream_id}"
-            )
-        })?;
+        let topic_identifier = Identifier::numeric(topic_id as u32).unwrap();
+        let future = SendWrapper::new(shard.create_partitions(
+            &session,
+            &command.stream_id,
+            &topic_identifier,
+            command.partitions_count,
+        ));
+        future.await
+    }
+    .with_error(|error| {
+        format!(
+            "{COMPONENT} (error: {error}) - failed to create partitions, stream ID: {stream_id}"
+        )
+    })?;
 
     // Create response using the same approach as binary handler
     let response = {
@@ -320,28 +299,6 @@ async fn update_topic(
         |(root, _, _)| (root.message_expiry(), root.max_topic_size()),
     );
 
-    // Send event for topic update
-    {
-        let broadcast_future = SendWrapper::new(async {
-            use crate::shard::transmission::event::ShardEvent;
-            let event = ShardEvent::UpdatedTopic {
-                stream_id: command.stream_id.clone(),
-                topic_id: command.topic_id.clone(),
-                name: command.name.clone(),
-                message_expiry: command.message_expiry,
-                compression_algorithm: command.compression_algorithm,
-                max_topic_size: command.max_topic_size,
-                replication_factor: command.replication_factor,
-            };
-            let _responses = state
-                .shard
-                .shard()
-                .broadcast_event_to_all_shards(event)
-                .await;
-        });
-        broadcast_future.await;
-    }
-
     command.message_expiry = message_expiry;
     command.max_topic_size = max_topic_size;
 
@@ -372,7 +329,7 @@ async fn delete_topic(
     let session = Session::stateless(identity.user_id, identity.ip_address);
     let _topic_guard = state.shard.shard().fs_locks.topic_lock.lock().await;
 
-    let topic = {
+    {
         let future = SendWrapper::new(state.shard.shard().delete_topic(
             &session,
             &identifier_stream_id,
@@ -385,26 +342,6 @@ async fn delete_topic(
         )
     })?;
 
-    let topic_id_numeric = topic.root().id();
-
-    // Send event for topic deletion
-    {
-        let broadcast_future = SendWrapper::new(async {
-            use crate::shard::transmission::event::ShardEvent;
-            let event = ShardEvent::DeletedTopic {
-                id: topic_id_numeric,
-                stream_id: identifier_stream_id.clone(),
-                topic_id: identifier_topic_id.clone(),
-            };
-            let _responses = state
-                .shard
-                .shard()
-                .broadcast_event_to_all_shards(event)
-                .await;
-        });
-        broadcast_future.await;
-    }
-
     {
         let entry_command = EntryCommand::DeleteTopic(DeleteTopic {
             stream_id: identifier_stream_id,
@@ -450,23 +387,6 @@ async fn purge_topic(
         )
     })?;
 
-    // Send event for topic purge
-    {
-        let broadcast_future = SendWrapper::new(async {
-            use crate::shard::transmission::event::ShardEvent;
-            let event = ShardEvent::PurgedTopic {
-                stream_id: identifier_stream_id.clone(),
-                topic_id: identifier_topic_id.clone(),
-            };
-            let _responses = state
-                .shard
-                .shard()
-                .broadcast_event_to_all_shards(event)
-                .await;
-        });
-        broadcast_future.await;
-    }
-
     {
         let entry_command = EntryCommand::PurgeTopic(PurgeTopic {
             stream_id: identifier_stream_id,
diff --git a/core/server/src/http/users.rs b/core/server/src/http/users.rs
index 4389c6d87..ff49dae6c 100644
--- a/core/server/src/http/users.rs
+++ b/core/server/src/http/users.rs
@@ -137,26 +137,6 @@ async fn create_user(
     let user_id = user.id;
     let response = Json(mapper::map_user(&user));
 
-    // Send event for user creation
-    {
-        let broadcast_future = SendWrapper::new(async {
-            use crate::shard::transmission::event::ShardEvent;
-            let event = ShardEvent::CreatedUser {
-                user_id,
-                username: command.username.to_owned(),
-                password: command.password.to_owned(),
-                status: command.status,
-                permissions: command.permissions.clone(),
-            };
-            let _responses = state
-                .shard
-                .shard()
-                .broadcast_event_to_all_shards(event)
-                .await;
-        });
-        broadcast_future.await;
-    }
-
     {
         let username = command.username.clone();
         let entry_command = EntryCommand::CreateUser(CreateUserWithId {
@@ -212,24 +192,6 @@ async fn update_user(
             format!("{COMPONENT} (error: {error}) - failed to update user, user ID: {user_id}")
         })?;
 
-    // Send event for user update
-    {
-        let broadcast_future = SendWrapper::new(async {
-            use crate::shard::transmission::event::ShardEvent;
-            let event = ShardEvent::UpdatedUser {
-                user_id: command.user_id.clone(),
-                username: command.username.clone(),
-                status: command.status,
-            };
-            let _responses = state
-                .shard
-                .shard()
-                .broadcast_event_to_all_shards(event)
-                .await;
-        });
-        broadcast_future.await;
-    }
-
     {
         let username = command.username.clone();
         let entry_command = EntryCommand::UpdateUser(command);
@@ -273,23 +235,6 @@ async fn update_permissions(
             )
         })?;
 
-    // Send event for permissions update
-    {
-        let broadcast_future = SendWrapper::new(async {
-            use crate::shard::transmission::event::ShardEvent;
-            let event = ShardEvent::UpdatedPermissions {
-                user_id: command.user_id.clone(),
-                permissions: command.permissions.clone(),
-            };
-            let _responses = state
-                .shard
-                .shard()
-                .broadcast_event_to_all_shards(event)
-                .await;
-        });
-        broadcast_future.await;
-    }
-
     {
         let entry_command = EntryCommand::UpdatePermissions(command);
         let future = SendWrapper::new(
@@ -334,24 +279,6 @@ async fn change_password(
             format!("{COMPONENT} (error: {error}) - failed to change password, user ID: {user_id}")
         })?;
 
-    // Send event for password change
-    {
-        let broadcast_future = SendWrapper::new(async {
-            use crate::shard::transmission::event::ShardEvent;
-            let event = ShardEvent::ChangedPassword {
-                user_id: command.user_id.clone(),
-                current_password: command.current_password.clone(),
-                new_password: command.new_password.clone(),
-            };
-            let _responses = state
-                .shard
-                .shard()
-                .broadcast_event_to_all_shards(event)
-                .await;
-        });
-        broadcast_future.await;
-    }
-
     {
         let entry_command = EntryCommand::ChangePassword(command);
         let future = SendWrapper::new(
@@ -390,22 +317,6 @@ async fn delete_user(
             format!("{COMPONENT} (error: {error}) - failed to delete user with ID: {user_id}")
         })?;
 
-    // Send event for user deletion
-    {
-        let broadcast_future = SendWrapper::new(async {
-            use crate::shard::transmission::event::ShardEvent;
-            let event = ShardEvent::DeletedUser {
-                user_id: identifier_user_id.clone(),
-            };
-            let _responses = state
-                .shard
-                .shard()
-                .broadcast_event_to_all_shards(event)
-                .await;
-        });
-        broadcast_future.await;
-    }
-
     {
         let entry_command = EntryCommand::DeleteUser(DeleteUser {
             user_id: identifier_user_id,
diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs
index 81c54a98a..c1aed5c92 100644
--- a/core/server/src/shard/handlers.rs
+++ b/core/server/src/shard/handlers.rs
@@ -120,14 +120,6 @@ async fn handle_request(
             let _stream_guard = shard.fs_locks.stream_lock.lock().await;
 
             let stream = shard.create_stream(&session, name.clone()).await?;
-            let created_stream_id = stream.id();
-
-            let event = ShardEvent::CreatedStream {
-                id: created_stream_id,
-                stream: stream.clone(),
-            };
-
-            shard.broadcast_event_to_all_shards(event).await?;
 
             Ok(ShardResponse::CreateStreamResponse(stream))
         }
@@ -164,12 +156,8 @@ async fn handle_request(
                 .await?;
 
             let topic_id = topic.id();
-            let event = ShardEvent::CreatedTopic {
-                stream_id: stream_id.clone(),
-                topic: topic.clone(),
-            };
-            shard.broadcast_event_to_all_shards(event).await?;
-            let partitions = shard
+
+            shard
                 .create_partitions(
                     &session,
                     &stream_id,
@@ -178,13 +166,6 @@ async fn handle_request(
                 )
                 .await?;
 
-            let event = ShardEvent::CreatedPartitions {
-                stream_id: stream_id.clone(),
-                topic_id: Identifier::numeric(topic_id as u32).unwrap(),
-                partitions,
-            };
-            shard.broadcast_event_to_all_shards(event).await?;
-
             Ok(ShardResponse::CreateTopicResponse(topic))
         }
         ShardRequestPayload::UpdateTopic {
@@ -215,17 +196,6 @@ async fn handle_request(
                 replication_factor,
             )?;
 
-            let event = ShardEvent::UpdatedTopic {
-                stream_id: stream_id.clone(),
-                topic_id: topic_id.clone(),
-                name,
-                message_expiry,
-                compression_algorithm,
-                max_topic_size,
-                replication_factor,
-            };
-            shard.broadcast_event_to_all_shards(event).await?;
-
             Ok(ShardResponse::UpdateTopicResponse)
         }
         ShardRequestPayload::DeleteTopic {
@@ -242,14 +212,6 @@ async fn handle_request(
 
             let _topic_guard = shard.fs_locks.topic_lock.lock().await;
             let topic = shard.delete_topic(&session, &stream_id, &topic_id).await?;
-            let topic_id_num = topic.root().id();
-
-            let event = ShardEvent::DeletedTopic {
-                id: topic_id_num,
-                stream_id: stream_id.clone(),
-                topic_id: topic_id.clone(),
-            };
-            shard.broadcast_event_to_all_shards(event).await?;
 
             Ok(ShardResponse::DeleteTopicResponse(topic))
         }
@@ -270,16 +232,6 @@ async fn handle_request(
             let user =
                 shard.create_user(&session, &username, &password, status, permissions.clone())?;
 
-            let created_user_id = user.id;
-
-            let event = ShardEvent::CreatedUser {
-                user_id: created_user_id,
-                username: username.clone(),
-                password: password.clone(),
-                status,
-                permissions: permissions.clone(),
-            };
-            shard.broadcast_event_to_all_shards(event).await?;
             Ok(ShardResponse::CreateUserResponse(user))
         }
         ShardRequestPayload::GetStats { .. } => {
@@ -299,8 +251,6 @@ async fn handle_request(
             );
             let _user_guard = shard.fs_locks.user_lock.lock().await;
             let user = shard.delete_user(&session, &user_id)?;
-            let event = ShardEvent::DeletedUser { user_id };
-            shard.broadcast_event_to_all_shards(event).await?;
             Ok(ShardResponse::DeletedUser(user))
         }
         ShardRequestPayload::DeleteStream { user_id, stream_id } => {
@@ -312,11 +262,6 @@ async fn handle_request(
             );
             let _stream_guard = shard.fs_locks.stream_lock.lock().await;
             let stream = shard.delete_stream(&session, &stream_id).await?;
-            let event = ShardEvent::DeletedStream {
-                id: stream.id(),
-                stream_id,
-            };
-            shard.broadcast_event_to_all_shards(event).await?;
             Ok(ShardResponse::DeleteStreamResponse(stream))
         }
         ShardRequestPayload::SocketTransfer {
@@ -402,86 +347,15 @@ async fn handle_request(
 
 pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) -> Result<(), IggyError> {
     match event {
-        ShardEvent::DeletedPartitions {
-            stream_id,
-            topic_id,
-            partitions_count,
-            partition_ids,
-        } => {
-            shard.delete_partitions_bypass_auth(
-                &stream_id,
-                &topic_id,
-                partitions_count,
-                partition_ids,
-            )?;
-            Ok(())
-        }
-        ShardEvent::UpdatedStream { stream_id, name } => {
-            shard.update_stream_bypass_auth(&stream_id, &name)?;
-            Ok(())
-        }
-        ShardEvent::PurgedStream { stream_id } => {
-            shard.purge_stream_bypass_auth(&stream_id).await?;
-            Ok(())
-        }
-        ShardEvent::PurgedTopic {
+        ShardEvent::FlushUnsavedBuffer {
             stream_id,
             topic_id,
+            partition_id,
+            fsync,
         } => {
-            shard.purge_topic_bypass_auth(&stream_id, &topic_id).await?;
-            Ok(())
-        }
-        ShardEvent::CreatedUser {
-            user_id,
-            username,
-            password,
-            status,
-            permissions,
-        } => {
-            shard.create_user_bypass_auth(
-                user_id,
-                &username,
-                &password,
-                status,
-                permissions.clone(),
-            )?;
-            Ok(())
-        }
-        ShardEvent::DeletedUser { user_id } => {
-            shard.delete_user_bypass_auth(&user_id)?;
-            Ok(())
-        }
-        ShardEvent::ChangedPassword {
-            user_id,
-            current_password,
-            new_password,
-        } => {
-            shard.change_password_bypass_auth(&user_id, &current_password, &new_password)?;
-            Ok(())
-        }
-        ShardEvent::CreatedPersonalAccessToken {
-            personal_access_token,
-        } => {
-            shard.create_personal_access_token_bypass_auth(personal_access_token.to_owned())?;
-            Ok(())
-        }
-        ShardEvent::DeletedPersonalAccessToken { user_id, name } => {
-            shard.delete_personal_access_token_bypass_auth(user_id, &name)?;
-            Ok(())
-        }
-        ShardEvent::UpdatedUser {
-            user_id,
-            username,
-            status,
-        } => {
-            shard.update_user_bypass_auth(&user_id, username.to_owned(), status)?;
-            Ok(())
-        }
-        ShardEvent::UpdatedPermissions {
-            user_id,
-            permissions,
-        } => {
-            shard.update_permissions_bypass_auth(&user_id, permissions.to_owned())?;
+            shard
+                .flush_unsaved_buffer_base(&stream_id, &topic_id, partition_id, fsync)
+                .await?;
             Ok(())
         }
         ShardEvent::AddressBound { protocol, address } => {
@@ -509,94 +383,5 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) -> Result<()
             }
             Ok(())
         }
-        ShardEvent::CreatedStream { id: _, stream } => {
-            // Note: Slab::insert() returns the next available slot, not the stream's embedded ID.
-            // The stream already has the correct ID from shard 0, which is preserved after insert.
-            shard.create_stream_bypass_auth(stream);
-            Ok(())
-        }
-        ShardEvent::DeletedStream { id, stream_id } => {
-            let stream = shard.delete_stream_bypass_auth(&stream_id);
-            assert_eq!(stream.id(), id);
-
-            Ok(())
-        }
-        ShardEvent::CreatedTopic { stream_id, topic } => {
-            // Note: Slab::insert() returns the next available slot, not the topic's embedded ID.
-            // The topic already has the correct ID from shard 0, which is preserved after insert.
-            shard.create_topic_bypass_auth(&stream_id, topic.clone());
-            Ok(())
-        }
-        ShardEvent::CreatedPartitions {
-            stream_id,
-            topic_id,
-            partitions,
-        } => {
-            shard
-                .create_partitions_bypass_auth(&stream_id, &topic_id, partitions)
-                .await?;
-            Ok(())
-        }
-        ShardEvent::DeletedTopic {
-            id,
-            stream_id,
-            topic_id,
-        } => {
-            let topic = shard.delete_topic_bypass_auth(&stream_id, &topic_id);
-            assert_eq!(topic.id(), id);
-            Ok(())
-        }
-        ShardEvent::UpdatedTopic {
-            stream_id,
-            topic_id,
-            name,
-            message_expiry,
-            compression_algorithm,
-            max_topic_size,
-            replication_factor,
-        } => {
-            shard.update_topic_bypass_auth(
-                &stream_id,
-                &topic_id,
-                name.clone(),
-                message_expiry,
-                compression_algorithm,
-                max_topic_size,
-                replication_factor,
-            )?;
-            Ok(())
-        }
-        ShardEvent::CreatedConsumerGroup {
-            stream_id,
-            topic_id,
-            cg,
-        } => {
-            // Note: Slab::insert() returns the next available slot, not the consumer group's embedded ID.
-            // The consumer group already has the correct ID from shard 0.
-            shard.create_consumer_group_bypass_auth(&stream_id, &topic_id, cg);
-            Ok(())
-        }
-        ShardEvent::DeletedConsumerGroup {
-            id,
-            stream_id,
-            topic_id,
-            group_id,
-        } => {
-            let cg = shard.delete_consumer_group_bypass_auth(&stream_id, &topic_id, &group_id);
-            assert_eq!(cg.id(), id);
-
-            Ok(())
-        }
-        ShardEvent::FlushUnsavedBuffer {
-            stream_id,
-            topic_id,
-            partition_id,
-            fsync,
-        } => {
-            shard
-                .flush_unsaved_buffer_base(&stream_id, &topic_id, partition_id, fsync)
-                .await?;
-            Ok(())
-        }
     }
 }
diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs
index eef75d958..3364a55aa 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -15,23 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::streaming::{
-    partitions::partition,
-    personal_access_tokens::personal_access_token::PersonalAccessToken,
-    streams::stream,
-    topics::{
-        consumer_group::{self},
-        topic,
-    },
-};
-use iggy_common::{
-    CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize, Permissions, TransportProtocol,
-    UserStatus,
-};
+use iggy_common::{Identifier, TransportProtocol};
 use std::net::SocketAddr;
 use strum::Display;
 
-#[allow(clippy::large_enum_variant)]
 #[derive(Debug, Clone, Display)]
 #[strum(serialize_all = "PascalCase")]
 pub enum ShardEvent {
@@ -41,96 +28,6 @@ pub enum ShardEvent {
         partition_id: usize,
         fsync: bool,
     },
-    CreatedStream {
-        id: usize,
-        stream: stream::Stream,
-    },
-    DeletedStream {
-        id: usize,
-        stream_id: Identifier,
-    },
-    UpdatedStream {
-        stream_id: Identifier,
-        name: String,
-    },
-    PurgedStream {
-        stream_id: Identifier,
-    },
-    CreatedPartitions {
-        stream_id: Identifier,
-        topic_id: Identifier,
-        partitions: Vec<partition::Partition>,
-    },
-    DeletedPartitions {
-        stream_id: Identifier,
-        topic_id: Identifier,
-        partitions_count: u32,
-        partition_ids: Vec<usize>,
-    },
-    CreatedTopic {
-        stream_id: Identifier,
-        topic: topic::Topic,
-    },
-    CreatedConsumerGroup {
-        stream_id: Identifier,
-        topic_id: Identifier,
-        cg: consumer_group::ConsumerGroup,
-    },
-    DeletedConsumerGroup {
-        id: usize,
-        stream_id: Identifier,
-        topic_id: Identifier,
-        group_id: Identifier,
-    },
-    UpdatedTopic {
-        stream_id: Identifier,
-        topic_id: Identifier,
-        name: String,
-        message_expiry: IggyExpiry,
-        compression_algorithm: CompressionAlgorithm,
-        max_topic_size: MaxTopicSize,
-        replication_factor: Option<u8>,
-    },
-    PurgedTopic {
-        stream_id: Identifier,
-        topic_id: Identifier,
-    },
-    DeletedTopic {
-        id: usize,
-        stream_id: Identifier,
-        topic_id: Identifier,
-    },
-    CreatedUser {
-        user_id: u32,
-        username: String,
-        password: String,
-        status: UserStatus,
-        permissions: Option<Permissions>,
-    },
-    UpdatedPermissions {
-        user_id: Identifier,
-        permissions: Option<Permissions>,
-    },
-    DeletedUser {
-        user_id: Identifier,
-    },
-    UpdatedUser {
-        user_id: Identifier,
-        username: Option<String>,
-        status: Option<UserStatus>,
-    },
-    ChangedPassword {
-        user_id: Identifier,
-        current_password: String,
-        new_password: String,
-    },
-    CreatedPersonalAccessToken {
-        personal_access_token: PersonalAccessToken,
-    },
-    DeletedPersonalAccessToken {
-        user_id: u32,
-        name: String,
-    },
     AddressBound {
         protocol: TransportProtocol,
         address: SocketAddr,

Reply via email to