This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch invert-state-handler-dep in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 3e8c0984f2de5129a9c622ff16f6ab6ac17aa85f Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Feb 9 13:46:15 2026 +0100 refactor(metadata): invert state handler dependency with per-command dispatch Split define_state! into define_state! (struct + wrapper) and collect_handlers! (enum, Command, dispatch, Absorb). Each command type now implements StateHandler<S> for the state it mutates, moving handler logic from a central match block onto the command. Removes Handler trait and impl_absorb! macro. All 19 handlers migrated (10 stream, 7 user, 2 consumer_group). --- core/metadata/src/stm/consumer_group.rs | 136 +++++------ core/metadata/src/stm/mod.rs | 158 ++++++------- core/metadata/src/stm/stream.rs | 402 +++++++++++++++++--------------- core/metadata/src/stm/user.rs | 246 ++++++++++--------- 4 files changed, 484 insertions(+), 458 deletions(-) diff --git a/core/metadata/src/stm/consumer_group.rs b/core/metadata/src/stm/consumer_group.rs index 3a3e947fc..c4713e58a 100644 --- a/core/metadata/src/stm/consumer_group.rs +++ b/core/metadata/src/stm/consumer_group.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use crate::stm::Handler; -use crate::{define_state, impl_absorb}; +use crate::stm::StateHandler; +use crate::{collect_handlers, define_state}; use ahash::AHashMap; use iggy_common::create_consumer_group::CreateConsumerGroup; use iggy_common::delete_consumer_group::DeleteConsumerGroup; @@ -94,10 +94,15 @@ define_state! { topic_index: AHashMap<(usize, usize), Vec<usize>>, topic_name_index: AHashMap<(Arc<str>, Arc<str>), Vec<usize>>, items: Slab<ConsumerGroup>, - }, - [CreateConsumerGroup, DeleteConsumerGroup] + } +} + +collect_handlers! { + ConsumerGroups { + CreateConsumerGroup, + DeleteConsumerGroup, + } } -impl_absorb!(ConsumerGroupsInner, ConsumerGroupsCommand); impl ConsumerGroupsInner { fn resolve_consumer_group_id_by_identifiers( @@ -106,7 +111,6 @@ impl ConsumerGroupsInner { topic_id: &Identifier, group_id: &Identifier, ) -> Option<usize> { - // Resolve by numeric IDs if let (Ok(s), Ok(t)) = (stream_id.get_u32_value(), topic_id.get_u32_value()) { let groups_in_topic = self.topic_index.get(&(s as usize, t as usize))?; @@ -129,7 +133,6 @@ impl ConsumerGroupsInner { }; } - // Resolve by string names if let (Ok(s), Ok(t)) = (stream_id.get_string_value(), topic_id.get_string_value()) { let key = (Arc::from(s.as_str()), Arc::from(t.as_str())); let groups_in_topic = self.topic_name_index.get(&key)?; @@ -157,70 +160,71 @@ impl ConsumerGroupsInner { } } -impl Handler for ConsumerGroupsInner { - fn handle(&mut self, cmd: &Self::Cmd) { - // TODO: This is all an hack, we need to figure out how to do this, in a way where `Identifier` does not reach - // this stage of execution. - match cmd { - ConsumerGroupsCommand::CreateConsumerGroup(payload) => { - let name: Arc<str> = Arc::from(payload.name.as_str()); - if self.name_index.contains_key(&name) { - return; - } +// TODO: This is all a hack, we need to figure out how to do this in a way where `Identifier` +// does not reach this stage of execution. - let group = ConsumerGroup::new(name.clone()); - let id = self.items.insert(group); - self.items[id].id = id; +impl StateHandler<ConsumerGroupsInner> for CreateConsumerGroup { + fn apply(&self, state: &mut ConsumerGroupsInner) { + let name: Arc<str> = Arc::from(self.name.as_str()); + if state.name_index.contains_key(&name) { + return; + } - self.name_index.insert(name.clone(), id); + let group = ConsumerGroup::new(name.clone()); + let id = state.items.insert(group); + state.items[id].id = id; + + state.name_index.insert(name.clone(), id); + + if let (Ok(s), Ok(t)) = ( + self.stream_id.get_u32_value(), + self.topic_id.get_u32_value(), + ) { + state + .topic_index + .entry((s as usize, t as usize)) + .or_default() + .push(id); + } - if let (Ok(s), Ok(t)) = ( - payload.stream_id.get_u32_value(), - payload.topic_id.get_u32_value(), - ) { - self.topic_index - .entry((s as usize, t as usize)) - .or_default() - .push(id); - } + if let (Ok(s), Ok(t)) = ( + self.stream_id.get_string_value(), + self.topic_id.get_string_value(), + ) { + let key = (Arc::from(s.as_str()), Arc::from(t.as_str())); + state.topic_name_index.entry(key).or_default().push(id); + } + } +} - if let (Ok(s), Ok(t)) = ( - payload.stream_id.get_string_value(), - payload.topic_id.get_string_value(), - ) { - let key = (Arc::from(s.as_str()), Arc::from(t.as_str())); - self.topic_name_index.entry(key).or_default().push(id); - } - } +impl StateHandler<ConsumerGroupsInner> for DeleteConsumerGroup { + fn apply(&self, state: &mut ConsumerGroupsInner) { + let Some(id) = state.resolve_consumer_group_id_by_identifiers( + &self.stream_id, + &self.topic_id, + &self.group_id, + ) else { + return; + }; - ConsumerGroupsCommand::DeleteConsumerGroup(payload) => { - if let Some(id) = self.resolve_consumer_group_id_by_identifiers( - &payload.stream_id, - &payload.topic_id, - &payload.group_id, - ) { - let group = self.items.remove(id); - - self.name_index.remove(&group.name); - - if let (Ok(s), Ok(t)) = ( - payload.stream_id.get_u32_value(), - payload.topic_id.get_u32_value(), - ) && let Some(vec) = self.topic_index.get_mut(&(s as usize, t as usize)) - { - vec.retain(|&x| x != id); - } - - if let (Ok(s), Ok(t)) = ( - payload.stream_id.get_string_value(), - payload.topic_id.get_string_value(), - ) { - let key = (Arc::from(s.as_str()), Arc::from(t.as_str())); - if let Some(vec) = self.topic_name_index.get_mut(&key) { - vec.retain(|&x| x != id); - } - } - } + let group = state.items.remove(id); + state.name_index.remove(&group.name); + + if let (Ok(s), Ok(t)) = ( + self.stream_id.get_u32_value(), + self.topic_id.get_u32_value(), + ) && let Some(vec) = state.topic_index.get_mut(&(s as usize, t as usize)) + { + vec.retain(|&x| x != id); + } + + if let (Ok(s), Ok(t)) = ( + self.stream_id.get_string_value(), + self.topic_id.get_string_value(), + ) { + let key = (Arc::from(s.as_str()), Arc::from(t.as_str())); + if let Some(vec) = state.topic_name_index.get_mut(&key) { + vec.retain(|&x| x != id); } } } diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index c92b0ca10..802da6de5 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -70,9 +70,11 @@ pub trait Command { fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>; } -/// Handles commands. User-implemented business logic. -pub trait Handler: Command { - fn handle(&mut self, cmd: &Self::Cmd); +/// Per-command handler for a given state type. +/// Each command struct implements this for the state it mutates, +/// replacing the monolithic `Handler::handle()` match block. +pub trait StateHandler<S> { + fn apply(&self, state: &mut S); } #[derive(Debug)] @@ -100,7 +102,7 @@ where impl<T> LeftRight<T, <T as Command>::Cmd> where - T: Absorb<<T as Command>::Cmd> + Clone + Handler, + T: Absorb<<T as Command>::Cmd> + Clone + Command, { pub fn do_apply(&self, cmd: <T as Command>::Cmd) { self.write @@ -125,51 +127,23 @@ pub trait StateMachine { fn update(&self, input: Self::Input) -> Self::Output; } -/// Generates a state machine with convention-based storage. +/// Generates the state's inner struct and wrapper type. /// /// # Generated items /// - `{$state}Inner` struct with the specified fields (the data) -/// - `{$state}Command` enum with variants for each operation -/// - `$state` wrapper struct (non-generic, contains LeftRight storage) -/// - `Command` impl for `{$state}Inner` (parsing) -/// - `State` impl for `$state` +/// - `$state` wrapper struct (contains LeftRight storage) /// - `From<LeftRight<...>>` impl for `$state` +/// - `From<{$state}Inner>` impl for `$state` /// -/// # User must implement -/// - `Handler` for `{$state}Inner` (business logic) -/// - `impl_absorb!` for `{$state}Inner` and `{$state}Command` -/// -/// # Example -/// ```ignore -/// define_state! { -/// Streams { -/// index: AHashMap<String, usize>, -/// items: Slab<Stream>, -/// }, -/// [CreateStream, UpdateStream, DeleteStream] -/// } -/// -/// // User implements Handler manually: -/// impl Handler for StreamsInner { -/// fn handle(&mut self, cmd: &StreamsCommand) { -/// match cmd { -/// StreamsCommand::CreateStream(payload) => { /* ... */ } -/// // ... -/// } -/// } -/// } -/// -/// // User implements Absorb via macro: -/// impl_absorb!(StreamsInner, StreamsCommand); -/// ``` -// TODO: The `operation` argument can be removed, once we create an trait for mapping. +/// The command enum, parsing, dispatch, and Absorb impl are generated +/// by `collect_handlers!` separately, keeping state definition decoupled +/// from the set of operations. #[macro_export] macro_rules! define_state { ( $state:ident { $($field_name:ident : $field_type:ty),* $(,)? - }, - [$($operation:ident),* $(,)?] + } ) => { paste::paste! { #[derive(Debug, Clone, Default)] @@ -185,13 +159,6 @@ macro_rules! define_state { } } - #[derive(Debug, Clone)] - pub enum [<$state Command>] { - $( - $operation($operation), - )* - } - #[derive(Debug)] pub struct $state { inner: $crate::stm::LeftRight<[<$state Inner>], [<$state Command>]>, @@ -209,16 +176,34 @@ macro_rules! define_state { left_right.into() } } + } + }; +} - impl $crate::stm::State for $state { - type Input = <[<$state Inner>] as $crate::stm::Command>::Input; - type Output = (); - - fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input> { - let cmd = <[<$state Inner>] as $crate::stm::Command>::parse(input)?; - self.inner.do_apply(cmd); - Ok(()) - } +/// Generates the command enum, parsing, dispatch, State, and Absorb for a state type. +/// +/// # Generated items +/// - `{$state}Command` enum with one variant per operation +/// - `Command` impl for `{$state}Inner` (parses `Message<PrepareHeader>`) +/// - `{$state}Inner::dispatch()` method (routes each variant to `StateHandler::apply()`) +/// - `State` impl for `$state` wrapper +/// - `Absorb<{$state}Command>` impl for `{$state}Inner` +/// +/// # Requirements +/// Each listed operation type must implement `StateHandler<{$state}Inner>`. +#[macro_export] +macro_rules! collect_handlers { + ( + $state:ident { + $($operation:ident),* $(,)? + } + ) => { + paste::paste! { + #[derive(Debug, Clone)] + pub enum [<$state Command>] { + $( + $operation($operation), + )* } impl $crate::stm::Command for [<$state Inner>] { @@ -242,45 +227,42 @@ macro_rules! define_state { } } } - } - }; -} - -// This macro is really sad, but we can't do blanket impl from below, due to orphan rule. -// impl<T> Absorb<T::Cmd> for T -// where -// T: Handler + Clone, -// { -// fn absorb_first(&mut self, cmd: &mut T::Cmd, _other: &Self) { -// self.handle(cmd); -// } - -// fn absorb_second(&mut self, cmd: T::Cmd, _other: &Self) { -// self.handle(&cmd); -// } + impl [<$state Inner>] { + fn dispatch(&mut self, cmd: &[<$state Command>]) { + match cmd { + $( + [<$state Command>]::$operation(payload) => { + $crate::stm::StateHandler::apply(payload, self); + }, + )* + } + } + } -// fn sync_with(&mut self, first: &Self) { -// *self = first.clone(); -// } + impl $crate::stm::State for $state { + type Input = <[<$state Inner>] as $crate::stm::Command>::Input; + type Output = (); -// fn drop_first(self: Box<Self>) {} -// fn drop_second(self: Box<Self>) {} -// } -#[macro_export] -macro_rules! impl_absorb { - ($inner:ident, $cmd:ident) => { - impl left_right::Absorb<$cmd> for $inner { - fn absorb_first(&mut self, cmd: &mut $cmd, _other: &Self) { - self.handle(cmd); + fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input> { + let cmd = <[<$state Inner>] as $crate::stm::Command>::parse(input)?; + self.inner.do_apply(cmd); + Ok(()) + } } - fn absorb_second(&mut self, cmd: $cmd, _other: &Self) { - self.handle(&cmd); - } + impl left_right::Absorb<[<$state Command>]> for [<$state Inner>] { + fn absorb_first(&mut self, cmd: &mut [<$state Command>], _other: &Self) { + self.dispatch(cmd); + } + + fn absorb_second(&mut self, cmd: [<$state Command>], _other: &Self) { + self.dispatch(&cmd); + } - fn sync_with(&mut self, first: &Self) { - *self = first.clone(); + fn sync_with(&mut self, first: &Self) { + *self = first.clone(); + } } } }; diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index d1a3da212..d78728228 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -16,8 +16,8 @@ // under the License. use crate::stats::{StreamStats, TopicStats}; -use crate::stm::Handler; -use crate::{define_state, impl_absorb}; +use crate::stm::StateHandler; +use crate::{collect_handlers, define_state}; use ahash::AHashMap; use iggy_common::create_partitions::CreatePartitions; use iggy_common::create_stream::CreateStream; @@ -168,8 +168,11 @@ define_state! { Streams { index: AHashMap<Arc<str>, usize>, items: Slab<Stream>, - }, - [ + } +} + +collect_handlers! { + Streams { CreateStream, UpdateStream, DeleteStream, @@ -179,12 +182,10 @@ define_state! { DeleteTopic, PurgeTopic, CreatePartitions, - DeletePartitions - ] + DeletePartitions, + } } -impl_absorb!(StreamsInner, StreamsCommand); - impl StreamsInner { fn resolve_stream_id(&self, identifier: &iggy_common::Identifier) -> Option<usize> { use iggy_common::IdKind; @@ -229,206 +230,229 @@ impl StreamsInner { } } -impl Handler for StreamsInner { - fn handle(&mut self, cmd: &StreamsCommand) { - match cmd { - StreamsCommand::CreateStream(payload) => { - let name_arc: Arc<str> = Arc::from(payload.name.as_str()); - if self.index.contains_key(&name_arc) { - return; - } +impl StateHandler<StreamsInner> for CreateStream { + fn apply(&self, state: &mut StreamsInner) { + let name_arc: Arc<str> = Arc::from(self.name.as_str()); + if state.index.contains_key(&name_arc) { + return; + } - let stream = Stream { - id: 0, - name: name_arc.clone(), - created_at: iggy_common::IggyTimestamp::now(), - stats: Arc::new(StreamStats::default()), - topics: Slab::new(), - topic_index: AHashMap::default(), - }; + let stream = Stream { + id: 0, + name: name_arc.clone(), + created_at: iggy_common::IggyTimestamp::now(), + stats: Arc::new(StreamStats::default()), + topics: Slab::new(), + topic_index: AHashMap::default(), + }; - let id = self.items.insert(stream); - if let Some(stream) = self.items.get_mut(id) { - stream.id = id; - } - self.index.insert(name_arc, id); - } - StreamsCommand::UpdateStream(payload) => { - let Some(stream_id) = self.resolve_stream_id(&payload.stream_id) else { - return; - }; - let Some(stream) = self.items.get_mut(stream_id) else { - return; - }; + let id = state.items.insert(stream); + if let Some(stream) = state.items.get_mut(id) { + stream.id = id; + } + state.index.insert(name_arc, id); + } +} - let new_name_arc: Arc<str> = Arc::from(payload.name.as_str()); - if let Some(&existing_id) = self.index.get(&new_name_arc) - && existing_id != stream_id - { - return; - } +impl StateHandler<StreamsInner> for UpdateStream { + fn apply(&self, state: &mut StreamsInner) { + let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { + return; + }; + let Some(stream) = state.items.get_mut(stream_id) else { + return; + }; + + let new_name_arc: Arc<str> = Arc::from(self.name.as_str()); + if let Some(&existing_id) = state.index.get(&new_name_arc) + && existing_id != stream_id + { + return; + } - self.index.remove(&stream.name); - stream.name = new_name_arc.clone(); - self.index.insert(new_name_arc, stream_id); - } - StreamsCommand::DeleteStream(payload) => { - let Some(stream_id) = self.resolve_stream_id(&payload.stream_id) else { - return; - }; + state.index.remove(&stream.name); + stream.name = new_name_arc.clone(); + state.index.insert(new_name_arc, stream_id); + } +} - if let Some(stream) = self.items.get(stream_id) { - let name = stream.name.clone(); +impl StateHandler<StreamsInner> for DeleteStream { + fn apply(&self, state: &mut StreamsInner) { + let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { + return; + }; - self.items.remove(stream_id); - self.index.remove(&name); - } - } - StreamsCommand::PurgeStream(_payload) => { - // TODO - todo!(); - } - StreamsCommand::CreateTopic(payload) => { - let Some(stream_id) = self.resolve_stream_id(&payload.stream_id) else { - return; - }; - let Some(stream) = self.items.get_mut(stream_id) else { - return; - }; + if let Some(stream) = state.items.get(stream_id) { + let name = stream.name.clone(); - let name_arc: Arc<str> = Arc::from(payload.name.as_str()); - if stream.topic_index.contains_key(&name_arc) { - return; - } + state.items.remove(stream_id); + state.index.remove(&name); + } + } +} - let topic = Topic { - id: 0, // Will be assigned by slab - name: name_arc.clone(), - created_at: iggy_common::IggyTimestamp::now(), - replication_factor: payload.replication_factor.unwrap_or(1), - message_expiry: payload.message_expiry, - compression_algorithm: payload.compression_algorithm, - max_topic_size: payload.max_topic_size, - stats: Arc::new(TopicStats::new(stream.stats.clone())), - partitions: Vec::new(), - round_robin_counter: Arc::new(AtomicUsize::new(0)), - }; +impl StateHandler<StreamsInner> for PurgeStream { + fn apply(&self, _state: &mut StreamsInner) { + // TODO + todo!(); + } +} - let topic_id = stream.topics.insert(topic); - if let Some(topic) = stream.topics.get_mut(topic_id) { - topic.id = topic_id; - - for partition_id in 0..payload.partitions_count as usize { - let partition = Partition { - id: partition_id, - created_at: iggy_common::IggyTimestamp::now(), - }; - topic.partitions.push(partition); - } - } +impl StateHandler<StreamsInner> for CreateTopic { + fn apply(&self, state: &mut StreamsInner) { + let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { + return; + }; + let Some(stream) = state.items.get_mut(stream_id) else { + return; + }; + + let name_arc: Arc<str> = Arc::from(self.name.as_str()); + if stream.topic_index.contains_key(&name_arc) { + return; + } - stream.topic_index.insert(name_arc, topic_id); - } - StreamsCommand::UpdateTopic(payload) => { - let Some(stream_id) = self.resolve_stream_id(&payload.stream_id) else { - return; - }; - let Some(topic_id) = self.resolve_topic_id(stream_id, &payload.topic_id) else { - return; - }; + let topic = Topic { + id: 0, + name: name_arc.clone(), + created_at: iggy_common::IggyTimestamp::now(), + replication_factor: self.replication_factor.unwrap_or(1), + message_expiry: self.message_expiry, + compression_algorithm: self.compression_algorithm, + max_topic_size: self.max_topic_size, + stats: Arc::new(TopicStats::new(stream.stats.clone())), + partitions: Vec::new(), + round_robin_counter: Arc::new(AtomicUsize::new(0)), + }; - let Some(stream) = self.items.get_mut(stream_id) else { - return; - }; - let Some(topic) = stream.topics.get_mut(topic_id) else { - return; + let topic_id = stream.topics.insert(topic); + if let Some(topic) = stream.topics.get_mut(topic_id) { + topic.id = topic_id; + + for partition_id in 0..self.partitions_count as usize { + let partition = Partition { + id: partition_id, + created_at: iggy_common::IggyTimestamp::now(), }; + topic.partitions.push(partition); + } + } - let new_name_arc: Arc<str> = Arc::from(payload.name.as_str()); - if let Some(&existing_id) = stream.topic_index.get(&new_name_arc) - && existing_id != topic_id - { - return; - } + stream.topic_index.insert(name_arc, topic_id); + } +} - stream.topic_index.remove(&topic.name); - topic.name = new_name_arc.clone(); - topic.compression_algorithm = payload.compression_algorithm; - topic.message_expiry = payload.message_expiry; - topic.max_topic_size = payload.max_topic_size; - if let Some(rf) = payload.replication_factor { - topic.replication_factor = rf; - } - stream.topic_index.insert(new_name_arc, topic_id); - } - StreamsCommand::DeleteTopic(payload) => { - let Some(stream_id) = self.resolve_stream_id(&payload.stream_id) else { - return; - }; - let Some(topic_id) = self.resolve_topic_id(stream_id, &payload.topic_id) else { - return; - }; - let Some(stream) = self.items.get_mut(stream_id) else { - return; - }; +impl StateHandler<StreamsInner> for UpdateTopic { + fn apply(&self, state: &mut StreamsInner) { + let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { + return; + }; + let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else { + return; + }; + + let Some(stream) = state.items.get_mut(stream_id) else { + return; + }; + let Some(topic) = stream.topics.get_mut(topic_id) else { + return; + }; + + let new_name_arc: Arc<str> = Arc::from(self.name.as_str()); + if let Some(&existing_id) = stream.topic_index.get(&new_name_arc) + && existing_id != topic_id + { + return; + } - if let Some(topic) = stream.topics.get(topic_id) { - let name = topic.name.clone(); - stream.topics.remove(topic_id); - stream.topic_index.remove(&name); - } - } - StreamsCommand::PurgeTopic(_payload) => { - // TODO: - todo!(); - } - StreamsCommand::CreatePartitions(payload) => { - let Some(stream_id) = self.resolve_stream_id(&payload.stream_id) else { - return; - }; - let Some(topic_id) = self.resolve_topic_id(stream_id, &payload.topic_id) else { - return; - }; + stream.topic_index.remove(&topic.name); + topic.name = new_name_arc.clone(); + topic.compression_algorithm = self.compression_algorithm; + topic.message_expiry = self.message_expiry; + topic.max_topic_size = self.max_topic_size; + if let Some(rf) = self.replication_factor { + topic.replication_factor = rf; + } + stream.topic_index.insert(new_name_arc, topic_id); + } +} - let Some(stream) = self.items.get_mut(stream_id) else { - return; - }; - let Some(topic) = stream.topics.get_mut(topic_id) else { - return; - }; +impl StateHandler<StreamsInner> for DeleteTopic { + fn apply(&self, state: &mut StreamsInner) { + let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { + return; + }; + let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else { + return; + }; + let Some(stream) = state.items.get_mut(stream_id) else { + return; + }; + + if let Some(topic) = stream.topics.get(topic_id) { + let name = topic.name.clone(); + stream.topics.remove(topic_id); + stream.topic_index.remove(&name); + } + } +} - let current_partition_count = topic.partitions.len(); - for i in 0..payload.partitions_count as usize { - let partition_id = current_partition_count + i; - let partition = Partition { - id: partition_id, - created_at: iggy_common::IggyTimestamp::now(), - }; - topic.partitions.push(partition); - } - } - StreamsCommand::DeletePartitions(payload) => { - let Some(stream_id) = self.resolve_stream_id(&payload.stream_id) else { - return; - }; - let Some(topic_id) = self.resolve_topic_id(stream_id, &payload.topic_id) else { - return; - }; +impl StateHandler<StreamsInner> for PurgeTopic { + fn apply(&self, _state: &mut StreamsInner) { + // TODO + todo!(); + } +} - let Some(stream) = self.items.get_mut(stream_id) else { - return; - }; - let Some(topic) = stream.topics.get_mut(topic_id) else { - return; - }; +impl StateHandler<StreamsInner> for CreatePartitions { + fn apply(&self, state: &mut StreamsInner) { + let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { + return; + }; + let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else { + return; + }; + + let Some(stream) = state.items.get_mut(stream_id) else { + return; + }; + let Some(topic) = stream.topics.get_mut(topic_id) else { + return; + }; + + let current_partition_count = topic.partitions.len(); + for i in 0..self.partitions_count as usize { + let partition_id = current_partition_count + i; + let partition = Partition { + id: partition_id, + created_at: iggy_common::IggyTimestamp::now(), + }; + topic.partitions.push(partition); + } + } +} - let count_to_delete = payload.partitions_count as usize; - if count_to_delete > 0 && count_to_delete <= topic.partitions.len() { - topic - .partitions - .truncate(topic.partitions.len() - count_to_delete); - } - } +impl StateHandler<StreamsInner> for DeletePartitions { + fn apply(&self, state: &mut StreamsInner) { + let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { + return; + }; + let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else { + return; + }; + + let Some(stream) = state.items.get_mut(stream_id) else { + return; + }; + let Some(topic) = stream.topics.get_mut(topic_id) else { + return; + }; + + let count_to_delete = self.partitions_count as usize; + if count_to_delete > 0 && count_to_delete <= topic.partitions.len() { + topic + .partitions + .truncate(topic.partitions.len() - count_to_delete); } } } diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs index 81ef0b018..909fae432 100644 --- a/core/metadata/src/stm/user.rs +++ b/core/metadata/src/stm/user.rs @@ -16,8 +16,8 @@ // under the License. use crate::permissioner::Permissioner; -use crate::stm::Handler; -use crate::{define_state, impl_absorb}; +use crate::stm::StateHandler; +use crate::{collect_handlers, define_state}; use ahash::AHashMap; use iggy_common::change_password::ChangePassword; use iggy_common::create_personal_access_token::CreatePersonalAccessToken; @@ -82,18 +82,20 @@ define_state! { items: Slab<User>, personal_access_tokens: AHashMap<UserId, AHashMap<Arc<str>, PersonalAccessToken>>, permissioner: Permissioner, - }, - [ + } +} + +collect_handlers! { + Users { CreateUser, UpdateUser, DeleteUser, ChangePassword, UpdatePermissions, CreatePersonalAccessToken, - DeletePersonalAccessToken - ] + DeletePersonalAccessToken, + } } -impl_absorb!(UsersInner, UsersCommand); impl UsersInner { fn resolve_user_id(&self, identifier: &iggy_common::Identifier) -> Option<usize> { @@ -115,123 +117,137 @@ impl UsersInner { } } -impl Handler for UsersInner { - fn handle(&mut self, cmd: &UsersCommand) { - match cmd { - UsersCommand::CreateUser(payload) => { - let username_arc: Arc<str> = Arc::from(payload.username.as_str()); - if self.index.contains_key(&username_arc) { - return; - } +impl StateHandler<UsersInner> for CreateUser { + fn apply(&self, state: &mut UsersInner) { + let username_arc: Arc<str> = Arc::from(self.username.as_str()); + if state.index.contains_key(&username_arc) { + return; + } - let user = User { - id: 0, - username: username_arc.clone(), - password_hash: Arc::from(payload.password.as_str()), - status: payload.status, - created_at: iggy_common::IggyTimestamp::now(), - permissions: payload.permissions.as_ref().map(|p| Arc::new(p.clone())), - }; - - let id = self.items.insert(user); - if let Some(user) = self.items.get_mut(id) { - user.id = id as UserId; - } + let user = User { + id: 0, + username: username_arc.clone(), + password_hash: Arc::from(self.password.as_str()), + status: self.status, + created_at: iggy_common::IggyTimestamp::now(), + permissions: self.permissions.as_ref().map(|p| Arc::new(p.clone())), + }; + + let id = state.items.insert(user); + if let Some(user) = state.items.get_mut(id) { + user.id = id as UserId; + } - self.index.insert(username_arc, id as UserId); - self.personal_access_tokens - .insert(id as UserId, AHashMap::default()); - } - UsersCommand::UpdateUser(payload) => { - let Some(user_id) = self.resolve_user_id(&payload.user_id) else { - return; - }; - - let Some(user) = self.items.get_mut(user_id) else { - return; - }; - - if let Some(new_username) = &payload.username { - let new_username_arc: Arc<str> = Arc::from(new_username.as_str()); - if let Some(&existing_id) = self.index.get(&new_username_arc) - && existing_id != user_id as UserId - { - return; - } - - self.index.remove(&user.username); - user.username = new_username_arc.clone(); - self.index.insert(new_username_arc, user_id as UserId); - } + state.index.insert(username_arc, id as UserId); + state + .personal_access_tokens + .insert(id as UserId, AHashMap::default()); + } +} - if let Some(new_status) = payload.status { - user.status = new_status; - } - } - UsersCommand::DeleteUser(payload) => { - let Some(user_id) = self.resolve_user_id(&payload.user_id) else { - return; - }; - - if let Some(user) = self.items.get(user_id) { - let username = user.username.clone(); - self.items.remove(user_id); - self.index.remove(&username); - self.personal_access_tokens.remove(&(user_id as UserId)); - } +impl StateHandler<UsersInner> for UpdateUser { + fn apply(&self, state: &mut UsersInner) { + let Some(user_id) = state.resolve_user_id(&self.user_id) else { + return; + }; + + let Some(user) = state.items.get_mut(user_id) else { + return; + }; + + if let Some(new_username) = &self.username { + let new_username_arc: Arc<str> = Arc::from(new_username.as_str()); + if let Some(&existing_id) = state.index.get(&new_username_arc) + && existing_id != user_id as UserId + { + return; } - UsersCommand::ChangePassword(payload) => { - let Some(user_id) = self.resolve_user_id(&payload.user_id) else { - return; - }; - if let Some(user) = self.items.get_mut(user_id) { - user.password_hash = Arc::from(payload.new_password.as_str()); - } - } - UsersCommand::UpdatePermissions(payload) => { - let Some(user_id) = self.resolve_user_id(&payload.user_id) else { - return; - }; + state.index.remove(&user.username); + user.username = new_username_arc.clone(); + state.index.insert(new_username_arc, user_id as UserId); + } - if let Some(user) = self.items.get_mut(user_id) { - user.permissions = payload.permissions.as_ref().map(|p| Arc::new(p.clone())); - } - } - UsersCommand::CreatePersonalAccessToken(payload) => { - // TODO: Stub untill protocol gets adjusted. - let user_id = 0; - let user_tokens = self.personal_access_tokens.entry(user_id).or_default(); - let name_arc: Arc<str> = Arc::from(payload.name.as_str()); - if user_tokens.contains_key(&name_arc) { - return; - } + if let Some(new_status) = self.status { + user.status = new_status; + } + } +} - let expiry_at = - PersonalAccessToken::calculate_expiry_at(IggyTimestamp::now(), payload.expiry); - if let Some(expiry_at) = expiry_at - && expiry_at.as_micros() <= IggyTimestamp::now().as_micros() - { - return; - } +impl StateHandler<UsersInner> for DeleteUser { + fn apply(&self, state: &mut UsersInner) { + let Some(user_id) = state.resolve_user_id(&self.user_id) else { + return; + }; + + if let Some(user) = state.items.get(user_id) { + let username = user.username.clone(); + state.items.remove(user_id); + state.index.remove(&username); + state.personal_access_tokens.remove(&(user_id as UserId)); + } + } +} - let (pat, _) = PersonalAccessToken::new( - user_id, - payload.name.as_ref(), - IggyTimestamp::now(), - payload.expiry, - ); - user_tokens.insert(name_arc, pat); - } - UsersCommand::DeletePersonalAccessToken(payload) => { - // TODO: Stub untill protocol gets adjusted. - let user_id = 0; +impl StateHandler<UsersInner> for ChangePassword { + fn apply(&self, state: &mut UsersInner) { + let Some(user_id) = state.resolve_user_id(&self.user_id) else { + return; + }; - if let Some(user_tokens) = self.personal_access_tokens.get_mut(&user_id) { - let name_arc: Arc<str> = Arc::from(payload.name.as_str()); - user_tokens.remove(&name_arc); - } - } + if let Some(user) = state.items.get_mut(user_id) { + user.password_hash = Arc::from(self.new_password.as_str()); + } + } +} + +impl StateHandler<UsersInner> for UpdatePermissions { + fn apply(&self, state: &mut UsersInner) { + let Some(user_id) = state.resolve_user_id(&self.user_id) else { + return; + }; + + if let Some(user) = state.items.get_mut(user_id) { + user.permissions = self.permissions.as_ref().map(|p| Arc::new(p.clone())); + } + } +} + +impl StateHandler<UsersInner> for CreatePersonalAccessToken { + fn apply(&self, state: &mut UsersInner) { + // TODO: Stub until protocol gets adjusted. + let user_id = 0; + let user_tokens = state.personal_access_tokens.entry(user_id).or_default(); + let name_arc: Arc<str> = Arc::from(self.name.as_str()); + if user_tokens.contains_key(&name_arc) { + return; + } + + let expiry_at = PersonalAccessToken::calculate_expiry_at(IggyTimestamp::now(), self.expiry); + if let Some(expiry_at) = expiry_at + && expiry_at.as_micros() <= IggyTimestamp::now().as_micros() + { + return; + } + + let (pat, _) = PersonalAccessToken::new( + user_id, + self.name.as_ref(), + IggyTimestamp::now(), + self.expiry, + ); + user_tokens.insert(name_arc, pat); + } +} + +impl StateHandler<UsersInner> for DeletePersonalAccessToken { + fn apply(&self, state: &mut UsersInner) { + // TODO: Stub until protocol gets adjusted. + let user_id = 0; + + if let Some(user_tokens) = state.personal_access_tokens.get_mut(&user_id) { + let name_arc: Arc<str> = Arc::from(self.name.as_str()); + user_tokens.remove(&name_arc); } } }
