This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch experiment in repository https://gitbox.apache.org/repos/asf/iggy.git
commit bedae0cca5b71e9fbbd59e639aec14d6821e0fe6 Author: numinex <[email protected]> AuthorDate: Thu Jan 15 10:17:32 2026 +0100 test --- Cargo.lock | 1 + Cargo.toml | 1 + .../consumer_groups/create_consumer_group.rs | 2 +- .../consumer_groups/delete_consumer_group.rs | 2 +- .../src/commands/partitions/create_partitions.rs | 2 +- .../src/commands/partitions/delete_partitions.rs | 2 +- .../src/commands/segments/delete_segments.rs | 2 +- core/common/src/commands/streams/create_stream.rs | 2 +- core/common/src/commands/streams/delete_stream.rs | 2 +- core/common/src/commands/streams/purge_stream.rs | 2 +- core/common/src/commands/streams/update_stream.rs | 2 +- core/common/src/commands/topics/create_topic.rs | 2 +- core/common/src/commands/topics/delete_topic.rs | 2 +- core/common/src/commands/topics/purge_topic.rs | 2 +- core/common/src/commands/topics/update_topic.rs | 2 +- core/common/src/commands/users/change_password.rs | 2 +- core/common/src/commands/users/create_user.rs | 2 +- core/common/src/commands/users/delete_user.rs | 2 +- .../src/commands/users/update_permissions.rs | 2 +- core/common/src/commands/users/update_user.rs | 2 +- core/common/src/types/consensus/message.rs | 16 + core/metadata/Cargo.toml | 1 + core/metadata/src/stm/consumer_group.rs | 147 +------ core/metadata/src/stm/mod.rs | 277 +++++++++---- core/metadata/src/stm/mux.rs | 12 +- core/metadata/src/stm/stream.rs | 437 +++++---------------- core/metadata/src/stm/user.rs | 279 ++----------- core/server/Cargo.toml | 2 +- 28 files changed, 401 insertions(+), 808 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 88953e444..c7a1a3d7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5702,6 +5702,7 @@ dependencies = [ "consensus", "iggy_common", "journal", + "left-right", "message_bus", "slab", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 3b68471ce..32dd683f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -142,6 +142,7 @@ iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.1-edge.1" } integration = { path = "core/integration" } keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] } lazy_static = "1.5.0" +left-right = "0.11" log = "0.4.29" mimalloc = "0.1" mockall = "0.14.0" diff --git a/core/common/src/commands/consumer_groups/create_consumer_group.rs b/core/common/src/commands/consumer_groups/create_consumer_group.rs index c5bd3e912..c03d39964 100644 --- a/core/common/src/commands/consumer_groups/create_consumer_group.rs +++ b/core/common/src/commands/consumer_groups/create_consumer_group.rs @@ -33,7 +33,7 @@ use std::str::from_utf8; /// - `stream_id` - unique stream ID (numeric or name). /// - `topic_id` - unique topic ID (numeric or name). /// - `name` - unique consumer group name, max length is 255 characters. -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct CreateConsumerGroup { /// Unique stream ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/consumer_groups/delete_consumer_group.rs b/core/common/src/commands/consumer_groups/delete_consumer_group.rs index 3eee5cc1c..859ded6f9 100644 --- a/core/common/src/commands/consumer_groups/delete_consumer_group.rs +++ b/core/common/src/commands/consumer_groups/delete_consumer_group.rs @@ -31,7 +31,7 @@ use std::fmt::Display; /// - `stream_id` - unique stream ID (numeric or name). /// - `topic_id` - unique topic ID (numeric or name). /// - `group_id` - unique consumer group ID (numeric or name). -#[derive(Debug, Serialize, Deserialize, PartialEq, Default)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)] pub struct DeleteConsumerGroup { /// Unique stream ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/partitions/create_partitions.rs b/core/common/src/commands/partitions/create_partitions.rs index 9aa862d5c..a01d3b26d 100644 --- a/core/common/src/commands/partitions/create_partitions.rs +++ b/core/common/src/commands/partitions/create_partitions.rs @@ -32,7 +32,7 @@ use std::fmt::Display; /// - `stream_id` - unique stream ID (numeric or name). /// - `topic_id` - unique topic ID (numeric or name). /// - `partitions_count` - number of partitions in the topic to create, max value is 1000. -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct CreatePartitions { /// Unique stream ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/partitions/delete_partitions.rs b/core/common/src/commands/partitions/delete_partitions.rs index 068f502ce..12be4448a 100644 --- a/core/common/src/commands/partitions/delete_partitions.rs +++ b/core/common/src/commands/partitions/delete_partitions.rs @@ -32,7 +32,7 @@ use std::fmt::Display; /// - `stream_id` - unique stream ID (numeric or name). /// - `topic_id` - unique topic ID (numeric or name). /// - `partitions_count` - number of partitions in the topic to delete, max value is 1000. -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct DeletePartitions { /// Unique stream ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/segments/delete_segments.rs b/core/common/src/commands/segments/delete_segments.rs index 221e5c667..841a84ac7 100644 --- a/core/common/src/commands/segments/delete_segments.rs +++ b/core/common/src/commands/segments/delete_segments.rs @@ -31,7 +31,7 @@ use std::fmt::Display; /// - `topic_id` - unique topic ID (numeric or name). /// - `partition_id` - unique partition ID (numeric or name). /// - `segments_count` - number of segments in the partition to delete. -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct DeleteSegments { /// Unique stream ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/streams/create_stream.rs b/core/common/src/commands/streams/create_stream.rs index 292cfcfc2..9ff2fec5a 100644 --- a/core/common/src/commands/streams/create_stream.rs +++ b/core/common/src/commands/streams/create_stream.rs @@ -29,7 +29,7 @@ use std::str::from_utf8; /// `CreateStream` command is used to create a new stream. /// It has additional payload: /// - `name` - unique stream name (string), max length is 255 characters. -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct CreateStream { /// Unique stream name (string), max length is 255 characters. pub name: String, diff --git a/core/common/src/commands/streams/delete_stream.rs b/core/common/src/commands/streams/delete_stream.rs index ac8aa57f9..f44110340 100644 --- a/core/common/src/commands/streams/delete_stream.rs +++ b/core/common/src/commands/streams/delete_stream.rs @@ -28,7 +28,7 @@ use std::fmt::Display; /// `DeleteStream` command is used to delete an existing stream. /// It has additional payload: /// - `stream_id` - unique stream ID (numeric or name). -#[derive(Debug, Serialize, Deserialize, PartialEq, Default)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)] pub struct DeleteStream { /// Unique stream ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/streams/purge_stream.rs b/core/common/src/commands/streams/purge_stream.rs index 805e7c944..036930ff8 100644 --- a/core/common/src/commands/streams/purge_stream.rs +++ b/core/common/src/commands/streams/purge_stream.rs @@ -28,7 +28,7 @@ use std::fmt::Display; /// `PurgeStream` command is used to purge stream data (all the messages from its topics). /// It has additional payload: /// - `stream_id` - unique stream ID (numeric or name). -#[derive(Debug, Serialize, Deserialize, PartialEq, Default)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)] pub struct PurgeStream { /// Unique stream ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/streams/update_stream.rs b/core/common/src/commands/streams/update_stream.rs index ced247e71..ce3600f88 100644 --- a/core/common/src/commands/streams/update_stream.rs +++ b/core/common/src/commands/streams/update_stream.rs @@ -32,7 +32,7 @@ use std::str::from_utf8; /// It has additional payload: /// - `stream_id` - unique stream ID (numeric or name). /// - `name` - unique stream name (string), max length is 255 characters. -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct UpdateStream { /// Unique stream ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/topics/create_topic.rs b/core/common/src/commands/topics/create_topic.rs index ba90084e7..48cd3d525 100644 --- a/core/common/src/commands/topics/create_topic.rs +++ b/core/common/src/commands/topics/create_topic.rs @@ -40,7 +40,7 @@ use std::str::from_utf8; /// Can't be lower than segment size in the config. /// - `replication_factor` - replication factor for the topic. /// - `name` - unique topic name, max length is 255 characters. -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct CreateTopic { /// Unique stream ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/topics/delete_topic.rs b/core/common/src/commands/topics/delete_topic.rs index a41c0de02..45680311d 100644 --- a/core/common/src/commands/topics/delete_topic.rs +++ b/core/common/src/commands/topics/delete_topic.rs @@ -30,7 +30,7 @@ use std::fmt::Display; /// It has additional payload: /// - `stream_id` - unique stream ID (numeric or name). /// - `topic_id` - unique topic ID (numeric or name). -#[derive(Debug, Serialize, Deserialize, PartialEq, Default)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)] pub struct DeleteTopic { /// Unique stream ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/topics/purge_topic.rs b/core/common/src/commands/topics/purge_topic.rs index 061e374fa..9473b38c7 100644 --- a/core/common/src/commands/topics/purge_topic.rs +++ b/core/common/src/commands/topics/purge_topic.rs @@ -30,7 +30,7 @@ use std::fmt::Display; /// It has additional payload: /// - `stream_id` - unique stream ID (numeric or name). /// - `topic_id` - unique topic ID (numeric or name). -#[derive(Debug, Serialize, Deserialize, PartialEq, Default)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)] pub struct PurgeTopic { /// Unique stream ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/topics/update_topic.rs b/core/common/src/commands/topics/update_topic.rs index 32a2ba7a5..83dadb69c 100644 --- a/core/common/src/commands/topics/update_topic.rs +++ b/core/common/src/commands/topics/update_topic.rs @@ -40,7 +40,7 @@ use std::str::from_utf8; /// Can't be lower than segment size in the config. /// - `replication_factor` - replication factor for the topic. /// - `name` - unique topic name, max length is 255 characters. -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct UpdateTopic { /// Unique stream ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/users/change_password.rs b/core/common/src/commands/users/change_password.rs index 27f1ea6b8..4a5663778 100644 --- a/core/common/src/commands/users/change_password.rs +++ b/core/common/src/commands/users/change_password.rs @@ -33,7 +33,7 @@ use std::str::from_utf8; /// - `user_id` - unique user ID (numeric or name). /// - `current_password` - current password, must be between 3 and 100 characters long. /// - `new_password` - new password, must be between 3 and 100 characters long. -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct ChangePassword { /// Unique user ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/users/create_user.rs b/core/common/src/commands/users/create_user.rs index 03e0cb167..7d23c54ee 100644 --- a/core/common/src/commands/users/create_user.rs +++ b/core/common/src/commands/users/create_user.rs @@ -34,7 +34,7 @@ use std::str::from_utf8; /// - `password` - password of the user, must be between 3 and 100 characters long. /// - `status` - status of the user, can be either `active` or `inactive`. /// - `permissions` - optional permissions of the user. If not provided, user will have no permissions. -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct CreateUser { /// Unique name of the user, must be between 3 and 50 characters long. pub username: String, diff --git a/core/common/src/commands/users/delete_user.rs b/core/common/src/commands/users/delete_user.rs index c41a967ff..3cc5682a6 100644 --- a/core/common/src/commands/users/delete_user.rs +++ b/core/common/src/commands/users/delete_user.rs @@ -28,7 +28,7 @@ use std::fmt::Display; /// `DeleteUser` command is used to delete a user by unique ID. /// It has additional payload: /// - `user_id` - unique user ID (numeric or name). -#[derive(Debug, Serialize, Deserialize, PartialEq, Default)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)] pub struct DeleteUser { /// Unique user ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/users/update_permissions.rs b/core/common/src/commands/users/update_permissions.rs index ab8e525f0..ff96927dd 100644 --- a/core/common/src/commands/users/update_permissions.rs +++ b/core/common/src/commands/users/update_permissions.rs @@ -31,7 +31,7 @@ use std::fmt::Display; /// It has additional payload: /// - `user_id` - unique user ID (numeric or name). /// - `permissions` - new permissions (optional) -#[derive(Debug, Serialize, Deserialize, PartialEq, Default)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)] pub struct UpdatePermissions { /// Unique user ID (numeric or name). #[serde(skip)] diff --git a/core/common/src/commands/users/update_user.rs b/core/common/src/commands/users/update_user.rs index 28e2504d2..3e2811915 100644 --- a/core/common/src/commands/users/update_user.rs +++ b/core/common/src/commands/users/update_user.rs @@ -34,7 +34,7 @@ use std::str::from_utf8; /// - `user_id` - unique user ID (numeric or name). /// - `username` - new username (optional), if provided, must be between 3 and 50 characters long. /// - `status` - new status (optional) -#[derive(Debug, Serialize, Deserialize, PartialEq, Default)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)] pub struct UpdateUser { #[serde(skip)] pub user_id: Identifier, diff --git a/core/common/src/types/consensus/message.rs b/core/common/src/types/consensus/message.rs index 0e94820f8..ac32cc3c8 100644 --- a/core/common/src/types/consensus/message.rs +++ b/core/common/src/types/consensus/message.rs @@ -141,6 +141,22 @@ where } } + /// Get the message body as zero-copy `Bytes`. + /// + /// Returns an empty `Bytes` if there is no body. + #[inline] + #[allow(unused)] + pub fn body_bytes(&self) -> Bytes { + let header_size = size_of::<H>(); + let total_size = self.header().size() as usize; + + if total_size > header_size { + self.buffer.slice(header_size..total_size) + } else { + Bytes::new() + } + } + /// Get the complete message as bytes (header + body). #[inline] #[allow(unused)] diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml index ee886e7c8..d7fcecfc6 100644 --- a/core/metadata/Cargo.toml +++ b/core/metadata/Cargo.toml @@ -33,6 +33,7 @@ bytes = { workspace = true } consensus = { path = "../consensus" } iggy_common = { path = "../common" } journal = { path = "../journal" } +left-right = { workspace = true } message_bus = { path = "../message_bus" } slab = "0.4.11" tracing = { workspace = true } diff --git a/core/metadata/src/stm/consumer_group.rs b/core/metadata/src/stm/consumer_group.rs index 944627ad7..7eb5d08cb 100644 --- a/core/metadata/src/stm/consumer_group.rs +++ b/core/metadata/src/stm/consumer_group.rs @@ -15,24 +15,15 @@ // specific language governing permissions and limitations // under the License. -use crate::stm::{ApplyState, StateCommand}; use ahash::AHashMap; -use bytes::Bytes; -use iggy_common::create_consumer_group::CreateConsumerGroup; -use iggy_common::delete_consumer_group::DeleteConsumerGroup; -use iggy_common::{ - BytesSerializable, IggyTimestamp, - header::{Operation, PrepareHeader}, - message::Message, -}; +use iggy_common::IggyTimestamp; use slab::Slab; -use std::cell::RefCell; // ============================================================================ // ConsumerGroupMember - Individual member of a consumer group // ============================================================================ -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct ConsumerGroupMember { pub id: u32, pub joined_at: IggyTimestamp, @@ -48,7 +39,7 @@ impl ConsumerGroupMember { // ConsumerGroup - A group of consumers // ============================================================================ -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct ConsumerGroup { pub id: usize, pub stream_id: usize, @@ -93,138 +84,12 @@ impl ConsumerGroup { #[derive(Debug, Clone, Default)] pub struct ConsumerGroups { - // Global index for all consumer groups across all streams/topics - index: RefCell<AHashMap<(usize, usize, String), usize>>, // (stream_id, topic_id, name) -> id - items: RefCell<Slab<ConsumerGroup>>, + pub index: AHashMap<(usize, usize, String), usize>, + pub items: Slab<ConsumerGroup>, } impl ConsumerGroups { pub fn new() -> Self { - Self { - index: RefCell::new(AHashMap::with_capacity(256)), - items: RefCell::new(Slab::with_capacity(256)), - } - } - - /// Insert a consumer group and return the assigned ID - pub fn insert(&self, group: ConsumerGroup) -> usize { - let mut items = self.items.borrow_mut(); - let mut index = self.index.borrow_mut(); - - let key = (group.stream_id, group.topic_id, group.name.clone()); - let id = items.insert(group); - items[id].id = id; - index.insert(key, id); - id - } - - /// Get consumer group by ID - pub fn get(&self, id: usize) -> Option<ConsumerGroup> { - self.items.borrow().get(id).cloned() - } - - /// Get consumer group by stream_id, topic_id, and name - pub fn get_by_location( - &self, - stream_id: usize, - topic_id: usize, - name: &str, - ) -> Option<ConsumerGroup> { - let index = self.index.borrow(); - let key = (stream_id, topic_id, name.to_string()); - if let Some(&id) = index.get(&key) { - self.items.borrow().get(id).cloned() - } else { - None - } - } - - /// Remove consumer group by ID - pub fn remove(&self, id: usize) -> Option<ConsumerGroup> { - let mut items = self.items.borrow_mut(); - let mut index = self.index.borrow_mut(); - - if !items.contains(id) { - return None; - } - - let group = items.remove(id); - let key = (group.stream_id, group.topic_id, group.name.clone()); - index.remove(&key); - Some(group) - } - - /// Get all consumer groups for a specific topic - pub fn get_by_topic(&self, stream_id: usize, topic_id: usize) -> Vec<ConsumerGroup> { - self.items - .borrow() - .iter() - .filter_map(|(_, g)| { - if g.stream_id == stream_id && g.topic_id == topic_id { - Some(g.clone()) - } else { - None - } - }) - .collect() - } - - /// Get number of consumer groups - pub fn len(&self) -> usize { - self.items.borrow().len() - } - - /// Check if empty - pub fn is_empty(&self) -> bool { - self.items.borrow().is_empty() - } - - /// Get all consumer groups - pub fn values(&self) -> Vec<ConsumerGroup> { - self.items - .borrow() - .iter() - .map(|(_, g): (usize, &ConsumerGroup)| g.clone()) - .collect() - } -} - -#[derive(Debug)] -pub enum ConsumerGroupsCommand { - Create(CreateConsumerGroup), - Delete(DeleteConsumerGroup), -} - -impl StateCommand for ConsumerGroups { - type Command = ConsumerGroupsCommand; - type Input = Message<PrepareHeader>; - - fn into_command(input: &Self::Input) -> Option<Self::Command> { - // TODO: rework this thing, so we don't copy the bytes on each request - let body = Bytes::copy_from_slice(input.body()); - match input.header().operation { - Operation::CreateConsumerGroup => Some(ConsumerGroupsCommand::Create( - CreateConsumerGroup::from_bytes(body.clone()).unwrap(), - )), - Operation::DeleteConsumerGroup => Some(ConsumerGroupsCommand::Delete( - DeleteConsumerGroup::from_bytes(body.clone()).unwrap(), - )), - _ => None, - } - } -} - -impl ApplyState for ConsumerGroups { - type Output = (); - - fn do_apply(&self, cmd: Self::Command) -> Self::Output { - match cmd { - ConsumerGroupsCommand::Create(payload) => { - todo!("Handle Create consumer group with {:?}", payload) - } - ConsumerGroupsCommand::Delete(payload) => { - todo!("Handle Delete consumer group with {:?}", payload) - } - } + Self::default() } } diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index e6c13684b..5baeeb421 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -20,51 +20,194 @@ pub mod mux; pub mod stream; pub mod user; -/// Macro to generate a `{State}Command` enum and implement `StateCommand` trait. -/// -/// # Arguments -/// * `$state_type` - The type that implements `ApplyState` trait -/// * `$command_enum` - The name of the command enum to generate (e.g., StreamsCommand) -/// * `$operations` - Array of Operation enum variants (also used as payload type names) -/// -/// # Example -/// ```ignore -/// define_state_command! { -/// Streams, -/// StreamsCommand, -/// [CreateStream, UpdateStream, DeleteStream, PurgeStream] -/// } -/// ``` +use std::cell::UnsafeCell; + +// ============================================================================ +// WriteCell - Interior mutability wrapper for WriteHandle +// ============================================================================ + +pub struct WriteCell<T: left_right::Absorb<O>, O> { + inner: UnsafeCell<left_right::WriteHandle<T, O>>, +} + +impl<T: left_right::Absorb<O>, O> WriteCell<T, O> { + pub fn new(write: left_right::WriteHandle<T, O>) -> Self { + Self { + inner: UnsafeCell::new(write), + } + } + + pub fn with<F, R>(&self, f: F) -> R + where + F: FnOnce(&mut left_right::WriteHandle<T, O>) -> R, + { + // SAFETY: WriteCell is !Sync (via UnsafeCell), so only one thread can access. + // The caller ensures exclusive access through the &self borrow. + unsafe { f(&mut *self.inner.get()) } + } +} + +// ============================================================================ +// Traits - All use &self, implementations use interior mutability +// ============================================================================ + +/// Parses input into a command. +pub trait Command { + type Cmd; + type Input; + + fn into_command(input: &Self::Input) -> Option<Self::Cmd>; +} + +/// Dispatches a command to mutate state. +/// Takes `&mut self` - implementations use interior mutability. +pub trait Dispatch { + type Cmd; + type Output; + + fn dispatch(&self, cmd: &Self::Cmd) -> Self::Output; +} + +/// Applies a command through a state wrapper. +/// Takes `&self` - implementations use interior mutability. +pub trait ApplyState { + type Cmd; + type Output; + type Inner: Command<Cmd = Self::Cmd>; + + fn do_apply(&self, cmd: Self::Cmd) -> Option<Self::Output>; +} + +/// Public interface for state machines. +/// Takes `&self` - implementations use interior mutability. +pub trait State { + type Output; + type Input; + + fn apply(&self, input: &Self::Input) -> Option<Self::Output>; +} + +impl<T> State for T +where + T: ApplyState, +{ + type Output = T::Output; + type Input = <T::Inner as Command>::Input; + + fn apply(&self, input: &Self::Input) -> Option<Self::Output> { + let cmd = T::Inner::into_command(input)?; + self.do_apply(cmd) + } +} + +pub(crate) trait StateMachine { + type Input; + type Output; + fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>); +} + #[macro_export] -macro_rules! define_state_command { +macro_rules! define_state { ( - $state_type:ty, - $command_enum:ident, + $state:ident, + $inner:ident { + $($field_name:ident : $field_type:ty),* $(,)? + }, + $command:ident, [$($operation:ident),* $(,)?] ) => { + #[derive(Debug, Clone, Default)] + pub struct $inner { + $( + pub $field_name: $field_type, + )* + } + #[derive(Debug)] - pub enum $command_enum { + pub enum $command { $( $operation($operation), )* } - impl $crate::stm::StateCommand for $state_type { - type Command = $command_enum; + impl Clone for $command + where + $($operation: Clone,)* + { + fn clone(&self) -> Self { + match self { + $( + $command::$operation(payload) => $command::$operation(payload.clone()), + )* + } + } + } + + /// State wrapper with interior mutability for write access. + pub struct $state { + write: Option<$crate::stm::WriteCell<$inner, $command>>, + read: ::std::sync::Arc<::left_right::ReadHandle<$inner>>, + } + + impl $state { + /// Get a clone of the read handle. + pub fn read_handle(&self) -> ::std::sync::Arc<::left_right::ReadHandle<$inner>> { + self.read.clone() + } + + /// Get read access to the inner state. + pub fn read(&self) -> Option<::left_right::ReadGuard<'_, $inner>> { + self.read.enter() + } + + /// Check if this instance has write capability. + pub fn has_write(&self) -> bool { + self.write.is_some() + } + } + + impl From<$inner> for $state { + fn from(inner: $inner) -> Self { + let (write, read) = ::left_right::new_from_empty(inner); + let write = Some($crate::stm::WriteCell::new(write)); + let read = ::std::sync::Arc::new(read); + Self { write, read } + } + } + + impl From<::std::sync::Arc<::left_right::ReadHandle<$inner>>> for $state { + /// Create a read-only instance from a read handle. + fn from(read: ::std::sync::Arc<::left_right::ReadHandle<$inner>>) -> Self { + Self { write: None, read } + } + } + + impl ::std::fmt::Debug for $state { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + match self.read.enter() { + Some(guard) => f.debug_struct(stringify!($state)) + .field("has_write", &self.write.is_some()) + .field("inner", &*guard) + .finish(), + None => f.debug_struct(stringify!($state)).finish_non_exhaustive(), + } + } + } + + impl $crate::stm::Command for $inner { + type Cmd = $command; type Input = ::iggy_common::message::Message<::iggy_common::header::PrepareHeader>; - fn into_command(input: &Self::Input) -> Option<Self::Command> { + fn into_command(input: &Self::Input) -> Option<Self::Cmd> { use ::iggy_common::BytesSerializable; - use ::bytes::Bytes; use ::iggy_common::header::Operation; - // TODO: rework this thing, so we don't copy the bytes on each request - let body = Bytes::copy_from_slice(input.body()); + let body = input.body_bytes(); match input.header().operation { $( Operation::$operation => { - Some($command_enum::$operation( - $operation::from_bytes(body.clone()).unwrap() + Some($command::$operation( + $operation::from_bytes(body).unwrap() )) }, )* @@ -73,56 +216,44 @@ macro_rules! define_state_command { } } - // Compile-time check that the type implements ApplyState - const _: () = { - const fn assert_impl_apply_state<T: $crate::stm::ApplyState>() {} - assert_impl_apply_state::<$state_type>(); - }; - }; -} - -// This is public interface to state, therefore it will be imported from different crate, for now during development I am leaving it there. -pub trait State -where - Self: Sized, -{ - type Output; - type Input; - - // Apply the state machine logic and return an optional output. - // The output is optional, as we model the `StateMachine`, as an variadic list, - // where not all state machines will produce an output for every input event. - fn apply(&self, input: &Self::Input) -> Option<Self::Output>; -} - -// TODO: This interface should be private to the stm module. -pub trait StateMachine { - type Input; - type Output; - fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>); -} + impl ::left_right::Absorb<$command> for $inner + where + $inner: $crate::stm::Dispatch<Cmd = $command>, + { + fn absorb_first(&mut self, cmd: &mut $command, _other: &Self) { + use $crate::stm::Dispatch; + self.dispatch(cmd); + } -pub trait StateCommand { - type Command; - type Input; + fn absorb_second(&mut self, cmd: $command, _other: &Self) { + use $crate::stm::Dispatch; + self.dispatch(&cmd); + } - fn into_command(input: &Self::Input) -> Option<Self::Command>; -} + fn sync_with(&mut self, first: &Self) { + *self = first.clone(); + } -pub trait ApplyState: StateCommand { - type Output; + fn drop_first(self: Box<Self>) {} - fn do_apply(&self, cmd: Self::Command) -> Self::Output; -} + fn drop_second(self: Box<Self>) {} + } -impl<T> State for T -where - T: ApplyState, -{ - type Output = T::Output; - type Input = T::Input; + impl $crate::stm::ApplyState for $state + where + $inner: $crate::stm::Dispatch<Cmd = $command>, + { + type Cmd = $command; + type Output = (); + type Inner = $inner; - fn apply(&self, input: &Self::Input) -> Option<Self::Output> { - T::into_command(input).map(|cmd| self.do_apply(cmd)) - } + fn do_apply(&self, cmd: Self::Cmd) -> Option<Self::Output> { + let write_cell = self.write.as_ref()?; + write_cell.with(|write| { + write.append(cmd).publish(); + }); + Some(()) + } + } + }; } diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs index 3fc80d88b..979ff3ccf 100644 --- a/core/metadata/src/stm/mux.rs +++ b/core/metadata/src/stm/mux.rs @@ -92,14 +92,16 @@ mod tests { #[test] fn construct_mux_state_machine_from_states_with_same_output() { - use crate::stm::*; + use crate::stm::StateMachine; + use crate::stm::mux::MuxStateMachine; + use crate::stm::stream::{Streams, StreamsInner}; + use crate::stm::user::{Users, UsersInner}; use iggy_common::header::PrepareHeader; use iggy_common::message::Message; - let users = user::Users::new(); - let streams = stream::Streams::new(); - let cgs = consumer_group::ConsumerGroups::new(); - let mux = mux::MuxStateMachine::new(variadic!(users, streams, cgs)); + let users: Users = UsersInner::new().into(); + let streams: Streams = StreamsInner::new().into(); + let mux = MuxStateMachine::new(variadic!(users, streams)); let input = Message::new(std::mem::size_of::<PrepareHeader>()); let mut output = Vec::new(); diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index 6f05e5aa4..844a30565 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -15,29 +15,23 @@ // specific language governing permissions and limitations // under the License. -use crate::define_state_command; +use crate::define_state; use crate::stats::{StreamStats, TopicStats}; -use crate::stm::ApplyState; +use crate::stm::Dispatch; use ahash::AHashMap; -use iggy_common::create_partitions::CreatePartitions; use iggy_common::create_stream::CreateStream; -use iggy_common::create_topic::CreateTopic; -use iggy_common::delete_partitions::DeletePartitions; -use iggy_common::delete_segments::DeleteSegments; use iggy_common::delete_stream::DeleteStream; -use iggy_common::delete_topic::DeleteTopic; use iggy_common::purge_stream::PurgeStream; -use iggy_common::purge_topic::PurgeTopic; use iggy_common::update_stream::UpdateStream; -use iggy_common::update_topic::UpdateTopic; -use iggy_common::{ - CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize, -}; +use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize}; use slab::Slab; -use std::cell::RefCell; use std::sync::Arc; -#[derive(Debug, Clone)] +// ============================================================================ +// Partition Entity +// ============================================================================ + +#[derive(Debug, Clone, Default)] pub struct Partition { pub id: usize, } @@ -48,56 +42,26 @@ impl Partition { } } +// ============================================================================ +// Partitions Collection +// ============================================================================ + #[derive(Debug, Clone, Default)] pub struct Partitions { - items: RefCell<Slab<Partition>>, + pub items: Slab<Partition>, } impl Partitions { pub fn new() -> Self { - Self { - items: RefCell::new(Slab::with_capacity(1024)), - } - } - - pub fn insert(&self, partition: Partition) -> usize { - let mut items = self.items.borrow_mut(); - let id = items.insert(partition); - items[id].id = id; - id - } - - pub fn get(&self, id: usize) -> Option<Partition> { - self.items.borrow().get(id).cloned() - } - - pub fn remove(&self, id: usize) -> Option<Partition> { - let mut items = self.items.borrow_mut(); - if items.contains(id) { - Some(items.remove(id)) - } else { - None - } - } - - pub fn len(&self) -> usize { - self.items.borrow().len() - } - - pub fn is_empty(&self) -> bool { - self.items.borrow().is_empty() - } - - pub fn iter(&self) -> Vec<Partition> { - self.items - .borrow() - .iter() - .map(|(_, p): (usize, &Partition)| p.clone()) - .collect() + Self::default() } } -#[derive(Debug, Clone)] +// ============================================================================ +// ConsumerGroup (local to Topic, not a state machine) +// ============================================================================ + +#[derive(Debug, Clone, Default)] pub struct ConsumerGroup { pub id: usize, pub name: String, @@ -114,66 +78,50 @@ impl ConsumerGroup { } } +// ============================================================================ +// ConsumerGroups (local to Topic, simple collection - no left_right) +// ============================================================================ + #[derive(Debug, Clone, Default)] pub struct ConsumerGroups { - index: RefCell<AHashMap<String, usize>>, - items: RefCell<Slab<ConsumerGroup>>, + index: AHashMap<String, usize>, + items: Slab<ConsumerGroup>, } impl ConsumerGroups { pub fn new() -> Self { - Self { - index: RefCell::new(AHashMap::with_capacity(256)), - items: RefCell::new(Slab::with_capacity(256)), - } + Self::default() } - pub fn insert(&self, group: ConsumerGroup) -> usize { - let mut items = self.items.borrow_mut(); - let mut index = self.index.borrow_mut(); - - let name = group.name.clone(); - let id = items.insert(group); - items[id].id = id; - index.insert(name, id); - id + pub fn insert(&self, _group: ConsumerGroup) -> usize { + 0 } - pub fn get(&self, id: usize) -> Option<ConsumerGroup> { - self.items.borrow().get(id).cloned() + pub fn get(&self, _id: usize) -> Option<ConsumerGroup> { + None } - pub fn get_by_name(&self, name: &str) -> Option<ConsumerGroup> { - let index = self.index.borrow(); - if let Some(&id) = index.get(name) { - self.items.borrow().get(id).cloned() - } else { - None - } + pub fn get_by_name(&self, _name: &str) -> Option<ConsumerGroup> { + None } - pub fn remove(&self, id: usize) -> Option<ConsumerGroup> { - let mut items = self.items.borrow_mut(); - let mut index = self.index.borrow_mut(); - - if !items.contains(id) { - return None; - } - - let group = items.remove(id); - index.remove(&group.name); - Some(group) + pub fn remove(&self, _id: usize) -> Option<ConsumerGroup> { + None } pub fn len(&self) -> usize { - self.items.borrow().len() + 0 } pub fn is_empty(&self) -> bool { - self.items.borrow().is_empty() + true } } +// ============================================================================ +// Topic Entity +// ============================================================================ + #[derive(Debug, Clone)] pub struct Topic { pub id: usize, @@ -189,6 +137,23 @@ pub struct Topic { pub consumer_groups: ConsumerGroups, } +impl Default for Topic { + fn default() -> Self { + Self { + id: 0, + name: String::new(), + created_at: IggyTimestamp::default(), + replication_factor: 1, + message_expiry: IggyExpiry::default(), + compression_algorithm: CompressionAlgorithm::default(), + max_topic_size: MaxTopicSize::default(), + stats: Arc::new(TopicStats::default()), + partitions: Partitions::new(), + consumer_groups: ConsumerGroups::new(), + } + } +} + impl Topic { pub fn new( name: String, @@ -214,86 +179,27 @@ impl Topic { } } +// ============================================================================ +// Topics Collection +// ============================================================================ + #[derive(Debug, Clone, Default)] pub struct Topics { - index: RefCell<AHashMap<String, usize>>, - items: RefCell<Slab<Topic>>, + pub index: AHashMap<String, usize>, + pub items: Slab<Topic>, } impl Topics { pub fn new() -> Self { - Self { - index: RefCell::new(AHashMap::with_capacity(1024)), - items: RefCell::new(Slab::with_capacity(1024)), - } - } - - pub fn insert(&self, topic: Topic) -> usize { - let mut items = self.items.borrow_mut(); - let mut index = self.index.borrow_mut(); - - let name = topic.name.clone(); - let id = items.insert(topic); - items[id].id = id; - index.insert(name, id); - id - } - - pub fn get(&self, id: usize) -> Option<Topic> { - self.items.borrow().get(id).cloned() - } - - pub fn get_by_name(&self, name: &str) -> Option<Topic> { - let index = self.index.borrow(); - if let Some(&id) = index.get(name) { - self.items.borrow().get(id).cloned() - } else { - None - } - } - - pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Topic> { - match identifier.kind { - iggy_common::IdKind::Numeric => { - if let Ok(id) = identifier.get_u32_value() { - self.get(id as usize) - } else { - None - } - } - iggy_common::IdKind::String => { - if let Ok(name) = identifier.get_string_value() { - self.get_by_name(&name) - } else { - None - } - } - } - } - - pub fn remove(&self, id: usize) -> Option<Topic> { - let mut items = self.items.borrow_mut(); - let mut index = self.index.borrow_mut(); - - if !items.contains(id) { - return None; - } - - let topic = items.remove(id); - index.remove(&topic.name); - Some(topic) - } - - pub fn len(&self) -> usize { - self.items.borrow().len() - } - - pub fn is_empty(&self) -> bool { - self.items.borrow().is_empty() + Self::default() } } -#[derive(Debug, Clone)] +// ============================================================================ +// Stream Entity +// ============================================================================ + +#[derive(Debug)] pub struct Stream { pub id: usize, pub name: String, @@ -303,198 +209,73 @@ pub struct Stream { pub topics: Topics, } -impl Stream { - pub fn new(name: String, created_at: IggyTimestamp) -> Self { +impl Default for Stream { + fn default() -> Self { Self { id: 0, - name, - created_at, + name: String::new(), + created_at: IggyTimestamp::default(), stats: Arc::new(StreamStats::default()), topics: Topics::new(), } } } -#[derive(Debug, Clone, Default)] -pub struct Streams { - index: RefCell<AHashMap<String, usize>>, - items: RefCell<Slab<Stream>>, -} - -impl Streams { - pub fn new() -> Self { +impl Clone for Stream { + fn clone(&self) -> Self { Self { - index: RefCell::new(AHashMap::with_capacity(256)), - items: RefCell::new(Slab::with_capacity(256)), - } - } - - pub fn insert(&self, stream: Stream) -> usize { - let mut items = self.items.borrow_mut(); - let mut index = self.index.borrow_mut(); - - let name = stream.name.clone(); - let id = items.insert(stream); - items[id].id = id; - index.insert(name, id); - id - } - - pub fn get(&self, id: usize) -> Option<Stream> { - self.items.borrow().get(id).cloned() - } - - pub fn get_by_name(&self, name: &str) -> Option<Stream> { - let index = self.index.borrow(); - if let Some(&id) = index.get(name) { - self.items.borrow().get(id).cloned() - } else { - None - } - } - - pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Stream> { - match identifier.kind { - iggy_common::IdKind::Numeric => { - if let Ok(id) = identifier.get_u32_value() { - self.get(id as usize) - } else { - None - } - } - iggy_common::IdKind::String => { - if let Ok(name) = identifier.get_string_value() { - self.get_by_name(&name) - } else { - None - } - } - } - } - - pub fn remove(&self, id: usize) -> Option<Stream> { - let mut items = self.items.borrow_mut(); - let mut index = self.index.borrow_mut(); - - if !items.contains(id) { - return None; - } - - let stream = items.remove(id); - index.remove(&stream.name); - Some(stream) - } - - pub fn update_name(&self, identifier: &Identifier, new_name: String) -> Result<(), IggyError> { - let stream = self.get_by_identifier(identifier); - if let Some(stream) = stream { - let mut items = self.items.borrow_mut(); - let mut index = self.index.borrow_mut(); - - index.remove(&stream.name); - if let Some(s) = items.get_mut(stream.id) { - s.name = new_name.clone(); - } - index.insert(new_name, stream.id); - Ok(()) - } else { - Err(IggyError::ResourceNotFound("Stream".to_string())) + id: self.id, + name: self.name.clone(), + created_at: self.created_at, + stats: self.stats.clone(), + topics: self.topics.clone(), } } +} - pub fn purge(&self, id: usize) -> Result<(), IggyError> { - let items = self.items.borrow(); - if let Some(_stream) = items.get(id) { - // TODO: Purge all topics in the stream - Ok(()) - } else { - Err(IggyError::ResourceNotFound("Stream".to_string())) +impl Stream { + pub fn new(name: String, created_at: IggyTimestamp) -> Self { + Self { + id: 0, + name, + created_at, + stats: Arc::new(StreamStats::default()), + topics: Topics::new(), } } - - pub fn len(&self) -> usize { - self.items.borrow().len() - } - - pub fn is_empty(&self) -> bool { - self.items.borrow().is_empty() - } - - pub fn iter(&self) -> Vec<Stream> { - self.items - .borrow() - .iter() - .map(|(_, s): (usize, &Stream)| s.clone()) - .collect() - } } -// Define StreamsCommand enum and StateCommand implementation using the macro -define_state_command! { +// ============================================================================ +// Streams State Machine +// ============================================================================ + +define_state! { Streams, + StreamsInner { + index: AHashMap<String, usize>, + items: Slab<Stream>, + }, StreamsCommand, [CreateStream, UpdateStream, DeleteStream, PurgeStream] } -impl ApplyState for Streams { +impl Dispatch for StreamsInner { + type Cmd = StreamsCommand; type Output = (); - fn do_apply(&self, cmd: Self::Command) -> Self::Output { + fn dispatch(&self, cmd: &Self::Cmd) -> Self::Output { match cmd { - StreamsCommand::CreateStream(payload) => { - todo!("Handle Create stream with {:?}", payload) - } - StreamsCommand::UpdateStream(payload) => { - todo!("Handle Update stream with {:?}", payload) + StreamsCommand::CreateStream(_payload) => { + // Actual mutation logic will be implemented later } - StreamsCommand::DeleteStream(payload) => { - todo!("Handle Delete stream with {:?}", payload) - } - StreamsCommand::PurgeStream(payload) => todo!("Handle Purge stream with {:?}", payload), - } - } -} - -// Define TopicsCommand enum and StateCommand implementation using the macro -define_state_command! { - Topics, - TopicsCommand, - [CreateTopic, UpdateTopic, DeleteTopic, PurgeTopic] -} - -impl ApplyState for Topics { - type Output = (); - - fn do_apply(&self, cmd: Self::Command) -> Self::Output { - match cmd { - TopicsCommand::CreateTopic(payload) => todo!("Handle Create topic with {:?}", payload), - TopicsCommand::UpdateTopic(payload) => todo!("Handle Update topic with {:?}", payload), - TopicsCommand::DeleteTopic(payload) => todo!("Handle Delete topic with {:?}", payload), - TopicsCommand::PurgeTopic(payload) => todo!("Handle Purge topic with {:?}", payload), - } - } -} - -// Define PartitionsCommand enum and StateCommand implementation using the macro -define_state_command! { - Partitions, - PartitionsCommand, - [CreatePartitions, DeletePartitions, DeleteSegments] -} - -impl ApplyState for Partitions { - type Output = (); - - fn do_apply(&self, cmd: Self::Command) -> Self::Output { - match cmd { - PartitionsCommand::CreatePartitions(payload) => { - todo!("Handle Create partitions with {:?}", payload) + StreamsCommand::UpdateStream(_payload) => { + // Actual mutation logic will be implemented later } - PartitionsCommand::DeletePartitions(payload) => { - todo!("Handle Delete partitions with {:?}", payload) + StreamsCommand::DeleteStream(_payload) => { + // Actual mutation logic will be implemented later } - PartitionsCommand::DeleteSegments(payload) => { - todo!("Handle Delete segments with {:?}", payload) + StreamsCommand::PurgeStream(_payload) => { + // Actual mutation logic will be implemented later } } } diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs index b31bdc764..043ec0bd2 100644 --- a/core/metadata/src/stm/user.rs +++ b/core/metadata/src/stm/user.rs @@ -15,27 +15,23 @@ // specific language governing permissions and limitations // under the License. -use crate::{ - permissioner::Permissioner, - stm::{ApplyState, StateCommand}, -}; +use crate::define_state; +use crate::permissioner::Permissioner; +use crate::stm::Dispatch; use ahash::AHashMap; -use bytes::Bytes; use iggy_common::change_password::ChangePassword; use iggy_common::create_user::CreateUser; use iggy_common::delete_user::DeleteUser; use iggy_common::update_permissions::UpdatePermissions; use iggy_common::update_user::UpdateUser; -use iggy_common::{ - BytesSerializable, Identifier, IggyError, IggyTimestamp, Permissions, PersonalAccessToken, - UserId, UserStatus, - header::{Operation, PrepareHeader}, - message::Message, -}; +use iggy_common::{IggyTimestamp, Permissions, PersonalAccessToken, UserId, UserStatus}; use slab::Slab; -use std::cell::RefCell; -#[derive(Debug, Clone)] +// ============================================================================ +// User Entity +// ============================================================================ + +#[derive(Debug, Clone, Default)] pub struct User { pub id: UserId, pub username: String, @@ -66,242 +62,41 @@ impl User { } } -#[derive(Debug, Clone, Default)] -pub struct Users { - index: RefCell<AHashMap<String, usize>>, - items: RefCell<Slab<User>>, - permissioner: RefCell<Permissioner>, +// ============================================================================ +// Users State Machine +// ============================================================================ + +define_state! { + Users, + UsersInner { + index: AHashMap<String, usize>, + items: Slab<User>, + permissioner: Permissioner, + }, + UsersCommand, + [CreateUser, UpdateUser, DeleteUser, ChangePassword, UpdatePermissions] } -impl Users { - pub fn new() -> Self { - Self { - index: RefCell::new(AHashMap::with_capacity(1024)), - items: RefCell::new(Slab::with_capacity(1024)), - permissioner: RefCell::new(Permissioner::new()), - } - } - - /// Insert a user and return the assigned ID - pub fn insert(&self, user: User) -> usize { - let mut items = self.items.borrow_mut(); - let mut index = self.index.borrow_mut(); - - let username = user.username.clone(); - let id = items.insert(user); - items[id].id = id as u32; - index.insert(username, id); - id - } - - /// Get user by ID - pub fn get(&self, id: usize) -> Option<User> { - self.items.borrow().get(id).cloned() - } +impl Dispatch for UsersInner { + type Cmd = UsersCommand; + type Output = (); - /// Get user by username or ID (via Identifier enum) - pub fn get_by_identifier(&self, identifier: &Identifier) -> Result<Option<User>, IggyError> { - match identifier.kind { - iggy_common::IdKind::Numeric => { - let id = identifier.get_u32_value()? as usize; - Ok(self.items.borrow().get(id).cloned()) - } - iggy_common::IdKind::String => { - let username = identifier.get_string_value()?; - let index = self.index.borrow(); - if let Some(&id) = index.get(&username) { - Ok(self.items.borrow().get(id).cloned()) - } else { - Ok(None) - } + fn dispatch(&self, cmd: &Self::Cmd) -> Self::Output { + match cmd { + UsersCommand::CreateUser(_payload) => { + // Actual mutation logic will be implemented later } - } - } - - /// Remove user by ID - pub fn remove(&self, id: usize) -> Option<User> { - let mut items = self.items.borrow_mut(); - let mut index = self.index.borrow_mut(); - - if !items.contains(id) { - return None; - } - - let user = items.remove(id); - index.remove(&user.username); - Some(user) - } - - /// Check if user exists - pub fn contains(&self, identifier: &Identifier) -> bool { - match identifier.kind { - iggy_common::IdKind::Numeric => { - if let Ok(id) = identifier.get_u32_value() { - self.items.borrow().contains(id as usize) - } else { - false - } + UsersCommand::UpdateUser(_payload) => { + // Actual mutation logic will be implemented later } - iggy_common::IdKind::String => { - if let Ok(username) = identifier.get_string_value() { - self.index.borrow().contains_key(&username) - } else { - false - } + UsersCommand::DeleteUser(_payload) => { + // Actual mutation logic will be implemented later } - } - } - - /// Get all users as a Vec - pub fn values(&self) -> Vec<User> { - self.items - .borrow() - .iter() - .map(|(_, u): (usize, &User)| u.clone()) - .collect() - } - - /// Get number of users - pub fn len(&self) -> usize { - self.items.borrow().len() - } - - /// Check if empty - pub fn is_empty(&self) -> bool { - self.items.borrow().is_empty() - } - - /// Check if username already exists - pub fn username_exists(&self, username: &str) -> bool { - self.index.borrow().contains_key(username) - } - - /// Get ID by username - pub fn get_id_by_username(&self, username: &str) -> Option<usize> { - self.index.borrow().get(username).copied() - } - - /// Initialize permissions for a user - pub fn init_permissions(&self, user_id: UserId, permissions: Option<Permissions>) { - self.permissioner - .borrow_mut() - .init_permissions(user_id, permissions); - } - - /// Update permissions for a user - pub fn update_permissions(&self, user_id: UserId, permissions: Option<Permissions>) { - self.permissioner - .borrow_mut() - .update_permissions_for_user(user_id, permissions); - } - - /// Delete permissions for a user - pub fn delete_permissions(&self, user_id: UserId) { - self.permissioner.borrow_mut().delete_permissions(user_id); - } - - /// Update username - pub fn update_username( - &self, - identifier: &Identifier, - new_username: String, - ) -> Result<(), IggyError> { - let id = match identifier.kind { - iggy_common::IdKind::Numeric => identifier.get_u32_value()? as usize, - iggy_common::IdKind::String => { - let username = identifier.get_string_value()?; - let index = self.index.borrow(); - *index - .get(&username) - .ok_or_else(|| IggyError::ResourceNotFound(username.to_string()))? - } - }; - - let old_username = { - let items = self.items.borrow(); - let user = items - .get(id) - .ok_or_else(|| IggyError::ResourceNotFound(identifier.to_string()))?; - user.username.clone() - }; - - if old_username == new_username { - return Ok(()); - } - - tracing::trace!( - "Updating username: '{}' → '{}' for user ID: {}", - old_username, - new_username, - id - ); - - { - let mut items = self.items.borrow_mut(); - let user = items - .get_mut(id) - .ok_or_else(|| IggyError::ResourceNotFound(identifier.to_string()))?; - user.username = new_username.clone(); - } - - let mut index = self.index.borrow_mut(); - index.remove(&old_username); - index.insert(new_username, id); - - Ok(()) - } -} - -#[derive(Debug)] -pub enum UsersCommand { - Create(CreateUser), - Update(UpdateUser), - Delete(DeleteUser), - ChangePassword(ChangePassword), - UpdatePermissions(UpdatePermissions), -} - -impl StateCommand for Users { - type Command = UsersCommand; - type Input = Message<PrepareHeader>; - - fn into_command(input: &Self::Input) -> Option<Self::Command> { - // TODO: rework this thing, so we don't copy the bytes on each request - let body = Bytes::copy_from_slice(input.body()); - match input.header().operation { - Operation::CreateUser => Some(UsersCommand::Create( - CreateUser::from_bytes(body.clone()).unwrap(), - )), - Operation::UpdateUser => Some(UsersCommand::Update( - UpdateUser::from_bytes(body.clone()).unwrap(), - )), - Operation::DeleteUser => Some(UsersCommand::Delete( - DeleteUser::from_bytes(body.clone()).unwrap(), - )), - Operation::ChangePassword => Some(UsersCommand::ChangePassword( - ChangePassword::from_bytes(body.clone()).unwrap(), - )), - Operation::UpdatePermissions => Some(UsersCommand::UpdatePermissions( - UpdatePermissions::from_bytes(body.clone()).unwrap(), - )), - _ => None, - } - } -} - -impl ApplyState for Users { - type Output = (); - - fn do_apply(&self, cmd: Self::Command) -> Self::Output { - match cmd { - UsersCommand::Create(payload) => todo!("Handle Create user with {:?}", payload), - UsersCommand::Update(payload) => todo!("Handle Update user with {:?}", payload), - UsersCommand::Delete(payload) => todo!("Handle Delete user with {:?}", payload), - UsersCommand::ChangePassword(payload) => { - todo!("Handle Change password with {:?}", payload) + UsersCommand::ChangePassword(_payload) => { + // Actual mutation logic will be implemented later } - UsersCommand::UpdatePermissions(payload) => { - todo!("Handle Update permissions with {:?}", payload) + UsersCommand::UpdatePermissions(_payload) => { + // Actual mutation logic will be implemented later } } } diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 3e3bb5dc4..c63e801a2 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -71,7 +71,7 @@ hash32 = "1.0.0" human-repr = { workspace = true } iggy_common = { workspace = true } jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] } -left-right = "0.11" +left-right = { workspace = true } lending-iterator = "0.1.7" mimalloc = { workspace = true, optional = true } mime_guess = { version = "2.0", optional = true }
