This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 553123e78 feat(metadata): thread STM response into consensus Reply
body (#2856)
553123e78 is described below
commit 553123e7888794a50185f4cb93ec2df83c5a24a7
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Mar 5 10:30:21 2026 +0100
feat(metadata): thread STM response into consensus Reply body (#2856)
After a metadata state machine commit, the Reply message
had an empty body -- mux_stm.update() returned () and
the result was discarded. Clients need the response
(e.g. assigned IDs) to proceed without a second query.
StateHandler::apply now returns Bytes, stored in a
last_result field on each *Inner struct during
left_right::Absorb (which forces () return). After
publish(), the result is read back through ReadHandle
and propagated through State -> StateMachine -> MuxSTM
into build_reply_message, which now accepts a body
parameter and constructs a variable-length BytesMut
buffer instead of a header-only transmute.
All 19 handlers return Bytes::new() for now.
---
Cargo.lock | 5 ++
core/consensus/Cargo.toml | 2 +
core/consensus/src/plane_helpers.rs | 55 ++++++++++++---------
core/metadata/Cargo.toml | 1 +
core/metadata/src/impls/metadata.rs | 20 +++++---
core/metadata/src/stm/consumer_group.rs | 13 +++--
core/metadata/src/stm/mod.rs | 18 ++++---
core/metadata/src/stm/mux.rs | 8 +--
core/metadata/src/stm/stream.rs | 86 +++++++++++++++++++--------------
core/metadata/src/stm/user.rs | 43 +++++++++++------
core/partitions/Cargo.toml | 1 +
core/partitions/src/iggy_partitions.rs | 3 +-
core/shard/Cargo.toml | 1 +
core/shard/src/lib.rs | 30 ++++++++++--
core/shard/src/router.rs | 12 ++++-
15 files changed, 195 insertions(+), 103 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index e54c38866..c0f9957b6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2165,6 +2165,8 @@ name = "consensus"
version = "0.1.0"
dependencies = [
"bit-set",
+ "bytemuck",
+ "bytes",
"futures",
"iggy_common",
"message_bus",
@@ -6381,6 +6383,7 @@ name = "metadata"
version = "0.1.0"
dependencies = [
"ahash 0.8.12",
+ "bytes",
"consensus",
"iggy_common",
"journal",
@@ -7276,6 +7279,7 @@ dependencies = [
name = "partitions"
version = "0.1.0"
dependencies = [
+ "bytes",
"consensus",
"iggy_common",
"journal",
@@ -9373,6 +9377,7 @@ dependencies = [
name = "shard"
version = "0.1.0"
dependencies = [
+ "bytes",
"consensus",
"crossfire",
"futures",
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 7b433bfb2..70c6ff46b 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -29,6 +29,8 @@ readme = "../../../README.md"
[dependencies]
bit-set = { workspace = true }
+bytemuck = { workspace = true }
+bytes = { workspace = true }
iggy_common = { workspace = true }
message_bus = { workspace = true }
rand = { workspace = true }
diff --git a/core/consensus/src/plane_helpers.rs
b/core/consensus/src/plane_helpers.rs
index 8f8ea1275..f4ca92f3d 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -198,33 +198,44 @@ where
pub fn build_reply_message<B, P>(
consensus: &VsrConsensus<B, P>,
prepare_header: &PrepareHeader,
+ body: bytes::Bytes,
) -> Message<ReplyHeader>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
{
-
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>()).transmute_header(|_,
new| {
- *new = ReplyHeader {
- checksum: 0,
- checksum_body: 0,
- cluster: consensus.cluster(),
- size: std::mem::size_of::<ReplyHeader>() as u32,
- view: consensus.view(),
- release: 0,
- command: Command2::Reply,
- replica: consensus.replica(),
- reserved_frame: [0; 66],
- request_checksum: prepare_header.request_checksum,
- context: 0,
- op: prepare_header.op,
- commit: consensus.commit(),
- timestamp: prepare_header.timestamp,
- request: prepare_header.request,
- operation: prepare_header.operation,
- namespace: prepare_header.namespace,
- ..Default::default()
- };
- })
+ let header_size = std::mem::size_of::<ReplyHeader>();
+ let total_size = header_size + body.len();
+ let mut buffer = bytes::BytesMut::zeroed(total_size);
+
+ let header = bytemuck::from_bytes_mut::<ReplyHeader>(&mut
buffer[..header_size]);
+ *header = ReplyHeader {
+ checksum: 0,
+ checksum_body: 0,
+ cluster: consensus.cluster(),
+ size: total_size as u32,
+ view: consensus.view(),
+ release: 0,
+ command: Command2::Reply,
+ replica: consensus.replica(),
+ reserved_frame: [0; 66],
+ request_checksum: prepare_header.request_checksum,
+ context: 0,
+ op: prepare_header.op,
+ commit: consensus.commit(),
+ timestamp: prepare_header.timestamp,
+ request: prepare_header.request,
+ operation: prepare_header.operation,
+ namespace: prepare_header.namespace,
+ ..Default::default()
+ };
+
+ if !body.is_empty() {
+ buffer[header_size..].copy_from_slice(&body);
+ }
+
+ Message::<ReplyHeader>::from_bytes(buffer.freeze())
+ .expect("build_reply_message: constructed header must be valid")
}
/// Verify hash chain would not break if we add this header.
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index f5d81d5f8..fcad045bf 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -29,6 +29,7 @@ readme = "../../../README.md"
[dependencies]
ahash = { workspace = true }
+bytes = { workspace = true }
consensus = { workspace = true }
iggy_common = { workspace = true }
journal = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index f5502ce06..41cb21296 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -104,7 +104,11 @@ where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
J: JournalHandle,
J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header =
PrepareHeader>,
- M: StateMachine<Input = Message<PrepareHeader>>,
+ M: StateMachine<
+ Input = Message<PrepareHeader>,
+ Output = bytes::Bytes,
+ Error = iggy_common::IggyError,
+ >,
{
async fn on_request(&self, message: <VsrConsensus<B> as
Consensus>::Message<RequestHeader>) {
let consensus = self.consensus.as_ref().unwrap();
@@ -220,13 +224,17 @@ where
)
});
- // Apply the state (consumes prepare)
- // TODO: Handle appending result to response
- let _result = self.mux_stm.update(prepare);
+ let response =
self.mux_stm.update(prepare).unwrap_or_else(|err| {
+ warn!(
+ "on_ack: state machine error for op={}: {err}",
+ prepare_header.op
+ );
+ bytes::Bytes::new()
+ });
debug!("on_ack: state applied for op={}", prepare_header.op);
- // Send reply to client
- let generic_reply = build_reply_message(consensus,
&prepare_header).into_generic();
+ let generic_reply =
+ build_reply_message(consensus, &prepare_header,
response).into_generic();
debug!(
"on_ack: sending reply to client={} for op={}",
prepare_header.client, prepare_header.op
diff --git a/core/metadata/src/stm/consumer_group.rs
b/core/metadata/src/stm/consumer_group.rs
index 5c039e0dc..f9e42178f 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -18,6 +18,7 @@
use crate::stm::StateHandler;
use crate::stm::snapshot::Snapshotable;
use crate::{collect_handlers, define_state, impl_fill_restore};
+use bytes::Bytes;
use ahash::AHashMap;
use iggy_common::create_consumer_group::CreateConsumerGroup;
@@ -166,12 +167,13 @@ impl ConsumerGroupsInner {
// TODO: This is all a hack, we need to figure out how to do this in a way
where `Identifier`
// does not reach this stage of execution.
+// TODO(hubcio): Serialize proper reply (e.g. assigned group ID) instead of
empty Bytes.
impl StateHandler for CreateConsumerGroup {
type State = ConsumerGroupsInner;
- fn apply(&self, state: &mut ConsumerGroupsInner) {
+ fn apply(&self, state: &mut ConsumerGroupsInner) -> Bytes {
let name: Arc<str> = Arc::from(self.name.as_str());
if state.name_index.contains_key(&name) {
- return;
+ return Bytes::new();
}
let group = ConsumerGroup::new(name.clone());
@@ -198,18 +200,19 @@ impl StateHandler for CreateConsumerGroup {
let key = (Arc::from(s.as_str()), Arc::from(t.as_str()));
state.topic_name_index.entry(key).or_default().push(id);
}
+ Bytes::new()
}
}
impl StateHandler for DeleteConsumerGroup {
type State = ConsumerGroupsInner;
- fn apply(&self, state: &mut ConsumerGroupsInner) {
+ fn apply(&self, state: &mut ConsumerGroupsInner) -> Bytes {
let Some(id) = state.resolve_consumer_group_id_by_identifiers(
&self.stream_id,
&self.topic_id,
&self.group_id,
) else {
- return;
+ return Bytes::new();
};
let group = state.items.remove(id);
@@ -232,6 +235,7 @@ impl StateHandler for DeleteConsumerGroup {
vec.retain(|&x| x != id);
}
}
+ Bytes::new()
}
}
@@ -367,6 +371,7 @@ impl Snapshotable for ConsumerGroups {
topic_index,
topic_name_index,
items,
+ last_result: None,
};
Ok(inner.into())
}
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 7de1992d0..82aed80b1 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -21,6 +21,7 @@ pub mod snapshot;
pub mod stream;
pub mod user;
+use bytes::Bytes;
use iggy_common::Either;
use left_right::*;
use std::cell::UnsafeCell;
@@ -78,9 +79,10 @@ pub trait Command {
/// Per-command handler for a given state type.
/// Each command struct implements this for the state it mutates.
+/// Returns a `Bytes` response that will be threaded into the Reply message.
pub trait StateHandler {
type State;
- fn apply(&self, state: &mut Self::State);
+ fn apply(&self, state: &mut Self::State) -> Bytes;
}
#[derive(Debug)]
@@ -171,6 +173,7 @@ macro_rules! define_state {
$(
pub $field_name: $field_type,
)*
+ pub last_result: Option<bytes::Bytes>,
}
impl [<$state Inner>] {
@@ -251,19 +254,19 @@ macro_rules! collect_handlers {
impl [<$state Inner>] {
fn dispatch(&mut self, cmd: &[<$state Command>]) {
- match cmd {
+ self.last_result = Some(match cmd {
$(
[<$state Command>]::$operation(payload) => {
- $crate::stm::StateHandler::apply(payload,
self);
+ $crate::stm::StateHandler::apply(payload, self)
},
)*
- }
+ });
}
}
impl $crate::stm::State for $state {
type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
- type Output = ();
+ type Output = bytes::Bytes;
type Error = <[<$state Inner>] as $crate::stm::Command>::Error;
fn apply(&self, input: Self::Input) ->
Result<::iggy_common::Either<Self::Output, Self::Input>, Self::Error> {
@@ -272,7 +275,10 @@ macro_rules! collect_handlers {
match <[<$state Inner>] as
$crate::stm::Command>::parse(input)? {
Either::Left(cmd) => {
self.inner.do_apply(cmd);
- Ok(Either::Left(()))
+ let result = self.inner.read(|state| {
+ state.last_result.clone().unwrap_or_default()
+ });
+ Ok(Either::Left(result))
}
Either::Right(input) => {
Ok(Either::Right(input))
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 648d8893e..0f6d39dcc 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -53,18 +53,14 @@ where
}
}
-// TODO: Figure out how to get around the fact that we need to hardcode the
Input/Output type for base case.
-// TODO: I think we could move the base case to the impl site of `State`, so
this way we know the `Input` and `Output` types.
// Base case of the recursive resolution.
impl StateMachine for () {
type Input = Message<PrepareHeader>;
- // TODO: Make sure that the `Output` matches to the output type of the
rest of list.
- // TODO: Add a trait bound to the output that will allow us to get the
response in bytes.
- type Output = ();
+ type Output = bytes::Bytes;
type Error = iggy_common::IggyError;
fn update(&self, _input: Self::Input) -> Result<Self::Output, Self::Error>
{
- Ok(())
+ Ok(bytes::Bytes::new())
}
}
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 7e4c95104..9bc6f4668 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -20,6 +20,7 @@ use crate::stm::StateHandler;
use crate::stm::snapshot::Snapshotable;
use crate::{collect_handlers, define_state, impl_fill_restore};
use ahash::AHashMap;
+use bytes::Bytes;
use iggy_common::create_partitions::CreatePartitions;
use iggy_common::create_stream::CreateStream;
use iggy_common::create_topic::CreateTopic;
@@ -272,12 +273,13 @@ impl StreamsInner {
}
}
+// TODO(hubcio): Serialize proper reply (e.g. assigned stream ID) instead of
empty Bytes.
impl StateHandler for CreateStream {
type State = StreamsInner;
- fn apply(&self, state: &mut StreamsInner) {
+ fn apply(&self, state: &mut StreamsInner) -> Bytes {
let name_arc: Arc<str> = Arc::from(self.name.as_str());
if state.index.contains_key(&name_arc) {
- return;
+ return Bytes::new();
}
let stream = Stream {
@@ -294,37 +296,39 @@ impl StateHandler for CreateStream {
stream.id = id;
}
state.index.insert(name_arc, id);
+ Bytes::new()
}
}
impl StateHandler for UpdateStream {
type State = StreamsInner;
- fn apply(&self, state: &mut StreamsInner) {
+ fn apply(&self, state: &mut StreamsInner) -> Bytes {
let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
- return;
+ return Bytes::new();
};
let Some(stream) = state.items.get_mut(stream_id) else {
- return;
+ return Bytes::new();
};
let new_name_arc: Arc<str> = Arc::from(self.name.as_str());
if let Some(&existing_id) = state.index.get(&new_name_arc)
&& existing_id != stream_id
{
- return;
+ return Bytes::new();
}
state.index.remove(&stream.name);
stream.name = new_name_arc.clone();
state.index.insert(new_name_arc, stream_id);
+ Bytes::new()
}
}
impl StateHandler for DeleteStream {
type State = StreamsInner;
- fn apply(&self, state: &mut StreamsInner) {
+ fn apply(&self, state: &mut StreamsInner) -> Bytes {
let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
- return;
+ return Bytes::new();
};
if let Some(stream) = state.items.get(stream_id) {
@@ -333,30 +337,32 @@ impl StateHandler for DeleteStream {
state.items.remove(stream_id);
state.index.remove(&name);
}
+ Bytes::new()
}
}
impl StateHandler for PurgeStream {
type State = StreamsInner;
- fn apply(&self, _state: &mut StreamsInner) {
+ fn apply(&self, _state: &mut StreamsInner) -> Bytes {
// TODO
todo!();
}
}
+// TODO(hubcio): Serialize proper reply (e.g. assigned topic ID) instead of
empty Bytes.
impl StateHandler for CreateTopic {
type State = StreamsInner;
- fn apply(&self, state: &mut StreamsInner) {
+ fn apply(&self, state: &mut StreamsInner) -> Bytes {
let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
- return;
+ return Bytes::new();
};
let Some(stream) = state.items.get_mut(stream_id) else {
- return;
+ return Bytes::new();
};
let name_arc: Arc<str> = Arc::from(self.name.as_str());
if stream.topic_index.contains_key(&name_arc) {
- return;
+ return Bytes::new();
}
let topic = Topic {
@@ -386,31 +392,32 @@ impl StateHandler for CreateTopic {
}
stream.topic_index.insert(name_arc, topic_id);
+ Bytes::new()
}
}
impl StateHandler for UpdateTopic {
type State = StreamsInner;
- fn apply(&self, state: &mut StreamsInner) {
+ fn apply(&self, state: &mut StreamsInner) -> Bytes {
let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
- return;
+ return Bytes::new();
};
let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id)
else {
- return;
+ return Bytes::new();
};
let Some(stream) = state.items.get_mut(stream_id) else {
- return;
+ return Bytes::new();
};
let Some(topic) = stream.topics.get_mut(topic_id) else {
- return;
+ return Bytes::new();
};
let new_name_arc: Arc<str> = Arc::from(self.name.as_str());
if let Some(&existing_id) = stream.topic_index.get(&new_name_arc)
&& existing_id != topic_id
{
- return;
+ return Bytes::new();
}
stream.topic_index.remove(&topic.name);
@@ -422,20 +429,21 @@ impl StateHandler for UpdateTopic {
topic.replication_factor = rf;
}
stream.topic_index.insert(new_name_arc, topic_id);
+ Bytes::new()
}
}
impl StateHandler for DeleteTopic {
type State = StreamsInner;
- fn apply(&self, state: &mut StreamsInner) {
+ fn apply(&self, state: &mut StreamsInner) -> Bytes {
let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
- return;
+ return Bytes::new();
};
let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id)
else {
- return;
+ return Bytes::new();
};
let Some(stream) = state.items.get_mut(stream_id) else {
- return;
+ return Bytes::new();
};
if let Some(topic) = stream.topics.get(topic_id) {
@@ -443,32 +451,34 @@ impl StateHandler for DeleteTopic {
stream.topics.remove(topic_id);
stream.topic_index.remove(&name);
}
+ Bytes::new()
}
}
impl StateHandler for PurgeTopic {
type State = StreamsInner;
- fn apply(&self, _state: &mut StreamsInner) {
+ fn apply(&self, _state: &mut StreamsInner) -> Bytes {
// TODO
todo!();
}
}
+// TODO(hubcio): Serialize proper reply (e.g. assigned partition IDs) instead
of empty Bytes.
impl StateHandler for CreatePartitions {
type State = StreamsInner;
- fn apply(&self, state: &mut StreamsInner) {
+ fn apply(&self, state: &mut StreamsInner) -> Bytes {
let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
- return;
+ return Bytes::new();
};
let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id)
else {
- return;
+ return Bytes::new();
};
let Some(stream) = state.items.get_mut(stream_id) else {
- return;
+ return Bytes::new();
};
let Some(topic) = stream.topics.get_mut(topic_id) else {
- return;
+ return Bytes::new();
};
let current_partition_count = topic.partitions.len();
@@ -480,24 +490,25 @@ impl StateHandler for CreatePartitions {
};
topic.partitions.push(partition);
}
+ Bytes::new()
}
}
impl StateHandler for DeletePartitions {
type State = StreamsInner;
- fn apply(&self, state: &mut StreamsInner) {
+ fn apply(&self, state: &mut StreamsInner) -> Bytes {
let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
- return;
+ return Bytes::new();
};
let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id)
else {
- return;
+ return Bytes::new();
};
let Some(stream) = state.items.get_mut(stream_id) else {
- return;
+ return Bytes::new();
};
let Some(topic) = stream.topics.get_mut(topic_id) else {
- return;
+ return Bytes::new();
};
let count_to_delete = self.partitions_count as usize;
@@ -506,6 +517,7 @@ impl StateHandler for DeletePartitions {
.partitions
.truncate(topic.partitions.len() - count_to_delete);
}
+ Bytes::new()
}
}
@@ -646,7 +658,11 @@ impl Snapshotable for Streams {
}
let items: Slab<Stream> = stream_entries.into_iter().collect();
- let inner = StreamsInner { index, items };
+ let inner = StreamsInner {
+ index,
+ items,
+ last_result: None,
+ };
Ok(inner.into())
}
}
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 14f71f3af..8e2c7897e 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -20,6 +20,7 @@ use crate::stm::StateHandler;
use crate::stm::snapshot::Snapshotable;
use crate::{collect_handlers, define_state, impl_fill_restore};
use ahash::AHashMap;
+use bytes::Bytes;
use iggy_common::change_password::ChangePassword;
use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
use iggy_common::create_user::CreateUser;
@@ -122,12 +123,13 @@ impl UsersInner {
}
}
+// TODO(hubcio): Serialize proper reply (e.g. assigned user ID) instead of
empty Bytes.
impl StateHandler for CreateUser {
type State = UsersInner;
- fn apply(&self, state: &mut UsersInner) {
+ fn apply(&self, state: &mut UsersInner) -> Bytes {
let username_arc: Arc<str> = Arc::from(self.username.as_str());
if state.index.contains_key(&username_arc) {
- return;
+ return Bytes::new();
}
let user = User {
@@ -148,18 +150,19 @@ impl StateHandler for CreateUser {
state
.personal_access_tokens
.insert(id as UserId, AHashMap::default());
+ Bytes::new()
}
}
impl StateHandler for UpdateUser {
type State = UsersInner;
- fn apply(&self, state: &mut UsersInner) {
+ fn apply(&self, state: &mut UsersInner) -> Bytes {
let Some(user_id) = state.resolve_user_id(&self.user_id) else {
- return;
+ return Bytes::new();
};
let Some(user) = state.items.get_mut(user_id) else {
- return;
+ return Bytes::new();
};
if let Some(new_username) = &self.username {
@@ -167,7 +170,7 @@ impl StateHandler for UpdateUser {
if let Some(&existing_id) = state.index.get(&new_username_arc)
&& existing_id != user_id as UserId
{
- return;
+ return Bytes::new();
}
state.index.remove(&user.username);
@@ -178,14 +181,15 @@ impl StateHandler for UpdateUser {
if let Some(new_status) = self.status {
user.status = new_status;
}
+ Bytes::new()
}
}
impl StateHandler for DeleteUser {
type State = UsersInner;
- fn apply(&self, state: &mut UsersInner) {
+ fn apply(&self, state: &mut UsersInner) -> Bytes {
let Some(user_id) = state.resolve_user_id(&self.user_id) else {
- return;
+ return Bytes::new();
};
if let Some(user) = state.items.get(user_id) {
@@ -194,51 +198,55 @@ impl StateHandler for DeleteUser {
state.index.remove(&username);
state.personal_access_tokens.remove(&(user_id as UserId));
}
+ Bytes::new()
}
}
impl StateHandler for ChangePassword {
type State = UsersInner;
- fn apply(&self, state: &mut UsersInner) {
+ fn apply(&self, state: &mut UsersInner) -> Bytes {
let Some(user_id) = state.resolve_user_id(&self.user_id) else {
- return;
+ return Bytes::new();
};
if let Some(user) = state.items.get_mut(user_id) {
user.password_hash = Arc::from(self.new_password.as_str());
}
+ Bytes::new()
}
}
impl StateHandler for UpdatePermissions {
type State = UsersInner;
- fn apply(&self, state: &mut UsersInner) {
+ fn apply(&self, state: &mut UsersInner) -> Bytes {
let Some(user_id) = state.resolve_user_id(&self.user_id) else {
- return;
+ return Bytes::new();
};
if let Some(user) = state.items.get_mut(user_id) {
user.permissions = self.permissions.as_ref().map(|p|
Arc::new(p.clone()));
}
+ Bytes::new()
}
}
+// TODO(hubcio): Serialize proper reply (e.g. generated token) instead of
empty Bytes.
impl StateHandler for CreatePersonalAccessToken {
type State = UsersInner;
- fn apply(&self, state: &mut UsersInner) {
+ fn apply(&self, state: &mut UsersInner) -> Bytes {
// TODO: Stub until protocol gets adjusted.
let user_id = 0;
let user_tokens =
state.personal_access_tokens.entry(user_id).or_default();
let name_arc: Arc<str> = Arc::from(self.name.as_str());
if user_tokens.contains_key(&name_arc) {
- return;
+ return Bytes::new();
}
let expiry_at =
PersonalAccessToken::calculate_expiry_at(IggyTimestamp::now(), self.expiry);
if let Some(expiry_at) = expiry_at
&& expiry_at.as_micros() <= IggyTimestamp::now().as_micros()
{
- return;
+ return Bytes::new();
}
let (pat, _) = PersonalAccessToken::new(
@@ -248,12 +256,13 @@ impl StateHandler for CreatePersonalAccessToken {
self.expiry,
);
user_tokens.insert(name_arc, pat);
+ Bytes::new()
}
}
impl StateHandler for DeletePersonalAccessToken {
type State = UsersInner;
- fn apply(&self, state: &mut UsersInner) {
+ fn apply(&self, state: &mut UsersInner) -> Bytes {
// TODO: Stub until protocol gets adjusted.
let user_id = 0;
@@ -261,6 +270,7 @@ impl StateHandler for DeletePersonalAccessToken {
let name_arc: Arc<str> = Arc::from(self.name.as_str());
user_tokens.remove(&name_arc);
}
+ Bytes::new()
}
}
@@ -473,6 +483,7 @@ impl Snapshotable for Users {
items,
personal_access_tokens,
permissioner,
+ last_result: None,
};
Ok(inner.into())
}
diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
index 6940fb323..d7fadd93b 100644
--- a/core/partitions/Cargo.toml
+++ b/core/partitions/Cargo.toml
@@ -28,6 +28,7 @@ repository = "https://github.com/apache/iggy"
readme = "../../README.md"
[dependencies]
+bytes = { workspace = true }
consensus = { workspace = true }
iggy_common = { workspace = true }
journal = { workspace = true }
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index 3ae5d4389..6f94da3bc 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -486,7 +486,8 @@ where
}
}
- let generic_reply = build_reply_message(consensus,
&prepare_header).into_generic();
+ let generic_reply =
+ build_reply_message(consensus, &prepare_header,
bytes::Bytes::new()).into_generic();
debug!(
"on_ack: sending reply to client={} for op={}",
prepare_header.client, prepare_header.op
diff --git a/core/shard/Cargo.toml b/core/shard/Cargo.toml
index f3f9d7e29..5180ba829 100644
--- a/core/shard/Cargo.toml
+++ b/core/shard/Cargo.toml
@@ -21,6 +21,7 @@ version = "0.1.0"
edition = "2024"
[dependencies]
+bytes = { workspace = true }
consensus = { path = "../consensus" }
crossfire = { workspace = true }
futures = { workspace = true }
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 3e6adc00f..2b8b3138b 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -160,7 +160,11 @@ where
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
- M: StateMachine<Input = Message<PrepareHeader>>,
+ M: StateMachine<
+ Input = Message<PrepareHeader>,
+ Output = bytes::Bytes,
+ Error = iggy_common::IggyError,
+ >,
{
match MessageBag::from(message) {
MessageBag::Request(request) => self.on_request(request).await,
@@ -178,7 +182,11 @@ where
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
- M: StateMachine<Input = Message<PrepareHeader>>,
+ M: StateMachine<
+ Input = Message<PrepareHeader>,
+ Output = bytes::Bytes,
+ Error = iggy_common::IggyError,
+ >,
{
self.plane.on_request(request).await;
}
@@ -192,7 +200,11 @@ where
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
- M: StateMachine<Input = Message<PrepareHeader>>,
+ M: StateMachine<
+ Input = Message<PrepareHeader>,
+ Output = bytes::Bytes,
+ Error = iggy_common::IggyError,
+ >,
{
self.plane.on_replicate(prepare).await;
}
@@ -206,7 +218,11 @@ where
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
- M: StateMachine<Input = Message<PrepareHeader>>,
+ M: StateMachine<
+ Input = Message<PrepareHeader>,
+ Output = bytes::Bytes,
+ Error = iggy_common::IggyError,
+ >,
{
self.plane.on_ack(prepare_ok).await;
}
@@ -228,7 +244,11 @@ where
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
- M: StateMachine<Input = Message<PrepareHeader>>,
+ M: StateMachine<
+ Input = Message<PrepareHeader>,
+ Output = bytes::Bytes,
+ Error = iggy_common::IggyError,
+ >,
{
debug_assert!(buf.is_empty(), "buf must be empty on entry");
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index a8b6d5637..8102fef00 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -136,7 +136,11 @@ where
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
- M: StateMachine<Input = Message<PrepareHeader>>,
+ M: StateMachine<
+ Input = Message<PrepareHeader>,
+ Output = bytes::Bytes,
+ Error = iggy_common::IggyError,
+ >,
{
loop {
futures::select! {
@@ -165,7 +169,11 @@ where
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
- M: StateMachine<Input = Message<PrepareHeader>>,
+ M: StateMachine<
+ Input = Message<PrepareHeader>,
+ Output = bytes::Bytes,
+ Error = iggy_common::IggyError,
+ >,
{
self.on_message(frame.message).await;
// TODO: once on_message returns an R (e.g. ShardResponse), send it