This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch experiment
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c4921a300e8b04a777b4668a4108ed12562a57a2
Author: numinex <[email protected]>
AuthorDate: Thu Jan 15 11:03:25 2026 +0100

    continue
---
 core/metadata/src/stm/mod.rs    | 66 +++++++++++++----------------------------
 core/metadata/src/stm/stream.rs |  9 ++----
 core/metadata/src/stm/user.rs   |  9 ++----
 3 files changed, 27 insertions(+), 57 deletions(-)

diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 5baeeb421..aba963ddf 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -37,20 +37,15 @@ impl<T: left_right::Absorb<O>, O> WriteCell<T, O> {
         }
     }
 
-    pub fn with<F, R>(&self, f: F) -> R
-    where
-        F: FnOnce(&mut left_right::WriteHandle<T, O>) -> R,
-    {
+    pub fn apply(&self, cmd: O) {
         // SAFETY: WriteCell is !Sync (via UnsafeCell), so only one thread can access.
         // The caller ensures exclusive access through the &self borrow.
-        unsafe { f(&mut *self.inner.get()) }
+        unsafe {
+            (*self.inner.get()).append(cmd).publish();
+        }
     }
 }
 
-// ============================================================================
-// Traits - All use &self, implementations use interior mutability
-// ============================================================================
-
 /// Parses input into a command.
 pub trait Command {
     type Cmd;
@@ -59,27 +54,20 @@ pub trait Command {
     fn into_command(input: &Self::Input) -> Option<Self::Cmd>;
 }
 
-/// Dispatches a command to mutate state.
-/// Takes `&mut self` - implementations use interior mutability.
-pub trait Dispatch {
-    type Cmd;
-    type Output;
-
-    fn dispatch(&self, cmd: &Self::Cmd) -> Self::Output;
+/// Handles a command to mutate state.
+pub trait Handle: Command {
+    fn handle(&mut self, cmd: &<Self as Command>::Cmd);
 }
 
 /// Applies a command through a state wrapper.
-/// Takes `&self` - implementations use interior mutability.
 pub trait ApplyState {
-    type Cmd;
+    type Inner: Command;
     type Output;
-    type Inner: Command<Cmd = Self::Cmd>;
 
-    fn do_apply(&self, cmd: Self::Cmd) -> Option<Self::Output>;
+    fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output;
 }
 
 /// Public interface for state machines.
-/// Takes `&self` - implementations use interior mutability.
 pub trait State {
     type Output;
     type Input;
@@ -87,16 +75,12 @@ pub trait State {
     fn apply(&self, input: &Self::Input) -> Option<Self::Output>;
 }
 
-impl<T> State for T
-where
-    T: ApplyState,
-{
+impl<T: ApplyState> State for T {
     type Output = T::Output;
     type Input = <T::Inner as Command>::Input;
 
     fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
-        let cmd = T::Inner::into_command(input)?;
-        self.do_apply(cmd)
+        T::Inner::into_command(input).map(|cmd| self.do_apply(cmd))
     }
 }
 
@@ -168,15 +152,12 @@ macro_rules! define_state {
 
         impl From<$inner> for $state {
             fn from(inner: $inner) -> Self {
-                let (write, read) = ::left_right::new_from_empty(inner);
-                let write = Some($crate::stm::WriteCell::new(write));
-                let read = ::std::sync::Arc::new(read);
+                let (write, read) = { let (w, r) = ::left_right::new_from_empty(inner); (Some($crate::stm::WriteCell::new(w)), ::std::sync::Arc::new(r)) };
                 Self { write, read }
             }
         }
 
         impl From<::std::sync::Arc<::left_right::ReadHandle<$inner>>> for $state {
-            /// Create a read-only instance from a read handle.
             fn from(read: ::std::sync::Arc<::left_right::ReadHandle<$inner>>) -> Self {
                 Self { write: None, read }
             }
@@ -218,16 +199,16 @@ macro_rules! define_state {
 
         impl ::left_right::Absorb<$command> for $inner
         where
-            $inner: $crate::stm::Dispatch<Cmd = $command>,
+            $inner: $crate::stm::Handle,
         {
             fn absorb_first(&mut self, cmd: &mut $command, _other: &Self) {
-                use $crate::stm::Dispatch;
-                self.dispatch(cmd);
+                use $crate::stm::Handle;
+                self.handle(cmd);
             }
 
             fn absorb_second(&mut self, cmd: $command, _other: &Self) {
-                use $crate::stm::Dispatch;
-                self.dispatch(&cmd);
+                use $crate::stm::Handle;
+                self.handle(&cmd);
             }
 
             fn sync_with(&mut self, first: &Self) {
@@ -241,18 +222,13 @@ macro_rules! define_state {
 
         impl $crate::stm::ApplyState for $state
         where
-            $inner: $crate::stm::Dispatch<Cmd = $command>,
+            $inner: $crate::stm::Handle,
         {
-            type Cmd = $command;
-            type Output = ();
             type Inner = $inner;
+            type Output = ();
 
-            fn do_apply(&self, cmd: Self::Cmd) -> Option<Self::Output> {
-                let write_cell = self.write.as_ref()?;
-                write_cell.with(|write| {
-                    write.append(cmd).publish();
-                });
-                Some(())
+            fn do_apply(&self, cmd: $command) -> Self::Output {
+                self.write.as_ref().expect("[do_apply]: no write handle").apply(cmd);
             }
         }
     };
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 844a30565..2ac10843b 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -17,7 +17,7 @@
 
 use crate::define_state;
 use crate::stats::{StreamStats, TopicStats};
-use crate::stm::Dispatch;
+use crate::stm::Handle;
 use ahash::AHashMap;
 use iggy_common::create_stream::CreateStream;
 use iggy_common::delete_stream::DeleteStream;
@@ -259,11 +259,8 @@ define_state! {
     [CreateStream, UpdateStream, DeleteStream, PurgeStream]
 }
 
-impl Dispatch for StreamsInner {
-    type Cmd = StreamsCommand;
-    type Output = ();
-
-    fn dispatch(&self, cmd: &Self::Cmd) -> Self::Output {
+impl Handle for StreamsInner {
+    fn handle(&mut self, cmd: &StreamsCommand) {
         match cmd {
             StreamsCommand::CreateStream(_payload) => {
                 // Actual mutation logic will be implemented later
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 043ec0bd2..61cd0ddd6 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -17,7 +17,7 @@
 
 use crate::define_state;
 use crate::permissioner::Permissioner;
-use crate::stm::Dispatch;
+use crate::stm::Handle;
 use ahash::AHashMap;
 use iggy_common::change_password::ChangePassword;
 use iggy_common::create_user::CreateUser;
@@ -77,11 +77,8 @@ define_state! {
     [CreateUser, UpdateUser, DeleteUser, ChangePassword, UpdatePermissions]
 }
 
-impl Dispatch for UsersInner {
-    type Cmd = UsersCommand;
-    type Output = ();
-
-    fn dispatch(&self, cmd: &Self::Cmd) -> Self::Output {
+impl Handle for UsersInner {
+    fn handle(&mut self, cmd: &UsersCommand) {
         match cmd {
             UsersCommand::CreateUser(_payload) => {
                 // Actual mutation logic will be implemented later

Reply via email to