This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch experiment
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 42bc498dc1f28fe802b97344375ef44bf1140ec5
Author: numinex <[email protected]>
AuthorDate: Fri Jan 16 22:05:37 2026 +0100

    refactor v2
---
 core/metadata/src/stm/mod.rs    | 191 ++++++++++++++++++++++------------------
 core/metadata/src/stm/stream.rs |  20 +++--
 core/metadata/src/stm/user.rs   |   8 +-
 3 files changed, 123 insertions(+), 96 deletions(-)

diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 9f12cc071..518582bbd 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -1,4 +1,3 @@
-// Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership.  The ASF licenses this file
@@ -52,33 +51,25 @@ where
     }
 }
 
-pub trait Container {
-    type Inner;
-}
-
+/// Parses type-erased input into a command. Macro-generated.
 pub trait Command {
     type Cmd;
     type Input;
 
-    fn into_command(input: &Self::Input) -> Option<Self::Cmd>;
+    fn parse(input: &Self::Input) -> Option<Self::Cmd>;
 }
 
-pub trait Handle: Command {
+/// Handles commands. User-implemented business logic.
+pub trait Handler: Command {
     fn handle(&mut self, cmd: &Self::Cmd);
 }
 
-pub trait ApplyState: Container {
+/// Storage abstraction: applies commands to inner state.
+pub trait ApplyState {
+    type Inner: Handler;
     type Output;
 
-    fn apply_cmd(&self, cmd: <<Self as Container>::Inner as Command>::Cmd) -> Self::Output
-    where
-        <Self as Container>::Inner: Command;
-}
-
-pub trait Factory<T> {
-    type Constructable;
-
-    fn create(&self, inner: impl FnOnce() -> T) -> Self::Constructable;
+    fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output;
 }
 
 pub struct LeftRight<T, C>
@@ -102,29 +93,14 @@ where
     }
 }
 
-impl<T, C> LeftRight<T, C>
+impl<T> ApplyState for LeftRight<T, <T as Command>::Cmd>
 where
