This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch experiment in repository https://gitbox.apache.org/repos/asf/iggy.git
commit c4921a300e8b04a777b4668a4108ed12562a57a2 Author: numinex <[email protected]> AuthorDate: Thu Jan 15 11:03:25 2026 +0100 continue --- core/metadata/src/stm/mod.rs | 66 +++++++++++++---------------------------- core/metadata/src/stm/stream.rs | 9 ++---- core/metadata/src/stm/user.rs | 9 ++---- 3 files changed, 27 insertions(+), 57 deletions(-) diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index 5baeeb421..aba963ddf 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -37,20 +37,15 @@ impl<T: left_right::Absorb<O>, O> WriteCell<T, O> { } } - pub fn with<F, R>(&self, f: F) -> R - where - F: FnOnce(&mut left_right::WriteHandle<T, O>) -> R, - { + pub fn apply(&self, cmd: O) { // SAFETY: WriteCell is !Sync (via UnsafeCell), so only one thread can access. // The caller ensures exclusive access through the &self borrow. - unsafe { f(&mut *self.inner.get()) } + unsafe { + (*self.inner.get()).append(cmd).publish(); + } } } -// ============================================================================ -// Traits - All use &self, implementations use interior mutability -// ============================================================================ - /// Parses input into a command. pub trait Command { type Cmd; @@ -59,27 +54,20 @@ pub trait Command { fn into_command(input: &Self::Input) -> Option<Self::Cmd>; } -/// Dispatches a command to mutate state. -/// Takes `&mut self` - implementations use interior mutability. -pub trait Dispatch { - type Cmd; - type Output; - - fn dispatch(&self, cmd: &Self::Cmd) -> Self::Output; +/// Handles a command to mutate state. +pub trait Handle: Command { + fn handle(&mut self, cmd: &<Self as Command>::Cmd); } /// Applies a command through a state wrapper. -/// Takes `&self` - implementations use interior mutability. pub trait ApplyState { - type Cmd; + type Inner: Command; type Output; - type Inner: Command<Cmd = Self::Cmd>; - fn do_apply(&self, cmd: Self::Cmd) -> Option<Self::Output>; + fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output; } /// Public interface for state machines. -/// Takes `&self` - implementations use interior mutability. pub trait State { type Output; type Input; @@ -87,16 +75,12 @@ pub trait State { fn apply(&self, input: &Self::Input) -> Option<Self::Output>; } -impl<T> State for T -where - T: ApplyState, -{ +impl<T: ApplyState> State for T { type Output = T::Output; type Input = <T::Inner as Command>::Input; fn apply(&self, input: &Self::Input) -> Option<Self::Output> { - let cmd = T::Inner::into_command(input)?; - self.do_apply(cmd) + T::Inner::into_command(input).map(|cmd| self.do_apply(cmd)) } } @@ -168,15 +152,12 @@ macro_rules! define_state { impl From<$inner> for $state { fn from(inner: $inner) -> Self { - let (write, read) = ::left_right::new_from_empty(inner); - let write = Some($crate::stm::WriteCell::new(write)); - let read = ::std::sync::Arc::new(read); + let (write, read) = { let (w, r) = ::left_right::new_from_empty(inner); (Some($crate::stm::WriteCell::new(w)), ::std::sync::Arc::new(r)) }; Self { write, read } } } impl From<::std::sync::Arc<::left_right::ReadHandle<$inner>>> for $state { - /// Create a read-only instance from a read handle. fn from(read: ::std::sync::Arc<::left_right::ReadHandle<$inner>>) -> Self { Self { write: None, read } } @@ -218,16 +199,16 @@ macro_rules! define_state { impl ::left_right::Absorb<$command> for $inner where - $inner: $crate::stm::Dispatch<Cmd = $command>, + $inner: $crate::stm::Handle, { fn absorb_first(&mut self, cmd: &mut $command, _other: &Self) { - use $crate::stm::Dispatch; - self.dispatch(cmd); + use $crate::stm::Handle; + self.handle(cmd); } fn absorb_second(&mut self, cmd: $command, _other: &Self) { - use $crate::stm::Dispatch; - self.dispatch(&cmd); + use $crate::stm::Handle; + self.handle(&cmd); } fn sync_with(&mut self, first: &Self) { @@ -241,18 +222,13 @@ macro_rules! define_state { impl $crate::stm::ApplyState for $state where - $inner: $crate::stm::Dispatch<Cmd = $command>, + $inner: $crate::stm::Handle, { - type Cmd = $command; - type Output = (); type Inner = $inner; + type Output = (); - fn do_apply(&self, cmd: Self::Cmd) -> Option<Self::Output> { - let write_cell = self.write.as_ref()?; - write_cell.with(|write| { - write.append(cmd).publish(); - }); - Some(()) + fn do_apply(&self, cmd: $command) -> Self::Output { + self.write.as_ref().expect("[do_apply]: no write handle").apply(cmd); } } }; diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index 844a30565..2ac10843b 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -17,7 +17,7 @@ use crate::define_state; use crate::stats::{StreamStats, TopicStats}; -use crate::stm::Dispatch; +use crate::stm::Handle; use ahash::AHashMap; use iggy_common::create_stream::CreateStream; use iggy_common::delete_stream::DeleteStream; @@ -259,11 +259,8 @@ define_state! { [CreateStream, UpdateStream, DeleteStream, PurgeStream] } -impl Dispatch for StreamsInner { - type Cmd = StreamsCommand; - type Output = (); - - fn dispatch(&self, cmd: &Self::Cmd) -> Self::Output { +impl Handle for StreamsInner { + fn handle(&mut self, cmd: &StreamsCommand) { match cmd { StreamsCommand::CreateStream(_payload) => { // Actual mutation logic will be implemented later diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs index 043ec0bd2..61cd0ddd6 100644 --- a/core/metadata/src/stm/user.rs +++ b/core/metadata/src/stm/user.rs @@ -17,7 +17,7 @@ use crate::define_state; use crate::permissioner::Permissioner; -use crate::stm::Dispatch; +use crate::stm::Handle; use ahash::AHashMap; use iggy_common::change_password::ChangePassword; use iggy_common::create_user::CreateUser; @@ -77,11 +77,8 @@ define_state! { [CreateUser, UpdateUser, DeleteUser, ChangePassword, UpdatePermissions] } -impl Dispatch for UsersInner { - type Cmd = UsersCommand; - type Output = (); - - fn dispatch(&self, cmd: &Self::Cmd) -> Self::Output { +impl Handle for UsersInner { + fn handle(&mut self, cmd: &UsersCommand) { match cmd { UsersCommand::CreateUser(_payload) => { // Actual mutation logic will be implemented later
