This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch auth-refactor
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 059e784655631ceb4ea27196c4eff21ed0c82d65
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 9 12:48:42 2026 +0100

    refactor(server): add type-safe auth with proof-carrying code pattern
    
    Introduce Auth proof type that can only be created via IggyShard::auth(),
    providing compile-time guarantees for handler authentication.
---
 core/server/src/binary/command.rs                  | 126 +++++++++++---------
 .../cluster/get_cluster_metadata_handler.rs        |   9 +-
 .../create_consumer_group_handler.rs               |  11 +-
 .../delete_consumer_group_handler.rs               |  12 +-
 .../consumer_groups/get_consumer_group_handler.rs  |  10 +-
 .../consumer_groups/get_consumer_groups_handler.rs |  10 +-
 .../consumer_groups/join_consumer_group_handler.rs |  10 +-
 .../leave_consumer_group_handler.rs                |  12 +-
 .../delete_consumer_offset_handler.rs              |   7 +-
 .../get_consumer_offset_handler.rs                 |   7 +-
 .../store_consumer_offset_handler.rs               |   7 +-
 .../messages/flush_unsaved_buffer_handler.rs       |  11 +-
 .../handlers/messages/poll_messages_handler.rs     |   9 +-
 .../handlers/messages/send_messages_handler.rs     |  11 +-
 .../partitions/create_partitions_handler.rs        |  12 +-
 .../partitions/delete_partitions_handler.rs        |  12 +-
 .../create_personal_access_token_handler.rs        |  12 +-
 .../delete_personal_access_token_handler.rs        |  13 ++-
 .../get_personal_access_tokens_handler.rs          |   6 +-
 .../login_with_personal_access_token_handler.rs    |   5 +-
 .../handlers/segments/delete_segments_handler.rs   |  13 +--
 .../handlers/streams/create_stream_handler.rs      |  16 +--
 .../handlers/streams/delete_stream_handler.rs      |  15 +--
 .../binary/handlers/streams/get_stream_handler.rs  |  12 +-
 .../binary/handlers/streams/get_streams_handler.rs |  12 +-
 .../handlers/streams/purge_stream_handler.rs       |  12 +-
 .../handlers/streams/update_stream_handler.rs      |  12 +-
 .../binary/handlers/system/get_client_handler.rs   |   6 +-
 .../binary/handlers/system/get_clients_handler.rs  |   6 +-
 .../src/binary/handlers/system/get_me_handler.rs   |   6 +-
 .../src/binary/handlers/system/get_snapshot.rs     |   6 +-
 .../binary/handlers/system/get_stats_handler.rs    |   8 +-
 .../src/binary/handlers/system/ping_handler.rs     |   5 +-
 .../binary/handlers/topics/create_topic_handler.rs |  16 +--
 .../binary/handlers/topics/delete_topic_handler.rs |  16 +--
 .../binary/handlers/topics/get_topic_handler.rs    |  10 +-
 .../binary/handlers/topics/get_topics_handler.rs   |  10 +-
 .../binary/handlers/topics/purge_topic_handler.rs  |  11 +-
 .../binary/handlers/topics/update_topic_handler.rs |  16 +--
 .../handlers/users/change_password_handler.rs      |  12 +-
 .../binary/handlers/users/create_user_handler.rs   |  15 +--
 .../binary/handlers/users/delete_user_handler.rs   |  12 +-
 .../src/binary/handlers/users/get_user_handler.rs  |   6 +-
 .../src/binary/handlers/users/get_users_handler.rs |   6 +-
 .../binary/handlers/users/login_user_handler.rs    |   5 +-
 .../binary/handlers/users/logout_user_handler.rs   |  14 +--
 .../handlers/users/update_permissions_handler.rs   |  15 +--
 .../binary/handlers/users/update_user_handler.rs   |  12 +-
 core/server/src/binary/macros.rs                   | 128 ++++++++++++++++-----
 core/server/src/quic/listener.rs                   |   4 +-
 core/server/src/shard/mod.rs                       |  14 +++
 core/server/src/streaming/auth.rs                  |  80 +++++++++++++
 core/server/src/streaming/mod.rs                   |   1 +
 core/server/src/tcp/connection_handler.rs          |   3 +-
 core/server/src/websocket/connection_handler.rs    |   3 +-
 55 files changed, 521 insertions(+), 329 deletions(-)

diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs
index e10fe6ab8..4d4c2b9dd 100644
--- a/core/server/src/binary/command.rs
+++ b/core/server/src/binary/command.rs
@@ -18,9 +18,9 @@
 
 use crate::define_server_command_enum;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use bytes::{BufMut, Bytes, BytesMut};
-use enum_dispatch::enum_dispatch;
 use iggy_common::SenderKind;
 use iggy_common::change_password::ChangePassword;
 use iggy_common::create_consumer_group::CreateConsumerGroup;