-    T: Absorb<C>,
-{
-    pub fn read(&self) -> Arc<ReadHandle<T>> {
-        self.read.clone()
-    }
-}
-
-impl<T, C> Container for LeftRight<T, C>
-where
-    T: Absorb<C>,
+    T: Absorb<<T as Command>::Cmd> + Clone + Handler,
 {
     type Inner = T;
-}
-
-impl<T, C> ApplyState for LeftRight<T, C>
-where
-    T: Command<Cmd = C> + Absorb<C>,
-{
     type Output = ();
 
-    fn apply_cmd(&self, cmd: C) -> Self::Output {
+    fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output {
         self.write
             .as_ref()
             .expect("no write handle - not the owner shard")
@@ -132,20 +108,6 @@ where
     }
 }
 
-#[derive(Default)]
-pub struct LeftRightFactory;
-
-impl<T> Factory<T> for LeftRightFactory
-where
-    T: Absorb<<T as Command>::Cmd> + Clone + Command,
-{
-    type Constructable = LeftRight<T, <T as Command>::Cmd>;
-
-    fn create(&self, inner: impl FnOnce() -> T) -> Self::Constructable {
-        LeftRight::new(inner())
-    }
-}
-
 /// Public interface for state machines.
 pub trait State {
     type Output;
@@ -160,6 +122,42 @@ pub trait StateMachine {
     fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
 }
 
+/// Generates a state machine with pluggable storage.
+///
+/// # Generated items
+/// - `$inner` struct with the specified fields (the data)
+/// - `$command` enum with variants for each operation
+/// - `$state<S: ApplyState<Inner = $inner>>` wrapper struct (storage-agnostic)
+/// - `Command` impl for `$inner` (parsing)
+/// - `Absorb` impl for `$inner` (delegates to `Handler::handle`)
+/// - `State` impl for `$state<S>`
+/// - `From<S>` impl for `$state<S>`
+///
+/// # User must implement
+/// - `Handler` for `$inner` (business logic)
+///
+/// # Example
+/// ```ignore
+/// define_state! {
+///     Streams,
+///     StreamsInner {
+///         index: AHashMap<String, usize>,
+///         items: Slab<Stream>,
+///     },
+///     StreamsCommand,
+///     [CreateStream, UpdateStream, DeleteStream]
+/// }
+///
+/// // User implements Handler manually:
+/// impl Handler for StreamsInner {
+///     fn handle(&mut self, cmd: &StreamsCommand) {
+///         match cmd {
+///             StreamsCommand::CreateStream(payload) => { /* ... */ }
+///             // ...
+///         }
+///     }
+/// }
+/// ```
 #[macro_export]
 macro_rules! define_state {
     (
@@ -170,7 +168,7 @@ macro_rules! define_state {
         $command:ident,
         [$($operation:ident),* $(,)?]
     ) => {
-        #[derive(Debug, Clone)]
+        #[derive(Debug, Clone, Default)]
         pub struct $inner {
             $(
                 pub $field_name: $field_type,
@@ -184,31 +182,26 @@ macro_rules! define_state {
             )*
         }
 
-        /// State wrapper with interior mutability for write access.
-        pub struct $state {
-            write: Option<$crate::stm::WriteCell<$inner, $command>>,
-            read: ::std::sync::Arc<::left_right::ReadHandle<$inner>>,
+        pub struct $state<S: $crate::stm::ApplyState<Inner = $inner>> {
+            inner: S,
         }
 
-        impl From<$inner> for $state {
-            fn from(inner: $inner) -> Self {
-                let (write, read) = { let (w, r) = ::left_right::new_from_empty(inner); (Some($crate::stm::WriteCell::new(w)), ::std::sync::Arc::new(r)) };
-                Self { write, read }
+        impl<S: $crate::stm::ApplyState<Inner = $inner>> From<S> for $state<S> {
+            fn from(storage: S) -> Self {
+                Self { inner: storage }
             }
         }
 
-        impl From<::std::sync::Arc<::left_right::ReadHandle<$inner>>> for $state {
-            fn from(read: ::std::sync::Arc<::left_right::ReadHandle<$inner>>) -> Self {
-                Self { write: None, read }
-            }
-        }
+        impl<S> $crate::stm::State for $state<S>
+        where
+            S: $crate::stm::ApplyState<Inner = $inner>,
+        {
+            type Input = <$inner as $crate::stm::Command>::Input;
+            type Output = S::Output;
 
-        impl ::std::fmt::Debug for $state {
-            fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
-                f.debug_struct(stringify!($state))
-                    .field("write", &self.write)
-                    .field("inner", &self.read.enter())
-                    .finish()
+            fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
+                <$inner as $crate::stm::Command>::parse(input)
+                    .map(|cmd| self.inner.do_apply(cmd))
             }
         }
 
@@ -216,7 +209,7 @@ macro_rules! define_state {
             type Cmd = $command;
             type Input = ::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
 
-            fn into_command(input: &Self::Input) -> Option<Self::Cmd> {
+            fn parse(input: &Self::Input) -> Option<Self::Cmd> {
                 use ::iggy_common::BytesSerializable;
                 use ::iggy_common::header::Operation;
 
@@ -234,18 +227,17 @@ macro_rules! define_state {
             }
         }
 
+        /*
         impl ::left_right::Absorb<$command> for $inner
         where
-            $inner: $crate::stm::Handle,
+            $inner: $crate::stm::Handler,
         {
             fn absorb_first(&mut self, cmd: &mut $command, _other: &Self) {
-                use $crate::stm::Handle;
-                self.handle(cmd);
+                <Self as $crate::stm::Handler>::handle(self, cmd);
             }
 
             fn absorb_second(&mut self, cmd: $command, _other: &Self) {
-                use $crate::stm::Handle;
-                self.handle(&cmd);
+                <Self as $crate::stm::Handler>::handle(self, &cmd);
             }
 
             fn sync_with(&mut self, first: &Self) {
@@ -253,20 +245,51 @@ macro_rules! define_state {
             }
 
             fn drop_first(self: Box<Self>) {}
-
             fn drop_second(self: Box<Self>) {}
         }
+        */
+    };
+}
 
-        impl $crate::stm::ApplyState for $state
-        where
-            $inner: $crate::stm::Handle,
-        {
-            type Inner = $inner;
-            type Output = ();
+// This macro is really sad, but we can't do blanket impl from below, due to orphan rule.
+// impl<T> Absorb<T::Cmd> for T
+// where
+//     T: Handler + Clone,
+// {
+//     fn absorb_first(&mut self, cmd: &mut T::Cmd, _other: &Self) {
+//         self.handle(cmd);
+
+//     }
 
-            fn do_apply(&self, cmd: $command) -> Self::Output {
-                self.write.as_ref().expect("[do_apply]: no write handle, not handled on shard0").apply(cmd);
+//     fn absorb_second(&mut self, cmd: T::Cmd, _other: &Self) {
+//         self.handle(&cmd);
+//     }
+
+//     fn sync_with(&mut self, first: &Self) {
+//         *self = first.clone();
+//     }
+
+//     fn drop_first(self: Box<Self>) {}
+//     fn drop_second(self: Box<Self>) {}
+// }
+#[macro_export]
+macro_rules! impl_absorb {
+    ($inner:ident, $cmd:ident) => {
+        impl left_right::Absorb<$cmd> for $inner {
+            fn absorb_first(&mut self, cmd: &mut $cmd, _other: &Self) {
+                self.handle(cmd);
+            }
+
+            fn absorb_second(&mut self, cmd: $cmd, _other: &Self) {
+                self.handle(&cmd);
+            }
+
+            fn sync_with(&mut self, first: &Self) {
+                *self = first.clone();
             }
         }
     };
 }
+
+/*
+*/
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 2ac10843b..f301b511a 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -15,15 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::define_state;
 use crate::stats::{StreamStats, TopicStats};
-use crate::stm::Handle;
+use crate::stm::{Handler, LeftRight};
+use crate::{define_state, impl_absorb};
 use ahash::AHashMap;
 use iggy_common::create_stream::CreateStream;
 use iggy_common::delete_stream::DeleteStream;
 use iggy_common::purge_stream::PurgeStream;
 use iggy_common::update_stream::UpdateStream;
 use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize};
+use left_right::Absorb;
 use slab::Slab;
 use std::sync::Arc;
 
@@ -245,9 +246,15 @@ impl Stream {
     }
 }
 
-// ============================================================================
-// Streams State Machine
-// ============================================================================
+fn foo() {
+    let streams_inner = StreamsInner {
+        index: AHashMap::new(),
+        items: Slab::new(),
+    };
+
+    let streams: LeftRight<StreamsInner, StreamsCommand> = streams_inner.into();
+    let streams_2: Streams<LeftRight<StreamsInner, StreamsCommand>> = streams.into();
+}
 
 define_state! {
     Streams,
@@ -258,8 +265,9 @@ define_state! {
     StreamsCommand,
     [CreateStream, UpdateStream, DeleteStream, PurgeStream]
 }
+impl_absorb!(StreamsInner, StreamsCommand);
 
-impl Handle for StreamsInner {
+impl Handler for StreamsInner {
     fn handle(&mut self, cmd: &StreamsCommand) {
         match cmd {
             StreamsCommand::CreateStream(_payload) => {
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 61cd0ddd6..64285b22d 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -17,7 +17,7 @@
 
 use crate::define_state;
 use crate::permissioner::Permissioner;
-use crate::stm::Handle;
+use crate::stm::{Handle, Handler};
 use ahash::AHashMap;
 use iggy_common::change_password::ChangePassword;
 use iggy_common::create_user::CreateUser;
@@ -62,10 +62,6 @@ impl User {
     }
 }
 
-// ============================================================================
-// Users State Machine
-// ============================================================================
-
 define_state! {
     Users,
     UsersInner {
@@ -77,7 +73,7 @@ define_state! {
     [CreateUser, UpdateUser, DeleteUser, ChangePassword, UpdatePermissions]
 }
 
-impl Handle for UsersInner {
+impl Handler for UsersInner {
     fn handle(&mut self, cmd: &UsersCommand) {
         match cmd {
             UsersCommand::CreateUser(_payload) => {

Reply via email to