This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch experiment in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 56b91820615e3447f75586e0834267f7948a8aaa Author: numinex <[email protected]> AuthorDate: Thu Jan 15 22:05:31 2026 +0100 abstract factory --- core/metadata/src/stm/mod.rs | 102 ++++++++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 45 deletions(-) diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index aba963ddf..3786224c1 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -26,11 +26,17 @@ use std::cell::UnsafeCell; // WriteCell - Interior mutability wrapper for WriteHandle // ============================================================================ -pub struct WriteCell<T: left_right::Absorb<O>, O> { +pub struct WriteCell<T, O> +where + T: left_right::Absorb<O>, +{ inner: UnsafeCell<left_right::WriteHandle<T, O>>, } -impl<T: left_right::Absorb<O>, O> WriteCell<T, O> { +impl<T, O> WriteCell<T, O> +where + T: left_right::Absorb<O>, +{ pub fn new(write: left_right::WriteHandle<T, O>) -> Self { Self { inner: UnsafeCell::new(write), @@ -38,14 +44,24 @@ impl<T: left_right::Absorb<O>, O> WriteCell<T, O> { } pub fn apply(&self, cmd: O) { - // SAFETY: WriteCell is !Sync (via UnsafeCell), so only one thread can access. - // The caller ensures exclusive access through the &self borrow. + // SAFETY: This method is called from the `Inner` struct of the `State` wrapper, we cover it beind an `Option` + // where only one shard owns the `Some` type, thus all of the accesses are single-threaded. unsafe { (*self.inner.get()).append(cmd).publish(); } } } +impl<T, O> std::fmt::Debug for WriteCell<T, O> +where + T: left_right::Absorb<O> + std::fmt::Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // SAFETY: Only reading for debug purposes + unsafe { (*self.inner.get()).fmt(f) } + } +} + /// Parses input into a command. pub trait Command { type Cmd; @@ -67,6 +83,35 @@ pub trait ApplyState { fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output; } +// TODO: Move the factory shiet to different crate. +pub trait AbstractFactory<T> { + type Inner; + + fn factory<F>(&self) -> F + where + F: Factory<Constructable<Self::Inner> = T> + Default, + { + F::default() + } +} + +pub struct StateFactory<S> { + _marker: std::marker::PhantomData<S>, +} + +impl<S> AbstractFactory<S> for StateFactory<S> +where + S: ApplyState, +{ + type Inner = S::Inner; +} + +pub trait Factory { + type Constructable<T>; + + fn finish<T>(&self, inner: impl FnOnce() -> T) -> Self::Constructable<T>; +} + /// Public interface for state machines. pub trait State { type Output; @@ -84,7 +129,7 @@ impl<T: ApplyState> State for T { } } -pub(crate) trait StateMachine { +pub trait StateMachine { type Input; type Output; fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>); @@ -100,56 +145,26 @@ macro_rules! define_state { $command:ident, [$($operation:ident),* $(,)?] ) => { - #[derive(Debug, Clone, Default)] + #[derive(Debug, Clone)] pub struct $inner { $( pub $field_name: $field_type, )* } - #[derive(Debug)] + #[derive(Debug, Clone)] pub enum $command { $( $operation($operation), )* } - impl Clone for $command - where - $($operation: Clone,)* - { - fn clone(&self) -> Self { - match self { - $( - $command::$operation(payload) => $command::$operation(payload.clone()), - )* - } - } - } - /// State wrapper with interior mutability for write access. pub struct $state { write: Option<$crate::stm::WriteCell<$inner, $command>>, read: ::std::sync::Arc<::left_right::ReadHandle<$inner>>, } - impl $state { - /// Get a clone of the read handle. - pub fn read_handle(&self) -> ::std::sync::Arc<::left_right::ReadHandle<$inner>> { - self.read.clone() - } - - /// Get read access to the inner state. - pub fn read(&self) -> Option<::left_right::ReadGuard<'_, $inner>> { - self.read.enter() - } - - /// Check if this instance has write capability. - pub fn has_write(&self) -> bool { - self.write.is_some() - } - } - impl From<$inner> for $state { fn from(inner: $inner) -> Self { let (write, read) = { let (w, r) = ::left_right::new_from_empty(inner); (Some($crate::stm::WriteCell::new(w)), ::std::sync::Arc::new(r)) }; @@ -165,13 +180,10 @@ macro_rules! define_state { impl ::std::fmt::Debug for $state { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - match self.read.enter() { - Some(guard) => f.debug_struct(stringify!($state)) - .field("has_write", &self.write.is_some()) - .field("inner", &*guard) - .finish(), - None => f.debug_struct(stringify!($state)).finish_non_exhaustive(), - } + f.debug_struct(stringify!($state)) + .field("write", &self.write) + .field("inner", &self.read.enter()) + .finish() } } @@ -228,7 +240,7 @@ macro_rules! define_state { type Output = (); fn do_apply(&self, cmd: $command) -> Self::Output { - self.write.as_ref().expect("[do_apply]: no write handle").apply(cmd); + self.write.as_ref().expect("[do_apply]: no write handle, not handled on shard0").apply(cmd); } } };