@@ -72,70 +72,83 @@ use strum::EnumString;
 use tracing::error;
 
 define_server_command_enum! {
-    Ping(Ping), PING_CODE, PING, false;
-    GetStats(GetStats), GET_STATS_CODE, GET_STATS, false;
-    GetMe(GetMe), GET_ME_CODE, GET_ME, false;
-    GetClient(GetClient), GET_CLIENT_CODE, GET_CLIENT, true;
-    GetClients(GetClients), GET_CLIENTS_CODE, GET_CLIENTS, false;
-    GetSnapshot(GetSnapshot), GET_SNAPSHOT_FILE_CODE, GET_SNAPSHOT_FILE, false;
-    GetClusterMetadata(GetClusterMetadata), GET_CLUSTER_METADATA_CODE, GET_CLUSTER_METADATA, false;
-    PollMessages(PollMessages), POLL_MESSAGES_CODE, POLL_MESSAGES, true;
-    FlushUnsavedBuffer(FlushUnsavedBuffer), FLUSH_UNSAVED_BUFFER_CODE, FLUSH_UNSAVED_BUFFER, true;
-    GetUser(GetUser), GET_USER_CODE, GET_USER, true;
-    GetUsers(GetUsers), GET_USERS_CODE, GET_USERS, false;
-    CreateUser(CreateUser), CREATE_USER_CODE, CREATE_USER, true;
-    DeleteUser(DeleteUser), DELETE_USER_CODE, DELETE_USER, true;
-    UpdateUser(UpdateUser), UPDATE_USER_CODE, UPDATE_USER, true;
-    UpdatePermissions(UpdatePermissions), UPDATE_PERMISSIONS_CODE, UPDATE_PERMISSIONS, true;
-    ChangePassword(ChangePassword), CHANGE_PASSWORD_CODE, CHANGE_PASSWORD, true;
-    LoginUser(LoginUser), LOGIN_USER_CODE, LOGIN_USER, true;
-    LogoutUser(LogoutUser), LOGOUT_USER_CODE, LOGOUT_USER, false;
-    GetPersonalAccessTokens(GetPersonalAccessTokens), GET_PERSONAL_ACCESS_TOKENS_CODE, GET_PERSONAL_ACCESS_TOKENS, false;
-    CreatePersonalAccessToken(CreatePersonalAccessToken), CREATE_PERSONAL_ACCESS_TOKEN_CODE, CREATE_PERSONAL_ACCESS_TOKEN, true;
-    DeletePersonalAccessToken(DeletePersonalAccessToken), DELETE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN, false;
-    LoginWithPersonalAccessToken(LoginWithPersonalAccessToken), LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, LOGIN_WITH_PERSONAL_ACCESS_TOKEN, true;
-    SendMessages(SendMessages), SEND_MESSAGES_CODE, SEND_MESSAGES, false;
-    GetConsumerOffset(GetConsumerOffset), GET_CONSUMER_OFFSET_CODE, GET_CONSUMER_OFFSET, true;
-    StoreConsumerOffset(StoreConsumerOffset), STORE_CONSUMER_OFFSET_CODE, STORE_CONSUMER_OFFSET, true;
-    DeleteConsumerOffset(DeleteConsumerOffset), DELETE_CONSUMER_OFFSET_CODE, DELETE_CONSUMER_OFFSET, true;
-    GetStream(GetStream), GET_STREAM_CODE, GET_STREAM, true;
-    GetStreams(GetStreams), GET_STREAMS_CODE, GET_STREAMS, false;
-    CreateStream(CreateStream), CREATE_STREAM_CODE, CREATE_STREAM, true;
-    DeleteStream(DeleteStream), DELETE_STREAM_CODE, DELETE_STREAM, true;
-    UpdateStream(UpdateStream), UPDATE_STREAM_CODE, UPDATE_STREAM, true;
-    PurgeStream(PurgeStream), PURGE_STREAM_CODE, PURGE_STREAM, true;
-    GetTopic(GetTopic), GET_TOPIC_CODE, GET_TOPIC, true;
-    GetTopics(GetTopics), GET_TOPICS_CODE, GET_TOPICS, false;
-    CreateTopic(CreateTopic), CREATE_TOPIC_CODE, CREATE_TOPIC, true;
-    DeleteTopic(DeleteTopic), DELETE_TOPIC_CODE, DELETE_TOPIC, true;
-    UpdateTopic(UpdateTopic), UPDATE_TOPIC_CODE, UPDATE_TOPIC, true;
-    PurgeTopic(PurgeTopic), PURGE_TOPIC_CODE, PURGE_TOPIC, true;
-    CreatePartitions(CreatePartitions), CREATE_PARTITIONS_CODE, CREATE_PARTITIONS, true;
-    DeletePartitions(DeletePartitions), DELETE_PARTITIONS_CODE, DELETE_PARTITIONS, true;
-    DeleteSegments(DeleteSegments), DELETE_SEGMENTS_CODE, DELETE_SEGMENTS, true;
-    GetConsumerGroup(GetConsumerGroup), GET_CONSUMER_GROUP_CODE, GET_CONSUMER_GROUP, true;
-    GetConsumerGroups(GetConsumerGroups), GET_CONSUMER_GROUPS_CODE, GET_CONSUMER_GROUPS, false;
-    CreateConsumerGroup(CreateConsumerGroup), CREATE_CONSUMER_GROUP_CODE, CREATE_CONSUMER_GROUP, true;
-    DeleteConsumerGroup(DeleteConsumerGroup), DELETE_CONSUMER_GROUP_CODE, DELETE_CONSUMER_GROUP, true;
-    JoinConsumerGroup(JoinConsumerGroup), JOIN_CONSUMER_GROUP_CODE, JOIN_CONSUMER_GROUP, true;
-    LeaveConsumerGroup(LeaveConsumerGroup), LEAVE_CONSUMER_GROUP_CODE, LEAVE_CONSUMER_GROUP, true;
+    @unauth {
+        Ping(Ping), PING_CODE, PING, false;
+        LoginUser(LoginUser), LOGIN_USER_CODE, LOGIN_USER, true;
+        LoginWithPersonalAccessToken(LoginWithPersonalAccessToken), LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, LOGIN_WITH_PERSONAL_ACCESS_TOKEN, true;
+    }
+    @auth {
+        GetStats(GetStats), GET_STATS_CODE, GET_STATS, false;
+        GetMe(GetMe), GET_ME_CODE, GET_ME, false;
+        GetClient(GetClient), GET_CLIENT_CODE, GET_CLIENT, true;
+        GetClients(GetClients), GET_CLIENTS_CODE, GET_CLIENTS, false;
+        GetSnapshot(GetSnapshot), GET_SNAPSHOT_FILE_CODE, GET_SNAPSHOT_FILE, false;
+        GetClusterMetadata(GetClusterMetadata), GET_CLUSTER_METADATA_CODE, GET_CLUSTER_METADATA, false;
+        PollMessages(PollMessages), POLL_MESSAGES_CODE, POLL_MESSAGES, true;
+        FlushUnsavedBuffer(FlushUnsavedBuffer), FLUSH_UNSAVED_BUFFER_CODE, FLUSH_UNSAVED_BUFFER, true;
+        GetUser(GetUser), GET_USER_CODE, GET_USER, true;
+        GetUsers(GetUsers), GET_USERS_CODE, GET_USERS, false;
+        CreateUser(CreateUser), CREATE_USER_CODE, CREATE_USER, true;
+        DeleteUser(DeleteUser), DELETE_USER_CODE, DELETE_USER, true;
+        UpdateUser(UpdateUser), UPDATE_USER_CODE, UPDATE_USER, true;
+        UpdatePermissions(UpdatePermissions), UPDATE_PERMISSIONS_CODE, UPDATE_PERMISSIONS, true;
+        ChangePassword(ChangePassword), CHANGE_PASSWORD_CODE, CHANGE_PASSWORD, true;
+        LogoutUser(LogoutUser), LOGOUT_USER_CODE, LOGOUT_USER, false;
+        GetPersonalAccessTokens(GetPersonalAccessTokens), GET_PERSONAL_ACCESS_TOKENS_CODE, GET_PERSONAL_ACCESS_TOKENS, false;
+        CreatePersonalAccessToken(CreatePersonalAccessToken), CREATE_PERSONAL_ACCESS_TOKEN_CODE, CREATE_PERSONAL_ACCESS_TOKEN, true;
+        DeletePersonalAccessToken(DeletePersonalAccessToken), DELETE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN, false;
+        SendMessages(SendMessages), SEND_MESSAGES_CODE, SEND_MESSAGES, false;
+        GetConsumerOffset(GetConsumerOffset), GET_CONSUMER_OFFSET_CODE, GET_CONSUMER_OFFSET, true;
+        StoreConsumerOffset(StoreConsumerOffset), STORE_CONSUMER_OFFSET_CODE, STORE_CONSUMER_OFFSET, true;
+        DeleteConsumerOffset(DeleteConsumerOffset), DELETE_CONSUMER_OFFSET_CODE, DELETE_CONSUMER_OFFSET, true;
+        GetStream(GetStream), GET_STREAM_CODE, GET_STREAM, true;
+        GetStreams(GetStreams), GET_STREAMS_CODE, GET_STREAMS, false;
+        CreateStream(CreateStream), CREATE_STREAM_CODE, CREATE_STREAM, true;
+        DeleteStream(DeleteStream), DELETE_STREAM_CODE, DELETE_STREAM, true;
+        UpdateStream(UpdateStream), UPDATE_STREAM_CODE, UPDATE_STREAM, true;
+        PurgeStream(PurgeStream), PURGE_STREAM_CODE, PURGE_STREAM, true;
+        GetTopic(GetTopic), GET_TOPIC_CODE, GET_TOPIC, true;
+        GetTopics(GetTopics), GET_TOPICS_CODE, GET_TOPICS, false;
+        CreateTopic(CreateTopic), CREATE_TOPIC_CODE, CREATE_TOPIC, true;
+        DeleteTopic(DeleteTopic), DELETE_TOPIC_CODE, DELETE_TOPIC, true;
+        UpdateTopic(UpdateTopic), UPDATE_TOPIC_CODE, UPDATE_TOPIC, true;
+        PurgeTopic(PurgeTopic), PURGE_TOPIC_CODE, PURGE_TOPIC, true;
+        CreatePartitions(CreatePartitions), CREATE_PARTITIONS_CODE, CREATE_PARTITIONS, true;
+        DeletePartitions(DeletePartitions), DELETE_PARTITIONS_CODE, DELETE_PARTITIONS, true;
+        DeleteSegments(DeleteSegments), DELETE_SEGMENTS_CODE, DELETE_SEGMENTS, true;
+        GetConsumerGroup(GetConsumerGroup), GET_CONSUMER_GROUP_CODE, GET_CONSUMER_GROUP, true;
+        GetConsumerGroups(GetConsumerGroups), GET_CONSUMER_GROUPS_CODE, GET_CONSUMER_GROUPS, false;
+        CreateConsumerGroup(CreateConsumerGroup), CREATE_CONSUMER_GROUP_CODE, CREATE_CONSUMER_GROUP, true;
+        DeleteConsumerGroup(DeleteConsumerGroup), DELETE_CONSUMER_GROUP_CODE, DELETE_CONSUMER_GROUP, true;
+        JoinConsumerGroup(JoinConsumerGroup), JOIN_CONSUMER_GROUP_CODE, JOIN_CONSUMER_GROUP, true;
+        LeaveConsumerGroup(LeaveConsumerGroup), LEAVE_CONSUMER_GROUP_CODE, LEAVE_CONSUMER_GROUP, true;
+    }
 }
 
-/// Indicates whether a command handler completed normally or migrated the connection.
 pub enum HandlerResult {
-    /// Command completed, connection stays on current shard.
     Finished,
-
-    /// Connection was migrated to another shard. Source shard should exit without cleanup.
     Migrated { to_shard: u16 },
 }
 
