This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch auth-refactor in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 059e784655631ceb4ea27196c4eff21ed0c82d65 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Jan 9 12:48:42 2026 +0100 refactor(server): add type-safe auth with proof-carrying code pattern Introduce Auth proof type that can only be created via IggyShard::auth(), providing compile-time guarantees for handler authentication. --- core/server/src/binary/command.rs | 126 +++++++++++--------- .../cluster/get_cluster_metadata_handler.rs | 9 +- .../create_consumer_group_handler.rs | 11 +- .../delete_consumer_group_handler.rs | 12 +- .../consumer_groups/get_consumer_group_handler.rs | 10 +- .../consumer_groups/get_consumer_groups_handler.rs | 10 +- .../consumer_groups/join_consumer_group_handler.rs | 10 +- .../leave_consumer_group_handler.rs | 12 +- .../delete_consumer_offset_handler.rs | 7 +- .../get_consumer_offset_handler.rs | 7 +- .../store_consumer_offset_handler.rs | 7 +- .../messages/flush_unsaved_buffer_handler.rs | 11 +- .../handlers/messages/poll_messages_handler.rs | 9 +- .../handlers/messages/send_messages_handler.rs | 11 +- .../partitions/create_partitions_handler.rs | 12 +- .../partitions/delete_partitions_handler.rs | 12 +- .../create_personal_access_token_handler.rs | 12 +- .../delete_personal_access_token_handler.rs | 13 ++- .../get_personal_access_tokens_handler.rs | 6 +- .../login_with_personal_access_token_handler.rs | 5 +- .../handlers/segments/delete_segments_handler.rs | 13 +-- .../handlers/streams/create_stream_handler.rs | 16 +-- .../handlers/streams/delete_stream_handler.rs | 15 +-- .../binary/handlers/streams/get_stream_handler.rs | 12 +- .../binary/handlers/streams/get_streams_handler.rs | 12 +- .../handlers/streams/purge_stream_handler.rs | 12 +- .../handlers/streams/update_stream_handler.rs | 12 +- .../binary/handlers/system/get_client_handler.rs | 6 +- .../binary/handlers/system/get_clients_handler.rs | 6 +- .../src/binary/handlers/system/get_me_handler.rs | 6 +- .../src/binary/handlers/system/get_snapshot.rs | 6 +- .../binary/handlers/system/get_stats_handler.rs | 8 +- .../src/binary/handlers/system/ping_handler.rs | 5 +- .../binary/handlers/topics/create_topic_handler.rs | 16 +-- .../binary/handlers/topics/delete_topic_handler.rs | 16 +-- .../binary/handlers/topics/get_topic_handler.rs | 10 +- .../binary/handlers/topics/get_topics_handler.rs | 10 +- .../binary/handlers/topics/purge_topic_handler.rs | 11 +- .../binary/handlers/topics/update_topic_handler.rs | 16 +-- .../handlers/users/change_password_handler.rs | 12 +- .../binary/handlers/users/create_user_handler.rs | 15 +-- .../binary/handlers/users/delete_user_handler.rs | 12 +- .../src/binary/handlers/users/get_user_handler.rs | 6 +- .../src/binary/handlers/users/get_users_handler.rs | 6 +- .../binary/handlers/users/login_user_handler.rs | 5 +- .../binary/handlers/users/logout_user_handler.rs | 14 +-- .../handlers/users/update_permissions_handler.rs | 15 +-- .../binary/handlers/users/update_user_handler.rs | 12 +- core/server/src/binary/macros.rs | 128 ++++++++++++++++----- core/server/src/quic/listener.rs | 4 +- core/server/src/shard/mod.rs | 14 +++ core/server/src/streaming/auth.rs | 80 +++++++++++++ core/server/src/streaming/mod.rs | 1 + core/server/src/tcp/connection_handler.rs | 3 +- core/server/src/websocket/connection_handler.rs | 3 +- 55 files changed, 521 insertions(+), 329 deletions(-) diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs index e10fe6ab8..4d4c2b9dd 100644 --- a/core/server/src/binary/command.rs +++ b/core/server/src/binary/command.rs @@ -18,9 +18,9 @@ use crate::define_server_command_enum; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use bytes::{BufMut, Bytes, BytesMut}; -use enum_dispatch::enum_dispatch; use iggy_common::SenderKind; use iggy_common::change_password::ChangePassword; use iggy_common::create_consumer_group::CreateConsumerGroup; @@ -72,70 +72,83 @@ use strum::EnumString; use tracing::error; define_server_command_enum! { - Ping(Ping), PING_CODE, PING, false; - GetStats(GetStats), GET_STATS_CODE, GET_STATS, false; - GetMe(GetMe), GET_ME_CODE, GET_ME, false; - GetClient(GetClient), GET_CLIENT_CODE, GET_CLIENT, true; - GetClients(GetClients), GET_CLIENTS_CODE, GET_CLIENTS, false; - GetSnapshot(GetSnapshot), GET_SNAPSHOT_FILE_CODE, GET_SNAPSHOT_FILE, false; - GetClusterMetadata(GetClusterMetadata), GET_CLUSTER_METADATA_CODE, GET_CLUSTER_METADATA, false; - PollMessages(PollMessages), POLL_MESSAGES_CODE, POLL_MESSAGES, true; - FlushUnsavedBuffer(FlushUnsavedBuffer), FLUSH_UNSAVED_BUFFER_CODE, FLUSH_UNSAVED_BUFFER, true; - GetUser(GetUser), GET_USER_CODE, GET_USER, true; - GetUsers(GetUsers), GET_USERS_CODE, GET_USERS, false; - CreateUser(CreateUser), CREATE_USER_CODE, CREATE_USER, true; - DeleteUser(DeleteUser), DELETE_USER_CODE, DELETE_USER, true; - UpdateUser(UpdateUser), UPDATE_USER_CODE, UPDATE_USER, true; - UpdatePermissions(UpdatePermissions), UPDATE_PERMISSIONS_CODE, UPDATE_PERMISSIONS, true; - ChangePassword(ChangePassword), CHANGE_PASSWORD_CODE, CHANGE_PASSWORD, true; - LoginUser(LoginUser), LOGIN_USER_CODE, LOGIN_USER, true; - LogoutUser(LogoutUser), LOGOUT_USER_CODE, LOGOUT_USER, false; - GetPersonalAccessTokens(GetPersonalAccessTokens), GET_PERSONAL_ACCESS_TOKENS_CODE, GET_PERSONAL_ACCESS_TOKENS, false; - CreatePersonalAccessToken(CreatePersonalAccessToken), CREATE_PERSONAL_ACCESS_TOKEN_CODE, CREATE_PERSONAL_ACCESS_TOKEN, true; - DeletePersonalAccessToken(DeletePersonalAccessToken), DELETE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN, false; - LoginWithPersonalAccessToken(LoginWithPersonalAccessToken), LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, LOGIN_WITH_PERSONAL_ACCESS_TOKEN, true; - SendMessages(SendMessages), SEND_MESSAGES_CODE, SEND_MESSAGES, false; - GetConsumerOffset(GetConsumerOffset), GET_CONSUMER_OFFSET_CODE, GET_CONSUMER_OFFSET, true; - StoreConsumerOffset(StoreConsumerOffset), STORE_CONSUMER_OFFSET_CODE, STORE_CONSUMER_OFFSET, true; - DeleteConsumerOffset(DeleteConsumerOffset), DELETE_CONSUMER_OFFSET_CODE, DELETE_CONSUMER_OFFSET, true; - GetStream(GetStream), GET_STREAM_CODE, GET_STREAM, true; - GetStreams(GetStreams), GET_STREAMS_CODE, GET_STREAMS, false; - CreateStream(CreateStream), CREATE_STREAM_CODE, CREATE_STREAM, true; - DeleteStream(DeleteStream), DELETE_STREAM_CODE, DELETE_STREAM, true; - UpdateStream(UpdateStream), UPDATE_STREAM_CODE, UPDATE_STREAM, true; - PurgeStream(PurgeStream), PURGE_STREAM_CODE, PURGE_STREAM, true; - GetTopic(GetTopic), GET_TOPIC_CODE, GET_TOPIC, true; - GetTopics(GetTopics), GET_TOPICS_CODE, GET_TOPICS, false; - CreateTopic(CreateTopic), CREATE_TOPIC_CODE, CREATE_TOPIC, true; - DeleteTopic(DeleteTopic), DELETE_TOPIC_CODE, DELETE_TOPIC, true; - UpdateTopic(UpdateTopic), UPDATE_TOPIC_CODE, UPDATE_TOPIC, true; - PurgeTopic(PurgeTopic), PURGE_TOPIC_CODE, PURGE_TOPIC, true; - CreatePartitions(CreatePartitions), CREATE_PARTITIONS_CODE, CREATE_PARTITIONS, true; - DeletePartitions(DeletePartitions), DELETE_PARTITIONS_CODE, DELETE_PARTITIONS, true; - DeleteSegments(DeleteSegments), DELETE_SEGMENTS_CODE, DELETE_SEGMENTS, true; - GetConsumerGroup(GetConsumerGroup), GET_CONSUMER_GROUP_CODE, GET_CONSUMER_GROUP, true; - GetConsumerGroups(GetConsumerGroups), GET_CONSUMER_GROUPS_CODE, GET_CONSUMER_GROUPS, false; - CreateConsumerGroup(CreateConsumerGroup), CREATE_CONSUMER_GROUP_CODE, CREATE_CONSUMER_GROUP, true; - DeleteConsumerGroup(DeleteConsumerGroup), DELETE_CONSUMER_GROUP_CODE, DELETE_CONSUMER_GROUP, true; - JoinConsumerGroup(JoinConsumerGroup), JOIN_CONSUMER_GROUP_CODE, JOIN_CONSUMER_GROUP, true; - LeaveConsumerGroup(LeaveConsumerGroup), LEAVE_CONSUMER_GROUP_CODE, LEAVE_CONSUMER_GROUP, true; + @unauth { + Ping(Ping), PING_CODE, PING, false; + LoginUser(LoginUser), LOGIN_USER_CODE, LOGIN_USER, true; + LoginWithPersonalAccessToken(LoginWithPersonalAccessToken), LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, LOGIN_WITH_PERSONAL_ACCESS_TOKEN, true; + } + @auth { + GetStats(GetStats), GET_STATS_CODE, GET_STATS, false; + GetMe(GetMe), GET_ME_CODE, GET_ME, false; + GetClient(GetClient), GET_CLIENT_CODE, GET_CLIENT, true; + GetClients(GetClients), GET_CLIENTS_CODE, GET_CLIENTS, false; + GetSnapshot(GetSnapshot), GET_SNAPSHOT_FILE_CODE, GET_SNAPSHOT_FILE, false; + GetClusterMetadata(GetClusterMetadata), GET_CLUSTER_METADATA_CODE, GET_CLUSTER_METADATA, false; + PollMessages(PollMessages), POLL_MESSAGES_CODE, POLL_MESSAGES, true; + FlushUnsavedBuffer(FlushUnsavedBuffer), FLUSH_UNSAVED_BUFFER_CODE, FLUSH_UNSAVED_BUFFER, true; + GetUser(GetUser), GET_USER_CODE, GET_USER, true; + GetUsers(GetUsers), GET_USERS_CODE, GET_USERS, false; + CreateUser(CreateUser), CREATE_USER_CODE, CREATE_USER, true; + DeleteUser(DeleteUser), DELETE_USER_CODE, DELETE_USER, true; + UpdateUser(UpdateUser), UPDATE_USER_CODE, UPDATE_USER, true; + UpdatePermissions(UpdatePermissions), UPDATE_PERMISSIONS_CODE, UPDATE_PERMISSIONS, true; + ChangePassword(ChangePassword), CHANGE_PASSWORD_CODE, CHANGE_PASSWORD, true; + LogoutUser(LogoutUser), LOGOUT_USER_CODE, LOGOUT_USER, false; + GetPersonalAccessTokens(GetPersonalAccessTokens), GET_PERSONAL_ACCESS_TOKENS_CODE, GET_PERSONAL_ACCESS_TOKENS, false; + CreatePersonalAccessToken(CreatePersonalAccessToken), CREATE_PERSONAL_ACCESS_TOKEN_CODE, CREATE_PERSONAL_ACCESS_TOKEN, true; + DeletePersonalAccessToken(DeletePersonalAccessToken), DELETE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN, false; + SendMessages(SendMessages), SEND_MESSAGES_CODE, SEND_MESSAGES, false; + GetConsumerOffset(GetConsumerOffset), GET_CONSUMER_OFFSET_CODE, GET_CONSUMER_OFFSET, true; + StoreConsumerOffset(StoreConsumerOffset), STORE_CONSUMER_OFFSET_CODE, STORE_CONSUMER_OFFSET, true; + DeleteConsumerOffset(DeleteConsumerOffset), DELETE_CONSUMER_OFFSET_CODE, DELETE_CONSUMER_OFFSET, true; + GetStream(GetStream), GET_STREAM_CODE, GET_STREAM, true; + GetStreams(GetStreams), GET_STREAMS_CODE, GET_STREAMS, false; + CreateStream(CreateStream), CREATE_STREAM_CODE, CREATE_STREAM, true; + DeleteStream(DeleteStream), DELETE_STREAM_CODE, DELETE_STREAM, true; + UpdateStream(UpdateStream), UPDATE_STREAM_CODE, UPDATE_STREAM, true; + PurgeStream(PurgeStream), PURGE_STREAM_CODE, PURGE_STREAM, true; + GetTopic(GetTopic), GET_TOPIC_CODE, GET_TOPIC, true; + GetTopics(GetTopics), GET_TOPICS_CODE, GET_TOPICS, false; + CreateTopic(CreateTopic), CREATE_TOPIC_CODE, CREATE_TOPIC, true; + DeleteTopic(DeleteTopic), DELETE_TOPIC_CODE, DELETE_TOPIC, true; + UpdateTopic(UpdateTopic), UPDATE_TOPIC_CODE, UPDATE_TOPIC, true; + PurgeTopic(PurgeTopic), PURGE_TOPIC_CODE, PURGE_TOPIC, true; + CreatePartitions(CreatePartitions), CREATE_PARTITIONS_CODE, CREATE_PARTITIONS, true; + DeletePartitions(DeletePartitions), DELETE_PARTITIONS_CODE, DELETE_PARTITIONS, true; + DeleteSegments(DeleteSegments), DELETE_SEGMENTS_CODE, DELETE_SEGMENTS, true; + GetConsumerGroup(GetConsumerGroup), GET_CONSUMER_GROUP_CODE, GET_CONSUMER_GROUP, true; + GetConsumerGroups(GetConsumerGroups), GET_CONSUMER_GROUPS_CODE, GET_CONSUMER_GROUPS, false; + CreateConsumerGroup(CreateConsumerGroup), CREATE_CONSUMER_GROUP_CODE, CREATE_CONSUMER_GROUP, true; + DeleteConsumerGroup(DeleteConsumerGroup), DELETE_CONSUMER_GROUP_CODE, DELETE_CONSUMER_GROUP, true; + JoinConsumerGroup(JoinConsumerGroup), JOIN_CONSUMER_GROUP_CODE, JOIN_CONSUMER_GROUP, true; + LeaveConsumerGroup(LeaveConsumerGroup), LEAVE_CONSUMER_GROUP_CODE, LEAVE_CONSUMER_GROUP, true; + } } -/// Indicates whether a command handler completed normally or migrated the connection. pub enum HandlerResult { - /// Command completed, connection stays on current shard. Finished, - - /// Connection was migrated to another shard. Source shard should exit without cleanup. Migrated { to_shard: u16 }, } -#[enum_dispatch] -pub trait ServerCommandHandler { - /// Return the command code +/// Handler for commands requiring authentication - receives [`Auth`] proof token. +pub trait AuthenticatedHandler { + fn code(&self) -> u32; + + #[allow(async_fn_in_trait)] + async fn handle( + self, + sender: &mut SenderKind, + length: u32, + auth: Auth, + session: &Session, + shard: &Rc<IggyShard>, + ) -> Result<HandlerResult, IggyError>; +} + +/// Handler for commands not requiring authentication (Ping, LoginUser, LoginWithPersonalAccessToken). +pub trait UnauthenticatedHandler { fn code(&self) -> u32; - /// Handle the command execution #[allow(async_fn_in_trait)] async fn handle( self, @@ -147,7 +160,6 @@ pub trait ServerCommandHandler { } pub trait BinaryServerCommand { - /// Parse command from sender #[allow(async_fn_in_trait)] async fn from_sender( sender: &mut SenderKind, diff --git a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs index af6c7a9a0..babd2e692 100644 --- a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs +++ b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs @@ -19,27 +19,28 @@ use std::rc::Rc; use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; //use crate::streaming::systems::system::SharedSystem; -use anyhow::Result; use iggy_common::get_cluster_metadata::GetClusterMetadata; use iggy_common::{BytesSerializable, IggyError, SenderKind}; use tracing::{debug, instrument}; -impl ServerCommandHandler for GetClusterMetadata { +impl AuthenticatedHandler for GetClusterMetadata { fn code(&self) -> u32 { iggy_common::GET_CLUSTER_METADATA_CODE } - #[instrument(skip_all, name = "trace_get_cluster_metadata", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] + #[instrument(skip_all, name = "trace_get_cluster_metadata", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs index 192d867f5..aebec3452 100644 --- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs @@ -17,7 +17,7 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::consumer_groups::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; @@ -27,24 +27,25 @@ use crate::shard::transmission::event::ShardEvent; use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::state::models::CreateConsumerGroupWithId; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::create_consumer_group::CreateConsumerGroup; use iggy_common::{Identifier, IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for CreateConsumerGroup { +impl AuthenticatedHandler for CreateConsumerGroup { fn code(&self) -> u32 { iggy_common::CREATE_CONSUMER_GROUP_CODE } - #[instrument(skip_all, name = "trace_create_consumer_group", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] + #[instrument(skip_all, name = "trace_create_consumer_group", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -69,7 +70,7 @@ impl ServerCommandHandler for CreateConsumerGroup { shard .state .apply( - session.get_user_id(), + auth.user_id(), &EntryCommand::CreateConsumerGroup(CreateConsumerGroupWithId { group_id: cg_id as u32, command: self diff --git a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs index 05c3cd9de..f8d06392d 100644 --- a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs @@ -17,34 +17,34 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::consumer_groups::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::polling_consumer::ConsumerGroupId; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::delete_consumer_group::DeleteConsumerGroup; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for DeleteConsumerGroup { +impl AuthenticatedHandler for DeleteConsumerGroup { fn code(&self) -> u32 { iggy_common::DELETE_CONSUMER_GROUP_CODE } - #[instrument(skip_all, name = "trace_delete_consumer_group", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] + #[instrument(skip_all, name = "trace_delete_consumer_group", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -114,7 +114,7 @@ impl ServerCommandHandler for DeleteConsumerGroup { shard .state .apply( - session.get_user_id(), + auth.user_id(), &EntryCommand::DeleteConsumerGroup(self), ) .await diff --git a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs index 7156a4d53..0d578e4c9 100644 --- a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs @@ -17,21 +17,21 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use crate::streaming::{streams, topics}; -use anyhow::Result; use iggy_common::IggyError; use iggy_common::SenderKind; use iggy_common::get_consumer_group::GetConsumerGroup; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for GetConsumerGroup { +impl AuthenticatedHandler for GetConsumerGroup { fn code(&self) -> u32 { iggy_common::GET_CONSUMER_GROUP_CODE } @@ -40,11 +40,11 @@ impl ServerCommandHandler for GetConsumerGroup { self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); - shard.ensure_authenticated(session)?; let exists = shard .ensure_consumer_group_exists(&self.stream_id, &self.topic_id, &self.group_id) .is_ok(); @@ -63,7 +63,7 @@ impl ServerCommandHandler for GetConsumerGroup { let has_permission = shard .permissioner .borrow() - .get_consumer_group(session.get_user_id(), numeric_stream_id, numeric_topic_id) + .get_consumer_group(auth.user_id(), numeric_stream_id, numeric_topic_id) .is_ok(); if !has_permission { sender.send_empty_ok_response().await?; diff --git a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs index d59f33c97..644cc7a28 100644 --- a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs @@ -17,22 +17,22 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents}; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use crate::streaming::{streams, topics}; -use anyhow::Result; use iggy_common::IggyError; use iggy_common::SenderKind; use iggy_common::get_consumer_groups::GetConsumerGroups; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for GetConsumerGroups { +impl AuthenticatedHandler for GetConsumerGroups { fn code(&self) -> u32 { iggy_common::GET_CONSUMER_GROUPS_CODE } @@ -41,11 +41,11 @@ impl ServerCommandHandler for GetConsumerGroups { self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); - shard.ensure_authenticated(session)?; shard.ensure_topic_exists(&self.stream_id, &self.topic_id)?; let numeric_topic_id = shard.streams.with_topic_by_id( &self.stream_id, @@ -56,7 +56,7 @@ impl ServerCommandHandler for GetConsumerGroups { .streams .with_stream_by_id(&self.stream_id, streams::helpers::get_stream_id()); shard.permissioner.borrow().get_consumer_groups( - session.get_user_id(), + auth.user_id(), numeric_stream_id, numeric_topic_id, )?; diff --git a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs index cc9a8e27b..251c7996e 100644 --- a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs @@ -17,30 +17,30 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::consumer_groups::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::join_consumer_group::JoinConsumerGroup; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for JoinConsumerGroup { +impl AuthenticatedHandler for JoinConsumerGroup { fn code(&self) -> u32 { iggy_common::JOIN_CONSUMER_GROUP_CODE } - #[instrument(skip_all, name = "trace_join_consumer_group", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), iggy_group_id = self.group_id.as_string()))] + #[instrument(skip_all, name = "trace_join_consumer_group", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), iggy_group_id = self.group_id.as_string()))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs index b56d94a48..eaaab37be 100644 --- a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs @@ -18,30 +18,30 @@ use super::COMPONENT; use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::utils::receive_and_validate; -use iggy_common::SenderKind; - use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::IggyError; +use iggy_common::SenderKind; use iggy_common::leave_consumer_group::LeaveConsumerGroup; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for LeaveConsumerGroup { +impl AuthenticatedHandler for LeaveConsumerGroup { fn code(&self) -> u32 { iggy_common::LEAVE_CONSUMER_GROUP_CODE } - #[instrument(skip_all, name = "trace_leave_consumer_group", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), iggy_group_id = self.group_id.as_string()))] + #[instrument(skip_all, name = "trace_leave_consumer_group", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), iggy_group_id = self.group_id.as_string()))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs index 45f22349d..e0414c85a 100644 --- a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs @@ -17,20 +17,20 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::consumer_offsets::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::delete_consumer_offset::DeleteConsumerOffset; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for DeleteConsumerOffset { +impl AuthenticatedHandler for DeleteConsumerOffset { fn code(&self) -> u32 { iggy_common::DELETE_CONSUMER_OFFSET_CODE } @@ -39,6 +39,7 @@ impl ServerCommandHandler for DeleteConsumerOffset { self, sender: &mut SenderKind, _length: u32, + _auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs index 29ee6f310..868c2a237 100644 --- a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs @@ -17,20 +17,20 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use iggy_common::IggyError; use iggy_common::SenderKind; use iggy_common::get_consumer_offset::GetConsumerOffset; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for GetConsumerOffset { +impl AuthenticatedHandler for GetConsumerOffset { fn code(&self) -> u32 { iggy_common::GET_CONSUMER_OFFSET_CODE } @@ -39,6 +39,7 @@ impl ServerCommandHandler for GetConsumerOffset { self, sender: &mut SenderKind, _length: u32, + _auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs index dcbb79db3..4661f524a 100644 --- a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs @@ -19,20 +19,20 @@ use std::rc::Rc; use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::consumer_offsets::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::SenderKind; use iggy_common::store_consumer_offset::StoreConsumerOffset; use tracing::debug; -impl ServerCommandHandler for StoreConsumerOffset { +impl AuthenticatedHandler for StoreConsumerOffset { fn code(&self) -> u32 { iggy_common::STORE_CONSUMER_OFFSET_CODE } @@ -41,6 +41,7 @@ impl ServerCommandHandler for StoreConsumerOffset { self, sender: &mut SenderKind, _length: u32, + _auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs index 937a03cb3..8fe71f96c 100644 --- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs +++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs @@ -17,34 +17,35 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::messages::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::{FlushUnsavedBuffer, IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for FlushUnsavedBuffer { +impl AuthenticatedHandler for FlushUnsavedBuffer { fn code(&self) -> u32 { iggy_common::FLUSH_UNSAVED_BUFFER_CODE } - #[instrument(skip_all, name = "trace_flush_unsaved_buffer", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), iggy_partition_id = self.partition_id, iggy_fsync = self.fsync))] + #[instrument(skip_all, name = "trace_flush_unsaved_buffer", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), iggy_partition_id = self.partition_id, iggy_fsync = self.fsync))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); - let user_id = session.get_user_id(); + let user_id = auth.user_id(); let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); let partition_id = self.partition_id; diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs index 0839e3a55..e508ae6c9 100644 --- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -17,13 +17,13 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; use crate::shard::system::messages::PollingArgs; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use iggy_common::SenderKind; use iggy_common::{IggyError, PollMessages, PooledBuffer}; use std::rc::Rc; @@ -44,7 +44,7 @@ impl IggyPollMetadata { } } -impl ServerCommandHandler for PollMessages { +impl AuthenticatedHandler for PollMessages { fn code(&self) -> u32 { iggy_common::POLL_MESSAGES_CODE } @@ -53,6 +53,7 @@ impl ServerCommandHandler for PollMessages { self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -68,7 +69,7 @@ impl ServerCommandHandler for PollMessages { } = self; let args = PollingArgs::new(strategy, count, auto_commit); - let user_id = session.get_user_id(); + let user_id = auth.user_id(); let client_id = session.client_id; let (metadata, mut batch) = shard .poll_messages( diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index 375848799..782525481 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -16,13 +16,13 @@ * under the License. */ -use crate::binary::command::{BinaryServerCommand, HandlerResult, ServerCommandHandler}; +use crate::binary::command::{AuthenticatedHandler, BinaryServerCommand, HandlerResult}; use crate::shard::IggyShard; use crate::shard::transmission::message::{ShardMessage, ShardRequest, ShardRequestPayload}; +use crate::streaming::auth::Auth; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::session::Session; use crate::streaming::{streams, topics}; -use anyhow::Result; use compio::buf::{IntoInner as _, IoBuf}; use iggy_common::Identifier; use iggy_common::PooledBuffer; @@ -34,13 +34,13 @@ use iggy_common::{IggyError, Partitioning, SendMessages, Validatable}; use std::rc::Rc; use tracing::{debug, error, info, instrument}; -impl ServerCommandHandler for SendMessages { +impl AuthenticatedHandler for SendMessages { fn code(&self) -> u32 { iggy_common::SEND_MESSAGES_CODE } #[instrument(skip_all, name = "trace_send_messages", fields( - iggy_user_id = session.get_user_id(), + iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string(), @@ -50,6 +50,7 @@ impl ServerCommandHandler for SendMessages { mut self, sender: &mut SenderKind, length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -150,7 +151,7 @@ impl ServerCommandHandler for SendMessages { )?; let namespace = IggyNamespace::new(numeric_stream_id, numeric_topic_id, partition_id); - let user_id = session.get_user_id(); + let user_id = auth.user_id(); let unsupport_socket_transfer = matches!( self.partitioning.kind, PartitioningKind::Balanced | PartitioningKind::MessagesKey diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs index 5c1930a8a..43e6d8119 100644 --- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs @@ -17,34 +17,34 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::partitions::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use crate::streaming::{streams, topics}; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::create_partitions::CreatePartitions; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for CreatePartitions { +impl AuthenticatedHandler for CreatePartitions { fn code(&self) -> u32 { iggy_common::CREATE_PARTITIONS_CODE } - #[instrument(skip_all, name = "trace_create_partitions", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] + #[instrument(skip_all, name = "trace_create_partitions", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -86,7 +86,7 @@ impl ServerCommandHandler for CreatePartitions { shard .state .apply( - session.get_user_id(), + auth.user_id(), &EntryCommand::CreatePartitions(self), ) .await diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs index 9090ec2c8..4d0daa84e 100644 --- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs @@ -17,32 +17,32 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::partitions::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::delete_partitions::DeletePartitions; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for DeletePartitions { +impl AuthenticatedHandler for DeletePartitions { fn code(&self) -> u32 { iggy_common::DELETE_PARTITIONS_CODE } - #[instrument(skip_all, name = "trace_delete_partitions", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] + #[instrument(skip_all, name = "trace_delete_partitions", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -83,7 +83,7 @@ impl ServerCommandHandler for DeletePartitions { shard .state .apply( - session.get_user_id(), + auth.user_id(), &EntryCommand::DeletePartitions(self), ) .await diff --git a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs index 50b896fb0..6cf407ed4 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs @@ -17,34 +17,34 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::personal_access_tokens::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::state::models::CreatePersonalAccessTokenWithHash; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::create_personal_access_token::CreatePersonalAccessToken; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for CreatePersonalAccessToken { +impl AuthenticatedHandler for CreatePersonalAccessToken { fn code(&self) -> u32 { iggy_common::CREATE_PERSONAL_ACCESS_TOKEN_CODE } - #[instrument(skip_all, name = "trace_create_personal_access_token", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] + #[instrument(skip_all, name = "trace_create_personal_access_token", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -68,7 +68,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken { shard .state .apply( - session.get_user_id(), + auth.user_id(), &EntryCommand::CreatePersonalAccessToken(CreatePersonalAccessTokenWithHash { command: CreatePersonalAccessToken { name: self.name.to_owned(), diff --git a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs index bf89c70ca..38cef42f7 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs @@ -17,30 +17,31 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::personal_access_tokens::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::delete_personal_access_token::DeletePersonalAccessToken; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for DeletePersonalAccessToken { +impl AuthenticatedHandler for DeletePersonalAccessToken { fn code(&self) -> u32 { iggy_common::DELETE_PERSONAL_ACCESS_TOKEN_CODE } - #[instrument(skip_all, name = "trace_delete_personal_access_token", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] + #[instrument(skip_all, name = "trace_delete_personal_access_token", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -55,7 +56,7 @@ impl ServerCommandHandler for DeletePersonalAccessToken { // Broadcast the event to other shards let event = crate::shard::transmission::event::ShardEvent::DeletedPersonalAccessToken { - user_id: session.get_user_id(), + user_id: auth.user_id(), name: self.name.clone(), }; shard.broadcast_event_to_all_shards(event).await?; @@ -63,7 +64,7 @@ impl ServerCommandHandler for DeletePersonalAccessToken { shard .state .apply( - session.get_user_id(), + auth.user_id(), &EntryCommand::DeletePersonalAccessToken(DeletePersonalAccessToken { name: self.name, }), diff --git a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs index 9e2923868..35885c1a8 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs @@ -17,12 +17,13 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::personal_access_tokens::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use err_trail::ErrContext; use iggy_common::IggyError; @@ -31,7 +32,7 @@ use iggy_common::get_personal_access_tokens::GetPersonalAccessTokens; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for GetPersonalAccessTokens { +impl AuthenticatedHandler for GetPersonalAccessTokens { fn code(&self) -> u32 { iggy_common::GET_PERSONAL_ACCESS_TOKENS_CODE } @@ -40,6 +41,7 @@ impl ServerCommandHandler for GetPersonalAccessTokens { self, sender: &mut SenderKind, _length: u32, + _auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs index 1603aaa6a..73914ffcd 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs @@ -19,19 +19,18 @@ use crate::shard::IggyShard; use std::rc::Rc; use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + BinaryServerCommand, HandlerResult, ServerCommand, UnauthenticatedHandler, }; use crate::binary::handlers::personal_access_tokens::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::login_with_personal_access_token::LoginWithPersonalAccessToken; use iggy_common::{IggyError, SenderKind}; use tracing::{debug, instrument}; -impl ServerCommandHandler for LoginWithPersonalAccessToken { +impl UnauthenticatedHandler for LoginWithPersonalAccessToken { fn code(&self) -> u32 { iggy_common::LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE } diff --git a/core/server/src/binary/handlers/segments/delete_segments_handler.rs b/core/server/src/binary/handlers/segments/delete_segments_handler.rs index a4eb20654..8d35670d1 100644 --- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs +++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs @@ -17,7 +17,7 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::partitions::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; @@ -28,8 +28,8 @@ use crate::shard::transmission::message::{ }; use crate::state::command::EntryCommand; use crate::streaming; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::delete_segments::DeleteSegments; use iggy_common::sharding::IggyNamespace; @@ -37,16 +37,17 @@ use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for DeleteSegments { +impl AuthenticatedHandler for DeleteSegments { fn code(&self) -> u32 { iggy_common::DELETE_SEGMENTS_CODE } - #[instrument(skip_all, name = "trace_delete_segments", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] + #[instrument(skip_all, name = "trace_delete_segments", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -57,8 +58,6 @@ impl ServerCommandHandler for DeleteSegments { let partition_id = self.partition_id as usize; let segments_count = self.segments_count; - // Ensure authentication and topic exists - shard.ensure_authenticated(session)?; shard.ensure_topic_exists(&stream_id, &topic_id)?; shard.ensure_partition_exists(&stream_id, &topic_id, partition_id)?; @@ -114,7 +113,7 @@ impl ServerCommandHandler for DeleteSegments { shard .state .apply( - session.get_user_id(), + auth.user_id(), &EntryCommand::DeleteSegments(self), ) .await diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs b/core/server/src/binary/handlers/streams/create_stream_handler.rs index c2935ef6a..c008b1d16 100644 --- a/core/server/src/binary/handlers/streams/create_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs @@ -17,12 +17,11 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::streams::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::shard::transmission::frame::ShardResponse; @@ -32,24 +31,25 @@ use crate::shard::transmission::message::{ use crate::slab::traits_ext::{EntityComponentSystem, EntityMarker}; use crate::state::command::EntryCommand; use crate::state::models::CreateStreamWithId; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::create_stream::CreateStream; use iggy_common::{Identifier, IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for CreateStream { +impl AuthenticatedHandler for CreateStream { fn code(&self) -> u32 { iggy_common::CREATE_STREAM_CODE } - #[instrument(skip_all, name = "trace_create_stream", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] + #[instrument(skip_all, name = "trace_create_stream", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -60,7 +60,7 @@ impl ServerCommandHandler for CreateStream { topic_id: Identifier::default(), partition_id: 0, payload: ShardRequestPayload::CreateStream { - user_id: session.get_user_id(), + user_id: auth.user_id(), name: self.name.clone(), }, }; @@ -91,7 +91,7 @@ impl ServerCommandHandler for CreateStream { shard .state - .apply(session.get_user_id(), &EntryCommand::CreateStream(CreateStreamWithId { + .apply(auth.user_id(), &EntryCommand::CreateStream(CreateStreamWithId { stream_id: created_stream_id as u32, command: self })) @@ -116,7 +116,7 @@ impl ServerCommandHandler for CreateStream { shard .state - .apply(session.get_user_id(), &EntryCommand::CreateStream(CreateStreamWithId { + .apply(auth.user_id(), &EntryCommand::CreateStream(CreateStreamWithId { stream_id: created_stream_id as u32, command: self })) diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs b/core/server/src/binary/handlers/streams/delete_stream_handler.rs index d1e23cb04..43baf411a 100644 --- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs @@ -17,7 +17,7 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::streams::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; @@ -29,8 +29,8 @@ use crate::shard::transmission::message::{ }; use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::delete_stream::DeleteStream; use iggy_common::{Identifier, IggyError, SenderKind}; @@ -38,16 +38,17 @@ use std::rc::Rc; use tracing::info; use tracing::{debug, instrument}; -impl ServerCommandHandler for DeleteStream { +impl AuthenticatedHandler for DeleteStream { fn code(&self) -> u32 { iggy_common::DELETE_STREAM_CODE } - #[instrument(skip_all, name = "trace_delete_stream", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))] + #[instrument(skip_all, name = "trace_delete_stream", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -58,7 +59,7 @@ impl ServerCommandHandler for DeleteStream { topic_id: Identifier::default(), partition_id: 0, payload: ShardRequestPayload::DeleteStream { - user_id: session.get_user_id(), + user_id: auth.user_id(), stream_id: self.stream_id, }, }; @@ -91,7 +92,7 @@ impl ServerCommandHandler for DeleteStream { shard .state - .apply(session.get_user_id(), &EntryCommand::DeleteStream(DeleteStream { stream_id: stream_id.clone() })) + .apply(auth.user_id(), &EntryCommand::DeleteStream(DeleteStream { stream_id: stream_id.clone() })) .await .error(|e: &IggyError| { format!("{COMPONENT} (error: {e}) - failed to apply delete stream with ID: {stream_id}, session: {session}") @@ -103,7 +104,7 @@ impl ServerCommandHandler for DeleteStream { ShardResponse::DeleteStreamResponse(stream) => { shard .state - .apply(session.get_user_id(), &EntryCommand::DeleteStream(DeleteStream { stream_id: (stream.id() as u32).try_into().unwrap() })) + .apply(auth.user_id(), &EntryCommand::DeleteStream(DeleteStream { stream_id: (stream.id() as u32).try_into().unwrap() })) .await .error(|e: &IggyError| { format!("{COMPONENT} (error: {e}) - failed to apply delete stream with ID: {stream_id}, session: {session}") diff --git a/core/server/src/binary/handlers/streams/get_stream_handler.rs b/core/server/src/binary/handlers/streams/get_stream_handler.rs index fe9cd1f83..6a22da3db 100644 --- a/core/server/src/binary/handlers/streams/get_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/get_stream_handler.rs @@ -17,15 +17,15 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; use crate::slab::traits_ext::EntityComponentSystem; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use crate::streaming::streams; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::SenderKind; @@ -33,7 +33,7 @@ use iggy_common::get_stream::GetStream; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for GetStream { +impl AuthenticatedHandler for GetStream { fn code(&self) -> u32 { iggy_common::GET_STREAM_CODE } @@ -42,11 +42,11 @@ impl ServerCommandHandler for GetStream { self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); - shard.ensure_authenticated(session)?; let exists = shard.ensure_stream_exists(&self.stream_id).is_ok(); if !exists { sender.send_empty_ok_response().await?; @@ -58,12 +58,12 @@ impl ServerCommandHandler for GetStream { let has_permission = shard .permissioner .borrow() - .get_stream(session.get_user_id(), stream_id) + .get_stream(auth.user_id(), stream_id) .error(|e: &IggyError| { format!( "permission denied to get stream with ID: {} for user with ID: {}, error: {e}", self.stream_id, - session.get_user_id(), + auth.user_id(), ) }) .is_ok(); diff --git a/core/server/src/binary/handlers/streams/get_streams_handler.rs b/core/server/src/binary/handlers/streams/get_streams_handler.rs index f7bf06f2d..a7a39641c 100644 --- a/core/server/src/binary/handlers/streams/get_streams_handler.rs +++ b/core/server/src/binary/handlers/streams/get_streams_handler.rs @@ -17,15 +17,15 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::streams::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents}; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::SenderKind; @@ -33,7 +33,7 @@ use iggy_common::get_streams::GetStreams; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for GetStreams { +impl AuthenticatedHandler for GetStreams { fn code(&self) -> u32 { iggy_common::GET_STREAMS_CODE } @@ -42,19 +42,19 @@ impl ServerCommandHandler for GetStreams { self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); - shard.ensure_authenticated(session)?; shard .permissioner .borrow() - .get_streams(session.get_user_id()) + .get_streams(auth.user_id()) .error(|e: &IggyError| { format!( "{COMPONENT} (error: {e}) - permission denied to get streams for user {}", - session.get_user_id() + auth.user_id() ) })?; diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs b/core/server/src/binary/handlers/streams/purge_stream_handler.rs index bfa189613..dd8c00ffb 100644 --- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs @@ -17,32 +17,32 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::streams::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::purge_stream::PurgeStream; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for PurgeStream { +impl AuthenticatedHandler for PurgeStream { fn code(&self) -> u32 { iggy_common::PURGE_STREAM_CODE } - #[instrument(skip_all, name = "trace_purge_stream", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))] + #[instrument(skip_all, name = "trace_purge_stream", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -60,7 +60,7 @@ impl ServerCommandHandler for PurgeStream { shard.broadcast_event_to_all_shards(event).await?; shard .state - .apply(session.get_user_id(), &EntryCommand::PurgeStream(self)) + .apply(auth.user_id(), &EntryCommand::PurgeStream(self)) .await .error(|e: &IggyError| { format!("{COMPONENT} (error: {e}) - failed to apply purge stream with id: {stream_id}, session: {session}") diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs b/core/server/src/binary/handlers/streams/update_stream_handler.rs index 8e4de6574..84c55b679 100644 --- a/core/server/src/binary/handlers/streams/update_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs @@ -17,32 +17,32 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::streams::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::update_stream::UpdateStream; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for UpdateStream { +impl AuthenticatedHandler for UpdateStream { fn code(&self) -> u32 { iggy_common::UPDATE_STREAM_CODE } - #[instrument(skip_all, name = "trace_update_stream", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))] + #[instrument(skip_all, name = "trace_update_stream", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -61,7 +61,7 @@ impl ServerCommandHandler for UpdateStream { shard.broadcast_event_to_all_shards(event).await?; shard .state - .apply(session.get_user_id(), &EntryCommand::UpdateStream(self)) + .apply(auth.user_id(), &EntryCommand::UpdateStream(self)) .await .error(|e: &IggyError| { format!("{COMPONENT} (error: {e}) - failed to apply update stream with id: {stream_id}, session: {session}") diff --git a/core/server/src/binary/handlers/system/get_client_handler.rs b/core/server/src/binary/handlers/system/get_client_handler.rs index b9e16aef3..da71329bd 100644 --- a/core/server/src/binary/handlers/system/get_client_handler.rs +++ b/core/server/src/binary/handlers/system/get_client_handler.rs @@ -17,11 +17,12 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use iggy_common::IggyError; use iggy_common::SenderKind; @@ -29,7 +30,7 @@ use iggy_common::get_client::GetClient; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for GetClient { +impl AuthenticatedHandler for GetClient { fn code(&self) -> u32 { iggy_common::GET_CLIENT_CODE } @@ -38,6 +39,7 @@ impl ServerCommandHandler for GetClient { self, sender: &mut SenderKind, _length: u32, + _auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/system/get_clients_handler.rs b/core/server/src/binary/handlers/system/get_clients_handler.rs index 81ac21b5f..6288cf969 100644 --- a/core/server/src/binary/handlers/system/get_clients_handler.rs +++ b/core/server/src/binary/handlers/system/get_clients_handler.rs @@ -17,12 +17,13 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::system::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use err_trail::ErrContext; use iggy_common::IggyError; @@ -31,7 +32,7 @@ use iggy_common::get_clients::GetClients; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for GetClients { +impl AuthenticatedHandler for GetClients { fn code(&self) -> u32 { iggy_common::GET_CLIENTS_CODE } @@ -40,6 +41,7 @@ impl ServerCommandHandler for GetClients { self, sender: &mut SenderKind, _length: u32, + _auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/system/get_me_handler.rs b/core/server/src/binary/handlers/system/get_me_handler.rs index 339063ab3..ba22c9660 100644 --- a/core/server/src/binary/handlers/system/get_me_handler.rs +++ b/core/server/src/binary/handlers/system/get_me_handler.rs @@ -17,12 +17,13 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::system::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use err_trail::ErrContext; use iggy_common::IggyError; @@ -30,7 +31,7 @@ use iggy_common::SenderKind; use iggy_common::get_me::GetMe; use std::rc::Rc; -impl ServerCommandHandler for GetMe { +impl AuthenticatedHandler for GetMe { fn code(&self) -> u32 { iggy_common::GET_ME_CODE } @@ -39,6 +40,7 @@ impl ServerCommandHandler for GetMe { self, sender: &mut SenderKind, _length: u32, + _auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/system/get_snapshot.rs b/core/server/src/binary/handlers/system/get_snapshot.rs index cc2163b91..3e6f76ba9 100644 --- a/core/server/src/binary/handlers/system/get_snapshot.rs +++ b/core/server/src/binary/handlers/system/get_snapshot.rs @@ -17,10 +17,11 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use bytes::Bytes; use iggy_common::IggyError; @@ -29,7 +30,7 @@ use iggy_common::get_snapshot::GetSnapshot; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for GetSnapshot { +impl AuthenticatedHandler for GetSnapshot { fn code(&self) -> u32 { iggy_common::GET_SNAPSHOT_FILE_CODE } @@ -38,6 +39,7 @@ impl ServerCommandHandler for GetSnapshot { self, sender: &mut SenderKind, _length: u32, + _auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/system/get_stats_handler.rs b/core/server/src/binary/handlers/system/get_stats_handler.rs index 2fe368ce1..2284295c1 100644 --- a/core/server/src/binary/handlers/system/get_stats_handler.rs +++ b/core/server/src/binary/handlers/system/get_stats_handler.rs @@ -17,7 +17,7 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::system::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; @@ -27,6 +27,7 @@ use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult, }; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use err_trail::ErrContext; use iggy_common::SenderKind; @@ -35,7 +36,7 @@ use iggy_common::{Identifier, IggyError}; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for GetStats { +impl AuthenticatedHandler for GetStats { fn code(&self) -> u32 { iggy_common::GET_STATS_CODE } @@ -44,6 +45,7 @@ impl ServerCommandHandler for GetStats { self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -55,7 +57,7 @@ impl ServerCommandHandler for GetStats { topic_id: Identifier::default(), partition_id: 0, payload: ShardRequestPayload::GetStats { - user_id: session.get_user_id(), + user_id: auth.user_id(), }, }; diff --git a/core/server/src/binary/handlers/system/ping_handler.rs b/core/server/src/binary/handlers/system/ping_handler.rs index e22fa7ba8..a7d777c28 100644 --- a/core/server/src/binary/handlers/system/ping_handler.rs +++ b/core/server/src/binary/handlers/system/ping_handler.rs @@ -16,10 +16,9 @@ * under the License. */ -use crate::binary::command::{BinaryServerCommand, HandlerResult, ServerCommandHandler}; +use crate::binary::command::{BinaryServerCommand, HandlerResult, UnauthenticatedHandler}; use crate::shard::IggyShard; use crate::streaming::session::Session; -use anyhow::Result; use iggy_common::IggyError; use iggy_common::IggyTimestamp; use iggy_common::SenderKind; @@ -27,7 +26,7 @@ use iggy_common::ping::Ping; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for Ping { +impl UnauthenticatedHandler for Ping { fn code(&self) -> u32 { iggy_common::PING_CODE } diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs index fa5c0270a..d8e85b541 100644 --- a/core/server/src/binary/handlers/topics/create_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs @@ -17,12 +17,11 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::topics::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::shard::transmission::frame::ShardResponse; @@ -32,25 +31,26 @@ use crate::shard::transmission::message::{ use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::state::models::CreateTopicWithId; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use crate::streaming::streams; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::create_topic::CreateTopic; use iggy_common::{Identifier, IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for CreateTopic { +impl AuthenticatedHandler for CreateTopic { fn code(&self) -> u32 { iggy_common::CREATE_TOPIC_CODE } - #[instrument(skip_all, name = "trace_create_topic", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))] + #[instrument(skip_all, name = "trace_create_topic", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string()))] async fn handle( mut self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -61,7 +61,7 @@ impl ServerCommandHandler for CreateTopic { topic_id: Identifier::default(), partition_id: 0, payload: ShardRequestPayload::CreateTopic { - user_id: session.get_user_id(), + user_id: auth.user_id(), stream_id: self.stream_id.clone(), name: self.name.clone(), partitions_count: self.partitions_count, @@ -137,7 +137,7 @@ impl ServerCommandHandler for CreateTopic { ); shard .state - .apply(session.get_user_id(), &EntryCommand::CreateTopic(CreateTopicWithId { + .apply(auth.user_id(), &EntryCommand::CreateTopic(CreateTopicWithId { topic_id: topic_id as u32, command: self })) @@ -172,7 +172,7 @@ impl ServerCommandHandler for CreateTopic { shard .state - .apply(session.get_user_id(), &EntryCommand::CreateTopic(CreateTopicWithId { + .apply(auth.user_id(), &EntryCommand::CreateTopic(CreateTopicWithId { topic_id: topic_id as u32, command: self })) diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs b/core/server/src/binary/handlers/topics/delete_topic_handler.rs index 668dbd707..de3545bb1 100644 --- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs @@ -17,11 +17,10 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::topics::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::shard::transmission::frame::ShardResponse; @@ -30,9 +29,9 @@ use crate::shard::transmission::message::{ }; use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use crate::streaming::streams; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::delete_topic::DeleteTopic; use iggy_common::{Identifier, IggyError, SenderKind}; @@ -40,16 +39,17 @@ use std::rc::Rc; use tracing::info; use tracing::{debug, instrument}; -impl ServerCommandHandler for DeleteTopic { +impl AuthenticatedHandler for DeleteTopic { fn code(&self) -> u32 { iggy_common::DELETE_TOPIC_CODE } - #[instrument(skip_all, name = "trace_delete_topic", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] + #[instrument(skip_all, name = "trace_delete_topic", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -60,7 +60,7 @@ impl ServerCommandHandler for DeleteTopic { topic_id: Identifier::default(), partition_id: 0, payload: ShardRequestPayload::DeleteTopic { - user_id: session.get_user_id(), + user_id: auth.user_id(), stream_id: self.stream_id.clone(), topic_id: self.topic_id.clone(), }, @@ -100,7 +100,7 @@ impl ServerCommandHandler for DeleteTopic { shard .state - .apply(session.get_user_id(), &EntryCommand::DeleteTopic(self)) + .apply(auth.user_id(), &EntryCommand::DeleteTopic(self)) .await .error(|e: &IggyError| format!( "{COMPONENT} (error: {e}) - failed to apply delete topic with ID: {topic_id_num} in stream with ID: {stream_id_num}, session: {session}", @@ -116,7 +116,7 @@ impl ServerCommandHandler for DeleteTopic { ShardResponse::DeleteTopicResponse(topic) => { shard .state - .apply(session.get_user_id(), &EntryCommand::DeleteTopic(self)) + .apply(auth.user_id(), &EntryCommand::DeleteTopic(self)) .await .error(|e: &IggyError| format!( "{COMPONENT} (error: {e}) - failed to apply delete topic with ID: {}, session: {session}", diff --git a/core/server/src/binary/handlers/topics/get_topic_handler.rs b/core/server/src/binary/handlers/topics/get_topic_handler.rs index fcc42ac0b..c4a211f08 100644 --- a/core/server/src/binary/handlers/topics/get_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/get_topic_handler.rs @@ -17,21 +17,21 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use crate::streaming::streams; -use anyhow::Result; use iggy_common::IggyError; use iggy_common::SenderKind; use iggy_common::get_topic::GetTopic; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for GetTopic { +impl AuthenticatedHandler for GetTopic { fn code(&self) -> u32 { iggy_common::GET_TOPIC_CODE } @@ -40,11 +40,11 @@ impl ServerCommandHandler for GetTopic { self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); - shard.ensure_authenticated(session)?; let exists = shard .ensure_topic_exists(&self.stream_id, &self.topic_id) .is_ok(); @@ -60,7 +60,7 @@ impl ServerCommandHandler for GetTopic { .permissioner .borrow() .get_topic( - session.get_user_id(), + auth.user_id(), numeric_stream_id, self.topic_id .get_u32_value() diff --git a/core/server/src/binary/handlers/topics/get_topics_handler.rs b/core/server/src/binary/handlers/topics/get_topics_handler.rs index 96c3f9325..73e6ea692 100644 --- a/core/server/src/binary/handlers/topics/get_topics_handler.rs +++ b/core/server/src/binary/handlers/topics/get_topics_handler.rs @@ -17,22 +17,22 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents}; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use crate::streaming::streams; -use anyhow::Result; use iggy_common::IggyError; use iggy_common::SenderKind; use iggy_common::get_topics::GetTopics; use std::rc::Rc; use tracing::debug; -impl ServerCommandHandler for GetTopics { +impl AuthenticatedHandler for GetTopics { fn code(&self) -> u32 { iggy_common::GET_TOPICS_CODE } @@ -41,11 +41,11 @@ impl ServerCommandHandler for GetTopics { self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); - shard.ensure_authenticated(session)?; shard.ensure_stream_exists(&self.stream_id)?; let numeric_stream_id = shard .streams @@ -53,7 +53,7 @@ impl ServerCommandHandler for GetTopics { shard .permissioner .borrow() - .get_topics(session.get_user_id(), numeric_stream_id)?; + .get_topics(auth.user_id(), numeric_stream_id)?; let response = shard.streams.with_topics(&self.stream_id, |topics| { topics.with_components(|topics| { diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs b/core/server/src/binary/handlers/topics/purge_topic_handler.rs index 6e86293ae..950c8d421 100644 --- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs @@ -17,30 +17,31 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::topics::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::purge_topic::PurgeTopic; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for PurgeTopic { +impl AuthenticatedHandler for PurgeTopic { fn code(&self) -> u32 { iggy_common::PURGE_TOPIC_CODE } - #[instrument(skip_all, name = "trace_purge_topic", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] + #[instrument(skip_all, name = "trace_purge_topic", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -66,7 +67,7 @@ impl ServerCommandHandler for PurgeTopic { shard .state - .apply(session.get_user_id(), &EntryCommand::PurgeTopic(self)) + .apply(auth.user_id(), &EntryCommand::PurgeTopic(self)) .await .error(|e: &IggyError| { format!( diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs b/core/server/src/binary/handlers/topics/update_topic_handler.rs index 90f5e11ec..33f31398d 100644 --- a/core/server/src/binary/handlers/topics/update_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs @@ -17,11 +17,10 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::topics::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::shard::transmission::frame::ShardResponse; @@ -29,25 +28,26 @@ use crate::shard::transmission::message::{ ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult, }; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use crate::streaming::{streams, topics}; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::update_topic::UpdateTopic; use iggy_common::{Identifier, IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; -impl ServerCommandHandler for UpdateTopic { +impl AuthenticatedHandler for UpdateTopic { fn code(&self) -> u32 { iggy_common::UPDATE_TOPIC_CODE } - #[instrument(skip_all, name = "trace_update_topic", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] + #[instrument(skip_all, name = "trace_update_topic", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id, iggy_stream_id = self.stream_id.as_string(), iggy_topic_id = self.topic_id.as_string()))] async fn handle( mut self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -58,7 +58,7 @@ impl ServerCommandHandler for UpdateTopic { topic_id: Identifier::default(), partition_id: 0, payload: ShardRequestPayload::UpdateTopic { - user_id: session.get_user_id(), + user_id: auth.user_id(), stream_id: self.stream_id.clone(), topic_id: self.topic_id.clone(), name: self.name.clone(), @@ -130,7 +130,7 @@ impl ServerCommandHandler for UpdateTopic { shard .state - .apply(session.get_user_id(), &EntryCommand::UpdateTopic(self)) + .apply(auth.user_id(), &EntryCommand::UpdateTopic(self)) .await .error(|e: &IggyError| format!( "{COMPONENT} (error: {e}) - failed to apply update topic with id: {topic_id} in stream with ID: {stream_id_num}, session: {session}" @@ -150,7 +150,7 @@ impl ServerCommandHandler for UpdateTopic { shard .state - .apply(session.get_user_id(), &EntryCommand::UpdateTopic(self)) + .apply(auth.user_id(), &EntryCommand::UpdateTopic(self)) .await .error(|e: &IggyError| format!( "{COMPONENT} (error: {e}) - failed to apply update topic in stream with ID: {stream_id}, session: {session}" diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs b/core/server/src/binary/handlers/users/change_password_handler.rs index 0dadd98bb..98c7298dc 100644 --- a/core/server/src/binary/handlers/users/change_password_handler.rs +++ b/core/server/src/binary/handlers/users/change_password_handler.rs @@ -17,17 +17,16 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use crate::streaming::utils::crypto; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::change_password::ChangePassword; use iggy_common::{IggyError, SenderKind}; @@ -35,16 +34,17 @@ use std::rc::Rc; use tracing::info; use tracing::{debug, instrument}; -impl ServerCommandHandler for ChangePassword { +impl AuthenticatedHandler for ChangePassword { fn code(&self) -> u32 { iggy_common::CHANGE_PASSWORD_CODE } - #[instrument(skip_all, name = "trace_change_password", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] + #[instrument(skip_all, name = "trace_change_password", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -78,7 +78,7 @@ impl ServerCommandHandler for ChangePassword { shard .state .apply( - session.get_user_id(), + auth.user_id(), &EntryCommand::ChangePassword(ChangePassword { user_id: self.user_id.to_owned(), current_password: "".into(), diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs index 3cace71b3..9ee8819b6 100644 --- a/core/server/src/binary/handlers/users/create_user_handler.rs +++ b/core/server/src/binary/handlers/users/create_user_handler.rs @@ -17,7 +17,7 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; @@ -30,9 +30,9 @@ use crate::shard::transmission::message::{ }; use crate::state::command::EntryCommand; use crate::state::models::CreateUserWithId; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use crate::streaming::utils::crypto; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::create_user::CreateUser; use iggy_common::{Identifier, IggyError, SenderKind}; @@ -40,16 +40,17 @@ use std::rc::Rc; use tracing::debug; use tracing::instrument; -impl ServerCommandHandler for CreateUser { +impl AuthenticatedHandler for CreateUser { fn code(&self) -> u32 { iggy_common::CREATE_USER_CODE } - #[instrument(skip_all, name = "trace_create_user", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] + #[instrument(skip_all, name = "trace_create_user", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -60,7 +61,7 @@ impl ServerCommandHandler for CreateUser { topic_id: Identifier::default(), partition_id: 0, payload: ShardRequestPayload::CreateUser { - user_id: session.get_user_id(), + user_id: auth.user_id(), username: self.username.clone(), password: self.password.clone(), status: self.status, @@ -106,7 +107,7 @@ impl ServerCommandHandler for CreateUser { shard .state .apply( - session.get_user_id(), + auth.user_id(), &EntryCommand::CreateUser(CreateUserWithId { user_id, command: CreateUser { @@ -140,7 +141,7 @@ impl ServerCommandHandler for CreateUser { shard .state .apply( - session.get_user_id(), + auth.user_id(), &EntryCommand::CreateUser(CreateUserWithId { user_id, command: CreateUser { diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs b/core/server/src/binary/handlers/users/delete_user_handler.rs index a7fa3ec1b..16f5ccdb6 100644 --- a/core/server/src/binary/handlers/users/delete_user_handler.rs +++ b/core/server/src/binary/handlers/users/delete_user_handler.rs @@ -19,11 +19,10 @@ use std::rc::Rc; use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::shard::transmission::frame::ShardResponse; @@ -31,24 +30,25 @@ use crate::shard::transmission::message::{ ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult, }; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::delete_user::DeleteUser; use iggy_common::{Identifier, IggyError, SenderKind}; use tracing::info; use tracing::{debug, instrument}; -impl ServerCommandHandler for DeleteUser { +impl AuthenticatedHandler for DeleteUser { fn code(&self) -> u32 { iggy_common::DELETE_USER_CODE } - #[instrument(skip_all, name = "trace_delete_user", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] + #[instrument(skip_all, name = "trace_delete_user", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -59,7 +59,7 @@ impl ServerCommandHandler for DeleteUser { topic_id: Identifier::default(), partition_id: 0, payload: ShardRequestPayload::DeleteUser { - session_user_id: session.get_user_id(), + session_user_id: auth.user_id(), user_id: self.user_id, }, }; diff --git a/core/server/src/binary/handlers/users/get_user_handler.rs b/core/server/src/binary/handlers/users/get_user_handler.rs index 97322a340..d34792b4e 100644 --- a/core/server/src/binary/handlers/users/get_user_handler.rs +++ b/core/server/src/binary/handlers/users/get_user_handler.rs @@ -19,18 +19,19 @@ use std::rc::Rc; use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use iggy_common::IggyError; use iggy_common::SenderKind; use iggy_common::get_user::GetUser; use tracing::debug; -impl ServerCommandHandler for GetUser { +impl AuthenticatedHandler for GetUser { fn code(&self) -> u32 { iggy_common::GET_USER_CODE } @@ -39,6 +40,7 @@ impl ServerCommandHandler for GetUser { self, sender: &mut SenderKind, _length: u32, + _auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/users/get_users_handler.rs b/core/server/src/binary/handlers/users/get_users_handler.rs index bca683a0a..350d9de6d 100644 --- a/core/server/src/binary/handlers/users/get_users_handler.rs +++ b/core/server/src/binary/handlers/users/get_users_handler.rs @@ -19,12 +19,13 @@ use std::rc::Rc; use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; use err_trail::ErrContext; use iggy_common::IggyError; @@ -32,7 +33,7 @@ use iggy_common::SenderKind; use iggy_common::get_users::GetUsers; use tracing::debug; -impl ServerCommandHandler for GetUsers { +impl AuthenticatedHandler for GetUsers { fn code(&self) -> u32 { iggy_common::GET_USERS_CODE } @@ -41,6 +42,7 @@ impl ServerCommandHandler for GetUsers { self, sender: &mut SenderKind, _length: u32, + _auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs b/core/server/src/binary/handlers/users/login_user_handler.rs index 550be02b7..ed7fa3252 100644 --- a/core/server/src/binary/handlers/users/login_user_handler.rs +++ b/core/server/src/binary/handlers/users/login_user_handler.rs @@ -17,21 +17,20 @@ */ use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + BinaryServerCommand, HandlerResult, ServerCommand, UnauthenticatedHandler, }; use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::shard::IggyShard; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::login_user::LoginUser; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, info, instrument, warn}; -impl ServerCommandHandler for LoginUser { +impl UnauthenticatedHandler for LoginUser { fn code(&self) -> u32 { iggy_common::LOGIN_USER_CODE } diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs b/core/server/src/binary/handlers/users/logout_user_handler.rs index 714815173..995b55422 100644 --- a/core/server/src/binary/handlers/users/logout_user_handler.rs +++ b/core/server/src/binary/handlers/users/logout_user_handler.rs @@ -19,39 +19,39 @@ use std::rc::Rc; use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::logout_user::LogoutUser; use iggy_common::{IggyError, SenderKind}; use tracing::info; use tracing::{debug, instrument}; -impl ServerCommandHandler for LogoutUser { +impl AuthenticatedHandler for LogoutUser { fn code(&self) -> u32 { iggy_common::LOGOUT_USER_CODE } - #[instrument(skip_all, name = "trace_logout_user", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] + #[instrument(skip_all, name = "trace_logout_user", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { debug!("session: {session}, command: {self}"); - info!("Logging out user with ID: {}...", session.get_user_id()); + info!("Logging out user with ID: {}...", auth.user_id()); shard.logout_user(session).error(|e: &IggyError| { format!("{COMPONENT} (error: {e}) - failed to logout user, session: {session}") })?; - info!("Logged out user with ID: {}.", session.get_user_id()); + info!("Logged out user with ID: {}.", auth.user_id()); session.clear_user_id(); sender.send_empty_ok_response().await?; Ok(HandlerResult::Finished) diff --git a/core/server/src/binary/handlers/users/update_permissions_handler.rs b/core/server/src/binary/handlers/users/update_permissions_handler.rs index 30829a840..ecc45192d 100644 --- a/core/server/src/binary/handlers/users/update_permissions_handler.rs +++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs @@ -19,32 +19,32 @@ use std::rc::Rc; use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::update_permissions::UpdatePermissions; use iggy_common::{IggyError, SenderKind}; use tracing::info; use tracing::{debug, instrument}; -impl ServerCommandHandler for UpdatePermissions { +impl AuthenticatedHandler for UpdatePermissions { fn code(&self) -> u32 { iggy_common::UPDATE_PERMISSIONS_CODE } - #[instrument(skip_all, name = "trace_update_permissions", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] + #[instrument(skip_all, name = "trace_update_permissions", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -64,10 +64,7 @@ impl ServerCommandHandler for UpdatePermissions { shard .state - .apply( - session.get_user_id(), - &EntryCommand::UpdatePermissions(self), - ) + .apply(auth.user_id(), &EntryCommand::UpdatePermissions(self)) .await?; sender.send_empty_ok_response().await?; Ok(HandlerResult::Finished) diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs b/core/server/src/binary/handlers/users/update_user_handler.rs index bdb929136..73dc72e22 100644 --- a/core/server/src/binary/handlers/users/update_user_handler.rs +++ b/core/server/src/binary/handlers/users/update_user_handler.rs @@ -19,32 +19,32 @@ use std::rc::Rc; use crate::binary::command::{ - BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler, + AuthenticatedHandler, BinaryServerCommand, HandlerResult, ServerCommand, }; use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; - use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; +use crate::streaming::auth::Auth; use crate::streaming::session::Session; -use anyhow::Result; use err_trail::ErrContext; use iggy_common::update_user::UpdateUser; use iggy_common::{IggyError, SenderKind}; use tracing::info; use tracing::{debug, instrument}; -impl ServerCommandHandler for UpdateUser { +impl AuthenticatedHandler for UpdateUser { fn code(&self) -> u32 { iggy_common::UPDATE_USER_CODE } - #[instrument(skip_all, name = "trace_update_user", fields(iggy_user_id = session.get_user_id(), iggy_client_id = session.client_id))] + #[instrument(skip_all, name = "trace_update_user", fields(iggy_user_id = auth.user_id(), iggy_client_id = session.client_id))] async fn handle( self, sender: &mut SenderKind, _length: u32, + auth: Auth, session: &Session, shard: &Rc<IggyShard>, ) -> Result<HandlerResult, IggyError> { @@ -76,7 +76,7 @@ impl ServerCommandHandler for UpdateUser { let user_id = self.user_id.clone(); shard .state - .apply(session.get_user_id(), &EntryCommand::UpdateUser(self)) + .apply(auth.user_id(), &EntryCommand::UpdateUser(self)) .await .error(|e: &IggyError| { format!( diff --git a/core/server/src/binary/macros.rs b/core/server/src/binary/macros.rs index a56c0bee0..5d80b01ca 100644 --- a/core/server/src/binary/macros.rs +++ b/core/server/src/binary/macros.rs @@ -16,37 +16,54 @@ * under the License. */ -/// This macro does 4 expansions in one go: -/// -/// 1) The `#[enum_dispatch(ServerCommandHandler)] pub enum ServerCommand` -/// with all variants. -/// 2) The `from_code_and_payload(code, payload)` function that matches each code. -/// 3) The `to_bytes()` function that matches each variant. -/// 4) The `validate()` function that matches each variant. +/// Generates `ServerCommand` enum with `@unauth`/`@auth` split and `dispatch()` method. +/// For `@auth` commands, `dispatch()` calls `shard.auth(session)?` before handler. #[macro_export] macro_rules! define_server_command_enum { ( - $( - // Macro pattern: - // variant name inner type numeric code display string show_payload? - $variant:ident ( $ty:ty ), $code:ident, $display_str:expr, $show_payload:expr - );* $(;)? + @unauth { + $( + $unauth_variant:ident ( $unauth_ty:ty ), $unauth_code:ident, $unauth_display:expr, $unauth_show_payload:expr + );* $(;)? + } + @auth { + $( + $auth_variant:ident ( $auth_ty:ty ), $auth_code:ident, $auth_display:expr, $auth_show_payload:expr + );* $(;)? + } ) => { - #[enum_dispatch(ServerCommandHandler)] #[derive(Debug, PartialEq, EnumString)] pub enum ServerCommand { $( - $variant($ty), + $unauth_variant($unauth_ty), + )* + $( + $auth_variant($auth_ty), )* } impl ServerCommand { - /// Constructs a `ServerCommand` from its numeric code and payload. + pub fn code(&self) -> u32 { + match self { + $( + ServerCommand::$unauth_variant(_) => $unauth_code, + )* + $( + ServerCommand::$auth_variant(_) => $auth_code, + )* + } + } + pub fn from_code_and_payload(code: u32, payload: Bytes) -> Result<Self, IggyError> { match code { $( - $code => Ok(ServerCommand::$variant( - <$ty>::from_bytes(payload)? + $unauth_code => Ok(ServerCommand::$unauth_variant( + <$unauth_ty>::from_bytes(payload)? + )), + )* + $( + $auth_code => Ok(ServerCommand::$auth_variant( + <$auth_ty>::from_bytes(payload)? )), )* _ => { @@ -56,7 +73,6 @@ macro_rules! define_server_command_enum { } } - /// Constructs a ServerCommand from its numeric code by reading from the provided async reader. pub async fn from_code_and_reader( code: u32, sender: &mut SenderKind, @@ -64,28 +80,72 @@ macro_rules! define_server_command_enum { ) -> Result<Self, IggyError> { match code { $( - $code => Ok(ServerCommand::$variant( - <$ty as BinaryServerCommand>::from_sender(sender, code, length).await? + $unauth_code => Ok(ServerCommand::$unauth_variant( + <$unauth_ty as BinaryServerCommand>::from_sender(sender, code, length).await? + )), + )* + $( + $auth_code => Ok(ServerCommand::$auth_variant( + <$auth_ty as BinaryServerCommand>::from_sender(sender, code, length).await? )), )* _ => Err(IggyError::InvalidCommand), } } - /// Converts the command into raw bytes. pub fn to_bytes(&self) -> Bytes { match self { $( - ServerCommand::$variant(payload) => as_bytes(payload), + ServerCommand::$unauth_variant(payload) => as_bytes(payload), + )* + $( + ServerCommand::$auth_variant(payload) => as_bytes(payload), )* } } - /// Validate the command by delegating to the inner command’s implementation. pub fn validate(&self) -> Result<(), IggyError> { match self { $( - ServerCommand::$variant(cmd) => <$ty as iggy_common::Validatable<iggy_common::IggyError>>::validate(cmd), + ServerCommand::$unauth_variant(cmd) => <$unauth_ty as iggy_common::Validatable<iggy_common::IggyError>>::validate(cmd), + )* + $( + ServerCommand::$auth_variant(cmd) => <$auth_ty as iggy_common::Validatable<iggy_common::IggyError>>::validate(cmd), + )* + } + } + + pub async fn dispatch( + self, + sender: &mut SenderKind, + length: u32, + session: &Session, + shard: &Rc<IggyShard>, + ) -> Result<HandlerResult, IggyError> { + match self { + $( + ServerCommand::$unauth_variant(cmd) => { + <$unauth_ty as UnauthenticatedHandler>::handle( + cmd, + sender, + length, + session, + shard, + ).await + } + )* + $( + ServerCommand::$auth_variant(cmd) => { + let auth = shard.auth(session)?; + <$auth_ty as AuthenticatedHandler>::handle( + cmd, + sender, + length, + auth, + session, + shard, + ).await + } )* } } @@ -96,13 +156,20 @@ macro_rules! define_server_command_enum { fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { $( - // If $show_payload is true, we display the command name + payload; - // otherwise, we just display the command name. - ServerCommand::$variant(payload) => { - if $show_payload { - write!(formatter, "{}|{payload:?}", $display_str) + ServerCommand::$unauth_variant(payload) => { + if $unauth_show_payload { + write!(formatter, "{}|{payload:?}", $unauth_display) + } else { + write!(formatter, "{}", $unauth_display) + } + }, + )* + $( + ServerCommand::$auth_variant(payload) => { + if $auth_show_payload { + write!(formatter, "{}|{payload:?}", $auth_display) } else { - write!(formatter, "{}", $display_str) + write!(formatter, "{}", $auth_display) } }, )* @@ -110,5 +177,4 @@ macro_rules! define_server_command_enum { } } }; - } diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index 67a2d4937..78a5331de 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -16,7 +16,7 @@ * under the License. */ -use crate::binary::command::{ServerCommand, ServerCommandHandler}; +use crate::binary::command::ServerCommand; use crate::server_error::ConnectionError; use crate::shard::IggyShard; use crate::shard::task_registry::ShutdownToken; @@ -187,7 +187,7 @@ async fn handle_stream( trace!("Received a QUIC command: {command}, payload size: {length}"); - match command.handle(&mut sender, length, session, &shard).await { + match command.dispatch(&mut sender, length, session, &shard).await { Ok(_) => { trace!( "Command was handled successfully, session: {:?}. QUIC response was sent.", diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 8939e87b0..a1d86b2ec 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -295,4 +295,18 @@ impl IggyShard { Err(IggyError::Unauthenticated) } } + + pub fn auth(&self, session: &Session) -> Result<crate::streaming::auth::Auth, IggyError> { + if !session.is_active() { + error!("{COMPONENT} - session is inactive, session: {session}"); + return Err(IggyError::StaleClient); + } + + if !session.is_authenticated() { + error!("{COMPONENT} - unauthenticated access attempt, session: {session}"); + return Err(IggyError::Unauthenticated); + } + + Ok(crate::streaming::auth::Auth::new(session.get_user_id())) + } } diff --git a/core/server/src/streaming/auth.rs b/core/server/src/streaming/auth.rs new file mode 100644 index 000000000..30e360543 --- /dev/null +++ b/core/server/src/streaming/auth.rs @@ -0,0 +1,80 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Type-safe authentication proof system using proof-carrying code pattern. +//! +//! The [`Auth`] type can only be constructed via [`IggyShard::auth()`], +//! ensuring any code holding an `Auth` value has passed authentication. + +use iggy_common::UserId; + +/// Proof of successful authentication. +/// +/// # Invariants +/// - `user_id` is always valid (never `u32::MAX`) +/// - User was authenticated at construction time +#[derive(Clone, Copy, Debug)] +pub struct Auth { + user_id: UserId, + // Private field prevents external construction - only `Auth::new()` can create this. + _sealed: (), +} + +impl Auth { + /// Only call after verifying user is authenticated. + #[inline] + pub(crate) fn new(user_id: UserId) -> Self { + debug_assert!(user_id != u32::MAX, "Auth created with invalid user_id"); + Self { + user_id, + _sealed: (), + } + } + + #[inline] + pub fn user_id(&self) -> UserId { + self.user_id + } +} + +/// Sealed marker trait for authentication requirements. +pub trait AuthRequirement: private::Sealed { + const NAME: &'static str; +} + +/// Command requires authentication - handler receives [`Auth`] proof token. +pub struct Authenticated; + +impl AuthRequirement for Authenticated { + const NAME: &'static str = "Authenticated"; +} + +impl private::Sealed for Authenticated {} + +/// Command does NOT require authentication (Ping, LoginUser, LoginWithPersonalAccessToken). +pub struct Unauthenticated; + +impl AuthRequirement for Unauthenticated { + const NAME: &'static str = "Unauthenticated"; +} + +impl private::Sealed for Unauthenticated {} + +mod private { + pub trait Sealed {} +} diff --git a/core/server/src/streaming/mod.rs b/core/server/src/streaming/mod.rs index a6141bd8b..d38d6d199 100644 --- a/core/server/src/streaming/mod.rs +++ b/core/server/src/streaming/mod.rs @@ -16,6 +16,7 @@ * under the License. */ +pub mod auth; pub mod clients; pub mod deduplication; pub mod diagnostics; diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs index d3a58db57..38819d08c 100644 --- a/core/server/src/tcp/connection_handler.rs +++ b/core/server/src/tcp/connection_handler.rs @@ -17,7 +17,6 @@ */ use crate::binary::command; -use crate::binary::command::ServerCommandHandler; use crate::server_error::ConnectionError; use crate::shard::IggyShard; use crate::streaming::session::Session; @@ -91,7 +90,7 @@ pub(crate) async fn handle_connection( let command = ServerCommand::from_code_and_reader(code, sender, length - 4).await?; debug!("Received a TCP command: {command}, payload size: {length}"); let cmd_code = command.code(); - match command.handle(sender, length, session, shard).await { + match command.dispatch(sender, length, session, shard).await { Ok(handler_result) => match handler_result { command::HandlerResult::Finished => { debug!( diff --git a/core/server/src/websocket/connection_handler.rs b/core/server/src/websocket/connection_handler.rs index 2ee97196f..f23e0911a 100644 --- a/core/server/src/websocket/connection_handler.rs +++ b/core/server/src/websocket/connection_handler.rs @@ -17,7 +17,6 @@ */ use crate::binary::command; -use crate::binary::command::ServerCommandHandler; use crate::server_error::ConnectionError; use crate::shard::IggyShard; use crate::streaming::session::Session; @@ -81,7 +80,7 @@ pub(crate) async fn handle_connection( let command = ServerCommand::from_code_and_reader(code, sender, length - 4).await?; debug!("Received a WebSocket command: {command}, payload size: {length}"); - match command.handle(sender, length, session, shard).await { + match command.dispatch(sender, length, session, shard).await { Ok(_) => { debug!( "Command was handled successfully, session: {session}. WebSocket response was sent."
