This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 2ace7e8e53baf35ffaa1349bf56e07f7d0f0d9ff Author: numinex <[email protected]> AuthorDate: Thu Jun 26 16:34:52 2025 +0200 fix iggy shard errors --- core/common/src/locking/mod.rs | 4 +- core/common/src/locking/tokio_lock.rs | 3 +- core/server/src/binary/command.rs | 1 - .../create_consumer_group_handler.rs | 13 +- core/server/src/binary/handlers/utils.rs | 15 +- core/server/src/binary/mapper.rs | 14 +- core/server/src/http/consumer_groups.rs | 6 +- core/server/src/http/jwt/cleaner.rs | 8 +- core/server/src/http/jwt/jwt_manager.rs | 6 +- core/server/src/http/mapper.rs | 14 +- core/server/src/http/messages.rs | 2 +- core/server/src/http/mod.rs | 2 + core/server/src/http/shared.rs | 5 +- core/server/src/http/system.rs | 2 +- core/server/src/shard/mod.rs | 2 +- core/server/src/shard/system/clients.rs | 35 ++--- core/server/src/shard/system/consumer_groups.rs | 139 +++++++++++------- core/server/src/shard/system/consumer_offsets.rs | 24 +++- core/server/src/shard/system/messages.rs | 34 +++-- core/server/src/shard/system/partitions.rs | 48 +++++-- .../src/shard/system/personal_access_tokens.rs | 14 +- core/server/src/shard/system/segments.rs | 40 +++++- core/server/src/shard/system/snapshot/mod.rs | 62 ++++++--- core/server/src/shard/system/stats.rs | 8 +- core/server/src/shard/system/storage.rs | 1 + core/server/src/shard/system/streams.rs | 155 +++++++++++++-------- core/server/src/shard/system/topics.rs | 122 ++++++++-------- core/server/src/shard/system/users.rs | 154 ++++++++++++-------- .../src/streaming/partitions/consumer_offsets.rs | 5 +- core/server/src/streaming/storage.rs | 29 ++-- core/server/src/streaming/topics/consumer_group.rs | 5 +- .../server/src/streaming/topics/consumer_groups.rs | 22 +-- core/server/src/streaming/topics/topic.rs | 4 +- core/server/src/streaming/users/user.rs | 2 +- 34 files changed, 595 insertions(+), 405 deletions(-) diff --git a/core/common/src/locking/mod.rs b/core/common/src/locking/mod.rs index c2e542c7..429b4f59 100644 --- a/core/common/src/locking/mod.rs +++ b/core/common/src/locking/mod.rs @@ -37,11 +37,11 @@ pub type IggyRwLock<T> = fast_async_lock::IggyFastAsyncRwLock<T>; #[allow(async_fn_in_trait)] pub trait IggySharedMutFn<T> { - type ReadGuard<'a>: Deref<Target = T> + type ReadGuard<'a>: Deref<Target = T> where T: 'a, Self: 'a; - type WriteGuard<'a>: DerefMut<Target = T> + type WriteGuard<'a>: DerefMut<Target = T> where T: 'a, Self: 'a; diff --git a/core/common/src/locking/tokio_lock.rs b/core/common/src/locking/tokio_lock.rs index ef5baaad..d6a96f06 100644 --- a/core/common/src/locking/tokio_lock.rs +++ b/core/common/src/locking/tokio_lock.rs @@ -24,8 +24,7 @@ use tokio::sync::{RwLock as TokioRwLock, RwLockReadGuard, RwLockWriteGuard}; #[derive(Debug)] pub struct IggyTokioRwLock<T>(Arc<TokioRwLock<T>>); -impl<T> IggySharedMutFn<T> for IggyTokioRwLock<T> -{ +impl<T> IggySharedMutFn<T> for IggyTokioRwLock<T> { type ReadGuard<'a> = RwLockReadGuard<'a, T> where diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs index aa935e09..1ebac395 100644 --- a/core/server/src/binary/command.rs +++ b/core/server/src/binary/command.rs @@ -22,7 +22,6 @@ use crate::binary::sender::SenderKind; use crate::define_server_command_enum; use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use bytes::{BufMut, Bytes, BytesMut}; use enum_dispatch::enum_dispatch; use iggy_common::change_password::ChangePassword; diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs index 3d87fa17..5d695687 100644 --- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs @@ -16,14 +16,16 @@ * under the License. */ +use std::rc::Rc; + use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::state::models::CreateConsumerGroupWithId; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; @@ -39,14 +41,13 @@ impl ServerCommandHandler for CreateConsumerGroup { async fn handle( self, sender: &mut SenderKind, - _length: u32, - session: &Session, - system: &SharedSystem, + length: u32, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let mut system = system.write().await; - let consumer_group = system + let consumer_group = shard .create_consumer_group( session, &self.stream_id, diff --git a/core/server/src/binary/handlers/utils.rs b/core/server/src/binary/handlers/utils.rs index 351f6297..80bcede4 100644 --- a/core/server/src/binary/handlers/utils.rs +++ b/core/server/src/binary/handlers/utils.rs @@ -26,12 +26,17 @@ pub async fn receive_and_validate( length: u32, ) -> Result<ServerCommand, IggyError> { let mut buffer = BytesMut::with_capacity(length as usize); - if length > 0 { - unsafe { - buffer.set_len(length as usize); - } - sender.read(&mut buffer).await?; + unsafe { + buffer.set_len(length as usize); } + let buffer = if length == 0 { + buffer + } else { + let (result, buffer) = sender.read(buffer).await; + result?; + buffer + }; + let command = ServerCommand::from_code_and_payload(code, buffer.freeze())?; command.validate()?; Ok(command) diff --git a/core/server/src/binary/mapper.rs b/core/server/src/binary/mapper.rs index 61c65235..219ceed9 100644 --- a/core/server/src/binary/mapper.rs +++ b/core/server/src/binary/mapper.rs @@ -24,9 +24,8 @@ use crate::streaming::topics::consumer_group::ConsumerGroup; use crate::streaming::topics::topic::Topic; use crate::streaming::users::user::User; use bytes::{BufMut, Bytes, BytesMut}; -use iggy_common::locking::{IggySharedMut, IggySharedMutFn}; +use iggy_common::locking::IggySharedMutFn; use iggy_common::{BytesSerializable, ConsumerOffsetInfo, Sizeable, Stats, UserId}; -use tokio::sync::RwLock; pub fn map_stats(stats: &Stats) -> Bytes { let mut bytes = BytesMut::with_capacity(104); @@ -95,11 +94,10 @@ pub fn map_client(client: &Client) -> Bytes { bytes.freeze() } -pub async fn map_clients(clients: &[IggySharedMut<Client>]) -> Bytes { +pub async fn map_clients(clients: &[Client]) -> Bytes { let mut bytes = BytesMut::new(); for client in clients { - let client = client.read().await; - extend_client(&client, &mut bytes); + extend_client(client, &mut bytes); } bytes.freeze() } @@ -183,12 +181,11 @@ pub async fn map_topic(topic: &Topic) -> Bytes { bytes.freeze() } -pub async fn map_consumer_group(consumer_group: &ConsumerGroup) -> Bytes { +pub fn map_consumer_group(consumer_group: &ConsumerGroup) -> Bytes { let mut bytes = BytesMut::new(); extend_consumer_group(consumer_group, &mut bytes); let members = consumer_group.get_members(); for member in members { - let member = member.read().await; bytes.put_u32_le(member.id); let partitions = member.get_partitions(); bytes.put_u32_le(partitions.len() as u32); @@ -199,10 +196,9 @@ pub async fn map_consumer_group(consumer_group: &ConsumerGroup) -> Bytes { bytes.freeze() } -pub async fn map_consumer_groups(consumer_groups: &[&RwLock<ConsumerGroup>]) -> Bytes { +pub fn map_consumer_groups(consumer_groups: &[ConsumerGroup]) -> Bytes { let mut bytes = BytesMut::new(); for consumer_group in consumer_groups { - let consumer_group = consumer_group.read().await; extend_consumer_group(&consumer_group, &mut bytes); } bytes.freeze() diff --git a/core/server/src/http/consumer_groups.rs b/core/server/src/http/consumer_groups.rs index dd48c8c2..d58faef3 100644 --- a/core/server/src/http/consumer_groups.rs +++ b/core/server/src/http/consumer_groups.rs @@ -72,7 +72,7 @@ async fn get_consumer_group( }; let consumer_group = consumer_group.read().await; - let consumer_group = mapper::map_consumer_group(&consumer_group).await; + let consumer_group = mapper::map_consumer_group(&consumer_group); Ok(Json(consumer_group)) } @@ -89,7 +89,7 @@ async fn get_consumer_groups( &stream_id, &topic_id, )?; - let consumer_groups = mapper::map_consumer_groups(&consumer_groups).await; + let consumer_groups = mapper::map_consumer_groups(&consumer_groups); Ok(Json(consumer_groups)) } @@ -116,7 +116,7 @@ async fn create_consumer_group( .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to create consumer group, stream ID: {}, topic ID: {}, group ID: {:?}", stream_id, topic_id, command.group_id))?; let consumer_group = consumer_group.read().await; let group_id = consumer_group.group_id; - let consumer_group_details = mapper::map_consumer_group(&consumer_group).await; + let consumer_group_details = mapper::map_consumer_group(&consumer_group); drop(consumer_group); let system = system.downgrade(); diff --git a/core/server/src/http/jwt/cleaner.rs b/core/server/src/http/jwt/cleaner.rs index 40003eca..6d4329e7 100644 --- a/core/server/src/http/jwt/cleaner.rs +++ b/core/server/src/http/jwt/cleaner.rs @@ -19,12 +19,12 @@ use crate::http::shared::AppState; use iggy_common::IggyTimestamp; use std::sync::Arc; -use std::time::Duration; +use monoio::time::Duration; use tracing::{error, trace}; pub fn start_expired_tokens_cleaner(app_state: Arc<AppState>) { - tokio::spawn(async move { - let mut interval_timer = tokio::time::interval(Duration::from_secs(300)); + monoio::spawn(async move { + let mut interval_timer = monoio::time::interval(Duration::from_secs(300)); loop { interval_timer.tick().await; trace!("Deleting expired tokens..."); @@ -34,7 +34,7 @@ pub fn start_expired_tokens_cleaner(app_state: Arc<AppState>) { .delete_expired_revoked_tokens(now) .await .unwrap_or_else(|err| { - error!("Failed to delete expired revoked access tokens. Error: {err}",); + error!("Failed to delete expired revoked access tokens. Error: {err}"); }); } }); diff --git a/core/server/src/http/jwt/jwt_manager.rs b/core/server/src/http/jwt/jwt_manager.rs index 56e2ce9b..dfe0f66b 100644 --- a/core/server/src/http/jwt/jwt_manager.rs +++ b/core/server/src/http/jwt/jwt_manager.rs @@ -28,7 +28,7 @@ use iggy_common::IggyError; use iggy_common::IggyExpiry; use iggy_common::IggyTimestamp; use iggy_common::UserId; -use iggy_common::locking::IggySharedMut; +use iggy_common::locking::IggyRwLock; use iggy_common::locking::IggySharedMutFn; use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, encode}; use std::sync::Arc; @@ -54,7 +54,7 @@ pub struct JwtManager { issuer: IssuerOptions, validator: ValidatorOptions, tokens_storage: TokenStorage, - revoked_tokens: IggySharedMut<AHashMap<String, u64>>, + revoked_tokens: IggyRwLock<AHashMap<String, u64>>, validations: AHashMap<Algorithm, Validation>, } @@ -77,7 +77,7 @@ impl JwtManager { issuer, validator, tokens_storage: TokenStorage::new(persister, path), - revoked_tokens: IggySharedMut::new(AHashMap::new()), + revoked_tokens: IggyRwLock::new(AHashMap::new()), }) } diff --git a/core/server/src/http/mapper.rs b/core/server/src/http/mapper.rs index ed9ebba3..d839b9d9 100644 --- a/core/server/src/http/mapper.rs +++ b/core/server/src/http/mapper.rs @@ -28,12 +28,11 @@ use iggy_common::PersonalAccessTokenInfo; use iggy_common::Sizeable; use iggy_common::StreamDetails; use iggy_common::TopicDetails; -use iggy_common::locking::IggySharedMut; use iggy_common::locking::IggySharedMutFn; use iggy_common::{ConsumerGroupDetails, ConsumerGroupMember}; use iggy_common::{IdentityInfo, TokenInfo}; use iggy_common::{UserInfo, UserInfoDetails}; -use tokio::sync::RwLock; + pub fn map_stream(stream: &Stream) -> StreamDetails { let topics = map_topics(&stream.get_topics()); @@ -177,10 +176,9 @@ pub fn map_client(client: &Client) -> iggy_common::ClientInfoDetails { } } -pub async fn map_clients(clients: &[IggySharedMut<Client>]) -> Vec<iggy_common::ClientInfo> { +pub fn map_clients(clients: &[Client]) -> Vec<iggy_common::ClientInfo> { let mut all_clients = Vec::new(); for client in clients { - let client = client.read().await; let client = iggy_common::ClientInfo { client_id: client.session.client_id, user_id: client.user_id, @@ -195,12 +193,11 @@ pub async fn map_clients(clients: &[IggySharedMut<Client>]) -> Vec<iggy_common:: all_clients } -pub async fn map_consumer_groups( - consumer_groups: &[&RwLock<ConsumerGroup>], +pub fn map_consumer_groups( + consumer_groups: &[ConsumerGroup], ) -> Vec<iggy_common::ConsumerGroup> { let mut groups = Vec::new(); for consumer_group in consumer_groups { - let consumer_group = consumer_group.read().await; let consumer_group = iggy_common::ConsumerGroup { id: consumer_group.group_id, name: consumer_group.name.clone(), @@ -213,7 +210,7 @@ pub async fn map_consumer_groups( groups } -pub async fn map_consumer_group(consumer_group: &ConsumerGroup) -> ConsumerGroupDetails { +pub fn map_consumer_group(consumer_group: &ConsumerGroup) -> ConsumerGroupDetails { let mut consumer_group_details = ConsumerGroupDetails { id: consumer_group.group_id, name: consumer_group.name.clone(), @@ -223,7 +220,6 @@ pub async fn map_consumer_group(consumer_group: &ConsumerGroup) -> ConsumerGroup }; let members = consumer_group.get_members(); for member in members { - let member = member.read().await; let partitions = member.get_partitions(); consumer_group_details.members.push(ConsumerGroupMember { id: member.id, diff --git a/core/server/src/http/messages.rs b/core/server/src/http/messages.rs index 251c503b..e289adb9 100644 --- a/core/server/src/http/messages.rs +++ b/core/server/src/http/messages.rs @@ -20,9 +20,9 @@ use crate::http::COMPONENT; use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; use crate::http::shared::AppState; +use crate::shard::system::messages::PollingArgs; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::session::Session; -use crate::streaming::systems::messages::PollingArgs; use crate::streaming::utils::PooledBuffer; use axum::extract::{Path, Query, State}; use axum::http::StatusCode; diff --git a/core/server/src/http/mod.rs b/core/server/src/http/mod.rs index 194ed74a..7b24bfff 100644 --- a/core/server/src/http/mod.rs +++ b/core/server/src/http/mod.rs @@ -16,6 +16,7 @@ * under the License. */ +/* pub mod consumer_groups; pub mod consumer_offsets; pub mod diagnostics; @@ -32,5 +33,6 @@ pub mod streams; pub mod system; pub mod topics; pub mod users; +*/ pub const COMPONENT: &str = "HTTP"; diff --git a/core/server/src/http/shared.rs b/core/server/src/http/shared.rs index 50703357..a21ddb1c 100644 --- a/core/server/src/http/shared.rs +++ b/core/server/src/http/shared.rs @@ -16,14 +16,13 @@ * under the License. */ -use crate::http::jwt::jwt_manager::JwtManager; -use crate::streaming::systems::system::SharedSystem; +use crate::{http::jwt::jwt_manager::JwtManager, shard::IggyShard}; use std::net::SocketAddr; use ulid::Ulid; pub struct AppState { pub jwt_manager: JwtManager, - pub system: SharedSystem, + pub shard: IggyShard, } #[derive(Debug, Copy, Clone)] diff --git a/core/server/src/http/system.rs b/core/server/src/http/system.rs index 1c7166e3..4428d36c 100644 --- a/core/server/src/http/system.rs +++ b/core/server/src/http/system.rs @@ -114,7 +114,7 @@ async fn get_clients( identity.user_id ) })?; - let clients = mapper::map_clients(&clients).await; + let clients = mapper::map_clients(&clients); Ok(Json(clients)) } diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 12ba64bb..b5e48525 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -461,6 +461,6 @@ impl IggyShard { Err(IggyError::Unauthenticated) } })?; - Ok(user_id) + Ok(user_id) } } diff --git a/core/server/src/shard/system/clients.rs b/core/server/src/shard/system/clients.rs index a2debc42..ee9e1634 100644 --- a/core/server/src/shard/system/clients.rs +++ b/core/server/src/shard/system/clients.rs @@ -16,17 +16,16 @@ * under the License. */ +use super::COMPONENT; use crate::shard::IggyShard; use crate::streaming::clients::client_manager::{Client, Transport}; use crate::streaming::session::Session; use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; -use iggy_common::locking::IggyRwLock; use iggy_common::locking::IggySharedMutFn; use std::net::SocketAddr; use std::rc::Rc; -use std::sync::Arc; use tracing::{error, info}; impl IggyShard { @@ -43,7 +42,7 @@ impl IggyShard { { let mut client_manager = self.client_manager.borrow_mut(); - let client = client_manager.delete_client(client_id).await; + let client = client_manager.delete_client(client_id); if client.is_none() { error!("Client with ID: {client_id} was not found in the client manager.",); return; @@ -51,7 +50,6 @@ impl IggyShard { self.metrics.decrement_clients(1); let client = client.unwrap(); - let client = client.read().await; consumer_groups = client .consumer_groups .iter() @@ -65,14 +63,12 @@ impl IggyShard { } for (stream_id, topic_id, consumer_group_id) in consumer_groups.into_iter() { - _ = self - .leave_consumer_group_by_client( - &Identifier::numeric(stream_id).unwrap(), - &Identifier::numeric(topic_id).unwrap(), - &Identifier::numeric(consumer_group_id).unwrap(), - client_id, - ) - .await + _ = self.leave_consumer_group_by_client( + &Identifier::numeric(stream_id).unwrap(), + &Identifier::numeric(topic_id).unwrap(), + &Identifier::numeric(consumer_group_id).unwrap(), + client_id, + ) } } @@ -80,9 +76,10 @@ impl IggyShard { &self, session: &Session, client_id: u32, - ) -> Result<Option<IggySharedMut<Client>>, IggyError> { + ) -> Result<Option<Client>, IggyError> { self.ensure_authenticated(session)?; self.permissioner + .borrow() .get_client(session.get_user_id()) .with_error_context(|error| { format!( @@ -91,16 +88,13 @@ impl IggyShard { ) })?; - let client_manager = self.client_manager.read().await; - Ok(client_manager.try_get_client(client_id)) + Ok(self.client_manager.borrow().try_get_client(client_id)) } - pub async fn get_clients( - &self, - session: &Session, - ) -> Result<Vec<IggySharedMut<Client>>, IggyError> { + pub async fn get_clients(&self, session: &Session) -> Result<Vec<Client>, IggyError> { self.ensure_authenticated(session)?; self.permissioner + .borrow() .get_clients(session.get_user_id()) .with_error_context(|error| { format!( @@ -109,7 +103,6 @@ impl IggyShard { ) })?; - let client_manager = self.client_manager.read().await; - Ok(client_manager.get_clients()) + Ok(self.client_manager.borrow().get_clients()) } } diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs index d8d8818e..768d4e5c 100644 --- a/core/server/src/shard/system/consumer_groups.rs +++ b/core/server/src/shard/system/consumer_groups.rs @@ -16,29 +16,39 @@ * under the License. */ +use std::cell::Ref; + +use super::COMPONENT; use crate::shard::IggyShard; use crate::streaming::session::Session; +use crate::streaming::streams::stream::Stream; use crate::streaming::topics::consumer_group::ConsumerGroup; use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; use iggy_common::locking::IggySharedMutFn; -use tokio::sync::RwLock; impl IggyShard { - pub fn get_consumer_group( + pub fn get_consumer_group<'cg, 'stream>( &self, session: &Session, - stream_id: &Identifier, + stream: &'stream Stream, topic_id: &Identifier, group_id: &Identifier, - ) -> Result<Option<&RwLock<ConsumerGroup>>, IggyError> { + ) -> Result<Option<Ref<'cg, ConsumerGroup>>, IggyError> + where + 'stream: 'cg, + { self.ensure_authenticated(session)?; - let Some(topic) = self.try_find_topic(session, stream_id, topic_id)? else { - return Ok(None); - }; + let stream_id = stream.stream_id; + let topic = stream.get_topic(topic_id).with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - topic with ID: {topic_id} was not found in stream with ID: {stream_id}", + ) + })?; self.permissioner + .borrow() .get_consumer_group(session.get_user_id(), topic.stream_id, topic.topic_id) .with_error_context(|error| { format!( @@ -55,12 +65,16 @@ impl IggyShard { session: &Session, stream_id: &Identifier, topic_id: &Identifier, - ) -> Result<Vec<&RwLock<ConsumerGroup>>, IggyError> { + ) -> Result<Vec<ConsumerGroup>, IggyError> { self.ensure_authenticated(session)?; - let topic = self.find_topic(session, stream_id, topic_id) + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - stream with ID: {stream_id} was not found") + })?; + let topic = self.find_topic(session, &stream, topic_id) .with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic with ID: {topic_id} was not found in stream with ID: {stream_id}"))?; self.permissioner + .borrow() .get_consumer_groups(session.get_user_id(), topic.stream_id, topic.topic_id) .with_error_context(|error| { format!( @@ -72,40 +86,45 @@ impl IggyShard { Ok(topic.get_consumer_groups()) } - pub async fn create_consumer_group( - &mut self, + pub fn create_consumer_group( + &self, session: &Session, stream_id: &Identifier, topic_id: &Identifier, group_id: Option<u32>, name: &str, - ) -> Result<&RwLock<ConsumerGroup>, IggyError> { + ) -> Result<u32, IggyError> { self.ensure_authenticated(session)?; { - let topic = self.find_topic(session, stream_id, topic_id) + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - stream not found for stream ID: {stream_id}" + ) + })?; + let topic = self.find_topic(session, &stream, topic_id) .with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?; - self.permissioner.create_consumer_group( + self.permissioner.borrow().create_consumer_group( session.get_user_id(), topic.stream_id, topic.topic_id, ).with_error_context(|error| format!("{COMPONENT} (error: {error}) - permission denied to create consumer group for user {} on stream ID: {}, topic ID: {}", session.get_user_id(), topic.stream_id, topic.topic_id))?; } - let topic = self.get_stream_mut(stream_id)? - .get_topic_mut(topic_id) + let mut stream = self.get_stream_mut(stream_id) + .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to get mutable reference to stream with ID: {stream_id}"))?; + let topic = stream.get_topic_mut(topic_id) .with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?; topic .create_consumer_group(group_id, name) - .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to create consumer group with name: {name}") - }) + }).map(|cg| cg.group_id) } pub async fn delete_consumer_group( - &mut self, + &self, session: &Session, stream_id: &Identifier, topic_id: &Identifier, @@ -115,10 +134,15 @@ impl IggyShard { let stream_id_value; let topic_id_value; { - let topic = self.find_topic(session, stream_id, topic_id) + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - stream not found for stream ID: {stream_id}" + ) + })?; + let topic = self.find_topic(session, &stream, topic_id) .with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?; - self.permissioner.delete_consumer_group( + self.permissioner.borrow().delete_consumer_group( session.get_user_id(), topic.stream_id, topic.topic_id, @@ -130,7 +154,7 @@ impl IggyShard { let consumer_group; { - let stream = self.get_stream_mut(stream_id).with_error_context(|error| { + let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to get mutable reference to stream with id: {stream_id}" ) @@ -142,25 +166,20 @@ impl IggyShard { .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to delete consumer group with ID: {consumer_group_id}"))? } - let client_manager = self.client_manager.read().await; - let consumer_group = consumer_group.read().await; for member in consumer_group.get_members() { - let member = member.read().await; - client_manager - .leave_consumer_group( + self.client_manager.borrow_mut().leave_consumer_group( member.id, stream_id_value, topic_id_value, consumer_group.group_id, ) - .await .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to make client leave consumer group for client ID: {}, group ID: {}", member.id, consumer_group.group_id))?; } Ok(()) } - pub async fn join_consumer_group( + pub fn join_consumer_group( &self, session: &Session, stream_id: &Identifier, @@ -171,15 +190,20 @@ impl IggyShard { let stream_id_value; let topic_id_value; { + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - stream not found for stream ID: {stream_id}" + ) + })?; let topic = self - .find_topic(session, stream_id, topic_id) + .find_topic(session, &stream, topic_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}", ) })?; - self.permissioner.join_consumer_group( + self.permissioner.borrow().join_consumer_group( session.get_user_id(), topic.stream_id, topic.topic_id, @@ -191,7 +215,14 @@ impl IggyShard { let group_id; { - let topic = self.find_topic(session, stream_id, topic_id)?; + let mut stream = self.get_stream_mut(stream_id) + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get mutable reference to stream with ID: {stream_id}") + })?; + let topic = stream.get_topic_mut(topic_id) + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get mutable reference to topic with ID: {topic_id}") + })?; { let consumer_group = topic @@ -202,13 +233,11 @@ impl IggyShard { ) })?; - let consumer_group = consumer_group.read().await; group_id = consumer_group.group_id; } topic .join_consumer_group(consumer_group_id, session.client_id) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to join consumer group for group ID: {group_id}" @@ -216,10 +245,7 @@ impl IggyShard { })?; } - let client_manager = self.client_manager.read().await; - client_manager - .join_consumer_group(session.client_id, stream_id_value, topic_id_value, group_id) - .await + self.client_manager.borrow_mut().join_consumer_group(session.client_id, stream_id_value, topic_id_value, group_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to make client join consumer group for client ID: {}", @@ -239,15 +265,23 @@ impl IggyShard { ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; { + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; let topic = self - .find_topic(session, stream_id, topic_id) + .find_topic(session, &stream, topic_id) .with_error_context(|error| { format!( +<<<<<<< HEAD "{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id:?}, topic_id: {topic_id:?}" +======= + "{COMPONENT} (error: {error}) - topic not found for stream ID: {:?}, topic_id: {:?}", + stream.stream_id, topic_id +>>>>>>> 48107890 (fix iggy shard errors) ) })?; - self.permissioner.leave_consumer_group( + self.permissioner.borrow().leave_consumer_group( session.get_user_id(), topic.stream_id, topic.topic_id, @@ -260,7 +294,6 @@ impl IggyShard { consumer_group_id, session.client_id, ) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to leave consumer group for client ID: {}", @@ -269,7 +302,7 @@ impl IggyShard { }) } - pub async fn leave_consumer_group_by_client( + pub fn leave_consumer_group_by_client( &self, stream_id: &Identifier, topic_id: &Identifier, @@ -281,15 +314,17 @@ impl IggyShard { let group_id; { - let stream = self.get_stream(stream_id).with_error_context(|error| { + let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") })?; - let topic = stream.get_topic(topic_id) + let stream_id = stream.stream_id; + let topic = stream.get_topic_mut(topic_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}", ) })?; + let topic_id = topic.topic_id; { let consumer_group = topic .get_consumer_group(consumer_group_id) @@ -298,23 +333,23 @@ impl IggyShard { "{COMPONENT} (error: {error}) - consumer group not found for group_id: {consumer_group_id}", ) })?; - let consumer_group = consumer_group.read().await; group_id = consumer_group.group_id; } - stream_id_value = stream.stream_id; - topic_id_value = topic.topic_id; + stream_id_value = stream_id; + topic_id_value = topic_id; topic .leave_consumer_group(consumer_group_id, client_id) - .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed leave consumer group, client ID {client_id}",) })?; } - let client_manager = self.client_manager.read().await; - client_manager - .leave_consumer_group(client_id, stream_id_value, topic_id_value, group_id) - .await + self.client_manager.borrow_mut().leave_consumer_group( + client_id, + stream_id_value, + topic_id_value, + group_id, + ) } } diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs index 0c7438a6..e5504173 100644 --- a/core/server/src/shard/system/consumer_offsets.rs +++ b/core/server/src/shard/system/consumer_offsets.rs @@ -16,7 +16,8 @@ * under the License. */ -use crate::streaming::session::Session; +use super::COMPONENT; +use crate::{shard::IggyShard, streaming::session::Session}; use error_set::ErrContext; use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; @@ -31,9 +32,12 @@ impl IggyShard { offset: u64, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; - let topic = self.find_topic(session, stream_id, topic_id) + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - stream with ID: {stream_id} was not found") + })?; + let topic = self.find_topic(session, &stream, topic_id) .with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic with ID: {topic_id} was not found in stream with ID: {stream_id}"))?; - self.permissioner.store_consumer_offset( + self.permissioner.borrow().store_consumer_offset( session.get_user_id(), topic.stream_id, topic.topic_id, @@ -53,11 +57,14 @@ impl IggyShard { partition_id: Option<u32>, ) -> Result<Option<ConsumerOffsetInfo>, IggyError> { self.ensure_authenticated(session)?; - let Some(topic) = self.try_find_topic(session, stream_id, topic_id)? else { + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - stream with ID: {stream_id} was not found") + })?; + let Some(topic) = self.try_find_topic(session, &stream, topic_id)? else { return Ok(None); }; - self.permissioner.get_consumer_offset( + self.permissioner.borrow().get_consumer_offset( session.get_user_id(), topic.stream_id, topic.topic_id, @@ -82,9 +89,12 @@ impl IggyShard { partition_id: Option<u32>, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; - let topic = self.find_topic(session, stream_id, topic_id) + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - stream with ID: {stream_id} was not found") + })?; + let topic = self.find_topic(session, &stream, topic_id) .with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic with ID: {topic_id} was not found in stream with ID: {stream_id}"))?; - self.permissioner.delete_consumer_offset( + self.permissioner.borrow().delete_consumer_offset( session.get_user_id(), topic.stream_id, topic.topic_id, diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 54046dba..06499bd3 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -16,11 +16,13 @@ * under the License. */ +use super::COMPONENT; use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata; use crate::shard::IggyShard; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet}; use crate::streaming::session::Session; use crate::streaming::utils::PooledBuffer; +use async_zip::tokio::read::stream; use error_set::ErrContext; use iggy_common::{ BytesSerializable, Confirmation, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, Identifier, @@ -43,8 +45,13 @@ impl IggyShard { return Err(IggyError::InvalidMessagesCount); } - let topic = self.find_topic(session, stream_id, topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?; + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - stream not found for stream ID: {stream_id}") + })?; + let stream_id = stream.stream_id; + let topic = self.find_topic(session, &stream, topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?; self.permissioner + .borrow() .poll_messages(session.get_user_id(), topic.stream_id, topic.topic_id) .with_error_context(|error| format!( "{COMPONENT} (error: {error}) - permission denied to poll messages for user {} on stream ID: {}, topic ID: {}", @@ -60,7 +67,6 @@ impl IggyShard { // There might be no partition assigned, if it's the consumer group member without any partitions. let Some((polling_consumer, partition_id)) = topic .resolve_consumer_with_partition_id(consumer, session.client_id, partition_id, true) - .await .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to resolve consumer with partition id, consumer: {consumer}, client ID: {}, partition ID: {:?}", session.client_id, partition_id))? else { return Ok((IggyPollMetadata::new(0, 0), IggyMessagesBatchSet::empty())); }; @@ -80,11 +86,15 @@ impl IggyShard { topic .store_consumer_offset_internal(polling_consumer, offset, partition_id) .await +<<<<<<< HEAD .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to store consumer offset internal, polling consumer: {polling_consumer}, offset: {offset}, partition ID: {partition_id}")) ?; +======= + .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to store consumer offset internal, polling consumer: {}, offset: {}, partition ID: {}", polling_consumer, offset, partition_id))?; +>>>>>>> 48107890 (fix iggy shard errors) } let batch_set = if let Some(encryptor) = &self.encryptor { - self.decrypt_messages(batch_set, encryptor.as_ref()).await? + self.decrypt_messages(batch_set, encryptor).await? } else { batch_set }; @@ -102,8 +112,12 @@ impl IggyShard { confirmation: Option<Confirmation>, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; - let topic = self.find_topic(session, stream_id, topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?; - self.permissioner.append_messages( + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - stream not found for stream_id: {stream_id}") + })?; + let stream_id = stream.stream_id; + let topic = self.find_topic(session, &stream, topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?; + self.permissioner.borrow().append_messages( session.get_user_id(), topic.stream_id, topic.topic_id @@ -117,7 +131,7 @@ impl IggyShard { // Encrypt messages if encryptor is configured let messages = if let Some(encryptor) = &self.encryptor { - self.encrypt_messages(messages, encryptor.as_ref())? + self.encrypt_messages(messages, encryptor)? } else { messages }; @@ -139,8 +153,12 @@ impl IggyShard { fsync: bool, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; - let topic = self.find_topic(session, &stream_id, &topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?; - self.permissioner.append_messages( + let stream = self.get_stream(&stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - stream not found for stream ID: {stream_id}") + })?; + let stream_id = stream.stream_id; + let topic = self.find_topic(session, &stream, &topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic ID: {topic_id}"))?; + self.permissioner.borrow().append_messages( session.get_user_id(), topic.stream_id, topic.topic_id diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs index 7712f498..01e02968 100644 --- a/core/server/src/shard/system/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -16,16 +16,16 @@ * under the License. */ +use super::COMPONENT; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::COMPONENT; -use crate::streaming::systems::system::System; use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; impl IggyShard { pub async fn create_partitions( - &mut self, + &self, session: &Session, stream_id: &Identifier, topic_id: &Identifier, @@ -33,8 +33,13 @@ impl IggyShard { ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; { - let topic = self.find_topic(session, stream_id, topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?; - self.permissioner.create_partitions( + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - stream not found for stream ID: {stream_id}" + ) + })?; + let topic = self.find_topic(session, &stream, topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic ID: {topic_id}"))?; + self.permissioner.borrow().create_partitions( session.get_user_id(), topic.stream_id, topic.topic_id, @@ -46,28 +51,33 @@ impl IggyShard { ))?; } - let topic = self - .get_stream_mut(stream_id)? + let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; + let topic = stream .get_topic_mut(topic_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to get mutable reference to stream with id: {stream_id}" ) })?; + + // TODO: Make add persisted partitions to topic sync, and extract the storage persister out of it + // perform disk i/o outside of the borrow_mut of the stream. topic .add_persisted_partitions(partitions_count) .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to add persisted partitions, topic: {topic}") })?; - topic.reassign_consumer_groups().await; + topic.reassign_consumer_groups(); self.metrics.increment_partitions(partitions_count); self.metrics.increment_segments(partitions_count); Ok(()) } pub async fn delete_partitions( - &mut self, + &self, session: &Session, stream_id: &Identifier, topic_id: &Identifier, @@ -75,8 +85,13 @@ impl IggyShard { ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; { - let topic = self.find_topic(session, stream_id, topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?; - self.permissioner.delete_partitions( + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - stream not found for stream ID: {stream_id}" + ) + })?; + let topic = self.find_topic(session, &stream, topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?; + self.permissioner.borrow().delete_partitions( session.get_user_id(), topic.stream_id, topic.topic_id, @@ -88,21 +103,26 @@ impl IggyShard { ))?; } - let topic = self - .get_stream_mut(stream_id)? + let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; + let topic = stream .get_topic_mut(topic_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to get mutable reference to stream with id: {stream_id}" ) })?; + + // TODO: Make delete persisted partitions from topic sync, and extract the storage persister out of it + // perform disk i/o outside of the borrow_mut of the stream. let partitions = topic .delete_persisted_partitions(partitions_count) .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to delete persisted partitions for topic: {topic}") })?; - topic.reassign_consumer_groups().await; + topic.reassign_consumer_groups(); if let Some(partitions) = partitions { self.metrics.decrement_partitions(partitions_count); self.metrics.decrement_segments(partitions.segments_count); diff --git a/core/server/src/shard/system/personal_access_tokens.rs b/core/server/src/shard/system/personal_access_tokens.rs index b5452e9a..082fcd93 100644 --- a/core/server/src/shard/system/personal_access_tokens.rs +++ b/core/server/src/shard/system/personal_access_tokens.rs @@ -16,6 +16,8 @@ * under the License. */ +use super::COMPONENT; +use crate::shard::IggyShard; use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::session::Session; use crate::streaming::users::user::User; @@ -64,7 +66,7 @@ impl IggyShard { let user = self.get_user(&identifier).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get user with id: {user_id}") })?; - let max_token_per_user = self.personal_access_token.max_tokens_per_user; + let max_token_per_user = self.config.personal_access_token.max_tokens_per_user; if user.personal_access_tokens.len() as u32 >= max_token_per_user { error!( "User with ID: {user_id} has reached the maximum number of personal access tokens: {max_token_per_user}.", @@ -76,7 +78,7 @@ impl IggyShard { } } - let user = self.get_user(&identifier).with_error_context(|error| { + let user = self.get_user_mut(&identifier).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get mutable reference to the user with id: {user_id}") })?; @@ -102,7 +104,7 @@ impl IggyShard { } pub async fn delete_personal_access_token( - &mut self, + &self, session: &Session, name: &str, ) -> Result<(), IggyError> { @@ -137,10 +139,11 @@ impl IggyShard { &self, token: &str, session: Option<&Session>, - ) -> Result<&User, IggyError> { + ) -> Result<User, IggyError> { let token_hash = PersonalAccessToken::hash_token(token); + let users = self.users.borrow(); let mut personal_access_token = None; - for user in self.users.values() { + for user in users.values() { if let Some(pat) = user.personal_access_tokens.get(&token_hash) { personal_access_token = Some(pat); break; @@ -173,6 +176,5 @@ impl IggyShard { ) })?; self.login_user_with_credentials(&user.username, None, session) - .await } } diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs index 27abb76b..6a8ad68e 100644 --- a/core/server/src/shard/system/segments.rs +++ b/core/server/src/shard/system/segments.rs @@ -16,6 +16,7 @@ use crate::shard::IggyShard; * specific language governing permissions and limitations * under the License. */ +use super::COMPONENT; use crate::streaming::session::Session; use error_set::ErrContext; use iggy_common::Identifier; @@ -24,7 +25,7 @@ use iggy_common::locking::IggySharedMutFn; impl IggyShard { pub async fn delete_segments( - &mut self, + &self, session: &Session, stream_id: &Identifier, topic_id: &Identifier, @@ -35,9 +36,14 @@ impl IggyShard { self.ensure_authenticated(session)?; { - let topic = self.find_topic(session, stream_id, topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?; + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - stream not found for stream_id: {stream_id}" + ) + })?; + let topic = self.find_topic(session, &stream, topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?; - self.permissioner.delete_segments( + self.permissioner.borrow().delete_segments( session.get_user_id(), topic.stream_id, topic.topic_id, @@ -49,9 +55,11 @@ impl IggyShard { ))?; } - let topic = self - .get_stream_mut(stream_id)? - .get_topic_mut(topic_id) + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; + let topic = stream + .get_topic(topic_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to get mutable reference to stream with ID: {stream_id}" @@ -95,7 +103,25 @@ impl IggyShard { (segments_count, messages_count) }; - topic.reassign_consumer_groups().await; + drop(partition); + drop(topic); + drop(stream); + + let mut stream = self + .get_stream_mut(stream_id) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get mutable reference to stream with ID: {stream_id}" + ) + })?; + let topic = stream + .get_topic_mut(topic_id) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get mutable reference to topic with ID: {topic_id} in stream with ID: {stream_id}" + ) + })?; + topic.reassign_consumer_groups(); self.metrics.decrement_segments(deleted_segments_count); self.metrics.decrement_messages(deleted_messages_count); diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs index f6e07785..78c5d393 100644 --- a/core/server/src/shard/system/snapshot/mod.rs +++ b/core/server/src/shard/system/snapshot/mod.rs @@ -19,22 +19,23 @@ mod procdump; use crate::configs::system::SystemConfig; +use crate::shard::IggyShard; use crate::streaming::session::Session; use async_zip::tokio::write::ZipFileWriter; use async_zip::{Compression, ZipEntryBuilder}; use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, SystemSnapshotType}; +use monoio::fs::{File, OpenOptions}; use std::io::Cursor; use std::path::PathBuf; -use std::sync::Arc; +use std::rc::Rc; use std::time::Instant; use tempfile::NamedTempFile; -use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::Command; use tokio_util::compat::TokioAsyncWriteCompatExt; use tracing::{error, info}; -impl System { +impl IggyShard { pub async fn get_snapshot( &self, session: &Session, @@ -69,18 +70,19 @@ impl System { let now = Instant::now(); for snapshot_type in snapshot_types { - match get_command_result(snapshot_type, self.config.clone()).await { + match get_command_result(snapshot_type, self.config.system.clone()).await { Ok(temp_file) => { let filename = format!("{snapshot_type}.txt"); let entry = ZipEntryBuilder::new(filename.clone().into(), compression); - let mut file = File::open(temp_file.path()).await.map_err(|e| { + let file = File::open(temp_file.path()).await.map_err(|e| { error!("Failed to open temporary file: {}", e); IggyError::SnapshotFileCompletionFailed })?; - let mut content = Vec::new(); - if let Err(e) = file.read_to_end(&mut content).await { + let content = Vec::new(); + let (result, content) = file.read_exact_at(content, 0).await; + if let Err(e) = result { error!("Failed to read temporary file: {}", e); continue; } @@ -130,9 +132,10 @@ async fn write_command_output_to_temp_file( ) -> Result<NamedTempFile, std::io::Error> { let output = command.output().await?; let temp_file = NamedTempFile::new()?; - let mut file = File::from_std(temp_file.as_file().try_clone()?); - file.write_all(&output.stdout).await?; - file.flush().await?; + let file = File::from_std(temp_file.as_file().try_clone()?).unwrap(); + let (result, _) = file.write_all_at(output.stdout, 0).await; + result?; + file.sync_all().await?; Ok(temp_file) } @@ -142,18 +145,35 @@ async fn get_filesystem_overview() -> Result<NamedTempFile, std::io::Error> { async fn get_process_info() -> Result<NamedTempFile, std::io::Error> { let temp_file = NamedTempFile::new()?; - let mut file = File::from_std(temp_file.as_file().try_clone()?); + let file = File::from_std(temp_file.as_file().try_clone()?).unwrap(); + let mut position = 0; let ps_output = Command::new("ps").arg("aux").output().await?; - file.write_all(b"=== Process List (ps aux) ===\n").await?; - file.write_all(&ps_output.stdout).await?; - file.write_all(b"\n\n").await?; + let (result, written) = file + .write_all_at(b"=== Process List (ps aux) ===\n", 0) + .await; + result?; + position += written.len() as u64; + + let (result, written) = file.write_all_at(ps_output.stdout, position).await; + result?; + position += written.len() as u64; + + let (result, written) = file.write_all_at(b"\n\n", position).await; + result?; + position += written.len() as u64; + + let (result, written) = file + .write_all_at(b"=== Detailed Process Information ===\n", position) + .await; + result?; + position += written.len() as u64; - file.write_all(b"=== Detailed Process Information ===\n") - .await?; let proc_info = procdump::get_proc_info().await?; - file.write_all(proc_info.as_bytes()).await?; - file.flush().await?; + let bytes = proc_info.as_bytes().to_owned(); + let (result, _) = file.write_all_at(bytes, position).await; + result?; + file.sync_all().await?; Ok(temp_file) } @@ -166,7 +186,7 @@ async fn get_test_snapshot() -> Result<NamedTempFile, std::io::Error> { write_command_output_to_temp_file(Command::new("echo").arg("test")).await } -async fn get_server_logs(config: Arc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> { +async fn get_server_logs(config: Rc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> { let base_directory = PathBuf::from(config.get_system_path()); let logs_subdirectory = PathBuf::from(&config.logging.path); let logs_path = base_directory.join(logs_subdirectory); @@ -179,7 +199,7 @@ async fn get_server_logs(config: Arc<SystemConfig>) -> Result<NamedTempFile, std write_command_output_to_temp_file(Command::new("sh").args(["-c", &list_and_cat])).await } -async fn get_server_config(config: Arc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> { +async fn get_server_config(config: Rc<SystemConfig>) -> Result<NamedTempFile, std::io::Error> { let base_directory = PathBuf::from(config.get_system_path()); let config_path = base_directory.join("runtime").join("current_config.toml"); @@ -188,7 +208,7 @@ async fn get_server_config(config: Arc<SystemConfig>) -> Result<NamedTempFile, s async fn get_command_result( snapshot_type: &SystemSnapshotType, - config: Arc<SystemConfig>, + config: Rc<SystemConfig>, ) -> Result<NamedTempFile, std::io::Error> { match snapshot_type { SystemSnapshotType::FilesystemOverview => get_filesystem_overview().await, diff --git a/core/server/src/shard/system/stats.rs b/core/server/src/shard/system/stats.rs index 8410cac2..53e31322 100644 --- a/core/server/src/shard/system/stats.rs +++ b/core/server/src/shard/system/stats.rs @@ -16,8 +16,8 @@ * under the License. */ -use crate::shard::IggyShard; use crate::VERSION; +use crate::shard::IggyShard; use crate::versioning::SemanticVersion; use iggy_common::locking::IggySharedMutFn; use iggy_common::{IggyDuration, IggyError, Stats}; @@ -45,7 +45,7 @@ impl IggyShard { let total_cpu_usage = sys.global_cpu_usage(); let total_memory = sys.total_memory().into(); let available_memory = sys.available_memory().into(); - let clients_count = self.client_manager.read().await.get_clients().len() as u32; + let clients_count = self.client_manager.borrow().get_clients().len() as u32; let hostname = sysinfo::System::host_name().unwrap_or("unknown_hostname".to_string()); let os_name = sysinfo::System::name().unwrap_or("unknown_os_name".to_string()); let os_version = @@ -90,7 +90,7 @@ impl IggyShard { drop(sys); - for stream in self.streams.values() { + for stream in self.streams.borrow().values() { stats.messages_count += stream.get_messages_count(); stats.segments_count += stream.get_segments_count(); stats.messages_size_bytes += stream.get_size(); @@ -104,7 +104,7 @@ impl IggyShard { stats.consumer_groups_count += stream .topics .values() - .map(|t| t.consumer_groups.len() as u32) + .map(|t| t.consumer_groups.borrow().len() as u32) .sum::<u32>(); } diff --git a/core/server/src/shard/system/storage.rs b/core/server/src/shard/system/storage.rs index 4cb41120..cc413583 100644 --- a/core/server/src/shard/system/storage.rs +++ b/core/server/src/shard/system/storage.rs @@ -16,6 +16,7 @@ * under the License. */ +use super::COMPONENT; use crate::shard::system::info::SystemInfo; use crate::streaming::persistence::persister::PersisterKind; use crate::streaming::storage::SystemInfoStorage; diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index 8a0772a9..43b45246 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -16,24 +16,37 @@ * under the License. */ +use super::COMPONENT; use crate::shard::IggyShard; use crate::streaming::session::Session; use crate::streaming::streams::stream::Stream; use error_set::ErrContext; use futures::future::try_join_all; -use std::cell::RefCell; +use iggy_common::{IdKind, Identifier, IggyError}; +use std::cell::{Ref, RefCell, RefMut}; use std::sync::atomic::{AtomicU32, Ordering}; use tokio::fs; use tracing::{error, info, warn}; +static CURRENT_STREAM_ID: AtomicU32 = AtomicU32::new(1); + impl IggyShard { - pub fn get_streams(&self) -> Vec<&Stream> { - self.streams.values().collect() + pub fn get_streams(&self) -> Vec<Ref<'_, Stream>> { + let len = self.streams.borrow().len(); + let result = (0..len) + .map(|i| { + Ref::map(self.streams.borrow(), |streams| { + streams.values().nth(i).unwrap() + }) + }) + .collect(); + result } - pub fn find_streams(&self, session: &Session) -> Result<Vec<&Stream>, IggyError> { + pub fn find_streams(&self, session: &Session) -> Result<Vec<Ref<'_, Stream>>, IggyError> { self.ensure_authenticated(session)?; self.permissioner + .borrow() .get_streams(session.get_user_id()) .with_error_context(|error| { format!( @@ -44,15 +57,16 @@ impl IggyShard { Ok(self.get_streams()) } - pub fn find_stream( + pub fn find_stream<'a>( &self, session: &Session, identifier: &Identifier, - ) -> Result<&Stream, IggyError> { + ) -> Result<Ref<'_, Stream>, IggyError> { self.ensure_authenticated(session)?; let stream = self.get_stream(identifier); if let Ok(stream) = stream { self.permissioner + .borrow() .get_stream(session.get_user_id(), stream.stream_id) .with_error_context(|error| { format!( @@ -70,13 +84,14 @@ impl IggyShard { &self, session: &Session, identifier: &Identifier, - ) -> Result<Option<&Stream>, IggyError> { + ) -> Result<Option<Ref<'_, Stream>>, IggyError> { self.ensure_authenticated(session)?; let Some(stream) = self.try_get_stream(identifier)? else { return Ok(None); }; self.permissioner + .borrow() .get_stream(session.get_user_id(), stream.stream_id) .with_error_context(|error| { format!( @@ -87,83 +102,92 @@ impl IggyShard { Ok(Some(stream)) } - pub fn try_get_stream(&self, identifier: &Identifier) -> Result<Option<&Stream>, IggyError> { + pub fn try_get_stream( + &self, + identifier: &Identifier, + ) -> Result<Option<Ref<'_, Stream>>, IggyError> { match identifier.kind { - IdKind::Numeric => Ok(self.streams.get(&identifier.get_u32_value()?)), + IdKind::Numeric => self.get_stream_by_id(identifier.get_u32_value()?).map(Some), IdKind::String => Ok(self.try_get_stream_by_name(&identifier.get_cow_str_value()?)), } } - fn try_get_stream_by_name(&self, name: &str) -> Option<&Stream> { + fn try_get_stream_by_name(&self, name: &str) -> Option<Ref<'_, Stream>> { self.streams_ids + .borrow() .get(name) - .and_then(|id| self.streams.get(id)) + .and_then(|id| Some(self.get_stream_ref(*id))) } - pub fn get_stream(&self, identifier: &Identifier) -> Result<&Stream, IggyError> { + pub fn get_stream(&self, identifier: &Identifier) -> Result<Ref<'_, Stream>, IggyError> { match identifier.kind { IdKind::Numeric => self.get_stream_by_id(identifier.get_u32_value()?), IdKind::String => self.get_stream_by_name(&identifier.get_cow_str_value()?), } } - pub fn get_stream_mut(&mut self, identifier: &Identifier) -> Result<&mut Stream, IggyError> { + pub fn get_stream_mut(&self, identifier: &Identifier) -> Result<RefMut<'_, Stream>, IggyError> { match identifier.kind { IdKind::Numeric => self.get_stream_by_id_mut(identifier.get_u32_value()?), IdKind::String => self.get_stream_by_name_mut(&identifier.get_cow_str_value()?), } } - fn get_stream_by_name(&self, name: &str) -> Result<&Stream, IggyError> { - let stream_id = self.streams_ids.get(name); - if stream_id.is_none() { + fn get_stream_by_name(&self, name: &str) -> Result<Ref<'_, Stream>, IggyError> { + let exists = self.streams_ids.borrow().iter().any(|s| s.0 == name); + if !exists { return Err(IggyError::StreamNameNotFound(name.to_string())); } - - self.get_stream_by_id(*stream_id.unwrap()) + let stream_id = self.streams_ids.borrow().get(name).cloned().unwrap(); + self.get_stream_by_id(stream_id) } - fn get_stream_by_id(&self, stream_id: u32) -> Result<&Stream, IggyError> { - let stream = self.streams.get(&stream_id); - if stream.is_none() { + fn get_stream_by_id(&self, stream_id: u32) -> Result<Ref<'_, Stream>, IggyError> { + let exists = self.streams.borrow().iter().any(|s| s.0 == &stream_id); + if !exists { return Err(IggyError::StreamIdNotFound(stream_id)); } - - Ok(stream.unwrap()) + Ok(self.get_stream_ref(stream_id)) } - fn get_stream_by_name_mut(&mut self, name: &str) -> Result<&mut Stream, IggyError> { - let stream_id; - { - let id = self.streams_ids.get_mut(name); - if id.is_none() { - return Err(IggyError::StreamNameNotFound(name.to_string())); - } + fn get_stream_ref(&self, stream_id: u32) -> Ref<'_, Stream> { + Ref::map(self.streams.borrow(), |streams| { + streams.get(&stream_id).expect("Stream ID not found") + }) + } - stream_id = *id.unwrap(); + fn get_stream_by_name_mut(&self, name: &str) -> Result<RefMut<'_, Stream>, IggyError> { + let exists = self.streams_ids.borrow().iter().any(|s| s.0 == name); + if !exists { + return Err(IggyError::StreamNameAlreadyExists(name.to_owned())); } - - self.get_stream_by_id_mut(stream_id) + let streams_ids = self.streams_ids.borrow(); + let id = streams_ids.get(name).cloned(); + drop(streams_ids); + self.get_stream_by_id_mut(id.unwrap()) } - fn get_stream_by_id_mut(&mut self, stream_id: u32) -> Result<&mut Stream, IggyError> { - let stream = self.streams.get_mut(&stream_id); - if stream.is_none() { + fn get_stream_by_id_mut(&self, stream_id: u32) -> Result<RefMut<'_, Stream>, IggyError> { + let exists = self.streams.borrow().iter().any(|s| s.0 == &stream_id); + if !exists { return Err(IggyError::StreamIdNotFound(stream_id)); } - - Ok(stream.unwrap()) + Ok(RefMut::map(self.streams.borrow_mut(), |s| { + s.get_mut(&stream_id).unwrap() + })) } pub async fn create_stream( - &mut self, + &self, session: &Session, stream_id: Option<u32>, name: &str, - ) -> Result<&Stream, IggyError> { + ) -> Result<u32, IggyError> { self.ensure_authenticated(session)?; - self.permissioner.create_stream(session.get_user_id())?; - if self.streams_ids.contains_key(name) { + self.permissioner + .borrow() + .create_stream(session.get_user_id())?; + if self.streams_ids.borrow().contains_key(name) { return Err(IggyError::StreamNameAlreadyExists(name.to_owned())); } @@ -171,7 +195,7 @@ impl IggyShard { if stream_id.is_none() { id = CURRENT_STREAM_ID.fetch_add(1, Ordering::SeqCst); loop { - if self.streams.contains_key(&id) { + if self.streams.borrow().contains_key(&id) { if id == u32::MAX { return Err(IggyError::StreamIdAlreadyExists(id)); } @@ -184,21 +208,23 @@ impl IggyShard { id = stream_id.unwrap(); } - if self.streams.contains_key(&id) { + if self.streams.borrow().contains_key(&id) { return Err(IggyError::StreamIdAlreadyExists(id)); } - let stream = Stream::create(id, name, self.config.clone(), self.storage.clone()); + let stream = Stream::create(id, name, self.config.system.clone(), self.storage.clone()); stream.persist().await?; info!("Created stream with ID: {id}, name: '{name}'."); - self.streams_ids.insert(name.to_owned(), stream.stream_id); - self.streams.insert(stream.stream_id, stream); + self.streams_ids + .borrow_mut() + .insert(name.to_owned(), stream.stream_id); + self.streams.borrow_mut().insert(stream.stream_id, stream); self.metrics.increment_streams(1); - self.get_stream_by_id(id) + Ok(id) } pub async fn update_stream( - &mut self, + &self, session: &Session, id: &Identifier, name: &str, @@ -213,6 +239,7 @@ impl IggyShard { } self.permissioner + .borrow() .update_stream(session.get_user_id(), stream_id) .with_error_context(|error| { format!( @@ -223,7 +250,7 @@ impl IggyShard { })?; { - if let Some(stream_id_by_name) = self.streams_ids.get(name) { + if let Some(stream_id_by_name) = self.streams_ids.borrow().get(name) { if *stream_id_by_name != stream_id { return Err(IggyError::StreamNameAlreadyExists(name.to_owned())); } @@ -232,7 +259,7 @@ impl IggyShard { let old_name; { - let stream = self.get_stream_mut(id).with_error_context(|error| { + let mut stream = self.get_stream_mut(id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get mutable reference to stream with id: {id}") })?; old_name = stream.name.clone(); @@ -241,8 +268,10 @@ impl IggyShard { } { - self.streams_ids.remove(&old_name); - self.streams_ids.insert(name.to_owned(), stream_id); + self.streams_ids.borrow_mut().remove(&old_name); + self.streams_ids + .borrow_mut() + .insert(name.to_owned(), stream_id); } info!("Stream with ID '{id}' updated. Old name: '{old_name}' changed to: '{name}'."); @@ -250,7 +279,7 @@ impl IggyShard { } pub async fn delete_stream( - &mut self, + &self, session: &Session, id: &Identifier, ) -> Result<u32, IggyError> { @@ -260,6 +289,7 @@ impl IggyShard { })?; let stream_id = stream.stream_id; self.permissioner + .borrow() .delete_stream(session.get_user_id(), stream_id) .with_error_context(|error| { format!( @@ -279,17 +309,16 @@ impl IggyShard { .decrement_partitions(stream.get_partitions_count()); self.metrics.decrement_messages(stream.get_messages_count()); self.metrics.decrement_segments(stream.get_segments_count()); - self.streams.remove(&stream_id); - self.streams_ids.remove(&stream_name); + self.streams.borrow_mut().remove(&stream_id); + self.streams_ids.borrow_mut().remove(&stream_name); let current_stream_id = CURRENT_STREAM_ID.load(Ordering::SeqCst); if current_stream_id > stream_id { CURRENT_STREAM_ID.store(stream_id, Ordering::SeqCst); } - let client_manager = self.client_manager.read().await; - client_manager - .delete_consumer_groups_for_stream(stream_id) - .await; + self.client_manager + .borrow_mut() + .delete_consumer_groups_for_stream(stream_id); Ok(stream_id) } @@ -302,6 +331,7 @@ impl IggyShard { format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") })?; self.permissioner + .borrow() .purge_stream(session.get_user_id(), stream.stream_id) .with_error_context(|error| { format!( @@ -329,10 +359,12 @@ mod tests { sync::Arc, }; + //TODO: Fixme + /* #[tokio::test] async fn should_get_stream_by_id_and_name() { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Arc::new(SystemConfig { + let config = Rc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); @@ -378,4 +410,5 @@ mod tests { assert_eq!(stream.stream_id, stream_id); assert_eq!(stream.name, stream_name); } + */ } diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index 145689af..d118090f 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -16,29 +16,31 @@ * under the License. */ +use super::COMPONENT; use crate::shard::IggyShard; use crate::streaming::session::Session; +use crate::streaming::streams::stream::Stream; use crate::streaming::topics::topic::Topic; use error_set::ErrContext; use iggy_common::locking::IggySharedMutFn; use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize}; +use tokio_util::io::StreamReader; impl IggyShard { - pub fn find_topic( + pub fn find_topic<'topic, 'stream>( &self, session: &Session, - stream_id: &Identifier, + stream: &'stream Stream, topic_id: &Identifier, - ) -> Result<&Topic, IggyError> { + ) -> Result<&'topic Topic, IggyError> + where + 'stream: 'topic, + { self.ensure_authenticated(session)?; - let stream = self - .find_stream(session, stream_id) - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to find stream with ID: {stream_id}") - })?; - let topic = stream.get_topic(topic_id); - if let Ok(topic) = topic { - self.permissioner + let stream_id = stream.stream_id; + let topic = stream.get_topic(topic_id)?; + self.permissioner + .borrow() .get_topic(session.get_user_id(), stream.stream_id, topic.topic_id) .with_error_context(|error| { format!( @@ -46,22 +48,21 @@ impl IggyShard { session.get_user_id(), ) })?; - return Ok(topic); - } - - topic + Ok(topic) } - pub fn find_topics( + pub fn find_topics<'stream, 'topic>( &self, session: &Session, - stream_id: &Identifier, - ) -> Result<Vec<&Topic>, IggyError> { + stream: &'stream Stream, + ) -> Result<Vec<&'topic Topic>, IggyError> + where + 'stream: 'topic, + { self.ensure_authenticated(session)?; - let stream = self.get_stream(stream_id).with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") - })?; + let stream_id = stream.stream_id; self.permissioner + .borrow() .get_topics(session.get_user_id(), stream.stream_id) .with_error_context(|error| { format!( @@ -72,28 +73,24 @@ impl IggyShard { Ok(stream.get_topics()) } - pub fn try_find_topic( + pub fn try_find_topic<'stream, 'topic>( &self, session: &Session, - stream_id: &Identifier, + stream: &'stream Stream, topic_id: &Identifier, - ) -> Result<Option<&Topic>, IggyError> { + ) -> Result<Option<&'topic Topic>, IggyError> + where + 'stream: 'topic, + { self.ensure_authenticated(session)?; - let Some(stream) = self - .try_find_stream(session, stream_id) - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to find stream with ID: {stream_id}") - })? - else { - return Ok(None); - }; - + let stream_id = stream.stream_id; let Some(topic) = stream.try_get_topic(topic_id)? else { return Ok(None); }; self.permissioner - .get_topic(session.get_user_id(), stream.stream_id, topic.topic_id) + .borrow() + .get_topic(session.get_user_id(), stream_id, topic.topic_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - permission denied to get topic with ID: {topic_id} in stream with ID: {stream_id} for user with ID: {}", @@ -105,7 +102,7 @@ impl IggyShard { #[allow(clippy::too_many_arguments)] pub async fn create_topic( - &mut self, + &self, session: &Session, stream_id: &Identifier, topic_id: Option<u32>, @@ -115,13 +112,14 @@ impl IggyShard { compression_algorithm: CompressionAlgorithm, max_topic_size: MaxTopicSize, replication_factor: Option<u8>, - ) -> Result<&Topic, IggyError> { + ) -> Result<u32, IggyError> { self.ensure_authenticated(session)?; { let stream = self.get_stream(stream_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") })?; self.permissioner + .borrow() .create_topic(session.get_user_id(), stream.stream_id) .with_error_context(|error| { format!( @@ -131,6 +129,8 @@ impl IggyShard { })?; } + // TODO: Make create topic sync, and extract the storage persister out of it + // perform disk i/o outside of the borrow_mut of the stream. let created_topic_id = self .get_stream_mut(stream_id)? .create_topic( @@ -151,21 +151,12 @@ impl IggyShard { self.metrics.increment_partitions(partitions_count); self.metrics.increment_segments(partitions_count); - self.get_stream(stream_id) - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") - })? - .get_topic(&created_topic_id.try_into()?) - .with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to get created topic with ID: {created_topic_id} in stream with ID: {stream_id}", - ) - }) + Ok(created_topic_id) } #[allow(clippy::too_many_arguments)] pub async fn update_topic( - &mut self, + &self, session: &Session, stream_id: &Identifier, topic_id: &Identifier, @@ -174,17 +165,20 @@ impl IggyShard { compression_algorithm: CompressionAlgorithm, max_topic_size: MaxTopicSize, replication_factor: Option<u8>, - ) -> Result<&Topic, IggyError> { + ) -> Result<u32, IggyError> { self.ensure_authenticated(session)?; { + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; let topic = self - .find_topic(session, stream_id, topic_id) + .find_topic(session, &stream, topic_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to find topic with ID: {topic_id}" ) })?; - self.permissioner.update_topic( + self.permissioner.borrow().update_topic( session.get_user_id(), topic.stream_id, topic.topic_id, @@ -198,6 +192,8 @@ impl IggyShard { })?; } + // TODO: Make update topic sync, and extract the storage persister out of it + // perform disk i/o outside of the borrow_mut of the stream. self.get_stream_mut(stream_id)? .update_topic( topic_id, @@ -224,11 +220,11 @@ impl IggyShard { .get_topic(topic_id) .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}") - }) + }).map(|topic| topic.topic_id) } pub async fn delete_topic( - &mut self, + &self, session: &Session, stream_id: &Identifier, topic_id: &Identifier, @@ -236,12 +232,15 @@ impl IggyShard { self.ensure_authenticated(session)?; let stream_id_value; { + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; let topic = self - .find_topic(session, stream_id, topic_id) + .find_topic(session, &stream, topic_id) .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to find topic with ID: {topic_id} in stream with ID: {stream_id}") })?; - self.permissioner.delete_topic( + self.permissioner.borrow().delete_topic( session.get_user_id(), topic.stream_id, topic.topic_id, @@ -254,6 +253,8 @@ impl IggyShard { stream_id_value = topic.stream_id; } + // TODO: Make delete topic sync, and extract the storage persister out of it + // perform disk i/o outside of the borrow_mut of the stream. let topic = self .get_stream_mut(stream_id)? .delete_topic(topic_id) @@ -266,10 +267,9 @@ impl IggyShard { self.metrics.decrement_messages(topic.get_messages_count()); self.metrics .decrement_segments(topic.get_segments_count().await); - let client_manager = self.client_manager.read().await; - client_manager - .delete_consumer_groups_for_topic(stream_id_value, topic.topic_id) - .await; + self.client_manager + .borrow_mut() + .delete_consumer_groups_for_topic(stream_id_value, topic.topic_id); Ok(()) } @@ -279,12 +279,16 @@ impl IggyShard { stream_id: &Identifier, topic_id: &Identifier, ) -> Result<(), IggyError> { + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; let topic = self - .find_topic(session, stream_id, topic_id) + .find_topic(session, &stream, topic_id) .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to find topic with ID: {topic_id} in stream with ID: {stream_id}") })?; self.permissioner + .borrow() .purge_topic(session.get_user_id(), topic.stream_id, topic.topic_id) .with_error_context(|error| { format!( diff --git a/core/server/src/shard/system/users.rs b/core/server/src/shard/system/users.rs index f2405f1f..f3b271a7 100644 --- a/core/server/src/shard/system/users.rs +++ b/core/server/src/shard/system/users.rs @@ -16,6 +16,7 @@ * under the License. */ +use super::COMPONENT; use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::state::models::CreateUserWithId; @@ -33,6 +34,7 @@ use iggy_common::create_user::CreateUser; use iggy_common::defaults::*; use iggy_common::locking::IggySharedMutFn; use iggy_common::{IdKind, Identifier}; +use std::cell::RefMut; use std::env; use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; @@ -46,7 +48,7 @@ impl IggyShard { &self, session: &Session, user_id: &Identifier, - ) -> Result<Option<&User>, IggyError> { + ) -> Result<Option<User>, IggyError> { self.ensure_authenticated(session)?; let Some(user) = self.try_get_user(user_id)? else { return Ok(None); @@ -54,7 +56,7 @@ impl IggyShard { let session_user_id = session.get_user_id(); if user.id != session_user_id { - self.permissioner.get_user(session_user_id).with_error_context(|error| { + self.permissioner.borrow().get_user(session_user_id).with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - permission denied to get user with ID: {user_id} for current user with ID: {session_user_id}" ) @@ -64,45 +66,73 @@ impl IggyShard { Ok(Some(user)) } - pub fn get_user(&self, user_id: &Identifier) -> Result<&User, IggyError> { + pub fn get_user(&self, user_id: &Identifier) -> Result<User, IggyError> { self.try_get_user(user_id)? .ok_or(IggyError::ResourceNotFound(user_id.to_string())) } - pub fn try_get_user(&self, user_id: &Identifier) -> Result<Option<&User>, IggyError> { + pub fn try_get_user(&self, user_id: &Identifier) -> Result<Option<User>, IggyError> { match user_id.kind { - IdKind::Numeric => Ok(self.users.get(&user_id.get_u32_value()?)), + IdKind::Numeric => { + let user = self + .users + .borrow() + .get(&user_id.get_u32_value()?) + .map(|user| user.clone()); + Ok(user) + } IdKind::String => { let username = user_id.get_cow_str_value()?; - Ok(self + let user = self .users + .borrow() .iter() .find(|(_, user)| user.username == username) - .map(|(_, user)| user)) + .map(|(_, user)| user.clone()); + Ok(user) } } } - pub fn get_user_mut(&mut self, user_id: &Identifier) -> Result<&mut User, IggyError> { + pub fn get_user_mut(&self, user_id: &Identifier) -> Result<RefMut<'_, User>, IggyError> { match user_id.kind { - IdKind::Numeric => self - .users - .get_mut(&user_id.get_u32_value()?) - .ok_or(IggyError::ResourceNotFound(user_id.to_string())), + IdKind::Numeric => { + let user_id = user_id.get_u32_value()?; + let users = self.users.borrow_mut(); + let exists = users.contains_key(&user_id); + if !exists { + return Err(IggyError::ResourceNotFound(user_id.to_string())); + } + Ok(RefMut::map(users, |u| { + let user = u.get_mut(&user_id); + user.unwrap() + })) + } IdKind::String => { let username = user_id.get_cow_str_value()?; - self.users - .iter_mut() + let users = self.users.borrow_mut(); + let exists = users + .iter() .find(|(_, user)| user.username == username) - .map(|(_, user)| user) - .ok_or(IggyError::ResourceNotFound(user_id.to_string())) + .is_some(); + if !exists { + return Err(IggyError::ResourceNotFound(user_id.to_string())); + } + Ok(RefMut::map(users, |u| { + let user = u + .iter_mut() + .find(|(_, user)| user.username == username) + .map(|(_, user)| user); + user.unwrap() + })) } } } - pub async fn get_users(&self, session: &Session) -> Result<Vec<&User>, IggyError> { + pub async fn get_users(&self, session: &Session) -> Result<Vec<User>, IggyError> { self.ensure_authenticated(session)?; self.permissioner + .borrow() .get_users(session.get_user_id()) .with_error_context(|error| { format!( @@ -110,19 +140,20 @@ impl IggyShard { session.get_user_id() ) })?; - Ok(self.users.values().collect()) + Ok(self.users.borrow().values().cloned().collect()) } pub async fn create_user( - &mut self, + &self, session: &Session, username: &str, password: &str, status: UserStatus, permissions: Option<Permissions>, - ) -> Result<&User, IggyError> { + ) -> Result<User, IggyError> { self.ensure_authenticated(session)?; self.permissioner + .borrow() .create_user(session.get_user_id()) .with_error_context(|error| { format!( @@ -131,12 +162,17 @@ impl IggyShard { ) })?; - if self.users.iter().any(|(_, user)| user.username == username) { + if self + .users + .borrow() + .iter() + .any(|(_, user)| user.username == username) + { error!("User: {username} already exists."); return Err(IggyError::UserAlreadyExists); } - if self.users.len() >= MAX_USERS { + if self.users.borrow().len() >= MAX_USERS { error!("Available users limit reached."); return Err(IggyError::UsersLimitReached); } @@ -145,8 +181,9 @@ impl IggyShard { info!("Creating user: {username} with ID: {user_id}..."); let user = User::new(user_id, username, password, status, permissions.clone()); self.permissioner + .borrow_mut() .init_permissions_for_user(user_id, permissions); - self.users.insert(user.id, user); + self.users.borrow_mut().insert(user.id, user); info!("Created user: {username} with ID: {user_id}."); self.metrics.increment_users(1); self.get_user(&user_id.try_into()?) @@ -155,16 +192,13 @@ impl IggyShard { }) } - pub async fn delete_user( - &mut self, - session: &Session, - user_id: &Identifier, - ) -> Result<User, IggyError> { + pub fn delete_user(&self, session: &Session, user_id: &Identifier) -> Result<User, IggyError> { self.ensure_authenticated(session)?; let existing_user_id; let existing_username; { self.permissioner + .borrow() .delete_user(session.get_user_id()) .with_error_context(|error| { format!( @@ -187,14 +221,14 @@ impl IggyShard { info!("Deleting user: {existing_username} with ID: {user_id}..."); let user = self .users + .borrow_mut() .remove(&existing_user_id) .ok_or(IggyError::ResourceNotFound(user_id.to_string()))?; self.permissioner + .borrow_mut() .delete_permissions_for_user(existing_user_id); - let mut client_manager = self.client_manager.write().await; - client_manager + self.client_manager.borrow_mut() .delete_clients_for_user(existing_user_id) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to delete clients for user with ID: {existing_user_id}" @@ -205,15 +239,16 @@ impl IggyShard { Ok(user) } - pub async fn update_user( - &mut self, + pub fn update_user( + &self, session: &Session, user_id: &Identifier, username: Option<String>, status: Option<UserStatus>, - ) -> Result<&User, IggyError> { + ) -> Result<User, IggyError> { self.ensure_authenticated(session)?; self.permissioner + .borrow() .update_user(session.get_user_id()) .with_error_context(|error| { format!( @@ -231,7 +266,7 @@ impl IggyShard { } } - let user = self.get_user_mut(user_id).with_error_context(|error| { + let mut user = self.get_user_mut(user_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get mutable reference to the user with id: {user_id}") })?; if let Some(username) = username { @@ -241,13 +276,18 @@ impl IggyShard { if let Some(status) = status { user.status = status; } + let cloned_user = user.clone(); + drop(user); - info!("Updated user: {} with ID: {}.", user.username, user.id); - Ok(user) + info!( + "Updated user: {} with ID: {}.", + cloned_user.username, cloned_user.id + ); + Ok(cloned_user) } - pub async fn update_permissions( - &mut self, + pub fn update_permissions( + &self, session: &Session, user_id: &Identifier, permissions: Option<Permissions>, @@ -256,13 +296,14 @@ impl IggyShard { { self.permissioner + .borrow() .update_permissions(session.get_user_id()) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - permission denied to update permissions for user with id: {}", session.get_user_id() ) })?; - let user = self.get_user(user_id).with_error_context(|error| { + let user: User = self.get_user(user_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get user with id: {user_id}") })?; if user.is_root() { @@ -271,11 +312,12 @@ impl IggyShard { } self.permissioner + .borrow_mut() .update_permissions_for_user(user.id, permissions.clone()); } { - let user = self.get_user_mut(user_id).with_error_context(|error| { + let mut user = self.get_user_mut(user_id).with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to get mutable reference to the user with id: {user_id}" ) @@ -290,8 +332,8 @@ impl IggyShard { Ok(()) } - pub async fn change_password( - &mut self, + pub fn change_password( + &self, session: &Session, user_id: &Identifier, current_password: &str, @@ -305,11 +347,13 @@ impl IggyShard { })?; let session_user_id = session.get_user_id(); if user.id != session_user_id { - self.permissioner.change_password(session_user_id)?; + self.permissioner + .borrow() + .change_password(session_user_id)?; } } - let user = self.get_user_mut(user_id).with_error_context(|error| { + let mut user = self.get_user_mut(user_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get mutable reference to the user with id: {user_id}") })?; if !crypto::verify_password(current_password, &user.password) { @@ -328,22 +372,21 @@ impl IggyShard { Ok(()) } - pub async fn login_user( + pub fn login_user( &self, username: &str, password: &str, session: Option<&Session>, - ) -> Result<&User, IggyError> { + ) -> Result<User, IggyError> { self.login_user_with_credentials(username, Some(password), session) - .await } - pub async fn login_user_with_credentials( + pub fn login_user_with_credentials( &self, username: &str, password: Option<&str>, session: Option<&Session>, - ) -> Result<&User, IggyError> { + ) -> Result<User, IggyError> { let user = match self.get_user(&username.try_into()?) { Ok(user) => user, Err(_) => { @@ -380,14 +423,13 @@ impl IggyShard { user.username, session.get_user_id() ); - self.logout_user(session).await?; + self.logout_user(session)?; } session.set_user_id(user.id); - let mut client_manager = self.client_manager.write().await; + let mut client_manager = self.client_manager.borrow_mut(); client_manager .set_user_id(session.client_id, user.id) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to set user_id to client, client ID: {}, user ID: {}", @@ -397,7 +439,7 @@ impl IggyShard { Ok(user) } - pub async fn logout_user(&self, session: &Session) -> Result<(), IggyError> { + pub fn logout_user(&self, session: &Session) -> Result<(), IggyError> { self.ensure_authenticated(session)?; let user = self .get_user(&Identifier::numeric(session.get_user_id())?) @@ -412,8 +454,8 @@ impl IggyShard { user.username, user.id ); if session.client_id > 0 { - let mut client_manager = self.client_manager.write().await; - client_manager.clear_user_id(session.client_id).await?; + let mut client_manager = self.client_manager.borrow_mut(); + client_manager.clear_user_id(session.client_id)?; info!( "Cleared user ID: {} for client: {}.", user.id, session.client_id diff --git a/core/server/src/streaming/partitions/consumer_offsets.rs b/core/server/src/streaming/partitions/consumer_offsets.rs index 96080c03..e9f5c57d 100644 --- a/core/server/src/streaming/partitions/consumer_offsets.rs +++ b/core/server/src/streaming/partitions/consumer_offsets.rs @@ -26,10 +26,7 @@ use iggy_common::IggyError; use tracing::trace; impl Partition { - pub fn get_consumer_offset( - &self, - consumer: PollingConsumer, - ) -> Result<Option<u64>, IggyError> { + pub fn get_consumer_offset(&self, consumer: PollingConsumer) -> Result<Option<u64>, IggyError> { trace!( "Getting consumer offset for {}, partition: {}, current: {}...", consumer, self.partition_id, self.current_offset diff --git a/core/server/src/streaming/storage.rs b/core/server/src/streaming/storage.rs index 978b97aa..eaf96dc3 100644 --- a/core/server/src/streaming/storage.rs +++ b/core/server/src/streaming/storage.rs @@ -87,7 +87,7 @@ pub enum PartitionStorageKind { #[cfg_attr(test, automock)] pub trait SystemInfoStorage { fn load(&self) -> impl Future<Output = Result<SystemInfo, IggyError>>; - fn save(&self, system_info: &SystemInfo) -> impl Future<Output = Result<(), IggyError>>; + fn save(&self, system_info: &SystemInfo) -> impl Future<Output = Result<(), IggyError>>; } #[cfg_attr(test, automock)] @@ -96,8 +96,8 @@ pub trait StreamStorage { &self, stream: &mut Stream, state: StreamState, - ) -> impl Future<Output = Result<(), IggyError>>; - fn save(&self, stream: &Stream) -> impl Future<Output = Result<(), IggyError>>; + ) -> impl Future<Output = Result<(), IggyError>>; + fn save(&self, stream: &Stream) -> impl Future<Output = Result<(), IggyError>>; fn delete(&self, stream: &Stream) -> impl Future<Output = Result<(), IggyError>>; } @@ -107,9 +107,9 @@ pub trait TopicStorage { &self, topic: &mut Topic, state: TopicState, - ) -> impl Future<Output = Result<(), IggyError>>; - fn save(&self, topic: &Topic) -> impl Future<Output = Result<(), IggyError>>; - fn delete(&self, topic: &Topic) -> impl Future<Output = Result<(), IggyError>>; + ) -> impl Future<Output = Result<(), IggyError>>; + fn save(&self, topic: &Topic) -> impl Future<Output = Result<(), IggyError>>; + fn delete(&self, topic: &Topic) -> impl Future<Output = Result<(), IggyError>>; } #[cfg_attr(test, automock)] @@ -119,9 +119,8 @@ pub trait PartitionStorage { partition: &mut Partition, state: PartitionState, ) -> impl Future<Output = Result<(), IggyError>>; - fn save(&self, partition: &mut Partition) - -> impl Future<Output = Result<(), IggyError>>; - fn delete(&self, partition: &Partition) -> impl Future<Output = Result<(), IggyError>>; + fn save(&self, partition: &mut Partition) -> impl Future<Output = Result<(), IggyError>>; + fn delete(&self, partition: &Partition) -> impl Future<Output = Result<(), IggyError>>; fn save_consumer_offset( &self, offset: u64, @@ -131,15 +130,9 @@ pub trait PartitionStorage { &self, kind: ConsumerKind, path: &str, - ) -> impl Future<Output = Result<Vec<ConsumerOffset>, IggyError>>; - fn delete_consumer_offsets( - &self, - path: &str, - ) -> impl Future<Output = Result<(), IggyError>>; - fn delete_consumer_offset( - &self, - path: &str, - ) -> impl Future<Output = Result<(), IggyError>>; + ) -> impl Future<Output = Result<Vec<ConsumerOffset>, IggyError>>; + fn delete_consumer_offsets(&self, path: &str) -> impl Future<Output = Result<(), IggyError>>; + fn delete_consumer_offset(&self, path: &str) -> impl Future<Output = Result<(), IggyError>>; } #[derive(Debug)] diff --git a/core/server/src/streaming/topics/consumer_group.rs b/core/server/src/streaming/topics/consumer_group.rs index f000ce79..c810159c 100644 --- a/core/server/src/streaming/topics/consumer_group.rs +++ b/core/server/src/streaming/topics/consumer_group.rs @@ -57,10 +57,7 @@ impl ConsumerGroup { self.assign_partitions(); } - pub fn calculate_partition_id( - &mut self, - member_id: u32, - ) -> Result<Option<u32>, IggyError> { + pub fn calculate_partition_id(&mut self, member_id: u32) -> Result<Option<u32>, IggyError> { let member = self.members.get_mut(&member_id); if let Some(member) = member { return Ok(member.calculate_partition_id()); diff --git a/core/server/src/streaming/topics/consumer_groups.rs b/core/server/src/streaming/topics/consumer_groups.rs index c1371096..92168738 100644 --- a/core/server/src/streaming/topics/consumer_groups.rs +++ b/core/server/src/streaming/topics/consumer_groups.rs @@ -17,12 +17,12 @@ */ use crate::binary::handlers::topics::get_topic_handler; -use crate::streaming::topics::{consumer_group, COMPONENT}; use crate::streaming::topics::consumer_group::ConsumerGroup; use crate::streaming::topics::topic::Topic; +use crate::streaming::topics::{COMPONENT, consumer_group}; use error_set::ErrContext; -use iggy_common::locking::IggySharedMutFn; use iggy_common::IggyError; +use iggy_common::locking::IggySharedMutFn; use iggy_common::{IdKind, Identifier}; use std::cell::{Ref, RefMut}; use std::sync::atomic::Ordering; @@ -48,7 +48,10 @@ impl Topic { self.consumer_groups.borrow().values().cloned().collect() } - pub fn get_consumer_group(&self, identifier: &Identifier) -> Result<Ref<'_, ConsumerGroup>, IggyError> { + pub fn get_consumer_group( + &self, + identifier: &Identifier, + ) -> Result<Ref<'_, ConsumerGroup>, IggyError> { match identifier.kind { IdKind::Numeric => self.get_consumer_group_by_id(identifier.get_u32_value().unwrap()), IdKind::String => self.get_consumer_group_by_name(&identifier.get_cow_str_value()?), @@ -93,7 +96,10 @@ impl Topic { })) } - pub fn get_consumer_group_by_name(&self, name: &str) -> Result<Ref<'_, ConsumerGroup>, IggyError> { + pub fn get_consumer_group_by_name( + &self, + name: &str, + ) -> Result<Ref<'_, ConsumerGroup>, IggyError> { let group_id = self.consumer_groups_ids.get(name); if group_id.is_none() { return Err(IggyError::ConsumerGroupNameNotFound( @@ -199,12 +205,8 @@ impl Topic { return Err(IggyError::ConsumerGroupIdAlreadyExists(id, self.topic_id)); } - let consumer_group = ConsumerGroup::new( - self.topic_id, - id, - name, - self.partitions.len() as u32, - ); + let consumer_group = + ConsumerGroup::new(self.topic_id, id, name, self.partitions.len() as u32); self.consumer_groups_ids.insert(name.to_owned(), id); let cloned_group = consumer_group.clone(); self.consumer_groups.borrow_mut().insert(id, consumer_group); diff --git a/core/server/src/streaming/topics/topic.rs b/core/server/src/streaming/topics/topic.rs index 16aa324c..7e8ca4ac 100644 --- a/core/server/src/streaming/topics/topic.rs +++ b/core/server/src/streaming/topics/topic.rs @@ -23,13 +23,13 @@ use crate::streaming::storage::SystemStorage; use crate::streaming::topics::consumer_group::ConsumerGroup; use ahash::AHashMap; use core::fmt; -use std::cell::RefCell; -use std::rc::Rc; use iggy_common::locking::IggyRwLock; use iggy_common::{ CompressionAlgorithm, Consumer, ConsumerKind, IggyByteSize, IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize, Sizeable, }; +use std::cell::RefCell; +use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; diff --git a/core/server/src/streaming/users/user.rs b/core/server/src/streaming/users/user.rs index 01a9faa0..7e68d98f 100644 --- a/core/server/src/streaming/users/user.rs +++ b/core/server/src/streaming/users/user.rs @@ -24,7 +24,7 @@ use iggy_common::defaults::*; use iggy_common::{Permissions, UserId}; use std::sync::Arc; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct User { pub id: UserId, pub status: UserStatus,
