This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch stm-output
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 95c11e235b6d60e46d5740051505a7b2a89779dd
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Feb 9 17:51:08 2026 +0100

    refactor(metadata): add typed return values to state handlers and split stm/mod.rs
    
    mod.rs mixed traits, unsafe left-right infra, and code-gen macros
    in a single 400-line file, making each concern hard to review
    independently. StateHandler also returned void, preventing
    commands from producing results through the left-right layer.
    
    Split into focused modules (traits.rs, left_right.rs, macros.rs)
    and introduce CommandEnvelope to smuggle typed return values out
    of Absorb's void interface. StateHandler and Command now carry an
    Output associated type. WriteCell gains a debug-mode reentrancy
    guard and LeftRight exposes with_state() for lock-free reads.
---
 Cargo.lock                                  |   2 +
 core/metadata/Cargo.toml                    |   4 +
 core/metadata/src/stm/consumer_group.rs     |   8 +-
 core/metadata/src/stm/left_right.rs         | 146 +++++++++++++++
 core/metadata/src/stm/{mod.rs => macros.rs} | 270 +++++++++++++---------------
 core/metadata/src/stm/mod.rs                | 252 +-------------------------
 core/metadata/src/stm/mux.rs                | 100 +++++++++--
 core/metadata/src/stm/stream.rs             |  38 ++--
 core/metadata/src/stm/traits.rs             |  54 ++++++
 core/metadata/src/stm/user.rs               |  34 ++--
 10 files changed, 464 insertions(+), 444 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0d5781ce3..f9910a457 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5656,6 +5656,8 @@ name = "metadata"
 version = "0.1.0"
 dependencies = [
  "ahash 0.8.12",
+ "bytemuck",
+ "bytes",
  "consensus",
  "iggy_common",
  "journal",
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index 193bf5788..c5a7dc776 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -37,3 +37,7 @@ message_bus = { workspace = true }
 paste = { workspace = true }
 slab = { workspace = true }
 tracing = { workspace = true }
+
+[dev-dependencies]
+bytemuck = { workspace = true }
+bytes = { workspace = true }
diff --git a/core/metadata/src/stm/consumer_group.rs 
b/core/metadata/src/stm/consumer_group.rs
index 71babb412..12314ad2f 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -20,7 +20,7 @@ use crate::{collect_handlers, define_state};
 use ahash::AHashMap;
 use iggy_common::create_consumer_group::CreateConsumerGroup;
 use iggy_common::delete_consumer_group::DeleteConsumerGroup;
-use iggy_common::{IdKind, Identifier};
+use iggy_common::{IdKind, Identifier, IggyTimestamp};
 use slab::Slab;
 use std::sync::Arc;
 use std::sync::atomic::AtomicUsize;
@@ -165,7 +165,8 @@ impl ConsumerGroupsInner {
 
 impl StateHandler for CreateConsumerGroup {
     type State = ConsumerGroupsInner;
-    fn apply(&self, state: &mut ConsumerGroupsInner) {
+    type Output = ();
+    fn apply(&self, state: &mut ConsumerGroupsInner, _now: IggyTimestamp) -> 
Self::Output {
         let name: Arc<str> = Arc::from(self.name.as_str());
         if state.name_index.contains_key(&name) {
             return;
@@ -200,7 +201,8 @@ impl StateHandler for CreateConsumerGroup {
 
 impl StateHandler for DeleteConsumerGroup {
     type State = ConsumerGroupsInner;
-    fn apply(&self, state: &mut ConsumerGroupsInner) {
+    type Output = ();
+    fn apply(&self, state: &mut ConsumerGroupsInner, _now: IggyTimestamp) -> 
Self::Output {
         let Some(id) = state.resolve_consumer_group_id_by_identifiers(
             &self.stream_id,
             &self.topic_id,
diff --git a/core/metadata/src/stm/left_right.rs 
b/core/metadata/src/stm/left_right.rs
new file mode 100644
index 000000000..472558fce
--- /dev/null
+++ b/core/metadata/src/stm/left_right.rs
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::traits::Command;
+use iggy_common::IggyTimestamp;
+use left_right::*;
+use std::cell::{Cell, UnsafeCell};
+use std::sync::Arc;
+
+/// Wraps a command with a raw pointer to a stack-allocated output slot.
+///
+/// Used to smuggle return values out of `left_right`'s `Absorb` trait,
+/// which returns void. The pointer targets a stack local in `WriteCell::apply()`
+/// and is only written during the synchronous `append()` + `publish()` sequence.
+#[doc(hidden)]
+pub struct CommandEnvelope<C, R> {
+    pub(crate) cmd: C,
+    pub(crate) output: *mut Option<R>,
+    pub(crate) now: IggyTimestamp,
+}
+
+// Safety: The pointer targets a stack local in WriteCell::apply() and is only
+// dereferenced during the synchronous append()+publish() sequence on the same
+// thread. Single-writer architecture ensures no concurrent access.
+unsafe impl<C: Send, R: Send> Send for CommandEnvelope<C, R> {}
+
+pub struct WriteCell<T, C, R>
+where
+    T: Absorb<CommandEnvelope<C, R>>,
+{
+    inner: UnsafeCell<WriteHandle<T, CommandEnvelope<C, R>>>,
+    #[cfg(debug_assertions)]
+    in_apply: Cell<bool>,
+}
+
+impl<T, C, R> std::fmt::Debug for WriteCell<T, C, R>
+where
+    T: Absorb<CommandEnvelope<C, R>>,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("WriteCell").finish_non_exhaustive()
+    }
+}
+
+impl<T, C, R> WriteCell<T, C, R>
+where
+    T: Absorb<CommandEnvelope<C, R>>,
+{
+    pub fn new(write: WriteHandle<T, CommandEnvelope<C, R>>) -> Self {
+        Self {
+            inner: UnsafeCell::new(write),
+            #[cfg(debug_assertions)]
+            in_apply: Cell::new(false),
+        }
+    }
+
+    pub fn apply(&self, cmd: C) -> R {
+        #[cfg(debug_assertions)]
+        {
+            assert!(
+                !self.in_apply.replace(true),
+                "WriteCell::apply() called reentrantly — this would alias &mut 
WriteHandle"
+            );
+        }
+
+        let mut output: Option<R> = None;
+        let envelope = CommandEnvelope {
+            cmd,
+            output: &mut output,
+            now: IggyTimestamp::now(),
+        };
+        let hdl = unsafe {
+            self.inner
+                .get()
+                .as_mut()
+                .expect("[apply]: called on uninit writer")
+        };
+        hdl.append(envelope).publish();
+
+        #[cfg(debug_assertions)]
+        self.in_apply.set(false);
+
+        output.expect("[apply]: absorb did not produce output")
+    }
+}
+
+#[derive(Debug)]
+pub struct LeftRight<T, C, R>
+where
+    T: Absorb<CommandEnvelope<C, R>>,
+{
+    write: Option<WriteCell<T, C, R>>,
+    read: Arc<ReadHandle<T>>,
+}
+
+// TODO: Expose Arc<ReadHandle<T>> for cross-thread lock-free reads from 
non-owner shards.
+
+impl<T> From<T> for LeftRight<T, <T as Command>::Cmd, <T as Command>::Output>
+where
+    T: Absorb<CommandEnvelope<<T as Command>::Cmd, <T as Command>::Output>> + 
Clone + Command,
+{
+    fn from(inner: T) -> Self {
+        let (write, read) = {
+            let (w, r) = left_right::new_from_empty(inner);
+            (WriteCell::new(w).into(), r.into())
+        };
+        Self { write, read }
+    }
+}
+
+impl<T> LeftRight<T, <T as Command>::Cmd, <T as Command>::Output>
+where
+    T: Absorb<CommandEnvelope<<T as Command>::Cmd, <T as Command>::Output>> + 
Clone + Command,
+{
+    pub fn do_apply(&self, cmd: <T as Command>::Cmd) -> <T as Command>::Output 
{
+        self.write
+            .as_ref()
+            .expect("no write handle - not the owner shard")
+            .apply(cmd)
+    }
+
+    pub fn with_state<F, Ret>(&self, f: F) -> Ret
+    where
+        F: FnOnce(&T) -> Ret,
+    {
+        let guard = self
+            .read
+            .enter()
+            .expect("ReadHandle dropped - WriteHandle was deallocated");
+        f(&guard)
+    }
+}
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/macros.rs
similarity index 52%
copy from core/metadata/src/stm/mod.rs
copy to core/metadata/src/stm/macros.rs
index 08bef35dd..130cbdbe4 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/macros.rs
@@ -15,129 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub mod consumer_group;
-pub mod mux;
-pub mod stream;
-pub mod user;
-
-use left_right::*;
-use std::cell::UnsafeCell;
-use std::sync::Arc;
-
-pub struct WriteCell<T, O>
-where
-    T: Absorb<O>,
-{
-    inner: UnsafeCell<WriteHandle<T, O>>,
-}
-
-impl<T, O> std::fmt::Debug for WriteCell<T, O>
-where
-    T: Absorb<O>,
-{
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("WriteCell").finish_non_exhaustive()
-    }
-}
-
-impl<T, O> WriteCell<T, O>
-where
-    T: Absorb<O>,
-{
-    pub fn new(write: WriteHandle<T, O>) -> Self {
-        Self {
-            inner: UnsafeCell::new(write),
-        }
-    }
-
-    pub fn apply(&self, cmd: O) {
-        let hdl = unsafe {
-            self.inner
-                .get()
-                .as_mut()
-                .expect("[apply]: called on uninit writer, for cmd: {cmd}")
-        };
-        hdl.append(cmd).publish();
-    }
-}
-
-/// Parses type-erased input into a command. Macro-generated.
-/// Returns `Ok(cmd)` if applicable, `Err(input)` to pass ownership back.
-pub trait Command {
-    type Cmd;
-    type Input;
-
-    fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>;
-}
-
-/// Per-command handler for a given state type.
-/// Each command struct implements this for the state it mutates.
-pub trait StateHandler {
-    type State;
-    fn apply(&self, state: &mut Self::State);
-}
-
-#[derive(Debug)]
-pub struct LeftRight<T, C>
-where
-    T: Absorb<C>,
-{
-    write: Option<WriteCell<T, C>>,
-    #[allow(unused)]
-    read: Arc<ReadHandle<T>>,
-}
-
-impl<T> From<T> for LeftRight<T, <T as Command>::Cmd>
-where
-    T: Absorb<<T as Command>::Cmd> + Clone + Command,
-{
-    fn from(inner: T) -> Self {
-        let (write, read) = {
-            let (w, r) = left_right::new_from_empty(inner);
-            (WriteCell::new(w).into(), r.into())
-        };
-        Self { write, read }
-    }
-}
-
-impl<T> LeftRight<T, <T as Command>::Cmd>
-where
-    T: Absorb<<T as Command>::Cmd> + Clone + Command,
-{
-    pub fn do_apply(&self, cmd: <T as Command>::Cmd) {
-        self.write
-            .as_ref()
-            .expect("no write handle - not the owner shard")
-            .apply(cmd);
-    }
-}
-
-/// Public interface for state machines.
-/// Returns `Ok(output)` if applicable, `Err(input)` to pass ownership back.
-pub trait State {
-    type Output;
-    type Input;
-
-    fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input>;
-}
-
-pub trait StateMachine {
-    type Input;
-    type Output;
-    fn update(&self, input: Self::Input) -> Self::Output;
-}
-
-/// Generates the state's inner struct and wrapper type.
+/// Generates the state's inner struct.
 ///
 /// # Generated items
 /// - `{$state}Inner` struct with the specified fields (the data)
-/// - `$state` wrapper struct (contains LeftRight storage)
-/// - `From<LeftRight<...>>` impl for `$state`
-/// - `From<{$state}Inner>` impl for `$state`
 ///
-/// The command enum, parsing, dispatch, and Absorb impl are generated
-/// by `collect_handlers!` separately, keeping state definition decoupled
-/// from the set of operations.
+/// The wrapper struct, `From` impls, command enum, parsing, dispatch, and
+/// Absorb impl are all generated by `collect_handlers!`, which knows the
+/// output type.
 #[macro_export]
 macro_rules! define_state {
     (
@@ -158,47 +43,99 @@ macro_rules! define_state {
                     Self::default()
                 }
             }
-
-            #[derive(Debug)]
-            pub struct $state {
-                inner: $crate::stm::LeftRight<[<$state Inner>], [<$state 
Command>]>,
-            }
-
-            impl From<$crate::stm::LeftRight<[<$state Inner>], [<$state 
Command>]>> for $state {
-                fn from(inner: $crate::stm::LeftRight<[<$state Inner>], 
[<$state Command>]>) -> Self {
-                    Self { inner }
-                }
-            }
-
-            impl From<[<$state Inner>]> for $state {
-                fn from(inner: [<$state Inner>]) -> Self {
-                    let left_right: $crate::stm::LeftRight<[<$state Inner>], 
[<$state Command>]> = inner.into();
-                    left_right.into()
-                }
-            }
         }
     };
 }
 
-/// Generates the command enum, parsing, dispatch, State, and Absorb for a 
state type.
+/// Generates the command enum, wrapper struct, parsing, dispatch, State, and
+/// envelope-aware Absorb for a state type.
+///
+/// # Syntax
+/// ```ignore
+/// collect_handlers! { Streams { Op1, Op2 } }           // Output = ()
+/// collect_handlers! { Streams -> OutputType { Op1 } }   // Output = 
OutputType
+/// ```
 ///
 /// # Generated items
+/// - `$state` wrapper struct (contains LeftRight storage)
+/// - `From<LeftRight<...>>` impl for `$state`
+/// - `From<{$state}Inner>` impl for `$state`
 /// - `{$state}Command` enum with one variant per operation
 /// - `Command` impl for `{$state}Inner` (parses `Message<PrepareHeader>`)
 /// - `{$state}Inner::dispatch()` method (routes each variant to 
`StateHandler::apply()`)
 /// - `State` impl for `$state` wrapper
-/// - `Absorb<{$state}Command>` impl for `{$state}Inner`
+/// - `Absorb<CommandEnvelope<{$state}Command, Output>>` impl for 
`{$state}Inner`
 ///
 /// # Requirements
-/// Each listed operation type must implement `StateHandler<{$state}Inner>`.
+/// Each listed operation type must implement `StateHandler<State = 
{$state}Inner, Output = $output>`.
 #[macro_export]
 macro_rules! collect_handlers {
+    // Default output: ()
     (
         $state:ident {
             $($operation:ident),* $(,)?
         }
+    ) => {
+        $crate::__collect_handlers_impl!($state -> () { $($operation),* });
+    };
+    // Explicit output type
+    (
+        $state:ident -> $output:ty {
+            $($operation:ident),* $(,)?
+        }
+    ) => {
+        $crate::__collect_handlers_impl!($state -> $output { $($operation),* 
});
+    };
+}
+
+#[doc(hidden)]
+#[macro_export]
+macro_rules! __collect_handlers_impl {
+    (
+        $state:ident -> $output:ty {
+            $($operation:ident),*
+        }
     ) => {
         paste::paste! {
+            // --- wrapper struct + From impls (moved from define_state!) ---
+
+            #[derive(Debug)]
+            pub struct $state {
+                inner: $crate::stm::LeftRight<
+                    [<$state Inner>],
+                    [<$state Command>],
+                    $output,
+                >,
+            }
+
+            impl From<$crate::stm::LeftRight<[<$state Inner>], [<$state 
Command>], $output>> for $state {
+                fn from(inner: $crate::stm::LeftRight<[<$state Inner>], 
[<$state Command>], $output>) -> Self {
+                    Self { inner }
+                }
+            }
+
+            impl $state {
+                pub fn with_state<F, Ret>(&self, f: F) -> Ret
+                where
+                    F: FnOnce(&[<$state Inner>]) -> Ret,
+                {
+                    self.inner.with_state(f)
+                }
+            }
+
+            impl From<[<$state Inner>]> for $state {
+                fn from(inner: [<$state Inner>]) -> Self {
+                    let left_right: $crate::stm::LeftRight<
+                        [<$state Inner>],
+                        [<$state Command>],
+                        $output,
+                    > = inner.into();
+                    left_right.into()
+                }
+            }
+
+            // --- command enum ---
+
             #[derive(Debug, Clone)]
             pub enum [<$state Command>] {
                 $(
@@ -206,9 +143,12 @@ macro_rules! collect_handlers {
                 )*
             }
 
+            // --- Command trait ---
+
             impl $crate::stm::Command for [<$state Inner>] {
                 type Cmd = [<$state Command>];
                 type Input = 
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
+                type Output = $output;
 
                 fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input> 
{
                     use ::iggy_common::BytesSerializable;
@@ -218,6 +158,7 @@ macro_rules! collect_handlers {
                         $(
                             Operation::$operation => {
                                 let body = input.body_bytes();
+                                // FIXME: return a parse error instead of 
panicking on malformed input
                                 Ok([<$state Command>]::$operation(
                                     $operation::from_bytes(body).unwrap()
                                 ))
@@ -228,36 +169,65 @@ macro_rules! collect_handlers {
                 }
             }
 
+            // --- dispatch ---
+
             impl [<$state Inner>] {
-                fn dispatch(&mut self, cmd: &[<$state Command>]) {
+                fn dispatch(&mut self, cmd: &[<$state Command>], now: 
::iggy_common::IggyTimestamp) -> $output {
                     match cmd {
                         $(
                             [<$state Command>]::$operation(payload) => {
-                                $crate::stm::StateHandler::apply(payload, 
self);
+                                $crate::stm::StateHandler::apply(payload, 
self, now)
                             },
                         )*
                     }
                 }
             }
 
+            // --- State trait ---
+
             impl $crate::stm::State for $state {
                 type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
-                type Output = ();
+                type Output = $output;
 
                 fn apply(&self, input: Self::Input) -> Result<Self::Output, 
Self::Input> {
                     let cmd = <[<$state Inner>] as 
$crate::stm::Command>::parse(input)?;
-                    self.inner.do_apply(cmd);
-                    Ok(())
+                    Ok(self.inner.do_apply(cmd))
                 }
             }
 
-            impl left_right::Absorb<[<$state Command>]> for [<$state Inner>] {
-                fn absorb_first(&mut self, cmd: &mut [<$state Command>], 
_other: &Self) {
-                    self.dispatch(cmd);
+            // --- Absorb<CommandEnvelope<...>> ---
+
+            impl left_right::Absorb<$crate::stm::CommandEnvelope<[<$state 
Command>], $output>>
+                for [<$state Inner>]
+            {
+                fn absorb_first(
+                    &mut self,
+                    envelope: &mut $crate::stm::CommandEnvelope<[<$state 
Command>], $output>,
+                    _other: &Self,
+                ) {
+                    let result = self.dispatch(&envelope.cmd, envelope.now);
+                    if !envelope.output.is_null() {
+                        // Safety: pointer targets WriteCell::apply()'s stack 
local,
+                        // still alive during synchronous publish().
+                        unsafe { *envelope.output = Some(result) };
+                        // Nullify so the stale copy in absorb_second won't 
write
+                        // through a dangling pointer.
+                        envelope.output = std::ptr::null_mut();
+                    }
                 }
 
-                fn absorb_second(&mut self, cmd: [<$state Command>], _other: 
&Self) {
-                    self.dispatch(&cmd);
+                fn absorb_second(
+                    &mut self,
+                    mut envelope: $crate::stm::CommandEnvelope<[<$state 
Command>], $output>,
+                    _other: &Self,
+                ) {
+                    let result = self.dispatch(&envelope.cmd, envelope.now);
+                    if !envelope.output.is_null() {
+                        // Safety: on the `extend` (first-publish) path, this 
is
+                        // absorb_second and the pointer is still live.
+                        unsafe { *envelope.output = Some(result) };
+                        envelope.output = std::ptr::null_mut();
+                    }
                 }
 
                 fn sync_with(&mut self, first: &Self) {
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 08bef35dd..8274cb40a 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -16,254 +16,12 @@
 // under the License.
 
 pub mod consumer_group;
+mod left_right;
+mod macros;
 pub mod mux;
 pub mod stream;
+mod traits;
 pub mod user;
 
-use left_right::*;
-use std::cell::UnsafeCell;
-use std::sync::Arc;
-
-pub struct WriteCell<T, O>
-where
-    T: Absorb<O>,
-{
-    inner: UnsafeCell<WriteHandle<T, O>>,
-}
-
-impl<T, O> std::fmt::Debug for WriteCell<T, O>
-where
-    T: Absorb<O>,
-{
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("WriteCell").finish_non_exhaustive()
-    }
-}
-
-impl<T, O> WriteCell<T, O>
-where
-    T: Absorb<O>,
-{
-    pub fn new(write: WriteHandle<T, O>) -> Self {
-        Self {
-            inner: UnsafeCell::new(write),
-        }
-    }
-
-    pub fn apply(&self, cmd: O) {
-        let hdl = unsafe {
-            self.inner
-                .get()
-                .as_mut()
-                .expect("[apply]: called on uninit writer, for cmd: {cmd}")
-        };
-        hdl.append(cmd).publish();
-    }
-}
-
-/// Parses type-erased input into a command. Macro-generated.
-/// Returns `Ok(cmd)` if applicable, `Err(input)` to pass ownership back.
-pub trait Command {
-    type Cmd;
-    type Input;
-
-    fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>;
-}
-
-/// Per-command handler for a given state type.
-/// Each command struct implements this for the state it mutates.
-pub trait StateHandler {
-    type State;
-    fn apply(&self, state: &mut Self::State);
-}
-
-#[derive(Debug)]
-pub struct LeftRight<T, C>
-where
-    T: Absorb<C>,
-{
-    write: Option<WriteCell<T, C>>,
-    #[allow(unused)]
-    read: Arc<ReadHandle<T>>,
-}
-
-impl<T> From<T> for LeftRight<T, <T as Command>::Cmd>
-where
-    T: Absorb<<T as Command>::Cmd> + Clone + Command,
-{
-    fn from(inner: T) -> Self {
-        let (write, read) = {
-            let (w, r) = left_right::new_from_empty(inner);
-            (WriteCell::new(w).into(), r.into())
-        };
-        Self { write, read }
-    }
-}
-
-impl<T> LeftRight<T, <T as Command>::Cmd>
-where
-    T: Absorb<<T as Command>::Cmd> + Clone + Command,
-{
-    pub fn do_apply(&self, cmd: <T as Command>::Cmd) {
-        self.write
-            .as_ref()
-            .expect("no write handle - not the owner shard")
-            .apply(cmd);
-    }
-}
-
-/// Public interface for state machines.
-/// Returns `Ok(output)` if applicable, `Err(input)` to pass ownership back.
-pub trait State {
-    type Output;
-    type Input;
-
-    fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input>;
-}
-
-pub trait StateMachine {
-    type Input;
-    type Output;
-    fn update(&self, input: Self::Input) -> Self::Output;
-}
-
-/// Generates the state's inner struct and wrapper type.
-///
-/// # Generated items
-/// - `{$state}Inner` struct with the specified fields (the data)
-/// - `$state` wrapper struct (contains LeftRight storage)
-/// - `From<LeftRight<...>>` impl for `$state`
-/// - `From<{$state}Inner>` impl for `$state`
-///
-/// The command enum, parsing, dispatch, and Absorb impl are generated
-/// by `collect_handlers!` separately, keeping state definition decoupled
-/// from the set of operations.
-#[macro_export]
-macro_rules! define_state {
-    (
-        $state:ident {
-            $($field_name:ident : $field_type:ty),* $(,)?
-        }
-    ) => {
-        paste::paste! {
-            #[derive(Debug, Clone, Default)]
-            pub struct [<$state Inner>] {
-                $(
-                    pub $field_name: $field_type,
-                )*
-            }
-
-            impl [<$state Inner>] {
-                pub fn new() -> Self {
-                    Self::default()
-                }
-            }
-
-            #[derive(Debug)]
-            pub struct $state {
-                inner: $crate::stm::LeftRight<[<$state Inner>], [<$state 
Command>]>,
-            }
-
-            impl From<$crate::stm::LeftRight<[<$state Inner>], [<$state 
Command>]>> for $state {
-                fn from(inner: $crate::stm::LeftRight<[<$state Inner>], 
[<$state Command>]>) -> Self {
-                    Self { inner }
-                }
-            }
-
-            impl From<[<$state Inner>]> for $state {
-                fn from(inner: [<$state Inner>]) -> Self {
-                    let left_right: $crate::stm::LeftRight<[<$state Inner>], 
[<$state Command>]> = inner.into();
-                    left_right.into()
-                }
-            }
-        }
-    };
-}
-
-/// Generates the command enum, parsing, dispatch, State, and Absorb for a 
state type.
-///
-/// # Generated items
-/// - `{$state}Command` enum with one variant per operation
-/// - `Command` impl for `{$state}Inner` (parses `Message<PrepareHeader>`)
-/// - `{$state}Inner::dispatch()` method (routes each variant to 
`StateHandler::apply()`)
-/// - `State` impl for `$state` wrapper
-/// - `Absorb<{$state}Command>` impl for `{$state}Inner`
-///
-/// # Requirements
-/// Each listed operation type must implement `StateHandler<{$state}Inner>`.
-#[macro_export]
-macro_rules! collect_handlers {
-    (
-        $state:ident {
-            $($operation:ident),* $(,)?
-        }
-    ) => {
-        paste::paste! {
-            #[derive(Debug, Clone)]
-            pub enum [<$state Command>] {
-                $(
-                    $operation($operation),
-                )*
-            }
-
-            impl $crate::stm::Command for [<$state Inner>] {
-                type Cmd = [<$state Command>];
-                type Input = 
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
-
-                fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input> 
{
-                    use ::iggy_common::BytesSerializable;
-                    use ::iggy_common::header::Operation;
-
-                    match input.header().operation {
-                        $(
-                            Operation::$operation => {
-                                let body = input.body_bytes();
-                                Ok([<$state Command>]::$operation(
-                                    $operation::from_bytes(body).unwrap()
-                                ))
-                            },
-                        )*
-                        _ => Err(input),
-                    }
-                }
-            }
-
-            impl [<$state Inner>] {
-                fn dispatch(&mut self, cmd: &[<$state Command>]) {
-                    match cmd {
-                        $(
-                            [<$state Command>]::$operation(payload) => {
-                                $crate::stm::StateHandler::apply(payload, 
self);
-                            },
-                        )*
-                    }
-                }
-            }
-
-            impl $crate::stm::State for $state {
-                type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
-                type Output = ();
-
-                fn apply(&self, input: Self::Input) -> Result<Self::Output, 
Self::Input> {
-                    let cmd = <[<$state Inner>] as 
$crate::stm::Command>::parse(input)?;
-                    self.inner.do_apply(cmd);
-                    Ok(())
-                }
-            }
-
-            impl left_right::Absorb<[<$state Command>]> for [<$state Inner>] {
-                fn absorb_first(&mut self, cmd: &mut [<$state Command>], 
_other: &Self) {
-                    self.dispatch(cmd);
-                }
-
-                fn absorb_second(&mut self, cmd: [<$state Command>], _other: 
&Self) {
-                    self.dispatch(&cmd);
-                }
-
-                fn sync_with(&mut self, first: &Self) {
-                    *self = first.clone();
-                }
-            }
-        }
-    };
-}
+pub use left_right::*;
+pub use traits::*;
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index f68d99bf7..45097b105 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -61,16 +61,14 @@ macro_rules! variadic {
     ($a:expr,  $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) );
 }
 
-// TODO: Figure out how to get around the fact that we need to hardcode the 
Input/Output type for base case.
-// TODO: I think we could move the base case to the impl site of `State`, so 
this way we know the `Input` and `Output` types.
-// Base case of the recursive resolution.
+// Base case: panics if no state machine in the chain accepted the input.
 impl StateMachine for () {
     type Input = Message<PrepareHeader>;
-    // TODO: Make sure that the `Output` matches to the output type of the 
rest of list.
-    // TODO: Add a trait bound to the output that will allow us to get the 
response in bytes.
     type Output = ();
 
-    fn update(&self, _input: Self::Input) -> Self::Output {}
+    fn update(&self, _input: Self::Input) -> Self::Output {
+        panic!("unhandled command: no state machine accepted the input");
+    }
 }
 
 // Recursive case: process head and recurse on tail
@@ -91,22 +89,96 @@ where
     }
 }
 
+#[cfg(test)]
 mod tests {
+    use crate::stm::State;
+    use crate::stm::StateMachine;
+    use crate::stm::mux::MuxStateMachine;
+    use crate::stm::stream::{Streams, StreamsInner};
+    use crate::stm::user::{Users, UsersInner};
+    use bytes::{Bytes, BytesMut};
+    use iggy_common::BytesSerializable;
+    use iggy_common::header::{Command2, Operation, PrepareHeader};
+    use iggy_common::message::Message;
+    use std::mem;
+
+    fn build_prepare_message(operation: Operation, body: Bytes) -> Message<PrepareHeader> {
+        let header_size = mem::size_of::<PrepareHeader>();
+        let total_size = header_size + body.len();
+        let mut buffer = BytesMut::zeroed(total_size);
+
+        let header = bytemuck::from_bytes_mut::<PrepareHeader>(&mut buffer[..header_size]);
+        header.command = Command2::Prepare;
+        header.operation = operation;
+        header.size = total_size as u32;
+
+        buffer[header_size..].copy_from_slice(&body);
+        Message::<PrepareHeader>::from_bytes(buffer.freeze()).unwrap()
+    }
+
+    #[test]
+    fn create_stream_applies_and_is_readable() {
+        let streams: Streams = StreamsInner::new().into();
+
+        let cmd = iggy_common::create_stream::CreateStream {
+            name: "test".to_string(),
+        };
+        let msg = build_prepare_message(Operation::CreateStream, cmd.to_bytes());
+
+        let result = streams.apply(msg);
+        assert!(result.is_ok());
+
+        streams.with_state(|inner| {
+            assert_eq!(inner.items.len(), 1);
+            assert_eq!(inner.index.len(), 1);
+
+            let (id, stream) = inner.items.iter().next().unwrap();
+            assert_eq!(stream.name.as_ref(), "test");
+            assert_eq!(stream.id, id);
+            assert_eq!(inner.index.get("test"), Some(&id));
+        });
+    }
 
     #[test]
-    fn construct_mux_state_machine_from_states_with_same_output() {
-        use crate::stm::StateMachine;
-        use crate::stm::mux::MuxStateMachine;
-        use crate::stm::stream::{Streams, StreamsInner};
-        use crate::stm::user::{Users, UsersInner};
-        use iggy_common::header::PrepareHeader;
-        use iggy_common::message::Message;
+    fn create_user_applies_and_is_readable() {
+        let users: Users = UsersInner::new().into();
 
+        let cmd = iggy_common::create_user::CreateUser {
+            username: "admin".to_string(),
+            password: "secret123".to_string(),
+            status: iggy_common::UserStatus::Active,
+            permissions: None,
+        };
+        let msg = build_prepare_message(Operation::CreateUser, cmd.to_bytes());
+
+        let result = users.apply(msg);
+        assert!(result.is_ok());
+
+        users.with_state(|inner| {
+            assert_eq!(inner.items.len(), 1);
+            assert_eq!(inner.index.len(), 1);
+
+            let (id, user) = inner.items.iter().next().unwrap();
+            assert_eq!(user.username.as_ref(), "admin");
+            assert_eq!(user.id, id as iggy_common::UserId);
+            assert_eq!(user.status, iggy_common::UserStatus::Active);
+            assert_eq!(inner.index.get("admin"), Some(&(id as iggy_common::UserId)));
+            assert!(
+                inner
+                    .personal_access_tokens
+                    .contains_key(&(id as iggy_common::UserId))
+            );
+        });
+    }
+
+    #[test]
+    #[should_panic(expected = "unhandled command: no state machine accepted the input")]
+    fn unhandled_operation_falls_through() {
         let users: Users = UsersInner::new().into();
         let streams: Streams = StreamsInner::new().into();
         let mux = MuxStateMachine::new(variadic!(users, streams));
 
-        let input = Message::new(std::mem::size_of::<PrepareHeader>());
+        let input = Message::new(mem::size_of::<PrepareHeader>());
 
         mux.update(input);
     }
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 4f212cd20..f69f82199 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -232,7 +232,8 @@ impl StreamsInner {
 
 impl StateHandler for CreateStream {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    type Output = ();
+    fn apply(&self, state: &mut StreamsInner, now: IggyTimestamp) -> Self::Output {
         let name_arc: Arc<str> = Arc::from(self.name.as_str());
         if state.index.contains_key(&name_arc) {
             return;
@@ -241,7 +242,7 @@ impl StateHandler for CreateStream {
         let stream = Stream {
             id: 0,
             name: name_arc.clone(),
-            created_at: iggy_common::IggyTimestamp::now(),
+            created_at: now,
             stats: Arc::new(StreamStats::default()),
             topics: Slab::new(),
             topic_index: AHashMap::default(),
@@ -257,7 +258,8 @@ impl StateHandler for CreateStream {
 
 impl StateHandler for UpdateStream {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    type Output = ();
+    fn apply(&self, state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
             return;
         };
@@ -280,7 +282,8 @@ impl StateHandler for UpdateStream {
 
 impl StateHandler for DeleteStream {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    type Output = ();
+    fn apply(&self, state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
             return;
         };
@@ -296,7 +299,8 @@ impl StateHandler for DeleteStream {
 
 impl StateHandler for PurgeStream {
     type State = StreamsInner;
-    fn apply(&self, _state: &mut StreamsInner) {
+    type Output = ();
+    fn apply(&self, _state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output {
         // TODO
         todo!();
     }
@@ -304,7 +308,8 @@ impl StateHandler for PurgeStream {
 
 impl StateHandler for CreateTopic {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    type Output = ();
+    fn apply(&self, state: &mut StreamsInner, now: IggyTimestamp) -> Self::Output {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
             return;
         };
@@ -320,7 +325,7 @@ impl StateHandler for CreateTopic {
         let topic = Topic {
             id: 0,
             name: name_arc.clone(),
-            created_at: iggy_common::IggyTimestamp::now(),
+            created_at: now,
             replication_factor: self.replication_factor.unwrap_or(1),
             message_expiry: self.message_expiry,
             compression_algorithm: self.compression_algorithm,
@@ -337,7 +342,7 @@ impl StateHandler for CreateTopic {
             for partition_id in 0..self.partitions_count as usize {
                 let partition = Partition {
                     id: partition_id,
-                    created_at: iggy_common::IggyTimestamp::now(),
+                    created_at: now,
                 };
                 topic.partitions.push(partition);
             }
@@ -349,7 +354,8 @@ impl StateHandler for CreateTopic {
 
 impl StateHandler for UpdateTopic {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    type Output = ();
+    fn apply(&self, state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
             return;
         };
@@ -385,7 +391,8 @@ impl StateHandler for UpdateTopic {
 
 impl StateHandler for DeleteTopic {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    type Output = ();
+    fn apply(&self, state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
             return;
         };
@@ -406,7 +413,8 @@ impl StateHandler for DeleteTopic {
 
 impl StateHandler for PurgeTopic {
     type State = StreamsInner;
-    fn apply(&self, _state: &mut StreamsInner) {
+    type Output = ();
+    fn apply(&self, _state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output {
         // TODO
         todo!();
     }
@@ -414,7 +422,8 @@ impl StateHandler for PurgeTopic {
 
 impl StateHandler for CreatePartitions {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    type Output = ();
+    fn apply(&self, state: &mut StreamsInner, now: IggyTimestamp) -> Self::Output {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
             return;
         };
@@ -434,7 +443,7 @@ impl StateHandler for CreatePartitions {
             let partition_id = current_partition_count + i;
             let partition = Partition {
                 id: partition_id,
-                created_at: iggy_common::IggyTimestamp::now(),
+                created_at: now,
             };
             topic.partitions.push(partition);
         }
@@ -443,7 +452,8 @@ impl StateHandler for CreatePartitions {
 
 impl StateHandler for DeletePartitions {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    type Output = ();
+    fn apply(&self, state: &mut StreamsInner, _now: IggyTimestamp) -> Self::Output {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
             return;
         };
diff --git a/core/metadata/src/stm/traits.rs b/core/metadata/src/stm/traits.rs
new file mode 100644
index 000000000..355420506
--- /dev/null
+++ b/core/metadata/src/stm/traits.rs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::IggyTimestamp;
+
+/// Per-command handler for a given state type.
+/// Each command struct implements this for the state it mutates.
+///
+/// `now` is captured once before the command enters `left_right`, ensuring
+/// both copies see the same timestamp (deterministic replay).
+pub trait StateHandler {
+    type State;
+    type Output;
+    fn apply(&self, state: &mut Self::State, now: IggyTimestamp) -> Self::Output;
+}
+
+/// Parses type-erased input into a command. Macro-generated.
+/// Returns `Ok(cmd)` if applicable, `Err(input)` to pass ownership back.
+pub trait Command {
+    type Cmd;
+    type Input;
+    type Output;
+
+    fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>;
+}
+
+/// Public interface for state machines.
+/// Returns `Ok(output)` if applicable, `Err(input)` to pass ownership back.
+pub trait State {
+    type Output;
+    type Input;
+
+    fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input>;
+}
+
+pub trait StateMachine {
+    type Input;
+    type Output;
+    fn update(&self, input: Self::Input) -> Self::Output;
+}
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 88bc178ea..95cb195fa 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -119,7 +119,8 @@ impl UsersInner {
 
 impl StateHandler for CreateUser {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    type Output = ();
+    fn apply(&self, state: &mut UsersInner, now: IggyTimestamp) -> Self::Output {
         let username_arc: Arc<str> = Arc::from(self.username.as_str());
         if state.index.contains_key(&username_arc) {
             return;
@@ -130,7 +131,7 @@ impl StateHandler for CreateUser {
             username: username_arc.clone(),
             password_hash: Arc::from(self.password.as_str()),
             status: self.status,
-            created_at: iggy_common::IggyTimestamp::now(),
+            created_at: now,
             permissions: self.permissions.as_ref().map(|p| Arc::new(p.clone())),
         };
 
@@ -148,7 +149,8 @@ impl StateHandler for CreateUser {
 
 impl StateHandler for UpdateUser {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    type Output = ();
+    fn apply(&self, state: &mut UsersInner, _now: IggyTimestamp) -> Self::Output {
         let Some(user_id) = state.resolve_user_id(&self.user_id) else {
             return;
         };
@@ -178,7 +180,8 @@ impl StateHandler for UpdateUser {
 
 impl StateHandler for DeleteUser {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    type Output = ();
+    fn apply(&self, state: &mut UsersInner, _now: IggyTimestamp) -> Self::Output {
         let Some(user_id) = state.resolve_user_id(&self.user_id) else {
             return;
         };
@@ -194,7 +197,8 @@ impl StateHandler for DeleteUser {
 
 impl StateHandler for ChangePassword {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    type Output = ();
+    fn apply(&self, state: &mut UsersInner, _now: IggyTimestamp) -> Self::Output {
         let Some(user_id) = state.resolve_user_id(&self.user_id) else {
             return;
         };
@@ -207,7 +211,8 @@ impl StateHandler for ChangePassword {
 
 impl StateHandler for UpdatePermissions {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    type Output = ();
+    fn apply(&self, state: &mut UsersInner, _now: IggyTimestamp) -> Self::Output {
         let Some(user_id) = state.resolve_user_id(&self.user_id) else {
             return;
         };
@@ -220,7 +225,8 @@ impl StateHandler for UpdatePermissions {
 
 impl StateHandler for CreatePersonalAccessToken {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    type Output = ();
+    fn apply(&self, state: &mut UsersInner, now: IggyTimestamp) -> Self::Output {
         // TODO: Stub until protocol gets adjusted.
         let user_id = 0;
         let user_tokens = state.personal_access_tokens.entry(user_id).or_default();
@@ -229,26 +235,22 @@ impl StateHandler for CreatePersonalAccessToken {
             return;
         }
 
-        let expiry_at = PersonalAccessToken::calculate_expiry_at(IggyTimestamp::now(), self.expiry);
+        let expiry_at = PersonalAccessToken::calculate_expiry_at(now, self.expiry);
         if let Some(expiry_at) = expiry_at
-            && expiry_at.as_micros() <= IggyTimestamp::now().as_micros()
+            && expiry_at.as_micros() <= now.as_micros()
         {
             return;
         }
 
-        let (pat, _) = PersonalAccessToken::new(
-            user_id,
-            self.name.as_ref(),
-            IggyTimestamp::now(),
-            self.expiry,
-        );
+        let (pat, _) = PersonalAccessToken::new(user_id, self.name.as_ref(), now, self.expiry);
         user_tokens.insert(name_arc, pat);
     }
 }
 
 impl StateHandler for DeletePersonalAccessToken {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    type Output = ();
+    fn apply(&self, state: &mut UsersInner, _now: IggyTimestamp) -> Self::Output {
         // TODO: Stub until protocol gets adjusted.
         let user_id = 0;
 


Reply via email to