-#[enum_dispatch]
-pub trait ServerCommandHandler {
-    /// Return the command code
+/// Handler for commands requiring authentication - receives [`Auth`] proof token.
+pub trait AuthenticatedHandler {
+    fn code(&self) -> u32;
+
+    #[allow(async_fn_in_trait)]
+    async fn handle(
+        self,
+        sender: &mut SenderKind,
+        length: u32,
+        auth: Auth,
+        session: &Session,
+        shard: &Rc<IggyShard>,
+    ) -> Result<HandlerResult, IggyError>;
+}
+
+/// Handler for commands not requiring authentication (Ping, LoginUser, LoginWithPersonalAccessToken).
+pub trait UnauthenticatedHandler {
     fn code(&self) -> u32;
 
-    /// Handle the command execution
     #[allow(async_fn_in_trait)]
     async fn handle(
         self,
@@ -147,7 +160,6 @@ pub trait ServerCommandHandler {
 }
 
 pub trait BinaryServerCommand {
-    /// Parse command from sender
     #[allow(async_fn_in_trait)]
     async fn from_sender(
         sender: &mut SenderKind,
diff --git a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
index af6c7a9a0..babd2e692 100644
--- a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
+++ b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
@@ -19,27 +19,28 @@
 use std::rc::Rc;
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 //use crate::streaming::systems::system::SharedSystem;
-use anyhow::Result;
 use iggy_common::get_cluster_metadata::GetClusterMetadata;
 use iggy_common::{BytesSerializable, IggyError, SenderKind};
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for GetClusterMetadata {
+impl AuthenticatedHandler for GetClusterMetadata {
     fn code(&self) -> u32 {
         iggy_common::GET_CLUSTER_METADATA_CODE
     }
 
-    #[instrument(skip_all, name = "trace_get_cluster_metadata", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))]
+    #[instrument(skip_all, name = "trace_get_cluster_metadata", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index 192d867f5..aebec3452 100644
--- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::consumer_groups::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
@@ -27,24 +27,25 @@ use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateConsumerGroupWithId;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::create_consumer_group::CreateConsumerGroup;
 use iggy_common::{Identifier, IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for CreateConsumerGroup {
+impl AuthenticatedHandler for CreateConsumerGroup {
     fn code(&self) -> u32 {
         iggy_common::CREATE_CONSUMER_GROUP_CODE
     }
 
-    #[instrument(skip_all, name = "trace_create_consumer_group", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
+    #[instrument(skip_all, name = "trace_create_consumer_group", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -69,7 +70,7 @@ impl ServerCommandHandler for CreateConsumerGroup {
         shard
             .state
         .apply(
-            session.get_user_id(),
+            auth.user_id(),
            &EntryCommand::CreateConsumerGroup(CreateConsumerGroupWithId {
                 group_id: cg_id as u32,
                 command: self
diff --git a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
index 05c3cd9de..f8d06392d 100644
--- a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
@@ -17,34 +17,34 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::consumer_groups::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::polling_consumer::ConsumerGroupId;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::delete_consumer_group::DeleteConsumerGroup;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for DeleteConsumerGroup {
+impl AuthenticatedHandler for DeleteConsumerGroup {
     fn code(&self) -> u32 {
         iggy_common::DELETE_CONSUMER_GROUP_CODE
     }
 
-    #[instrument(skip_all, name = "trace_delete_consumer_group", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
+    #[instrument(skip_all, name = "trace_delete_consumer_group", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -114,7 +114,7 @@ impl ServerCommandHandler for DeleteConsumerGroup {
         shard
             .state
             .apply(
-                session.get_user_id(),
+                auth.user_id(),
                 &EntryCommand::DeleteConsumerGroup(self),
             )
             .await
diff --git a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
index 7156a4d53..0d578e4c9 100644
--- a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
@@ -17,21 +17,21 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use crate::streaming::{streams, topics};
-use anyhow::Result;
 use iggy_common::IggyError;
 use iggy_common::SenderKind;
 use iggy_common::get_consumer_group::GetConsumerGroup;
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for GetConsumerGroup {
+impl AuthenticatedHandler for GetConsumerGroup {
     fn code(&self) -> u32 {
         iggy_common::GET_CONSUMER_GROUP_CODE
     }
@@ -40,11 +40,11 @@ impl ServerCommandHandler for GetConsumerGroup {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
-        shard.ensure_authenticated(session)?;
         let exists = shard
             .ensure_consumer_group_exists(&self.stream_id, &self.topic_id, &self.group_id)
             .is_ok();
@@ -63,7 +63,7 @@ impl ServerCommandHandler for GetConsumerGroup {
         let has_permission = shard
             .permissioner
             .borrow()
-            .get_consumer_group(session.get_user_id(), numeric_stream_id, numeric_topic_id)
+            .get_consumer_group(auth.user_id(), numeric_stream_id, numeric_topic_id)
             .is_ok();
         if !has_permission {
             sender.send_empty_ok_response().await?;
diff --git a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
index d59f33c97..644cc7a28 100644
--- a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
@@ -17,22 +17,22 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
 use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents};
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use crate::streaming::{streams, topics};
-use anyhow::Result;
 use iggy_common::IggyError;
 use iggy_common::SenderKind;
 use iggy_common::get_consumer_groups::GetConsumerGroups;
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for GetConsumerGroups {
+impl AuthenticatedHandler for GetConsumerGroups {
     fn code(&self) -> u32 {
         iggy_common::GET_CONSUMER_GROUPS_CODE
     }
@@ -41,11 +41,11 @@ impl ServerCommandHandler for GetConsumerGroups {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
-        shard.ensure_authenticated(session)?;
         shard.ensure_topic_exists(&self.stream_id, &self.topic_id)?;
         let numeric_topic_id = shard.streams.with_topic_by_id(
             &self.stream_id,
@@ -56,7 +56,7 @@ impl ServerCommandHandler for GetConsumerGroups {
             .streams
             .with_stream_by_id(&self.stream_id, streams::helpers::get_stream_id());
         shard.permissioner.borrow().get_consumer_groups(
-            session.get_user_id(),
+            auth.user_id(),
             numeric_stream_id,
             numeric_topic_id,
         )?;
diff --git a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
index cc9a8e27b..251c7996e 100644
--- a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
@@ -17,30 +17,30 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::consumer_groups::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::join_consumer_group::JoinConsumerGroup;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for JoinConsumerGroup {
+impl AuthenticatedHandler for JoinConsumerGroup {
     fn code(&self) -> u32 {
         iggy_common::JOIN_CONSUMER_GROUP_CODE
     }
 
-    #[instrument(skip_all, name = "trace_join_consumer_group", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), iggy_group_id = self.group_id.as_string()))]
+    #[instrument(skip_all, name = "trace_join_consumer_group", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), iggy_group_id = self.group_id.as_string()))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
index b56d94a48..eaaab37be 100644
--- a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
@@ -18,30 +18,30 @@
 
 use super::COMPONENT;
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::utils::receive_and_validate;
-use iggy_common::SenderKind;
-
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::IggyError;
+use iggy_common::SenderKind;
 use iggy_common::leave_consumer_group::LeaveConsumerGroup;
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for LeaveConsumerGroup {
+impl AuthenticatedHandler for LeaveConsumerGroup {
     fn code(&self) -> u32 {
         iggy_common::LEAVE_CONSUMER_GROUP_CODE
     }
 
-    #[instrument(skip_all, name = "trace_leave_consumer_group", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), iggy_group_id = self.group_id.as_string()))]
+    #[instrument(skip_all, name = "trace_leave_consumer_group", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), iggy_group_id = self.group_id.as_string()))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
index 45f22349d..e0414c85a 100644
--- a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
+++ b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
@@ -17,20 +17,20 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::consumer_offsets::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::delete_consumer_offset::DeleteConsumerOffset;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for DeleteConsumerOffset {
+impl AuthenticatedHandler for DeleteConsumerOffset {
     fn code(&self) -> u32 {
         iggy_common::DELETE_CONSUMER_OFFSET_CODE
     }
@@ -39,6 +39,7 @@ impl ServerCommandHandler for DeleteConsumerOffset {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        _auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
index 29ee6f310..868c2a237 100644
--- a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
+++ b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
@@ -17,20 +17,20 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use iggy_common::IggyError;
 use iggy_common::SenderKind;
 use iggy_common::get_consumer_offset::GetConsumerOffset;
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for GetConsumerOffset {
+impl AuthenticatedHandler for GetConsumerOffset {
     fn code(&self) -> u32 {
         iggy_common::GET_CONSUMER_OFFSET_CODE
     }
@@ -39,6 +39,7 @@ impl ServerCommandHandler for GetConsumerOffset {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        _auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
index dcbb79db3..4661f524a 100644
--- a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
+++ b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
@@ -19,20 +19,20 @@
 use std::rc::Rc;
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::consumer_offsets::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::SenderKind;
 use iggy_common::store_consumer_offset::StoreConsumerOffset;
 use tracing::debug;
 
-impl ServerCommandHandler for StoreConsumerOffset {
+impl AuthenticatedHandler for StoreConsumerOffset {
     fn code(&self) -> u32 {
         iggy_common::STORE_CONSUMER_OFFSET_CODE
     }
@@ -41,6 +41,7 @@ impl ServerCommandHandler for StoreConsumerOffset {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        _auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
index 937a03cb3..8fe71f96c 100644
--- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
+++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
@@ -17,34 +17,35 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::messages::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::{FlushUnsavedBuffer, IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for FlushUnsavedBuffer {
+impl AuthenticatedHandler for FlushUnsavedBuffer {
     fn code(&self) -> u32 {
         iggy_common::FLUSH_UNSAVED_BUFFER_CODE
     }
 
-    #[instrument(skip_all, name = "trace_flush_unsaved_buffer", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), iggy_partition_id = self.partition_id, iggy_fsync = self.fsync))]
+    #[instrument(skip_all, name = "trace_flush_unsaved_buffer", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), iggy_partition_id = self.partition_id, iggy_fsync = self.fsync))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
-        let user_id = session.get_user_id();
+        let user_id = auth.user_id();
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
         let partition_id = self.partition_id;
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index 0839e3a55..e508ae6c9 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -17,13 +17,13 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
 use crate::shard::system::messages::PollingArgs;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use iggy_common::SenderKind;
 use iggy_common::{IggyError, PollMessages, PooledBuffer};
 use std::rc::Rc;
@@ -44,7 +44,7 @@ impl IggyPollMetadata {
     }
 }
 
-impl ServerCommandHandler for PollMessages {
+impl AuthenticatedHandler for PollMessages {
     fn code(&self) -> u32 {
         iggy_common::POLL_MESSAGES_CODE
     }
@@ -53,6 +53,7 @@ impl ServerCommandHandler for PollMessages {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -68,7 +69,7 @@ impl ServerCommandHandler for PollMessages {
         } = self;
         let args = PollingArgs::new(strategy, count, auto_commit);
 
-        let user_id = session.get_user_id();
+        let user_id = auth.user_id();
         let client_id = session.client_id;
         let (metadata, mut batch) = shard
             .poll_messages(
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 375848799..782525481 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -16,13 +16,13 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, HandlerResult, ServerCommandHandler};
+use crate::binary::command::{AuthenticatedHandler, BinaryServerCommand, HandlerResult};
 use crate::shard::IggyShard;
 use crate::shard::transmission::message::{ShardMessage, ShardRequest, ShardRequestPayload};
+use crate::streaming::auth::Auth;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
 use crate::streaming::{streams, topics};
-use anyhow::Result;
 use compio::buf::{IntoInner as _, IoBuf};
 use iggy_common::Identifier;
 use iggy_common::PooledBuffer;
@@ -34,13 +34,13 @@ use iggy_common::{IggyError, Partitioning, SendMessages, Validatable};
 use std::rc::Rc;
 use tracing::{debug, error, info, instrument};
 
-impl ServerCommandHandler for SendMessages {
+impl AuthenticatedHandler for SendMessages {
     fn code(&self) -> u32 {
         iggy_common::SEND_MESSAGES_CODE
     }
 
     #[instrument(skip_all, name = "trace_send_messages", fields(
-        iggy_user_id = session.get_user_id(),
+        iggy_user_id = auth.user_id(),
         iggy_client_id = session.client_id,
         iggy_stream_id = self.stream_id.as_string(),
         iggy_topic_id = self.topic_id.as_string(),
@@ -50,6 +50,7 @@ impl ServerCommandHandler for SendMessages {
         mut self,
         sender: &mut SenderKind,
         length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -150,7 +151,7 @@ impl ServerCommandHandler for SendMessages {
         )?;
 
         let namespace = IggyNamespace::new(numeric_stream_id, numeric_topic_id, partition_id);
-        let user_id = session.get_user_id();
+        let user_id = auth.user_id();
         let unsupport_socket_transfer = matches!(
             self.partitioning.kind,
             PartitioningKind::Balanced | PartitioningKind::MessagesKey
diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index 5c1930a8a..43e6d8119 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -17,34 +17,34 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::partitions::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use crate::streaming::{streams, topics};
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::create_partitions::CreatePartitions;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for CreatePartitions {
+impl AuthenticatedHandler for CreatePartitions {
     fn code(&self) -> u32 {
         iggy_common::CREATE_PARTITIONS_CODE
     }
 
-    #[instrument(skip_all, name = "trace_create_partitions", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
+    #[instrument(skip_all, name = "trace_create_partitions", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -86,7 +86,7 @@ impl ServerCommandHandler for CreatePartitions {
         shard
         .state
         .apply(
-            session.get_user_id(),
+            auth.user_id(),
             &EntryCommand::CreatePartitions(self),
         )
         .await
diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 9090ec2c8..4d0daa84e 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -17,32 +17,32 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::partitions::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::delete_partitions::DeletePartitions;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for DeletePartitions {
+impl AuthenticatedHandler for DeletePartitions {
     fn code(&self) -> u32 {
         iggy_common::DELETE_PARTITIONS_CODE
     }
 
-    #[instrument(skip_all, name = "trace_delete_partitions", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
+    #[instrument(skip_all, name = "trace_delete_partitions", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -83,7 +83,7 @@ impl ServerCommandHandler for DeletePartitions {
         shard
         .state
         .apply(
-            session.get_user_id(),
+            auth.user_id(),
             &EntryCommand::DeletePartitions(self),
         )
         .await
diff --git a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index 50b896fb0..6cf407ed4 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -17,34 +17,34 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::personal_access_tokens::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreatePersonalAccessTokenWithHash;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for CreatePersonalAccessToken {
+impl AuthenticatedHandler for CreatePersonalAccessToken {
     fn code(&self) -> u32 {
         iggy_common::CREATE_PERSONAL_ACCESS_TOKEN_CODE
     }
 
-    #[instrument(skip_all, name = "trace_create_personal_access_token", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))]
+    #[instrument(skip_all, name = "trace_create_personal_access_token", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -68,7 +68,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
         shard
             .state
             .apply(
-                session.get_user_id(),
+                auth.user_id(),
                 &EntryCommand::CreatePersonalAccessToken(CreatePersonalAccessTokenWithHash {
                     command: CreatePersonalAccessToken {
                         name: self.name.to_owned(),
diff --git a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
index bf89c70ca..38cef42f7 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
@@ -17,30 +17,31 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::personal_access_tokens::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::delete_personal_access_token::DeletePersonalAccessToken;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for DeletePersonalAccessToken {
+impl AuthenticatedHandler for DeletePersonalAccessToken {
     fn code(&self) -> u32 {
         iggy_common::DELETE_PERSONAL_ACCESS_TOKEN_CODE
     }
 
-    #[instrument(skip_all, name = "trace_delete_personal_access_token", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))]
+    #[instrument(skip_all, name = "trace_delete_personal_access_token", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -55,7 +56,7 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
 
         // Broadcast the event to other shards
         let event = crate::shard::transmission::event::ShardEvent::DeletedPersonalAccessToken {
-            user_id: session.get_user_id(),
+            user_id: auth.user_id(),
             name: self.name.clone(),
         };
         shard.broadcast_event_to_all_shards(event).await?;
@@ -63,7 +64,7 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
         shard
             .state
             .apply(
-                session.get_user_id(),
+                auth.user_id(),
                 &EntryCommand::DeletePersonalAccessToken(DeletePersonalAccessToken {
                     name: self.name,
                 }),
diff --git a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
index 9e2923868..35885c1a8 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
@@ -17,12 +17,13 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::personal_access_tokens::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use err_trail::ErrContext;
 use iggy_common::IggyError;
@@ -31,7 +32,7 @@ use iggy_common::get_personal_access_tokens::GetPersonalAccessTokens;
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for GetPersonalAccessTokens {
+impl AuthenticatedHandler for GetPersonalAccessTokens {
     fn code(&self) -> u32 {
         iggy_common::GET_PERSONAL_ACCESS_TOKENS_CODE
     }
@@ -40,6 +41,7 @@ impl ServerCommandHandler for GetPersonalAccessTokens {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        _auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
index 1603aaa6a..73914ffcd 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
@@ -19,19 +19,18 @@ use crate::shard::IggyShard;
 use std::rc::Rc;
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    BinaryServerCommand, HandlerResult, ServerCommand, UnauthenticatedHandler,
 };
 use crate::binary::handlers::personal_access_tokens::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::login_with_personal_access_token::LoginWithPersonalAccessToken;
 use iggy_common::{IggyError, SenderKind};
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for LoginWithPersonalAccessToken {
+impl UnauthenticatedHandler for LoginWithPersonalAccessToken {
     fn code(&self) -> u32 {
         iggy_common::LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE
     }
diff --git a/core/server/src/binary/handlers/segments/delete_segments_handler.rs b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index a4eb20654..8d35670d1 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::partitions::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
@@ -28,8 +28,8 @@ use crate::shard::transmission::message::{
 };
 use crate::state::command::EntryCommand;
 use crate::streaming;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::delete_segments::DeleteSegments;
 use iggy_common::sharding::IggyNamespace;
@@ -37,16 +37,17 @@ use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for DeleteSegments {
+impl AuthenticatedHandler for DeleteSegments {
     fn code(&self) -> u32 {
         iggy_common::DELETE_SEGMENTS_CODE
     }
 
-    #[instrument(skip_all, name = "trace_delete_segments", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
+    #[instrument(skip_all, name = "trace_delete_segments", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -57,8 +58,6 @@ impl ServerCommandHandler for DeleteSegments {
         let partition_id = self.partition_id as usize;
         let segments_count = self.segments_count;
 
-        // Ensure authentication and topic exists
-        shard.ensure_authenticated(session)?;
         shard.ensure_topic_exists(&stream_id, &topic_id)?;
         shard.ensure_partition_exists(&stream_id, &topic_id, partition_id)?;
 
@@ -114,7 +113,7 @@ impl ServerCommandHandler for DeleteSegments {
         shard
             .state
             .apply(
-                session.get_user_id(),
+                auth.user_id(),
                 &EntryCommand::DeleteSegments(self),
             )
             .await
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index c2935ef6a..c008b1d16 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -17,12 +17,11 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard::transmission::frame::ShardResponse;
@@ -32,24 +31,25 @@ use crate::shard::transmission::message::{
 use crate::slab::traits_ext::{EntityComponentSystem, EntityMarker};
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateStreamWithId;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::create_stream::CreateStream;
 use iggy_common::{Identifier, IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for CreateStream {
+impl AuthenticatedHandler for CreateStream {
     fn code(&self) -> u32 {
         iggy_common::CREATE_STREAM_CODE
     }
 
-    #[instrument(skip_all, name = "trace_create_stream", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))]
+    #[instrument(skip_all, name = "trace_create_stream", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -60,7 +60,7 @@ impl ServerCommandHandler for CreateStream {
             topic_id: Identifier::default(),
             partition_id: 0,
             payload: ShardRequestPayload::CreateStream {
-                user_id: session.get_user_id(),
+                user_id: auth.user_id(),
                 name: self.name.clone(),
             },
         };
@@ -91,7 +91,7 @@ impl ServerCommandHandler for CreateStream {
 
                     shard
                         .state
-                        .apply(session.get_user_id(), &EntryCommand::CreateStream(CreateStreamWithId {
+                        .apply(auth.user_id(), &EntryCommand::CreateStream(CreateStreamWithId {
                             stream_id: created_stream_id as u32,
                             command: self
                         }))
@@ -116,7 +116,7 @@ impl ServerCommandHandler for CreateStream {
 
                     shard
                         .state
-                        .apply(session.get_user_id(), &EntryCommand::CreateStream(CreateStreamWithId {
+                        .apply(auth.user_id(), &EntryCommand::CreateStream(CreateStreamWithId {
                             stream_id: created_stream_id as u32,
                             command: self
                         }))
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index d1e23cb04..43baf411a 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
@@ -29,8 +29,8 @@ use crate::shard::transmission::message::{
 };
 use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::delete_stream::DeleteStream;
 use iggy_common::{Identifier, IggyError, SenderKind};
@@ -38,16 +38,17 @@ use std::rc::Rc;
 use tracing::info;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for DeleteStream {
+impl AuthenticatedHandler for DeleteStream {
     fn code(&self) -> u32 {
         iggy_common::DELETE_STREAM_CODE
     }
 
-    #[instrument(skip_all, name = "trace_delete_stream", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))]
+    #[instrument(skip_all, name = "trace_delete_stream", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -58,7 +59,7 @@ impl ServerCommandHandler for DeleteStream {
             topic_id: Identifier::default(),
             partition_id: 0,
             payload: ShardRequestPayload::DeleteStream {
-                user_id: session.get_user_id(),
+                user_id: auth.user_id(),
                 stream_id: self.stream_id,
             },
         };
@@ -91,7 +92,7 @@ impl ServerCommandHandler for DeleteStream {
 
                     shard
                         .state
-                        .apply(session.get_user_id(), &EntryCommand::DeleteStream(DeleteStream { stream_id: stream_id.clone() }))
+                        .apply(auth.user_id(), &EntryCommand::DeleteStream(DeleteStream { stream_id: stream_id.clone() }))
                         .await
                         .error(|e: &IggyError| {
                             format!("{COMPONENT} (error: {e}) - failed to apply delete stream with ID: {stream_id}, session: {session}")
@@ -103,7 +104,7 @@ impl ServerCommandHandler for DeleteStream {
                 ShardResponse::DeleteStreamResponse(stream) => {
                     shard
                         .state
-                        .apply(session.get_user_id(), 
&EntryCommand::DeleteStream(DeleteStream { stream_id: (stream.id() as 
u32).try_into().unwrap() }))
+                        .apply(auth.user_id(), 
&EntryCommand::DeleteStream(DeleteStream { stream_id: (stream.id() as 
u32).try_into().unwrap() }))
                         .await
                         .error(|e: &IggyError| {
                             format!("{COMPONENT} (error: {e}) - failed to apply delete stream with ID: {stream_id}, session: {session}")
diff --git a/core/server/src/binary/handlers/streams/get_stream_handler.rs b/core/server/src/binary/handlers/streams/get_stream_handler.rs
index fe9cd1f83..6a22da3db 100644
--- a/core/server/src/binary/handlers/streams/get_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/get_stream_handler.rs
@@ -17,15 +17,15 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
 use crate::slab::traits_ext::EntityComponentSystem;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use crate::streaming::streams;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::SenderKind;
@@ -33,7 +33,7 @@ use iggy_common::get_stream::GetStream;
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for GetStream {
+impl AuthenticatedHandler for GetStream {
     fn code(&self) -> u32 {
         iggy_common::GET_STREAM_CODE
     }
@@ -42,11 +42,11 @@ impl ServerCommandHandler for GetStream {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
-        shard.ensure_authenticated(session)?;
         let exists = shard.ensure_stream_exists(&self.stream_id).is_ok();
         if !exists {
             sender.send_empty_ok_response().await?;
@@ -58,12 +58,12 @@ impl ServerCommandHandler for GetStream {
         let has_permission = shard
             .permissioner
             .borrow()
-            .get_stream(session.get_user_id(), stream_id)
+            .get_stream(auth.user_id(), stream_id)
             .error(|e: &IggyError| {
                 format!(
                     "permission denied to get stream with ID: {} for user with ID: {}, error: {e}",
                     self.stream_id,
-                    session.get_user_id(),
+                    auth.user_id(),
                 )
             })
             .is_ok();
diff --git a/core/server/src/binary/handlers/streams/get_streams_handler.rs b/core/server/src/binary/handlers/streams/get_streams_handler.rs
index f7bf06f2d..a7a39641c 100644
--- a/core/server/src/binary/handlers/streams/get_streams_handler.rs
+++ b/core/server/src/binary/handlers/streams/get_streams_handler.rs
@@ -17,15 +17,15 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
 use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents};
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::SenderKind;
@@ -33,7 +33,7 @@ use iggy_common::get_streams::GetStreams;
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for GetStreams {
+impl AuthenticatedHandler for GetStreams {
     fn code(&self) -> u32 {
         iggy_common::GET_STREAMS_CODE
     }
@@ -42,19 +42,19 @@ impl ServerCommandHandler for GetStreams {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
-        shard.ensure_authenticated(session)?;
         shard
             .permissioner
             .borrow()
-            .get_streams(session.get_user_id())
+            .get_streams(auth.user_id())
             .error(|e: &IggyError| {
                 format!(
                     "{COMPONENT} (error: {e}) - permission denied to get streams for user {}",
-                    session.get_user_id()
+                    auth.user_id()
                 )
             })?;
 
diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
index bfa189613..dd8c00ffb 100644
--- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
@@ -17,32 +17,32 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::purge_stream::PurgeStream;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for PurgeStream {
+impl AuthenticatedHandler for PurgeStream {
     fn code(&self) -> u32 {
         iggy_common::PURGE_STREAM_CODE
     }
 
-    #[instrument(skip_all, name = "trace_purge_stream", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))]
+    #[instrument(skip_all, name = "trace_purge_stream", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -60,7 +60,7 @@ impl ServerCommandHandler for PurgeStream {
         shard.broadcast_event_to_all_shards(event).await?;
         shard
             .state
-            .apply(session.get_user_id(), &EntryCommand::PurgeStream(self))
+            .apply(auth.user_id(), &EntryCommand::PurgeStream(self))
             .await
             .error(|e: &IggyError| {
                 format!("{COMPONENT} (error: {e}) - failed to apply purge stream with id: {stream_id}, session: {session}")
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index 8e4de6574..84c55b679 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -17,32 +17,32 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::update_stream::UpdateStream;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for UpdateStream {
+impl AuthenticatedHandler for UpdateStream {
     fn code(&self) -> u32 {
         iggy_common::UPDATE_STREAM_CODE
     }
 
-    #[instrument(skip_all, name = "trace_update_stream", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))]
+    #[instrument(skip_all, name = "trace_update_stream", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -61,7 +61,7 @@ impl ServerCommandHandler for UpdateStream {
         shard.broadcast_event_to_all_shards(event).await?;
         shard
             .state
-            .apply(session.get_user_id(), &EntryCommand::UpdateStream(self))
+            .apply(auth.user_id(), &EntryCommand::UpdateStream(self))
             .await
             .error(|e: &IggyError| {
                 format!("{COMPONENT} (error: {e}) - failed to apply update stream with id: {stream_id}, session: {session}")
diff --git a/core/server/src/binary/handlers/system/get_client_handler.rs b/core/server/src/binary/handlers/system/get_client_handler.rs
index b9e16aef3..da71329bd 100644
--- a/core/server/src/binary/handlers/system/get_client_handler.rs
+++ b/core/server/src/binary/handlers/system/get_client_handler.rs
@@ -17,11 +17,12 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use iggy_common::IggyError;
 use iggy_common::SenderKind;
@@ -29,7 +30,7 @@ use iggy_common::get_client::GetClient;
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for GetClient {
+impl AuthenticatedHandler for GetClient {
     fn code(&self) -> u32 {
         iggy_common::GET_CLIENT_CODE
     }
@@ -38,6 +39,7 @@ impl ServerCommandHandler for GetClient {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        _auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/system/get_clients_handler.rs b/core/server/src/binary/handlers/system/get_clients_handler.rs
index 81ac21b5f..6288cf969 100644
--- a/core/server/src/binary/handlers/system/get_clients_handler.rs
+++ b/core/server/src/binary/handlers/system/get_clients_handler.rs
@@ -17,12 +17,13 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::system::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use err_trail::ErrContext;
 use iggy_common::IggyError;
@@ -31,7 +32,7 @@ use iggy_common::get_clients::GetClients;
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for GetClients {
+impl AuthenticatedHandler for GetClients {
     fn code(&self) -> u32 {
         iggy_common::GET_CLIENTS_CODE
     }
@@ -40,6 +41,7 @@ impl ServerCommandHandler for GetClients {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        _auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/system/get_me_handler.rs b/core/server/src/binary/handlers/system/get_me_handler.rs
index 339063ab3..ba22c9660 100644
--- a/core/server/src/binary/handlers/system/get_me_handler.rs
+++ b/core/server/src/binary/handlers/system/get_me_handler.rs
@@ -17,12 +17,13 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::system::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use err_trail::ErrContext;
 use iggy_common::IggyError;
@@ -30,7 +31,7 @@ use iggy_common::SenderKind;
 use iggy_common::get_me::GetMe;
 use std::rc::Rc;
 
-impl ServerCommandHandler for GetMe {
+impl AuthenticatedHandler for GetMe {
     fn code(&self) -> u32 {
         iggy_common::GET_ME_CODE
     }
@@ -39,6 +40,7 @@ impl ServerCommandHandler for GetMe {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        _auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/system/get_snapshot.rs b/core/server/src/binary/handlers/system/get_snapshot.rs
index cc2163b91..3e6f76ba9 100644
--- a/core/server/src/binary/handlers/system/get_snapshot.rs
+++ b/core/server/src/binary/handlers/system/get_snapshot.rs
@@ -17,10 +17,11 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use bytes::Bytes;
 use iggy_common::IggyError;
@@ -29,7 +30,7 @@ use iggy_common::get_snapshot::GetSnapshot;
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for GetSnapshot {
+impl AuthenticatedHandler for GetSnapshot {
     fn code(&self) -> u32 {
         iggy_common::GET_SNAPSHOT_FILE_CODE
     }
@@ -38,6 +39,7 @@ impl ServerCommandHandler for GetSnapshot {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        _auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/system/get_stats_handler.rs b/core/server/src/binary/handlers/system/get_stats_handler.rs
index 2fe368ce1..2284295c1 100644
--- a/core/server/src/binary/handlers/system/get_stats_handler.rs
+++ b/core/server/src/binary/handlers/system/get_stats_handler.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::system::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
@@ -27,6 +27,7 @@ use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
 };
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use err_trail::ErrContext;
 use iggy_common::SenderKind;
@@ -35,7 +36,7 @@ use iggy_common::{Identifier, IggyError};
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for GetStats {
+impl AuthenticatedHandler for GetStats {
     fn code(&self) -> u32 {
         iggy_common::GET_STATS_CODE
     }
@@ -44,6 +45,7 @@ impl ServerCommandHandler for GetStats {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -55,7 +57,7 @@ impl ServerCommandHandler for GetStats {
             topic_id: Identifier::default(),
             partition_id: 0,
             payload: ShardRequestPayload::GetStats {
-                user_id: session.get_user_id(),
+                user_id: auth.user_id(),
             },
         };
 
diff --git a/core/server/src/binary/handlers/system/ping_handler.rs b/core/server/src/binary/handlers/system/ping_handler.rs
index e22fa7ba8..a7d777c28 100644
--- a/core/server/src/binary/handlers/system/ping_handler.rs
+++ b/core/server/src/binary/handlers/system/ping_handler.rs
@@ -16,10 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, HandlerResult, ServerCommandHandler};
+use crate::binary::command::{BinaryServerCommand, HandlerResult, UnauthenticatedHandler};
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use iggy_common::IggyError;
 use iggy_common::IggyTimestamp;
 use iggy_common::SenderKind;
@@ -27,7 +26,7 @@ use iggy_common::ping::Ping;
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for Ping {
+impl UnauthenticatedHandler for Ping {
     fn code(&self) -> u32 {
         iggy_common::PING_CODE
     }
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index fa5c0270a..d8e85b541 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -17,12 +17,11 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::topics::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard::transmission::frame::ShardResponse;
@@ -32,25 +31,26 @@ use crate::shard::transmission::message::{
 use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateTopicWithId;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use crate::streaming::streams;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::create_topic::CreateTopic;
 use iggy_common::{Identifier, IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for CreateTopic {
+impl AuthenticatedHandler for CreateTopic {
     fn code(&self) -> u32 {
         iggy_common::CREATE_TOPIC_CODE
     }
 
-    #[instrument(skip_all, name = "trace_create_topic", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))]
+    #[instrument(skip_all, name = "trace_create_topic", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))]
     async fn handle(
         mut self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -61,7 +61,7 @@ impl ServerCommandHandler for CreateTopic {
             topic_id: Identifier::default(),
             partition_id: 0,
             payload: ShardRequestPayload::CreateTopic {
-                user_id: session.get_user_id(),
+                user_id: auth.user_id(),
                 stream_id: self.stream_id.clone(),
                 name: self.name.clone(),
                 partitions_count: self.partitions_count,
@@ -137,7 +137,7 @@ impl ServerCommandHandler for CreateTopic {
                     );
                     shard
                         .state
-                        .apply(session.get_user_id(), &EntryCommand::CreateTopic(CreateTopicWithId {
+                        .apply(auth.user_id(), &EntryCommand::CreateTopic(CreateTopicWithId {
                             topic_id: topic_id as u32,
                             command: self
                         }))
@@ -172,7 +172,7 @@ impl ServerCommandHandler for CreateTopic {
 
                     shard
                         .state
-                        .apply(session.get_user_id(), &EntryCommand::CreateTopic(CreateTopicWithId {
+                        .apply(auth.user_id(), &EntryCommand::CreateTopic(CreateTopicWithId {
                             topic_id: topic_id as u32,
                             command: self
                         }))
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index 668dbd707..de3545bb1 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -17,11 +17,10 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::topics::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard::transmission::frame::ShardResponse;
@@ -30,9 +29,9 @@ use crate::shard::transmission::message::{
 };
 use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use crate::streaming::streams;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::delete_topic::DeleteTopic;
 use iggy_common::{Identifier, IggyError, SenderKind};
@@ -40,16 +39,17 @@ use std::rc::Rc;
 use tracing::info;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for DeleteTopic {
+impl AuthenticatedHandler for DeleteTopic {
     fn code(&self) -> u32 {
         iggy_common::DELETE_TOPIC_CODE
     }
 
-    #[instrument(skip_all, name = "trace_delete_topic", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
+    #[instrument(skip_all, name = "trace_delete_topic", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -60,7 +60,7 @@ impl ServerCommandHandler for DeleteTopic {
             topic_id: Identifier::default(),
             partition_id: 0,
             payload: ShardRequestPayload::DeleteTopic {
-                user_id: session.get_user_id(),
+                user_id: auth.user_id(),
                 stream_id: self.stream_id.clone(),
                 topic_id: self.topic_id.clone(),
             },
@@ -100,7 +100,7 @@ impl ServerCommandHandler for DeleteTopic {
 
                     shard
                         .state
-                        .apply(session.get_user_id(), &EntryCommand::DeleteTopic(self))
+                        .apply(auth.user_id(), &EntryCommand::DeleteTopic(self))
                         .await
                         .error(|e: &IggyError| format!(
                             "{COMPONENT} (error: {e}) - failed to apply delete topic with ID: {topic_id_num} in stream with ID: {stream_id_num}, session: {session}",
@@ -116,7 +116,7 @@ impl ServerCommandHandler for DeleteTopic {
                 ShardResponse::DeleteTopicResponse(topic) => {
                     shard
                         .state
-                        .apply(session.get_user_id(), &EntryCommand::DeleteTopic(self))
+                        .apply(auth.user_id(), &EntryCommand::DeleteTopic(self))
                         .await
                         .error(|e: &IggyError| format!(
                             "{COMPONENT} (error: {e}) - failed to apply delete topic with ID: {}, session: {session}",
diff --git a/core/server/src/binary/handlers/topics/get_topic_handler.rs b/core/server/src/binary/handlers/topics/get_topic_handler.rs
index fcc42ac0b..c4a211f08 100644
--- a/core/server/src/binary/handlers/topics/get_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topic_handler.rs
@@ -17,21 +17,21 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use crate::streaming::streams;
-use anyhow::Result;
 use iggy_common::IggyError;
 use iggy_common::SenderKind;
 use iggy_common::get_topic::GetTopic;
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for GetTopic {
+impl AuthenticatedHandler for GetTopic {
     fn code(&self) -> u32 {
         iggy_common::GET_TOPIC_CODE
     }
@@ -40,11 +40,11 @@ impl ServerCommandHandler for GetTopic {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
-        shard.ensure_authenticated(session)?;
         let exists = shard
             .ensure_topic_exists(&self.stream_id, &self.topic_id)
             .is_ok();
@@ -60,7 +60,7 @@ impl ServerCommandHandler for GetTopic {
             .permissioner
             .borrow()
             .get_topic(
-                session.get_user_id(),
+                auth.user_id(),
                 numeric_stream_id,
                 self.topic_id
                     .get_u32_value()
diff --git a/core/server/src/binary/handlers/topics/get_topics_handler.rs b/core/server/src/binary/handlers/topics/get_topics_handler.rs
index 96c3f9325..73e6ea692 100644
--- a/core/server/src/binary/handlers/topics/get_topics_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topics_handler.rs
@@ -17,22 +17,22 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
 use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents};
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use crate::streaming::streams;
-use anyhow::Result;
 use iggy_common::IggyError;
 use iggy_common::SenderKind;
 use iggy_common::get_topics::GetTopics;
 use std::rc::Rc;
 use tracing::debug;
 
-impl ServerCommandHandler for GetTopics {
+impl AuthenticatedHandler for GetTopics {
     fn code(&self) -> u32 {
         iggy_common::GET_TOPICS_CODE
     }
@@ -41,11 +41,11 @@ impl ServerCommandHandler for GetTopics {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
-        shard.ensure_authenticated(session)?;
         shard.ensure_stream_exists(&self.stream_id)?;
         let numeric_stream_id = shard
             .streams
@@ -53,7 +53,7 @@ impl ServerCommandHandler for GetTopics {
         shard
             .permissioner
             .borrow()
-            .get_topics(session.get_user_id(), numeric_stream_id)?;
+            .get_topics(auth.user_id(), numeric_stream_id)?;
 
         let response = shard.streams.with_topics(&self.stream_id, |topics| {
             topics.with_components(|topics| {
diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
index 6e86293ae..950c8d421 100644
--- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
@@ -17,30 +17,31 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::topics::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::purge_topic::PurgeTopic;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for PurgeTopic {
+impl AuthenticatedHandler for PurgeTopic {
     fn code(&self) -> u32 {
         iggy_common::PURGE_TOPIC_CODE
     }
 
-    #[instrument(skip_all, name = "trace_purge_topic", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
+    #[instrument(skip_all, name = "trace_purge_topic", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -66,7 +67,7 @@ impl ServerCommandHandler for PurgeTopic {
 
         shard
             .state
-            .apply(session.get_user_id(), &EntryCommand::PurgeTopic(self))
+            .apply(auth.user_id(), &EntryCommand::PurgeTopic(self))
             .await
             .error(|e: &IggyError| {
                 format!(
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index 90f5e11ec..33f31398d 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -17,11 +17,10 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::topics::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard::transmission::frame::ShardResponse;
@@ -29,25 +28,26 @@ use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
 };
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use crate::streaming::{streams, topics};
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::update_topic::UpdateTopic;
 use iggy_common::{Identifier, IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for UpdateTopic {
+impl AuthenticatedHandler for UpdateTopic {
     fn code(&self) -> u32 {
         iggy_common::UPDATE_TOPIC_CODE
     }
 
-    #[instrument(skip_all, name = "trace_update_topic", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
+    #[instrument(skip_all, name = "trace_update_topic", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))]
     async fn handle(
         mut self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -58,7 +58,7 @@ impl ServerCommandHandler for UpdateTopic {
             topic_id: Identifier::default(),
             partition_id: 0,
             payload: ShardRequestPayload::UpdateTopic {
-                user_id: session.get_user_id(),
+                user_id: auth.user_id(),
                 stream_id: self.stream_id.clone(),
                 topic_id: self.topic_id.clone(),
                 name: self.name.clone(),
@@ -130,7 +130,7 @@ impl ServerCommandHandler for UpdateTopic {
 
                     shard
                         .state
-                        .apply(session.get_user_id(), &EntryCommand::UpdateTopic(self))
+                        .apply(auth.user_id(), &EntryCommand::UpdateTopic(self))
                         .await
                         .error(|e: &IggyError| format!(
                             "{COMPONENT} (error: {e}) - failed to apply update topic with id: {topic_id} in stream with ID: {stream_id_num}, session: {session}"
@@ -150,7 +150,7 @@ impl ServerCommandHandler for UpdateTopic {
 
                     shard
                         .state
-                        .apply(session.get_user_id(), &EntryCommand::UpdateTopic(self))
+                        .apply(auth.user_id(), &EntryCommand::UpdateTopic(self))
                         .await
                         .error(|e: &IggyError| format!(
                             "{COMPONENT} (error: {e}) - failed to apply update topic in stream with ID: {stream_id}, session: {session}"
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs b/core/server/src/binary/handlers/users/change_password_handler.rs
index 0dadd98bb..98c7298dc 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -17,17 +17,16 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use crate::streaming::utils::crypto;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::change_password::ChangePassword;
 use iggy_common::{IggyError, SenderKind};
@@ -35,16 +34,17 @@ use std::rc::Rc;
 use tracing::info;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for ChangePassword {
+impl AuthenticatedHandler for ChangePassword {
     fn code(&self) -> u32 {
         iggy_common::CHANGE_PASSWORD_CODE
     }
 
-    #[instrument(skip_all, name = "trace_change_password", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))]
+    #[instrument(skip_all, name = "trace_change_password", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -78,7 +78,7 @@ impl ServerCommandHandler for ChangePassword {
         shard
             .state
             .apply(
-                session.get_user_id(),
+                auth.user_id(),
                 &EntryCommand::ChangePassword(ChangePassword {
                     user_id: self.user_id.to_owned(),
                     current_password: "".into(),
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs
index 3cace71b3..9ee8819b6 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
@@ -30,9 +30,9 @@ use crate::shard::transmission::message::{
 };
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateUserWithId;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use crate::streaming::utils::crypto;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::create_user::CreateUser;
 use iggy_common::{Identifier, IggyError, SenderKind};
@@ -40,16 +40,17 @@ use std::rc::Rc;
 use tracing::debug;
 use tracing::instrument;
 
-impl ServerCommandHandler for CreateUser {
+impl AuthenticatedHandler for CreateUser {
     fn code(&self) -> u32 {
         iggy_common::CREATE_USER_CODE
     }
 
-    #[instrument(skip_all, name = "trace_create_user", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))]
+    #[instrument(skip_all, name = "trace_create_user", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -60,7 +61,7 @@ impl ServerCommandHandler for CreateUser {
             topic_id: Identifier::default(),
             partition_id: 0,
             payload: ShardRequestPayload::CreateUser {
-                user_id: session.get_user_id(),
+                user_id: auth.user_id(),
                 username: self.username.clone(),
                 password: self.password.clone(),
                 status: self.status,
@@ -106,7 +107,7 @@ impl ServerCommandHandler for CreateUser {
                     shard
                         .state
                         .apply(
-                            session.get_user_id(),
+                            auth.user_id(),
                             &EntryCommand::CreateUser(CreateUserWithId {
                                 user_id,
                                 command: CreateUser {
@@ -140,7 +141,7 @@ impl ServerCommandHandler for CreateUser {
                     shard
                         .state
                         .apply(
-                            session.get_user_id(),
+                            auth.user_id(),
                             &EntryCommand::CreateUser(CreateUserWithId {
                                 user_id,
                                 command: CreateUser {
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs b/core/server/src/binary/handlers/users/delete_user_handler.rs
index a7fa3ec1b..16f5ccdb6 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -19,11 +19,10 @@
 use std::rc::Rc;
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard::transmission::frame::ShardResponse;
@@ -31,24 +30,25 @@ use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
 };
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::delete_user::DeleteUser;
 use iggy_common::{Identifier, IggyError, SenderKind};
 use tracing::info;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for DeleteUser {
+impl AuthenticatedHandler for DeleteUser {
     fn code(&self) -> u32 {
         iggy_common::DELETE_USER_CODE
     }
 
-    #[instrument(skip_all, name = "trace_delete_user", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))]
+    #[instrument(skip_all, name = "trace_delete_user", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -59,7 +59,7 @@ impl ServerCommandHandler for DeleteUser {
             topic_id: Identifier::default(),
             partition_id: 0,
             payload: ShardRequestPayload::DeleteUser {
-                session_user_id: session.get_user_id(),
+                session_user_id: auth.user_id(),
                 user_id: self.user_id,
             },
         };
diff --git a/core/server/src/binary/handlers/users/get_user_handler.rs b/core/server/src/binary/handlers/users/get_user_handler.rs
index 97322a340..d34792b4e 100644
--- a/core/server/src/binary/handlers/users/get_user_handler.rs
+++ b/core/server/src/binary/handlers/users/get_user_handler.rs
@@ -19,18 +19,19 @@
 use std::rc::Rc;
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use iggy_common::IggyError;
 use iggy_common::SenderKind;
 use iggy_common::get_user::GetUser;
 use tracing::debug;
 
-impl ServerCommandHandler for GetUser {
+impl AuthenticatedHandler for GetUser {
     fn code(&self) -> u32 {
         iggy_common::GET_USER_CODE
     }
@@ -39,6 +40,7 @@ impl ServerCommandHandler for GetUser {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        _auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/users/get_users_handler.rs b/core/server/src/binary/handlers/users/get_users_handler.rs
index bca683a0a..350d9de6d 100644
--- a/core/server/src/binary/handlers/users/get_users_handler.rs
+++ b/core/server/src/binary/handlers/users/get_users_handler.rs
@@ -19,12 +19,13 @@
 use std::rc::Rc;
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
 use err_trail::ErrContext;
 use iggy_common::IggyError;
@@ -32,7 +33,7 @@ use iggy_common::SenderKind;
 use iggy_common::get_users::GetUsers;
 use tracing::debug;
 
-impl ServerCommandHandler for GetUsers {
+impl AuthenticatedHandler for GetUsers {
     fn code(&self) -> u32 {
         iggy_common::GET_USERS_CODE
     }
@@ -41,6 +42,7 @@ impl ServerCommandHandler for GetUsers {
         self,
         sender: &mut SenderKind,
         _length: u32,
+        _auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs b/core/server/src/binary/handlers/users/login_user_handler.rs
index 550be02b7..ed7fa3252 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -17,21 +17,20 @@
  */
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    BinaryServerCommand, HandlerResult, ServerCommand, UnauthenticatedHandler,
 };
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::login_user::LoginUser;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, info, instrument, warn};
 
-impl ServerCommandHandler for LoginUser {
+impl UnauthenticatedHandler for LoginUser {
     fn code(&self) -> u32 {
         iggy_common::LOGIN_USER_CODE
     }
diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs b/core/server/src/binary/handlers/users/logout_user_handler.rs
index 714815173..995b55422 100644
--- a/core/server/src/binary/handlers/users/logout_user_handler.rs
+++ b/core/server/src/binary/handlers/users/logout_user_handler.rs
@@ -19,39 +19,39 @@
 use std::rc::Rc;
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::logout_user::LogoutUser;
 use iggy_common::{IggyError, SenderKind};
 use tracing::info;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for LogoutUser {
+impl AuthenticatedHandler for LogoutUser {
     fn code(&self) -> u32 {
         iggy_common::LOGOUT_USER_CODE
     }
 
-    #[instrument(skip_all, name = "trace_logout_user", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))]
+    #[instrument(skip_all, name = "trace_logout_user", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
-        info!("Logging out user with ID: {}...", session.get_user_id());
+        info!("Logging out user with ID: {}...", auth.user_id());
         shard.logout_user(session).error(|e: &IggyError| {
             format!("{COMPONENT} (error: {e}) - failed to logout user, session: {session}")
         })?;
-        info!("Logged out user with ID: {}.", session.get_user_id());
+        info!("Logged out user with ID: {}.", auth.user_id());
         session.clear_user_id();
         sender.send_empty_ok_response().await?;
         Ok(HandlerResult::Finished)
diff --git a/core/server/src/binary/handlers/users/update_permissions_handler.rs b/core/server/src/binary/handlers/users/update_permissions_handler.rs
index 30829a840..ecc45192d 100644
--- a/core/server/src/binary/handlers/users/update_permissions_handler.rs
+++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs
@@ -19,32 +19,32 @@
 use std::rc::Rc;
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::update_permissions::UpdatePermissions;
 use iggy_common::{IggyError, SenderKind};
 use tracing::info;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for UpdatePermissions {
+impl AuthenticatedHandler for UpdatePermissions {
     fn code(&self) -> u32 {
         iggy_common::UPDATE_PERMISSIONS_CODE
     }
 
-    #[instrument(skip_all, name = "trace_update_permissions", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))]
+    #[instrument(skip_all, name = "trace_update_permissions", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -64,10 +64,7 @@ impl ServerCommandHandler for UpdatePermissions {
 
         shard
             .state
-            .apply(
-                session.get_user_id(),
-                &EntryCommand::UpdatePermissions(self),
-            )
+            .apply(auth.user_id(), &EntryCommand::UpdatePermissions(self))
             .await?;
         sender.send_empty_ok_response().await?;
         Ok(HandlerResult::Finished)
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs b/core/server/src/binary/handlers/users/update_user_handler.rs
index bdb929136..73dc72e22 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -19,32 +19,32 @@
 use std::rc::Rc;
 
 use crate::binary::command::{
-    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+    AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand,
 };
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
-
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
+use crate::streaming::auth::Auth;
 use crate::streaming::session::Session;
-use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::update_user::UpdateUser;
 use iggy_common::{IggyError, SenderKind};
 use tracing::info;
 use tracing::{debug, instrument};
 
-impl ServerCommandHandler for UpdateUser {
+impl AuthenticatedHandler for UpdateUser {
     fn code(&self) -> u32 {
         iggy_common::UPDATE_USER_CODE
     }
 
-    #[instrument(skip_all, name = "trace_update_user", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))]
+    #[instrument(skip_all, name = "trace_update_user", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))]
     async fn handle(
         self,
         sender: &mut SenderKind,
         _length: u32,
+        auth: Auth,
         session: &Session,
         shard: &Rc<IggyShard>,
     ) -> Result<HandlerResult, IggyError> {
@@ -76,7 +76,7 @@ impl ServerCommandHandler for UpdateUser {
         let user_id = self.user_id.clone();
         shard
             .state
-            .apply(session.get_user_id(), &EntryCommand::UpdateUser(self))
+            .apply(auth.user_id(), &EntryCommand::UpdateUser(self))
             .await
             .error(|e: &IggyError| {
                 format!(
diff --git a/core/server/src/binary/macros.rs b/core/server/src/binary/macros.rs
index a56c0bee0..5d80b01ca 100644
--- a/core/server/src/binary/macros.rs
+++ b/core/server/src/binary/macros.rs
@@ -16,37 +16,54 @@
  * under the License.
  */
 
-/// This macro does 4 expansions in one go:
-///
-/// 1) The `#[enum_dispatch(ServerCommandHandler)] pub enum ServerCommand`
-///    with all variants.
-/// 2) The `from_code_and_payload(code, payload)` function that matches each code.
-/// 3) The `to_bytes()` function that matches each variant.
-/// 4) The `validate()` function that matches each variant.
+/// Generates `ServerCommand` enum with `@unauth`/`@auth` split and `dispatch()` method.
+/// For `@auth` commands, `dispatch()` calls `shard.auth(session)?` before handler.
 #[macro_export]
 macro_rules! define_server_command_enum {
     (
-                $(
-            // Macro pattern:
-            // variant name     inner type    numeric code   display string   show_payload?
-            $variant:ident ( $ty:ty ), $code:ident, $display_str:expr, $show_payload:expr
-        );* $(;)?
+        @unauth {
+            $(
+                $unauth_variant:ident ( $unauth_ty:ty ), $unauth_code:ident, $unauth_display:expr, $unauth_show_payload:expr
+            );* $(;)?
+        }
+        @auth {
+            $(
+                $auth_variant:ident ( $auth_ty:ty ), $auth_code:ident, $auth_display:expr, $auth_show_payload:expr
+            );* $(;)?
+        }
     ) => {
-        #[enum_dispatch(ServerCommandHandler)]
         #[derive(Debug, PartialEq, EnumString)]
         pub enum ServerCommand {
             $(
-                $variant($ty),
+                $unauth_variant($unauth_ty),
+            )*
+            $(
+                $auth_variant($auth_ty),
             )*
         }
 
         impl ServerCommand {
-            /// Constructs a `ServerCommand` from its numeric code and payload.
+            pub fn code(&self) -> u32 {
+                match self {
+                    $(
+                        ServerCommand::$unauth_variant(_) => $unauth_code,
+                    )*
+                    $(
+                        ServerCommand::$auth_variant(_) => $auth_code,
+                    )*
+                }
+            }
+
             pub fn from_code_and_payload(code: u32, payload: Bytes) -> Result<Self, IggyError> {
                 match code {
                     $(
-                        $code => Ok(ServerCommand::$variant(
-                            <$ty>::from_bytes(payload)?
+                        $unauth_code => Ok(ServerCommand::$unauth_variant(
+                            <$unauth_ty>::from_bytes(payload)?
+                        )),
+                    )*
+                    $(
+                        $auth_code => Ok(ServerCommand::$auth_variant(
+                            <$auth_ty>::from_bytes(payload)?
                         )),
                     )*
                     _ => {
@@ -56,7 +73,6 @@ macro_rules! define_server_command_enum {
                 }
             }
 
-            /// Constructs a ServerCommand from its numeric code by reading from the provided async reader.
             pub async fn from_code_and_reader(
                 code: u32,
                 sender: &mut SenderKind,
@@ -64,28 +80,72 @@ macro_rules! define_server_command_enum {
             ) -> Result<Self, IggyError> {
                 match code {
                     $(
-                        $code => Ok(ServerCommand::$variant(
-                            <$ty as BinaryServerCommand>::from_sender(sender, code, length).await?
+                        $unauth_code => Ok(ServerCommand::$unauth_variant(
+                            <$unauth_ty as BinaryServerCommand>::from_sender(sender, code, length).await?
+                        )),
+                    )*
+                    $(
+                        $auth_code => Ok(ServerCommand::$auth_variant(
+                            <$auth_ty as BinaryServerCommand>::from_sender(sender, code, length).await?
                         )),
                     )*
                     _ => Err(IggyError::InvalidCommand),
                 }
             }
 
-            /// Converts the command into raw bytes.
             pub fn to_bytes(&self) -> Bytes {
                 match self {
                     $(
-                        ServerCommand::$variant(payload) => as_bytes(payload),
+                        ServerCommand::$unauth_variant(payload) => as_bytes(payload),
+                    )*
+                    $(
+                        ServerCommand::$auth_variant(payload) => as_bytes(payload),
                     )*
                 }
             }
 
-            /// Validate the command by delegating to the inner command’s implementation.
             pub fn validate(&self) -> Result<(), IggyError> {
                 match self {
                     $(
-                        ServerCommand::$variant(cmd) => <$ty as iggy_common::Validatable<iggy_common::IggyError>>::validate(cmd),
+                        ServerCommand::$unauth_variant(cmd) => <$unauth_ty as iggy_common::Validatable<iggy_common::IggyError>>::validate(cmd),
+                    )*
+                    $(
+                        ServerCommand::$auth_variant(cmd) => <$auth_ty as iggy_common::Validatable<iggy_common::IggyError>>::validate(cmd),
+                    )*
+                }
+            }
+
+            pub async fn dispatch(
+                self,
+                sender: &mut SenderKind,
+                length: u32,
+                session: &Session,
+                shard: &Rc<IggyShard>,
+            ) -> Result<HandlerResult, IggyError> {
+                match self {
+                    $(
+                        ServerCommand::$unauth_variant(cmd) => {
+                            <$unauth_ty as UnauthenticatedHandler>::handle(
+                                cmd,
+                                sender,
+                                length,
+                                session,
+                                shard,
+                            ).await
+                        }
+                    )*
+                    $(
+                        ServerCommand::$auth_variant(cmd) => {
+                            let auth = shard.auth(session)?;
+                            <$auth_ty as AuthenticatedHandler>::handle(
+                                cmd,
+                                sender,
+                                length,
+                                auth,
+                                session,
+                                shard,
+                            ).await
+                        }
                     )*
                 }
             }
@@ -96,13 +156,20 @@ macro_rules! define_server_command_enum {
             fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                 match self {
                     $(
-                        // If $show_payload is true, we display the command name + payload;
-                        // otherwise, we just display the command name.
-                        ServerCommand::$variant(payload) => {
-                            if $show_payload {
-                                write!(formatter, "{}|{payload:?}", $display_str)
+                        ServerCommand::$unauth_variant(payload) => {
+                            if $unauth_show_payload {
+                                write!(formatter, "{}|{payload:?}", $unauth_display)
+                            } else {
+                                write!(formatter, "{}", $unauth_display)
+                            }
+                        },
+                    )*
+                    $(
+                        ServerCommand::$auth_variant(payload) => {
+                            if $auth_show_payload {
+                                write!(formatter, "{}|{payload:?}", $auth_display)
                             } else {
-                                write!(formatter, "{}", $display_str)
+                                write!(formatter, "{}", $auth_display)
                             }
                         },
                     )*
@@ -110,5 +177,4 @@ macro_rules! define_server_command_enum {
             }
         }
     };
-
 }
diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs
index 67a2d4937..78a5331de 100644
--- a/core/server/src/quic/listener.rs
+++ b/core/server/src/quic/listener.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::binary::command::{ServerCommand, ServerCommandHandler};
+use crate::binary::command::ServerCommand;
 use crate::server_error::ConnectionError;
 use crate::shard::IggyShard;
 use crate::shard::task_registry::ShutdownToken;
@@ -187,7 +187,7 @@ async fn handle_stream(
 
     trace!("Received a QUIC command: {command}, payload size: {length}");
 
-    match command.handle(&mut sender, length, session, &shard).await {
+    match command.dispatch(&mut sender, length, session, &shard).await {
         Ok(_) => {
             trace!(
                 "Command was handled successfully, session: {:?}. QUIC response was sent.",
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 8939e87b0..a1d86b2ec 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -295,4 +295,18 @@ impl IggyShard {
             Err(IggyError::Unauthenticated)
         }
     }
+
+    pub fn auth(&self, session: &Session) -> Result<crate::streaming::auth::Auth, IggyError> {
+        if !session.is_active() {
+            error!("{COMPONENT} - session is inactive, session: {session}");
+            return Err(IggyError::StaleClient);
+        }
+
+        if !session.is_authenticated() {
+            error!("{COMPONENT} - unauthenticated access attempt, session: {session}");
+            return Err(IggyError::Unauthenticated);
+        }
+
+        Ok(crate::streaming::auth::Auth::new(session.get_user_id()))
+    }
 }
diff --git a/core/server/src/streaming/auth.rs b/core/server/src/streaming/auth.rs
new file mode 100644
index 000000000..30e360543
--- /dev/null
+++ b/core/server/src/streaming/auth.rs
@@ -0,0 +1,80 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Type-safe authentication proof system using proof-carrying code pattern.
+//!
+//! The [`Auth`] type can only be constructed via [`IggyShard::auth()`],
+//! ensuring any code holding an `Auth` value has passed authentication.
+
+use iggy_common::UserId;
+
+/// Proof of successful authentication.
+///
+/// # Invariants
+/// - `user_id` is always valid (never `u32::MAX`)
+/// - User was authenticated at construction time
+#[derive(Clone, Copy, Debug)]
+pub struct Auth {
+    user_id: UserId,
+    // Private field prevents external construction - only `Auth::new()` can create this.
+    _sealed: (),
+}
+
+impl Auth {
+    /// Only call after verifying user is authenticated.
+    #[inline]
+    pub(crate) fn new(user_id: UserId) -> Self {
+        debug_assert!(user_id != u32::MAX, "Auth created with invalid user_id");
+        Self {
+            user_id,
+            _sealed: (),
+        }
+    }
+
+    #[inline]
+    pub fn user_id(&self) -> UserId {
+        self.user_id
+    }
+}
+
+/// Sealed marker trait for authentication requirements.
+pub trait AuthRequirement: private::Sealed {
+    const NAME: &'static str;
+}
+
+/// Command requires authentication - handler receives [`Auth`] proof token.
+pub struct Authenticated;
+
+impl AuthRequirement for Authenticated {
+    const NAME: &'static str = "Authenticated";
+}
+
+impl private::Sealed for Authenticated {}
+
+/// Command does NOT require authentication (Ping, LoginUser, LoginWithPersonalAccessToken).
+pub struct Unauthenticated;
+
+impl AuthRequirement for Unauthenticated {
+    const NAME: &'static str = "Unauthenticated";
+}
+
+impl private::Sealed for Unauthenticated {}
+
+mod private {
+    pub trait Sealed {}
+}
diff --git a/core/server/src/streaming/mod.rs b/core/server/src/streaming/mod.rs
index a6141bd8b..d38d6d199 100644
--- a/core/server/src/streaming/mod.rs
+++ b/core/server/src/streaming/mod.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+pub mod auth;
 pub mod clients;
 pub mod deduplication;
 pub mod diagnostics;
diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs
index d3a58db57..38819d08c 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -17,7 +17,6 @@
  */
 
 use crate::binary::command;
-use crate::binary::command::ServerCommandHandler;
 use crate::server_error::ConnectionError;
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
@@ -91,7 +90,7 @@ pub(crate) async fn handle_connection(
         let command = ServerCommand::from_code_and_reader(code, sender, length - 4).await?;
         debug!("Received a TCP command: {command}, payload size: {length}");
         let cmd_code = command.code();
-        match command.handle(sender, length, session, shard).await {
+        match command.dispatch(sender, length, session, shard).await {
             Ok(handler_result) => match handler_result {
                 command::HandlerResult::Finished => {
                     debug!(
diff --git a/core/server/src/websocket/connection_handler.rs b/core/server/src/websocket/connection_handler.rs
index 2ee97196f..f23e0911a 100644
--- a/core/server/src/websocket/connection_handler.rs
+++ b/core/server/src/websocket/connection_handler.rs
@@ -17,7 +17,6 @@
  */
 
 use crate::binary::command;
-use crate::binary::command::ServerCommandHandler;
 use crate::server_error::ConnectionError;
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
@@ -81,7 +80,7 @@ pub(crate) async fn handle_connection(
         let command = ServerCommand::from_code_and_reader(code, sender, length - 4).await?;
         debug!("Received a WebSocket command: {command}, payload size: {length}");
 
-        match command.handle(sender, length, session, shard).await {
+        match command.dispatch(sender, length, session, shard).await {
             Ok(_) => {
                 debug!(
                     "Command was handled successfully, session: {session}. WebSocket response was sent."

Reply via email to