This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch refactor-binary-2-wire-types in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 730d7ebb3c3351355345cc5eeca71137637c002c Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Mar 16 15:51:05 2026 +0100 feat(rust): add wire protocol codec and types to binary_protocol crate Pure codec layer (no I/O, no async) with protocol-owned types decoupled from iggy_common. Adds WireEncode/WireDecode traits, WireName validated newtype, WireIdentifier, command codes, stream request/response types, VSR consensus headers with bytemuck zero-copy, and Operation-to-CommandCode mapping. 62 tests. First step - remaining domains to follow. --- Cargo.lock | 6 + core/binary_protocol/Cargo.toml | 11 +- core/binary_protocol/src/codec.rs | 174 +++++++++++ core/binary_protocol/src/codes.rs | 218 ++++++++++++++ core/binary_protocol/src/consensus/command.rs | 67 +++++ core/binary_protocol/src/consensus/commit.rs | 76 +++++ .../src/consensus/do_view_change.rs | 93 ++++++ core/binary_protocol/src/consensus/error.rs | 64 ++++ core/binary_protocol/src/consensus/generic.rs | 77 +++++ core/binary_protocol/src/consensus/header_trait.rs | 59 ++++ core/binary_protocol/src/consensus/message.rs | 331 +++++++++++++++++++++ core/binary_protocol/src/consensus/mod.rs | 69 +++++ core/binary_protocol/src/consensus/operation.rs | 218 ++++++++++++++ core/binary_protocol/src/consensus/prepare.rs | 108 +++++++ core/binary_protocol/src/consensus/prepare_ok.rs | 106 +++++++ core/binary_protocol/src/consensus/reply.rs | 117 ++++++++ core/binary_protocol/src/consensus/request.rs | 123 ++++++++ core/binary_protocol/src/consensus/start_view.rs | 86 ++++++ .../src/consensus/start_view_change.rs | 75 +++++ core/binary_protocol/src/error.rs | 50 ++++ core/binary_protocol/src/frame.rs | 26 ++ core/binary_protocol/src/identifier.rs | 252 ++++++++++++++++ core/binary_protocol/src/lib.rs | 77 +++-- core/binary_protocol/src/requests/mod.rs | 18 ++ .../src/requests/streams/create_stream.rs | 96 ++++++ .../src/requests/streams/delete_stream.rs | 60 ++++ .../src/requests/streams/get_stream.rs | 80 +++++ .../src/requests/streams/get_streams.rs | 53 ++++ core/binary_protocol/src/requests/streams/mod.rs | 30 ++ .../src/requests/streams/purge_stream.rs | 60 ++++ .../src/requests/streams/update_stream.rs | 81 +++++ core/binary_protocol/src/responses/mod.rs | 40 +++ .../src/responses/streams/create_stream.rs | 22 ++ .../src/responses/streams/delete_stream.rs | 19 ++ .../src/responses/streams/get_stream.rs | 232 +++++++++++++++ .../src/responses/streams/get_streams.rs | 103 +++++++ core/binary_protocol/src/responses/streams/mod.rs | 33 ++ .../src/responses/streams/purge_stream.rs | 19 ++ .../src/responses/streams/stream_response.rs | 142 +++++++++ .../src/responses/streams/update_stream.rs | 19 ++ core/binary_protocol/src/wire_name.rs | 170 +++++++++++ 41 files changed, 3742 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3afb4cadd..e1e5efb8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5323,6 +5323,12 @@ dependencies = [ [[package]] name = "iggy_binary_protocol" version = "0.9.2-edge.1" +dependencies = [ + "bytemuck", + "bytes", + "enumset", + "thiserror 2.0.18", +] [[package]] name = "iggy_common" diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml index 2d809084d..12ed184fd 100644 --- a/core/binary_protocol/Cargo.toml +++ b/core/binary_protocol/Cargo.toml @@ -18,7 +18,7 @@ [package] name = "iggy_binary_protocol" version = "0.9.2-edge.1" -description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." +description = "Wire protocol types and codec for the Iggy binary protocol. Shared between server and SDK." edition = "2024" license = "Apache-2.0" keywords = ["iggy", "messaging", "streaming"] @@ -29,3 +29,12 @@ repository = "https://github.com/apache/iggy" readme = "../../README.md" [dependencies] +bytemuck = { workspace = true } +bytes = { workspace = true } +enumset = { workspace = true } +thiserror = { workspace = true } + +[lints.clippy] +enum_glob_use = "deny" +nursery = "warn" +pedantic = "deny" diff --git a/core/binary_protocol/src/codec.rs b/core/binary_protocol/src/codec.rs new file mode 100644 index 000000000..3aedd9c5e --- /dev/null +++ b/core/binary_protocol/src/codec.rs @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use bytes::{Bytes, BytesMut}; + +/// Encode a wire type into a caller-owned buffer. +/// +/// Buffer-first design: the caller controls allocation. The `to_bytes()` +/// convenience method allocates when needed, but hot paths can reuse +/// buffers via `encode()` directly. +pub trait WireEncode { + /// Write the encoded representation into `buf`. + fn encode(&self, buf: &mut BytesMut); + + /// Return the exact encoded size in bytes. + fn encoded_size(&self) -> usize; + + /// Convenience: allocate a new [`Bytes`] and encode into it. + #[must_use] + fn to_bytes(&self) -> Bytes { + let mut buf = BytesMut::with_capacity(self.encoded_size()); + self.encode(&mut buf); + buf.freeze() + } +} + +/// Decode a wire type from a byte slice. +/// +/// Takes `&[u8]` instead of [`Bytes`] to avoid requiring reference-counted +/// ownership at the decode boundary. +pub trait WireDecode: Sized { + /// Decode from `buf`, consuming exactly the bytes needed. + /// Returns the decoded value and the number of bytes consumed. + /// + /// # Errors + /// Returns `WireError` if the buffer is too short or contains invalid data. + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError>; + + /// Convenience: decode from the entire buffer, ignoring trailing bytes. + /// + /// # Errors + /// Returns `WireError` if decoding fails. + fn decode_from(buf: &[u8]) -> Result<Self, WireError> { + Self::decode(buf).map(|(val, _)| val) + } +} + +/// Helper to read a `u8` from `buf` at `offset`. +/// +/// # Errors +/// Returns `WireError::UnexpectedEof` if `offset` is out of bounds. +#[inline] +pub fn read_u8(buf: &[u8], offset: usize) -> Result<u8, WireError> { + buf.get(offset) + .copied() + .ok_or_else(|| WireError::UnexpectedEof { + offset, + need: 1, + have: buf.len().saturating_sub(offset), + }) +} + +/// Helper to read a `u32` LE from `buf` at `offset`. +/// +/// # Errors +/// Returns `WireError::UnexpectedEof` if fewer than 4 bytes remain. +#[allow(clippy::missing_panics_doc)] +#[inline] +pub fn read_u32_le(buf: &[u8], offset: usize) -> Result<u32, WireError> { + let end = offset + .checked_add(4) + .ok_or_else(|| WireError::UnexpectedEof { + offset, + need: 4, + have: buf.len().saturating_sub(offset), + })?; + let slice = buf + .get(offset..end) + .ok_or_else(|| WireError::UnexpectedEof { + offset, + need: 4, + have: buf.len().saturating_sub(offset), + })?; + Ok(u32::from_le_bytes( + slice.try_into().expect("slice is exactly 4 bytes"), + )) +} + +/// Helper to read a `u64` LE from `buf` at `offset`. +/// +/// # Errors +/// Returns `WireError::UnexpectedEof` if fewer than 8 bytes remain. +#[allow(clippy::missing_panics_doc)] +#[inline] +pub fn read_u64_le(buf: &[u8], offset: usize) -> Result<u64, WireError> { + let end = offset + .checked_add(8) + .ok_or_else(|| WireError::UnexpectedEof { + offset, + need: 8, + have: buf.len().saturating_sub(offset), + })?; + let slice = buf + .get(offset..end) + .ok_or_else(|| WireError::UnexpectedEof { + offset, + need: 8, + have: buf.len().saturating_sub(offset), + })?; + Ok(u64::from_le_bytes( + slice.try_into().expect("slice is exactly 8 bytes"), + )) +} + +/// Helper to read a UTF-8 string of `len` bytes from `buf` at `offset`. +/// +/// # Errors +/// Returns `WireError::UnexpectedEof` or `WireError::InvalidUtf8` on failure. +#[inline] +pub fn read_str(buf: &[u8], offset: usize, len: usize) -> Result<String, WireError> { + let end = offset + .checked_add(len) + .ok_or_else(|| WireError::UnexpectedEof { + offset, + need: len, + have: buf.len().saturating_sub(offset), + })?; + let slice = buf + .get(offset..end) + .ok_or_else(|| WireError::UnexpectedEof { + offset, + need: len, + have: buf.len().saturating_sub(offset), + })?; + std::str::from_utf8(slice) + .map(str::to_string) + .map_err(|_| WireError::InvalidUtf8 { offset }) +} + +/// Helper to read a byte slice of `len` bytes from `buf` at `offset`. +/// +/// # Errors +/// Returns `WireError::UnexpectedEof` if fewer than `len` bytes remain. +#[inline] +pub fn read_bytes(buf: &[u8], offset: usize, len: usize) -> Result<&[u8], WireError> { + let end = offset + .checked_add(len) + .ok_or_else(|| WireError::UnexpectedEof { + offset, + need: len, + have: buf.len().saturating_sub(offset), + })?; + buf.get(offset..end) + .ok_or_else(|| WireError::UnexpectedEof { + offset, + need: len, + have: buf.len().saturating_sub(offset), + }) +} diff --git a/core/binary_protocol/src/codes.rs b/core/binary_protocol/src/codes.rs new file mode 100644 index 000000000..254cd210b --- /dev/null +++ b/core/binary_protocol/src/codes.rs @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; + +// -- System -- +pub const PING_CODE: u32 = 1; +pub const GET_STATS_CODE: u32 = 10; +pub const GET_SNAPSHOT_FILE_CODE: u32 = 11; +pub const GET_CLUSTER_METADATA_CODE: u32 = 12; +pub const GET_ME_CODE: u32 = 20; +pub const GET_CLIENT_CODE: u32 = 21; +pub const GET_CLIENTS_CODE: u32 = 22; + +// -- Users -- +pub const GET_USER_CODE: u32 = 31; +pub const GET_USERS_CODE: u32 = 32; +pub const CREATE_USER_CODE: u32 = 33; +pub const DELETE_USER_CODE: u32 = 34; +pub const UPDATE_USER_CODE: u32 = 35; +pub const UPDATE_PERMISSIONS_CODE: u32 = 36; +pub const CHANGE_PASSWORD_CODE: u32 = 37; +pub const LOGIN_USER_CODE: u32 = 38; +pub const LOGOUT_USER_CODE: u32 = 39; + +// -- Personal Access Tokens -- +pub const GET_PERSONAL_ACCESS_TOKENS_CODE: u32 = 41; +pub const CREATE_PERSONAL_ACCESS_TOKEN_CODE: u32 = 42; +pub const DELETE_PERSONAL_ACCESS_TOKEN_CODE: u32 = 43; +pub const LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE: u32 = 44; + +// -- Messages -- +pub const POLL_MESSAGES_CODE: u32 = 100; +pub const SEND_MESSAGES_CODE: u32 = 101; +pub const FLUSH_UNSAVED_BUFFER_CODE: u32 = 102; + +// -- Consumer Offsets -- +pub const GET_CONSUMER_OFFSET_CODE: u32 = 120; +pub const STORE_CONSUMER_OFFSET_CODE: u32 = 121; +pub const DELETE_CONSUMER_OFFSET_CODE: u32 = 122; + +// -- Streams -- +pub const GET_STREAM_CODE: u32 = 200; +pub const GET_STREAMS_CODE: u32 = 201; +pub const CREATE_STREAM_CODE: u32 = 202; +pub const DELETE_STREAM_CODE: u32 = 203; +pub const UPDATE_STREAM_CODE: u32 = 204; +pub const PURGE_STREAM_CODE: u32 = 205; + +// -- Topics -- +pub const GET_TOPIC_CODE: u32 = 300; +pub const GET_TOPICS_CODE: u32 = 301; +pub const CREATE_TOPIC_CODE: u32 = 302; +pub const DELETE_TOPIC_CODE: u32 = 303; +pub const UPDATE_TOPIC_CODE: u32 = 304; +pub const PURGE_TOPIC_CODE: u32 = 305; + +// -- Partitions -- +pub const CREATE_PARTITIONS_CODE: u32 = 402; +pub const DELETE_PARTITIONS_CODE: u32 = 403; + +// -- Segments -- +pub const DELETE_SEGMENTS_CODE: u32 = 503; + +// -- Consumer Groups -- +pub const GET_CONSUMER_GROUP_CODE: u32 = 600; +pub const GET_CONSUMER_GROUPS_CODE: u32 = 601; +pub const CREATE_CONSUMER_GROUP_CODE: u32 = 602; +pub const DELETE_CONSUMER_GROUP_CODE: u32 = 603; +pub const JOIN_CONSUMER_GROUP_CODE: u32 = 604; +pub const LEAVE_CONSUMER_GROUP_CODE: u32 = 605; + +/// # Errors +/// Returns `WireError::UnknownCommand` if the code is not recognized. +pub const fn command_name(code: u32) -> Result<&'static str, WireError> { + match code { + PING_CODE => Ok("ping"), + GET_STATS_CODE => Ok("stats"), + GET_SNAPSHOT_FILE_CODE => Ok("snapshot"), + GET_CLUSTER_METADATA_CODE => Ok("cluster.metadata"), + GET_ME_CODE => Ok("me"), + GET_CLIENT_CODE => Ok("client.get"), + GET_CLIENTS_CODE => Ok("client.list"), + GET_USER_CODE => Ok("user.get"), + GET_USERS_CODE => Ok("user.list"), + CREATE_USER_CODE => Ok("user.create"), + DELETE_USER_CODE => Ok("user.delete"), + UPDATE_USER_CODE => Ok("user.update"), + UPDATE_PERMISSIONS_CODE => Ok("user.permissions"), + CHANGE_PASSWORD_CODE => Ok("user.password"), + LOGIN_USER_CODE => Ok("user.login"), + LOGOUT_USER_CODE => Ok("user.logout"), + GET_PERSONAL_ACCESS_TOKENS_CODE => Ok("personal_access_token.list"), + CREATE_PERSONAL_ACCESS_TOKEN_CODE => Ok("personal_access_token.create"), + DELETE_PERSONAL_ACCESS_TOKEN_CODE => Ok("personal_access_token.delete"), + LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => Ok("personal_access_token.login"), + POLL_MESSAGES_CODE => Ok("message.poll"), + SEND_MESSAGES_CODE => Ok("message.send"), + FLUSH_UNSAVED_BUFFER_CODE => Ok("message.flush_unsaved_buffer"), + GET_CONSUMER_OFFSET_CODE => Ok("consumer_offset.get"), + STORE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.store"), + DELETE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.delete"), + GET_STREAM_CODE => Ok("stream.get"), + GET_STREAMS_CODE => Ok("stream.list"), + CREATE_STREAM_CODE => Ok("stream.create"), + DELETE_STREAM_CODE => Ok("stream.delete"), + UPDATE_STREAM_CODE => Ok("stream.update"), + PURGE_STREAM_CODE => Ok("stream.purge"), + GET_TOPIC_CODE => Ok("topic.get"), + GET_TOPICS_CODE => Ok("topic.list"), + CREATE_TOPIC_CODE => Ok("topic.create"), + DELETE_TOPIC_CODE => Ok("topic.delete"), + UPDATE_TOPIC_CODE => Ok("topic.update"), + PURGE_TOPIC_CODE => Ok("topic.purge"), + CREATE_PARTITIONS_CODE => Ok("partition.create"), + DELETE_PARTITIONS_CODE => Ok("partition.delete"), + DELETE_SEGMENTS_CODE => Ok("segment.delete"), + GET_CONSUMER_GROUP_CODE => Ok("consumer_group.get"), + GET_CONSUMER_GROUPS_CODE => Ok("consumer_group.list"), + CREATE_CONSUMER_GROUP_CODE => Ok("consumer_group.create"), + DELETE_CONSUMER_GROUP_CODE => Ok("consumer_group.delete"), + JOIN_CONSUMER_GROUP_CODE => Ok("consumer_group.join"), + LEAVE_CONSUMER_GROUP_CODE => Ok("consumer_group.leave"), + _ => Err(WireError::UnknownCommand(code)), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const ALL_CODES: &[u32] = &[ + PING_CODE, + GET_STATS_CODE, + GET_SNAPSHOT_FILE_CODE, + GET_CLUSTER_METADATA_CODE, + GET_ME_CODE, + GET_CLIENT_CODE, + GET_CLIENTS_CODE, + GET_USER_CODE, + GET_USERS_CODE, + CREATE_USER_CODE, + DELETE_USER_CODE, + UPDATE_USER_CODE, + UPDATE_PERMISSIONS_CODE, + CHANGE_PASSWORD_CODE, + LOGIN_USER_CODE, + LOGOUT_USER_CODE, + GET_PERSONAL_ACCESS_TOKENS_CODE, + CREATE_PERSONAL_ACCESS_TOKEN_CODE, + DELETE_PERSONAL_ACCESS_TOKEN_CODE, + LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, + POLL_MESSAGES_CODE, + SEND_MESSAGES_CODE, + FLUSH_UNSAVED_BUFFER_CODE, + GET_CONSUMER_OFFSET_CODE, + STORE_CONSUMER_OFFSET_CODE, + DELETE_CONSUMER_OFFSET_CODE, + GET_STREAM_CODE, + GET_STREAMS_CODE, + CREATE_STREAM_CODE, + DELETE_STREAM_CODE, + UPDATE_STREAM_CODE, + PURGE_STREAM_CODE, + GET_TOPIC_CODE, + GET_TOPICS_CODE, + CREATE_TOPIC_CODE, + DELETE_TOPIC_CODE, + UPDATE_TOPIC_CODE, + PURGE_TOPIC_CODE, + CREATE_PARTITIONS_CODE, + DELETE_PARTITIONS_CODE, + DELETE_SEGMENTS_CODE, + GET_CONSUMER_GROUP_CODE, + GET_CONSUMER_GROUPS_CODE, + CREATE_CONSUMER_GROUP_CODE, + DELETE_CONSUMER_GROUP_CODE, + JOIN_CONSUMER_GROUP_CODE, + LEAVE_CONSUMER_GROUP_CODE, + ]; + + #[test] + fn every_code_has_a_name() { + for &code in ALL_CODES { + assert!( + command_name(code).is_ok(), + "missing name for command code {code}" + ); + } + } + + #[test] + fn no_duplicate_codes() { + let mut seen = std::collections::HashSet::new(); + for &code in ALL_CODES { + assert!(seen.insert(code), "duplicate command code: {code}"); + } + } + + #[test] + fn unknown_code_returns_error() { + assert!(command_name(9999).is_err()); + } +} diff --git a/core/binary_protocol/src/consensus/command.rs b/core/binary_protocol/src/consensus/command.rs new file mode 100644 index 000000000..8eea606bc --- /dev/null +++ b/core/binary_protocol/src/consensus/command.rs @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytemuck::{CheckedBitPattern, NoUninit}; +use enumset::EnumSetType; + +/// VSR message type discriminant. +#[derive(Default, Debug, EnumSetType)] +#[repr(u8)] +pub enum Command2 { + #[default] + Reserved = 0, + + Ping = 1, + Pong = 2, + PingClient = 3, + PongClient = 4, + + Request = 5, + Prepare = 6, + PrepareOk = 7, + Reply = 8, + Commit = 9, + + StartViewChange = 10, + DoViewChange = 11, + StartView = 12, +} + +// SAFETY: Command2 is #[repr(u8)] with no padding bytes. +unsafe impl NoUninit for Command2 {} + +// SAFETY: Command2 is #[repr(u8)]; is_valid_bit_pattern matches all defined discriminants. +unsafe impl CheckedBitPattern for Command2 { + type Bits = u8; + + fn is_valid_bit_pattern(bits: &u8) -> bool { + *bits <= 12 + } +} + +#[cfg(test)] +mod tests { + use crate::consensus::GenericHeader; + + #[test] + fn invalid_bit_pattern_rejected() { + let mut buf = bytes::BytesMut::zeroed(256); + buf[60] = 99; + let result = bytemuck::checked::try_from_bytes::<GenericHeader>(&buf); + assert!(result.is_err()); + } +} diff --git a/core/binary_protocol/src/consensus/commit.rs b/core/binary_protocol/src/consensus/commit.rs new file mode 100644 index 000000000..93d23b831 --- /dev/null +++ b/core/binary_protocol/src/consensus/commit.rs @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{Command2, ConsensusError, ConsensusHeader, HEADER_SIZE, Operation}; +use bytemuck::{CheckedBitPattern, NoUninit}; +use std::mem::offset_of; + +/// Primary -> replicas: commit up to this op. Header-only (no body). +#[repr(C)] +#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)] +pub struct CommitHeader { + pub checksum: u128, + pub checksum_body: u128, + pub cluster: u128, + pub size: u32, + pub view: u32, + pub release: u32, + pub command: Command2, + pub replica: u8, + pub reserved_frame: [u8; 66], + + pub commit_checksum: u128, + pub timestamp_monotonic: u64, + pub commit: u64, + pub checkpoint_op: u64, + pub namespace: u64, + pub reserved: [u8; 80], +} +const _: () = { + assert!(size_of::<CommitHeader>() == HEADER_SIZE); + assert!( + offset_of!(CommitHeader, commit_checksum) + == offset_of!(CommitHeader, reserved_frame) + size_of::<[u8; 66]>() + ); + assert!(offset_of!(CommitHeader, reserved) + size_of::<[u8; 80]>() == HEADER_SIZE); +}; + +impl ConsensusHeader for CommitHeader { + const COMMAND: Command2 = Command2::Commit; + + fn operation(&self) -> Operation { + Operation::Reserved + } + + fn command(&self) -> Command2 { + self.command + } + + fn validate(&self) -> Result<(), ConsensusError> { + if self.command != Command2::Commit { + return Err(ConsensusError::CommitInvalidCommand2); + } + if self.size != 256 { + return Err(ConsensusError::CommitInvalidSize(self.size)); + } + Ok(()) + } + + fn size(&self) -> u32 { + self.size + } +} diff --git a/core/binary_protocol/src/consensus/do_view_change.rs b/core/binary_protocol/src/consensus/do_view_change.rs new file mode 100644 index 000000000..be09cc1eb --- /dev/null +++ b/core/binary_protocol/src/consensus/do_view_change.rs @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{Command2, ConsensusError, ConsensusHeader, HEADER_SIZE, Operation}; +use bytemuck::{CheckedBitPattern, NoUninit}; +use std::mem::offset_of; + +/// Replica -> primary candidate: vote for view change. Header-only. +#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)] +#[repr(C)] +pub struct DoViewChangeHeader { + pub checksum: u128, + pub checksum_body: u128, + pub cluster: u128, + pub size: u32, + pub view: u32, + pub release: u32, + pub command: Command2, + pub replica: u8, + pub reserved_frame: [u8; 66], + + /// Highest op-number in this replica's log. + pub op: u64, + /// Highest committed op. + pub commit: u64, + pub namespace: u64, + /// View when status was last normal (key for log selection). + pub log_view: u32, + pub reserved: [u8; 100], +} +const _: () = { + assert!(size_of::<DoViewChangeHeader>() == HEADER_SIZE); + assert!( + offset_of!(DoViewChangeHeader, op) + == offset_of!(DoViewChangeHeader, reserved_frame) + size_of::<[u8; 66]>() + ); + assert!(offset_of!(DoViewChangeHeader, reserved) + size_of::<[u8; 100]>() == HEADER_SIZE); +}; + +impl ConsensusHeader for DoViewChangeHeader { + const COMMAND: Command2 = Command2::DoViewChange; + + fn operation(&self) -> Operation { + Operation::Reserved + } + + fn command(&self) -> Command2 { + self.command + } + + fn validate(&self) -> Result<(), ConsensusError> { + if self.command != Command2::DoViewChange { + return Err(ConsensusError::InvalidCommand { + expected: Command2::DoViewChange, + found: self.command, + }); + } + if self.release != 0 { + return Err(ConsensusError::InvalidField( + "release must be 0".to_string(), + )); + } + if self.log_view > self.view { + return Err(ConsensusError::InvalidField( + "log_view cannot exceed view".to_string(), + )); + } + if self.commit > self.op { + return Err(ConsensusError::InvalidField( + "commit cannot exceed op".to_string(), + )); + } + Ok(()) + } + + fn size(&self) -> u32 { + self.size + } +} diff --git a/core/binary_protocol/src/consensus/error.rs b/core/binary_protocol/src/consensus/error.rs new file mode 100644 index 000000000..cb7838128 --- /dev/null +++ b/core/binary_protocol/src/consensus/error.rs @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::command::Command2; +use thiserror::Error; + +#[derive(Debug, Clone, Error, PartialEq, Eq)] +pub enum ConsensusError { + #[error("invalid command: expected {expected:?}, found {found:?}")] + InvalidCommand { expected: Command2, found: Command2 }, + + #[error("invalid size: expected {expected:?}, found {found:?}")] + InvalidSize { expected: u32, found: u32 }, + + #[error("invalid checksum")] + InvalidChecksum, + + #[error("invalid cluster ID")] + InvalidCluster, + + #[error("invalid field: {0}")] + InvalidField(String), + + // -- Reserved for future consensus validation -- + // These variants exist in the original iggy_common implementation + // and will be used when the consensus crate migrates to this crate. + #[error("parent_padding must be 0")] + PrepareParentPaddingNonZero, + + #[error("request_checksum_padding must be 0")] + PrepareRequestChecksumPaddingNonZero, + + #[error("command must be Commit")] + CommitInvalidCommand2, + + #[error("size must be 256, found {0}")] + CommitInvalidSize(u32), + + #[error("command must be Reply")] + ReplyInvalidCommand2, + + #[error("request_checksum_padding must be 0")] + ReplyRequestChecksumPaddingNonZero, + + #[error("context_padding must be 0")] + ReplyContextPaddingNonZero, + + #[error("invalid bit pattern in header (enum discriminant out of range)")] + InvalidBitPattern, +} diff --git a/core/binary_protocol/src/consensus/generic.rs b/core/binary_protocol/src/consensus/generic.rs new file mode 100644 index 000000000..8945b6b7c --- /dev/null +++ b/core/binary_protocol/src/consensus/generic.rs @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{Command2, ConsensusError, ConsensusHeader, HEADER_SIZE, Operation}; +use bytemuck::{CheckedBitPattern, NoUninit}; +use std::mem::offset_of; + +/// Type-erased 256-byte header for initial message dispatch. +#[repr(C)] +#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)] +pub struct GenericHeader { + pub checksum: u128, + pub checksum_body: u128, + pub cluster: u128, + pub size: u32, + pub view: u32, + pub release: u32, + pub command: Command2, + pub replica: u8, + pub reserved_frame: [u8; 66], + pub reserved_command: [u8; 128], +} +const _: () = { + assert!(size_of::<GenericHeader>() == HEADER_SIZE); + assert!( + offset_of!(GenericHeader, reserved_command) + == offset_of!(GenericHeader, reserved_frame) + size_of::<[u8; 66]>() + ); + assert!(offset_of!(GenericHeader, reserved_command) + size_of::<[u8; 128]>() == HEADER_SIZE); +}; + +impl ConsensusHeader for GenericHeader { + const COMMAND: Command2 = Command2::Reserved; + + fn operation(&self) -> Operation { + Operation::Reserved + } + + fn command(&self) -> Command2 { + self.command + } + + fn validate(&self) -> Result<(), ConsensusError> { + Ok(()) + } + + fn size(&self) -> u32 { + self.size + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn generic_header_zero_copy() { + let buf = bytes::BytesMut::zeroed(256); + let header: &GenericHeader = bytemuck::checked::try_from_bytes(&buf).unwrap(); + assert_eq!(header.command, Command2::Reserved); + assert_eq!(header.size, 0); + } +} diff --git a/core/binary_protocol/src/consensus/header_trait.rs b/core/binary_protocol/src/consensus/header_trait.rs new file mode 100644 index 000000000..aedfd11b1 --- /dev/null +++ b/core/binary_protocol/src/consensus/header_trait.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::command::Command2; +use super::error::ConsensusError; +use super::operation::Operation; +use bytemuck::{CheckedBitPattern, NoUninit}; + +pub const HEADER_SIZE: usize = 256; + +/// Trait implemented by all consensus header types. +/// +/// Every header is exactly [`HEADER_SIZE`] bytes, `#[repr(C)]`, and supports +/// zero-copy deserialization via `bytemuck`. +pub trait ConsensusHeader: Sized + CheckedBitPattern + NoUninit { + const COMMAND: Command2; + + /// # Errors + /// Returns `ConsensusError` if the header fields are inconsistent. + fn validate(&self) -> Result<(), ConsensusError>; + fn operation(&self) -> Operation; + fn command(&self) -> Command2; + fn size(&self) -> u32; +} + +#[cfg(test)] +mod tests { + use crate::consensus::{ + CommitHeader, DoViewChangeHeader, GenericHeader, PrepareHeader, PrepareOkHeader, + ReplyHeader, RequestHeader, StartViewChangeHeader, StartViewHeader, + }; + + #[test] + fn all_headers_are_256_bytes() { + assert_eq!(size_of::<GenericHeader>(), 256); + assert_eq!(size_of::<RequestHeader>(), 256); + assert_eq!(size_of::<ReplyHeader>(), 256); + assert_eq!(size_of::<PrepareHeader>(), 256); + assert_eq!(size_of::<PrepareOkHeader>(), 256); + assert_eq!(size_of::<CommitHeader>(), 256); + assert_eq!(size_of::<StartViewChangeHeader>(), 256); + assert_eq!(size_of::<DoViewChangeHeader>(), 256); + assert_eq!(size_of::<StartViewHeader>(), 256); + } +} diff --git a/core/binary_protocol/src/consensus/message.rs b/core/binary_protocol/src/consensus/message.rs new file mode 100644 index 000000000..c5785b5f0 --- /dev/null +++ b/core/binary_protocol/src/consensus/message.rs @@ -0,0 +1,331 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Zero-copy consensus message wrapper. +//! +//! `Message<H>` wraps a `Bytes` buffer and provides typed access to the +//! header via `bytemuck` pointer cast (zero allocation, zero copy). + +use crate::consensus::{Command2, ConsensusError, ConsensusHeader, GenericHeader}; +use bytes::Bytes; +use std::marker::PhantomData; + +/// Zero-copy message wrapping a `Bytes` buffer with a typed header. +/// +/// The header is accessed via `bytemuck::try_from_bytes` - a pointer cast, +/// not a deserialization step. The body is the slice after the header, +/// sized according to `header.size()`. +#[repr(C)] +#[derive(Debug, Clone)] +pub struct Message<H: ConsensusHeader> { + buffer: Bytes, + _marker: PhantomData<H>, +} + +impl<H: ConsensusHeader> Message<H> { + /// Create a message from a buffer, validating the header. + /// + /// # Errors + /// Returns `ConsensusError` if the buffer is too small or the header is invalid. + pub fn from_bytes(buffer: Bytes) -> Result<Self, ConsensusError> { + if buffer.len() < size_of::<H>() { + return Err(ConsensusError::InvalidCommand { + expected: H::COMMAND, + found: Command2::Reserved, + }); + } + + // TODO: bytemuck::checked::try_from_bytes requires the buffer to be aligned + // to the target type's alignment. Consensus headers contain u128 fields (16-byte + // alignment), but Bytes from Vec<u8> only guarantees 8-byte alignment. The checked + // variant returns Err on misalignment (no UB), but production code can fail on + // misaligned buffers. The original iggy_common code has the same limitation. + // Fix by either using pod_read_unaligned (copies header) or ensuring aligned + // allocation at the network receive layer. + let header_bytes = &buffer[..size_of::<H>()]; + let header = bytemuck::checked::try_from_bytes::<H>(header_bytes) + .map_err(|_| ConsensusError::InvalidBitPattern)?; + + header.validate()?; + + let header_size = header.size() as usize; + if buffer.len() < header_size { + return Err(ConsensusError::InvalidCommand { + expected: H::COMMAND, + found: Command2::Reserved, + }); + } + + Ok(Self { + buffer, + _marker: PhantomData, + }) + } + + /// Create a zero-initialized message of the given size. + /// The caller must initialize the header fields. + /// + /// # Panics + /// Panics if `size` is smaller than the header size. + #[must_use] + pub fn new(size: usize) -> Self { + assert!( + size >= size_of::<H>(), + "size must be at least header size ({})", + size_of::<H>() + ); + Self { + buffer: Bytes::from(vec![0u8; size]), + _marker: PhantomData, + } + } + + /// Access the typed header (zero-copy pointer cast). + #[must_use] + #[allow(clippy::missing_panics_doc)] + #[inline] + pub fn header(&self) -> &H { + let header_bytes = &self.buffer[..size_of::<H>()]; + bytemuck::checked::try_from_bytes(header_bytes) + .expect("header validated at construction time") + } + + /// Message body (everything after the header). + #[must_use] + #[inline] + pub fn body(&self) -> &[u8] { + let header_size = size_of::<H>(); + let total_size = self.header().size() as usize; + if total_size > header_size { + &self.buffer[header_size..total_size] + } else { + &[] + } + } + + /// Message body as zero-copy [`Bytes`]. + #[must_use] + #[inline] + pub fn body_bytes(&self) -> Bytes { + let header_size = size_of::<H>(); + let total_size = self.header().size() as usize; + if total_size > header_size { + self.buffer.slice(header_size..total_size) + } else { + Bytes::new() + } + } + + /// Complete message bytes (header + body). + #[must_use] + #[inline] + pub fn as_bytes(&self) -> &[u8] { + let total_size = self.header().size() as usize; + &self.buffer[..total_size] + } + + /// Consume into the underlying buffer. + #[must_use] + #[inline] + pub fn into_inner(self) -> Bytes { + self.buffer + } + + /// Convert to a generic message (type-erase the header). + #[must_use] + pub fn into_generic(self) -> Message<GenericHeader> { + // SAFETY: Message<H> and Message<GenericHeader> have identical layout + // (only PhantomData differs). GenericHeader accepts any command. + unsafe { Message::from_buffer_unchecked(self.buffer) } + } + + /// View as a generic message without consuming. + #[must_use] + #[allow(clippy::missing_const_for_fn)] + pub fn as_generic(&self) -> &Message<GenericHeader> { + // SAFETY: #[repr(C)] guarantees field layout. Message<H> and + // Message<GenericHeader> differ only in PhantomData (ZST). + unsafe { &*std::ptr::from_ref(self).cast::<Message<GenericHeader>>() } + } + + /// Try to reinterpret as a different header type. + /// + /// # Errors + /// Returns `ConsensusError` if the header command doesn't match or validation fails. + pub fn try_into_typed<T: ConsensusHeader>(self) -> Result<Message<T>, ConsensusError> { + if self.buffer.len() < size_of::<T>() { + return Err(ConsensusError::InvalidCommand { + expected: T::COMMAND, + found: Command2::Reserved, + }); + } + + let generic = self.as_generic(); + if generic.header().command != T::COMMAND { + return Err(ConsensusError::InvalidCommand { + expected: T::COMMAND, + found: generic.header().command, + }); + } + + let header_bytes = &self.buffer[..size_of::<T>()]; + let header = bytemuck::checked::try_from_bytes::<T>(header_bytes) + .map_err(|_| ConsensusError::InvalidBitPattern)?; + header.validate()?; + + Ok(unsafe { Message::<T>::from_buffer_unchecked(self.buffer) }) + } + + /// Transmute the header to a different type, preserving the buffer. + /// + /// The callback receives the old header (by value) and a mutable reference + /// to the new (zeroed) header, allowing field-by-field initialization. + /// + /// # Panics + /// Panics if `size_of::<H>() != size_of::<T>()`. + #[must_use] + pub fn transmute_header<T: ConsensusHeader>(self, f: impl FnOnce(H, &mut T)) -> Message<T> { + assert_eq!(size_of::<H>(), size_of::<T>()); + let old_header = *self.header(); + let buffer = self.into_inner(); + // TODO: This mutates through Bytes via cast_mut(), which violates aliasing rules. + // The original iggy_common code has the same pattern. Once iggy_common is migrated + // to use iggy_binary_protocol types, fix both by using Bytes::try_into_mut() -> + // mutate BytesMut -> freeze(). Tracked as part of the consensus migration. + unsafe { + let ptr = buffer.as_ptr().cast_mut(); + let slice = std::slice::from_raw_parts_mut(ptr, size_of::<T>()); + slice.fill(0); + let new_header = + bytemuck::checked::try_from_bytes_mut(slice).expect("zeroed bytes are valid"); + f(old_header, new_header); + } + Message { + buffer, + _marker: PhantomData, + } + } + + /// Create without validation. Private. + /// + /// # Safety + /// Buffer must already be validated for the target header type. + #[inline] + const unsafe fn from_buffer_unchecked(buffer: Bytes) -> Self { + Self { + buffer, + _marker: PhantomData, + } + } +} + +#[cfg(test)] +mod tests { + use super::Message; + use crate::consensus::{Command2, GenericHeader, RequestHeader}; + use bytes::{Bytes, BytesMut}; + + #[test] + fn generic_message_from_zeroed_buffer() { + let mut buf = BytesMut::zeroed(384); + let header = + bytemuck::checked::try_from_bytes_mut::<GenericHeader>(&mut buf[..256]).unwrap(); + header.size = 384; + header.cluster = 42; + + let msg = Message::<GenericHeader>::from_bytes(buf.freeze()).unwrap(); + assert_eq!(msg.header().cluster, 42); + assert_eq!(msg.header().size, 384); + assert_eq!(msg.body().len(), 128); + } + + #[test] + fn request_message_roundtrip() { + let mut buf = BytesMut::zeroed(320); + let header = + bytemuck::checked::try_from_bytes_mut::<RequestHeader>(&mut buf[..256]).unwrap(); + header.size = 320; + header.command = Command2::Request; + header.client = 12345; + header.operation = crate::consensus::Operation::CreateStream; + + let msg = Message::<RequestHeader>::from_bytes(buf.freeze()).unwrap(); + assert_eq!(msg.header().client, 12345); + assert_eq!( + msg.header().operation, + crate::consensus::Operation::CreateStream + ); + assert_eq!(msg.body().len(), 64); + } + + #[test] + fn too_small_buffer_rejected() { + let buf = Bytes::from(vec![0u8; 100]); // < 256 + assert!(Message::<GenericHeader>::from_bytes(buf).is_err()); + } + + #[test] + fn into_generic_and_back() { + let mut buf = BytesMut::zeroed(256); + let header = + bytemuck::checked::try_from_bytes_mut::<RequestHeader>(&mut buf[..256]).unwrap(); + header.size = 256; + header.command = Command2::Request; + + let msg = Message::<RequestHeader>::from_bytes(buf.freeze()).unwrap(); + let generic = msg.into_generic(); + assert_eq!(generic.header().command, Command2::Request); + + let back = generic.try_into_typed::<RequestHeader>().unwrap(); + assert_eq!(back.header().command, Command2::Request); + } + + #[test] + fn transmute_header_generic_to_request() { + let mut buf = BytesMut::zeroed(320); + let header = + bytemuck::checked::try_from_bytes_mut::<GenericHeader>(&mut buf[..256]).unwrap(); + header.size = 320; + header.cluster = 99; + + let generic = Message::<GenericHeader>::from_bytes(buf.freeze()).unwrap(); + let request = generic.transmute_header(|old, new: &mut RequestHeader| { + new.size = old.size; + new.command = Command2::Request; + new.cluster = old.cluster; + new.client = 42; + new.operation = crate::consensus::Operation::CreateStream; + }); + assert_eq!(request.header().command, Command2::Request); + assert_eq!(request.header().client, 42); + assert_eq!(request.header().cluster, 99); + assert_eq!(request.header().size, 320); + assert_eq!(request.body().len(), 64); + } + + #[test] + fn wrong_type_conversion_fails() { + let mut buf = BytesMut::zeroed(256); + let header = + bytemuck::checked::try_from_bytes_mut::<GenericHeader>(&mut buf[..256]).unwrap(); + header.size = 256; + header.command = Command2::Reserved; // not Request + + let msg = Message::<GenericHeader>::from_bytes(buf.freeze()).unwrap(); + assert!(msg.try_into_typed::<RequestHeader>().is_err()); + } +} diff --git a/core/binary_protocol/src/consensus/mod.rs b/core/binary_protocol/src/consensus/mod.rs new file mode 100644 index 000000000..af14f82c9 --- /dev/null +++ b/core/binary_protocol/src/consensus/mod.rs @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! VSR (Viewstamped Replication) consensus wire types. +//! +//! All headers are exactly 256 bytes with `#[repr(C)]` layout. Size is +//! enforced at compile time. Deserialization is a pointer cast (zero-copy) +//! via `bytemuck::try_from_bytes`. +//! +//! ## Client-facing +//! - [`RequestHeader`] - client -> primary +//! - [`ReplyHeader`] - primary -> client +//! - [`GenericHeader`] - type-erased envelope for dispatch +//! +//! ## Replication (server-to-server) +//! - [`PrepareHeader`] - primary -> replicas (replicate operation) +//! - [`PrepareOkHeader`] - replica -> primary (acknowledge) +//! - [`CommitHeader`] - primary -> replicas (commit, header-only) +//! +//! ## View change (server-to-server) +//! - [`StartViewChangeHeader`] - replica suspects primary failure (header-only) +//! - [`DoViewChangeHeader`] - replica -> primary candidate (header-only) +//! - [`StartViewHeader`] - new primary -> all replicas (header-only) +//! +//! ## Message wrapper +//! - [`message::Message`] - zero-copy `Bytes` wrapper with typed header access + +mod command; +mod commit; +mod do_view_change; +mod error; +mod generic; +mod header_trait; +pub mod message; +mod operation; +mod prepare; +mod prepare_ok; +mod reply; +mod request; +mod start_view; +mod start_view_change; + +pub use command::Command2; +pub use commit::CommitHeader; +pub use do_view_change::DoViewChangeHeader; +pub use error::ConsensusError; +pub use generic::GenericHeader; +pub use header_trait::{ConsensusHeader, HEADER_SIZE}; +pub use operation::Operation; +pub use prepare::PrepareHeader; +pub use prepare_ok::PrepareOkHeader; +pub use reply::ReplyHeader; +pub use request::RequestHeader; +pub use start_view::StartViewHeader; +pub use start_view_change::StartViewChangeHeader; diff --git a/core/binary_protocol/src/consensus/operation.rs b/core/binary_protocol/src/consensus/operation.rs new file mode 100644 index 000000000..018ea27a2 --- /dev/null +++ b/core/binary_protocol/src/consensus/operation.rs @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytemuck::{CheckedBitPattern, NoUninit}; + +/// Replicated operation discriminant. Identifies the state machine operation +/// carried in a consensus message body. +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, NoUninit, CheckedBitPattern)] +#[repr(u8)] +pub enum Operation { + #[default] + Reserved = 0, + + // Metadata operations (shard 0) + CreateStream = 128, + UpdateStream = 129, + DeleteStream = 130, + PurgeStream = 131, + CreateTopic = 132, + UpdateTopic = 133, + DeleteTopic = 134, + PurgeTopic = 135, + CreatePartitions = 136, + DeletePartitions = 137, + DeleteSegments = 138, + CreateConsumerGroup = 139, + DeleteConsumerGroup = 140, + CreateUser = 141, + UpdateUser = 142, + DeleteUser = 143, + ChangePassword = 144, + UpdatePermissions = 145, + CreatePersonalAccessToken = 146, + DeletePersonalAccessToken = 147, + + // Partition operations (routed by namespace) + SendMessages = 160, + StoreConsumerOffset = 161, +} + +impl Operation { + /// Metadata / control-plane operations handled by shard 0. + #[must_use] + #[inline] + pub const fn is_metadata(&self) -> bool { + matches!( + self, + Self::CreateStream + | Self::UpdateStream + | Self::DeleteStream + | Self::PurgeStream + | Self::CreateTopic + | Self::UpdateTopic + | Self::DeleteTopic + | Self::PurgeTopic + | Self::CreatePartitions + | Self::DeletePartitions + | Self::CreateConsumerGroup + | Self::DeleteConsumerGroup + | Self::CreateUser + | Self::UpdateUser + | Self::DeleteUser + | Self::ChangePassword + | Self::UpdatePermissions + | Self::CreatePersonalAccessToken + | Self::DeletePersonalAccessToken + ) + } + + /// Data-plane operations routed to the shard owning the partition. + #[must_use] + #[inline] + pub const fn is_partition(&self) -> bool { + matches!( + self, + Self::SendMessages | Self::StoreConsumerOffset | Self::DeleteSegments + ) + } + + /// Bidirectional mapping: `Operation` -> client command code. + #[must_use] + pub const fn to_command_code(&self) -> Option<u32> { + use crate::codes; + match self { + Self::Reserved => None, + Self::CreateStream => Some(codes::CREATE_STREAM_CODE), + Self::UpdateStream => Some(codes::UPDATE_STREAM_CODE), + Self::DeleteStream => Some(codes::DELETE_STREAM_CODE), + Self::PurgeStream => Some(codes::PURGE_STREAM_CODE), + Self::CreateTopic => Some(codes::CREATE_TOPIC_CODE), + Self::UpdateTopic => Some(codes::UPDATE_TOPIC_CODE), + Self::DeleteTopic => Some(codes::DELETE_TOPIC_CODE), + Self::PurgeTopic => Some(codes::PURGE_TOPIC_CODE), + Self::CreatePartitions => Some(codes::CREATE_PARTITIONS_CODE), + Self::DeletePartitions => Some(codes::DELETE_PARTITIONS_CODE), + Self::DeleteSegments => Some(codes::DELETE_SEGMENTS_CODE), + Self::CreateConsumerGroup => Some(codes::CREATE_CONSUMER_GROUP_CODE), + Self::DeleteConsumerGroup => Some(codes::DELETE_CONSUMER_GROUP_CODE), + Self::CreateUser => Some(codes::CREATE_USER_CODE), + Self::UpdateUser => Some(codes::UPDATE_USER_CODE), + Self::DeleteUser => Some(codes::DELETE_USER_CODE), + Self::ChangePassword => Some(codes::CHANGE_PASSWORD_CODE), + Self::UpdatePermissions => Some(codes::UPDATE_PERMISSIONS_CODE), + Self::CreatePersonalAccessToken => Some(codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE), + Self::DeletePersonalAccessToken => Some(codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE), + Self::SendMessages => Some(codes::SEND_MESSAGES_CODE), + Self::StoreConsumerOffset => Some(codes::STORE_CONSUMER_OFFSET_CODE), + } + } + + /// Bidirectional mapping: client command code -> `Operation`. + #[must_use] + pub const fn from_command_code(code: u32) -> Option<Self> { + use crate::codes; + match code { + codes::CREATE_STREAM_CODE => Some(Self::CreateStream), + codes::UPDATE_STREAM_CODE => Some(Self::UpdateStream), + codes::DELETE_STREAM_CODE => Some(Self::DeleteStream), + codes::PURGE_STREAM_CODE => Some(Self::PurgeStream), + codes::CREATE_TOPIC_CODE => Some(Self::CreateTopic), + codes::UPDATE_TOPIC_CODE => Some(Self::UpdateTopic), + codes::DELETE_TOPIC_CODE => Some(Self::DeleteTopic), + codes::PURGE_TOPIC_CODE => Some(Self::PurgeTopic), + codes::CREATE_PARTITIONS_CODE => Some(Self::CreatePartitions), + codes::DELETE_PARTITIONS_CODE => Some(Self::DeletePartitions), + codes::DELETE_SEGMENTS_CODE => Some(Self::DeleteSegments), + codes::CREATE_CONSUMER_GROUP_CODE => Some(Self::CreateConsumerGroup), + codes::DELETE_CONSUMER_GROUP_CODE => Some(Self::DeleteConsumerGroup), + codes::CREATE_USER_CODE => Some(Self::CreateUser), + codes::UPDATE_USER_CODE => Some(Self::UpdateUser), + codes::DELETE_USER_CODE => Some(Self::DeleteUser), + codes::CHANGE_PASSWORD_CODE => Some(Self::ChangePassword), + codes::UPDATE_PERMISSIONS_CODE => Some(Self::UpdatePermissions), + codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE => Some(Self::CreatePersonalAccessToken), + codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE => Some(Self::DeletePersonalAccessToken), + codes::SEND_MESSAGES_CODE => Some(Self::SendMessages), + codes::STORE_CONSUMER_OFFSET_CODE => Some(Self::StoreConsumerOffset), + _ => None, + } + } +} + +#[cfg(test)] +mod tests { + use super::Operation; + + #[test] + fn command_code_roundtrip() { + let ops = [ + Operation::CreateStream, + Operation::UpdateStream, + Operation::DeleteStream, + Operation::PurgeStream, + Operation::CreateTopic, + Operation::UpdateTopic, + Operation::DeleteTopic, + Operation::PurgeTopic, + Operation::CreatePartitions, + Operation::DeletePartitions, + Operation::DeleteSegments, + Operation::CreateConsumerGroup, + Operation::DeleteConsumerGroup, + Operation::CreateUser, + Operation::UpdateUser, + Operation::DeleteUser, + Operation::ChangePassword, + Operation::UpdatePermissions, + Operation::CreatePersonalAccessToken, + Operation::DeletePersonalAccessToken, + Operation::SendMessages, + Operation::StoreConsumerOffset, + ]; + for op in ops { + let code = op + .to_command_code() + .unwrap_or_else(|| panic!("Operation {op:?} has no command code mapping")); + let back = Operation::from_command_code(code) + .unwrap_or_else(|| panic!("command code {code} has no Operation mapping")); + assert_eq!(op, back, "roundtrip failed for {op:?} (code={code})"); + } + } + + #[test] + fn reserved_has_no_code() { + assert_eq!(Operation::Reserved.to_command_code(), None); + } + + #[test] + fn read_only_commands_have_no_operation() { + assert!(Operation::from_command_code(crate::PING_CODE).is_none()); + assert!(Operation::from_command_code(crate::GET_STATS_CODE).is_none()); + assert!(Operation::from_command_code(crate::GET_STREAM_CODE).is_none()); + assert!(Operation::from_command_code(crate::POLL_MESSAGES_CODE).is_none()); + } + + #[test] + fn metadata_vs_partition() { + assert!(Operation::CreateStream.is_metadata()); + assert!(!Operation::CreateStream.is_partition()); + assert!(Operation::SendMessages.is_partition()); + assert!(!Operation::SendMessages.is_metadata()); + assert!(Operation::DeleteSegments.is_partition()); + } +} diff --git a/core/binary_protocol/src/consensus/prepare.rs b/core/binary_protocol/src/consensus/prepare.rs new file mode 100644 index 000000000..a5a6313c5 --- /dev/null +++ b/core/binary_protocol/src/consensus/prepare.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{Command2, ConsensusError, ConsensusHeader, HEADER_SIZE, Operation}; +use bytemuck::{CheckedBitPattern, NoUninit}; +use std::mem::offset_of; + +/// Primary -> replicas: replicate this operation. +#[repr(C)] +#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)] +pub struct PrepareHeader { + pub checksum: u128, + pub checksum_body: u128, + pub cluster: u128, + pub size: u32, + pub view: u32, + pub release: u32, + pub command: Command2, + pub replica: u8, + pub reserved_frame: [u8; 66], + + pub client: u128, + pub parent: u128, + pub request_checksum: u128, + pub op: u64, + pub commit: u64, + pub timestamp: u64, + pub request: u64, + pub operation: Operation, + pub operation_padding: [u8; 7], + pub namespace: u64, + pub reserved: [u8; 32], +} +const _: () = { + assert!(size_of::<PrepareHeader>() == HEADER_SIZE); + assert!( + offset_of!(PrepareHeader, client) + == offset_of!(PrepareHeader, reserved_frame) + size_of::<[u8; 66]>() + ); + assert!(offset_of!(PrepareHeader, reserved) + size_of::<[u8; 32]>() == HEADER_SIZE); +}; + +impl ConsensusHeader for PrepareHeader { + const COMMAND: Command2 = Command2::Prepare; + + fn operation(&self) -> Operation { + self.operation + } + + fn validate(&self) -> Result<(), ConsensusError> { + if self.command != Command2::Prepare { + return Err(ConsensusError::InvalidCommand { + expected: Command2::Prepare, + found: self.command, + }); + } + Ok(()) + } + + fn command(&self) -> Command2 { + self.command + } + + fn size(&self) -> u32 { + self.size + } +} + +impl Default for PrepareHeader { + fn default() -> Self { + Self { + checksum: 0, + checksum_body: 0, + cluster: 0, + size: 0, + view: 0, + release: 0, + command: Command2::Reserved, + replica: 0, + reserved_frame: [0; 66], + client: 0, + parent: 0, + request_checksum: 0, + op: 0, + commit: 0, + timestamp: 0, + request: 0, + operation: Operation::Reserved, + operation_padding: [0; 7], + namespace: 0, + reserved: [0; 32], + } + } +} diff --git a/core/binary_protocol/src/consensus/prepare_ok.rs b/core/binary_protocol/src/consensus/prepare_ok.rs new file mode 100644 index 000000000..ac673db87 --- /dev/null +++ b/core/binary_protocol/src/consensus/prepare_ok.rs @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{Command2, ConsensusError, ConsensusHeader, HEADER_SIZE, Operation}; +use bytemuck::{CheckedBitPattern, NoUninit}; +use std::mem::offset_of; + +/// Replica -> primary: acknowledge a Prepare. +#[repr(C)] +#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)] +pub struct PrepareOkHeader { + pub checksum: u128, + pub checksum_body: u128, + pub cluster: u128, + pub size: u32, + pub view: u32, + pub release: u32, + pub command: Command2, + pub replica: u8, + pub reserved_frame: [u8; 66], + + pub parent: u128, + pub prepare_checksum: u128, + pub op: u64, + pub commit: u64, + pub timestamp: u64, + pub request: u64, + pub operation: Operation, + pub operation_padding: [u8; 7], + pub namespace: u64, + pub reserved: [u8; 48], +} +const _: () = { + assert!(size_of::<PrepareOkHeader>() == HEADER_SIZE); + assert!( + offset_of!(PrepareOkHeader, parent) + == offset_of!(PrepareOkHeader, reserved_frame) + size_of::<[u8; 66]>() + ); + assert!(offset_of!(PrepareOkHeader, reserved) + size_of::<[u8; 48]>() == HEADER_SIZE); +}; + +impl ConsensusHeader for PrepareOkHeader { + const COMMAND: Command2 = Command2::PrepareOk; + + fn operation(&self) -> Operation { + self.operation + } + + fn command(&self) -> Command2 { + self.command + } + + fn validate(&self) -> Result<(), ConsensusError> { + if self.command != Command2::PrepareOk { + return Err(ConsensusError::InvalidCommand { + expected: Command2::PrepareOk, + found: self.command, + }); + } + Ok(()) + } + + fn size(&self) -> u32 { + self.size + } +} + +impl Default for PrepareOkHeader { + fn default() -> Self { + Self { + checksum: 0, + checksum_body: 0, + cluster: 0, + size: 0, + view: 0, + release: 0, + command: Command2::Reserved, + replica: 0, + reserved_frame: [0; 66], + parent: 0, + prepare_checksum: 0, + op: 0, + commit: 0, + timestamp: 0, + request: 0, + operation: Operation::Reserved, + operation_padding: [0; 7], + namespace: 0, + reserved: [0; 48], + } + } +} diff --git a/core/binary_protocol/src/consensus/reply.rs b/core/binary_protocol/src/consensus/reply.rs new file mode 100644 index 000000000..e265203b8 --- /dev/null +++ b/core/binary_protocol/src/consensus/reply.rs @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{Command2, ConsensusError, ConsensusHeader, HEADER_SIZE, Operation}; +use bytemuck::{CheckedBitPattern, NoUninit}; +use std::mem::offset_of; + +/// Primary -> client reply header. 256 bytes. +#[repr(C)] +#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)] +pub struct ReplyHeader { + pub checksum: u128, + pub checksum_body: u128, + pub cluster: u128, + pub size: u32, + pub view: u32, + pub release: u32, + pub command: Command2, + pub replica: u8, + pub reserved_frame: [u8; 66], + + pub request_checksum: u128, + pub context: u128, + pub op: u64, + pub commit: u64, + pub timestamp: u64, + pub request: u64, + pub operation: Operation, + pub operation_padding: [u8; 7], + pub namespace: u64, + pub reserved: [u8; 48], +} +const _: () = { + assert!(size_of::<ReplyHeader>() == HEADER_SIZE); + assert!( + offset_of!(ReplyHeader, request_checksum) + == offset_of!(ReplyHeader, reserved_frame) + size_of::<[u8; 66]>() + ); + assert!(offset_of!(ReplyHeader, reserved) + size_of::<[u8; 48]>() == HEADER_SIZE); +}; + +impl ConsensusHeader for ReplyHeader { + const COMMAND: Command2 = Command2::Reply; + + fn operation(&self) -> Operation { + self.operation + } + + fn command(&self) -> Command2 { + self.command + } + + fn validate(&self) -> Result<(), ConsensusError> { + if self.command != Command2::Reply { + return Err(ConsensusError::ReplyInvalidCommand2); + } + Ok(()) + } + + fn size(&self) -> u32 { + self.size + } +} + +impl Default for ReplyHeader { + fn default() -> Self { + Self { + checksum: 0, + checksum_body: 0, + cluster: 0, + size: 0, + view: 0, + release: 0, + command: Command2::Reserved, + replica: 0, + reserved_frame: [0; 66], + request_checksum: 0, + context: 0, + op: 0, + commit: 0, + timestamp: 0, + request: 0, + operation: Operation::Reserved, + operation_padding: [0; 7], + namespace: 0, + reserved: [0; 48], + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn reply_header_zero_copy() { + let mut buf = bytes::BytesMut::zeroed(256); + buf[60] = Command2::Reply as u8; + let header: &ReplyHeader = bytemuck::checked::try_from_bytes(&buf).unwrap(); + assert_eq!(header.command, Command2::Reply); + assert!(header.validate().is_ok()); + } +} diff --git a/core/binary_protocol/src/consensus/request.rs b/core/binary_protocol/src/consensus/request.rs new file mode 100644 index 000000000..eeba8fb52 --- /dev/null +++ b/core/binary_protocol/src/consensus/request.rs @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{Command2, ConsensusError, ConsensusHeader, HEADER_SIZE, Operation}; +use bytemuck::{CheckedBitPattern, NoUninit}; +use std::mem::offset_of; + +/// Client -> primary request header. 256 bytes. +#[repr(C)] +#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)] +pub struct RequestHeader { + pub checksum: u128, + pub checksum_body: u128, + pub cluster: u128, + pub size: u32, + pub view: u32, + pub release: u32, + pub command: Command2, + pub replica: u8, + pub reserved_frame: [u8; 66], + + pub client: u128, + pub request_checksum: u128, + pub timestamp: u64, + pub request: u64, + pub operation: Operation, + pub operation_padding: [u8; 7], + pub namespace: u64, + pub reserved: [u8; 64], +} +const _: () = { + assert!(size_of::<RequestHeader>() == HEADER_SIZE); + assert!( + offset_of!(RequestHeader, client) + == offset_of!(RequestHeader, reserved_frame) + size_of::<[u8; 66]>() + ); + assert!(offset_of!(RequestHeader, reserved) + size_of::<[u8; 64]>() == HEADER_SIZE); +}; + +impl Default for RequestHeader { + fn default() -> Self { + Self { + checksum: 0, + checksum_body: 0, + cluster: 0, + size: 0, + view: 0, + release: 0, + command: Command2::Reserved, + replica: 0, + reserved_frame: [0; 66], + client: 0, + request_checksum: 0, + timestamp: 0, + request: 0, + operation: Operation::Reserved, + operation_padding: [0; 7], + namespace: 0, + reserved: [0; 64], + } + } +} + +impl ConsensusHeader for RequestHeader { + const COMMAND: Command2 = Command2::Request; + + fn operation(&self) -> Operation { + self.operation + } + + fn validate(&self) -> Result<(), ConsensusError> { + if self.command != Command2::Request { + return Err(ConsensusError::InvalidCommand { + expected: Command2::Request, + found: self.command, + }); + } + Ok(()) + } + + fn command(&self) -> Command2 { + self.command + } + + fn size(&self) -> u32 { + self.size + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn request_header_zero_copy() { + let mut buf = bytes::BytesMut::zeroed(256); + buf[60] = Command2::Request as u8; + let header: &RequestHeader = bytemuck::checked::try_from_bytes(&buf).unwrap(); + assert_eq!(header.command, Command2::Request); + assert!(header.validate().is_ok()); + } + + #[test] + fn request_header_wrong_command_fails_validation() { + let buf = bytes::BytesMut::zeroed(256); + let header: &RequestHeader = bytemuck::checked::try_from_bytes(&buf).unwrap(); + assert!(header.validate().is_err()); + } +} diff --git a/core/binary_protocol/src/consensus/start_view.rs b/core/binary_protocol/src/consensus/start_view.rs new file mode 100644 index 000000000..255229633 --- /dev/null +++ b/core/binary_protocol/src/consensus/start_view.rs @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{Command2, ConsensusError, ConsensusHeader, HEADER_SIZE, Operation}; +use bytemuck::{CheckedBitPattern, NoUninit}; +use std::mem::offset_of; + +/// New primary -> all replicas: start new view. Header-only. +#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)] +#[repr(C)] +pub struct StartViewHeader { + pub checksum: u128, + pub checksum_body: u128, + pub cluster: u128, + pub size: u32, + pub view: u32, + pub release: u32, + pub command: Command2, + pub replica: u8, + pub reserved_frame: [u8; 66], + + /// Highest op in the new primary's log. + pub op: u64, + /// max(commit) from all DVCs. + pub commit: u64, + pub namespace: u64, + pub reserved: [u8; 104], +} +const _: () = { + assert!(size_of::<StartViewHeader>() == HEADER_SIZE); + assert!( + offset_of!(StartViewHeader, op) + == offset_of!(StartViewHeader, reserved_frame) + size_of::<[u8; 66]>() + ); + assert!(offset_of!(StartViewHeader, reserved) + size_of::<[u8; 104]>() == HEADER_SIZE); +}; + +impl ConsensusHeader for StartViewHeader { + const COMMAND: Command2 = Command2::StartView; + + fn operation(&self) -> Operation { + Operation::Reserved + } + + fn command(&self) -> Command2 { + self.command + } + + fn validate(&self) -> Result<(), ConsensusError> { + if self.command != Command2::StartView { + return Err(ConsensusError::InvalidCommand { + expected: Command2::StartView, + found: self.command, + }); + } + if self.release != 0 { + return Err(ConsensusError::InvalidField( + "release must be 0".to_string(), + )); + } + if self.commit > self.op { + return Err(ConsensusError::InvalidField( + "commit cannot exceed op".to_string(), + )); + } + Ok(()) + } + + fn size(&self) -> u32 { + self.size + } +} diff --git a/core/binary_protocol/src/consensus/start_view_change.rs b/core/binary_protocol/src/consensus/start_view_change.rs new file mode 100644 index 000000000..c33d02042 --- /dev/null +++ b/core/binary_protocol/src/consensus/start_view_change.rs @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{Command2, ConsensusError, ConsensusHeader, HEADER_SIZE, Operation}; +use bytemuck::{CheckedBitPattern, NoUninit}; +use std::mem::offset_of; + +/// Replica suspects primary failure. Header-only. +#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)] +#[repr(C)] +pub struct StartViewChangeHeader { + pub checksum: u128, + pub checksum_body: u128, + pub cluster: u128, + pub size: u32, + pub view: u32, + pub release: u32, + pub command: Command2, + pub replica: u8, + pub reserved_frame: [u8; 66], + + pub namespace: u64, + pub reserved: [u8; 120], +} +const _: () = { + assert!(size_of::<StartViewChangeHeader>() == HEADER_SIZE); + assert!( + offset_of!(StartViewChangeHeader, namespace) + == offset_of!(StartViewChangeHeader, reserved_frame) + size_of::<[u8; 66]>() + ); + assert!(offset_of!(StartViewChangeHeader, reserved) + size_of::<[u8; 120]>() == HEADER_SIZE); +}; + +impl ConsensusHeader for StartViewChangeHeader { + const COMMAND: Command2 = Command2::StartViewChange; + + fn operation(&self) -> Operation { + Operation::Reserved + } + + fn command(&self) -> Command2 { + self.command + } + + fn validate(&self) -> Result<(), ConsensusError> { + if self.command != Command2::StartViewChange { + return Err(ConsensusError::InvalidCommand { + expected: Command2::StartViewChange, + found: self.command, + }); + } + if self.release != 0 { + return Err(ConsensusError::InvalidField("release != 0".to_string())); + } + Ok(()) + } + + fn size(&self) -> u32 { + self.size + } +} diff --git a/core/binary_protocol/src/error.rs b/core/binary_protocol/src/error.rs new file mode 100644 index 000000000..f4f53ea56 --- /dev/null +++ b/core/binary_protocol/src/error.rs @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Protocol-local error type for wire format encode/decode failures. +/// +/// Intentionally decoupled from `IggyError` to keep the protocol crate +/// free of domain dependencies. Conversion to `IggyError` happens at +/// the boundary (SDK / server). +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum WireError { + #[error("unexpected end of buffer at offset {offset}: need {need} bytes, have {have}")] + UnexpectedEof { + offset: usize, + need: usize, + have: usize, + }, + + #[error("invalid utf-8 at offset {offset}")] + InvalidUtf8 { offset: usize }, + + #[error("unknown command code: {0}")] + UnknownCommand(u32), + + #[error("unknown discriminant {value} for {type_name} at offset {offset}")] + UnknownDiscriminant { + type_name: &'static str, + value: u8, + offset: usize, + }, + + #[error("payload too large: {size} bytes, max {max}")] + PayloadTooLarge { size: usize, max: usize }, + + #[error("validation failed: {0}")] + Validation(String), +} diff --git a/core/binary_protocol/src/frame.rs b/core/binary_protocol/src/frame.rs new file mode 100644 index 000000000..b17d0d3f9 --- /dev/null +++ b/core/binary_protocol/src/frame.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Request frame: `[length:4 LE][code:4 LE][payload:N]` +/// `length` = size of code + payload = 4 + N +pub const REQUEST_HEADER_SIZE: usize = 4; + +/// Response frame: `[status:4 LE][length:4 LE][payload:N]` +pub const RESPONSE_HEADER_SIZE: usize = 8; + +/// Status code for a successful response. +pub const STATUS_OK: u32 = 0; diff --git a/core/binary_protocol/src/identifier.rs b/core/binary_protocol/src/identifier.rs new file mode 100644 index 000000000..e2b356e0c --- /dev/null +++ b/core/binary_protocol/src/identifier.rs @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::codec::{WireDecode, WireEncode, read_bytes, read_u8}; +use crate::wire_name::WireName; +use bytes::{BufMut, BytesMut}; + +const KIND_NUMERIC: u8 = 1; +const KIND_STRING: u8 = 2; +const NUMERIC_VALUE_LEN: u8 = 4; + +/// Protocol-owned identifier type. Polymorphic: numeric (u32) or string. +/// +/// Wire format: `[kind:1][length:1][value:N]` +/// - Numeric: kind=1, length=4, value=u32 LE +/// - String: kind=2, length=1..255, value=UTF-8 bytes +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum WireIdentifier { + Numeric(u32), + String(WireName), +} + +impl WireIdentifier { + #[must_use] + pub const fn numeric(id: u32) -> Self { + Self::Numeric(id) + } + + /// # Errors + /// Returns `WireError::Validation` if the name is empty or exceeds 255 bytes. + pub fn named(name: impl Into<String>) -> Result<Self, WireError> { + let wire_name = WireName::new(name)?; + Ok(Self::String(wire_name)) + } + + #[must_use] + pub const fn as_u32(&self) -> Option<u32> { + match self { + Self::Numeric(id) => Some(*id), + Self::String(_) => None, + } + } + + #[must_use] + pub fn as_str(&self) -> Option<&str> { + match self { + Self::Numeric(_) => None, + Self::String(s) => Some(s.as_str()), + } + } + + const fn kind_code(&self) -> u8 { + match self { + Self::Numeric(_) => KIND_NUMERIC, + Self::String(_) => KIND_STRING, + } + } + + #[allow(clippy::cast_possible_truncation)] + const fn value_len(&self) -> u8 { + match self { + Self::Numeric(_) => NUMERIC_VALUE_LEN, + // WireName guarantees len <= 255, truncation impossible. + Self::String(s) => s.len() as u8, + } + } +} + +impl WireEncode for WireIdentifier { + fn encoded_size(&self) -> usize { + 2 + self.value_len() as usize + } + + fn encode(&self, buf: &mut BytesMut) { + buf.put_u8(self.kind_code()); + buf.put_u8(self.value_len()); + match self { + Self::Numeric(id) => buf.put_u32_le(*id), + Self::String(s) => buf.put_slice(s.as_bytes()), + } + } +} + +impl WireDecode for WireIdentifier { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let kind = read_u8(buf, 0)?; + let length = read_u8(buf, 1)? as usize; + let value = read_bytes(buf, 2, length)?; + let consumed = 2 + length; + + match kind { + KIND_NUMERIC => { + if length != NUMERIC_VALUE_LEN as usize { + return Err(WireError::Validation(format!( + "numeric identifier must be {NUMERIC_VALUE_LEN} bytes, got {length}" + ))); + } + let id = u32::from_le_bytes( + value + .try_into() + .expect("length already validated as 4 bytes"), + ); + Ok((Self::Numeric(id), consumed)) + } + KIND_STRING => { + if length == 0 { + return Err(WireError::Validation( + "string identifier cannot be empty".to_string(), + )); + } + let s = + std::str::from_utf8(value).map_err(|_| WireError::InvalidUtf8 { offset: 2 })?; + let wire_name = WireName::new(s)?; + Ok((Self::String(wire_name), consumed)) + } + _ => Err(WireError::UnknownDiscriminant { + type_name: "WireIdentifier", + value: kind, + offset: 0, + }), + } + } +} + +impl std::fmt::Display for WireIdentifier { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Numeric(id) => write!(f, "{id}"), + Self::String(s) => write!(f, "{s}"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip_numeric() { + let id = WireIdentifier::numeric(42); + let bytes = id.to_bytes(); + assert_eq!(bytes.len(), 6); // 1 + 1 + 4 + let (decoded, consumed) = WireIdentifier::decode(&bytes).unwrap(); + assert_eq!(consumed, 6); + assert_eq!(decoded, id); + assert_eq!(decoded.as_u32(), Some(42)); + } + + #[test] + fn roundtrip_string() { + let id = WireIdentifier::named("my-stream").unwrap(); + let bytes = id.to_bytes(); + assert_eq!(bytes.len(), 2 + 9); // 1 + 1 + 9 + let (decoded, consumed) = WireIdentifier::decode(&bytes).unwrap(); + assert_eq!(consumed, 11); + assert_eq!(decoded, id); + assert_eq!(decoded.as_str(), Some("my-stream")); + } + + #[test] + fn empty_string_rejected() { + assert!(WireIdentifier::named("").is_err()); + } + + #[test] + fn max_length_string_accepted() { + let name = "a".repeat(255); + let id = WireIdentifier::named(&name).unwrap(); + let bytes = id.to_bytes(); + let (decoded, _) = WireIdentifier::decode(&bytes).unwrap(); + assert_eq!(decoded.as_str(), Some(name.as_str())); + } + + #[test] + fn too_long_string_rejected() { + let name = "a".repeat(256); + assert!(WireIdentifier::named(&name).is_err()); + } + + #[test] + fn truncated_buffer_returns_error() { + let id = WireIdentifier::numeric(1); + let bytes = id.to_bytes(); + for i in 0..bytes.len() { + assert!( + WireIdentifier::decode(&bytes[..i]).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + + #[test] + fn unknown_kind_returns_error() { + let buf = [0xFF, 0x04, 0x01, 0x00, 0x00, 0x00]; + assert!(WireIdentifier::decode(&buf).is_err()); + } + + #[test] + fn numeric_with_wrong_length_returns_error() { + let buf = [KIND_NUMERIC, 0x03, 0x01, 0x00, 0x00]; // length=3, should be 4 + assert!(WireIdentifier::decode(&buf).is_err()); + } + + #[test] + fn string_with_zero_length_returns_error() { + let buf = [KIND_STRING, 0x00]; + assert!(WireIdentifier::decode(&buf).is_err()); + } + + #[test] + fn display_numeric() { + let id = WireIdentifier::numeric(7); + assert_eq!(format!("{id}"), "7"); + } + + #[test] + fn display_string() { + let id = WireIdentifier::named("test").unwrap(); + assert_eq!(format!("{id}"), "test"); + } + + #[test] + fn non_ascii_utf8_string_roundtrip() { + let id = WireIdentifier::named("str\u{00e9}am-\u{00fc}ser").unwrap(); + let bytes = id.to_bytes(); + let (decoded, _) = WireIdentifier::decode(&bytes).unwrap(); + assert_eq!(decoded, id); + } + + #[test] + fn invalid_utf8_bytes_rejected() { + // kind=STRING, length=2, then 2 bytes of invalid UTF-8 + let buf = [KIND_STRING, 0x02, 0xFF, 0xFE]; + let result = WireIdentifier::decode(&buf); + assert!(result.is_err()); + } +} diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs index 31bd66e6e..874596247 100644 --- a/core/binary_protocol/src/lib.rs +++ b/core/binary_protocol/src/lib.rs @@ -1,17 +1,60 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Wire protocol types and codec for the Iggy binary protocol. +//! +//! This crate is the single source of truth for the binary wire format +//! shared between the Iggy server and SDK. It is a pure codec - no I/O, +//! no async, no runtime dependencies. +//! +//! # Design +//! +//! Protocol types are independent from domain types (`iggy_common`). +//! Conversion between wire types and domain types happens at the boundary +//! (SDK client impls, server handlers). +//! +//! # Wire frame format +//! +//! **Request** (client -> server): +//! ```text +//! [length:4 bytes, u32 LE][code:4 bytes, u32 LE][payload:N bytes] +//! ``` +//! +//! **Response** (server -> client): +//! ```text +//! [status:4 bytes, u32 LE][length:4 bytes, u32 LE][payload:N bytes] +//! ``` +//! +//! All multi-byte integers are little-endian. Strings are length-prefixed +//! (u8 length for names, u32 length for longer strings). + +pub mod codec; +pub mod codes; +pub mod consensus; +pub mod error; +pub mod frame; +pub mod identifier; +pub mod requests; +pub mod responses; +pub mod wire_name; + +pub use codec::{WireDecode, WireEncode}; +pub use codes::*; +pub use error::WireError; +pub use frame::*; +pub use identifier::WireIdentifier; +pub use wire_name::{MAX_WIRE_NAME_LENGTH, WireName}; diff --git a/core/binary_protocol/src/requests/mod.rs b/core/binary_protocol/src/requests/mod.rs new file mode 100644 index 000000000..e7b687f1f --- /dev/null +++ b/core/binary_protocol/src/requests/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod streams; diff --git a/core/binary_protocol/src/requests/streams/create_stream.rs b/core/binary_protocol/src/requests/streams/create_stream.rs new file mode 100644 index 000000000..c6d339040 --- /dev/null +++ b/core/binary_protocol/src/requests/streams/create_stream.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::codec::{WireDecode, WireEncode}; +use crate::wire_name::WireName; +use bytes::BytesMut; + +/// `CreateStream` request. Wire format: `[name_len:1][name:N]` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CreateStreamRequest { + pub name: WireName, +} + +impl WireEncode for CreateStreamRequest { + fn encoded_size(&self) -> usize { + self.name.encoded_size() + } + + fn encode(&self, buf: &mut BytesMut) { + self.name.encode(buf); + } +} + +impl WireDecode for CreateStreamRequest { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let (name, consumed) = WireName::decode(buf)?; + Ok((Self { name }, consumed)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip() { + let req = CreateStreamRequest { + name: WireName::new("test-stream").unwrap(), + }; + let bytes = req.to_bytes(); + assert_eq!(bytes.len(), 1 + 11); + let (decoded, consumed) = CreateStreamRequest::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn empty_name_rejected() { + let buf = [0u8]; + assert!(CreateStreamRequest::decode(&buf).is_err()); + } + + #[test] + fn truncated_returns_error() { + let req = CreateStreamRequest { + name: WireName::new("test").unwrap(), + }; + let bytes = req.to_bytes(); + for i in 0..bytes.len() { + assert!( + CreateStreamRequest::decode(&bytes[..i]).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + + #[test] + fn wire_compat_byte_layout() { + let req = CreateStreamRequest { + name: WireName::new("test").unwrap(), + }; + let bytes = req.to_bytes(); + assert_eq!(&bytes[..], &[4, b't', b'e', b's', b't']); + } + + #[test] + fn too_long_name_rejected_at_construction() { + let long = "a".repeat(256); + assert!(WireName::new(long).is_err()); + } +} diff --git a/core/binary_protocol/src/requests/streams/delete_stream.rs b/core/binary_protocol/src/requests/streams/delete_stream.rs new file mode 100644 index 000000000..b30bfa882 --- /dev/null +++ b/core/binary_protocol/src/requests/streams/delete_stream.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::WireIdentifier; +use crate::codec::{WireDecode, WireEncode}; +use bytes::BytesMut; + +/// `DeleteStream` request. Wire format: `[identifier]` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DeleteStreamRequest { + pub stream_id: WireIdentifier, +} + +impl WireEncode for DeleteStreamRequest { + fn encoded_size(&self) -> usize { + self.stream_id.encoded_size() + } + + fn encode(&self, buf: &mut BytesMut) { + self.stream_id.encode(buf); + } +} + +impl WireDecode for DeleteStreamRequest { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let (stream_id, consumed) = WireIdentifier::decode(buf)?; + Ok((Self { stream_id }, consumed)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip() { + let req = DeleteStreamRequest { + stream_id: WireIdentifier::numeric(5), + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = DeleteStreamRequest::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } +} diff --git a/core/binary_protocol/src/requests/streams/get_stream.rs b/core/binary_protocol/src/requests/streams/get_stream.rs new file mode 100644 index 000000000..e543667e4 --- /dev/null +++ b/core/binary_protocol/src/requests/streams/get_stream.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::WireIdentifier; +use crate::codec::{WireDecode, WireEncode}; +use bytes::BytesMut; + +/// `GetStream` request. Wire format: `[identifier]` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct GetStreamRequest { + pub stream_id: WireIdentifier, +} + +impl WireEncode for GetStreamRequest { + fn encoded_size(&self) -> usize { + self.stream_id.encoded_size() + } + + fn encode(&self, buf: &mut BytesMut) { + self.stream_id.encode(buf); + } +} + +impl WireDecode for GetStreamRequest { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let (stream_id, consumed) = WireIdentifier::decode(buf)?; + Ok((Self { stream_id }, consumed)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip_numeric() { + let req = GetStreamRequest { + stream_id: WireIdentifier::numeric(42), + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = GetStreamRequest::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn roundtrip_named() { + let req = GetStreamRequest { + stream_id: WireIdentifier::named("my-stream").unwrap(), + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = GetStreamRequest::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn wire_compat_numeric_byte_layout() { + let req = GetStreamRequest { + stream_id: WireIdentifier::numeric(1), + }; + let bytes = req.to_bytes(); + assert_eq!(&bytes[..], &[1, 4, 1, 0, 0, 0]); + } +} diff --git a/core/binary_protocol/src/requests/streams/get_streams.rs b/core/binary_protocol/src/requests/streams/get_streams.rs new file mode 100644 index 000000000..897e89888 --- /dev/null +++ b/core/binary_protocol/src/requests/streams/get_streams.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::codec::{WireDecode, WireEncode}; +use bytes::BytesMut; + +/// `GetStreams` request. Wire format: empty. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct GetStreamsRequest; + +impl WireEncode for GetStreamsRequest { + fn encoded_size(&self) -> usize { + 0 + } + + fn encode(&self, _buf: &mut BytesMut) {} +} + +impl WireDecode for GetStreamsRequest { + fn decode(_buf: &[u8]) -> Result<(Self, usize), WireError> { + Ok((Self, 0)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip() { + let req = GetStreamsRequest; + let bytes = req.to_bytes(); + assert!(bytes.is_empty()); + let (decoded, consumed) = GetStreamsRequest::decode(&bytes).unwrap(); + assert_eq!(consumed, 0); + assert_eq!(decoded, req); + } +} diff --git a/core/binary_protocol/src/requests/streams/mod.rs b/core/binary_protocol/src/requests/streams/mod.rs new file mode 100644 index 000000000..8307a1d46 --- /dev/null +++ b/core/binary_protocol/src/requests/streams/mod.rs @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod create_stream; +pub mod delete_stream; +pub mod get_stream; +pub mod get_streams; +pub mod purge_stream; +pub mod update_stream; + +pub use create_stream::CreateStreamRequest; +pub use delete_stream::DeleteStreamRequest; +pub use get_stream::GetStreamRequest; +pub use get_streams::GetStreamsRequest; +pub use purge_stream::PurgeStreamRequest; +pub use update_stream::UpdateStreamRequest; diff --git a/core/binary_protocol/src/requests/streams/purge_stream.rs b/core/binary_protocol/src/requests/streams/purge_stream.rs new file mode 100644 index 000000000..bd8028b3f --- /dev/null +++ b/core/binary_protocol/src/requests/streams/purge_stream.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::WireIdentifier; +use crate::codec::{WireDecode, WireEncode}; +use bytes::BytesMut; + +/// `PurgeStream` request. Wire format: `[identifier]` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PurgeStreamRequest { + pub stream_id: WireIdentifier, +} + +impl WireEncode for PurgeStreamRequest { + fn encoded_size(&self) -> usize { + self.stream_id.encoded_size() + } + + fn encode(&self, buf: &mut BytesMut) { + self.stream_id.encode(buf); + } +} + +impl WireDecode for PurgeStreamRequest { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let (stream_id, consumed) = WireIdentifier::decode(buf)?; + Ok((Self { stream_id }, consumed)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip() { + let req = PurgeStreamRequest { + stream_id: WireIdentifier::numeric(1), + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = PurgeStreamRequest::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } +} diff --git a/core/binary_protocol/src/requests/streams/update_stream.rs b/core/binary_protocol/src/requests/streams/update_stream.rs new file mode 100644 index 000000000..5592a2ed2 --- /dev/null +++ b/core/binary_protocol/src/requests/streams/update_stream.rs @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::WireIdentifier; +use crate::codec::{WireDecode, WireEncode}; +use crate::wire_name::WireName; +use bytes::BytesMut; + +/// `UpdateStream` request. Wire format: `[identifier][name_len:1][name:N]` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct UpdateStreamRequest { + pub stream_id: WireIdentifier, + pub name: WireName, +} + +impl WireEncode for UpdateStreamRequest { + fn encoded_size(&self) -> usize { + self.stream_id.encoded_size() + self.name.encoded_size() + } + + fn encode(&self, buf: &mut BytesMut) { + self.stream_id.encode(buf); + self.name.encode(buf); + } +} + +impl WireDecode for UpdateStreamRequest { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let (stream_id, mut pos) = WireIdentifier::decode(buf)?; + let (name, consumed) = WireName::decode(&buf[pos..])?; + pos += consumed; + Ok((Self { stream_id, name }, pos)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip() { + let req = UpdateStreamRequest { + stream_id: WireIdentifier::named("old-name").unwrap(), + name: WireName::new("new-name").unwrap(), + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = UpdateStreamRequest::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn truncated_returns_error() { + let req = UpdateStreamRequest { + stream_id: WireIdentifier::numeric(1), + name: WireName::new("test").unwrap(), + }; + let bytes = req.to_bytes(); + for i in 0..bytes.len() { + assert!( + UpdateStreamRequest::decode(&bytes[..i]).is_err(), + "expected error for truncation at byte {i}" + ); + } + } +} diff --git a/core/binary_protocol/src/responses/mod.rs b/core/binary_protocol/src/responses/mod.rs new file mode 100644 index 000000000..7b062a665 --- /dev/null +++ b/core/binary_protocol/src/responses/mod.rs @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod streams; + +use crate::WireError; +use crate::codec::{WireDecode, WireEncode}; +use bytes::BytesMut; + +/// Marker type for commands that return an empty response payload. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EmptyResponse; + +impl WireEncode for EmptyResponse { + fn encoded_size(&self) -> usize { + 0 + } + + fn encode(&self, _buf: &mut BytesMut) {} +} + +impl WireDecode for EmptyResponse { + fn decode(_buf: &[u8]) -> Result<(Self, usize), WireError> { + Ok((Self, 0)) + } +} diff --git a/core/binary_protocol/src/responses/streams/create_stream.rs b/core/binary_protocol/src/responses/streams/create_stream.rs new file mode 100644 index 000000000..7b23fa9ce --- /dev/null +++ b/core/binary_protocol/src/responses/streams/create_stream.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// `CreateStream` response is a [`StreamResponse`](super::StreamResponse) (stream header). +/// A newly created stream has zero topics, zero size, zero messages. +/// +/// Re-exported as a type alias for clarity at call sites. +pub type CreateStreamResponse = super::StreamResponse; diff --git a/core/binary_protocol/src/responses/streams/delete_stream.rs b/core/binary_protocol/src/responses/streams/delete_stream.rs new file mode 100644 index 000000000..35903ae87 --- /dev/null +++ b/core/binary_protocol/src/responses/streams/delete_stream.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// `DeleteStream` response is empty. +pub type DeleteStreamResponse = super::EmptyResponse; diff --git a/core/binary_protocol/src/responses/streams/get_stream.rs b/core/binary_protocol/src/responses/streams/get_stream.rs new file mode 100644 index 000000000..fc6aed76f --- /dev/null +++ b/core/binary_protocol/src/responses/streams/get_stream.rs @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le, read_u64_le}; +use crate::responses::streams::StreamResponse; +use crate::wire_name::WireName; +use bytes::{BufMut, BytesMut}; + +/// Topic header within a `GetStream` response. +/// +/// Wire format (51 + `name_len` bytes): +/// ```text +/// [id:4][created_at:8][partitions_count:4][message_expiry:8] +/// [compression_algorithm:1][max_topic_size:8][replication_factor:1] +/// [size_bytes:8][messages_count:8][name_len:1][name:N] +/// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TopicHeader { + pub id: u32, + pub created_at: u64, + pub partitions_count: u32, + pub message_expiry: u64, + pub compression_algorithm: u8, + pub max_topic_size: u64, + pub replication_factor: u8, + pub size_bytes: u64, + pub messages_count: u64, + pub name: WireName, +} + +impl TopicHeader { + const FIXED_SIZE: usize = 4 + 8 + 4 + 8 + 1 + 8 + 1 + 8 + 8 + 1; // 51 +} + +impl WireEncode for TopicHeader { + fn encoded_size(&self) -> usize { + Self::FIXED_SIZE + self.name.len() + } + + fn encode(&self, buf: &mut BytesMut) { + buf.put_u32_le(self.id); + buf.put_u64_le(self.created_at); + buf.put_u32_le(self.partitions_count); + buf.put_u64_le(self.message_expiry); + buf.put_u8(self.compression_algorithm); + buf.put_u64_le(self.max_topic_size); + buf.put_u8(self.replication_factor); + buf.put_u64_le(self.size_bytes); + buf.put_u64_le(self.messages_count); + self.name.encode(buf); + } +} + +impl WireDecode for TopicHeader { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let id = read_u32_le(buf, 0)?; + let created_at = read_u64_le(buf, 4)?; + let partitions_count = read_u32_le(buf, 12)?; + let message_expiry = read_u64_le(buf, 16)?; + let compression_algorithm = read_u8(buf, 24)?; + let max_topic_size = read_u64_le(buf, 25)?; + let replication_factor = read_u8(buf, 33)?; + let size_bytes = read_u64_le(buf, 34)?; + let messages_count = read_u64_le(buf, 42)?; + let (name, name_consumed) = WireName::decode(&buf[50..])?; + let consumed = 50 + name_consumed; + + Ok(( + Self { + id, + created_at, + partitions_count, + message_expiry, + compression_algorithm, + max_topic_size, + replication_factor, + size_bytes, + messages_count, + name, + }, + consumed, + )) + } +} + +/// `GetStream` response: stream header followed by topic headers. +/// +/// Wire format: +/// ```text +/// [StreamResponse][TopicHeader]* +/// ``` +/// +/// The number of topics is determined by `stream.topics_count` or by +/// consuming remaining bytes (topics are packed sequentially). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct GetStreamResponse { + pub stream: StreamResponse, + pub topics: Vec<TopicHeader>, +} + +impl WireEncode for GetStreamResponse { + fn encoded_size(&self) -> usize { + self.stream.encoded_size() + + self + .topics + .iter() + .map(WireEncode::encoded_size) + .sum::<usize>() + } + + fn encode(&self, buf: &mut BytesMut) { + self.stream.encode(buf); + for topic in &self.topics { + topic.encode(buf); + } + } +} + +impl WireDecode for GetStreamResponse { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let (stream, mut pos) = StreamResponse::decode(buf)?; + let mut topics = Vec::new(); + while pos < buf.len() { + let (topic, consumed) = TopicHeader::decode(&buf[pos..])?; + pos += consumed; + topics.push(topic); + } + if topics.len() != stream.topics_count as usize { + return Err(WireError::Validation(format!( + "stream.topics_count={} but decoded {} topics", + stream.topics_count, + topics.len() + ))); + } + Ok((Self { stream, topics }, pos)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample_stream() -> StreamResponse { + StreamResponse { + id: 1, + created_at: 1_710_000_000_000, + topics_count: 2, + size_bytes: 2048, + messages_count: 200, + name: WireName::new("my-stream").unwrap(), + } + } + + fn sample_topic(id: u32, name: &str) -> TopicHeader { + TopicHeader { + id, + created_at: 1_710_000_000_000, + partitions_count: 3, + message_expiry: 0, + compression_algorithm: 1, + max_topic_size: 0, + replication_factor: 1, + size_bytes: 1024, + messages_count: 100, + name: WireName::new(name).unwrap(), + } + } + + #[test] + fn roundtrip_with_topics() { + let resp = GetStreamResponse { + stream: sample_stream(), + topics: vec![sample_topic(1, "topic-a"), sample_topic(2, "topic-b")], + }; + let bytes = resp.to_bytes(); + let (decoded, consumed) = GetStreamResponse::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, resp); + } + + #[test] + fn roundtrip_no_topics() { + let resp = GetStreamResponse { + stream: StreamResponse { + topics_count: 0, + ..sample_stream() + }, + topics: vec![], + }; + let bytes = resp.to_bytes(); + let (decoded, consumed) = GetStreamResponse::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, resp); + } + + #[test] + fn topic_header_roundtrip() { + let topic = sample_topic(5, "events"); + let bytes = topic.to_bytes(); + assert_eq!(bytes.len(), TopicHeader::FIXED_SIZE + 6); + let (decoded, consumed) = TopicHeader::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, topic); + } + + #[test] + fn topic_header_truncated_returns_error() { + let topic = sample_topic(1, "t"); + let bytes = topic.to_bytes(); + for i in 0..bytes.len() { + assert!( + TopicHeader::decode(&bytes[..i]).is_err(), + "expected error for truncation at byte {i}" + ); + } + } +} diff --git a/core/binary_protocol/src/responses/streams/get_streams.rs b/core/binary_protocol/src/responses/streams/get_streams.rs new file mode 100644 index 000000000..3221c45e4 --- /dev/null +++ b/core/binary_protocol/src/responses/streams/get_streams.rs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::codec::{WireDecode, WireEncode}; +use crate::responses::streams::StreamResponse; +use bytes::BytesMut; + +/// `GetStreams` response: sequential stream headers. +/// +/// Wire format: +/// ```text +/// [StreamResponse]* +/// ``` +/// +/// Empty payload means zero streams. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct GetStreamsResponse { + pub streams: Vec<StreamResponse>, +} + +impl WireEncode for GetStreamsResponse { + fn encoded_size(&self) -> usize { + self.streams.iter().map(WireEncode::encoded_size).sum() + } + + fn encode(&self, buf: &mut BytesMut) { + for stream in &self.streams { + stream.encode(buf); + } + } +} + +impl WireDecode for GetStreamsResponse { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let mut streams = Vec::new(); + let mut pos = 0; + while pos < buf.len() { + let (stream, consumed) = StreamResponse::decode(&buf[pos..])?; + pos += consumed; + streams.push(stream); + } + Ok((Self { streams }, pos)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::WireName; + + #[test] + fn roundtrip_empty() { + let resp = GetStreamsResponse { streams: vec![] }; + let bytes = resp.to_bytes(); + assert!(bytes.is_empty()); + let (decoded, consumed) = GetStreamsResponse::decode(&bytes).unwrap(); + assert_eq!(consumed, 0); + assert_eq!(decoded, resp); + } + + #[test] + fn roundtrip_multiple() { + let resp = GetStreamsResponse { + streams: vec![ + StreamResponse { + id: 1, + created_at: 100, + topics_count: 2, + size_bytes: 512, + messages_count: 50, + name: WireName::new("s1").unwrap(), + }, + StreamResponse { + id: 2, + created_at: 200, + topics_count: 0, + size_bytes: 0, + messages_count: 0, + name: WireName::new("stream-two").unwrap(), + }, + ], + }; + let bytes = resp.to_bytes(); + let (decoded, consumed) = GetStreamsResponse::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, resp); + } +} diff --git a/core/binary_protocol/src/responses/streams/mod.rs b/core/binary_protocol/src/responses/streams/mod.rs new file mode 100644 index 000000000..e245a5648 --- /dev/null +++ b/core/binary_protocol/src/responses/streams/mod.rs @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod create_stream; +mod delete_stream; +pub mod get_stream; +pub mod get_streams; +mod purge_stream; +mod stream_response; +mod update_stream; + +pub use super::EmptyResponse; +pub use create_stream::CreateStreamResponse; +pub use delete_stream::DeleteStreamResponse; +pub use get_stream::{GetStreamResponse, TopicHeader}; +pub use get_streams::GetStreamsResponse; +pub use purge_stream::PurgeStreamResponse; +pub use stream_response::StreamResponse; +pub use update_stream::UpdateStreamResponse; diff --git a/core/binary_protocol/src/responses/streams/purge_stream.rs b/core/binary_protocol/src/responses/streams/purge_stream.rs new file mode 100644 index 000000000..364e563eb --- /dev/null +++ b/core/binary_protocol/src/responses/streams/purge_stream.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// `PurgeStream` response is empty. +pub type PurgeStreamResponse = super::EmptyResponse; diff --git a/core/binary_protocol/src/responses/streams/stream_response.rs b/core/binary_protocol/src/responses/streams/stream_response.rs new file mode 100644 index 000000000..d62d135e5 --- /dev/null +++ b/core/binary_protocol/src/responses/streams/stream_response.rs @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::codec::{WireDecode, WireEncode, read_u32_le, read_u64_le}; +use crate::wire_name::WireName; +use bytes::{BufMut, BytesMut}; + +/// Stream header on the wire. Used in both single-stream and multi-stream responses. +/// +/// Wire format (33 + `name_len` bytes): +/// ```text +/// [id:4][created_at:8][topics_count:4][size_bytes:8][messages_count:8][name_len:1][name:N] +/// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StreamResponse { + pub id: u32, + pub created_at: u64, + pub topics_count: u32, + pub size_bytes: u64, + pub messages_count: u64, + pub name: WireName, +} + +impl StreamResponse { + const FIXED_SIZE: usize = 4 + 8 + 4 + 8 + 8 + 1; // 33 +} + +impl WireEncode for StreamResponse { + fn encoded_size(&self) -> usize { + Self::FIXED_SIZE + self.name.len() + } + + fn encode(&self, buf: &mut BytesMut) { + buf.put_u32_le(self.id); + buf.put_u64_le(self.created_at); + buf.put_u32_le(self.topics_count); + buf.put_u64_le(self.size_bytes); + buf.put_u64_le(self.messages_count); + self.name.encode(buf); + } +} + +impl WireDecode for StreamResponse { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let id = read_u32_le(buf, 0)?; + let created_at = read_u64_le(buf, 4)?; + let topics_count = read_u32_le(buf, 12)?; + let size_bytes = read_u64_le(buf, 16)?; + let messages_count = read_u64_le(buf, 24)?; + let (name, name_consumed) = WireName::decode(&buf[32..])?; + let consumed = 32 + name_consumed; + + Ok(( + Self { + id, + created_at, + topics_count, + size_bytes, + messages_count, + name, + }, + consumed, + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample() -> StreamResponse { + StreamResponse { + id: 1, + created_at: 1_710_000_000_000, + topics_count: 3, + size_bytes: 1024, + messages_count: 100, + name: WireName::new("test-stream").unwrap(), + } + } + + #[test] + fn roundtrip() { + let resp = sample(); + let bytes = resp.to_bytes(); + assert_eq!(bytes.len(), StreamResponse::FIXED_SIZE + 11); + let (decoded, consumed) = StreamResponse::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, resp); + } + + #[test] + fn truncated_returns_error() { + let resp = sample(); + let bytes = resp.to_bytes(); + for i in 0..bytes.len() { + assert!( + StreamResponse::decode(&bytes[..i]).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + + #[test] + fn multiple_streams_sequential() { + let s1 = StreamResponse { + id: 1, + name: WireName::new("a").unwrap(), + ..sample() + }; + let s2 = StreamResponse { + id: 2, + name: WireName::new("bb").unwrap(), + ..sample() + }; + let mut buf = BytesMut::new(); + s1.encode(&mut buf); + s2.encode(&mut buf); + let bytes = buf.freeze(); + + let (d1, pos1) = StreamResponse::decode(&bytes).unwrap(); + let (d2, pos2) = StreamResponse::decode(&bytes[pos1..]).unwrap(); + assert_eq!(d1, s1); + assert_eq!(d2, s2); + assert_eq!(pos1 + pos2, bytes.len()); + } +} diff --git a/core/binary_protocol/src/responses/streams/update_stream.rs b/core/binary_protocol/src/responses/streams/update_stream.rs new file mode 100644 index 000000000..b002d9c86 --- /dev/null +++ b/core/binary_protocol/src/responses/streams/update_stream.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// `UpdateStream` response is empty. +pub type UpdateStreamResponse = super::EmptyResponse; diff --git a/core/binary_protocol/src/wire_name.rs b/core/binary_protocol/src/wire_name.rs new file mode 100644 index 000000000..fe56733e9 --- /dev/null +++ b/core/binary_protocol/src/wire_name.rs @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::codec::{WireDecode, WireEncode, read_str, read_u8}; +use bytes::{BufMut, BytesMut}; +use std::ops::Deref; + +/// Maximum byte length for a wire name (fits in a u8 length prefix). +pub const MAX_WIRE_NAME_LENGTH: usize = 255; + +/// Validated name type used by protocol request/response types. +/// +/// Guarantees the inner string is 1-255 bytes, matching the u8 +/// length prefix used on the wire. +#[derive(Clone, PartialEq, Eq)] +pub struct WireName(String); + +impl WireName { + /// Create a new `WireName`, validating the length is 1-255 bytes. + /// + /// # Errors + /// Returns `WireError::Validation` if the name is empty or exceeds 255 bytes. + pub fn new(s: impl Into<String>) -> Result<Self, WireError> { + let s = s.into(); + if s.is_empty() || s.len() > MAX_WIRE_NAME_LENGTH { + return Err(WireError::Validation(format!( + "wire name must be 1-{MAX_WIRE_NAME_LENGTH} bytes, got {}", + s.len() + ))); + } + Ok(Self(s)) + } + + #[must_use] + pub fn as_str(&self) -> &str { + &self.0 + } + + #[must_use] + pub const fn len(&self) -> usize { + self.0.len() + } + + /// Always returns `false` - `WireName` is guaranteed non-empty by construction. + #[must_use] + pub const fn is_empty(&self) -> bool { + false + } +} + +impl Deref for WireName { + type Target = str; + + fn deref(&self) -> &str { + &self.0 + } +} + +impl std::fmt::Display for WireName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} + +impl std::fmt::Debug for WireName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "WireName({:?})", self.0) + } +} + +impl WireEncode for WireName { + fn encoded_size(&self) -> usize { + 1 + self.0.len() + } + + fn encode(&self, buf: &mut BytesMut) { + // Length guaranteed <= 255 by construction, truncation impossible. + #[allow(clippy::cast_possible_truncation)] + buf.put_u8(self.0.len() as u8); + buf.put_slice(self.0.as_bytes()); + } +} + +impl WireDecode for WireName { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let name_len = read_u8(buf, 0)? as usize; + if name_len == 0 || name_len > MAX_WIRE_NAME_LENGTH { + return Err(WireError::Validation(format!( + "wire name must be 1-{MAX_WIRE_NAME_LENGTH} bytes, got {name_len}" + ))); + } + let name = read_str(buf, 1, name_len)?; + Ok((Self(name), 1 + name_len)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn valid_name() { + let name = WireName::new("test").unwrap(); + assert_eq!(name.as_str(), "test"); + assert_eq!(name.len(), 4); + assert_eq!(&*name, "test"); + } + + #[test] + fn empty_rejected() { + assert!(WireName::new("").is_err()); + } + + #[test] + fn too_long_rejected() { + let long = "a".repeat(256); + assert!(WireName::new(long).is_err()); + } + + #[test] + fn max_length_accepted() { + let name = "a".repeat(255); + assert!(WireName::new(name).is_ok()); + } + + #[test] + fn roundtrip() { + let name = WireName::new("my-stream").unwrap(); + let bytes = name.to_bytes(); + assert_eq!(bytes.len(), 1 + 9); + let (decoded, consumed) = WireName::decode(&bytes).unwrap(); + assert_eq!(consumed, 10); + assert_eq!(decoded, name); + } + + #[test] + fn display() { + let name = WireName::new("hello").unwrap(); + assert_eq!(format!("{name}"), "hello"); + } + + #[test] + fn deref_to_str() { + let name = WireName::new("test").unwrap(); + assert!(name.starts_with("te")); + } + + #[test] + fn non_ascii_utf8_roundtrip() { + let name = WireName::new("str\u{00e9}am-\u{00fc}ser").unwrap(); + let bytes = name.to_bytes(); + let (decoded, _) = WireName::decode(&bytes).unwrap(); + assert_eq!(decoded, name); + } +}
