This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch pedantic-consensus in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 4bf90f5dac9eef17d66ad27fc4a55d7b9f5058b3 Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Mar 3 11:58:13 2026 +0100 feat(metadata): thread STM response into consensus Reply body After a metadata state machine commit, the Reply message had an empty body -- mux_stm.update() returned () and the result was discarded. Clients need the response (e.g. assigned IDs) to proceed without a second query. StateHandler::apply now returns Bytes, stored in a last_result field on each *Inner struct during left_right::Absorb (which forces () return). After publish(), the result is read back through ReadHandle and propagated through State -> StateMachine -> MuxSTM into build_reply_message, which now accepts a body parameter and constructs a variable-length BytesMut buffer instead of a header-only transmute. All 19 handlers return Bytes::new() for now. --- Cargo.lock | 5 ++ core/consensus/Cargo.toml | 2 + core/consensus/src/plane_helpers.rs | 55 ++++++++++++--------- core/metadata/Cargo.toml | 1 + core/metadata/src/impls/metadata.rs | 20 +++++--- core/metadata/src/stm/consumer_group.rs | 13 +++-- core/metadata/src/stm/mod.rs | 18 ++++--- core/metadata/src/stm/mux.rs | 8 +-- core/metadata/src/stm/stream.rs | 86 +++++++++++++++++++-------------- core/metadata/src/stm/user.rs | 43 +++++++++++------ core/partitions/Cargo.toml | 1 + core/partitions/src/iggy_partitions.rs | 3 +- core/shard/Cargo.toml | 1 + core/shard/src/lib.rs | 30 ++++++++++-- 14 files changed, 185 insertions(+), 101 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e54c38866..c0f9957b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2165,6 +2165,8 @@ name = "consensus" version = "0.1.0" dependencies = [ "bit-set", + "bytemuck", + "bytes", "futures", "iggy_common", "message_bus", @@ -6381,6 +6383,7 @@ name = "metadata" version = "0.1.0" dependencies = [ "ahash 0.8.12", + "bytes", "consensus", "iggy_common", "journal", @@ -7276,6 +7279,7 @@ dependencies = [ name = "partitions" version = "0.1.0" dependencies = [ + "bytes", "consensus", "iggy_common", "journal", @@ -9373,6 +9377,7 @@ dependencies = [ name = "shard" version = "0.1.0" dependencies = [ + "bytes", "consensus", "crossfire", "futures", diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml index 7b433bfb2..70c6ff46b 100644 --- a/core/consensus/Cargo.toml +++ b/core/consensus/Cargo.toml @@ -29,6 +29,8 @@ readme = "../../../README.md" [dependencies] bit-set = { workspace = true } +bytemuck = { workspace = true } +bytes = { workspace = true } iggy_common = { workspace = true } message_bus = { workspace = true } rand = { workspace = true } diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs index 8f8ea1275..f4ca92f3d 100644 --- a/core/consensus/src/plane_helpers.rs +++ b/core/consensus/src/plane_helpers.rs @@ -198,33 +198,44 @@ where pub fn build_reply_message<B, P>( consensus: &VsrConsensus<B, P>, prepare_header: &PrepareHeader, + body: bytes::Bytes, ) -> Message<ReplyHeader> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, { - Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>()).transmute_header(|_, new| { - *new = ReplyHeader { - checksum: 0, - checksum_body: 0, - cluster: consensus.cluster(), - size: std::mem::size_of::<ReplyHeader>() as u32, - view: consensus.view(), - release: 0, - command: Command2::Reply, - replica: consensus.replica(), - reserved_frame: [0; 66], - request_checksum: prepare_header.request_checksum, - context: 0, - op: prepare_header.op, - commit: consensus.commit(), - timestamp: prepare_header.timestamp, - request: prepare_header.request, - operation: prepare_header.operation, - namespace: prepare_header.namespace, - ..Default::default() - }; - }) + let header_size = std::mem::size_of::<ReplyHeader>(); + let total_size = header_size + body.len(); + let mut buffer = bytes::BytesMut::zeroed(total_size); + + let header = bytemuck::from_bytes_mut::<ReplyHeader>(&mut buffer[..header_size]); + *header = ReplyHeader { + checksum: 0, + checksum_body: 0, + cluster: consensus.cluster(), + size: total_size as u32, + view: consensus.view(), + release: 0, + command: Command2::Reply, + replica: consensus.replica(), + reserved_frame: [0; 66], + request_checksum: prepare_header.request_checksum, + context: 0, + op: prepare_header.op, + commit: consensus.commit(), + timestamp: prepare_header.timestamp, + request: prepare_header.request, + operation: prepare_header.operation, + namespace: prepare_header.namespace, + ..Default::default() + }; + + if !body.is_empty() { + buffer[header_size..].copy_from_slice(&body); + } + + Message::<ReplyHeader>::from_bytes(buffer.freeze()) + .expect("build_reply_message: constructed header must be valid") } /// Verify hash chain would not break if we add this header. diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml index f5d81d5f8..fcad045bf 100644 --- a/core/metadata/Cargo.toml +++ b/core/metadata/Cargo.toml @@ -29,6 +29,7 @@ readme = "../../../README.md" [dependencies] ahash = { workspace = true } +bytes = { workspace = true } consensus = { workspace = true } iggy_common = { workspace = true } journal = { workspace = true } diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index f5502ce06..41cb21296 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -104,7 +104,11 @@ where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, J: JournalHandle, J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>, - M: StateMachine<Input = Message<PrepareHeader>>, + M: StateMachine< + Input = Message<PrepareHeader>, + Output = bytes::Bytes, + Error = iggy_common::IggyError, + >, { async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::Message<RequestHeader>) { let consensus = self.consensus.as_ref().unwrap(); @@ -220,13 +224,17 @@ where ) }); - // Apply the state (consumes prepare) - // TODO: Handle appending result to response - let _result = self.mux_stm.update(prepare); + let response = self.mux_stm.update(prepare).unwrap_or_else(|err| { + warn!( + "on_ack: state machine error for op={}: {err}", + prepare_header.op + ); + bytes::Bytes::new() + }); debug!("on_ack: state applied for op={}", prepare_header.op); - // Send reply to client - let generic_reply = build_reply_message(consensus, &prepare_header).into_generic(); + let generic_reply = + build_reply_message(consensus, &prepare_header, response).into_generic(); debug!( "on_ack: sending reply to client={} for op={}", prepare_header.client, prepare_header.op diff --git a/core/metadata/src/stm/consumer_group.rs b/core/metadata/src/stm/consumer_group.rs index 5c039e0dc..f9e42178f 100644 --- a/core/metadata/src/stm/consumer_group.rs +++ b/core/metadata/src/stm/consumer_group.rs @@ -18,6 +18,7 @@ use crate::stm::StateHandler; use crate::stm::snapshot::Snapshotable; use crate::{collect_handlers, define_state, impl_fill_restore}; +use bytes::Bytes; use ahash::AHashMap; use iggy_common::create_consumer_group::CreateConsumerGroup; @@ -166,12 +167,13 @@ impl ConsumerGroupsInner { // TODO: This is all a hack, we need to figure out how to do this in a way where `Identifier` // does not reach this stage of execution. +// TODO(hubcio): Serialize proper reply (e.g. assigned group ID) instead of empty Bytes. impl StateHandler for CreateConsumerGroup { type State = ConsumerGroupsInner; - fn apply(&self, state: &mut ConsumerGroupsInner) { + fn apply(&self, state: &mut ConsumerGroupsInner) -> Bytes { let name: Arc<str> = Arc::from(self.name.as_str()); if state.name_index.contains_key(&name) { - return; + return Bytes::new(); } let group = ConsumerGroup::new(name.clone()); @@ -198,18 +200,19 @@ impl StateHandler for CreateConsumerGroup { let key = (Arc::from(s.as_str()), Arc::from(t.as_str())); state.topic_name_index.entry(key).or_default().push(id); } + Bytes::new() } } impl StateHandler for DeleteConsumerGroup { type State = ConsumerGroupsInner; - fn apply(&self, state: &mut ConsumerGroupsInner) { + fn apply(&self, state: &mut ConsumerGroupsInner) -> Bytes { let Some(id) = state.resolve_consumer_group_id_by_identifiers( &self.stream_id, &self.topic_id, &self.group_id, ) else { - return; + return Bytes::new(); }; let group = state.items.remove(id); @@ -232,6 +235,7 @@ impl StateHandler for DeleteConsumerGroup { vec.retain(|&x| x != id); } } + Bytes::new() } } @@ -367,6 +371,7 @@ impl Snapshotable for ConsumerGroups { topic_index, topic_name_index, items, + last_result: None, }; Ok(inner.into()) } diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index 7de1992d0..82aed80b1 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -21,6 +21,7 @@ pub mod snapshot; pub mod stream; pub mod user; +use bytes::Bytes; use iggy_common::Either; use left_right::*; use std::cell::UnsafeCell; @@ -78,9 +79,10 @@ pub trait Command { /// Per-command handler for a given state type. /// Each command struct implements this for the state it mutates. +/// Returns a `Bytes` response that will be threaded into the Reply message. pub trait StateHandler { type State; - fn apply(&self, state: &mut Self::State); + fn apply(&self, state: &mut Self::State) -> Bytes; } #[derive(Debug)] @@ -171,6 +173,7 @@ macro_rules! define_state { $( pub $field_name: $field_type, )* + pub last_result: Option<bytes::Bytes>, } impl [<$state Inner>] { @@ -251,19 +254,19 @@ macro_rules! collect_handlers { impl [<$state Inner>] { fn dispatch(&mut self, cmd: &[<$state Command>]) { - match cmd { + self.last_result = Some(match cmd { $( [<$state Command>]::$operation(payload) => { - $crate::stm::StateHandler::apply(payload, self); + $crate::stm::StateHandler::apply(payload, self) }, )* - } + }); } } impl $crate::stm::State for $state { type Input = <[<$state Inner>] as $crate::stm::Command>::Input; - type Output = (); + type Output = bytes::Bytes; type Error = <[<$state Inner>] as $crate::stm::Command>::Error; fn apply(&self, input: Self::Input) -> Result<::iggy_common::Either<Self::Output, Self::Input>, Self::Error> { @@ -272,7 +275,10 @@ macro_rules! collect_handlers { match <[<$state Inner>] as $crate::stm::Command>::parse(input)? { Either::Left(cmd) => { self.inner.do_apply(cmd); - Ok(Either::Left(())) + let result = self.inner.read(|state| { + state.last_result.clone().unwrap_or_default() + }); + Ok(Either::Left(result)) } Either::Right(input) => { Ok(Either::Right(input)) diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs index 648d8893e..0f6d39dcc 100644 --- a/core/metadata/src/stm/mux.rs +++ b/core/metadata/src/stm/mux.rs @@ -53,18 +53,14 @@ where } } -// TODO: Figure out how to get around the fact that we need to hardcode the Input/Output type for base case. -// TODO: I think we could move the base case to the impl site of `State`, so this way we know the `Input` and `Output` types. // Base case of the recursive resolution. impl StateMachine for () { type Input = Message<PrepareHeader>; - // TODO: Make sure that the `Output` matches to the output type of the rest of list. - // TODO: Add a trait bound to the output that will allow us to get the response in bytes. - type Output = (); + type Output = bytes::Bytes; type Error = iggy_common::IggyError; fn update(&self, _input: Self::Input) -> Result<Self::Output, Self::Error> { - Ok(()) + Ok(bytes::Bytes::new()) } } diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index 7e4c95104..9bc6f4668 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -20,6 +20,7 @@ use crate::stm::StateHandler; use crate::stm::snapshot::Snapshotable; use crate::{collect_handlers, define_state, impl_fill_restore}; use ahash::AHashMap; +use bytes::Bytes; use iggy_common::create_partitions::CreatePartitions; use iggy_common::create_stream::CreateStream; use iggy_common::create_topic::CreateTopic; @@ -272,12 +273,13 @@ impl StreamsInner { } } +// TODO(hubcio): Serialize proper reply (e.g. assigned stream ID) instead of empty Bytes. impl StateHandler for CreateStream { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + fn apply(&self, state: &mut StreamsInner) -> Bytes { let name_arc: Arc<str> = Arc::from(self.name.as_str()); if state.index.contains_key(&name_arc) { - return; + return Bytes::new(); } let stream = Stream { @@ -294,37 +296,39 @@ impl StateHandler for CreateStream { stream.id = id; } state.index.insert(name_arc, id); + Bytes::new() } } impl StateHandler for UpdateStream { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + fn apply(&self, state: &mut StreamsInner) -> Bytes { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { - return; + return Bytes::new(); }; let Some(stream) = state.items.get_mut(stream_id) else { - return; + return Bytes::new(); }; let new_name_arc: Arc<str> = Arc::from(self.name.as_str()); if let Some(&existing_id) = state.index.get(&new_name_arc) && existing_id != stream_id { - return; + return Bytes::new(); } state.index.remove(&stream.name); stream.name = new_name_arc.clone(); state.index.insert(new_name_arc, stream_id); + Bytes::new() } } impl StateHandler for DeleteStream { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + fn apply(&self, state: &mut StreamsInner) -> Bytes { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { - return; + return Bytes::new(); }; if let Some(stream) = state.items.get(stream_id) { @@ -333,30 +337,32 @@ impl StateHandler for DeleteStream { state.items.remove(stream_id); state.index.remove(&name); } + Bytes::new() } } impl StateHandler for PurgeStream { type State = StreamsInner; - fn apply(&self, _state: &mut StreamsInner) { + fn apply(&self, _state: &mut StreamsInner) -> Bytes { // TODO todo!(); } } +// TODO(hubcio): Serialize proper reply (e.g. assigned topic ID) instead of empty Bytes. impl StateHandler for CreateTopic { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + fn apply(&self, state: &mut StreamsInner) -> Bytes { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { - return; + return Bytes::new(); }; let Some(stream) = state.items.get_mut(stream_id) else { - return; + return Bytes::new(); }; let name_arc: Arc<str> = Arc::from(self.name.as_str()); if stream.topic_index.contains_key(&name_arc) { - return; + return Bytes::new(); } let topic = Topic { @@ -386,31 +392,32 @@ impl StateHandler for CreateTopic { } stream.topic_index.insert(name_arc, topic_id); + Bytes::new() } } impl StateHandler for UpdateTopic { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + fn apply(&self, state: &mut StreamsInner) -> Bytes { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { - return; + return Bytes::new(); }; let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else { - return; + return Bytes::new(); }; let Some(stream) = state.items.get_mut(stream_id) else { - return; + return Bytes::new(); }; let Some(topic) = stream.topics.get_mut(topic_id) else { - return; + return Bytes::new(); }; let new_name_arc: Arc<str> = Arc::from(self.name.as_str()); if let Some(&existing_id) = stream.topic_index.get(&new_name_arc) && existing_id != topic_id { - return; + return Bytes::new(); } stream.topic_index.remove(&topic.name); @@ -422,20 +429,21 @@ impl StateHandler for UpdateTopic { topic.replication_factor = rf; } stream.topic_index.insert(new_name_arc, topic_id); + Bytes::new() } } impl StateHandler for DeleteTopic { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + fn apply(&self, state: &mut StreamsInner) -> Bytes { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { - return; + return Bytes::new(); }; let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else { - return; + return Bytes::new(); }; let Some(stream) = state.items.get_mut(stream_id) else { - return; + return Bytes::new(); }; if let Some(topic) = stream.topics.get(topic_id) { @@ -443,32 +451,34 @@ impl StateHandler for DeleteTopic { stream.topics.remove(topic_id); stream.topic_index.remove(&name); } + Bytes::new() } } impl StateHandler for PurgeTopic { type State = StreamsInner; - fn apply(&self, _state: &mut StreamsInner) { + fn apply(&self, _state: &mut StreamsInner) -> Bytes { // TODO todo!(); } } +// TODO(hubcio): Serialize proper reply (e.g. assigned partition IDs) instead of empty Bytes. impl StateHandler for CreatePartitions { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + fn apply(&self, state: &mut StreamsInner) -> Bytes { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { - return; + return Bytes::new(); }; let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else { - return; + return Bytes::new(); }; let Some(stream) = state.items.get_mut(stream_id) else { - return; + return Bytes::new(); }; let Some(topic) = stream.topics.get_mut(topic_id) else { - return; + return Bytes::new(); }; let current_partition_count = topic.partitions.len(); @@ -480,24 +490,25 @@ impl StateHandler for CreatePartitions { }; topic.partitions.push(partition); } + Bytes::new() } } impl StateHandler for DeletePartitions { type State = StreamsInner; - fn apply(&self, state: &mut StreamsInner) { + fn apply(&self, state: &mut StreamsInner) -> Bytes { let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { - return; + return Bytes::new(); }; let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else { - return; + return Bytes::new(); }; let Some(stream) = state.items.get_mut(stream_id) else { - return; + return Bytes::new(); }; let Some(topic) = stream.topics.get_mut(topic_id) else { - return; + return Bytes::new(); }; let count_to_delete = self.partitions_count as usize; @@ -506,6 +517,7 @@ impl StateHandler for DeletePartitions { .partitions .truncate(topic.partitions.len() - count_to_delete); } + Bytes::new() } } @@ -646,7 +658,11 @@ impl Snapshotable for Streams { } let items: Slab<Stream> = stream_entries.into_iter().collect(); - let inner = StreamsInner { index, items }; + let inner = StreamsInner { + index, + items, + last_result: None, + }; Ok(inner.into()) } } diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs index 14f71f3af..8e2c7897e 100644 --- a/core/metadata/src/stm/user.rs +++ b/core/metadata/src/stm/user.rs @@ -20,6 +20,7 @@ use crate::stm::StateHandler; use crate::stm::snapshot::Snapshotable; use crate::{collect_handlers, define_state, impl_fill_restore}; use ahash::AHashMap; +use bytes::Bytes; use iggy_common::change_password::ChangePassword; use iggy_common::create_personal_access_token::CreatePersonalAccessToken; use iggy_common::create_user::CreateUser; @@ -122,12 +123,13 @@ impl UsersInner { } } +// TODO(hubcio): Serialize proper reply (e.g. assigned user ID) instead of empty Bytes. impl StateHandler for CreateUser { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + fn apply(&self, state: &mut UsersInner) -> Bytes { let username_arc: Arc<str> = Arc::from(self.username.as_str()); if state.index.contains_key(&username_arc) { - return; + return Bytes::new(); } let user = User { @@ -148,18 +150,19 @@ impl StateHandler for CreateUser { state .personal_access_tokens .insert(id as UserId, AHashMap::default()); + Bytes::new() } } impl StateHandler for UpdateUser { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + fn apply(&self, state: &mut UsersInner) -> Bytes { let Some(user_id) = state.resolve_user_id(&self.user_id) else { - return; + return Bytes::new(); }; let Some(user) = state.items.get_mut(user_id) else { - return; + return Bytes::new(); }; if let Some(new_username) = &self.username { @@ -167,7 +170,7 @@ impl StateHandler for UpdateUser { if let Some(&existing_id) = state.index.get(&new_username_arc) && existing_id != user_id as UserId { - return; + return Bytes::new(); } state.index.remove(&user.username); @@ -178,14 +181,15 @@ impl StateHandler for UpdateUser { if let Some(new_status) = self.status { user.status = new_status; } + Bytes::new() } } impl StateHandler for DeleteUser { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + fn apply(&self, state: &mut UsersInner) -> Bytes { let Some(user_id) = state.resolve_user_id(&self.user_id) else { - return; + return Bytes::new(); }; if let Some(user) = state.items.get(user_id) { @@ -194,51 +198,55 @@ impl StateHandler for DeleteUser { state.index.remove(&username); state.personal_access_tokens.remove(&(user_id as UserId)); } + Bytes::new() } } impl StateHandler for ChangePassword { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + fn apply(&self, state: &mut UsersInner) -> Bytes { let Some(user_id) = state.resolve_user_id(&self.user_id) else { - return; + return Bytes::new(); }; if let Some(user) = state.items.get_mut(user_id) { user.password_hash = Arc::from(self.new_password.as_str()); } + Bytes::new() } } impl StateHandler for UpdatePermissions { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + fn apply(&self, state: &mut UsersInner) -> Bytes { let Some(user_id) = state.resolve_user_id(&self.user_id) else { - return; + return Bytes::new(); }; if let Some(user) = state.items.get_mut(user_id) { user.permissions = self.permissions.as_ref().map(|p| Arc::new(p.clone())); } + Bytes::new() } } +// TODO(hubcio): Serialize proper reply (e.g. generated token) instead of empty Bytes. impl StateHandler for CreatePersonalAccessToken { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + fn apply(&self, state: &mut UsersInner) -> Bytes { // TODO: Stub until protocol gets adjusted. let user_id = 0; let user_tokens = state.personal_access_tokens.entry(user_id).or_default(); let name_arc: Arc<str> = Arc::from(self.name.as_str()); if user_tokens.contains_key(&name_arc) { - return; + return Bytes::new(); } let expiry_at = PersonalAccessToken::calculate_expiry_at(IggyTimestamp::now(), self.expiry); if let Some(expiry_at) = expiry_at && expiry_at.as_micros() <= IggyTimestamp::now().as_micros() { - return; + return Bytes::new(); } let (pat, _) = PersonalAccessToken::new( @@ -248,12 +256,13 @@ impl StateHandler for CreatePersonalAccessToken { self.expiry, ); user_tokens.insert(name_arc, pat); + Bytes::new() } } impl StateHandler for DeletePersonalAccessToken { type State = UsersInner; - fn apply(&self, state: &mut UsersInner) { + fn apply(&self, state: &mut UsersInner) -> Bytes { // TODO: Stub until protocol gets adjusted. let user_id = 0; @@ -261,6 +270,7 @@ impl StateHandler for DeletePersonalAccessToken { let name_arc: Arc<str> = Arc::from(self.name.as_str()); user_tokens.remove(&name_arc); } + Bytes::new() } } @@ -473,6 +483,7 @@ impl Snapshotable for Users { items, personal_access_tokens, permissioner, + last_result: None, }; Ok(inner.into()) } diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml index 6940fb323..d7fadd93b 100644 --- a/core/partitions/Cargo.toml +++ b/core/partitions/Cargo.toml @@ -28,6 +28,7 @@ repository = "https://github.com/apache/iggy" readme = "../../README.md" [dependencies] +bytes = { workspace = true } consensus = { workspace = true } iggy_common = { workspace = true } journal = { workspace = true } diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 3ae5d4389..6f94da3bc 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -486,7 +486,8 @@ where } } - let generic_reply = build_reply_message(consensus, &prepare_header).into_generic(); + let generic_reply = + build_reply_message(consensus, &prepare_header, bytes::Bytes::new()).into_generic(); debug!( "on_ack: sending reply to client={} for op={}", prepare_header.client, prepare_header.op diff --git a/core/shard/Cargo.toml b/core/shard/Cargo.toml index f3f9d7e29..5180ba829 100644 --- a/core/shard/Cargo.toml +++ b/core/shard/Cargo.toml @@ -21,6 +21,7 @@ version = "0.1.0" edition = "2024" [dependencies] +bytes = { workspace = true } consensus = { path = "../consensus" } crossfire = { workspace = true } futures = { workspace = true } diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs index 3e6adc00f..2b8b3138b 100644 --- a/core/shard/src/lib.rs +++ b/core/shard/src/lib.rs @@ -160,7 +160,11 @@ where Entry = Message<PrepareHeader>, Header = PrepareHeader, >, - M: StateMachine<Input = Message<PrepareHeader>>, + M: StateMachine< + Input = Message<PrepareHeader>, + Output = bytes::Bytes, + Error = iggy_common::IggyError, + >, { match MessageBag::from(message) { MessageBag::Request(request) => self.on_request(request).await, @@ -178,7 +182,11 @@ where Entry = Message<PrepareHeader>, Header = PrepareHeader, >, - M: StateMachine<Input = Message<PrepareHeader>>, + M: StateMachine< + Input = Message<PrepareHeader>, + Output = bytes::Bytes, + Error = iggy_common::IggyError, + >, { self.plane.on_request(request).await; } @@ -192,7 +200,11 @@ where Entry = Message<PrepareHeader>, Header = PrepareHeader, >, - M: StateMachine<Input = Message<PrepareHeader>>, + M: StateMachine< + Input = Message<PrepareHeader>, + Output = bytes::Bytes, + Error = iggy_common::IggyError, + >, { self.plane.on_replicate(prepare).await; } @@ -206,7 +218,11 @@ where Entry = Message<PrepareHeader>, Header = PrepareHeader, >, - M: StateMachine<Input = Message<PrepareHeader>>, + M: StateMachine< + Input = Message<PrepareHeader>, + Output = bytes::Bytes, + Error = iggy_common::IggyError, + >, { self.plane.on_ack(prepare_ok).await; } @@ -228,7 +244,11 @@ where Entry = Message<PrepareHeader>, Header = PrepareHeader, >, - M: StateMachine<Input = Message<PrepareHeader>>, + M: StateMachine< + Input = Message<PrepareHeader>, + Output = bytes::Bytes, + Error = iggy_common::IggyError, + >, { debug_assert!(buf.is_empty(), "buf must be empty on entry");
