This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch experiment
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit f944a2e1e69880f615eb960afe704ff8369d44a5
Author: numinex <[email protected]>
AuthorDate: Fri Jan 16 15:58:47 2026 +0100

    refactor
---
 core/metadata/src/stm/mod.rs | 141 ++++++++++++++++++++++++++-----------------
 1 file changed, 84 insertions(+), 57 deletions(-)

diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index a2d5a3c79..9f12cc071 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -20,56 +20,42 @@ pub mod mux;
 pub mod stream;
 pub mod user;
 
-use std::{cell::UnsafeCell, default};
-
-use iggy_common::Stream;
-
-use crate::stm::{
-    stream::{Streams, StreamsInner},
-    user::{Users, UsersInner},
-};
-
-// ============================================================================
-// WriteCell - Interior mutability wrapper for WriteHandle
-// ============================================================================
+use left_right::*;
+use std::cell::UnsafeCell;
+use std::sync::Arc;
 
 pub struct WriteCell<T, O>
 where
-    T: left_right::Absorb<O>,
+    T: Absorb<O>,
 {
-    inner: UnsafeCell<left_right::WriteHandle<T, O>>,
+    inner: UnsafeCell<WriteHandle<T, O>>,
 }
 
 impl<T, O> WriteCell<T, O>
 where
-    T: left_right::Absorb<O>,
+    T: Absorb<O>,
 {
-    pub fn new(write: left_right::WriteHandle<T, O>) -> Self {
+    pub fn new(write: WriteHandle<T, O>) -> Self {
         Self {
             inner: UnsafeCell::new(write),
         }
     }
 
     pub fn apply(&self, cmd: O) {
-        // SAFETY: This method is called from the `Inner` struct of the `State` wrapper, we cover it beind an `Option`
-        // where only one shard owns the `Some` type, thus all of the accesses are single-threaded.
-        unsafe {
-            (*self.inner.get()).append(cmd).publish();
-        }
+        let hdl = unsafe {
+            self.inner
+                .get()
+                .as_mut()
+                .expect("[apply]: called on uninit writer, for cmd: {cmd}")
+        };
+        hdl.append(cmd).publish();
     }
 }
 
-impl<T, O> std::fmt::Debug for WriteCell<T, O>
-where
-    T: left_right::Absorb<O> + std::fmt::Debug,
-{
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        // SAFETY: Only reading for debug purposes
-        unsafe { (*self.inner.get()).fmt(f) }
-    }
+pub trait Container {
+    type Inner;
 }
 
-/// Parses input into a command.
 pub trait Command {
     type Cmd;
     type Input;
@@ -77,36 +63,86 @@ pub trait Command {
     fn into_command(input: &Self::Input) -> Option<Self::Cmd>;
 }
 
-/// Handles a command to mutate state.
 pub trait Handle: Command {
-    fn handle(&mut self, cmd: &<Self as Command>::Cmd);
+    fn handle(&mut self, cmd: &Self::Cmd);
 }
 
-/// Applies a command through a state wrapper.
-pub trait ApplyState {
-    type Inner: Command;
+pub trait ApplyState: Container {
     type Output;
 
-    fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output;
+    fn apply_cmd(&self, cmd: <<Self as Container>::Inner as Command>::Cmd) -> Self::Output
+    where
+        <Self as Container>::Inner: Command;
+}
+
+pub trait Factory<T> {
+    type Constructable;
+
+    fn create(&self, inner: impl FnOnce() -> T) -> Self::Constructable;
 }
 
-// TODO: Move the factory shiet out of there.
-fn create_factory<S, F>() -> F
+pub struct LeftRight<T, C>
 where
-    S: ApplyState,
-    F: Factory<Constructable<S::Inner> = S> + Default,
+    T: Absorb<C>,
 {
-    F::default()
+    write: Option<WriteCell<T, C>>,
+    read: Arc<ReadHandle<T>>,
 }
 
-pub trait Factory {
-    type Constructable<T>;
+impl<T> From<T> for LeftRight<T, <T as Command>::Cmd>
+where
+    T: Absorb<<T as Command>::Cmd> + Clone + Command,
+{
+    fn from(inner: T) -> Self {
+        let (write, read) = {
+            let (w, r) = left_right::new_from_empty(inner);
+            (WriteCell::new(w).into(), r.into())
+        };
+        Self { write, read }
+    }
+}
 
-    fn finish<T>(&self, inner: impl FnOnce() -> T) -> Self::Constructable<T>
-    where
-        T: Into<Self::Constructable<T>>,
-    {
-        inner().into()
+impl<T, C> LeftRight<T, C>
+where
+    T: Absorb<C>,
+{
+    pub fn read(&self) -> Arc<ReadHandle<T>> {
+        self.read.clone()
+    }
+}
+
+impl<T, C> Container for LeftRight<T, C>
+where
+    T: Absorb<C>,
+{
+    type Inner = T;
+}
+
+impl<T, C> ApplyState for LeftRight<T, C>
+where
+    T: Command<Cmd = C> + Absorb<C>,
+{
+    type Output = ();
+
+    fn apply_cmd(&self, cmd: C) -> Self::Output {
+        self.write
+            .as_ref()
+            .expect("no write handle - not the owner shard")
+            .apply(cmd);
+    }
+}
+
+#[derive(Default)]
+pub struct LeftRightFactory;
+
+impl<T> Factory<T> for LeftRightFactory
+where
+    T: Absorb<<T as Command>::Cmd> + Clone + Command,
+{
+    type Constructable = LeftRight<T, <T as Command>::Cmd>;
+
+    fn create(&self, inner: impl FnOnce() -> T) -> Self::Constructable {
+        LeftRight::new(inner())
     }
 }
 
@@ -118,15 +154,6 @@ pub trait State {
     fn apply(&self, input: &Self::Input) -> Option<Self::Output>;
 }
 
-impl<T: ApplyState> State for T {
-    type Output = T::Output;
-    type Input = <T::Inner as Command>::Input;
-
-    fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
-        T::Inner::into_command(input).map(|cmd| self.do_apply(cmd))
-    }
-}
-
 pub trait StateMachine {
     type Input;
     type Output;

Reply via email to