This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch stm-response
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 5cb2ffb9623ae0d21716dc53e4d0281f5942686b
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Mar 3 11:58:13 2026 +0100

    feat(metadata): thread STM response into consensus Reply body
    
    After a metadata state machine commit, the Reply message
    had an empty body -- mux_stm.update() returned () and
    the result was discarded. Clients need the response
    (e.g. assigned IDs) to proceed without a second query.
    
    StateHandler::apply now returns Bytes. Because
    left_right::Absorb forces a () return, each *Inner
    struct stores that result in a new last_result field
    while the operation is absorbed. After publish(), the
    result is read back through ReadHandle and propagated
    through State -> StateMachine -> MuxSTM into
    build_reply_message, which now accepts a body parameter
    and constructs a variable-length BytesMut buffer instead
    of a header-only transmute.
    
    All 19 handlers have the new signature; 17 return
    Bytes::new() for now, while PurgeStream and PurgeTopic
    remain todo!() stubs.
---
 Cargo.lock                              |  5 ++
 core/consensus/Cargo.toml               |  2 +
 core/consensus/src/plane_helpers.rs     | 55 ++++++++++++---------
 core/metadata/Cargo.toml                |  1 +
 core/metadata/src/impls/metadata.rs     | 20 +++++---
 core/metadata/src/stm/consumer_group.rs | 13 +++--
 core/metadata/src/stm/mod.rs            | 18 ++++---
 core/metadata/src/stm/mux.rs            |  8 +--
 core/metadata/src/stm/stream.rs         | 86 +++++++++++++++++++--------------
 core/metadata/src/stm/user.rs           | 43 +++++++++++------
 core/partitions/Cargo.toml              |  1 +
 core/partitions/src/iggy_partitions.rs  |  3 +-
 core/shard/Cargo.toml                   |  1 +
 core/shard/src/lib.rs                   | 30 ++++++++++--
 14 files changed, 185 insertions(+), 101 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index bbd78bdf8..607aa1879 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2165,6 +2165,8 @@ name = "consensus"
 version = "0.1.0"
 dependencies = [
  "bit-set",
+ "bytemuck",
+ "bytes",
  "futures",
  "iggy_common",
  "message_bus",
@@ -6369,6 +6371,7 @@ name = "metadata"
 version = "0.1.0"
 dependencies = [
  "ahash 0.8.12",
+ "bytes",
  "consensus",
  "iggy_common",
  "journal",
@@ -7264,6 +7267,7 @@ dependencies = [
 name = "partitions"
 version = "0.1.0"
 dependencies = [
+ "bytes",
  "consensus",
  "iggy_common",
  "journal",
@@ -9361,6 +9365,7 @@ dependencies = [
 name = "shard"
 version = "0.1.0"
 dependencies = [
+ "bytes",
  "consensus",
  "iggy_common",
  "journal",
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 7b433bfb2..70c6ff46b 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -29,6 +29,8 @@ readme = "../../../README.md"
 
 [dependencies]
 bit-set = { workspace = true }
+bytemuck = { workspace = true }
+bytes = { workspace = true }
 iggy_common = { workspace = true }
 message_bus = { workspace = true }
 rand = { workspace = true }
diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs
index 8f8ea1275..f4ca92f3d 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -198,33 +198,44 @@ where
 pub fn build_reply_message<B, P>(
     consensus: &VsrConsensus<B, P>,
     prepare_header: &PrepareHeader,
+    body: bytes::Bytes,
 ) -> Message<ReplyHeader>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
 {
-    Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>()).transmute_header(|_, new| {
-        *new = ReplyHeader {
-            checksum: 0,
-            checksum_body: 0,
-            cluster: consensus.cluster(),
-            size: std::mem::size_of::<ReplyHeader>() as u32,
-            view: consensus.view(),
-            release: 0,
-            command: Command2::Reply,
-            replica: consensus.replica(),
-            reserved_frame: [0; 66],
-            request_checksum: prepare_header.request_checksum,
-            context: 0,
-            op: prepare_header.op,
-            commit: consensus.commit(),
-            timestamp: prepare_header.timestamp,
-            request: prepare_header.request,
-            operation: prepare_header.operation,
-            namespace: prepare_header.namespace,
-            ..Default::default()
-        };
-    })
+    let header_size = std::mem::size_of::<ReplyHeader>();
+    let total_size = header_size + body.len();
+    let mut buffer = bytes::BytesMut::zeroed(total_size);
+
+    let header = bytemuck::from_bytes_mut::<ReplyHeader>(&mut buffer[..header_size]);
+    *header = ReplyHeader {
+        checksum: 0,
+        checksum_body: 0,
+        cluster: consensus.cluster(),
+        size: total_size as u32,
+        view: consensus.view(),
+        release: 0,
+        command: Command2::Reply,
+        replica: consensus.replica(),
+        reserved_frame: [0; 66],
+        request_checksum: prepare_header.request_checksum,
+        context: 0,
+        op: prepare_header.op,
+        commit: consensus.commit(),
+        timestamp: prepare_header.timestamp,
+        request: prepare_header.request,
+        operation: prepare_header.operation,
+        namespace: prepare_header.namespace,
+        ..Default::default()
+    };
+
+    if !body.is_empty() {
+        buffer[header_size..].copy_from_slice(&body);
+    }
+
+    Message::<ReplyHeader>::from_bytes(buffer.freeze())
+        .expect("build_reply_message: constructed header must be valid")
 }
 
 /// Verify hash chain would not break if we add this header.
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index f5d81d5f8..fcad045bf 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -29,6 +29,7 @@ readme = "../../../README.md"
 
 [dependencies]
 ahash = { workspace = true }
+bytes = { workspace = true }
 consensus = { workspace = true }
 iggy_common = { workspace = true }
 journal = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index 64f9fe959..d76aaca9f 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -106,7 +106,11 @@ where
     P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
     J: JournalHandle,
     J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>,
-    M: StateMachine<Input = Message<PrepareHeader>>,
+    M: StateMachine<
+            Input = Message<PrepareHeader>,
+            Output = bytes::Bytes,
+            Error = iggy_common::IggyError,
+        >,
 {
     async fn on_request(&self, message: <VsrConsensus<B, P> as Consensus>::Message<RequestHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
@@ -225,13 +229,17 @@ where
                         )
                     });
 
-                // Apply the state (consumes prepare)
-                // TODO: Handle appending result to response
-                let _result = self.mux_stm.update(prepare);
+                let response = self.mux_stm.update(prepare).unwrap_or_else(|err| {
+                    warn!(
+                        "on_ack: state machine error for op={}: {err}",
+                        prepare_header.op
+                    );
+                    bytes::Bytes::new()
+                });
                 debug!("on_ack: state applied for op={}", prepare_header.op);
 
-                // Send reply to client
-                let generic_reply = build_reply_message(consensus, &prepare_header).into_generic();
+                let generic_reply =
+                    build_reply_message(consensus, &prepare_header, response).into_generic();
                 debug!(
                     "on_ack: sending reply to client={} for op={}",
                     prepare_header.client, prepare_header.op
diff --git a/core/metadata/src/stm/consumer_group.rs b/core/metadata/src/stm/consumer_group.rs
index 5c039e0dc..f9e42178f 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -18,6 +18,7 @@
 use crate::stm::StateHandler;
 use crate::stm::snapshot::Snapshotable;
 use crate::{collect_handlers, define_state, impl_fill_restore};
+use bytes::Bytes;
 
 use ahash::AHashMap;
 use iggy_common::create_consumer_group::CreateConsumerGroup;
@@ -166,12 +167,13 @@ impl ConsumerGroupsInner {
 // TODO: This is all a hack, we need to figure out how to do this in a way where `Identifier`
 // does not reach this stage of execution.
 
+// TODO(hubcio): Serialize proper reply (e.g. assigned group ID) instead of empty Bytes.
 impl StateHandler for CreateConsumerGroup {
     type State = ConsumerGroupsInner;
-    fn apply(&self, state: &mut ConsumerGroupsInner) {
+    fn apply(&self, state: &mut ConsumerGroupsInner) -> Bytes {
         let name: Arc<str> = Arc::from(self.name.as_str());
         if state.name_index.contains_key(&name) {
-            return;
+            return Bytes::new();
         }
 
         let group = ConsumerGroup::new(name.clone());
@@ -198,18 +200,19 @@ impl StateHandler for CreateConsumerGroup {
             let key = (Arc::from(s.as_str()), Arc::from(t.as_str()));
             state.topic_name_index.entry(key).or_default().push(id);
         }
+        Bytes::new()
     }
 }
 
 impl StateHandler for DeleteConsumerGroup {
     type State = ConsumerGroupsInner;
-    fn apply(&self, state: &mut ConsumerGroupsInner) {
+    fn apply(&self, state: &mut ConsumerGroupsInner) -> Bytes {
         let Some(id) = state.resolve_consumer_group_id_by_identifiers(
             &self.stream_id,
             &self.topic_id,
             &self.group_id,
         ) else {
-            return;
+            return Bytes::new();
         };
 
         let group = state.items.remove(id);
@@ -232,6 +235,7 @@ impl StateHandler for DeleteConsumerGroup {
                 vec.retain(|&x| x != id);
             }
         }
+        Bytes::new()
     }
 }
 
@@ -367,6 +371,7 @@ impl Snapshotable for ConsumerGroups {
             topic_index,
             topic_name_index,
             items,
+            last_result: None,
         };
         Ok(inner.into())
     }
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 7de1992d0..82aed80b1 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -21,6 +21,7 @@ pub mod snapshot;
 pub mod stream;
 pub mod user;
 
+use bytes::Bytes;
 use iggy_common::Either;
 use left_right::*;
 use std::cell::UnsafeCell;
@@ -78,9 +79,10 @@ pub trait Command {
 
 /// Per-command handler for a given state type.
 /// Each command struct implements this for the state it mutates.
+/// Returns a `Bytes` response that will be threaded into the Reply message.
 pub trait StateHandler {
     type State;
-    fn apply(&self, state: &mut Self::State);
+    fn apply(&self, state: &mut Self::State) -> Bytes;
 }
 
 #[derive(Debug)]
@@ -171,6 +173,7 @@ macro_rules! define_state {
                 $(
                     pub $field_name: $field_type,
                 )*
+                pub last_result: Option<bytes::Bytes>,
             }
 
             impl [<$state Inner>] {
@@ -251,19 +254,19 @@ macro_rules! collect_handlers {
 
             impl [<$state Inner>] {
                 fn dispatch(&mut self, cmd: &[<$state Command>]) {
-                    match cmd {
+                    self.last_result = Some(match cmd {
                         $(
                             [<$state Command>]::$operation(payload) => {
-                                $crate::stm::StateHandler::apply(payload, self);
+                                $crate::stm::StateHandler::apply(payload, self)
                             },
                         )*
-                    }
+                    });
                 }
             }
 
             impl $crate::stm::State for $state {
                 type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
-                type Output = ();
+                type Output = bytes::Bytes;
                 type Error = <[<$state Inner>] as $crate::stm::Command>::Error;
 
                 fn apply(&self, input: Self::Input) -> Result<::iggy_common::Either<Self::Output, Self::Input>, Self::Error> {
@@ -272,7 +275,10 @@ macro_rules! collect_handlers {
                     match <[<$state Inner>] as $crate::stm::Command>::parse(input)? {
                         Either::Left(cmd) => {
                             self.inner.do_apply(cmd);
-                            Ok(Either::Left(()))
+                            let result = self.inner.read(|state| {
+                                state.last_result.clone().unwrap_or_default()
+                            });
+                            Ok(Either::Left(result))
                         }
                         Either::Right(input) => {
                             Ok(Either::Right(input))
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 648d8893e..0f6d39dcc 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -53,18 +53,14 @@ where
     }
 }
 
-// TODO: Figure out how to get around the fact that we need to hardcode the Input/Output type for base case.
-// TODO: I think we could move the base case to the impl site of `State`, so this way we know the `Input` and `Output` types.
 // Base case of the recursive resolution.
 impl StateMachine for () {
     type Input = Message<PrepareHeader>;
-    // TODO: Make sure that the `Output` matches to the output type of the rest of list.
-    // TODO: Add a trait bound to the output that will allow us to get the response in bytes.
-    type Output = ();
+    type Output = bytes::Bytes;
     type Error = iggy_common::IggyError;
 
     fn update(&self, _input: Self::Input) -> Result<Self::Output, Self::Error> {
-        Ok(())
+        Ok(bytes::Bytes::new())
     }
 }
 
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 7e4c95104..9bc6f4668 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -20,6 +20,7 @@ use crate::stm::StateHandler;
 use crate::stm::snapshot::Snapshotable;
 use crate::{collect_handlers, define_state, impl_fill_restore};
 use ahash::AHashMap;
+use bytes::Bytes;
 use iggy_common::create_partitions::CreatePartitions;
 use iggy_common::create_stream::CreateStream;
 use iggy_common::create_topic::CreateTopic;
@@ -272,12 +273,13 @@ impl StreamsInner {
     }
 }
 
+// TODO(hubcio): Serialize proper reply (e.g. assigned stream ID) instead of empty Bytes.
 impl StateHandler for CreateStream {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    fn apply(&self, state: &mut StreamsInner) -> Bytes {
         let name_arc: Arc<str> = Arc::from(self.name.as_str());
         if state.index.contains_key(&name_arc) {
-            return;
+            return Bytes::new();
         }
 
         let stream = Stream {
@@ -294,37 +296,39 @@ impl StateHandler for CreateStream {
             stream.id = id;
         }
         state.index.insert(name_arc, id);
+        Bytes::new()
     }
 }
 
 impl StateHandler for UpdateStream {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    fn apply(&self, state: &mut StreamsInner) -> Bytes {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
-            return;
+            return Bytes::new();
         };
         let Some(stream) = state.items.get_mut(stream_id) else {
-            return;
+            return Bytes::new();
         };
 
         let new_name_arc: Arc<str> = Arc::from(self.name.as_str());
         if let Some(&existing_id) = state.index.get(&new_name_arc)
             && existing_id != stream_id
         {
-            return;
+            return Bytes::new();
         }
 
         state.index.remove(&stream.name);
         stream.name = new_name_arc.clone();
         state.index.insert(new_name_arc, stream_id);
+        Bytes::new()
     }
 }
 
 impl StateHandler for DeleteStream {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    fn apply(&self, state: &mut StreamsInner) -> Bytes {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
-            return;
+            return Bytes::new();
         };
 
         if let Some(stream) = state.items.get(stream_id) {
@@ -333,30 +337,32 @@ impl StateHandler for DeleteStream {
             state.items.remove(stream_id);
             state.index.remove(&name);
         }
+        Bytes::new()
     }
 }
 
 impl StateHandler for PurgeStream {
     type State = StreamsInner;
-    fn apply(&self, _state: &mut StreamsInner) {
+    fn apply(&self, _state: &mut StreamsInner) -> Bytes {
         // TODO
         todo!();
     }
 }
 
+// TODO(hubcio): Serialize proper reply (e.g. assigned topic ID) instead of empty Bytes.
 impl StateHandler for CreateTopic {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    fn apply(&self, state: &mut StreamsInner) -> Bytes {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
-            return;
+            return Bytes::new();
         };
         let Some(stream) = state.items.get_mut(stream_id) else {
-            return;
+            return Bytes::new();
         };
 
         let name_arc: Arc<str> = Arc::from(self.name.as_str());
         if stream.topic_index.contains_key(&name_arc) {
-            return;
+            return Bytes::new();
         }
 
         let topic = Topic {
@@ -386,31 +392,32 @@ impl StateHandler for CreateTopic {
         }
 
         stream.topic_index.insert(name_arc, topic_id);
+        Bytes::new()
     }
 }
 
 impl StateHandler for UpdateTopic {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    fn apply(&self, state: &mut StreamsInner) -> Bytes {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
-            return;
+            return Bytes::new();
         };
         let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else {
-            return;
+            return Bytes::new();
         };
 
         let Some(stream) = state.items.get_mut(stream_id) else {
-            return;
+            return Bytes::new();
         };
         let Some(topic) = stream.topics.get_mut(topic_id) else {
-            return;
+            return Bytes::new();
         };
 
         let new_name_arc: Arc<str> = Arc::from(self.name.as_str());
         if let Some(&existing_id) = stream.topic_index.get(&new_name_arc)
             && existing_id != topic_id
         {
-            return;
+            return Bytes::new();
         }
 
         stream.topic_index.remove(&topic.name);
@@ -422,20 +429,21 @@ impl StateHandler for UpdateTopic {
             topic.replication_factor = rf;
         }
         stream.topic_index.insert(new_name_arc, topic_id);
+        Bytes::new()
     }
 }
 
 impl StateHandler for DeleteTopic {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    fn apply(&self, state: &mut StreamsInner) -> Bytes {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
-            return;
+            return Bytes::new();
         };
         let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else {
-            return;
+            return Bytes::new();
         };
         let Some(stream) = state.items.get_mut(stream_id) else {
-            return;
+            return Bytes::new();
         };
 
         if let Some(topic) = stream.topics.get(topic_id) {
@@ -443,32 +451,34 @@ impl StateHandler for DeleteTopic {
             stream.topics.remove(topic_id);
             stream.topic_index.remove(&name);
         }
+        Bytes::new()
     }
 }
 
 impl StateHandler for PurgeTopic {
     type State = StreamsInner;
-    fn apply(&self, _state: &mut StreamsInner) {
+    fn apply(&self, _state: &mut StreamsInner) -> Bytes {
         // TODO
         todo!();
     }
 }
 
+// TODO(hubcio): Serialize proper reply (e.g. assigned partition IDs) instead of empty Bytes.
 impl StateHandler for CreatePartitions {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    fn apply(&self, state: &mut StreamsInner) -> Bytes {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
-            return;
+            return Bytes::new();
         };
         let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else {
-            return;
+            return Bytes::new();
         };
 
         let Some(stream) = state.items.get_mut(stream_id) else {
-            return;
+            return Bytes::new();
         };
         let Some(topic) = stream.topics.get_mut(topic_id) else {
-            return;
+            return Bytes::new();
         };
 
         let current_partition_count = topic.partitions.len();
@@ -480,24 +490,25 @@ impl StateHandler for CreatePartitions {
             };
             topic.partitions.push(partition);
         }
+        Bytes::new()
     }
 }
 
 impl StateHandler for DeletePartitions {
     type State = StreamsInner;
-    fn apply(&self, state: &mut StreamsInner) {
+    fn apply(&self, state: &mut StreamsInner) -> Bytes {
         let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
-            return;
+            return Bytes::new();
         };
         let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else {
-            return;
+            return Bytes::new();
         };
 
         let Some(stream) = state.items.get_mut(stream_id) else {
-            return;
+            return Bytes::new();
         };
         let Some(topic) = stream.topics.get_mut(topic_id) else {
-            return;
+            return Bytes::new();
         };
 
         let count_to_delete = self.partitions_count as usize;
@@ -506,6 +517,7 @@ impl StateHandler for DeletePartitions {
                 .partitions
                 .truncate(topic.partitions.len() - count_to_delete);
         }
+        Bytes::new()
     }
 }
 
@@ -646,7 +658,11 @@ impl Snapshotable for Streams {
         }
 
         let items: Slab<Stream> = stream_entries.into_iter().collect();
-        let inner = StreamsInner { index, items };
+        let inner = StreamsInner {
+            index,
+            items,
+            last_result: None,
+        };
         Ok(inner.into())
     }
 }
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 14f71f3af..8e2c7897e 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -20,6 +20,7 @@ use crate::stm::StateHandler;
 use crate::stm::snapshot::Snapshotable;
 use crate::{collect_handlers, define_state, impl_fill_restore};
 use ahash::AHashMap;
+use bytes::Bytes;
 use iggy_common::change_password::ChangePassword;
 use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
 use iggy_common::create_user::CreateUser;
@@ -122,12 +123,13 @@ impl UsersInner {
     }
 }
 
+// TODO(hubcio): Serialize proper reply (e.g. assigned user ID) instead of empty Bytes.
 impl StateHandler for CreateUser {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    fn apply(&self, state: &mut UsersInner) -> Bytes {
         let username_arc: Arc<str> = Arc::from(self.username.as_str());
         if state.index.contains_key(&username_arc) {
-            return;
+            return Bytes::new();
         }
 
         let user = User {
@@ -148,18 +150,19 @@ impl StateHandler for CreateUser {
         state
             .personal_access_tokens
             .insert(id as UserId, AHashMap::default());
+        Bytes::new()
     }
 }
 
 impl StateHandler for UpdateUser {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    fn apply(&self, state: &mut UsersInner) -> Bytes {
         let Some(user_id) = state.resolve_user_id(&self.user_id) else {
-            return;
+            return Bytes::new();
         };
 
         let Some(user) = state.items.get_mut(user_id) else {
-            return;
+            return Bytes::new();
         };
 
         if let Some(new_username) = &self.username {
@@ -167,7 +170,7 @@ impl StateHandler for UpdateUser {
             if let Some(&existing_id) = state.index.get(&new_username_arc)
                 && existing_id != user_id as UserId
             {
-                return;
+                return Bytes::new();
             }
 
             state.index.remove(&user.username);
@@ -178,14 +181,15 @@ impl StateHandler for UpdateUser {
         if let Some(new_status) = self.status {
             user.status = new_status;
         }
+        Bytes::new()
     }
 }
 
 impl StateHandler for DeleteUser {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    fn apply(&self, state: &mut UsersInner) -> Bytes {
         let Some(user_id) = state.resolve_user_id(&self.user_id) else {
-            return;
+            return Bytes::new();
         };
 
         if let Some(user) = state.items.get(user_id) {
@@ -194,51 +198,55 @@ impl StateHandler for DeleteUser {
             state.index.remove(&username);
             state.personal_access_tokens.remove(&(user_id as UserId));
         }
+        Bytes::new()
     }
 }
 
 impl StateHandler for ChangePassword {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    fn apply(&self, state: &mut UsersInner) -> Bytes {
         let Some(user_id) = state.resolve_user_id(&self.user_id) else {
-            return;
+            return Bytes::new();
         };
 
         if let Some(user) = state.items.get_mut(user_id) {
             user.password_hash = Arc::from(self.new_password.as_str());
         }
+        Bytes::new()
     }
 }
 
 impl StateHandler for UpdatePermissions {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    fn apply(&self, state: &mut UsersInner) -> Bytes {
         let Some(user_id) = state.resolve_user_id(&self.user_id) else {
-            return;
+            return Bytes::new();
         };
 
         if let Some(user) = state.items.get_mut(user_id) {
             user.permissions = self.permissions.as_ref().map(|p| Arc::new(p.clone()));
         }
+        Bytes::new()
     }
 }
 
+// TODO(hubcio): Serialize proper reply (e.g. generated token) instead of empty Bytes.
 impl StateHandler for CreatePersonalAccessToken {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    fn apply(&self, state: &mut UsersInner) -> Bytes {
         // TODO: Stub until protocol gets adjusted.
         let user_id = 0;
         let user_tokens = state.personal_access_tokens.entry(user_id).or_default();
         let name_arc: Arc<str> = Arc::from(self.name.as_str());
         if user_tokens.contains_key(&name_arc) {
-            return;
+            return Bytes::new();
         }
 
         let expiry_at = PersonalAccessToken::calculate_expiry_at(IggyTimestamp::now(), self.expiry);
         if let Some(expiry_at) = expiry_at
             && expiry_at.as_micros() <= IggyTimestamp::now().as_micros()
         {
-            return;
+            return Bytes::new();
         }
 
         let (pat, _) = PersonalAccessToken::new(
@@ -248,12 +256,13 @@ impl StateHandler for CreatePersonalAccessToken {
             self.expiry,
         );
         user_tokens.insert(name_arc, pat);
+        Bytes::new()
     }
 }
 
 impl StateHandler for DeletePersonalAccessToken {
     type State = UsersInner;
-    fn apply(&self, state: &mut UsersInner) {
+    fn apply(&self, state: &mut UsersInner) -> Bytes {
         // TODO: Stub until protocol gets adjusted.
         let user_id = 0;
 
@@ -261,6 +270,7 @@ impl StateHandler for DeletePersonalAccessToken {
             let name_arc: Arc<str> = Arc::from(self.name.as_str());
             user_tokens.remove(&name_arc);
         }
+        Bytes::new()
     }
 }
 
@@ -473,6 +483,7 @@ impl Snapshotable for Users {
             items,
             personal_access_tokens,
             permissioner,
+            last_result: None,
         };
         Ok(inner.into())
     }
diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
index 6940fb323..d7fadd93b 100644
--- a/core/partitions/Cargo.toml
+++ b/core/partitions/Cargo.toml
@@ -28,6 +28,7 @@ repository = "https://github.com/apache/iggy";
 readme = "../../README.md"
 
 [dependencies]
+bytes = { workspace = true }
 consensus = { workspace = true }
 iggy_common = { workspace = true }
 journal = { workspace = true }
diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index f05b294db..305d475cb 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -496,7 +496,8 @@ where
                 }
             }
 
-            let generic_reply = build_reply_message(consensus, &prepare_header).into_generic();
+            let generic_reply =
+                build_reply_message(consensus, &prepare_header, bytes::Bytes::new()).into_generic();
             debug!(
                 "on_ack: sending reply to client={} for op={}",
                 prepare_header.client, prepare_header.op
diff --git a/core/shard/Cargo.toml b/core/shard/Cargo.toml
index c5b7621fd..0e0cdceb5 100644
--- a/core/shard/Cargo.toml
+++ b/core/shard/Cargo.toml
@@ -21,6 +21,7 @@ version = "0.1.0"
 edition = "2024"
 
 [dependencies]
+bytes = { workspace = true }
 consensus = { path = "../consensus" }
 iggy_common = { path = "../common" }
 journal = { path = "../journal" }
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index b51286d3f..196615525 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -71,7 +71,11 @@ where
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
-        M: StateMachine<Input = Message<PrepareHeader>>,
+        M: StateMachine<
+                Input = Message<PrepareHeader>,
+                Output = bytes::Bytes,
+                Error = iggy_common::IggyError,
+            >,
     {
         match MessageBag::from(message) {
             MessageBag::Request(request) => self.on_request(request).await,
@@ -89,7 +93,11 @@ where
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
-        M: StateMachine<Input = Message<PrepareHeader>>,
+        M: StateMachine<
+                Input = Message<PrepareHeader>,
+                Output = bytes::Bytes,
+                Error = iggy_common::IggyError,
+            >,
     {
         let planes = self.plane.inner();
         if planes.0.is_applicable(&request) {
@@ -108,7 +116,11 @@ where
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
-        M: StateMachine<Input = Message<PrepareHeader>>,
+        M: StateMachine<
+                Input = Message<PrepareHeader>,
+                Output = bytes::Bytes,
+                Error = iggy_common::IggyError,
+            >,
     {
         let planes = self.plane.inner();
         if planes.0.is_applicable(&prepare) {
@@ -127,7 +139,11 @@ where
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
-        M: StateMachine<Input = Message<PrepareHeader>>,
+        M: StateMachine<
+                Input = Message<PrepareHeader>,
+                Output = bytes::Bytes,
+                Error = iggy_common::IggyError,
+            >,
     {
         let planes = self.plane.inner();
         if planes.0.is_applicable(&prepare_ok) {
@@ -154,7 +170,11 @@ where
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
-        M: StateMachine<Input = Message<PrepareHeader>>,
+        M: StateMachine<
+                Input = Message<PrepareHeader>,
+                Output = bytes::Bytes,
+                Error = iggy_common::IggyError,
+            >,
     {
         debug_assert!(buf.is_empty(), "buf must be empty on entry");
 

Reply via email to