This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 81c29b1b7276d684a88a5d56157a33fb8602df5f Author: numinex <[email protected]> AuthorDate: Fri Jun 27 12:26:28 2025 +0200 finish handlers --- Cargo.lock | 16 --------- .../create_consumer_group_handler.rs | 28 +++++++++++----- .../delete_consumer_group_handler.rs | 13 ++++---- .../consumer_groups/get_consumer_group_handler.rs | 23 +++++++++---- .../consumer_groups/get_consumer_groups_handler.rs | 12 +++---- .../consumer_groups/join_consumer_group_handler.rs | 11 +++---- .../leave_consumer_group_handler.rs | 13 ++++---- .../delete_consumer_offset_handler.rs | 10 +++--- .../get_consumer_offset_handler.rs | 10 +++--- .../store_consumer_offset_handler.rs | 11 ++++--- .../messages/flush_unsaved_buffer_handler.rs | 10 +++--- .../handlers/messages/poll_messages_handler.rs | 23 +++++++------ .../handlers/messages/send_messages_handler.rs | 38 +++++++++++++--------- .../partitions/create_partitions_handler.rs | 13 ++++---- .../partitions/delete_partitions_handler.rs | 13 ++++---- .../create_personal_access_token_handler.rs | 14 ++++---- .../delete_personal_access_token_handler.rs | 14 ++++---- .../get_personal_access_tokens_handler.rs | 13 ++++---- .../login_with_personal_access_token_handler.rs | 11 +++---- .../handlers/segments/delete_segments_handler.rs | 13 ++++---- .../handlers/streams/create_stream_handler.rs | 21 +++++++----- .../handlers/streams/delete_stream_handler.rs | 13 ++++---- .../binary/handlers/streams/get_stream_handler.rs | 12 +++---- .../binary/handlers/streams/get_streams_handler.rs | 12 +++---- .../handlers/streams/purge_stream_handler.rs | 12 +++---- .../handlers/streams/update_stream_handler.rs | 13 ++++---- .../binary/handlers/system/get_client_handler.rs | 12 +++---- .../binary/handlers/system/get_clients_handler.rs | 13 ++++---- .../src/binary/handlers/system/get_me_handler.rs | 13 +++----- .../src/binary/handlers/system/get_snapshot.rs | 10 +++--- .../binary/handlers/system/get_stats_handler.rs | 10 +++--- .../src/binary/handlers/system/ping_handler.rs | 17 +++++----- .../binary/handlers/topics/create_topic_handler.rs | 29 ++++++++++++----- .../binary/handlers/topics/delete_topic_handler.rs | 13 ++++---- .../binary/handlers/topics/get_topic_handler.rs | 19 ++++++++--- .../binary/handlers/topics/get_topics_handler.rs | 21 ++++++++---- .../binary/handlers/topics/purge_topic_handler.rs | 12 +++---- .../binary/handlers/topics/update_topic_handler.rs | 26 ++++++++++----- .../handlers/users/change_password_handler.rs | 16 ++++----- .../binary/handlers/users/create_user_handler.rs | 16 ++++----- .../binary/handlers/users/delete_user_handler.rs | 17 +++++----- .../src/binary/handlers/users/get_user_handler.rs | 13 ++++---- .../src/binary/handlers/users/get_users_handler.rs | 20 +++++------- .../binary/handlers/users/login_user_handler.rs | 14 ++++---- .../binary/handlers/users/logout_user_handler.rs | 20 +++++------- .../handlers/users/update_permissions_handler.rs | 15 ++++----- .../binary/handlers/users/update_user_handler.rs | 15 ++++----- core/server/src/binary/mapper.rs | 22 +++++++------ core/server/src/binary/sender.rs | 23 ++++++++----- core/server/src/lib.rs | 9 +++++ core/server/src/quic/quic_sender.rs | 3 +- core/server/src/shard/system/clients.rs | 4 +-- core/server/src/shard/system/consumer_groups.rs | 10 ++---- core/server/src/shard/system/messages.rs | 4 --- .../src/shard/system/personal_access_tokens.rs | 8 ++--- core/server/src/shard/system/snapshot/mod.rs | 5 +++ core/server/src/shard/system/streams.rs | 6 ++-- core/server/src/shard/system/topics.rs | 15 +++------ .../server/src/streaming/clients/client_manager.rs | 8 +++-- .../server/src/streaming/topics/consumer_groups.rs | 5 ++- core/server/src/streaming/utils/pooled_buffer.rs | 15 +++++++++ core/server/src/tcp/sender.rs | 7 ++-- core/server/src/tcp/tcp_sender.rs | 2 +- core/server/src/tcp/tcp_tls_sender.rs | 3 +- 64 files changed, 467 insertions(+), 415 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3e51b287..19be4ecb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -143,7 +143,6 @@ dependencies = [ "futures-core", "futures-util", "mio 1.0.4", - "mio 1.0.4", "socket2", "tokio", "tracing", @@ -653,7 +652,6 @@ dependencies = [ "proc-macro2", "quote", "syn 2.0.104", - "syn 2.0.104", ] [[package]] @@ -4806,18 +4804,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "mio" -version = "0.8.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" -dependencies = [ - "libc", - "log", - "wasi 0.11.1+wasi-snapshot-preview1", - "windows-sys 0.48.0", -] - [[package]] name = "mio" version = "1.0.4" @@ -4916,7 +4902,6 @@ dependencies = [ "proc-macro2", "quote", "syn 2.0.104", - "syn 2.0.104", ] [[package]] @@ -7643,7 +7628,6 @@ dependencies = [ "bytes", "libc", "mio 1.0.4", - "mio 1.0.4", "parking_lot 0.12.4", "pin-project-lite", "signal-hook-registry", diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs index 5d695687..266b46a6 100644 --- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs @@ -16,8 +16,6 @@ * under the License. */ -use std::rc::Rc; - use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; @@ -30,6 +28,7 @@ use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::create_consumer_group::CreateConsumerGroup; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for CreateConsumerGroup { @@ -47,7 +46,7 @@ impl ServerCommandHandler for CreateConsumerGroup { ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let consumer_group = shard + let consumer_group_id = shard .create_consumer_group( session, &self.stream_id, @@ -55,23 +54,34 @@ impl ServerCommandHandler for CreateConsumerGroup { self.group_id, &self.name, ) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to create consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {session}", self.stream_id, self.topic_id, self.group_id ) })?; - let consumer_group = consumer_group.read().await; + + let stream = shard.find_stream(session, &self.stream_id) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to find stream with ID: {} for session: {}", + self.stream_id, session + ) + })?; + let consumer_group = shard.get_consumer_group(session, &stream, &self.topic_id, &consumer_group_id) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {session}", + self.stream_id, self.topic_id, consumer_group_id + ) + })?.unwrap(); let group_id = consumer_group.group_id; - let response = mapper::map_consumer_group(&consumer_group).await; - drop(consumer_group); + let response = mapper::map_consumer_group(&consumer_group); - let system = system.downgrade(); let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); - system + shard .state .apply( session.get_user_id(), diff --git a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs index 828d999f..b674baf1 100644 --- a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs @@ -19,13 +19,14 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::delete_consumer_group::DeleteConsumerGroup; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for DeleteConsumerGroup { @@ -38,13 +39,12 @@ impl ServerCommandHandler for DeleteConsumerGroup { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let mut system = system.write().await; - system + shard .delete_consumer_group( session, &self.stream_id, @@ -56,12 +56,11 @@ impl ServerCommandHandler for DeleteConsumerGroup { self.group_id, self.topic_id, self.stream_id, session ))?; - let system = system.downgrade(); let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); let group_id = self.group_id.clone(); - system + shard .state .apply( session.get_user_id(), diff --git a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs index 694ac38c..8f67192b 100644 --- a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs @@ -20,13 +20,17 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::get_consumer_group::GetConsumerGroup; +use std::rc::Rc; use tracing::debug; +use super::COMPONENT; + impl ServerCommandHandler for GetConsumerGroup { fn code(&self) -> u32 { iggy_common::GET_CONSUMER_GROUP_CODE @@ -36,14 +40,20 @@ impl ServerCommandHandler for GetConsumerGroup { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; + let stream_id = &self.stream_id; + let stream = shard.find_stream(session, &self.stream_id) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get stream for stream_id: {stream_id}" + ) + })?; let Ok(consumer_group) = - system.get_consumer_group(session, &self.stream_id, &self.topic_id, &self.group_id) + shard.get_consumer_group(session, &stream, &self.topic_id, &self.group_id) else { sender.send_empty_ok_response().await?; return Ok(()); @@ -53,8 +63,7 @@ impl ServerCommandHandler for GetConsumerGroup { return Ok(()); }; - let consumer_group = consumer_group.read().await; - let consumer_group = mapper::map_consumer_group(&consumer_group).await; + let consumer_group = mapper::map_consumer_group(&consumer_group); sender.send_ok_response(&consumer_group).await?; Ok(()) } diff --git a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs index 7ff6389f..bc6b60b4 100644 --- a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs @@ -21,12 +21,13 @@ use crate::binary::handlers::consumer_groups::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::get_consumer_groups::GetConsumerGroups; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for GetConsumerGroups { @@ -38,12 +39,11 @@ impl ServerCommandHandler for GetConsumerGroups { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let consumer_groups = system + let consumer_groups = shard .get_consumer_groups(session, &self.stream_id, &self.topic_id) .with_error_context(|error| { format!( @@ -51,7 +51,7 @@ impl ServerCommandHandler for GetConsumerGroups { self.stream_id, self.topic_id, session ) })?; - let consumer_groups = mapper::map_consumer_groups(&consumer_groups).await; + let consumer_groups = mapper::map_consumer_groups(consumer_groups); sender.send_ok_response(&consumer_groups).await?; Ok(()) } diff --git a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs index 3d01d41a..f72609c3 100644 --- a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs @@ -19,12 +19,13 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::join_consumer_group::JoinConsumerGroup; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for JoinConsumerGroup { @@ -37,19 +38,17 @@ impl ServerCommandHandler for JoinConsumerGroup { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - system + shard .join_consumer_group( session, &self.stream_id, &self.topic_id, &self.group_id, ) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to join consumer group for stream_id: {}, topic_id: {}, group_id: {}, session: {}", diff --git a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs index e094531d..ba352bf9 100644 --- a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs @@ -18,14 +18,16 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; -use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind}; +use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::leave_consumer_group::LeaveConsumerGroup; +use std::rc::Rc; use tracing::{debug, instrument}; +use super::COMPONENT; impl ServerCommandHandler for LeaveConsumerGroup { fn code(&self) -> u32 { @@ -37,13 +39,12 @@ impl ServerCommandHandler for LeaveConsumerGroup { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - system + shard .leave_consumer_group( session, &self.stream_id, diff --git a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs index f639223d..96c1e43c 100644 --- a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs @@ -20,12 +20,13 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::consumer_offsets::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::delete_consumer_offset::DeleteConsumerOffset; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for DeleteConsumerOffset { @@ -37,12 +38,11 @@ impl ServerCommandHandler for DeleteConsumerOffset { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - system + shard .delete_consumer_offset( session, self.consumer, diff --git a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs index dbe119ff..04c89852 100644 --- a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs @@ -20,11 +20,12 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy_common::IggyError; use iggy_common::get_consumer_offset::GetConsumerOffset; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for GetConsumerOffset { @@ -36,12 +37,11 @@ impl ServerCommandHandler for GetConsumerOffset { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let Ok(offset) = system + let Ok(offset) = shard .get_consumer_offset( session, &self.consumer, diff --git a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs index c0343bcb..db770e4a 100644 --- a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs @@ -16,12 +16,14 @@ * under the License. */ +use std::rc::Rc; + use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::consumer_offsets::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; @@ -37,12 +39,11 @@ impl ServerCommandHandler for StoreConsumerOffset { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - system + shard .store_consumer_offset( session, self.consumer, diff --git a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs index 5a50a50d..45bc9db6 100644 --- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs +++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs @@ -19,11 +19,12 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::messages::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::{FlushUnsavedBuffer, IggyError}; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for FlushUnsavedBuffer { @@ -36,17 +37,16 @@ impl ServerCommandHandler for FlushUnsavedBuffer { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); let partition_id = self.partition_id; let fsync = self.fsync; - system + shard .flush_unsaved_buffer(session, stream_id, topic_id, partition_id, fsync) .await .with_error_context(|error| { diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs index b0a2ccea..a591f691 100644 --- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -20,13 +20,15 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::messages::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::sender::SenderKind; +use crate::shard::system::messages::PollingArgs; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::messages::PollingArgs; -use crate::streaming::systems::system::SharedSystem; +use crate::to_iovec; use anyhow::Result; use error_set::ErrContext; use iggy_common::{IggyError, PollMessages}; use std::io::IoSlice; +use std::rc::Rc; use tracing::{debug, trace}; #[derive(Debug)] @@ -53,13 +55,12 @@ impl ServerCommandHandler for PollMessages { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let (metadata, messages) = system + let (metadata, messages) = shard .poll_messages( session, &self.consumer, @@ -73,7 +74,6 @@ impl ServerCommandHandler for PollMessages { "{COMPONENT} (error: {error}) - failed to poll messages for consumer: {}, stream_id: {}, topic_id: {}, partition_id: {:?}, session: {session}.", self.consumer, self.stream_id, self.topic_id, self.partition_id ))?; - drop(system); // Collect all chunks first into a Vec to extend their lifetimes. // This ensures the Bytes (in reality Arc<[u8]>) references from each IggyMessagesBatch stay alive @@ -89,12 +89,11 @@ impl ServerCommandHandler for PollMessages { let count = messages.count().to_le_bytes(); let mut io_slices = Vec::with_capacity(messages.containers_count() + 3); - io_slices.push(IoSlice::new(&partition_id)); - io_slices.push(IoSlice::new(¤t_offset)); - io_slices.push(IoSlice::new(&count)); - - io_slices.extend(messages.iter().map(|m| IoSlice::new(m))); + io_slices.push(to_iovec(&partition_id)); + io_slices.push(to_iovec(¤t_offset)); + io_slices.push(to_iovec(&count)); + io_slices.extend(messages.iter().map(|m| to_iovec(&m))); trace!( "Sending {} messages to client ({} bytes) to client", messages.count(), diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index 17821a16..5603dfe9 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -18,15 +18,17 @@ use crate::binary::command::{BinaryServerCommand, ServerCommandHandler}; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use crate::streaming::utils::PooledBuffer; use anyhow::Result; +use bytes::BytesMut; use iggy_common::INDEX_SIZE; use iggy_common::Identifier; use iggy_common::Sizeable; use iggy_common::{IggyError, Partitioning, SendMessages, Validatable}; +use std::rc::Rc; use tracing::instrument; impl ServerCommandHandler for SendMessages { @@ -45,36 +47,40 @@ impl ServerCommandHandler for SendMessages { mut self, sender: &mut SenderKind, length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { let total_payload_size = length as usize - std::mem::size_of::<u32>(); let metadata_len_field_size = std::mem::size_of::<u32>(); - let mut metadata_length_buffer = [0u8; 4]; - sender.read(&mut metadata_length_buffer).await?; - let metadata_size = u32::from_le_bytes(metadata_length_buffer); + let mut metadata_length_buffer = BytesMut::with_capacity(4); + unsafe { metadata_length_buffer.set_len(4) }; + let (result, metadata_len_buf) = sender.read(metadata_length_buffer).await; + result?; + let metadata_len_buf = metadata_len_buf.freeze(); + let metadata_size = u32::from_le_bytes(metadata_len_buf[..].try_into().unwrap()); let mut metadata_buffer = PooledBuffer::with_capacity(metadata_size as usize); unsafe { metadata_buffer.set_len(metadata_size as usize) }; - sender.read(&mut metadata_buffer).await?; + let (result, metadata_buf) = sender.read(metadata_buffer).await; + result?; let mut element_size = 0; - let stream_id = Identifier::from_raw_bytes(&metadata_buffer)?; + let stream_id = Identifier::from_raw_bytes(&metadata_buf)?; element_size += stream_id.get_size_bytes().as_bytes_usize(); self.stream_id = stream_id; - let topic_id = Identifier::from_raw_bytes(&metadata_buffer[element_size..])?; + let topic_id = Identifier::from_raw_bytes(&metadata_buf[element_size..])?; element_size += topic_id.get_size_bytes().as_bytes_usize(); self.topic_id = topic_id; - let partitioning = Partitioning::from_raw_bytes(&metadata_buffer[element_size..])?; + let partitioning = Partitioning::from_raw_bytes(&metadata_buf[element_size..])?; element_size += partitioning.get_size_bytes().as_bytes_usize(); self.partitioning = partitioning; let messages_count = u32::from_le_bytes( - metadata_buffer[element_size..element_size + 4] + metadata_buf[element_size..element_size + 4] .try_into() .unwrap(), ); @@ -82,13 +88,15 @@ impl ServerCommandHandler for SendMessages { let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size); unsafe { indexes_buffer.set_len(indexes_size) }; - sender.read(&mut indexes_buffer).await?; + let (result, indexes_buffer) = sender.read(indexes_buffer).await; + result?; let messages_size = total_payload_size - metadata_size as usize - indexes_size - metadata_len_field_size; let mut messages_buffer = PooledBuffer::with_capacity(messages_size); unsafe { messages_buffer.set_len(messages_size) }; - sender.read(&mut messages_buffer).await?; + let (result, messages_buffer) = sender.read(messages_buffer).await; + result?; let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0); let batch = IggyMessagesBatchMut::from_indexes_and_messages( @@ -99,8 +107,7 @@ impl ServerCommandHandler for SendMessages { batch.validate()?; - let system = system.read().await; - system + shard .append_messages( session, &self.stream_id, @@ -110,7 +117,6 @@ impl ServerCommandHandler for SendMessages { None, ) .await?; - drop(system); sender.send_empty_ok_response().await?; Ok(()) diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs index 895c83b2..5045dbfb 100644 --- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs @@ -19,13 +19,14 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::create_partitions::CreatePartitions; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for CreatePartitions { @@ -38,13 +39,12 @@ impl ServerCommandHandler for CreatePartitions { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let mut system = system.write().await; - system + shard .create_partitions( session, &self.stream_id, @@ -59,11 +59,10 @@ impl ServerCommandHandler for CreatePartitions { ) })?; - let system = system.downgrade(); let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); - system + shard .state .apply( session.get_user_id(), diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs index c6351403..952b6b64 100644 --- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs @@ -19,13 +19,14 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::delete_partitions::DeletePartitions; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for DeletePartitions { @@ -38,15 +39,14 @@ impl ServerCommandHandler for DeletePartitions { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); - let mut system = system.write().await; - system + shard .delete_partitions( session, &self.stream_id, @@ -60,8 +60,7 @@ impl ServerCommandHandler for DeletePartitions { ) })?; - let system = system.downgrade(); - system + shard .state .apply( session.get_user_id(), diff --git a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs index b43103f3..c28ac77b 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs @@ -20,15 +20,16 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::personal_access_tokens::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::state::models::CreatePersonalAccessTokenWithHash; use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::create_personal_access_token::CreatePersonalAccessToken; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for CreatePersonalAccessToken { @@ -41,15 +42,13 @@ impl ServerCommandHandler for CreatePersonalAccessToken { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.write().await; - let token = system + let token = shard .create_personal_access_token(session, &self.name, self.expiry) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to create personal access token with name: {}, session: {session}", @@ -59,8 +58,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken { let bytes = mapper::map_raw_pat(&token); let token_hash = PersonalAccessToken::hash_token(&token); - let system = system.downgrade(); - system + shard .state .apply( session.get_user_id(), diff --git a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs index 4ba2de04..bb8dd463 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs @@ -19,13 +19,14 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::personal_access_tokens::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::delete_personal_access_token::DeletePersonalAccessToken; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for DeletePersonalAccessToken { @@ -38,22 +39,19 @@ impl ServerCommandHandler for DeletePersonalAccessToken { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); let token_name = self.name.clone(); - let mut system = system.write().await; - system + shard .delete_personal_access_token(session, &self.name) - .await .with_error_context(|error| {format!( "{COMPONENT} (error: {error}) - failed to delete personal access token with name: {token_name}, session: {session}" )})?; - let system = system.downgrade(); - system + shard .state .apply( session.get_user_id(), diff --git a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs index 08d7c6e6..973af58d 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs @@ -21,11 +21,12 @@ use crate::binary::handlers::personal_access_tokens::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::get_personal_access_tokens::GetPersonalAccessTokens; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for GetPersonalAccessTokens { @@ -37,18 +38,16 @@ impl ServerCommandHandler for GetPersonalAccessTokens { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let personal_access_tokens = system + let personal_access_tokens = shard .get_personal_access_tokens(session) - .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get personal access tokens with session: {session}") })?; - let personal_access_tokens = mapper::map_personal_access_tokens(&personal_access_tokens); + let personal_access_tokens = mapper::map_personal_access_tokens(personal_access_tokens); sender.send_ok_response(&personal_access_tokens).await?; Ok(()) } diff --git a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs index 33a4dee2..f7ca5628 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs @@ -15,13 +15,14 @@ * specific language governing permissions and limitations * under the License. */ +use crate::shard::IggyShard; +use std::rc::Rc; use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::personal_access_tokens::COMPONENT, sender::SenderKind}; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; @@ -38,14 +39,12 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let user = system + let user = shard .login_with_personal_access_token(&self.token, Some(session)) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to login with personal access token: {}, session: {session}", diff --git a/core/server/src/binary/handlers/segments/delete_segments_handler.rs b/core/server/src/binary/handlers/segments/delete_segments_handler.rs index 7a2f106c..b3eef13b 100644 --- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs +++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs @@ -19,13 +19,14 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::delete_segments::DeleteSegments; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for DeleteSegments { @@ -38,16 +39,15 @@ impl ServerCommandHandler for DeleteSegments { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); let partition_id = self.partition_id; - let mut system = system.write().await; - system + shard .delete_segments( session, &self.stream_id, @@ -62,8 +62,7 @@ impl ServerCommandHandler for DeleteSegments { ) })?; - let system = system.downgrade(); - system + shard .state .apply( session.get_user_id(), diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs b/core/server/src/binary/handlers/streams/create_stream_handler.rs index 86a91d0c..a00afde7 100644 --- a/core/server/src/binary/handlers/streams/create_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs @@ -20,14 +20,15 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::state::models::CreateStreamWithId; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::create_stream::CreateStream; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for CreateStream { @@ -40,14 +41,13 @@ impl ServerCommandHandler for CreateStream { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); let stream_id = self.stream_id; - let mut system = system.write().await; - let stream = system + let created_stream_id = shard .create_stream(session, self.stream_id, &self.name) .await .with_error_context(|error| { @@ -55,11 +55,16 @@ impl ServerCommandHandler for CreateStream { "{COMPONENT} (error: {error}) - failed to create stream with id: {stream_id:?}, session: {session}" ) })?; + let stream = shard.find_stream(session, &created_stream_id) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to find created stream with id: {created_stream_id:?}, session: {session}" + ) + })?; let stream_id = stream.stream_id; - let response = mapper::map_stream(stream); + let response = mapper::map_stream(&stream); - let system = system.downgrade(); - system + shard .state .apply(session.get_user_id(), &EntryCommand::CreateStream(CreateStreamWithId { stream_id, diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs b/core/server/src/binary/handlers/streams/delete_stream_handler.rs index f480c0ea..a0e52a43 100644 --- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs @@ -19,13 +19,14 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::delete_stream::DeleteStream; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for DeleteStream { @@ -38,22 +39,20 @@ impl ServerCommandHandler for DeleteStream { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); let stream_id = self.stream_id.clone(); - let mut system = system.write().await; - system + shard .delete_stream(session, &self.stream_id) .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to delete stream with ID: {stream_id}, session: {session}") })?; - let system = system.downgrade(); - system + shard .state .apply(session.get_user_id(), &EntryCommand::DeleteStream(self)) .await diff --git a/core/server/src/binary/handlers/streams/get_stream_handler.rs b/core/server/src/binary/handlers/streams/get_stream_handler.rs index 12e015af..fef6458c 100644 --- a/core/server/src/binary/handlers/streams/get_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/get_stream_handler.rs @@ -20,11 +20,12 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy_common::IggyError; use iggy_common::get_stream::GetStream; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for GetStream { @@ -36,12 +37,11 @@ impl ServerCommandHandler for GetStream { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let Ok(stream) = system.try_find_stream(session, &self.stream_id) else { + let Ok(stream) = shard.try_find_stream(session, &self.stream_id) else { sender.send_empty_ok_response().await?; return Ok(()); }; @@ -51,7 +51,7 @@ impl ServerCommandHandler for GetStream { return Ok(()); }; - let response = mapper::map_stream(stream); + let response = mapper::map_stream(&stream); sender.send_ok_response(&response).await?; Ok(()) } diff --git a/core/server/src/binary/handlers/streams/get_streams_handler.rs b/core/server/src/binary/handlers/streams/get_streams_handler.rs index fa08bc13..0f83a9cb 100644 --- a/core/server/src/binary/handlers/streams/get_streams_handler.rs +++ b/core/server/src/binary/handlers/streams/get_streams_handler.rs @@ -21,12 +21,13 @@ use crate::binary::handlers::streams::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::get_streams::GetStreams; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for GetStreams { @@ -38,15 +39,14 @@ impl ServerCommandHandler for GetStreams { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let streams = system.find_streams(session).with_error_context(|error| { + let streams = shard.find_streams(session).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to find streams for session: {session}") })?; - let response = mapper::map_streams(&streams); + let response = mapper::map_streams(streams); sender.send_ok_response(&response).await?; Ok(()) } diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs b/core/server/src/binary/handlers/streams/purge_stream_handler.rs index dd37e155..3a387ce9 100644 --- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs @@ -19,13 +19,14 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::purge_stream::PurgeStream; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for PurgeStream { @@ -38,21 +39,20 @@ impl ServerCommandHandler for PurgeStream { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; let stream_id = self.stream_id.clone(); - system + shard .purge_stream(session, &self.stream_id) .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to purge stream with id: {stream_id}, session: {session}") })?; - system + shard .state .apply(session.get_user_id(), &EntryCommand::PurgeStream(self)) .await diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs b/core/server/src/binary/handlers/streams/update_stream_handler.rs index cdb02c91..2aa4ddc9 100644 --- a/core/server/src/binary/handlers/streams/update_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs @@ -19,13 +19,14 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::update_stream::UpdateStream; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for UpdateStream { @@ -38,22 +39,20 @@ impl ServerCommandHandler for UpdateStream { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); let stream_id = self.stream_id.clone(); - let mut system = system.write().await; - system + shard .update_stream(session, &self.stream_id, &self.name) .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to update stream with id: {stream_id}, session: {session}") })?; - let system = system.downgrade(); - system + shard .state .apply(session.get_user_id(), &EntryCommand::UpdateStream(self)) .await diff --git a/core/server/src/binary/handlers/system/get_client_handler.rs b/core/server/src/binary/handlers/system/get_client_handler.rs index 532eddf4..9ca18a38 100644 --- a/core/server/src/binary/handlers/system/get_client_handler.rs +++ b/core/server/src/binary/handlers/system/get_client_handler.rs @@ -20,11 +20,11 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use iggy_common::IggyError; use iggy_common::get_client::GetClient; -use iggy_common::locking::IggySharedMutFn; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for GetClient { @@ -36,13 +36,12 @@ impl ServerCommandHandler for GetClient { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let Ok(client) = system.get_client(session, self.client_id).await else { + let Ok(client) = shard.get_client(session, self.client_id) else { sender.send_empty_ok_response().await?; return Ok(()); }; @@ -52,7 +51,6 @@ impl ServerCommandHandler for GetClient { return Ok(()); }; - let client = client.read().await; let bytes = mapper::map_client(&client); sender.send_ok_response(&bytes).await?; diff --git a/core/server/src/binary/handlers/system/get_clients_handler.rs b/core/server/src/binary/handlers/system/get_clients_handler.rs index bf05f7f1..c73ac987 100644 --- a/core/server/src/binary/handlers/system/get_clients_handler.rs +++ b/core/server/src/binary/handlers/system/get_clients_handler.rs @@ -21,11 +21,12 @@ use crate::binary::handlers::system::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::get_clients::GetClients; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for GetClients { @@ -37,19 +38,17 @@ impl ServerCommandHandler for GetClients { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let clients = system + let clients = shard .get_clients(session) - .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get clients, session: {session}") })?; - let clients = mapper::map_clients(&clients).await; + let clients = mapper::map_clients(clients).await; sender.send_ok_response(&clients).await?; Ok(()) } diff --git a/core/server/src/binary/handlers/system/get_me_handler.rs b/core/server/src/binary/handlers/system/get_me_handler.rs index b29972a9..44fdd651 100644 --- a/core/server/src/binary/handlers/system/get_me_handler.rs +++ b/core/server/src/binary/handlers/system/get_me_handler.rs @@ -21,12 +21,12 @@ use crate::binary::handlers::system::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::get_me::GetMe; -use iggy_common::locking::IggySharedMutFn; +use std::rc::Rc; impl ServerCommandHandler for GetMe { fn code(&self) -> u32 { @@ -37,13 +37,11 @@ impl ServerCommandHandler for GetMe { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { - let system = system.read().await; - let Some(client) = system + let Some(client) = shard .get_client(session, session.client_id) - .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get current client for session: {session}") })? @@ -51,7 +49,6 @@ impl ServerCommandHandler for GetMe { return Err(IggyError::ClientNotFound(session.client_id)); }; - let client = client.read().await; let bytes = mapper::map_client(&client); sender.send_ok_response(&bytes).await?; diff --git a/core/server/src/binary/handlers/system/get_snapshot.rs b/core/server/src/binary/handlers/system/get_snapshot.rs index ff850388..27b2a3a7 100644 --- a/core/server/src/binary/handlers/system/get_snapshot.rs +++ b/core/server/src/binary/handlers/system/get_snapshot.rs @@ -19,11 +19,12 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use bytes::Bytes; use iggy_common::IggyError; use iggy_common::get_snapshot::GetSnapshot; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for GetSnapshot { @@ -35,13 +36,12 @@ impl ServerCommandHandler for GetSnapshot { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let snapshot = system + let snapshot = shard .get_snapshot(session, self.compression, &self.snapshot_types) .await?; let bytes = Bytes::copy_from_slice(&snapshot.0); diff --git a/core/server/src/binary/handlers/system/get_stats_handler.rs b/core/server/src/binary/handlers/system/get_stats_handler.rs index 098ace33..17c87191 100644 --- a/core/server/src/binary/handlers/system/get_stats_handler.rs +++ b/core/server/src/binary/handlers/system/get_stats_handler.rs @@ -21,11 +21,12 @@ use crate::binary::handlers::system::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::get_stats::GetStats; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for GetStats { @@ -37,13 +38,12 @@ impl ServerCommandHandler for GetStats { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let stats = system.get_stats().await.with_error_context(|error| { + let stats = shard.get_stats().await.with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get stats, session: {session}") })?; let bytes = mapper::map_stats(&stats); diff --git a/core/server/src/binary/handlers/system/ping_handler.rs b/core/server/src/binary/handlers/system/ping_handler.rs index 396719c5..a66d91db 100644 --- a/core/server/src/binary/handlers/system/ping_handler.rs +++ b/core/server/src/binary/handlers/system/ping_handler.rs @@ -18,13 +18,13 @@ use crate::binary::command::{BinaryServerCommand, ServerCommandHandler}; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use iggy_common::IggyError; use iggy_common::IggyTimestamp; -use iggy_common::locking::IggySharedMutFn; use iggy_common::ping::Ping; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for Ping { @@ -36,14 +36,15 @@ impl ServerCommandHandler for Ping { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let client_manager = system.client_manager.read().await; - if let Some(client) = client_manager.try_get_client(session.client_id) { - let mut client = client.write().await; + if let Some(client) = shard + .client_manager + .borrow_mut() + .try_get_client_mut(session.client_id) + { let now = IggyTimestamp::now(); client.last_heartbeat = now; debug!("Updated last heartbeat to: {now} for session: {session}"); diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs index 0f42616f..db09863e 100644 --- a/core/server/src/binary/handlers/topics/create_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs @@ -20,14 +20,15 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::state::models::CreateTopicWithId; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::create_topic::CreateTopic; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for CreateTopic { @@ -40,14 +41,13 @@ impl ServerCommandHandler for CreateTopic { mut self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); let stream_id = self.stream_id.clone(); let topic_id = self.topic_id; - let mut system = system.write().await; - let topic = system + let created_topic_id = shard .create_topic( session, &self.stream_id, @@ -62,18 +62,31 @@ impl ServerCommandHandler for CreateTopic { .await .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to create topic for stream_id: {stream_id}, topic_id: {topic_id:?}" ))?; + let stream = shard + .find_stream(session, &self.stream_id) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get stream for stream_id: {stream_id}" + ) + })?; + let topic = shard.find_topic(session, &stream, &created_topic_id) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get topic with ID: {created_topic_id} in stream with ID: {stream_id}" + ) + })?; self.message_expiry = topic.message_expiry; self.max_topic_size = topic.max_topic_size; let topic_id = topic.topic_id; let response = mapper::map_topic(topic).await; - let system = system.downgrade(); - system + shard .state .apply(session.get_user_id(), &EntryCommand::CreateTopic(CreateTopicWithId { topic_id, command: self - })) .await + })) + .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to apply create topic for stream_id: {stream_id}, topic_id: {topic_id:?}" diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs b/core/server/src/binary/handlers/topics/delete_topic_handler.rs index e83115f4..a8978511 100644 --- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs @@ -19,13 +19,14 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::delete_topic::DeleteTopic; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for DeleteTopic { @@ -38,23 +39,21 @@ impl ServerCommandHandler for DeleteTopic { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); - let mut system = system.write().await; - system + shard .delete_topic(session, &self.stream_id, &self.topic_id) .await .with_error_context(|error| format!( "{COMPONENT} (error: {error}) - failed to delete topic with ID: {topic_id} in stream with ID: {stream_id}, session: {session}", ))?; - let system = system.downgrade(); - system + shard .state .apply(session.get_user_id(), &EntryCommand::DeleteTopic(self)) .await diff --git a/core/server/src/binary/handlers/topics/get_topic_handler.rs b/core/server/src/binary/handlers/topics/get_topic_handler.rs index 8c652eb3..597c13bf 100644 --- a/core/server/src/binary/handlers/topics/get_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/get_topic_handler.rs @@ -20,11 +20,13 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; +use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::get_topic::GetTopic; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for GetTopic { @@ -36,12 +38,19 @@ impl ServerCommandHandler for GetTopic { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let Ok(topic) = system.try_find_topic(session, &self.stream_id, &self.topic_id) else { + let stream = shard + .find_stream(session, &self.stream_id) + .with_error_context(|error| { + format!( + "Failed to find stream with ID: {}, session: {session} (error: {error})", + self.stream_id + ) + })?; + let Ok(topic) = shard.try_find_topic(session, &stream, &self.topic_id) else { sender.send_empty_ok_response().await?; return Ok(()); }; diff --git a/core/server/src/binary/handlers/topics/get_topics_handler.rs b/core/server/src/binary/handlers/topics/get_topics_handler.rs index 8d5c61b1..1606245e 100644 --- a/core/server/src/binary/handlers/topics/get_topics_handler.rs +++ b/core/server/src/binary/handlers/topics/get_topics_handler.rs @@ -21,12 +21,13 @@ use crate::binary::handlers::topics::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::get_topics::GetTopics; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for GetTopics { @@ -38,20 +39,26 @@ impl ServerCommandHandler for GetTopics { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let topics = system - .find_topics(session, &self.stream_id) + let stream = shard.find_stream(session, &self.stream_id) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get stream, stream_id: {}, session: {session}", + self.stream_id + ) + })?; + let topics = shard + .find_topics(session, &stream) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to find topics, stream_id: {}, session: {session}", self.stream_id ) })?; - let response = mapper::map_topics(&topics); + let response = mapper::map_topics(topics); sender.send_ok_response(&response).await?; Ok(()) } diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs b/core/server/src/binary/handlers/topics/purge_topic_handler.rs index d9c91470..ab7e392c 100644 --- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs @@ -19,13 +19,14 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::purge_topic::PurgeTopic; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for PurgeTopic { @@ -38,12 +39,11 @@ impl ServerCommandHandler for PurgeTopic { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - system + shard .purge_topic(session, &self.stream_id, &self.topic_id) .await .with_error_context(|error| { @@ -55,7 +55,7 @@ impl ServerCommandHandler for PurgeTopic { let topic_id = self.topic_id.clone(); let stream_id = self.stream_id.clone(); - system + shard .state .apply(session.get_user_id(), &EntryCommand::PurgeTopic(self)) .await diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs b/core/server/src/binary/handlers/topics/update_topic_handler.rs index f460cc24..9d75a255 100644 --- a/core/server/src/binary/handlers/topics/update_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs @@ -19,13 +19,14 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::update_topic::UpdateTopic; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for UpdateTopic { @@ -38,14 +39,12 @@ impl ServerCommandHandler for UpdateTopic { mut self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let mut system = system.write().await; - - let topic = system + shard .update_topic( session, &self.stream_id, @@ -61,14 +60,25 @@ impl ServerCommandHandler for UpdateTopic { "{COMPONENT} (error: {error}) - failed to update topic with id: {}, stream_id: {}, session: {session}", self.topic_id, self.stream_id ))?; + + let stream = shard.find_stream(session, &self.stream_id) + .with_error_context(|error| format!( + "{COMPONENT} (error: {error}) - failed to find stream, stream_id: {}, session: {session}", + self.stream_id + ))?; + let topic = stream + .get_topic(&self.topic_id) + .with_error_context(|error| format!( + "{COMPONENT} (error: {error}) - failed to find topic, topic_id: {}, stream_id: {}, session: {session}", + self.topic_id, self.stream_id + ))?; self.message_expiry = topic.message_expiry; self.max_topic_size = topic.max_topic_size; let topic_id = self.topic_id.clone(); let stream_id = self.stream_id.clone(); - let system = system.downgrade(); - system + shard .state .apply(session.get_user_id(), &EntryCommand::UpdateTopic(self)) .await diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs b/core/server/src/binary/handlers/users/change_password_handler.rs index 935e8f87..32b376cd 100644 --- a/core/server/src/binary/handlers/users/change_password_handler.rs +++ b/core/server/src/binary/handlers/users/change_password_handler.rs @@ -19,14 +19,15 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use crate::streaming::utils::crypto; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::change_password::ChangePassword; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for ChangePassword { @@ -38,21 +39,19 @@ impl ServerCommandHandler for ChangePassword { async fn handle( self, sender: &mut SenderKind, - _length: u32, - session: &Session, - system: &SharedSystem, + length: u32, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let mut system = system.write().await; - system + shard .change_password( session, &self.user_id, &self.current_password, &self.new_password, ) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to change password for user_id: {}, session: {session}", @@ -61,8 +60,7 @@ impl ServerCommandHandler for ChangePassword { })?; // For the security of the system, we hash the password before storing it in metadata. - let system = system.downgrade(); - system + shard .state .apply( session.get_user_id(), diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs index 17d17fdf..e51b147b 100644 --- a/core/server/src/binary/handlers/users/create_user_handler.rs +++ b/core/server/src/binary/handlers/users/create_user_handler.rs @@ -16,6 +16,9 @@ * under the License. */ +use crate::shard::IggyShard; +use std::rc::Rc; + use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; @@ -23,7 +26,6 @@ use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; use crate::state::command::EntryCommand; use crate::state::models::CreateUserWithId; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use crate::streaming::utils::crypto; use anyhow::Result; use error_set::ErrContext; @@ -41,13 +43,12 @@ impl ServerCommandHandler for CreateUser { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let mut system = system.write().await; - let user = system + let user = shard .create_user( session, &self.username, @@ -63,11 +64,10 @@ impl ServerCommandHandler for CreateUser { ) })?; let user_id = user.id; - let response = mapper::map_user(user); + let response = mapper::map_user(&user); // For the security of the system, we hash the password before storing it in metadata. - let system = system.downgrade(); - system + shard .state .apply( session.get_user_id(), diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs b/core/server/src/binary/handlers/users/delete_user_handler.rs index b9fddfd7..10de56a4 100644 --- a/core/server/src/binary/handlers/users/delete_user_handler.rs +++ b/core/server/src/binary/handlers/users/delete_user_handler.rs @@ -16,12 +16,14 @@ * under the License. */ +use std::rc::Rc; + use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; @@ -37,16 +39,14 @@ impl ServerCommandHandler for DeleteUser { async fn handle( self, sender: &mut SenderKind, - _length: u32, - session: &Session, - system: &SharedSystem, + length: u32, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let mut system = system.write().await; - system + shard .delete_user(session, &self.user_id) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to delete user with ID: {}, session: {session}", @@ -54,9 +54,8 @@ impl ServerCommandHandler for DeleteUser { ) })?; - let system = system.downgrade(); let user_id = self.user_id.clone(); - system + shard .state .apply(session.get_user_id(), &EntryCommand::DeleteUser(self)) .await diff --git a/core/server/src/binary/handlers/users/get_user_handler.rs b/core/server/src/binary/handlers/users/get_user_handler.rs index 5523c4cc..e276c9a4 100644 --- a/core/server/src/binary/handlers/users/get_user_handler.rs +++ b/core/server/src/binary/handlers/users/get_user_handler.rs @@ -16,12 +16,14 @@ * under the License. */ +use std::rc::Rc; + use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use iggy_common::IggyError; use iggy_common::get_user::GetUser; use tracing::debug; @@ -35,12 +37,11 @@ impl ServerCommandHandler for GetUser { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let Ok(user) = system.find_user(session, &self.user_id) else { + let Ok(user) = shard.find_user(session, &self.user_id) else { sender.send_empty_ok_response().await?; return Ok(()); }; @@ -49,7 +50,7 @@ impl ServerCommandHandler for GetUser { return Ok(()); }; - let bytes = mapper::map_user(user); + let bytes = mapper::map_user(&user); sender.send_ok_response(&bytes).await?; Ok(()) } diff --git a/core/server/src/binary/handlers/users/get_users_handler.rs b/core/server/src/binary/handlers/users/get_users_handler.rs index 7471c591..72a0ca1e 100644 --- a/core/server/src/binary/handlers/users/get_users_handler.rs +++ b/core/server/src/binary/handlers/users/get_users_handler.rs @@ -16,13 +16,15 @@ * under the License. */ +use std::rc::Rc; + use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::users::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use error_set::ErrContext; use iggy_common::IggyError; use iggy_common::get_users::GetUsers; @@ -37,18 +39,14 @@ impl ServerCommandHandler for GetUsers { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let users = system - .get_users(session) - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get users, session: {session}") - })?; - let users = mapper::map_users(&users); + let users = shard.get_users(session).await.with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get users, session: {session}") + })?; + let users = mapper::map_users(users); sender.send_ok_response(&users).await?; Ok(()) } diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs b/core/server/src/binary/handlers/users/login_user_handler.rs index 7fa67be7..21cbba49 100644 --- a/core/server/src/binary/handlers/users/login_user_handler.rs +++ b/core/server/src/binary/handlers/users/login_user_handler.rs @@ -16,12 +16,14 @@ * under the License. */ +use std::rc::Rc; + use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; @@ -37,15 +39,13 @@ impl ServerCommandHandler for LoginUser { async fn handle( self, sender: &mut SenderKind, - _length: u32, - session: &Session, - system: &SharedSystem, + length: u32, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - let user = system + let user = shard .login_user(&self.username, &self.password, Some(session)) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to login user with name: {}, session: {session}", diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs b/core/server/src/binary/handlers/users/logout_user_handler.rs index 77fa33f4..d1ef95bf 100644 --- a/core/server/src/binary/handlers/users/logout_user_handler.rs +++ b/core/server/src/binary/handlers/users/logout_user_handler.rs @@ -16,11 +16,13 @@ * under the License. */ +use std::rc::Rc; + use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; @@ -36,18 +38,14 @@ impl ServerCommandHandler for LogoutUser { async fn handle( self, sender: &mut SenderKind, - _length: u32, - session: &Session, - system: &SharedSystem, + length: u32, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - system - .logout_user(session) - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to logout user, session: {session}") - })?; + shard.logout_user(session).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to logout user, session: {session}") + })?; session.clear_user_id(); sender.send_empty_ok_response().await?; Ok(()) diff --git a/core/server/src/binary/handlers/users/update_permissions_handler.rs b/core/server/src/binary/handlers/users/update_permissions_handler.rs index 3606c308..dea49a1c 100644 --- a/core/server/src/binary/handlers/users/update_permissions_handler.rs +++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs @@ -16,12 +16,14 @@ * under the License. */ +use std::rc::Rc; + use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; @@ -38,21 +40,18 @@ impl ServerCommandHandler for UpdatePermissions { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let mut system = system.write().await; - system + shard .update_permissions(session, &self.user_id, self.permissions.clone()) - .await .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to update permissions for user_id: {}, session: {session}", self.user_id ))?; - let system = system.downgrade(); - system + shard .state .apply( session.get_user_id(), diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs b/core/server/src/binary/handlers/users/update_user_handler.rs index 873a3c6f..982c4ae1 100644 --- a/core/server/src/binary/handlers/users/update_user_handler.rs +++ b/core/server/src/binary/handlers/users/update_user_handler.rs @@ -16,12 +16,14 @@ * under the License. */ +use std::rc::Rc; + use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; use error_set::ErrContext; use iggy_common::IggyError; @@ -38,20 +40,18 @@ impl ServerCommandHandler for UpdateUser { self, sender: &mut SenderKind, _length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let mut system = system.write().await; - system + shard .update_user( session, &self.user_id, self.username.clone(), self.status, ) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to update user with user_id: {}, session: {session}", @@ -59,10 +59,9 @@ impl ServerCommandHandler for UpdateUser { ) })?; - let system = system.downgrade(); let user_id = self.user_id.clone(); - system + shard .state .apply(session.get_user_id(), &EntryCommand::UpdateUser(self)) .await diff --git a/core/server/src/binary/mapper.rs b/core/server/src/binary/mapper.rs index 219ceed9..1ecce265 100644 --- a/core/server/src/binary/mapper.rs +++ b/core/server/src/binary/mapper.rs @@ -16,6 +16,8 @@ * under the License. */ +use std::cell::Ref; + use crate::streaming::clients::client_manager::{Client, Transport}; use crate::streaming::partitions::partition::Partition; use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; @@ -94,9 +96,9 @@ pub fn map_client(client: &Client) -> Bytes { bytes.freeze() } -pub async fn map_clients(clients: &[Client]) -> Bytes { +pub async fn map_clients(clients: Vec<Client>) -> Bytes { let mut bytes = BytesMut::new(); - for client in clients { + for client in clients.iter() { extend_client(client, &mut bytes); } bytes.freeze() @@ -117,9 +119,9 @@ pub fn map_user(user: &User) -> Bytes { bytes.freeze() } -pub fn map_users(users: &[&User]) -> Bytes { +pub fn map_users(users: Vec<User>) -> Bytes { let mut bytes = BytesMut::new(); - for user in users { + for user in users.iter() { extend_user(user, &mut bytes); } bytes.freeze() @@ -138,9 +140,9 @@ pub fn map_raw_pat(token: &str) -> Bytes { bytes.freeze() } -pub fn map_personal_access_tokens(personal_access_tokens: &[PersonalAccessToken]) -> Bytes { +pub fn map_personal_access_tokens(personal_access_tokens: Vec<PersonalAccessToken>) -> Bytes { let mut bytes = BytesMut::new(); - for personal_access_token in personal_access_tokens { + for personal_access_token in personal_access_tokens.iter() { extend_pat(personal_access_token, &mut bytes); } bytes.freeze() @@ -155,15 +157,15 @@ pub fn map_stream(stream: &Stream) -> Bytes { bytes.freeze() } -pub fn map_streams(streams: &[&Stream]) -> Bytes { +pub fn map_streams(streams: Vec<Ref<'_, Stream>>) -> Bytes { let mut bytes = BytesMut::new(); - for stream in streams { + for stream in streams.iter() { extend_stream(stream, &mut bytes); } bytes.freeze() } -pub fn map_topics(topics: &[&Topic]) -> Bytes { +pub fn map_topics(topics: Vec<&Topic>) -> Bytes { let mut bytes = BytesMut::new(); for topic in topics { extend_topic(topic, &mut bytes); @@ -196,7 +198,7 @@ pub fn map_consumer_group(consumer_group: &ConsumerGroup) -> Bytes { bytes.freeze() } -pub fn map_consumer_groups(consumer_groups: &[ConsumerGroup]) -> Bytes { +pub fn map_consumer_groups(consumer_groups: Vec<ConsumerGroup>) -> Bytes { let mut bytes = BytesMut::new(); for consumer_group in consumer_groups { extend_consumer_group(&consumer_group, &mut bytes); diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs index 06e9c51f..ff405eb6 100644 --- a/core/server/src/binary/sender.rs +++ b/core/server/src/binary/sender.rs @@ -33,28 +33,33 @@ use quinn::{RecvStream, SendStream}; macro_rules! forward_async_methods { ( $( - async fn $method_name:ident( + async fn $method_name:ident + $(<$($generic:ident $(: $bound:path)?),+>)? + ( &mut self $(, $arg:ident : $arg_ty:ty )* ) -> $ret:ty ; )* ) => { $( - pub async fn $method_name(&mut self, $( $arg: $arg_ty ),* ) -> $ret { + pub async fn $method_name + $(<$($generic $(: $bound)?),+>)? + (&mut self, $( $arg: $arg_ty ),* ) -> $ret { match self { - Self::Tcp(d) => d.$method_name($( $arg ),*).await, - Self::TcpTls(s) => s.$method_name($( $arg ),*).await, - Self::Quic(s) => s.$method_name($( $arg ),*).await, + Self::Tcp(d) => d.$method_name$(::<$($generic),+>)?($( $arg ),*).await, + Self::TcpTls(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await, + Self::Quic(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await, } } )* } } + pub trait Sender { - fn read( + fn read<B: IoBufMut>( &mut self, - buffer: BytesMut, - ) -> impl Future<Output = (Result<usize, IggyError>, BytesMut)>; + buffer: B + ) -> impl Future<Output = (Result<usize, IggyError>, B)>; fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), IggyError>>; fn send_ok_response(&mut self, payload: &[u8]) -> impl Future<Output = Result<(), IggyError>>; fn send_ok_response_vectored( @@ -92,7 +97,7 @@ impl SenderKind { } forward_async_methods! { - async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, BytesMut); + async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B); async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>; async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError>; async fn send_ok_response_vectored(&mut self, length: &[u8], slices: Vec<libc::iovec>) -> Result<(), IggyError>; diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs index 3dcf64b2..4a5dbe53 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/lib.rs @@ -18,6 +18,8 @@ #[cfg(not(feature = "disable-mimalloc"))] use mimalloc::MiMalloc; +use nix::libc::iovec; +use nix::libc::c_void; #[cfg(not(feature = "disable-mimalloc"))] #[global_allocator] @@ -53,3 +55,10 @@ pub fn map_toggle_str<'a>(enabled: bool) -> &'a str { false => "disabled", } } + +pub fn to_iovec<T>(data: &[T]) -> iovec { + iovec { + iov_base: data.as_ptr() as *mut c_void, + iov_len: data.len() * std::mem::size_of::<T>(), + } +} \ No newline at end of file diff --git a/core/server/src/quic/quic_sender.rs b/core/server/src/quic/quic_sender.rs index 17b60818..c6403349 100644 --- a/core/server/src/quic/quic_sender.rs +++ b/core/server/src/quic/quic_sender.rs @@ -36,7 +36,8 @@ pub struct QuicSender { } impl Sender for QuicSender { - async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, BytesMut) { + async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B) { + //TODO: Fixme // Not-so-nice code because quinn recv stream has different API for read_exact /* let read_bytes = buffer.len(); diff --git a/core/server/src/shard/system/clients.rs b/core/server/src/shard/system/clients.rs index ee9e1634..4668b337 100644 --- a/core/server/src/shard/system/clients.rs +++ b/core/server/src/shard/system/clients.rs @@ -72,7 +72,7 @@ impl IggyShard { } } - pub async fn get_client( + pub fn get_client( &self, session: &Session, client_id: u32, @@ -91,7 +91,7 @@ impl IggyShard { Ok(self.client_manager.borrow().try_get_client(client_id)) } - pub async fn get_clients(&self, session: &Session) -> Result<Vec<Client>, IggyError> { + pub fn get_clients(&self, session: &Session) -> Result<Vec<Client>, IggyError> { self.ensure_authenticated(session)?; self.permissioner .borrow() diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs index 768d4e5c..7b98205f 100644 --- a/core/server/src/shard/system/consumer_groups.rs +++ b/core/server/src/shard/system/consumer_groups.rs @@ -26,7 +26,6 @@ use crate::streaming::topics::consumer_group::ConsumerGroup; use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; -use iggy_common::locking::IggySharedMutFn; impl IggyShard { pub fn get_consumer_group<'cg, 'stream>( @@ -93,7 +92,7 @@ impl IggyShard { topic_id: &Identifier, group_id: Option<u32>, name: &str, - ) -> Result<u32, IggyError> { + ) -> Result<Identifier, IggyError> { self.ensure_authenticated(session)?; { let stream = self.get_stream(stream_id).with_error_context(|error| { @@ -120,7 +119,7 @@ impl IggyShard { .create_consumer_group(group_id, name) .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to create consumer group with name: {name}") - }).map(|cg| cg.group_id) + }) } pub async fn delete_consumer_group( @@ -272,12 +271,7 @@ impl IggyShard { .find_topic(session, &stream, topic_id) .with_error_context(|error| { format!( -<<<<<<< HEAD "{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id:?}, topic_id: {topic_id:?}" -======= - "{COMPONENT} (error: {error}) - topic not found for stream ID: {:?}, topic_id: {:?}", - stream.stream_id, topic_id ->>>>>>> 48107890 (fix iggy shard errors) ) })?; diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 06499bd3..7668566a 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -86,11 +86,7 @@ impl IggyShard { topic .store_consumer_offset_internal(polling_consumer, offset, partition_id) .await -<<<<<<< HEAD - .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to store consumer offset internal, polling consumer: {polling_consumer}, offset: {offset}, partition ID: {partition_id}")) ?; -======= .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to store consumer offset internal, polling consumer: {}, offset: {}, partition ID: {}", polling_consumer, offset, partition_id))?; ->>>>>>> 48107890 (fix iggy shard errors) } let batch_set = if let Some(encryptor) = &self.encryptor { diff --git a/core/server/src/shard/system/personal_access_tokens.rs b/core/server/src/shard/system/personal_access_tokens.rs index 082fcd93..8cc89cdc 100644 --- a/core/server/src/shard/system/personal_access_tokens.rs +++ b/core/server/src/shard/system/personal_access_tokens.rs @@ -28,7 +28,7 @@ use iggy_common::IggyTimestamp; use tracing::{error, info}; impl IggyShard { - pub async fn get_personal_access_tokens( + pub fn get_personal_access_tokens( &self, session: &Session, ) -> Result<Vec<PersonalAccessToken>, IggyError> { @@ -53,7 +53,7 @@ impl IggyShard { Ok(personal_access_tokens) } - pub async fn create_personal_access_token( + pub fn create_personal_access_token( &self, session: &Session, name: &str, @@ -103,7 +103,7 @@ impl IggyShard { Ok(token) } - pub async fn delete_personal_access_token( + pub fn delete_personal_access_token( &self, session: &Session, name: &str, @@ -135,7 +135,7 @@ impl IggyShard { Ok(()) } - pub async fn login_with_personal_access_token( + pub fn login_with_personal_access_token( &self, token: &str, session: Option<&Session>, diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs index 78c5d393..b4de2992 100644 --- a/core/server/src/shard/system/snapshot/mod.rs +++ b/core/server/src/shard/system/snapshot/mod.rs @@ -54,6 +54,11 @@ impl IggyShard { snapshot_types }; + // TODO: Replace this with + // https://github.com/bearcove/rc-zip + // and impl the monoio async writer, based on this example: + // https://youtu.be/RYHYiXMJdZI?si=d2roKeHn5lJrw2ri&t=1140 + // and rc-zip-tokio crate. let cursor = Cursor::new(Vec::new()); let mut zip_writer = ZipFileWriter::new(cursor.compat_write()); diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index 43b45246..3cb0da5b 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -57,7 +57,7 @@ impl IggyShard { Ok(self.get_streams()) } - pub fn find_stream<'a>( + pub fn find_stream( &self, session: &Session, identifier: &Identifier, @@ -182,7 +182,7 @@ impl IggyShard { session: &Session, stream_id: Option<u32>, name: &str, - ) -> Result<u32, IggyError> { + ) -> Result<Identifier, IggyError> { self.ensure_authenticated(session)?; self.permissioner .borrow() @@ -220,7 +220,7 @@ impl IggyShard { .insert(name.to_owned(), stream.stream_id); self.streams.borrow_mut().insert(stream.stream_id, stream); self.metrics.increment_streams(1); - Ok(id) + Ok(Identifier::numeric(id)?) } pub async fn update_stream( diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index d118090f..477308bc 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -112,7 +112,7 @@ impl IggyShard { compression_algorithm: CompressionAlgorithm, max_topic_size: MaxTopicSize, replication_factor: Option<u8>, - ) -> Result<u32, IggyError> { + ) -> Result<Identifier, IggyError> { self.ensure_authenticated(session)?; { let stream = self.get_stream(stream_id).with_error_context(|error| { @@ -151,7 +151,7 @@ impl IggyShard { self.metrics.increment_partitions(partitions_count); self.metrics.increment_segments(partitions_count); - Ok(created_topic_id) + Ok(Identifier::numeric(created_topic_id)?) } #[allow(clippy::too_many_arguments)] @@ -165,7 +165,7 @@ impl IggyShard { compression_algorithm: CompressionAlgorithm, max_topic_size: MaxTopicSize, replication_factor: Option<u8>, - ) -> Result<u32, IggyError> { + ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; { let stream = self.get_stream(stream_id).with_error_context(|error| { @@ -213,14 +213,7 @@ impl IggyShard { // TODO: if message_expiry is changed, we need to check if we need to purge messages based on the new expiry // TODO: if max_size_bytes is changed, we need to check if we need to purge messages based on the new size // TODO: if replication_factor is changed, we need to do `something` - self.get_stream(stream_id) - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") - })? - .get_topic(topic_id) - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}") - }).map(|topic| topic.topic_id) + Ok(()) } pub async fn delete_topic( diff --git a/core/server/src/streaming/clients/client_manager.rs b/core/server/src/streaming/clients/client_manager.rs index fc5c00cc..922f4f11 100644 --- a/core/server/src/streaming/clients/client_manager.rs +++ b/core/server/src/streaming/clients/client_manager.rs @@ -84,7 +84,7 @@ impl ClientManager { return Err(IggyError::ClientNotFound(client_id)); } - let mut client = client.unwrap(); + let client = client.unwrap(); client.user_id = Some(user_id); Ok(()) } @@ -95,7 +95,7 @@ impl ClientManager { return Err(IggyError::ClientNotFound(client_id)); } - let mut client = client.unwrap(); + let client = client.unwrap(); client.user_id = None; Ok(()) } @@ -104,6 +104,10 @@ impl ClientManager { self.clients.get(&client_id).cloned() } + pub fn try_get_client_mut(&mut self, client_id: u32) -> Option<&mut Client> { + self.clients.get_mut(&client_id) + } + pub fn get_clients(&self) -> Vec<Client> { self.clients.values().cloned().collect() } diff --git a/core/server/src/streaming/topics/consumer_groups.rs b/core/server/src/streaming/topics/consumer_groups.rs index 92168738..a2efba95 100644 --- a/core/server/src/streaming/topics/consumer_groups.rs +++ b/core/server/src/streaming/topics/consumer_groups.rs @@ -172,7 +172,7 @@ impl Topic { &mut self, group_id: Option<u32>, name: &str, - ) -> Result<ConsumerGroup, IggyError> { + ) -> Result<Identifier, IggyError> { if self.consumer_groups_ids.contains_key(name) { return Err(IggyError::ConsumerGroupNameAlreadyExists( name.to_owned(), @@ -208,13 +208,12 @@ impl Topic { let consumer_group = ConsumerGroup::new(self.topic_id, id, name, self.partitions.len() as u32); self.consumer_groups_ids.insert(name.to_owned(), id); - let cloned_group = consumer_group.clone(); self.consumer_groups.borrow_mut().insert(id, consumer_group); info!( "Created consumer group with ID: {} for topic with ID: {} and stream with ID: {}.", id, self.topic_id, self.stream_id ); - Ok(cloned_group) + Ok(Identifier::numeric(id)?) } pub async fn delete_consumer_group( diff --git a/core/server/src/streaming/utils/pooled_buffer.rs b/core/server/src/streaming/utils/pooled_buffer.rs index 9c4375d2..0aa1dd3a 100644 --- a/core/server/src/streaming/utils/pooled_buffer.rs +++ b/core/server/src/streaming/utils/pooled_buffer.rs @@ -18,6 +18,7 @@ use super::memory_pool::{BytesMutExt, memory_pool}; use bytes::{Buf, BufMut, BytesMut}; +use monoio::buf::IoBufMut; use std::ops::{Deref, DerefMut}; #[derive(Debug)] @@ -215,3 +216,17 @@ impl Buf for PooledBuffer { self.inner.chunks_vectored(dst) } } + +unsafe impl IoBufMut for PooledBuffer { + fn write_ptr(&mut self) -> *mut u8 { + self.inner.write_ptr() + } + + fn bytes_total(&mut self) -> usize { + self.inner.bytes_total() + } + + unsafe fn set_init(&mut self, pos: usize) { + unsafe { self.inner.set_init(pos) } + } +} diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs index 6a10adb2..22796812 100644 --- a/core/server/src/tcp/sender.rs +++ b/core/server/src/tcp/sender.rs @@ -28,12 +28,13 @@ use tracing::debug; const STATUS_OK: &[u8] = &[0; 4]; -pub(crate) async fn read<T>( +pub(crate) async fn read<T, B>( stream: &mut T, - buffer: BytesMut, -) -> (Result<usize, IggyError>, BytesMut) + buffer: B, +) -> (Result<usize, IggyError>, B) where T: AsyncReadRent + AsyncWriteRent + Unpin, + B: IoBufMut, { match stream.read_exact(buffer).await { (Ok(0), buffer) => (Err(IggyError::ConnectionClosed), buffer), diff --git a/core/server/src/tcp/tcp_sender.rs b/core/server/src/tcp/tcp_sender.rs index 81089c85..cfa25aad 100644 --- a/core/server/src/tcp/tcp_sender.rs +++ b/core/server/src/tcp/tcp_sender.rs @@ -33,7 +33,7 @@ pub struct TcpSender { } impl Sender for TcpSender { - async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, BytesMut) { + async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B) { sender::read(&mut self.stream, buffer).await } diff --git a/core/server/src/tcp/tcp_tls_sender.rs b/core/server/src/tcp/tcp_tls_sender.rs index 7213ad64..167427c1 100644 --- a/core/server/src/tcp/tcp_tls_sender.rs +++ b/core/server/src/tcp/tcp_tls_sender.rs @@ -22,6 +22,7 @@ use crate::{server_error::ServerError, tcp::sender}; use bytes::BytesMut; use error_set::ErrContext; use iggy_common::IggyError; +use monoio::buf::IoBufMut; use monoio::io::AsyncWriteRent; use monoio::net::TcpStream; use monoio_native_tls::TlsStream; @@ -33,7 +34,7 @@ pub struct TcpTlsSender { } impl Sender for TcpTlsSender { - async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, BytesMut) { + async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B) { sender::read(&mut self.stream, buffer).await }
