This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 8566672cc5b26dcd2b0d98781d59dd7acde8dbc0 Author: numinex <[email protected]> AuthorDate: Wed Jun 25 14:32:26 2025 +0200 fixes --- core/common/src/locking/mod.rs | 10 +- core/common/src/locking/tokio_lock.rs | 2 - core/server/src/shard/system/clients.rs | 2 +- core/server/src/shard/system/consumer_groups.rs | 3 +- core/server/src/shard/system/consumer_offsets.rs | 2 - core/server/src/shard/system/mod.rs | 3 +- .../src/shard/system/personal_access_tokens.rs | 2 - core/server/src/shard/system/segments.rs | 3 +- core/server/src/shard/system/stats.rs | 2 +- core/server/src/shard/system/system.rs | 291 --------------------- core/server/src/shard/system/topics.rs | 5 +- .../src/streaming/partitions/consumer_offsets.rs | 2 +- core/server/src/streaming/storage.rs | 2 +- core/server/src/streaming/streams/topics.rs | 3 +- core/server/src/streaming/topics/consumer_group.rs | 31 ++- .../server/src/streaming/topics/consumer_groups.rs | 136 ++++++---- .../src/streaming/topics/consumer_offsets.rs | 6 +- core/server/src/streaming/topics/messages.rs | 5 +- core/server/src/streaming/topics/partitions.rs | 4 +- core/server/src/streaming/topics/storage.rs | 9 +- core/server/src/streaming/topics/topic.rs | 25 +- 21 files changed, 142 insertions(+), 406 deletions(-) diff --git a/core/common/src/locking/mod.rs b/core/common/src/locking/mod.rs index 42d72ebb..c2e542c7 100644 --- a/core/common/src/locking/mod.rs +++ b/core/common/src/locking/mod.rs @@ -29,19 +29,19 @@ mod fast_async_lock; #[cfg(feature = "tokio_lock")] #[cfg(not(any(feature = "fast_async_lock")))] -pub type IggySharedMut<T> = tokio_lock::IggyTokioRwLock<T>; +pub type IggyRwLock<T> = tokio_lock::IggyTokioRwLock<T>; //this can be used in the future to provide different locking mechanisms #[cfg(feature = "fast_async_lock")] -pub type IggySharedMut<T> = fast_async_lock::IggyFastAsyncRwLock<T>; +pub type IggyRwLock<T> = fast_async_lock::IggyFastAsyncRwLock<T>; #[allow(async_fn_in_trait)] -pub trait IggySharedMutFn<T>: Send + Sync { - type ReadGuard<'a>: Deref<Target = T> + Send +pub trait IggySharedMutFn<T> { + type ReadGuard<'a>: Deref<Target = T> where T: 'a, Self: 'a; - type WriteGuard<'a>: DerefMut<Target = T> + Send + type WriteGuard<'a>: DerefMut<Target = T> where T: 'a, Self: 'a; diff --git a/core/common/src/locking/tokio_lock.rs b/core/common/src/locking/tokio_lock.rs index 2e26b367..ef5baaad 100644 --- a/core/common/src/locking/tokio_lock.rs +++ b/core/common/src/locking/tokio_lock.rs @@ -25,8 +25,6 @@ use tokio::sync::{RwLock as TokioRwLock, RwLockReadGuard, RwLockWriteGuard}; pub struct IggyTokioRwLock<T>(Arc<TokioRwLock<T>>); impl<T> IggySharedMutFn<T> for IggyTokioRwLock<T> -where - T: Send + Sync, { type ReadGuard<'a> = RwLockReadGuard<'a, T> diff --git a/core/server/src/shard/system/clients.rs b/core/server/src/shard/system/clients.rs index fde182ce..a2debc42 100644 --- a/core/server/src/shard/system/clients.rs +++ b/core/server/src/shard/system/clients.rs @@ -22,7 +22,7 @@ use crate::streaming::session::Session; use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; -use iggy_common::locking::IggySharedMut; +use iggy_common::locking::IggyRwLock; use iggy_common::locking::IggySharedMutFn; use std::net::SocketAddr; use std::rc::Rc; diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs index 4975a92d..d8d8818e 100644 --- a/core/server/src/shard/system/consumer_groups.rs +++ b/core/server/src/shard/system/consumer_groups.rs @@ -16,9 +16,8 @@ * under the License. */ +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::COMPONENT; -use crate::streaming::systems::system::System; use crate::streaming::topics::consumer_group::ConsumerGroup; use error_set::ErrContext; use iggy_common::Identifier; diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs index e63815ff..0c7438a6 100644 --- a/core/server/src/shard/system/consumer_offsets.rs +++ b/core/server/src/shard/system/consumer_offsets.rs @@ -17,8 +17,6 @@ */ use crate::streaming::session::Session; -use crate::streaming::systems::COMPONENT; -use crate::streaming::systems::system::System; use error_set::ErrContext; use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; diff --git a/core/server/src/shard/system/mod.rs b/core/server/src/shard/system/mod.rs index 1419abb9..fae8ad2b 100644 --- a/core/server/src/shard/system/mod.rs +++ b/core/server/src/shard/system/mod.rs @@ -28,8 +28,7 @@ pub mod snapshot; pub mod stats; pub mod storage; pub mod streams; -pub mod system; pub mod topics; pub mod users; -pub const COMPONENT: &str = "SYSTEM"; +pub const COMPONENT: &str = "SHARD_SYSTEM"; diff --git a/core/server/src/shard/system/personal_access_tokens.rs b/core/server/src/shard/system/personal_access_tokens.rs index 18125bed..b5452e9a 100644 --- a/core/server/src/shard/system/personal_access_tokens.rs +++ b/core/server/src/shard/system/personal_access_tokens.rs @@ -18,8 +18,6 @@ use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::session::Session; -use crate::streaming::systems::COMPONENT; -use crate::streaming::systems::system::System; use crate::streaming::users::user::User; use error_set::ErrContext; use iggy_common::IggyError; diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs index bd4043a2..27abb76b 100644 --- a/core/server/src/shard/system/segments.rs +++ b/core/server/src/shard/system/segments.rs @@ -1,3 +1,4 @@ +use crate::shard::IggyShard; /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,8 +17,6 @@ * under the License. */ use crate::streaming::session::Session; -use crate::streaming::systems::COMPONENT; -use crate::streaming::systems::system::System; use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; diff --git a/core/server/src/shard/system/stats.rs b/core/server/src/shard/system/stats.rs index 2d2d2ea7..8410cac2 100644 --- a/core/server/src/shard/system/stats.rs +++ b/core/server/src/shard/system/stats.rs @@ -16,8 +16,8 @@ * under the License. */ +use crate::shard::IggyShard; use crate::VERSION; -use crate::streaming::systems::system::System; use crate::versioning::SemanticVersion; use iggy_common::locking::IggySharedMutFn; use iggy_common::{IggyDuration, IggyError, Stats}; diff --git a/core/server/src/shard/system/system.rs b/core/server/src/shard/system/system.rs deleted file mode 100644 index 5d830d71..00000000 --- a/core/server/src/shard/system/system.rs +++ /dev/null @@ -1,291 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use crate::archiver::{ArchiverKind, ArchiverKindType}; -use crate::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig}; -use crate::configs::system::SystemConfig; -use crate::map_toggle_str; -use crate::state::StateKind; -use crate::state::file::FileState; -use crate::state::system::SystemState; -use crate::streaming::clients::client_manager::ClientManager; -use crate::streaming::diagnostics::metrics::Metrics; -use crate::streaming::persistence::persister::*; -use crate::streaming::session::Session; -use crate::streaming::storage::SystemStorage; -use crate::streaming::streams::stream::Stream; -use crate::streaming::systems::COMPONENT; -use crate::streaming::users::permissioner::Permissioner; -use crate::streaming::users::user::User; -use crate::versioning::SemanticVersion; -use ahash::AHashMap; -use error_set::ErrContext; -use iggy_common::locking::IggySharedMut; -use iggy_common::locking::IggySharedMutFn; -use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError, UserId}; -use std::path::Path; -use std::sync::Arc; -use tokio::fs::{create_dir_all, remove_dir_all}; -use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use tokio::time::Instant; -use tracing::{error, info, instrument, trace}; - -#[derive(Debug)] -pub struct SharedSystem { - system: Arc<RwLock<System>>, -} - -impl SharedSystem { - pub fn new(system: System) -> SharedSystem { - SharedSystem { - system: Arc::new(RwLock::new(system)), - } - } - - pub async fn read(&self) -> RwLockReadGuard<System> { - self.system.read().await - } - - pub async fn write(&self) -> RwLockWriteGuard<System> { - self.system.write().await - } -} - -impl Clone for SharedSystem { - fn clone(&self) -> Self { - SharedSystem { - system: self.system.clone(), - } - } -} - -#[derive(Debug)] -pub struct System { - pub permissioner: Permissioner, - pub(crate) storage: Arc<SystemStorage>, - pub(crate) streams: AHashMap<u32, Stream>, - pub(crate) streams_ids: AHashMap<String, u32>, - pub(crate) users: AHashMap<UserId, User>, - pub(crate) config: Arc<SystemConfig>, - pub(crate) client_manager: IggySharedMut<ClientManager>, - pub(crate) encryptor: Option<Arc<EncryptorKind>>, - pub(crate) metrics: Metrics, - pub(crate) state: Arc<StateKind>, - pub(crate) archiver: Option<Arc<ArchiverKind>>, - pub personal_access_token: PersonalAccessTokenConfig, -} - -impl System { - pub fn new( - config: Arc<SystemConfig>, - data_maintenance_config: DataMaintenanceConfig, - pat_config: PersonalAccessTokenConfig, - ) -> System { - let version = SemanticVersion::current().expect("Invalid version"); - info!( - "Server-side encryption is {}.", - map_toggle_str(config.encryption.enabled) - ); - - let encryptor: Option<EncryptorKind> = match config.encryption.enabled { - true => Some(EncryptorKind::Aes256Gcm( - Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(), - )), - false => None, - }; - - let state_persister = Self::resolve_persister(config.state.enforce_fsync); - let partition_persister = Self::resolve_persister(config.partition.enforce_fsync); - - let state = Arc::new(StateKind::File(FileState::new( - &config.get_state_messages_file_path(), - &version, - state_persister, - encryptor, - ))); - - //TODO: Just shut the fuck up rust-analyzer. - let encryptor: Option<EncryptorKind> = match config.encryption.enabled { - true => Some(EncryptorKind::Aes256Gcm( - Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(), - )), - false => None, - }; - Self::create( - config.clone(), - SystemStorage::new(config, partition_persister), - state, - encryptor.map(Arc::new), - data_maintenance_config, - pat_config, - ) - } - - fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> { - match enforce_fsync { - true => Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)), - false => Arc::new(PersisterKind::File(FilePersister)), - } - } - - pub fn create( - system_config: Arc<SystemConfig>, - storage: SystemStorage, - state: Arc<StateKind>, - encryptor: Option<Arc<EncryptorKind>>, - data_maintenance_config: DataMaintenanceConfig, - pat_config: PersonalAccessTokenConfig, - ) -> System { - let archiver_config = data_maintenance_config.archiver; - let archiver: Option<Arc<ArchiverKind>> = if archiver_config.enabled { - info!("Archiving is enabled, kind: {}", archiver_config.kind); - match archiver_config.kind { - ArchiverKindType::Disk => Some(Arc::new(ArchiverKind::get_disk_archiver( - archiver_config - .disk - .clone() - .expect("Disk archiver config is missing"), - ))), - ArchiverKindType::S3 => Some(Arc::new( - ArchiverKind::get_s3_archiver( - archiver_config - .s3 - .clone() - .expect("S3 archiver config is missing"), - ) - .expect("Failed to create S3 archiver"), - )), - } - } else { - info!("Archiving is disabled."); - None - }; - - System { - config: system_config, - streams: AHashMap::new(), - streams_ids: AHashMap::new(), - storage: Arc::new(storage), - encryptor, - client_manager: IggySharedMut::new(ClientManager::default()), - permissioner: Permissioner::default(), - metrics: Metrics::init(), - users: AHashMap::new(), - state, - personal_access_token: pat_config, - archiver, - } - } - - #[instrument(skip_all, name = "trace_system_init")] - pub async fn init(&mut self) -> Result<(), IggyError> { - let system_path = self.config.get_system_path(); - if !Path::new(&system_path).exists() && create_dir_all(&system_path).await.is_err() { - return Err(IggyError::CannotCreateBaseDirectory(system_path)); - } - - let state_path = self.config.get_state_path(); - if !Path::new(&state_path).exists() && create_dir_all(&state_path).await.is_err() { - return Err(IggyError::CannotCreateStateDirectory(state_path)); - } - - let streams_path = self.config.get_streams_path(); - if !Path::new(&streams_path).exists() && create_dir_all(&streams_path).await.is_err() { - return Err(IggyError::CannotCreateStreamsDirectory(streams_path)); - } - - let runtime_path = self.config.get_runtime_path(); - if Path::new(&runtime_path).exists() && remove_dir_all(&runtime_path).await.is_err() { - return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path)); - } - - if create_dir_all(&runtime_path).await.is_err() { - return Err(IggyError::CannotCreateRuntimeDirectory(runtime_path)); - } - - info!( - "Initializing system, data will be stored at: {}", - self.config.get_system_path() - ); - - let state_entries = self.state.init().await.with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to initialize state entries") - })?; - let system_state = SystemState::init(state_entries) - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to initialize system state") - })?; - let now = Instant::now(); - //DONE - /* - self.load_version().await.with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to load version") - })?; - */ - self.load_users(system_state.users.into_values().collect()) - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to load users") - })?; - self.load_streams(system_state.streams.into_values().collect()) - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to load streams") - })?; - if let Some(archiver) = self.archiver.as_ref() { - archiver - .init() - .await - .expect("Failed to initialize archiver"); - } - info!("Initialized system in {} ms.", now.elapsed().as_millis()); - Ok(()) - } - - #[instrument(skip_all, name = "trace_shutdown")] - pub async fn shutdown(&mut self) -> Result<(), IggyError> { - self.persist_messages().await?; - Ok(()) - } - - #[instrument(skip_all, name = "trace_persist_messages")] - pub async fn persist_messages(&self) -> Result<usize, IggyError> { - trace!("Saving buffered messages on disk..."); - let mut saved_messages_number = 0; - for stream in self.streams.values() { - saved_messages_number += stream.persist_messages().await?; - } - - Ok(saved_messages_number) - } - - pub fn ensure_authenticated(&self, session: &Session) -> Result<(), IggyError> { - if !session.is_active() { - error!("{COMPONENT} - session is inactive, session: {session}"); - return Err(IggyError::StaleClient); - } - - if session.is_authenticated() { - Ok(()) - } else { - error!("{COMPONENT} - unauthenticated access attempt, session: {session}"); - Err(IggyError::Unauthenticated) - } - } -} diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index a61f5464..145689af 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -16,15 +16,14 @@ * under the License. */ +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::COMPONENT; -use crate::streaming::systems::system::System; use crate::streaming::topics::topic::Topic; use error_set::ErrContext; use iggy_common::locking::IggySharedMutFn; use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize}; -impl System { +impl IggyShard { pub fn find_topic( &self, session: &Session, diff --git a/core/server/src/streaming/partitions/consumer_offsets.rs b/core/server/src/streaming/partitions/consumer_offsets.rs index 177793c9..96080c03 100644 --- a/core/server/src/streaming/partitions/consumer_offsets.rs +++ b/core/server/src/streaming/partitions/consumer_offsets.rs @@ -26,7 +26,7 @@ use iggy_common::IggyError; use tracing::trace; impl Partition { - pub async fn get_consumer_offset( + pub fn get_consumer_offset( &self, consumer: PollingConsumer, ) -> Result<Option<u64>, IggyError> { diff --git a/core/server/src/streaming/storage.rs b/core/server/src/streaming/storage.rs index 51e2565e..978b97aa 100644 --- a/core/server/src/streaming/storage.rs +++ b/core/server/src/streaming/storage.rs @@ -154,7 +154,7 @@ pub struct SystemStorage { impl SystemStorage { pub fn new(config: Rc<SystemConfig>, persister: Arc<PersisterKind>) -> Self { Self { - info: Arc::new(SystemInfoStorageKind::File(FileSystemInfoStorage::new( + info: Rc::new(SystemInfoStorageKind::File(FileSystemInfoStorage::new( config.get_state_info_path(), persister.clone(), ))), diff --git a/core/server/src/streaming/streams/topics.rs b/core/server/src/streaming/streams/topics.rs index b975ee83..ed350bee 100644 --- a/core/server/src/streaming/streams/topics.rs +++ b/core/server/src/streaming/streams/topics.rs @@ -146,7 +146,8 @@ impl Stream { topic.name = name.to_owned(); topic.message_expiry = message_expiry; topic.compression_algorithm = compression_algorithm; - for partition in topic.partitions.borrow_mut().values_mut() { + for partition in topic.partitions.values_mut() { + let mut partition = partition.write().await; partition.message_expiry = message_expiry; for segment in partition.segments.iter_mut() { segment.update_message_expiry(message_expiry); diff --git a/core/server/src/streaming/topics/consumer_group.rs b/core/server/src/streaming/topics/consumer_group.rs index cc28ef6f..f000ce79 100644 --- a/core/server/src/streaming/topics/consumer_group.rs +++ b/core/server/src/streaming/topics/consumer_group.rs @@ -52,12 +52,12 @@ impl ConsumerGroup { self.members.values().cloned().collect() } - pub async fn reassign_partitions(&mut self, partitions_count: u32) { + pub fn reassign_partitions(&mut self, partitions_count: u32) { self.partitions_count = partitions_count; - self.assign_partitions().await; + self.assign_partitions(); } - pub async fn calculate_partition_id( + pub fn calculate_partition_id( &mut self, member_id: u32, ) -> Result<Option<u32>, IggyError> { @@ -72,7 +72,7 @@ impl ConsumerGroup { )) } - pub async fn get_current_partition_id(&self, member_id: u32) -> Result<Option<u32>, IggyError> { + pub fn get_current_partition_id(&mut self, member_id: u32) -> Result<Option<u32>, IggyError> { let member = self.members.get(&member_id); if let Some(member) = member { return Ok(member.current_partition_id); @@ -84,7 +84,7 @@ impl ConsumerGroup { )) } - pub async fn add_member(&mut self, member_id: u32) { + pub fn add_member(&mut self, member_id: u32) { self.members.insert( member_id, ConsumerGroupMember { @@ -98,20 +98,20 @@ impl ConsumerGroup { "Added member with ID: {} to consumer group: {} for topic with ID: {}", member_id, self.group_id, self.topic_id ); - self.assign_partitions().await; + self.assign_partitions(); } - pub async fn delete_member(&mut self, member_id: u32) { + pub fn delete_member(&mut self, member_id: u32) { if self.members.remove(&member_id).is_some() { trace!( "Deleted member with ID: {} in consumer group: {} for topic with ID: {}", member_id, self.group_id, self.topic_id ); - self.assign_partitions().await; + self.assign_partitions(); } } - async fn assign_partitions(&mut self) { + fn assign_partitions(&mut self) { let mut members = self.members.values_mut().collect::<Vec<_>>(); if members.is_empty() { return; @@ -189,11 +189,10 @@ mod tests { members: AHashMap::new(), }; - consumer_group.add_member(member_id).await; + consumer_group.add_member(member_id); for i in 0..1000 { let partition_id = consumer_group .calculate_partition_id(member_id) - .await .unwrap() .expect("Partition ID not found"); assert_eq!(partition_id, (i % consumer_group.partitions_count) + 1); @@ -211,7 +210,7 @@ mod tests { members: AHashMap::new(), }; - consumer_group.add_member(member_id).await; + consumer_group.add_member(member_id); let member = consumer_group.members.get(&member_id).unwrap(); assert_eq!( member.partitions.len() as u32, @@ -235,8 +234,8 @@ mod tests { members: AHashMap::new(), }; - consumer_group.add_member(member1_id).await; - consumer_group.add_member(member2_id).await; + consumer_group.add_member(member1_id); + consumer_group.add_member(member2_id); let member1 = consumer_group.members.get(&member1_id).unwrap(); let member2 = consumer_group.members.get(&member2_id).unwrap(); assert_eq!( @@ -270,8 +269,8 @@ mod tests { members: AHashMap::new(), }; - consumer_group.add_member(member1_id).await; - consumer_group.add_member(member2_id).await; + consumer_group.add_member(member1_id); + consumer_group.add_member(member2_id); let member1 = consumer_group.members.get(&member1_id).unwrap(); let member2 = consumer_group.members.get(&member2_id).unwrap(); if member1.partitions.len() == 1 { diff --git a/core/server/src/streaming/topics/consumer_groups.rs b/core/server/src/streaming/topics/consumer_groups.rs index 1f50a1bb..c1371096 100644 --- a/core/server/src/streaming/topics/consumer_groups.rs +++ b/core/server/src/streaming/topics/consumer_groups.rs @@ -16,36 +16,39 @@ * under the License. */ -use crate::streaming::topics::COMPONENT; +use crate::binary::handlers::topics::get_topic_handler; +use crate::streaming::topics::{consumer_group, COMPONENT}; use crate::streaming::topics::consumer_group::ConsumerGroup; use crate::streaming::topics::topic::Topic; use error_set::ErrContext; +use iggy_common::locking::IggySharedMutFn; use iggy_common::IggyError; use iggy_common::{IdKind, Identifier}; +use std::cell::{Ref, RefMut}; use std::sync::atomic::Ordering; use tracing::info; impl Topic { pub fn reassign_consumer_groups(&mut self) { - if self.consumer_groups.is_empty() { + if self.consumer_groups.borrow().is_empty() { return; } - let partitions_count = self.partitions.borrow().len() as u32; + let partitions_count = self.partitions.len() as u32; info!( "Reassigning consumer groups for topic with ID: {} for stream with ID with {}, partitions count: {}", self.topic_id, self.stream_id, partitions_count ); - for (_, consumer_group) in self.consumer_groups.iter_mut() { + for (_, consumer_group) in self.consumer_groups.borrow_mut().iter_mut() { consumer_group.reassign_partitions(partitions_count); } } - pub fn get_consumer_groups(&self) -> Vec<&ConsumerGroup> { - self.consumer_groups.values().collect() + pub fn get_consumer_groups(&self) -> Vec<ConsumerGroup> { + self.consumer_groups.borrow().values().cloned().collect() } - pub fn get_consumer_group(&self, identifier: &Identifier) -> Result<&ConsumerGroup, IggyError> { + pub fn get_consumer_group(&self, identifier: &Identifier) -> Result<Ref<'_, ConsumerGroup>, IggyError> { match identifier.kind { IdKind::Numeric => self.get_consumer_group_by_id(identifier.get_u32_value().unwrap()), IdKind::String => self.get_consumer_group_by_name(&identifier.get_cow_str_value()?), @@ -55,22 +58,42 @@ impl Topic { pub fn try_get_consumer_group( &self, identifier: &Identifier, - ) -> Result<Option<&ConsumerGroup>, IggyError> { + ) -> Result<Option<Ref<'_, ConsumerGroup>>, IggyError> { match identifier.kind { - IdKind::Numeric => Ok(self.consumer_groups.get(&identifier.get_u32_value()?)), + IdKind::Numeric => Ok(self.try_get_consumer_group_by_id(&identifier.get_u32_value()?)), IdKind::String => { Ok(self.try_get_consumer_group_by_name(&identifier.get_cow_str_value()?)) } } } + fn try_get_consumer_group_by_id(&self, id: &u32) -> Option<Ref<'_, ConsumerGroup>> { + let consumer_groups = self.consumer_groups.borrow(); + let exists = consumer_groups.contains_key(id); + if !exists { + return None; + } + + Some(Ref::map(consumer_groups, |cg| { + let consumer_group = cg.get(id); + consumer_group.unwrap() + })) + } + + fn try_get_consumer_group_by_name(&self, name: &str) -> Option<Ref<'_, ConsumerGroup>> { + let consumer_groups = self.consumer_groups.borrow(); + let exists = self.consumer_groups_ids.contains_key(name); + let id = self.consumer_groups_ids.get(name).unwrap(); + if !exists { + return None; + } - fn try_get_consumer_group_by_name(&self, name: &str) -> Option<&ConsumerGroup> { - self.consumer_groups_ids - .get(name) - .and_then(|id| self.consumer_groups.get(id)) + Some(Ref::map(consumer_groups, |cg| { + let consumer_group = cg.get(id); + consumer_group.unwrap() + })) } - pub fn get_consumer_group_by_name(&self, name: &str) -> Result<&ConsumerGroup, IggyError> { + pub fn get_consumer_group_by_name(&self, name: &str) -> Result<Ref<'_, ConsumerGroup>, IggyError> { let group_id = self.consumer_groups_ids.get(name); if group_id.is_none() { return Err(IggyError::ConsumerGroupNameNotFound( @@ -82,19 +105,24 @@ impl Topic { self.get_consumer_group_by_id(*group_id.unwrap()) } - pub fn get_consumer_group_by_id(&self, id: u32) -> Result<&ConsumerGroup, IggyError> { - let consumer_group = self.consumer_groups.get(&id); - if consumer_group.is_none() { + pub fn get_consumer_group_by_id(&self, id: u32) -> Result<Ref<'_, ConsumerGroup>, IggyError> { + let consumer_groups = self.consumer_groups.borrow(); + let exists = consumer_groups.contains_key(&id); + if !exists { return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id)); } + let consumer_group = Ref::map(consumer_groups, |cg| { + let consumer_group = cg.get(&id); + consumer_group.unwrap() + }); - Ok(consumer_group.unwrap()) + Ok(consumer_group) } pub fn get_consumer_group_mut( - &mut self, + &self, identifier: &Identifier, - ) -> Result<&mut ConsumerGroup, IggyError> { + ) -> Result<RefMut<'_, ConsumerGroup>, IggyError> { match identifier.kind { IdKind::Numeric => { self.get_consumer_group_by_id_mut(identifier.get_u32_value().unwrap()) @@ -104,21 +132,25 @@ impl Topic { } pub fn get_consumer_group_by_id_mut( - &mut self, + &self, id: u32, - ) -> Result<&mut ConsumerGroup, IggyError> { - let consumer_group = self.consumer_groups.get_mut(&id); - if consumer_group.is_none() { + ) -> Result<RefMut<'_, ConsumerGroup>, IggyError> { + let consumer_groups = self.consumer_groups.borrow_mut(); + let exists = consumer_groups.contains_key(&id); + if !exists { return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id)); } - - Ok(consumer_group.unwrap()) + let consumer_group = RefMut::map(consumer_groups, |cg| { + let consumer_group = cg.get_mut(&id); + consumer_group.unwrap() + }); + Ok(consumer_group) } pub fn get_consumer_group_by_name_mut( - &mut self, + &self, name: &str, - ) -> Result<&mut ConsumerGroup, IggyError> { + ) -> Result<RefMut<'_, ConsumerGroup>, IggyError> { let group_id = self.consumer_groups_ids.get(name).copied(); if group_id.is_none() { return Err(IggyError::ConsumerGroupNameNotFound( @@ -148,7 +180,7 @@ impl Topic { .current_consumer_group_id .fetch_add(1, Ordering::SeqCst); loop { - if self.consumer_groups.contains_key(&id) { + if self.consumer_groups.borrow().contains_key(&id) { if id == u32::MAX { return Err(IggyError::ConsumerGroupIdAlreadyExists(id, self.topic_id)); } @@ -163,7 +195,7 @@ impl Topic { id = group_id.unwrap(); } - if self.consumer_groups.contains_key(&id) { + if self.consumer_groups.borrow().contains_key(&id) { return Err(IggyError::ConsumerGroupIdAlreadyExists(id, self.topic_id)); } @@ -171,11 +203,11 @@ impl Topic { self.topic_id, id, name, - self.partitions.borrow().len() as u32, + self.partitions.len() as u32, ); self.consumer_groups_ids.insert(name.to_owned(), id); let cloned_group = consumer_group.clone(); - self.consumer_groups.insert(id, consumer_group); + self.consumer_groups.borrow_mut().insert(id, consumer_group); info!( "Created consumer group with ID: {} for topic with ID: {} and stream with ID: {}.", id, self.topic_id, self.stream_id @@ -195,11 +227,14 @@ impl Topic { group_id = consumer_group.group_id; } - let consumer_group = self.consumer_groups.remove(&group_id); + let mut consumer_groups = self.consumer_groups.borrow_mut(); + let consumer_group = consumer_groups.remove(&group_id); if consumer_group.is_none() { return Err(IggyError::ConsumerGroupIdNotFound(group_id, self.topic_id)); } let consumer_group = consumer_group.unwrap(); + let consumer_group = consumer_group.clone(); + drop(consumer_groups); { self.consumer_groups_ids.remove(&consumer_group.name); let current_group_id = self.current_consumer_group_id.load(Ordering::SeqCst); @@ -208,7 +243,8 @@ impl Topic { .store(group_id, Ordering::SeqCst); } - for (_, partition) in self.partitions.borrow().iter() { + for (_, partition) in self.partitions.iter() { + let partition = partition.read().await; if let Some((_, offset)) = partition.consumer_group_offsets.remove(&group_id) { self.storage .partition @@ -231,13 +267,15 @@ impl Topic { group_id: &Identifier, member_id: u32, ) -> Result<(), IggyError> { - let consumer_group = self.get_consumer_group_mut(group_id).with_error_context(|error| { + let topic_id = self.topic_id; + let stream_id = self.stream_id; + let mut consumer_group = self.get_consumer_group_mut(group_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get consumer group with id: {group_id}") })?; consumer_group.add_member(member_id); info!( "Member with ID: {} has joined consumer group with ID: {} for topic with ID: {} and stream with ID: {}.", - member_id, group_id, self.topic_id, self.stream_id + member_id, group_id, topic_id, stream_id ); Ok(()) } @@ -247,13 +285,15 @@ impl Topic { group_id: &Identifier, member_id: u32, ) -> Result<(), IggyError> { - let consumer_group = self.get_consumer_group_mut(group_id).with_error_context(|error| { + let topic_id = self.topic_id; + let stream_id = self.stream_id; + let mut consumer_group = self.get_consumer_group_mut(group_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get consumer group with id: {group_id}") })?; consumer_group.delete_member(member_id); info!( "Member with ID: {} has left consumer group with ID: {} for topic with ID: {} and stream with ID: {}.", - member_id, group_id, self.topic_id, self.stream_id + member_id, group_id, topic_id, stream_id ); Ok(()) } @@ -286,7 +326,7 @@ mod tests { assert_eq!(created_consumer_group.topic_id, topic_id); } - assert_eq!(topic.consumer_groups.len(), 1); + assert_eq!(topic.consumer_groups.borrow().len(), 1); let consumer_group = topic .get_consumer_group(&Identifier::numeric(group_id).unwrap()) .unwrap(); @@ -295,7 +335,7 @@ mod tests { assert_eq!(consumer_group.topic_id, topic_id); assert_eq!( consumer_group.partitions_count, - topic.partitions.borrow().len() as u32 + topic.partitions.len() as u32 ); } @@ -306,12 +346,12 @@ mod tests { let mut topic = get_topic().await; let result = topic.create_consumer_group(Some(group_id), name); assert!(result.is_ok()); - assert_eq!(topic.consumer_groups.len(), 1); + assert_eq!(topic.consumer_groups.borrow().len(), 1); let result = topic.create_consumer_group(Some(group_id), "test2"); assert!(result.is_err()); let err = result.unwrap_err(); assert!(matches!(err, IggyError::ConsumerGroupIdAlreadyExists(_, _))); - assert_eq!(topic.consumer_groups.len(), 1); + assert_eq!(topic.consumer_groups.borrow().len(), 1); } #[tokio::test] @@ -321,7 +361,7 @@ mod tests { let mut topic = get_topic().await; let result = topic.create_consumer_group(Some(group_id), name); assert!(result.is_ok()); - assert_eq!(topic.consumer_groups.len(), 1); + assert_eq!(topic.consumer_groups.borrow().len(), 1); let group_id = group_id + 1; let result = topic.create_consumer_group(Some(group_id), name); assert!(result.is_err()); @@ -330,7 +370,7 @@ mod tests { err, IggyError::ConsumerGroupNameAlreadyExists(_, _) )); - assert_eq!(topic.consumer_groups.len(), 1); + assert_eq!(topic.consumer_groups.borrow().len(), 1); } #[tokio::test] @@ -340,12 +380,12 @@ mod tests { let mut topic = get_topic().await; let result = topic.create_consumer_group(Some(group_id), name); assert!(result.is_ok()); - assert_eq!(topic.consumer_groups.len(), 1); + assert_eq!(topic.consumer_groups.borrow().len(), 1); let result = topic .delete_consumer_group(&Identifier::numeric(group_id).unwrap()) .await; assert!(result.is_ok()); - assert!(topic.consumer_groups.is_empty()); + assert!(topic.consumer_groups.borrow().is_empty()); } #[tokio::test] @@ -355,13 +395,13 @@ mod tests { let mut topic = get_topic().await; let result = topic.create_consumer_group(Some(group_id), name); assert!(result.is_ok()); - assert_eq!(topic.consumer_groups.len(), 1); + assert_eq!(topic.consumer_groups.borrow().len(), 1); let group_id = group_id + 1; let result = topic .delete_consumer_group(&Identifier::numeric(group_id).unwrap()) .await; assert!(result.is_err()); - assert_eq!(topic.consumer_groups.len(), 1); + assert_eq!(topic.consumer_groups.borrow().len(), 1); } #[tokio::test] diff --git a/core/server/src/streaming/topics/consumer_offsets.rs b/core/server/src/streaming/topics/consumer_offsets.rs index f59e9276..8748576b 100644 --- a/core/server/src/streaming/topics/consumer_offsets.rs +++ b/core/server/src/streaming/topics/consumer_offsets.rs @@ -34,7 +34,6 @@ impl Topic { ) -> Result<(), IggyError> { let Some((polling_consumer, partition_id)) = self .resolve_consumer_with_partition_id(&consumer, client_id, partition_id, false) - .await .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to resolve consumer with partition id, consumer ID: {}, client ID: {}, partition ID: {:?}", consumer.id, client_id, partition_id))? else { return Err(IggyError::ConsumerOffsetNotFound(client_id)); }; @@ -78,8 +77,7 @@ impl Topic { ) -> Result<Option<ConsumerOffsetInfo>, IggyError> { let Some((polling_consumer, partition_id)) = self .resolve_consumer_with_partition_id(consumer, client_id, partition_id, false) - .await - .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to resolve consumer offset for consumer: {consumer}, client ID: {client_id}, partition ID: {partition_id:#?}"))? else { + .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to resolve consumer offset for consumer: {consumer}, client ID: {client_id}, partition ID: {:#?}", partition_id))? else { return Ok(None); }; @@ -93,7 +91,6 @@ impl Topic { let partition = partition.read().await; let offset = partition .get_consumer_offset(polling_consumer) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to get consumer offset for consumer: {polling_consumer}" @@ -118,7 +115,6 @@ impl Topic { ) -> Result<(), IggyError> { let Some((polling_consumer, partition_id)) = self .resolve_consumer_with_partition_id(&consumer, client_id, partition_id, false) - .await .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to resolve consumer with partition id, consumer ID: {}, client ID: {}, partition ID: {:?}", consumer.id, client_id, partition_id))? else { return Err(IggyError::ConsumerOffsetNotFound(client_id)); }; diff --git a/core/server/src/streaming/topics/messages.rs b/core/server/src/streaming/topics/messages.rs index 56d7c1c7..e8c52385 100644 --- a/core/server/src/streaming/topics/messages.rs +++ b/core/server/src/streaming/topics/messages.rs @@ -209,6 +209,7 @@ mod tests { use bytes::Bytes; use iggy_common::CompressionAlgorithm; use iggy_common::{IggyMessage, MaxTopicSize}; + use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::AtomicU32; use std::sync::atomic::AtomicU64; @@ -321,11 +322,11 @@ mod tests { async fn init_topic(partitions_count: u32) -> Topic { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Arc::new(SystemConfig { + let config = Rc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); - let storage = Arc::new(SystemStorage::new( + let storage = Rc::new(SystemStorage::new( config.clone(), Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})), )); diff --git a/core/server/src/streaming/topics/partitions.rs b/core/server/src/streaming/topics/partitions.rs index eb6b4280..dfedf0a9 100644 --- a/core/server/src/streaming/topics/partitions.rs +++ b/core/server/src/streaming/topics/partitions.rs @@ -20,7 +20,7 @@ use crate::streaming::partitions::partition::Partition; use crate::streaming::topics::COMPONENT; use crate::streaming::topics::topic::Topic; use error_set::ErrContext; -use iggy_common::locking::{IggySharedMut, IggySharedMutFn}; +use iggy_common::locking::{IggyRwLock, IggySharedMutFn}; use iggy_common::{IggyError, IggyTimestamp}; const MAX_PARTITIONS_COUNT: u32 = 100_000; @@ -63,7 +63,7 @@ impl Topic { ) .await; self.partitions - .insert(partition_id, IggySharedMut::new(partition)); + .insert(partition_id, IggyRwLock::new(partition)); partition_ids.push(partition_id) } diff --git a/core/server/src/streaming/topics/storage.rs b/core/server/src/streaming/topics/storage.rs index 1cc33310..ae9435e5 100644 --- a/core/server/src/streaming/topics/storage.rs +++ b/core/server/src/streaming/topics/storage.rs @@ -27,7 +27,7 @@ use anyhow::Context; use error_set::ErrContext; use futures::future::join_all; use iggy_common::IggyError; -use iggy_common::locking::IggySharedMut; +use iggy_common::locking::IggyRwLock; use iggy_common::locking::IggySharedMutFn; use serde::{Deserialize, Serialize}; use std::path::Path; @@ -189,7 +189,7 @@ impl TopicStorage for FileTopicStorage { for mut partition in unloaded_partitions { let loaded_partitions = loaded_partitions.clone(); let partition_state = state.partitions.remove(&partition.partition_id).unwrap(); - let load_partition = tokio::spawn(async move { + let load_partition = monoio::spawn(async move { match partition.load(partition_state).await { Ok(_) => { loaded_partitions.lock().await.push(partition); @@ -209,7 +209,7 @@ impl TopicStorage for FileTopicStorage { for partition in loaded_partitions.lock().await.drain(..) { topic .partitions - .insert(partition.partition_id, IggySharedMut::new(partition)); + .insert(partition.partition_id, IggyRwLock::new(partition)); } for consumer_group in state.consumer_groups.into_values() { @@ -224,7 +224,8 @@ impl TopicStorage for FileTopicStorage { .insert(consumer_group.name.to_owned(), consumer_group.group_id); topic .consumer_groups - .insert(consumer_group.group_id, RwLock::new(consumer_group)); + .borrow_mut() + .insert(consumer_group.group_id, consumer_group); } info!("Loaded topic {topic}"); diff --git a/core/server/src/streaming/topics/topic.rs b/core/server/src/streaming/topics/topic.rs index ad3e25ef..16aa324c 100644 --- a/core/server/src/streaming/topics/topic.rs +++ b/core/server/src/streaming/topics/topic.rs @@ -25,7 +25,7 @@ use ahash::AHashMap; use core::fmt; use std::cell::RefCell; use std::rc::Rc; -use iggy_common::locking::IggySharedMut; +use iggy_common::locking::IggyRwLock; use iggy_common::{ CompressionAlgorithm, Consumer, ConsumerKind, IggyByteSize, IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize, Sizeable, @@ -33,7 +33,6 @@ use iggy_common::{ use std::sync::Arc; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; -use tokio::sync::RwLock; use tracing::info; const ALMOST_FULL_THRESHOLD: f64 = 0.9; @@ -51,9 +50,9 @@ pub struct Topic { pub(crate) messages_count: Arc<AtomicU64>, pub(crate) segments_count_of_parent_stream: Arc<AtomicU32>, pub(crate) config: Rc<SystemConfig>, - pub(crate) partitions: RefCell<AHashMap<u32, Partition>>, + pub(crate) partitions: AHashMap<u32, IggyRwLock<Partition>>, pub(crate) storage: Rc<SystemStorage>, - pub(crate) consumer_groups: AHashMap<u32, ConsumerGroup>, + pub(crate) consumer_groups: RefCell<AHashMap<u32, ConsumerGroup>>, pub(crate) consumer_groups_ids: AHashMap<String, u32>, pub(crate) current_consumer_group_id: AtomicU32, pub(crate) current_partition_id: AtomicU32, @@ -126,7 +125,7 @@ impl Topic { messages_count_of_parent_stream, messages_count: Arc::new(AtomicU64::new(0)), segments_count_of_parent_stream, - consumer_groups: AHashMap::new(), + consumer_groups: RefCell::new(AHashMap::new()), consumer_groups_ids: AHashMap::new(), current_consumer_group_id: AtomicU32::new(1), current_partition_id: AtomicU32::new(1), @@ -172,11 +171,11 @@ impl Topic { matches!(self.max_topic_size, MaxTopicSize::Unlimited) } - pub fn get_partitions(&self) -> Vec<IggySharedMut<Partition>> { + pub fn get_partitions(&self) -> Vec<IggyRwLock<Partition>> { self.partitions.values().cloned().collect() } - pub fn get_partition(&self, partition_id: u32) -> Result<IggySharedMut<Partition>, IggyError> { + pub fn get_partition(&self, partition_id: u32) -> Result<IggyRwLock<Partition>, IggyError> { match self.partitions.get(&partition_id) { Some(partition_arc) => Ok(partition_arc.clone()), None => Err(IggyError::PartitionNotFound( @@ -187,7 +186,7 @@ impl Topic { } } - pub async fn resolve_consumer_with_partition_id( + pub fn resolve_consumer_with_partition_id( &self, consumer: &Consumer, client_id: u32, @@ -203,7 +202,7 @@ impl Topic { ))) } ConsumerKind::ConsumerGroup => { - let consumer_group = self.get_consumer_group(&consumer.id)?.read().await; + let mut consumer_group = self.get_consumer_group_mut(&consumer.id)?; if let Some(partition_id) = partition_id { return Ok(Some(( PollingConsumer::consumer_group(consumer_group.group_id, client_id), @@ -212,9 +211,9 @@ impl Topic { } let partition_id = if calculate_partition_id { - consumer_group.calculate_partition_id(client_id).await? + consumer_group.calculate_partition_id(client_id)? } else { - consumer_group.get_current_partition_id(client_id).await? + consumer_group.get_current_partition_id(client_id)? }; let Some(partition_id) = partition_id else { return Ok(None); @@ -291,11 +290,11 @@ mod tests { #[tokio::test] async fn should_be_created_given_valid_parameters() { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Arc::new(SystemConfig { + let config = Rc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); - let storage = Arc::new(SystemStorage::new( + let storage = Rc::new(SystemStorage::new( config.clone(), Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})), ));
