This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch experiment in repository https://gitbox.apache.org/repos/asf/iggy.git
commit f944a2e1e69880f615eb960afe704ff8369d44a5 Author: numinex <[email protected]> AuthorDate: Fri Jan 16 15:58:47 2026 +0100 refactor --- core/metadata/src/stm/mod.rs | 141 ++++++++++++++++++++++++++----------------- 1 file changed, 84 insertions(+), 57 deletions(-) diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index a2d5a3c79..9f12cc071 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -20,56 +20,42 @@ pub mod mux; pub mod stream; pub mod user; -use std::{cell::UnsafeCell, default}; - -use iggy_common::Stream; - -use crate::stm::{ - stream::{Streams, StreamsInner}, - user::{Users, UsersInner}, -}; - -// ============================================================================ -// WriteCell - Interior mutability wrapper for WriteHandle -// ============================================================================ +use left_right::*; +use std::cell::UnsafeCell; +use std::sync::Arc; pub struct WriteCell<T, O> where - T: left_right::Absorb<O>, + T: Absorb<O>, { - inner: UnsafeCell<left_right::WriteHandle<T, O>>, + inner: UnsafeCell<WriteHandle<T, O>>, } impl<T, O> WriteCell<T, O> where - T: left_right::Absorb<O>, + T: Absorb<O>, { - pub fn new(write: left_right::WriteHandle<T, O>) -> Self { + pub fn new(write: WriteHandle<T, O>) -> Self { Self { inner: UnsafeCell::new(write), } } pub fn apply(&self, cmd: O) { - // SAFETY: This method is called from the `Inner` struct of the `State` wrapper, we cover it beind an `Option` - // where only one shard owns the `Some` type, thus all of the accesses are single-threaded. - unsafe { - (*self.inner.get()).append(cmd).publish(); - } + let hdl = unsafe { + self.inner + .get() + .as_mut() + .expect("[apply]: called on uninit writer, for cmd: {cmd}") + }; + hdl.append(cmd).publish(); } } -impl<T, O> std::fmt::Debug for WriteCell<T, O> -where - T: left_right::Absorb<O> + std::fmt::Debug, -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // SAFETY: Only reading for debug purposes - unsafe { (*self.inner.get()).fmt(f) } - } +pub trait Container { + type Inner; } -/// Parses input into a command. pub trait Command { type Cmd; type Input; @@ -77,36 +63,86 @@ pub trait Command { fn into_command(input: &Self::Input) -> Option<Self::Cmd>; } -/// Handles a command to mutate state. pub trait Handle: Command { - fn handle(&mut self, cmd: &<Self as Command>::Cmd); + fn handle(&mut self, cmd: &Self::Cmd); } -/// Applies a command through a state wrapper. -pub trait ApplyState { - type Inner: Command; +pub trait ApplyState: Container { type Output; - fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output; + fn apply_cmd(&self, cmd: <<Self as Container>::Inner as Command>::Cmd) -> Self::Output + where + <Self as Container>::Inner: Command; +} + +pub trait Factory<T> { + type Constructable; + + fn create(&self, inner: impl FnOnce() -> T) -> Self::Constructable; } -// TODO: Move the factory shiet out of there. -fn create_factory<S, F>() -> F +pub struct LeftRight<T, C> where - S: ApplyState, - F: Factory<Constructable<S::Inner> = S> + Default, + T: Absorb<C>, { - F::default() + write: Option<WriteCell<T, C>>, + read: Arc<ReadHandle<T>>, } -pub trait Factory { - type Constructable<T>; +impl<T> From<T> for LeftRight<T, <T as Command>::Cmd> +where + T: Absorb<<T as Command>::Cmd> + Clone + Command, +{ + fn from(inner: T) -> Self { + let (write, read) = { + let (w, r) = left_right::new_from_empty(inner); + (WriteCell::new(w).into(), r.into()) + }; + Self { write, read } + } +} - fn finish<T>(&self, inner: impl FnOnce() -> T) -> Self::Constructable<T> - where - T: Into<Self::Constructable<T>>, - { - inner().into() +impl<T, C> LeftRight<T, C> +where + T: Absorb<C>, +{ + pub fn read(&self) -> Arc<ReadHandle<T>> { + self.read.clone() + } +} + +impl<T, C> Container for LeftRight<T, C> +where + T: Absorb<C>, +{ + type Inner = T; +} + +impl<T, C> ApplyState for LeftRight<T, C> +where + T: Command<Cmd = C> + Absorb<C>, +{ + type Output = (); + + fn apply_cmd(&self, cmd: C) -> Self::Output { + self.write + .as_ref() + .expect("no write handle - not the owner shard") + .apply(cmd); + } +} + +#[derive(Default)] +pub struct LeftRightFactory; + +impl<T> Factory<T> for LeftRightFactory +where + T: Absorb<<T as Command>::Cmd> + Clone + Command, +{ + type Constructable = LeftRight<T, <T as Command>::Cmd>; + + fn create(&self, inner: impl FnOnce() -> T) -> Self::Constructable { + LeftRight::new(inner()) } } @@ -118,15 +154,6 @@ pub trait State { fn apply(&self, input: &Self::Input) -> Option<Self::Output>; } -impl<T: ApplyState> State for T { - type Output = T::Output; - type Input = <T::Inner as Command>::Input; - - fn apply(&self, input: &Self::Input) -> Option<Self::Output> { - T::Inner::into_command(input).map(|cmd| self.do_apply(cmd)) - } -} - pub trait StateMachine { type Input; type Output;
