This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch stm-output in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 95c11e235b6d60e46d5740051505a7b2a89779dd Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Feb 9 17:51:08 2026 +0100 refactor(metadata): add typed return values to state handlers, split stm/mod.rs. Previously, mod.rs mixed traits, unsafe left-right infra, and code-gen macros in a single 400-line file, making each concern hard to review independently. StateHandler also returned void, preventing commands from producing results through the left-right layer. Split into focused modules (traits.rs, left_right.rs, macros.rs) and introduce CommandEnvelope to smuggle typed return values out of Absorb's void interface. StateHandler and Command now carry an Output associated type. WriteCell gains a debug-mode reentrancy guard and LeftRight exposes with_state() for lock-free reads. --- Cargo.lock | 2 + core/metadata/Cargo.toml | 4 + core/metadata/src/stm/consumer_group.rs | 8 +- core/metadata/src/stm/left_right.rs | 146 +++++++++++++++ core/metadata/src/stm/{mod.rs => macros.rs} | 270 +++++++++++++--------------- core/metadata/src/stm/mod.rs | 252 +------------------------- core/metadata/src/stm/mux.rs | 100 +++++++++-- core/metadata/src/stm/stream.rs | 38 ++-- core/metadata/src/stm/traits.rs | 54 ++++++ core/metadata/src/stm/user.rs | 34 ++-- 10 files changed, 464 insertions(+), 444 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0d5781ce3..f9910a457 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5656,6 +5656,8 @@ name = "metadata" version = "0.1.0" dependencies = [ "ahash 0.8.12", + "bytemuck", + "bytes", "consensus", "iggy_common", "journal", diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml index 193bf5788..c5a7dc776 100644 --- a/core/metadata/Cargo.toml +++ b/core/metadata/Cargo.toml @@ -37,3 +37,7 @@ message_bus = { workspace = true } paste = { workspace = true } slab = { workspace = true } tracing = { workspace = true } + +[dev-dependencies] +bytemuck = { workspace = true } +bytes = { workspace = true } diff --git 
a/core/metadata/src/stm/consumer_group.rs b/core/metadata/src/stm/consumer_group.rs index 71babb412..12314ad2f 100644 --- a/core/metadata/src/stm/consumer_group.rs +++ b/core/metadata/src/stm/consumer_group.rs @@ -20,7 +20,7 @@ use crate::{collect_handlers, define_state}; use ahash::AHashMap; use iggy_common::create_consumer_group::CreateConsumerGroup; use iggy_common::delete_consumer_group::DeleteConsumerGroup; -use iggy_common::{IdKind, Identifier}; +use iggy_common::{IdKind, Identifier, IggyTimestamp}; use slab::Slab; use std::sync::Arc; use std::sync::atomic::AtomicUsize; @@ -165,7 +165,8 @@ impl ConsumerGroupsInner { impl StateHandler for CreateConsumerGroup { type State = ConsumerGroupsInner; - fn apply(&self, state: &mut ConsumerGroupsInner) { + type Output = (); + fn apply(&self, state: &mut ConsumerGroupsInner, _now: IggyTimestamp) -> Self::Output { let name: Arc<str> = Arc::from(self.name.as_str()); if state.name_index.contains_key(&name) { return; @@ -200,7 +201,8 @@ impl StateHandler for CreateConsumerGroup { impl StateHandler for DeleteConsumerGroup { type State = ConsumerGroupsInner; - fn apply(&self, state: &mut ConsumerGroupsInner) { + type Output = (); + fn apply(&self, state: &mut ConsumerGroupsInner, _now: IggyTimestamp) -> Self::Output { let Some(id) = state.resolve_consumer_group_id_by_identifiers( &self.stream_id, &self.topic_id, diff --git a/core/metadata/src/stm/left_right.rs b/core/metadata/src/stm/left_right.rs new file mode 100644 index 000000000..472558fce --- /dev/null +++ b/core/metadata/src/stm/left_right.rs @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::traits::Command; +use iggy_common::IggyTimestamp; +use left_right::*; +use std::cell::{Cell, UnsafeCell}; +use std::sync::Arc; + +/// Wraps a command with a raw pointer to a stack-allocated output slot. +/// +/// Used to smuggle return values out of `left_right`'s `Absorb` trait, +/// which returns void. The pointer targets a stack local in `WriteCell::apply()` +/// and is only written during the synchronous `publish()` call. +#[doc(hidden)] +pub struct CommandEnvelope<C, R> { + pub(crate) cmd: C, + pub(crate) output: *mut Option<R>, + pub(crate) now: IggyTimestamp, +} + +// Safety: The pointer targets a stack local in WriteCell::apply() and is only +// dereferenced during the synchronous append()+publish() sequence on the same +// thread. Single-writer architecture ensures no concurrent access. 
+unsafe impl<C: Send, R: Send> Send for CommandEnvelope<C, R> {} + +pub struct WriteCell<T, C, R> +where + T: Absorb<CommandEnvelope<C, R>>, +{ + inner: UnsafeCell<WriteHandle<T, CommandEnvelope<C, R>>>, + #[cfg(debug_assertions)] + in_apply: Cell<bool>, +} + +impl<T, C, R> std::fmt::Debug for WriteCell<T, C, R> +where + T: Absorb<CommandEnvelope<C, R>>, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WriteCell").finish_non_exhaustive() + } +} + +impl<T, C, R> WriteCell<T, C, R> +where + T: Absorb<CommandEnvelope<C, R>>, +{ + pub fn new(write: WriteHandle<T, CommandEnvelope<C, R>>) -> Self { + Self { + inner: UnsafeCell::new(write), + #[cfg(debug_assertions)] + in_apply: Cell::new(false), + } + } + + pub fn apply(&self, cmd: C) -> R { + #[cfg(debug_assertions)] + { + assert!( + !self.in_apply.replace(true), + "WriteCell::apply() called reentrantly — this would alias &mut WriteHandle" + ); + } + + let mut output: Option<R> = None; + let envelope = CommandEnvelope { + cmd, + output: &mut output, + now: IggyTimestamp::now(), + }; + let hdl = unsafe { + self.inner + .get() + .as_mut() + .expect("[apply]: called on uninit writer") + }; + hdl.append(envelope).publish(); + + #[cfg(debug_assertions)] + self.in_apply.set(false); + + output.expect("[apply]: absorb did not produce output") + } +} + +#[derive(Debug)] +pub struct LeftRight<T, C, R> +where + T: Absorb<CommandEnvelope<C, R>>, +{ + write: Option<WriteCell<T, C, R>>, + read: Arc<ReadHandle<T>>, +} + +// TODO: Expose Arc<ReadHandle<T>> for cross-thread lock-free reads from non-owner shards. 
+ +impl<T> From<T> for LeftRight<T, <T as Command>::Cmd, <T as Command>::Output> +where + T: Absorb<CommandEnvelope<<T as Command>::Cmd, <T as Command>::Output>> + Clone + Command, +{ + fn from(inner: T) -> Self { + let (write, read) = { + let (w, r) = left_right::new_from_empty(inner); + (WriteCell::new(w).into(), r.into()) + }; + Self { write, read } + } +} + +impl<T> LeftRight<T, <T as Command>::Cmd, <T as Command>::Output> +where + T: Absorb<CommandEnvelope<<T as Command>::Cmd, <T as Command>::Output>> + Clone + Command, +{ + pub fn do_apply(&self, cmd: <T as Command>::Cmd) -> <T as Command>::Output { + self.write + .as_ref() + .expect("no write handle - not the owner shard") + .apply(cmd) + } + + pub fn with_state<F, Ret>(&self, f: F) -> Ret + where + F: FnOnce(&T) -> Ret, + { + let guard = self + .read + .enter() + .expect("ReadHandle dropped - WriteHandle was deallocated"); + f(&guard) + } +} diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/macros.rs similarity index 52% copy from core/metadata/src/stm/mod.rs copy to core/metadata/src/stm/macros.rs index 08bef35dd..130cbdbe4 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/macros.rs @@ -15,129 +15,14 @@ // specific language governing permissions and limitations // under the License. 
-pub mod consumer_group; -pub mod mux; -pub mod stream; -pub mod user; - -use left_right::*; -use std::cell::UnsafeCell; -use std::sync::Arc; - -pub struct WriteCell<T, O> -where - T: Absorb<O>, -{ - inner: UnsafeCell<WriteHandle<T, O>>, -} - -impl<T, O> std::fmt::Debug for WriteCell<T, O> -where - T: Absorb<O>, -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("WriteCell").finish_non_exhaustive() - } -} - -impl<T, O> WriteCell<T, O> -where - T: Absorb<O>, -{ - pub fn new(write: WriteHandle<T, O>) -> Self { - Self { - inner: UnsafeCell::new(write), - } - } - - pub fn apply(&self, cmd: O) { - let hdl = unsafe { - self.inner - .get() - .as_mut() - .expect("[apply]: called on uninit writer, for cmd: {cmd}") - }; - hdl.append(cmd).publish(); - } -} - -/// Parses type-erased input into a command. Macro-generated. -/// Returns `Ok(cmd)` if applicable, `Err(input)` to pass ownership back. -pub trait Command { - type Cmd; - type Input; - - fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>; -} - -/// Per-command handler for a given state type. -/// Each command struct implements this for the state it mutates. 
-pub trait StateHandler { - type State; - fn apply(&self, state: &mut Self::State); -} - -#[derive(Debug)] -pub struct LeftRight<T, C> -where - T: Absorb<C>, -{ - write: Option<WriteCell<T, C>>, - #[allow(unused)] - read: Arc<ReadHandle<T>>, -} - -impl<T> From<T> for LeftRight<T, <T as Command>::Cmd> -where - T: Absorb<<T as Command>::Cmd> + Clone + Command, -{ - fn from(inner: T) -> Self { - let (write, read) = { - let (w, r) = left_right::new_from_empty(inner); - (WriteCell::new(w).into(), r.into()) - }; - Self { write, read } - } -} - -impl<T> LeftRight<T, <T as Command>::Cmd> -where - T: Absorb<<T as Command>::Cmd> + Clone + Command, -{ - pub fn do_apply(&self, cmd: <T as Command>::Cmd) { - self.write - .as_ref() - .expect("no write handle - not the owner shard") - .apply(cmd); - } -} - -/// Public interface for state machines. -/// Returns `Ok(output)` if applicable, `Err(input)` to pass ownership back. -pub trait State { - type Output; - type Input; - - fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input>; -} - -pub trait StateMachine { - type Input; - type Output; - fn update(&self, input: Self::Input) -> Self::Output; -} - -/// Generates the state's inner struct and wrapper type. +/// Generates the state's inner struct. /// /// # Generated items /// - `{$state}Inner` struct with the specified fields (the data) -/// - `$state` wrapper struct (contains LeftRight storage) -/// - `From<LeftRight<...>>` impl for `$state` -/// - `From<{$state}Inner>` impl for `$state` /// -/// The command enum, parsing, dispatch, and Absorb impl are generated -/// by `collect_handlers!` separately, keeping state definition decoupled -/// from the set of operations. +/// The wrapper struct, `From` impls, command enum, parsing, dispatch, and +/// Absorb impl are all generated by `collect_handlers!`, which knows the +/// output type. #[macro_export] macro_rules! define_state { ( @@ -158,47 +43,99 @@ macro_rules! 
define_state { Self::default() } } - - #[derive(Debug)] - pub struct $state { - inner: $crate::stm::LeftRight<[<$state Inner>], [<$state Command>]>, - } - - impl From<$crate::stm::LeftRight<[<$state Inner>], [<$state Command>]>> for $state { - fn from(inner: $crate::stm::LeftRight<[<$state Inner>], [<$state Command>]>) -> Self { - Self { inner } - } - } - - impl From<[<$state Inner>]> for $state { - fn from(inner: [<$state Inner>]) -> Self { - let left_right: $crate::stm::LeftRight<[<$state Inner>], [<$state Command>]> = inner.into(); - left_right.into() - } - } } }; } -/// Generates the command enum, parsing, dispatch, State, and Absorb for a state type. +/// Generates the command enum, wrapper struct, parsing, dispatch, State, and +/// envelope-aware Absorb for a state type. +/// +/// # Syntax +/// ```ignore +/// collect_handlers! { Streams { Op1, Op2 } } // Output = () +/// collect_handlers! { Streams -> OutputType { Op1 } } // Output = OutputType +/// ``` /// /// # Generated items +/// - `$state` wrapper struct (contains LeftRight storage) +/// - `From<LeftRight<...>>` impl for `$state` +/// - `From<{$state}Inner>` impl for `$state` /// - `{$state}Command` enum with one variant per operation /// - `Command` impl for `{$state}Inner` (parses `Message<PrepareHeader>`) /// - `{$state}Inner::dispatch()` method (routes each variant to `StateHandler::apply()`) /// - `State` impl for `$state` wrapper -/// - `Absorb<{$state}Command>` impl for `{$state}Inner` +/// - `Absorb<CommandEnvelope<{$state}Command, Output>>` impl for `{$state}Inner` /// /// # Requirements -/// Each listed operation type must implement `StateHandler<{$state}Inner>`. +/// Each listed operation type must implement `StateHandler<State = {$state}Inner, Output = $output>`. #[macro_export] macro_rules! collect_handlers { + // Default output: () ( $state:ident { $($operation:ident),* $(,)? 
} + ) => { + $crate::__collect_handlers_impl!($state -> () { $($operation),* }); + }; + // Explicit output type + ( + $state:ident -> $output:ty { + $($operation:ident),* $(,)? + } + ) => { + $crate::__collect_handlers_impl!($state -> $output { $($operation),* }); + }; +} + +#[doc(hidden)] +#[macro_export] +macro_rules! __collect_handlers_impl { + ( + $state:ident -> $output:ty { + $($operation:ident),* + } ) => { paste::paste! { + // --- wrapper struct + From impls (moved from define_state!) --- + + #[derive(Debug)] + pub struct $state { + inner: $crate::stm::LeftRight< + [<$state Inner>], + [<$state Command>], + $output, + >, + } + + impl From<$crate::stm::LeftRight<[<$state Inner>], [<$state Command>], $output>> for $state { + fn from(inner: $crate::stm::LeftRight<[<$state Inner>], [<$state Command>], $output>) -> Self { + Self { inner } + } + } + + impl $state { + pub fn with_state<F, Ret>(&self, f: F) -> Ret + where + F: FnOnce(&[<$state Inner>]) -> Ret, + { + self.inner.with_state(f) + } + } + + impl From<[<$state Inner>]> for $state { + fn from(inner: [<$state Inner>]) -> Self { + let left_right: $crate::stm::LeftRight< + [<$state Inner>], + [<$state Command>], + $output, + > = inner.into(); + left_right.into() + } + } + + // --- command enum --- + #[derive(Debug, Clone)] pub enum [<$state Command>] { $( @@ -206,9 +143,12 @@ macro_rules! collect_handlers { )* } + // --- Command trait --- + impl $crate::stm::Command for [<$state Inner>] { type Cmd = [<$state Command>]; type Input = ::iggy_common::message::Message<::iggy_common::header::PrepareHeader>; + type Output = $output; fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input> { use ::iggy_common::BytesSerializable; @@ -218,6 +158,7 @@ macro_rules! 
collect_handlers { $( Operation::$operation => { let body = input.body_bytes(); + // FIXME: return a parse error instead of panicking on malformed input Ok([<$state Command>]::$operation( $operation::from_bytes(body).unwrap() )) @@ -228,36 +169,65 @@ macro_rules! collect_handlers { } } + // --- dispatch --- + impl [<$state Inner>] { - fn dispatch(&mut self, cmd: &[<$state Command>]) { + fn dispatch(&mut self, cmd: &[<$state Command>], now: ::iggy_common::IggyTimestamp) -> $output { match cmd { $( [<$state Command>]::$operation(payload) => { - $crate::stm::StateHandler::apply(payload, self); + $crate::stm::StateHandler::apply(payload, self, now) }, )* } } } + // --- State trait --- + impl $crate::stm::State for $state { type Input = <[<$state Inner>] as $crate::stm::Command>::Input; - type Output = (); + type Output = $output; fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input> { let cmd = <[<$state Inner>] as $crate::stm::Command>::parse(input)?; - self.inner.do_apply(cmd); - Ok(()) + Ok(self.inner.do_apply(cmd)) } } - impl left_right::Absorb<[<$state Command>]> for [<$state Inner>] { - fn absorb_first(&mut self, cmd: &mut [<$state Command>], _other: &Self) { - self.dispatch(cmd); + // --- Absorb<CommandEnvelope<...>> --- + + impl left_right::Absorb<$crate::stm::CommandEnvelope<[<$state Command>], $output>> + for [<$state Inner>] + { + fn absorb_first( + &mut self, + envelope: &mut $crate::stm::CommandEnvelope<[<$state Command>], $output>, + _other: &Self, + ) { + let result = self.dispatch(&envelope.cmd, envelope.now); + if !envelope.output.is_null() { + // Safety: pointer targets WriteCell::apply()'s stack local, + // still alive during synchronous publish(). + unsafe { *envelope.output = Some(result) }; + // Nullify so the stale copy in absorb_second won't write + // through a dangling pointer. 
+ envelope.output = std::ptr::null_mut(); + } } - fn absorb_second(&mut self, cmd: [<$state Command>], _other: &Self) { - self.dispatch(&cmd); + fn absorb_second( + &mut self, + mut envelope: $crate::stm::CommandEnvelope<[<$state Command>], $output>, + _other: &Self, + ) { + let result = self.dispatch(&envelope.cmd, envelope.now); + if !envelope.output.is_null() { + // Safety: on the `extend` (first-publish) path, this is + // absorb_second and the pointer is still live. + unsafe { *envelope.output = Some(result) }; + envelope.output = std::ptr::null_mut(); + } } fn sync_with(&mut self, first: &Self) { diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index 08bef35dd..8274cb40a 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -16,254 +16,12 @@ // under the License. pub mod consumer_group; +mod left_right; +mod macros; pub mod mux; pub mod stream; +mod traits; pub mod user; -use left_right::*; -use std::cell::UnsafeCell; -use std::sync::Arc; - -pub struct WriteCell<T, O> -where - T: Absorb<O>, -{ - inner: UnsafeCell<WriteHandle<T, O>>, -} - -impl<T, O> std::fmt::Debug for WriteCell<T, O> -where - T: Absorb<O>, -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("WriteCell").finish_non_exhaustive() - } -} - -impl<T, O> WriteCell<T, O> -where - T: Absorb<O>, -{ - pub fn new(write: WriteHandle<T, O>) -> Self { - Self { - inner: UnsafeCell::new(write), - } - } - - pub fn apply(&self, cmd: O) { - let hdl = unsafe { - self.inner - .get() - .as_mut() - .expect("[apply]: called on uninit writer, for cmd: {cmd}") - }; - hdl.append(cmd).publish(); - } -} - -/// Parses type-erased input into a command. Macro-generated. -/// Returns `Ok(cmd)` if applicable, `Err(input)` to pass ownership back. -pub trait Command { - type Cmd; - type Input; - - fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>; -} - -/// Per-command handler for a given state type. 
-/// Each command struct implements this for the state it mutates. -pub trait StateHandler { - type State; - fn apply(&self, state: &mut Self::State); -} - -#[derive(Debug)] -pub struct LeftRight<T, C> -where - T: Absorb<C>, -{ - write: Option<WriteCell<T, C>>, - #[allow(unused)] - read: Arc<ReadHandle<T>>, -} - -impl<T> From<T> for LeftRight<T, <T as Command>::Cmd> -where - T: Absorb<<T as Command>::Cmd> + Clone + Command, -{ - fn from(inner: T) -> Self { - let (write, read) = { - let (w, r) = left_right::new_from_empty(inner); - (WriteCell::new(w).into(), r.into()) - }; - Self { write, read } - } -} - -impl<T> LeftRight<T, <T as Command>::Cmd> -where - T: Absorb<<T as Command>::Cmd> + Clone + Command, -{ - pub fn do_apply(&self, cmd: <T as Command>::Cmd) { - self.write - .as_ref() - .expect("no write handle - not the owner shard") - .apply(cmd); - } -} - -/// Public interface for state machines. -/// Returns `Ok(output)` if applicable, `Err(input)` to pass ownership back. -pub trait State { - type Output; - type Input; - - fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input>; -} - -pub trait StateMachine { - type Input; - type Output; - fn update(&self, input: Self::Input) -> Self::Output; -} - -/// Generates the state's inner struct and wrapper type. -/// -/// # Generated items -/// - `{$state}Inner` struct with the specified fields (the data) -/// - `$state` wrapper struct (contains LeftRight storage) -/// - `From<LeftRight<...>>` impl for `$state` -/// - `From<{$state}Inner>` impl for `$state` -/// -/// The command enum, parsing, dispatch, and Absorb impl are generated -/// by `collect_handlers!` separately, keeping state definition decoupled -/// from the set of operations. -#[macro_export] -macro_rules! define_state { - ( - $state:ident { - $($field_name:ident : $field_type:ty),* $(,)? - } - ) => { - paste::paste! 
{ - #[derive(Debug, Clone, Default)] - pub struct [<$state Inner>] { - $( - pub $field_name: $field_type, - )* - } - - impl [<$state Inner>] { - pub fn new() -> Self { - Self::default() - } - } - - #[derive(Debug)] - pub struct $state { - inner: $crate::stm::LeftRight<[<$state Inner>], [<$state Command>]>, - } - - impl From<$crate::stm::LeftRight<[<$state Inner>], [<$state Command>]>> for $state { - fn from(inner: $crate::stm::LeftRight<[<$state Inner>], [<$state Command>]>) -> Self { - Self { inner } - } - } - - impl From<[<$state Inner>]> for $state { - fn from(inner: [<$state Inner>]) -> Self { - let left_right: $crate::stm::LeftRight<[<$state Inner>], [<$state Command>]> = inner.into(); - left_right.into() - } - } - } - }; -} - -/// Generates the command enum, parsing, dispatch, State, and Absorb for a state type. -/// -/// # Generated items -/// - `{$state}Command` enum with one variant per operation -/// - `Command` impl for `{$state}Inner` (parses `Message<PrepareHeader>`) -/// - `{$state}Inner::dispatch()` method (routes each variant to `StateHandler::apply()`) -/// - `State` impl for `$state` wrapper -/// - `Absorb<{$state}Command>` impl for `{$state}Inner` -/// -/// # Requirements -/// Each listed operation type must implement `StateHandler<{$state}Inner>`. -#[macro_export] -macro_rules! collect_handlers { - ( - $state:ident { - $($operation:ident),* $(,)? - } - ) => { - paste::paste! 
{ - #[derive(Debug, Clone)] - pub enum [<$state Command>] { - $( - $operation($operation), - )* - } - - impl $crate::stm::Command for [<$state Inner>] { - type Cmd = [<$state Command>]; - type Input = ::iggy_common::message::Message<::iggy_common::header::PrepareHeader>; - - fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input> { - use ::iggy_common::BytesSerializable; - use ::iggy_common::header::Operation; - - match input.header().operation { - $( - Operation::$operation => { - let body = input.body_bytes(); - Ok([<$state Command>]::$operation( - $operation::from_bytes(body).unwrap() - )) - }, - )* - _ => Err(input), - } - } - } - - impl [<$state Inner>] { - fn dispatch(&mut self, cmd: &[<$state Command>]) { - match cmd { - $( - [<$state Command>]::$operation(payload) => { - $crate::stm::StateHandler::apply(payload, self); - }, - )* - } - } - } - - impl $crate::stm::State for $state { - type Input = <[<$state Inner>] as $crate::stm::Command>::Input; - type Output = (); - - fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input> { - let cmd = <[<$state Inner>] as $crate::stm::Command>::parse(input)?; - self.inner.do_apply(cmd); - Ok(()) - } - } - - impl left_right::Absorb<[<$state Command>]> for [<$state Inner>] { - fn absorb_first(&mut self, cmd: &mut [<$state Command>], _other: &Self) { - self.dispatch(cmd); - } - - fn absorb_second(&mut self, cmd: [<$state Command>], _other: &Self) { - self.dispatch(&cmd); - } - - fn sync_with(&mut self, first: &Self) { - *self = first.clone(); - } - } - } - }; -} +pub use left_right::*; +pub use traits::*; diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs index f68d99bf7..45097b105 100644 --- a/core/metadata/src/stm/mux.rs +++ b/core/metadata/src/stm/mux.rs @@ -61,16 +61,14 @@ macro_rules! variadic { ($a:expr, $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) ); } -// TODO: Figure out how to get around the fact that we need to hardcode the Input/Output type for base case. 
-// TODO: I think we could move the base case to the impl site of `State`, so this way we know the `Input` and `Output` types. -// Base case of the recursive resolution. +// Base case: panics if no state machine in the chain accepted the input. impl StateMachine for () { type Input = Message<PrepareHeader>; - // TODO: Make sure that the `Output` matches to the output type of the rest of list. - // TODO: Add a trait bound to the output that will allow us to get the response in bytes. type Output = (); - fn update(&self, _input: Self::Input) -> Self::Output {} + fn update(&self, _input: Self::Input) -> Self::Output { + panic!("unhandled command: no state machine accepted the input"); + } } // Recursive case: process head and recurse on tail @@ -91,22 +89,96 @@ where } } +#[cfg(test)] mod tests { + use crate::stm::State; + use crate::stm::StateMachine; + use crate::stm::mux::MuxStateMachine; + use crate::stm::stream::{Streams, StreamsInner}; + use crate::stm::user::{Users, UsersInner}; + use bytes::{Bytes, BytesMut}; + use iggy_common::BytesSerializable; + use iggy_common::header::{Command2, Operation, PrepareHeader}; + use iggy_common::message::Message; + use std::mem; + + fn build_prepare_message(operation: Operation, body: Bytes) -> Message<PrepareHeader> { + let header_size = mem::size_of::<PrepareHeader>(); + let total_size = header_size + body.len(); + let mut buffer = BytesMut::zeroed(total_size); + + let header = bytemuck::from_bytes_mut::<PrepareHeader>(&mut buffer[..header_size]); + header.command = Command2::Prepare; + header.operation = operation; + header.size = total_size as u32; + + buffer[header_size..].copy_from_slice(&body); + Message::<PrepareHeader>::from_bytes(buffer.freeze()).unwrap() + } + + #[test] + fn create_stream_applies_and_is_readable() { + let streams: Streams = StreamsInner::new().into(); + + let cmd = iggy_common::create_stream::CreateStream { + name: "test".to_string(), + }; + let msg = build_prepare_message(Operation::CreateStream, 
cmd.to_bytes()); + + let result = streams.apply(msg); + assert!(result.is_ok()); + + streams.with_state(|inner| { + assert_eq!(inner.items.len(), 1); + assert_eq!(inner.index.len(), 1); + + let (id, stream) = inner.items.iter().next().unwrap(); + assert_eq!(stream.name.as_ref(), "test"); + assert_eq!(stream.id, id); + assert_eq!(inner.index.get("test"), Some(&id)); + }); + } #[test] - fn construct_mux_state_machine_from_states_with_same_output() { - use crate::stm::StateMachine; - use crate::stm::mux::MuxStateMachine; - use crate::stm::stream::{Streams, StreamsInner}; - use crate::stm::user::{Users, UsersInner}; - use iggy_common::header::PrepareHeader; - use iggy_common::message::Message; + fn create_user_applies_and_is_readable() { + let users: Users = UsersInner::new().into(); + let cmd = iggy_common::create_user::CreateUser { + username: "admin".to_string(), + password: "secret123".to_string(), + status: iggy_common::UserStatus::Active, + permissions: None, + }; + let msg = build_prepare_message(Operation::CreateUser, cmd.to_bytes()); + + let result = users.apply(msg); + assert!(result.is_ok()); + + users.with_state(|inner| { + assert_eq!(inner.items.len(), 1); + assert_eq!(inner.index.len(), 1); + + let (id, user) = inner.items.iter().next().unwrap(); + assert_eq!(user.username.as_ref(), "admin"); + assert_eq!(user.id, id as iggy_common::UserId); + assert_eq!(user.status, iggy_common::UserStatus::Active); + assert_eq!(inner.index.get("admin"), Some(&(id as iggy_common::UserId))); + assert!( + inner + .personal_access_tokens + .contains_key(&(id as iggy_common::UserId)) + ); + }); + } + + #[test] + #[should_panic(expected = "unhandled command: no state machine accepted the input")] + fn unhandled_operation_falls_through() { let users: Users = UsersInner::new().into(); let streams: Streams = StreamsInner::new().into(); let mux = MuxStateMachine::new(variadic!(users, streams)); - let input = Message::new(std::mem::size_of::<PrepareHeader>()); + let input = 
Message::new(mem::size_of::<PrepareHeader>()); mux.update(input); } diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index 4f212cd20..f69f82199 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -232,7 +232,8 @@ impl StreamsInner { impl StateHandler for CreateStream { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + type Output = (); + fn apply(&self, state: &mut StreamsInner, now: IggyTimestamp) -> Self::Output { let name_arc: Arc<str> = Arc::from(self.name.as_str()); if state.index.contains_key(&name_arc) { return; @@ -241,7 +242,7 @@ impl StateHandler for CreateStream { let stream = Stream { id: 0, name: name_arc.clone(), - created_at: iggy_common::IggyTimestamp::now(), + created_at: now, stats: Arc::new(StreamStats::default()), topics: Slab::new(), topic_index: AHashMap::default(), @@ -257,7 +258,8 @@ impl StateHandler for CreateStream { impl StateHandler for UpdateStream { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + type Output = (); + fn apply(&self, state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { return; }; @@ -280,7 +282,8 @@ impl StateHandler for UpdateStream { impl StateHandler for DeleteStream { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + type Output = (); + fn apply(&self, state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { return; }; @@ -296,7 +299,8 @@ impl StateHandler for DeleteStream { impl StateHandler for PurgeStream { type State = StreamsInner; - fn apply(&self, _state: &mut StreamsInner) { + type Output = (); + fn apply(&self, _state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output { // TODO todo!(); } @@ -304,7 +308,8 @@ impl StateHandler for PurgeStream { impl StateHandler for CreateTopic { type State = StreamsInner; 
- fn apply(&self, state: &mut StreamsInner) { + type Output = (); + fn apply(&self, state: &mut StreamsInner, now: IggyTimestamp) -> Self::Output { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { return; }; @@ -320,7 +325,7 @@ impl StateHandler for CreateTopic { let topic = Topic { id: 0, name: name_arc.clone(), - created_at: iggy_common::IggyTimestamp::now(), + created_at: now, replication_factor: self.replication_factor.unwrap_or(1), message_expiry: self.message_expiry, compression_algorithm: self.compression_algorithm, @@ -337,7 +342,7 @@ impl StateHandler for CreateTopic { for partition_id in 0..self.partitions_count as usize { let partition = Partition { id: partition_id, - created_at: iggy_common::IggyTimestamp::now(), + created_at: now, }; topic.partitions.push(partition); } @@ -349,7 +354,8 @@ impl StateHandler for CreateTopic { impl StateHandler for UpdateTopic { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + type Output = (); + fn apply(&self, state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { return; }; @@ -385,7 +391,8 @@ impl StateHandler for UpdateTopic { impl StateHandler for DeleteTopic { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + type Output = (); + fn apply(&self, state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { return; }; @@ -406,7 +413,8 @@ impl StateHandler for DeleteTopic { impl StateHandler for PurgeTopic { type State = StreamsInner; - fn apply(&self, _state: &mut StreamsInner) { + type Output = (); + fn apply(&self, _state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output { // TODO todo!(); } @@ -414,7 +422,8 @@ impl StateHandler for PurgeTopic { impl StateHandler for CreatePartitions { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + type Output = (); + fn 
apply(&self, state: &mut StreamsInner, now: IggyTimestamp) -> Self::Output { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { return; }; @@ -434,7 +443,7 @@ impl StateHandler for CreatePartitions { let partition_id = current_partition_count + i; let partition = Partition { id: partition_id, - created_at: iggy_common::IggyTimestamp::now(), + created_at: now, }; topic.partitions.push(partition); } @@ -443,7 +452,8 @@ impl StateHandler for CreatePartitions { impl StateHandler for DeletePartitions { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + type Output = (); + fn apply(&self, state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { return; }; diff --git a/core/metadata/src/stm/traits.rs b/core/metadata/src/stm/traits.rs new file mode 100644 index 000000000..355420506 --- /dev/null +++ b/core/metadata/src/stm/traits.rs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iggy_common::IggyTimestamp; + +/// Per-command handler for a given state type. +/// Each command struct implements this for the state it mutates. 
+/// +/// `now` is captured once before the command enters `left_right`, ensuring +/// both copies see the same timestamp (deterministic replay). +pub trait StateHandler { + type State; + type Output; + fn apply(&self, state: &mut Self::State, now: IggyTimestamp) -> Self::Output; +} + +/// Parses type-erased input into a command. Macro-generated. +/// Returns `Ok(cmd)` if applicable, `Err(input)` to pass ownership back. +pub trait Command { + type Cmd; + type Input; + type Output; + + fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>; +} + +/// Public interface for state machines. +/// Returns `Ok(output)` if applicable, `Err(input)` to pass ownership back. +pub trait State { + type Output; + type Input; + + fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input>; +} + +pub trait StateMachine { + type Input; + type Output; + fn update(&self, input: Self::Input) -> Self::Output; +} diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs index 88bc178ea..95cb195fa 100644 --- a/core/metadata/src/stm/user.rs +++ b/core/metadata/src/stm/user.rs @@ -119,7 +119,8 @@ impl UsersInner { impl StateHandler for CreateUser { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + type Output = (); + fn apply(&self, state: &mut UsersInner, now: IggyTimestamp) -> Self::Output { let username_arc: Arc<str> = Arc::from(self.username.as_str()); if state.index.contains_key(&username_arc) { return; @@ -130,7 +131,7 @@ impl StateHandler for CreateUser { username: username_arc.clone(), password_hash: Arc::from(self.password.as_str()), status: self.status, - created_at: iggy_common::IggyTimestamp::now(), + created_at: now, permissions: self.permissions.as_ref().map(|p| Arc::new(p.clone())), }; @@ -148,7 +149,8 @@ impl StateHandler for CreateUser { impl StateHandler for UpdateUser { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + type Output = (); + fn apply(&self, state: &mut UsersInner, _now: 
IggyTimestamp) -> Self::Output { let Some(user_id) = state.resolve_user_id(&self.user_id) else { return; }; @@ -178,7 +180,8 @@ impl StateHandler for UpdateUser { impl StateHandler for DeleteUser { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + type Output = (); + fn apply(&self, state: &mut UsersInner, _now: IggyTimestamp) -> Self::Output { let Some(user_id) = state.resolve_user_id(&self.user_id) else { return; }; @@ -194,7 +197,8 @@ impl StateHandler for DeleteUser { impl StateHandler for ChangePassword { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + type Output = (); + fn apply(&self, state: &mut UsersInner, _now: IggyTimestamp) -> Self::Output { let Some(user_id) = state.resolve_user_id(&self.user_id) else { return; }; @@ -207,7 +211,8 @@ impl StateHandler for ChangePassword { impl StateHandler for UpdatePermissions { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + type Output = (); + fn apply(&self, state: &mut UsersInner, _now: IggyTimestamp) -> Self::Output { let Some(user_id) = state.resolve_user_id(&self.user_id) else { return; }; @@ -220,7 +225,8 @@ impl StateHandler for UpdatePermissions { impl StateHandler for CreatePersonalAccessToken { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + type Output = (); + fn apply(&self, state: &mut UsersInner, now: IggyTimestamp) -> Self::Output { // TODO: Stub until protocol gets adjusted. 
let user_id = 0; let user_tokens = state.personal_access_tokens.entry(user_id).or_default(); @@ -229,26 +235,22 @@ impl StateHandler for CreatePersonalAccessToken { return; } - let expiry_at = PersonalAccessToken::calculate_expiry_at(IggyTimestamp::now(), self.expiry); + let expiry_at = PersonalAccessToken::calculate_expiry_at(now, self.expiry); if let Some(expiry_at) = expiry_at - && expiry_at.as_micros() <= IggyTimestamp::now().as_micros() + && expiry_at.as_micros() <= now.as_micros() { return; } - let (pat, _) = PersonalAccessToken::new( - user_id, - self.name.as_ref(), - IggyTimestamp::now(), - self.expiry, - ); + let (pat, _) = PersonalAccessToken::new(user_id, self.name.as_ref(), now, self.expiry); user_tokens.insert(name_arc, pat); } } impl StateHandler for DeletePersonalAccessToken { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + type Output = (); + fn apply(&self, state: &mut UsersInner, _now: IggyTimestamp) -> Self::Output { // TODO: Stub until protocol gets adjusted. let user_id = 0;
