This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch experiment in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 42bc498dc1f28fe802b97344375ef44bf1140ec5 Author: numinex <[email protected]> AuthorDate: Fri Jan 16 22:05:37 2026 +0100 refactor v2 --- core/metadata/src/stm/mod.rs | 191 ++++++++++++++++++++++------------------ core/metadata/src/stm/stream.rs | 20 +++-- core/metadata/src/stm/user.rs | 8 +- 3 files changed, 123 insertions(+), 96 deletions(-) diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index 9f12cc071..518582bbd 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -1,4 +1,3 @@ -// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file @@ -52,33 +51,25 @@ where } } -pub trait Container { - type Inner; -} - +/// Parses type-erased input into a command. Macro-generated. pub trait Command { type Cmd; type Input; - fn into_command(input: &Self::Input) -> Option<Self::Cmd>; + fn parse(input: &Self::Input) -> Option<Self::Cmd>; } -pub trait Handle: Command { +/// Handles commands. User-implemented business logic. +pub trait Handler: Command { fn handle(&mut self, cmd: &Self::Cmd); } -pub trait ApplyState: Container { +/// Storage abstraction: applies commands to inner state. +pub trait ApplyState { + type Inner: Handler; type Output; - fn apply_cmd(&self, cmd: <<Self as Container>::Inner as Command>::Cmd) -> Self::Output - where - <Self as Container>::Inner: Command; -} - -pub trait Factory<T> { - type Constructable; - - fn create(&self, inner: impl FnOnce() -> T) -> Self::Constructable; + fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output; } pub struct LeftRight<T, C> @@ -102,29 +93,14 @@ where } } -impl<T, C> LeftRight<T, C> +impl<T> ApplyState for LeftRight<T, <T as Command>::Cmd> where - T: Absorb<C>, -{ - pub fn read(&self) -> Arc<ReadHandle<T>> { - self.read.clone() - } -} - -impl<T, C> Container for LeftRight<T, C> -where - T: Absorb<C>, + T: Absorb<<T as Command>::Cmd> + Clone + Handler, { type Inner = T; -} - -impl<T, C> ApplyState for LeftRight<T, C> -where - T: Command<Cmd = C> + Absorb<C>, -{ type Output = (); - fn apply_cmd(&self, cmd: C) -> Self::Output { + fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output { self.write .as_ref() .expect("no write handle - not the owner shard") @@ -132,20 +108,6 @@ where } } -#[derive(Default)] -pub struct LeftRightFactory; - -impl<T> Factory<T> for LeftRightFactory -where - T: Absorb<<T as Command>::Cmd> + Clone + Command, -{ - type Constructable = LeftRight<T, <T as Command>::Cmd>; - - fn create(&self, inner: impl FnOnce() -> T) -> Self::Constructable { - LeftRight::new(inner()) - } -} - /// Public interface for state machines. pub trait State { type Output; @@ -160,6 +122,42 @@ pub trait StateMachine { fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>); } +/// Generates a state machine with pluggable storage. +/// +/// # Generated items +/// - `$inner` struct with the specified fields (the data) +/// - `$command` enum with variants for each operation +/// - `$state<S: ApplyState<Inner = $inner>>` wrapper struct (storage-agnostic) +/// - `Command` impl for `$inner` (parsing) +/// - `Absorb` impl for `$inner` (delegates to `Handler::handle`) +/// - `State` impl for `$state<S>` +/// - `From<S>` impl for `$state<S>` +/// +/// # User must implement +/// - `Handler` for `$inner` (business logic) +/// +/// # Example +/// ```ignore +/// define_state! { +/// Streams, +/// StreamsInner { +/// index: AHashMap<String, usize>, +/// items: Slab<Stream>, +/// }, +/// StreamsCommand, +/// [CreateStream, UpdateStream, DeleteStream] +/// } +/// +/// // User implements Handler manually: +/// impl Handler for StreamsInner { +/// fn handle(&mut self, cmd: &StreamsCommand) { +/// match cmd { +/// StreamsCommand::CreateStream(payload) => { /* ... */ } +/// // ... +/// } +/// } +/// } +/// ``` #[macro_export] macro_rules! define_state { ( @@ -170,7 +168,7 @@ macro_rules! define_state { $command:ident, [$($operation:ident),* $(,)?] ) => { - #[derive(Debug, Clone)] + #[derive(Debug, Clone, Default)] pub struct $inner { $( pub $field_name: $field_type, @@ -184,31 +182,26 @@ macro_rules! define_state { )* } - /// State wrapper with interior mutability for write access. - pub struct $state { - write: Option<$crate::stm::WriteCell<$inner, $command>>, - read: ::std::sync::Arc<::left_right::ReadHandle<$inner>>, + pub struct $state<S: $crate::stm::ApplyState<Inner = $inner>> { + inner: S, } - impl From<$inner> for $state { - fn from(inner: $inner) -> Self { - let (write, read) = { let (w, r) = ::left_right::new_from_empty(inner); (Some($crate::stm::WriteCell::new(w)), ::std::sync::Arc::new(r)) }; - Self { write, read } + impl<S: $crate::stm::ApplyState<Inner = $inner>> From<S> for $state<S> { + fn from(storage: S) -> Self { + Self { inner: storage } } } - impl From<::std::sync::Arc<::left_right::ReadHandle<$inner>>> for $state { - fn from(read: ::std::sync::Arc<::left_right::ReadHandle<$inner>>) -> Self { - Self { write: None, read } - } - } + impl<S> $crate::stm::State for $state<S> + where + S: $crate::stm::ApplyState<Inner = $inner>, + { + type Input = <$inner as $crate::stm::Command>::Input; + type Output = S::Output; - impl ::std::fmt::Debug for $state { - fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - f.debug_struct(stringify!($state)) - .field("write", &self.write) - .field("inner", &self.read.enter()) - .finish() + fn apply(&self, input: &Self::Input) -> Option<Self::Output> { + <$inner as $crate::stm::Command>::parse(input) + .map(|cmd| self.inner.do_apply(cmd)) } } @@ -216,7 +209,7 @@ macro_rules! define_state { type Cmd = $command; type Input = ::iggy_common::message::Message<::iggy_common::header::PrepareHeader>; - fn into_command(input: &Self::Input) -> Option<Self::Cmd> { + fn parse(input: &Self::Input) -> Option<Self::Cmd> { use ::iggy_common::BytesSerializable; use ::iggy_common::header::Operation; @@ -234,18 +227,17 @@ macro_rules! define_state { } } + /* impl ::left_right::Absorb<$command> for $inner where - $inner: $crate::stm::Handle, + $inner: $crate::stm::Handler, { fn absorb_first(&mut self, cmd: &mut $command, _other: &Self) { - use $crate::stm::Handle; - self.handle(cmd); + <Self as $crate::stm::Handler>::handle(self, cmd); } fn absorb_second(&mut self, cmd: $command, _other: &Self) { - use $crate::stm::Handle; - self.handle(&cmd); + <Self as $crate::stm::Handler>::handle(self, &cmd); } fn sync_with(&mut self, first: &Self) { @@ -253,20 +245,51 @@ macro_rules! define_state { } fn drop_first(self: Box<Self>) {} - fn drop_second(self: Box<Self>) {} } + */ + }; +} - impl $crate::stm::ApplyState for $state - where - $inner: $crate::stm::Handle, - { - type Inner = $inner; - type Output = (); +// This macro is really sad, but we can't do blanket impl from below, due to orphan rule. +// impl<T> Absorb<T::Cmd> for T +// where +// T: Handler + Clone, +// { +// fn absorb_first(&mut self, cmd: &mut T::Cmd, _other: &Self) { +// self.handle(cmd); + +// } - fn do_apply(&self, cmd: $command) -> Self::Output { - self.write.as_ref().expect("[do_apply]: no write handle, not handled on shard0").apply(cmd); +// fn absorb_second(&mut self, cmd: T::Cmd, _other: &Self) { +// self.handle(&cmd); +// } + +// fn sync_with(&mut self, first: &Self) { +// *self = first.clone(); +// } + +// fn drop_first(self: Box<Self>) {} +// fn drop_second(self: Box<Self>) {} +// } +#[macro_export] +macro_rules! impl_absorb { + ($inner:ident, $cmd:ident) => { + impl left_right::Absorb<$cmd> for $inner { + fn absorb_first(&mut self, cmd: &mut $cmd, _other: &Self) { + self.handle(cmd); + } + + fn absorb_second(&mut self, cmd: $cmd, _other: &Self) { + self.handle(&cmd); + } + + fn sync_with(&mut self, first: &Self) { + *self = first.clone(); } } }; } + +/* +*/ diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index 2ac10843b..f301b511a 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -15,15 +15,16 @@ // specific language governing permissions and limitations // under the License. -use crate::define_state; use crate::stats::{StreamStats, TopicStats}; -use crate::stm::Handle; +use crate::stm::{Handler, LeftRight}; +use crate::{define_state, impl_absorb}; use ahash::AHashMap; use iggy_common::create_stream::CreateStream; use iggy_common::delete_stream::DeleteStream; use iggy_common::purge_stream::PurgeStream; use iggy_common::update_stream::UpdateStream; use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize}; +use left_right::Absorb; use slab::Slab; use std::sync::Arc; @@ -245,9 +246,15 @@ impl Stream { } } -// ============================================================================ -// Streams State Machine -// ============================================================================ +fn foo() { + let streams_inner = StreamsInner { + index: AHashMap::new(), + items: Slab::new(), + }; + + let streams: LeftRight<StreamsInner, StreamsCommand> = streams_inner.into(); + let streams_2: Streams<LeftRight<StreamsInner, StreamsCommand>> = streams.into(); +} define_state! { Streams, @@ -258,8 +265,9 @@ define_state! { StreamsCommand, [CreateStream, UpdateStream, DeleteStream, PurgeStream] } +impl_absorb!(StreamsInner, StreamsCommand); -impl Handle for StreamsInner { +impl Handler for StreamsInner { fn handle(&mut self, cmd: &StreamsCommand) { match cmd { StreamsCommand::CreateStream(_payload) => { diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs index 61cd0ddd6..64285b22d 100644 --- a/core/metadata/src/stm/user.rs +++ b/core/metadata/src/stm/user.rs @@ -17,7 +17,7 @@ use crate::define_state; use crate::permissioner::Permissioner; -use crate::stm::Handle; +use crate::stm::{Handle, Handler}; use ahash::AHashMap; use iggy_common::change_password::ChangePassword; use iggy_common::create_user::CreateUser; @@ -62,10 +62,6 @@ impl User { } } -// ============================================================================ -// Users State Machine -// ============================================================================ - define_state! { Users, UsersInner { @@ -77,7 +73,7 @@ define_state! { [CreateUser, UpdateUser, DeleteUser, ChangePassword, UpdatePermissions] } -impl Handle for UsersInner { +impl Handler for UsersInner { fn handle(&mut self, cmd: &UsersCommand) { match cmd { UsersCommand::CreateUser(_payload) => {
