This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch experiment
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 56b91820615e3447f75586e0834267f7948a8aaa
Author: numinex <[email protected]>
AuthorDate: Thu Jan 15 22:05:31 2026 +0100

    abstract factory
---
 core/metadata/src/stm/mod.rs | 102 ++++++++++++++++++++++++-------------------
 1 file changed, 57 insertions(+), 45 deletions(-)

diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index aba963ddf..3786224c1 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -26,11 +26,17 @@ use std::cell::UnsafeCell;
 // WriteCell - Interior mutability wrapper for WriteHandle
 // ============================================================================
 
-pub struct WriteCell<T: left_right::Absorb<O>, O> {
+pub struct WriteCell<T, O>
+where
+    T: left_right::Absorb<O>,
+{
     inner: UnsafeCell<left_right::WriteHandle<T, O>>,
 }
 
-impl<T: left_right::Absorb<O>, O> WriteCell<T, O> {
+impl<T, O> WriteCell<T, O>
+where
+    T: left_right::Absorb<O>,
+{
     pub fn new(write: left_right::WriteHandle<T, O>) -> Self {
         Self {
             inner: UnsafeCell::new(write),
@@ -38,14 +44,24 @@ impl<T: left_right::Absorb<O>, O> WriteCell<T, O> {
     }
 
     pub fn apply(&self, cmd: O) {
-        // SAFETY: WriteCell is !Sync (via UnsafeCell), so only one thread can 
access.
-        // The caller ensures exclusive access through the &self borrow.
+        // SAFETY: This method is called from the `Inner` struct of the 
`State` wrapper, we cover it beind an `Option`
+        // where only one shard owns the `Some` type, thus all of the accesses 
are single-threaded.
         unsafe {
             (*self.inner.get()).append(cmd).publish();
         }
     }
 }
 
+impl<T, O> std::fmt::Debug for WriteCell<T, O>
+where
+    T: left_right::Absorb<O> + std::fmt::Debug,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // SAFETY: Only reading for debug purposes
+        unsafe { (*self.inner.get()).fmt(f) }
+    }
+}
+
 /// Parses input into a command.
 pub trait Command {
     type Cmd;
@@ -67,6 +83,35 @@ pub trait ApplyState {
     fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output;
 }
 
+// TODO: Move the factory shiet to different crate.
+pub trait AbstractFactory<T> {
+    type Inner;
+
+    fn factory<F>(&self) -> F
+    where
+        F: Factory<Constructable<Self::Inner> = T> + Default,
+    {
+        F::default()
+    }
+}
+
+pub struct StateFactory<S> {
+    _marker: std::marker::PhantomData<S>,
+}
+
+impl<S> AbstractFactory<S> for StateFactory<S>
+where
+    S: ApplyState,
+{
+    type Inner = S::Inner;
+}
+
+pub trait Factory {
+    type Constructable<T>;
+
+    fn finish<T>(&self, inner: impl FnOnce() -> T) -> Self::Constructable<T>;
+}
+
 /// Public interface for state machines.
 pub trait State {
     type Output;
@@ -84,7 +129,7 @@ impl<T: ApplyState> State for T {
     }
 }
 
-pub(crate) trait StateMachine {
+pub trait StateMachine {
     type Input;
     type Output;
     fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
@@ -100,56 +145,26 @@ macro_rules! define_state {
         $command:ident,
         [$($operation:ident),* $(,)?]
     ) => {
-        #[derive(Debug, Clone, Default)]
+        #[derive(Debug, Clone)]
         pub struct $inner {
             $(
                 pub $field_name: $field_type,
             )*
         }
 
-        #[derive(Debug)]
+        #[derive(Debug, Clone)]
         pub enum $command {
             $(
                 $operation($operation),
             )*
         }
 
-        impl Clone for $command
-        where
-            $($operation: Clone,)*
-        {
-            fn clone(&self) -> Self {
-                match self {
-                    $(
-                        $command::$operation(payload) => 
$command::$operation(payload.clone()),
-                    )*
-                }
-            }
-        }
-
         /// State wrapper with interior mutability for write access.
         pub struct $state {
             write: Option<$crate::stm::WriteCell<$inner, $command>>,
             read: ::std::sync::Arc<::left_right::ReadHandle<$inner>>,
         }
 
-        impl $state {
-            /// Get a clone of the read handle.
-            pub fn read_handle(&self) -> 
::std::sync::Arc<::left_right::ReadHandle<$inner>> {
-                self.read.clone()
-            }
-
-            /// Get read access to the inner state.
-            pub fn read(&self) -> Option<::left_right::ReadGuard<'_, $inner>> {
-                self.read.enter()
-            }
-
-            /// Check if this instance has write capability.
-            pub fn has_write(&self) -> bool {
-                self.write.is_some()
-            }
-        }
-
         impl From<$inner> for $state {
             fn from(inner: $inner) -> Self {
                 let (write, read) = { let (w, r) = 
::left_right::new_from_empty(inner); (Some($crate::stm::WriteCell::new(w)), 
::std::sync::Arc::new(r)) };
@@ -165,13 +180,10 @@ macro_rules! define_state {
 
         impl ::std::fmt::Debug for $state {
             fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> 
::std::fmt::Result {
-                match self.read.enter() {
-                    Some(guard) => f.debug_struct(stringify!($state))
-                        .field("has_write", &self.write.is_some())
-                        .field("inner", &*guard)
-                        .finish(),
-                    None => 
f.debug_struct(stringify!($state)).finish_non_exhaustive(),
-                }
+                f.debug_struct(stringify!($state))
+                    .field("write", &self.write)
+                    .field("inner", &self.read.enter())
+                    .finish()
             }
         }
 
@@ -228,7 +240,7 @@ macro_rules! define_state {
             type Output = ();
 
             fn do_apply(&self, cmd: $command) -> Self::Output {
-                self.write.as_ref().expect("[do_apply]: no write 
handle").apply(cmd);
+                self.write.as_ref().expect("[do_apply]: no write handle, not 
handled on shard0").apply(cmd);
             }
         }
     };

Reply via email to