This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new b87a84f96 feat(rust): add wire protocol codec and types to
binary_protocol crate (#2946)
b87a84f96 is described below
commit b87a84f9655e73cded7d32cf2a794eb6fb5eeae3
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Mar 17 12:38:16 2026 +0100
feat(rust): add wire protocol codec and types to binary_protocol crate
(#2946)
---
.typos.toml | 2 +
Cargo.lock | 6 +
core/binary_protocol/Cargo.toml | 11 +-
core/binary_protocol/src/codec.rs | 174 +++++
core/binary_protocol/src/codes.rs | 221 ++++++
core/binary_protocol/src/consensus/command.rs | 67 ++
core/binary_protocol/src/consensus/error.rs | 64 ++
core/binary_protocol/src/consensus/header.rs | 744 +++++++++++++++++++++
core/binary_protocol/src/consensus/message.rs | 338 ++++++++++
core/binary_protocol/src/consensus/mod.rs | 54 ++
core/binary_protocol/src/consensus/operation.rs | 221 ++++++
core/binary_protocol/src/error.rs | 50 ++
core/binary_protocol/src/frame.rs | 26 +
core/binary_protocol/src/identifier.rs | 408 +++++++++++
core/binary_protocol/src/lib.rs | 75 ++-
core/binary_protocol/src/requests/mod.rs | 18 +
.../src/requests/streams/create_stream.rs | 96 +++
.../src/requests/streams/delete_stream.rs | 60 ++
.../src/requests/streams/get_stream.rs | 80 +++
.../src/requests/streams/get_streams.rs | 53 ++
core/binary_protocol/src/requests/streams/mod.rs | 30 +
.../src/requests/streams/purge_stream.rs | 60 ++
.../src/requests/streams/update_stream.rs | 81 +++
core/binary_protocol/src/responses/mod.rs | 40 ++
.../src/responses/streams/create_stream.rs | 22 +
.../src/responses/streams/delete_stream.rs | 19 +
.../src/responses/streams/get_stream.rs | 232 +++++++
.../src/responses/streams/get_streams.rs | 103 +++
core/binary_protocol/src/responses/streams/mod.rs | 33 +
.../src/responses/streams/purge_stream.rs | 19 +
.../src/responses/streams/stream_response.rs | 142 ++++
.../src/responses/streams/update_stream.rs | 19 +
32 files changed, 3550 insertions(+), 18 deletions(-)
diff --git a/.typos.toml b/.typos.toml
index bb4d5e4aa..dba98da09 100644
--- a/.typos.toml
+++ b/.typos.toml
@@ -29,6 +29,8 @@ bais = "bais"
Strat = "Strat"
# Same as above
strin = "strin"
+# Valid prefix in Unicode escape test strings (e.g. "caf\u{00e9}" = cafe)
+caf = "caf"
# Exclude auto-generated/non-editable files from typos check
[files]
diff --git a/Cargo.lock b/Cargo.lock
index faa885be4..dffee4844 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5326,6 +5326,12 @@ dependencies = [
[[package]]
name = "iggy_binary_protocol"
version = "0.9.2-edge.1"
+dependencies = [
+ "bytemuck",
+ "bytes",
+ "enumset",
+ "thiserror 2.0.18",
+]
[[package]]
name = "iggy_common"
diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml
index 2d809084d..12ed184fd 100644
--- a/core/binary_protocol/Cargo.toml
+++ b/core/binary_protocol/Cargo.toml
@@ -18,7 +18,7 @@
[package]
name = "iggy_binary_protocol"
version = "0.9.2-edge.1"
-description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
+description = "Wire protocol types and codec for the Iggy binary protocol.
Shared between server and SDK."
edition = "2024"
license = "Apache-2.0"
keywords = ["iggy", "messaging", "streaming"]
@@ -29,3 +29,12 @@ repository = "https://github.com/apache/iggy"
readme = "../../README.md"
[dependencies]
+bytemuck = { workspace = true }
+bytes = { workspace = true }
+enumset = { workspace = true }
+thiserror = { workspace = true }
+
+[lints.clippy]
+enum_glob_use = "deny"
+nursery = "warn"
+pedantic = "deny"
diff --git a/core/binary_protocol/src/codec.rs
b/core/binary_protocol/src/codec.rs
new file mode 100644
index 000000000..3aedd9c5e
--- /dev/null
+++ b/core/binary_protocol/src/codec.rs
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use bytes::{Bytes, BytesMut};
+
+/// Encode a wire type into a caller-owned buffer.
+///
+/// Buffer-first design: the caller controls allocation. The `to_bytes()`
+/// convenience method allocates a fresh buffer on every call; hot paths
+/// should reuse a buffer by calling `encode()` directly.
+pub trait WireEncode {
+ /// Write the encoded representation into `buf`.
+ fn encode(&self, buf: &mut BytesMut);
+
+ /// Return the exact encoded size in bytes.
+ fn encoded_size(&self) -> usize;
+
+ /// Convenience: allocate a new [`Bytes`] and encode into it.
+ #[must_use]
+ fn to_bytes(&self) -> Bytes {
+ let mut buf = BytesMut::with_capacity(self.encoded_size());
+ self.encode(&mut buf);
+ buf.freeze()
+ }
+}
+
+/// Decode a wire type from a byte slice.
+///
+/// Takes `&[u8]` instead of [`Bytes`] to avoid requiring reference-counted
+/// ownership at the decode boundary.
+pub trait WireDecode: Sized {
+ /// Decode from `buf`, consuming exactly the bytes needed.
+ /// Returns the decoded value and the number of bytes consumed.
+ ///
+ /// # Errors
+ /// Returns `WireError` if the buffer is too short or contains invalid
data.
+ fn decode(buf: &[u8]) -> Result<(Self, usize), WireError>;
+
+ /// Convenience: decode a value from the start of `buf`, ignoring trailing bytes.
+ ///
+ /// # Errors
+ /// Returns `WireError` if decoding fails.
+ fn decode_from(buf: &[u8]) -> Result<Self, WireError> {
+ Self::decode(buf).map(|(val, _)| val)
+ }
+}
+
+/// Helper to read a `u8` from `buf` at `offset`.
+///
+/// # Errors
+/// Returns `WireError::UnexpectedEof` if `offset` is out of bounds.
+#[inline]
+pub fn read_u8(buf: &[u8], offset: usize) -> Result<u8, WireError> {
+ buf.get(offset)
+ .copied()
+ .ok_or_else(|| WireError::UnexpectedEof {
+ offset,
+ need: 1,
+ have: buf.len().saturating_sub(offset),
+ })
+}
+
+/// Helper to read a `u32` LE from `buf` at `offset`.
+///
+/// # Errors
+/// Returns `WireError::UnexpectedEof` if fewer than 4 bytes remain.
+#[allow(clippy::missing_panics_doc)]
+#[inline]
+pub fn read_u32_le(buf: &[u8], offset: usize) -> Result<u32, WireError> {
+ let end = offset
+ .checked_add(4)
+ .ok_or_else(|| WireError::UnexpectedEof {
+ offset,
+ need: 4,
+ have: buf.len().saturating_sub(offset),
+ })?;
+ let slice = buf
+ .get(offset..end)
+ .ok_or_else(|| WireError::UnexpectedEof {
+ offset,
+ need: 4,
+ have: buf.len().saturating_sub(offset),
+ })?;
+ Ok(u32::from_le_bytes(
+ slice.try_into().expect("slice is exactly 4 bytes"),
+ ))
+}
+
+/// Helper to read a `u64` LE from `buf` at `offset`.
+///
+/// # Errors
+/// Returns `WireError::UnexpectedEof` if fewer than 8 bytes remain.
+#[allow(clippy::missing_panics_doc)]
+#[inline]
+pub fn read_u64_le(buf: &[u8], offset: usize) -> Result<u64, WireError> {
+ let end = offset
+ .checked_add(8)
+ .ok_or_else(|| WireError::UnexpectedEof {
+ offset,
+ need: 8,
+ have: buf.len().saturating_sub(offset),
+ })?;
+ let slice = buf
+ .get(offset..end)
+ .ok_or_else(|| WireError::UnexpectedEof {
+ offset,
+ need: 8,
+ have: buf.len().saturating_sub(offset),
+ })?;
+ Ok(u64::from_le_bytes(
+ slice.try_into().expect("slice is exactly 8 bytes"),
+ ))
+}
+
+/// Helper to read a UTF-8 string of `len` bytes from `buf` at `offset`.
+///
+/// # Errors
+/// Returns `WireError::UnexpectedEof` or `WireError::InvalidUtf8` on failure.
+#[inline]
+pub fn read_str(buf: &[u8], offset: usize, len: usize) -> Result<String,
WireError> {
+ let end = offset
+ .checked_add(len)
+ .ok_or_else(|| WireError::UnexpectedEof {
+ offset,
+ need: len,
+ have: buf.len().saturating_sub(offset),
+ })?;
+ let slice = buf
+ .get(offset..end)
+ .ok_or_else(|| WireError::UnexpectedEof {
+ offset,
+ need: len,
+ have: buf.len().saturating_sub(offset),
+ })?;
+ std::str::from_utf8(slice)
+ .map(str::to_string)
+ .map_err(|_| WireError::InvalidUtf8 { offset })
+}
+
+/// Helper to read a byte slice of `len` bytes from `buf` at `offset`.
+///
+/// # Errors
+/// Returns `WireError::UnexpectedEof` if fewer than `len` bytes remain.
+#[inline]
+pub fn read_bytes(buf: &[u8], offset: usize, len: usize) -> Result<&[u8],
WireError> {
+ let end = offset
+ .checked_add(len)
+ .ok_or_else(|| WireError::UnexpectedEof {
+ offset,
+ need: len,
+ have: buf.len().saturating_sub(offset),
+ })?;
+ buf.get(offset..end)
+ .ok_or_else(|| WireError::UnexpectedEof {
+ offset,
+ need: len,
+ have: buf.len().saturating_sub(offset),
+ })
+}
diff --git a/core/binary_protocol/src/codes.rs
b/core/binary_protocol/src/codes.rs
new file mode 100644
index 000000000..f20209172
--- /dev/null
+++ b/core/binary_protocol/src/codes.rs
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+
+// TODO: consider converting these constants into a `#[repr(u32)]` enum with
`TryFrom<u32>`
+// for type safety at conversion boundaries (see PR #2946 discussion).
+
+// -- System --
+pub const PING_CODE: u32 = 1;
+pub const GET_STATS_CODE: u32 = 10;
+pub const GET_SNAPSHOT_FILE_CODE: u32 = 11;
+pub const GET_CLUSTER_METADATA_CODE: u32 = 12;
+pub const GET_ME_CODE: u32 = 20;
+pub const GET_CLIENT_CODE: u32 = 21;
+pub const GET_CLIENTS_CODE: u32 = 22;
+
+// -- Users --
+pub const GET_USER_CODE: u32 = 31;
+pub const GET_USERS_CODE: u32 = 32;
+pub const CREATE_USER_CODE: u32 = 33;
+pub const DELETE_USER_CODE: u32 = 34;
+pub const UPDATE_USER_CODE: u32 = 35;
+pub const UPDATE_PERMISSIONS_CODE: u32 = 36;
+pub const CHANGE_PASSWORD_CODE: u32 = 37;
+pub const LOGIN_USER_CODE: u32 = 38;
+pub const LOGOUT_USER_CODE: u32 = 39;
+
+// -- Personal Access Tokens --
+pub const GET_PERSONAL_ACCESS_TOKENS_CODE: u32 = 41;
+pub const CREATE_PERSONAL_ACCESS_TOKEN_CODE: u32 = 42;
+pub const DELETE_PERSONAL_ACCESS_TOKEN_CODE: u32 = 43;
+pub const LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE: u32 = 44;
+
+// -- Messages --
+pub const POLL_MESSAGES_CODE: u32 = 100;
+pub const SEND_MESSAGES_CODE: u32 = 101;
+pub const FLUSH_UNSAVED_BUFFER_CODE: u32 = 102;
+
+// -- Consumer Offsets --
+pub const GET_CONSUMER_OFFSET_CODE: u32 = 120;
+pub const STORE_CONSUMER_OFFSET_CODE: u32 = 121;
+pub const DELETE_CONSUMER_OFFSET_CODE: u32 = 122;
+
+// -- Streams --
+pub const GET_STREAM_CODE: u32 = 200;
+pub const GET_STREAMS_CODE: u32 = 201;
+pub const CREATE_STREAM_CODE: u32 = 202;
+pub const DELETE_STREAM_CODE: u32 = 203;
+pub const UPDATE_STREAM_CODE: u32 = 204;
+pub const PURGE_STREAM_CODE: u32 = 205;
+
+// -- Topics --
+pub const GET_TOPIC_CODE: u32 = 300;
+pub const GET_TOPICS_CODE: u32 = 301;
+pub const CREATE_TOPIC_CODE: u32 = 302;
+pub const DELETE_TOPIC_CODE: u32 = 303;
+pub const UPDATE_TOPIC_CODE: u32 = 304;
+pub const PURGE_TOPIC_CODE: u32 = 305;
+
+// -- Partitions --
+pub const CREATE_PARTITIONS_CODE: u32 = 402;
+pub const DELETE_PARTITIONS_CODE: u32 = 403;
+
+// -- Segments --
+pub const DELETE_SEGMENTS_CODE: u32 = 503;
+
+// -- Consumer Groups --
+pub const GET_CONSUMER_GROUP_CODE: u32 = 600;
+pub const GET_CONSUMER_GROUPS_CODE: u32 = 601;
+pub const CREATE_CONSUMER_GROUP_CODE: u32 = 602;
+pub const DELETE_CONSUMER_GROUP_CODE: u32 = 603;
+pub const JOIN_CONSUMER_GROUP_CODE: u32 = 604;
+pub const LEAVE_CONSUMER_GROUP_CODE: u32 = 605;
+
+/// # Errors
+/// Returns `WireError::UnknownCommand` if the code is not recognized.
+pub const fn command_name(code: u32) -> Result<&'static str, WireError> {
+ match code {
+ PING_CODE => Ok("ping"),
+ GET_STATS_CODE => Ok("stats"),
+ GET_SNAPSHOT_FILE_CODE => Ok("snapshot"),
+ GET_CLUSTER_METADATA_CODE => Ok("cluster.metadata"),
+ GET_ME_CODE => Ok("me"),
+ GET_CLIENT_CODE => Ok("client.get"),
+ GET_CLIENTS_CODE => Ok("client.list"),
+ GET_USER_CODE => Ok("user.get"),
+ GET_USERS_CODE => Ok("user.list"),
+ CREATE_USER_CODE => Ok("user.create"),
+ DELETE_USER_CODE => Ok("user.delete"),
+ UPDATE_USER_CODE => Ok("user.update"),
+ UPDATE_PERMISSIONS_CODE => Ok("user.permissions"),
+ CHANGE_PASSWORD_CODE => Ok("user.password"),
+ LOGIN_USER_CODE => Ok("user.login"),
+ LOGOUT_USER_CODE => Ok("user.logout"),
+ GET_PERSONAL_ACCESS_TOKENS_CODE => Ok("personal_access_token.list"),
+ CREATE_PERSONAL_ACCESS_TOKEN_CODE =>
Ok("personal_access_token.create"),
+ DELETE_PERSONAL_ACCESS_TOKEN_CODE =>
Ok("personal_access_token.delete"),
+ LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE =>
Ok("personal_access_token.login"),
+ POLL_MESSAGES_CODE => Ok("message.poll"),
+ SEND_MESSAGES_CODE => Ok("message.send"),
+ FLUSH_UNSAVED_BUFFER_CODE => Ok("message.flush_unsaved_buffer"),
+ GET_CONSUMER_OFFSET_CODE => Ok("consumer_offset.get"),
+ STORE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.store"),
+ DELETE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.delete"),
+ GET_STREAM_CODE => Ok("stream.get"),
+ GET_STREAMS_CODE => Ok("stream.list"),
+ CREATE_STREAM_CODE => Ok("stream.create"),
+ DELETE_STREAM_CODE => Ok("stream.delete"),
+ UPDATE_STREAM_CODE => Ok("stream.update"),
+ PURGE_STREAM_CODE => Ok("stream.purge"),
+ GET_TOPIC_CODE => Ok("topic.get"),
+ GET_TOPICS_CODE => Ok("topic.list"),
+ CREATE_TOPIC_CODE => Ok("topic.create"),
+ DELETE_TOPIC_CODE => Ok("topic.delete"),
+ UPDATE_TOPIC_CODE => Ok("topic.update"),
+ PURGE_TOPIC_CODE => Ok("topic.purge"),
+ CREATE_PARTITIONS_CODE => Ok("partition.create"),
+ DELETE_PARTITIONS_CODE => Ok("partition.delete"),
+ DELETE_SEGMENTS_CODE => Ok("segment.delete"),
+ GET_CONSUMER_GROUP_CODE => Ok("consumer_group.get"),
+ GET_CONSUMER_GROUPS_CODE => Ok("consumer_group.list"),
+ CREATE_CONSUMER_GROUP_CODE => Ok("consumer_group.create"),
+ DELETE_CONSUMER_GROUP_CODE => Ok("consumer_group.delete"),
+ JOIN_CONSUMER_GROUP_CODE => Ok("consumer_group.join"),
+ LEAVE_CONSUMER_GROUP_CODE => Ok("consumer_group.leave"),
+ _ => Err(WireError::UnknownCommand(code)),
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ const ALL_CODES: &[u32] = &[
+ PING_CODE,
+ GET_STATS_CODE,
+ GET_SNAPSHOT_FILE_CODE,
+ GET_CLUSTER_METADATA_CODE,
+ GET_ME_CODE,
+ GET_CLIENT_CODE,
+ GET_CLIENTS_CODE,
+ GET_USER_CODE,
+ GET_USERS_CODE,
+ CREATE_USER_CODE,
+ DELETE_USER_CODE,
+ UPDATE_USER_CODE,
+ UPDATE_PERMISSIONS_CODE,
+ CHANGE_PASSWORD_CODE,
+ LOGIN_USER_CODE,
+ LOGOUT_USER_CODE,
+ GET_PERSONAL_ACCESS_TOKENS_CODE,
+ CREATE_PERSONAL_ACCESS_TOKEN_CODE,
+ DELETE_PERSONAL_ACCESS_TOKEN_CODE,
+ LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
+ POLL_MESSAGES_CODE,
+ SEND_MESSAGES_CODE,
+ FLUSH_UNSAVED_BUFFER_CODE,
+ GET_CONSUMER_OFFSET_CODE,
+ STORE_CONSUMER_OFFSET_CODE,
+ DELETE_CONSUMER_OFFSET_CODE,
+ GET_STREAM_CODE,
+ GET_STREAMS_CODE,
+ CREATE_STREAM_CODE,
+ DELETE_STREAM_CODE,
+ UPDATE_STREAM_CODE,
+ PURGE_STREAM_CODE,
+ GET_TOPIC_CODE,
+ GET_TOPICS_CODE,
+ CREATE_TOPIC_CODE,
+ DELETE_TOPIC_CODE,
+ UPDATE_TOPIC_CODE,
+ PURGE_TOPIC_CODE,
+ CREATE_PARTITIONS_CODE,
+ DELETE_PARTITIONS_CODE,
+ DELETE_SEGMENTS_CODE,
+ GET_CONSUMER_GROUP_CODE,
+ GET_CONSUMER_GROUPS_CODE,
+ CREATE_CONSUMER_GROUP_CODE,
+ DELETE_CONSUMER_GROUP_CODE,
+ JOIN_CONSUMER_GROUP_CODE,
+ LEAVE_CONSUMER_GROUP_CODE,
+ ];
+
+ #[test]
+ fn every_code_has_a_name() {
+ for &code in ALL_CODES {
+ assert!(
+ command_name(code).is_ok(),
+ "missing name for command code {code}"
+ );
+ }
+ }
+
+ #[test]
+ fn no_duplicate_codes() {
+ let mut seen = std::collections::HashSet::new();
+ for &code in ALL_CODES {
+ assert!(seen.insert(code), "duplicate command code: {code}");
+ }
+ }
+
+ #[test]
+ fn unknown_code_returns_error() {
+ assert!(command_name(9999).is_err());
+ }
+}
diff --git a/core/binary_protocol/src/consensus/command.rs
b/core/binary_protocol/src/consensus/command.rs
new file mode 100644
index 000000000..8eea606bc
--- /dev/null
+++ b/core/binary_protocol/src/consensus/command.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytemuck::{CheckedBitPattern, NoUninit};
+use enumset::EnumSetType;
+
+/// VSR message type discriminant.
+#[derive(Default, Debug, EnumSetType)]
+#[repr(u8)]
+pub enum Command2 {
+ #[default]
+ Reserved = 0,
+
+ Ping = 1,
+ Pong = 2,
+ PingClient = 3,
+ PongClient = 4,
+
+ Request = 5,
+ Prepare = 6,
+ PrepareOk = 7,
+ Reply = 8,
+ Commit = 9,
+
+ StartViewChange = 10,
+ DoViewChange = 11,
+ StartView = 12,
+}
+
+// SAFETY: Command2 is #[repr(u8)] with no padding bytes.
+unsafe impl NoUninit for Command2 {}
+
+// SAFETY: Command2 is #[repr(u8)] with contiguous discriminants 0..=12, so `*bits <= 12` accepts exactly the defined variants.
+unsafe impl CheckedBitPattern for Command2 {
+ type Bits = u8;
+
+ fn is_valid_bit_pattern(bits: &u8) -> bool {
+ *bits <= 12
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::consensus::GenericHeader;
+
+ #[test]
+ fn invalid_bit_pattern_rejected() {
+ let mut buf = bytes::BytesMut::zeroed(256);
+ buf[60] = 99;
+ let result = bytemuck::checked::try_from_bytes::<GenericHeader>(&buf);
+ assert!(result.is_err());
+ }
+}
diff --git a/core/binary_protocol/src/consensus/error.rs
b/core/binary_protocol/src/consensus/error.rs
new file mode 100644
index 000000000..cb7838128
--- /dev/null
+++ b/core/binary_protocol/src/consensus/error.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::command::Command2;
+use thiserror::Error;
+
+#[derive(Debug, Clone, Error, PartialEq, Eq)]
+pub enum ConsensusError {
+ #[error("invalid command: expected {expected:?}, found {found:?}")]
+ InvalidCommand { expected: Command2, found: Command2 },
+
+ #[error("invalid size: expected {expected:?}, found {found:?}")]
+ InvalidSize { expected: u32, found: u32 },
+
+ #[error("invalid checksum")]
+ InvalidChecksum,
+
+ #[error("invalid cluster ID")]
+ InvalidCluster,
+
+ #[error("invalid field: {0}")]
+ InvalidField(String),
+
+ // -- Carried over from the original iggy_common implementation --
+ // Some are already returned by header `validate()` impls in this crate (the
+ // Commit*/ReplyInvalidCommand2 ones); the rest await the consensus migration.
+ #[error("parent_padding must be 0")]
+ PrepareParentPaddingNonZero,
+
+ #[error("request_checksum_padding must be 0")]
+ PrepareRequestChecksumPaddingNonZero,
+
+ #[error("command must be Commit")]
+ CommitInvalidCommand2,
+
+ #[error("size must be 256, found {0}")]
+ CommitInvalidSize(u32),
+
+ #[error("command must be Reply")]
+ ReplyInvalidCommand2,
+
+ #[error("request_checksum_padding must be 0")]
+ ReplyRequestChecksumPaddingNonZero,
+
+ #[error("context_padding must be 0")]
+ ReplyContextPaddingNonZero,
+
+ #[error("invalid bit pattern in header (enum discriminant out of range)")]
+ InvalidBitPattern,
+}
diff --git a/core/binary_protocol/src/consensus/header.rs
b/core/binary_protocol/src/consensus/header.rs
new file mode 100644
index 000000000..648aae6ef
--- /dev/null
+++ b/core/binary_protocol/src/consensus/header.rs
@@ -0,0 +1,744 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! All consensus headers are exactly 256 bytes with `#[repr(C)]` layout.
+//! Size and field offsets are enforced at compile time. Deserialization
+//! is a pointer cast (zero-copy) via `bytemuck::checked::try_from_bytes`.
+
+use super::{Command2, ConsensusError, Operation};
+use bytemuck::{CheckedBitPattern, NoUninit};
+use std::mem::offset_of;
+
+pub const HEADER_SIZE: usize = 256;
+
+/// Trait implemented by all consensus header types.
+///
+/// Every header is exactly [`HEADER_SIZE`] bytes, `#[repr(C)]`, and supports
+/// zero-copy deserialization via `bytemuck`.
+pub trait ConsensusHeader: Sized + CheckedBitPattern + NoUninit {
+ const COMMAND: Command2;
+
+ /// # Errors
+ /// Returns `ConsensusError` if the header fields are inconsistent.
+ fn validate(&self) -> Result<(), ConsensusError>;
+ fn operation(&self) -> Operation;
+ fn command(&self) -> Command2;
+ fn size(&self) -> u32;
+}
+
+// ---------------------------------------------------------------------------
+// GenericHeader - type-erased dispatch
+// ---------------------------------------------------------------------------
+
+/// Type-erased 256-byte header for initial message dispatch.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
+pub struct GenericHeader {
+ pub checksum: u128,
+ pub checksum_body: u128,
+ pub cluster: u128,
+ pub size: u32,
+ pub view: u32,
+ pub release: u32,
+ pub command: Command2,
+ pub replica: u8,
+ pub reserved_frame: [u8; 66],
+ pub reserved_command: [u8; 128],
+}
+const _: () = {
+ assert!(size_of::<GenericHeader>() == HEADER_SIZE);
+ assert!(
+ offset_of!(GenericHeader, reserved_command)
+ == offset_of!(GenericHeader, reserved_frame) + size_of::<[u8;
66]>()
+ );
+ assert!(offset_of!(GenericHeader, reserved_command) + size_of::<[u8;
128]>() == HEADER_SIZE);
+};
+
+impl ConsensusHeader for GenericHeader {
+ const COMMAND: Command2 = Command2::Reserved;
+ fn operation(&self) -> Operation {
+ Operation::Reserved
+ }
+ fn command(&self) -> Command2 {
+ self.command
+ }
+ fn validate(&self) -> Result<(), ConsensusError> {
+ Ok(())
+ }
+ fn size(&self) -> u32 {
+ self.size
+ }
+}
+
+// ---------------------------------------------------------------------------
+// RequestHeader - client -> primary
+// ---------------------------------------------------------------------------
+
+/// Client -> primary request header. 256 bytes.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
+pub struct RequestHeader {
+ pub checksum: u128,
+ pub checksum_body: u128,
+ pub cluster: u128,
+ pub size: u32,
+ pub view: u32,
+ pub release: u32,
+ pub command: Command2,
+ pub replica: u8,
+ pub reserved_frame: [u8; 66],
+
+ pub client: u128,
+ pub request_checksum: u128,
+ pub timestamp: u64,
+ pub request: u64,
+ pub operation: Operation,
+ pub operation_padding: [u8; 7],
+ pub namespace: u64,
+ pub reserved: [u8; 64],
+}
+const _: () = {
+ assert!(size_of::<RequestHeader>() == HEADER_SIZE);
+ assert!(
+ offset_of!(RequestHeader, client)
+ == offset_of!(RequestHeader, reserved_frame) + size_of::<[u8;
66]>()
+ );
+ assert!(offset_of!(RequestHeader, reserved) + size_of::<[u8; 64]>() ==
HEADER_SIZE);
+};
+
+impl Default for RequestHeader {
+ fn default() -> Self {
+ Self {
+ checksum: 0,
+ checksum_body: 0,
+ cluster: 0,
+ size: 0,
+ view: 0,
+ release: 0,
+ command: Command2::Reserved,
+ replica: 0,
+ reserved_frame: [0; 66],
+ client: 0,
+ request_checksum: 0,
+ timestamp: 0,
+ request: 0,
+ operation: Operation::Reserved,
+ operation_padding: [0; 7],
+ namespace: 0,
+ reserved: [0; 64],
+ }
+ }
+}
+
+impl ConsensusHeader for RequestHeader {
+ const COMMAND: Command2 = Command2::Request;
+ fn operation(&self) -> Operation {
+ self.operation
+ }
+ fn command(&self) -> Command2 {
+ self.command
+ }
+ fn size(&self) -> u32 {
+ self.size
+ }
+
+ fn validate(&self) -> Result<(), ConsensusError> {
+ if self.command != Command2::Request {
+ return Err(ConsensusError::InvalidCommand {
+ expected: Command2::Request,
+ found: self.command,
+ });
+ }
+ Ok(())
+ }
+}
+
+// ---------------------------------------------------------------------------
+// ReplyHeader - primary -> client
+// ---------------------------------------------------------------------------
+
+/// Primary -> client reply header. 256 bytes.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
+pub struct ReplyHeader {
+ pub checksum: u128,
+ pub checksum_body: u128,
+ pub cluster: u128,
+ pub size: u32,
+ pub view: u32,
+ pub release: u32,
+ pub command: Command2,
+ pub replica: u8,
+ pub reserved_frame: [u8; 66],
+
+ pub request_checksum: u128,
+ pub context: u128,
+ pub op: u64,
+ pub commit: u64,
+ pub timestamp: u64,
+ pub request: u64,
+ pub operation: Operation,
+ pub operation_padding: [u8; 7],
+ pub namespace: u64,
+ pub reserved: [u8; 48],
+}
+const _: () = {
+ assert!(size_of::<ReplyHeader>() == HEADER_SIZE);
+ assert!(
+ offset_of!(ReplyHeader, request_checksum)
+ == offset_of!(ReplyHeader, reserved_frame) + size_of::<[u8; 66]>()
+ );
+ assert!(offset_of!(ReplyHeader, reserved) + size_of::<[u8; 48]>() ==
HEADER_SIZE);
+};
+
+impl Default for ReplyHeader {
+ fn default() -> Self {
+ Self {
+ checksum: 0,
+ checksum_body: 0,
+ cluster: 0,
+ size: 0,
+ view: 0,
+ release: 0,
+ command: Command2::Reserved,
+ replica: 0,
+ reserved_frame: [0; 66],
+ request_checksum: 0,
+ context: 0,
+ op: 0,
+ commit: 0,
+ timestamp: 0,
+ request: 0,
+ operation: Operation::Reserved,
+ operation_padding: [0; 7],
+ namespace: 0,
+ reserved: [0; 48],
+ }
+ }
+}
+
+impl ConsensusHeader for ReplyHeader {
+ const COMMAND: Command2 = Command2::Reply;
+ fn operation(&self) -> Operation {
+ self.operation
+ }
+ fn command(&self) -> Command2 {
+ self.command
+ }
+ fn size(&self) -> u32 {
+ self.size
+ }
+
+ fn validate(&self) -> Result<(), ConsensusError> {
+ if self.command != Command2::Reply {
+ return Err(ConsensusError::ReplyInvalidCommand2);
+ }
+ Ok(())
+ }
+}
+
+// ---------------------------------------------------------------------------
+// PrepareHeader - primary -> replicas (replication)
+// ---------------------------------------------------------------------------
+
+/// Primary -> replicas: replicate this operation.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
+pub struct PrepareHeader {
+ pub checksum: u128,
+ pub checksum_body: u128,
+ pub cluster: u128,
+ pub size: u32,
+ pub view: u32,
+ pub release: u32,
+ pub command: Command2,
+ pub replica: u8,
+ pub reserved_frame: [u8; 66],
+
+ pub client: u128,
+ pub parent: u128,
+ pub request_checksum: u128,
+ pub op: u64,
+ pub commit: u64,
+ pub timestamp: u64,
+ pub request: u64,
+ pub operation: Operation,
+ pub operation_padding: [u8; 7],
+ pub namespace: u64,
+ pub reserved: [u8; 32],
+}
+const _: () = {
+ assert!(size_of::<PrepareHeader>() == HEADER_SIZE);
+ assert!(
+ offset_of!(PrepareHeader, client)
+ == offset_of!(PrepareHeader, reserved_frame) + size_of::<[u8;
66]>()
+ );
+ assert!(offset_of!(PrepareHeader, reserved) + size_of::<[u8; 32]>() ==
HEADER_SIZE);
+};
+
+impl Default for PrepareHeader {
+ fn default() -> Self {
+ Self {
+ checksum: 0,
+ checksum_body: 0,
+ cluster: 0,
+ size: 0,
+ view: 0,
+ release: 0,
+ command: Command2::Reserved,
+ replica: 0,
+ reserved_frame: [0; 66],
+ client: 0,
+ parent: 0,
+ request_checksum: 0,
+ op: 0,
+ commit: 0,
+ timestamp: 0,
+ request: 0,
+ operation: Operation::Reserved,
+ operation_padding: [0; 7],
+ namespace: 0,
+ reserved: [0; 32],
+ }
+ }
+}
+
+impl ConsensusHeader for PrepareHeader {
+ const COMMAND: Command2 = Command2::Prepare;
+ fn operation(&self) -> Operation {
+ self.operation
+ }
+ fn command(&self) -> Command2 {
+ self.command
+ }
+ fn size(&self) -> u32 {
+ self.size
+ }
+
+ fn validate(&self) -> Result<(), ConsensusError> {
+ if self.command != Command2::Prepare {
+ return Err(ConsensusError::InvalidCommand {
+ expected: Command2::Prepare,
+ found: self.command,
+ });
+ }
+ Ok(())
+ }
+}
+
+// ---------------------------------------------------------------------------
+// PrepareOkHeader - replica -> primary (acknowledgement)
+// ---------------------------------------------------------------------------
+
+/// Replica -> primary: acknowledge a Prepare.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
+pub struct PrepareOkHeader {
+ pub checksum: u128,
+ pub checksum_body: u128,
+ pub cluster: u128,
+ pub size: u32,
+ pub view: u32,
+ pub release: u32,
+ pub command: Command2,
+ pub replica: u8,
+ pub reserved_frame: [u8; 66],
+
+ pub parent: u128,
+ pub prepare_checksum: u128,
+ pub op: u64,
+ pub commit: u64,
+ pub timestamp: u64,
+ pub request: u64,
+ pub operation: Operation,
+ pub operation_padding: [u8; 7],
+ pub namespace: u64,
+ pub reserved: [u8; 48],
+}
+const _: () = {
+ assert!(size_of::<PrepareOkHeader>() == HEADER_SIZE);
+ assert!(
+ offset_of!(PrepareOkHeader, parent)
+ == offset_of!(PrepareOkHeader, reserved_frame) + size_of::<[u8;
66]>()
+ );
+ assert!(offset_of!(PrepareOkHeader, reserved) + size_of::<[u8; 48]>() ==
HEADER_SIZE);
+};
+
+impl Default for PrepareOkHeader {
+ fn default() -> Self {
+ Self {
+ checksum: 0,
+ checksum_body: 0,
+ cluster: 0,
+ size: 0,
+ view: 0,
+ release: 0,
+ command: Command2::Reserved,
+ replica: 0,
+ reserved_frame: [0; 66],
+ parent: 0,
+ prepare_checksum: 0,
+ op: 0,
+ commit: 0,
+ timestamp: 0,
+ request: 0,
+ operation: Operation::Reserved,
+ operation_padding: [0; 7],
+ namespace: 0,
+ reserved: [0; 48],
+ }
+ }
+}
+
+impl ConsensusHeader for PrepareOkHeader {
+    const COMMAND: Command2 = Command2::PrepareOk;
+
+    /// Operation stored in the header.
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+
+    /// Command stored in the header.
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
+    /// Size stored in the header.
+    fn size(&self) -> u32 {
+        self.size
+    }
+
+    /// Accepts the header only when its command is `PrepareOk`.
+    ///
+    /// # Errors
+    /// Returns `ConsensusError::InvalidCommand` with the expected and the
+    /// stored command otherwise.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command == Self::COMMAND {
+            Ok(())
+        } else {
+            Err(ConsensusError::InvalidCommand {
+                expected: Self::COMMAND,
+                found: self.command,
+            })
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// CommitHeader - primary -> replicas (commit, header-only)
+// ---------------------------------------------------------------------------
+
+/// Primary -> replicas: commit up to this op. Header-only (no body).
+///
+/// Wire layout (`#[repr(C)]`): the 128-byte frame shared by every header in
+/// this file (`checksum` .. `reserved_frame`) followed by 128 bytes of
+/// Commit-specific fields. `validate()` requires `size` to be exactly 256.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
+pub struct CommitHeader {
+ // --- frame shared by all headers ---
+ pub checksum: u128,
+ pub checksum_body: u128,
+ pub cluster: u128,
+ /// Must be 256 for `validate()` to succeed (the message has no body).
+ pub size: u32,
+ pub view: u32,
+ pub release: u32,
+ /// Must be `Command2::Commit` for `validate()` to succeed.
+ pub command: Command2,
+ pub replica: u8,
+ /// Filler that brings the frame up to 128 bytes.
+ pub reserved_frame: [u8; 66],
+
+ // --- Commit-specific fields ---
+ pub commit_checksum: u128,
+ pub timestamp_monotonic: u64,
+ pub commit: u64,
+ pub checkpoint_op: u64,
+ pub namespace: u64,
+ /// Unused tail bytes; the struct ends exactly at `HEADER_SIZE`.
+ pub reserved: [u8; 80],
+}
+// Compile-time layout checks for `CommitHeader`.
+const _: () = {
+    // First byte after the shared frame.
+    const FRAME_END: usize = offset_of!(CommitHeader, reserved_frame) + size_of::<[u8; 66]>();
+    // First byte after the trailing reserved area.
+    const TAIL_END: usize = offset_of!(CommitHeader, reserved) + size_of::<[u8; 80]>();
+
+    assert!(size_of::<CommitHeader>() == HEADER_SIZE);
+    // The message-specific fields start right after the frame (no padding).
+    assert!(offset_of!(CommitHeader, commit_checksum) == FRAME_END);
+    // The reserved tail ends exactly at the end of the header (no padding).
+    assert!(TAIL_END == HEADER_SIZE);
+};
+
+impl ConsensusHeader for CommitHeader {
+    const COMMAND: Command2 = Command2::Commit;
+
+    /// Commit messages carry no operation; always `Operation::Reserved`.
+    fn operation(&self) -> Operation {
+        Operation::Reserved
+    }
+
+    /// Command stored in the header.
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
+    /// Size stored in the header.
+    fn size(&self) -> u32 {
+        self.size
+    }
+
+    /// Accepts the header only when its command is `Commit` and its size is
+    /// exactly 256 (header-only message).
+    ///
+    /// # Errors
+    /// Returns `ConsensusError::CommitInvalidCommand2` for a wrong command, or
+    /// `ConsensusError::CommitInvalidSize` carrying the stored size otherwise.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Self::COMMAND {
+            return Err(ConsensusError::CommitInvalidCommand2);
+        }
+        match self.size {
+            256 => Ok(()),
+            other => Err(ConsensusError::CommitInvalidSize(other)),
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// StartViewChangeHeader - failure detection (header-only)
+// ---------------------------------------------------------------------------
+
+/// Replica suspects primary failure. Header-only.
+///
+/// Wire layout (`#[repr(C)]`): the 128-byte frame shared by every header in
+/// this file (`checksum` .. `reserved_frame`) followed by `namespace` and a
+/// reserved tail that pads the struct to `HEADER_SIZE`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
+#[repr(C)]
+pub struct StartViewChangeHeader {
+ // --- frame shared by all headers ---
+ pub checksum: u128,
+ pub checksum_body: u128,
+ pub cluster: u128,
+ pub size: u32,
+ pub view: u32,
+ /// Must be 0 for `validate()` to succeed.
+ pub release: u32,
+ /// Must be `Command2::StartViewChange` for `validate()` to succeed.
+ pub command: Command2,
+ pub replica: u8,
+ /// Filler that brings the frame up to 128 bytes.
+ pub reserved_frame: [u8; 66],
+
+ // --- StartViewChange-specific fields ---
+ pub namespace: u64,
+ /// Unused tail bytes; the struct ends exactly at `HEADER_SIZE`.
+ pub reserved: [u8; 120],
+}
+// Compile-time layout checks for `StartViewChangeHeader`.
+const _: () = {
+    // First byte after the shared frame.
+    const FRAME_END: usize =
+        offset_of!(StartViewChangeHeader, reserved_frame) + size_of::<[u8; 66]>();
+    // First byte after the trailing reserved area.
+    const TAIL_END: usize =
+        offset_of!(StartViewChangeHeader, reserved) + size_of::<[u8; 120]>();
+
+    assert!(size_of::<StartViewChangeHeader>() == HEADER_SIZE);
+    // The message-specific fields start right after the frame (no padding).
+    assert!(offset_of!(StartViewChangeHeader, namespace) == FRAME_END);
+    // The reserved tail ends exactly at the end of the header (no padding).
+    assert!(TAIL_END == HEADER_SIZE);
+};
+
+impl ConsensusHeader for StartViewChangeHeader {
+ const COMMAND: Command2 = Command2::StartViewChange;
+ /// This header has no `operation` field; always reports `Operation::Reserved`.
+ fn operation(&self) -> Operation {
+ Operation::Reserved
+ }
+ /// Command stored in the header.
+ fn command(&self) -> Command2 {
+ self.command
+ }
+ /// Size stored in the header.
+ fn size(&self) -> u32 {
+ self.size
+ }
+
+ /// Accepts the header only when its command is `StartViewChange` and
+ /// `release` is 0.
+ ///
+ /// # Errors
+ /// Returns `ConsensusError::InvalidCommand` for a wrong command, or
+ /// `ConsensusError::InvalidField` when `release` is non-zero.
+ fn validate(&self) -> Result<(), ConsensusError> {
+ // The command is checked first, so a wrong command is reported even if
+ // `release` is also invalid.
+ if self.command != Command2::StartViewChange {
+ return Err(ConsensusError::InvalidCommand {
+ expected: Command2::StartViewChange,
+ found: self.command,
+ });
+ }
+ if self.release != 0 {
+ return Err(ConsensusError::InvalidField("release !=
0".to_string()));
+ }
+ Ok(())
+ }
+}
+
+// ---------------------------------------------------------------------------
+// DoViewChangeHeader - view change vote (header-only)
+// ---------------------------------------------------------------------------
+
+/// Replica -> primary candidate: vote for view change. Header-only.
+///
+/// Wire layout (`#[repr(C)]`): the 128-byte frame shared by every header in
+/// this file (`checksum` .. `reserved_frame`) followed by 128 bytes of
+/// DoViewChange-specific fields.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
+#[repr(C)]
+pub struct DoViewChangeHeader {
+ // --- frame shared by all headers ---
+ pub checksum: u128,
+ pub checksum_body: u128,
+ pub cluster: u128,
+ pub size: u32,
+ /// `validate()` requires `log_view <= view`.
+ pub view: u32,
+ /// Must be 0 for `validate()` to succeed.
+ pub release: u32,
+ /// Must be `Command2::DoViewChange` for `validate()` to succeed.
+ pub command: Command2,
+ pub replica: u8,
+ /// Filler that brings the frame up to 128 bytes.
+ pub reserved_frame: [u8; 66],
+
+ // --- DoViewChange-specific fields ---
+ /// Highest op-number in this replica's log.
+ pub op: u64,
+ /// Highest committed op. `validate()` requires `commit <= op`.
+ pub commit: u64,
+ pub namespace: u64,
+ /// View when status was last normal (key for log selection).
+ pub log_view: u32,
+ /// Unused tail bytes; the struct ends exactly at `HEADER_SIZE`.
+ pub reserved: [u8; 100],
+}
+// Compile-time layout checks for `DoViewChangeHeader`.
+const _: () = {
+    // First byte after the shared frame.
+    const FRAME_END: usize =
+        offset_of!(DoViewChangeHeader, reserved_frame) + size_of::<[u8; 66]>();
+    // First byte after the trailing reserved area.
+    const TAIL_END: usize = offset_of!(DoViewChangeHeader, reserved) + size_of::<[u8; 100]>();
+
+    assert!(size_of::<DoViewChangeHeader>() == HEADER_SIZE);
+    // The message-specific fields start right after the frame (no padding).
+    assert!(offset_of!(DoViewChangeHeader, op) == FRAME_END);
+    // The reserved tail ends exactly at the end of the header (no padding).
+    assert!(TAIL_END == HEADER_SIZE);
+};
+
+impl ConsensusHeader for DoViewChangeHeader {
+    const COMMAND: Command2 = Command2::DoViewChange;
+
+    /// This header has no `operation` field; always reports `Operation::Reserved`.
+    fn operation(&self) -> Operation {
+        Operation::Reserved
+    }
+
+    /// Command stored in the header.
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
+    /// Size stored in the header.
+    fn size(&self) -> u32 {
+        self.size
+    }
+
+    /// Validates command and field invariants, reporting the first violation.
+    ///
+    /// Checks, in order: command is `DoViewChange`, `release == 0`,
+    /// `log_view <= view`, `commit <= op`.
+    ///
+    /// # Errors
+    /// Returns `ConsensusError::InvalidCommand` for a wrong command, or
+    /// `ConsensusError::InvalidField` describing the violated invariant.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Self::COMMAND {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Self::COMMAND,
+                found: self.command,
+            });
+        }
+
+        // Pick the message of the first violated invariant, if any.
+        let violation = if self.release != 0 {
+            "release must be 0"
+        } else if self.log_view > self.view {
+            "log_view cannot exceed view"
+        } else if self.commit > self.op {
+            "commit cannot exceed op"
+        } else {
+            return Ok(());
+        };
+        Err(ConsensusError::InvalidField(violation.to_string()))
+    }
+}
+
+// ---------------------------------------------------------------------------
+// StartViewHeader - new view announcement (header-only)
+// ---------------------------------------------------------------------------
+
+/// New primary -> all replicas: start new view. Header-only.
+///
+/// Wire layout (`#[repr(C)]`): the 128-byte frame shared by every header in
+/// this file (`checksum` .. `reserved_frame`) followed by 128 bytes of
+/// StartView-specific fields.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
+#[repr(C)]
+pub struct StartViewHeader {
+ // --- frame shared by all headers ---
+ pub checksum: u128,
+ pub checksum_body: u128,
+ pub cluster: u128,
+ pub size: u32,
+ pub view: u32,
+ /// Must be 0 for `validate()` to succeed.
+ pub release: u32,
+ /// Must be `Command2::StartView` for `validate()` to succeed.
+ pub command: Command2,
+ pub replica: u8,
+ /// Filler that brings the frame up to 128 bytes.
+ pub reserved_frame: [u8; 66],
+
+ // --- StartView-specific fields ---
+ /// Highest op in the new primary's log.
+ pub op: u64,
+ /// max(commit) from all DVCs. `validate()` requires `commit <= op`.
+ pub commit: u64,
+ pub namespace: u64,
+ /// Unused tail bytes; the struct ends exactly at `HEADER_SIZE`.
+ pub reserved: [u8; 104],
+}
+// Compile-time layout checks for `StartViewHeader`.
+const _: () = {
+    // First byte after the shared frame.
+    const FRAME_END: usize =
+        offset_of!(StartViewHeader, reserved_frame) + size_of::<[u8; 66]>();
+    // First byte after the trailing reserved area.
+    const TAIL_END: usize = offset_of!(StartViewHeader, reserved) + size_of::<[u8; 104]>();
+
+    assert!(size_of::<StartViewHeader>() == HEADER_SIZE);
+    // The message-specific fields start right after the frame (no padding).
+    assert!(offset_of!(StartViewHeader, op) == FRAME_END);
+    // The reserved tail ends exactly at the end of the header (no padding).
+    assert!(TAIL_END == HEADER_SIZE);
+};
+
+impl ConsensusHeader for StartViewHeader {
+    const COMMAND: Command2 = Command2::StartView;
+
+    /// This header has no `operation` field; always reports `Operation::Reserved`.
+    fn operation(&self) -> Operation {
+        Operation::Reserved
+    }
+
+    /// Command stored in the header.
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
+    /// Size stored in the header.
+    fn size(&self) -> u32 {
+        self.size
+    }
+
+    /// Validates command and field invariants, reporting the first violation.
+    ///
+    /// Checks, in order: command is `StartView`, `release == 0`, `commit <= op`.
+    ///
+    /// # Errors
+    /// Returns `ConsensusError::InvalidCommand` for a wrong command, or
+    /// `ConsensusError::InvalidField` describing the violated invariant.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Self::COMMAND {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Self::COMMAND,
+                found: self.command,
+            });
+        }
+
+        // Pick the message of the first violated invariant, if any.
+        let violation = if self.release != 0 {
+            "release must be 0"
+        } else if self.commit > self.op {
+            "commit cannot exceed op"
+        } else {
+            return Ok(());
+        };
+        Err(ConsensusError::InvalidField(violation.to_string()))
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use super::{
+        Command2, CommitHeader, ConsensusHeader, DoViewChangeHeader, GenericHeader, PrepareHeader,
+        PrepareOkHeader, ReplyHeader, RequestHeader, StartViewChangeHeader, StartViewHeader,
+    };
+
+    /// Length of the buffers used by these tests (one full header).
+    const HEADER_LEN: usize = 256;
+    /// Byte offset of `command` in the shared frame: 3 x u128 + 3 x u32 precede it.
+    const COMMAND_OFFSET: usize = 60;
+
+    /// All-zero buffer holding exactly one header.
+    fn zeroed_header() -> bytes::BytesMut {
+        bytes::BytesMut::zeroed(HEADER_LEN)
+    }
+
+    /// All-zero header buffer with only the command byte set.
+    fn header_with_command(command: Command2) -> bytes::BytesMut {
+        let mut raw = zeroed_header();
+        raw[COMMAND_OFFSET] = command as u8;
+        raw
+    }
+
+    #[test]
+    fn all_headers_are_256_bytes() {
+        let sizes = [
+            size_of::<GenericHeader>(),
+            size_of::<RequestHeader>(),
+            size_of::<ReplyHeader>(),
+            size_of::<PrepareHeader>(),
+            size_of::<PrepareOkHeader>(),
+            size_of::<CommitHeader>(),
+            size_of::<StartViewChangeHeader>(),
+            size_of::<DoViewChangeHeader>(),
+            size_of::<StartViewHeader>(),
+        ];
+        for size in sizes {
+            assert_eq!(size, 256);
+        }
+    }
+
+    #[test]
+    fn generic_header_zero_copy() {
+        let raw = zeroed_header();
+        let generic: &GenericHeader = bytemuck::checked::try_from_bytes(&raw).unwrap();
+        assert_eq!(generic.command, Command2::Reserved);
+        assert_eq!(generic.size, 0);
+    }
+
+    #[test]
+    fn request_header_zero_copy() {
+        let raw = header_with_command(Command2::Request);
+        let request: &RequestHeader = bytemuck::checked::try_from_bytes(&raw).unwrap();
+        assert_eq!(request.command, Command2::Request);
+        assert!(request.validate().is_ok());
+    }
+
+    #[test]
+    fn request_header_wrong_command_fails_validation() {
+        // A zeroed buffer decodes fine but carries the `Reserved` command.
+        let raw = zeroed_header();
+        let request: &RequestHeader = bytemuck::checked::try_from_bytes(&raw).unwrap();
+        assert!(request.validate().is_err());
+    }
+
+    #[test]
+    fn reply_header_zero_copy() {
+        let raw = header_with_command(Command2::Reply);
+        let reply: &ReplyHeader = bytemuck::checked::try_from_bytes(&raw).unwrap();
+        assert_eq!(reply.command, Command2::Reply);
+        assert!(reply.validate().is_ok());
+    }
+}
diff --git a/core/binary_protocol/src/consensus/message.rs
b/core/binary_protocol/src/consensus/message.rs
new file mode 100644
index 000000000..ee2ac4be3
--- /dev/null
+++ b/core/binary_protocol/src/consensus/message.rs
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Zero-copy consensus message wrapper.
+//!
+//! `Message<H>` wraps a `Bytes` buffer and provides typed access to the
+//! header via `bytemuck` pointer cast (zero allocation, zero copy).
+
+use crate::consensus::{Command2, ConsensusError, ConsensusHeader,
GenericHeader};
+use bytes::Bytes;
+use std::marker::PhantomData;
+
+/// Zero-copy message wrapping a `Bytes` buffer with a typed header.
+///
+/// The header is accessed via `bytemuck::checked::try_from_bytes` - a pointer
+/// cast plus a bit-pattern check, not a deserialization step. The body is the
+/// slice after the header, sized according to `header.size()`.
+///
+/// `#[repr(C)]` is load-bearing: `as_generic` reinterprets `&Message<H>` as
+/// `&Message<GenericHeader>` and relies on the two having the same layout.
+#[repr(C)]
+#[derive(Debug, Clone)]
+pub struct Message<H: ConsensusHeader> {
+ /// Raw message bytes: header first, body (if any) after it.
+ buffer: Bytes,
+ /// Zero-sized marker recording which header type `buffer` is viewed as.
+ _marker: PhantomData<H>,
+}
+
+impl<H: ConsensusHeader> Message<H> {
+ /// Create a message from a buffer, validating the header.
+ ///
+ /// # Errors
+ /// Returns `ConsensusError` if the buffer is too small or the header is
invalid.
+ pub fn from_bytes(buffer: Bytes) -> Result<Self, ConsensusError> {
+ if buffer.len() < size_of::<H>() {
+ return Err(ConsensusError::InvalidCommand {
+ expected: H::COMMAND,
+ found: Command2::Reserved,
+ });
+ }
+
+ // TODO: bytemuck::checked::try_from_bytes requires the buffer to be
aligned
+ // to the target type's alignment. Consensus headers contain u128
fields (16-byte
+ // alignment), but Bytes from Vec<u8> only guarantees 8-byte
alignment. The checked
+ // variant returns Err on misalignment (no UB), but production code
can fail on
+ // misaligned buffers. The original iggy_common code has the same
limitation.
+ // Fix by either using pod_read_unaligned (copies header) or ensuring
aligned
+ // allocation at the network receive layer.
+ let header_bytes = &buffer[..size_of::<H>()];
+ let header = bytemuck::checked::try_from_bytes::<H>(header_bytes)
+ .map_err(|_| ConsensusError::InvalidBitPattern)?;
+
+ header.validate()?;
+
+ let header_size = header.size() as usize;
+ if buffer.len() < header_size {
+ return Err(ConsensusError::InvalidCommand {
+ expected: H::COMMAND,
+ found: Command2::Reserved,
+ });
+ }
+
+ Ok(Self {
+ buffer,
+ _marker: PhantomData,
+ })
+ }
+
+ /// Create a zero-initialized message of the given size.
+ /// The caller must initialize the header fields.
+ ///
+ /// # Panics
+ /// Panics if `size` is smaller than the header size.
+ #[must_use]
+ pub fn new(size: usize) -> Self {
+ assert!(
+ size >= size_of::<H>(),
+ "size must be at least header size ({})",
+ size_of::<H>()
+ );
+ Self {
+ buffer: Bytes::from(vec![0u8; size]),
+ _marker: PhantomData,
+ }
+ }
+
+ /// Access the typed header (zero-copy pointer cast).
+ ///
+ /// Re-validates the bit pattern on each call because headers contain
+ /// `CheckedBitPattern` enums (`Command2`, `Operation`) which prevent
+ /// using the unchecked `bytemuck::try_from_bytes` (requires `Pod`).
+ /// The cost is a few enum discriminant range checks per access.
+ /// The buffer is immutable (`Bytes`) so validation cannot fail after
+ /// successful construction.
+ #[must_use]
+ #[allow(clippy::missing_panics_doc)]
+ #[inline]
+ pub fn header(&self) -> &H {
+ let header_bytes = &self.buffer[..size_of::<H>()];
+ bytemuck::checked::try_from_bytes(header_bytes)
+ .expect("header validated at construction time")
+ }
+
+ /// Message body (everything after the header).
+ #[must_use]
+ #[inline]
+ pub fn body(&self) -> &[u8] {
+ let header_size = size_of::<H>();
+ let total_size = self.header().size() as usize;
+ if total_size > header_size {
+ &self.buffer[header_size..total_size]
+ } else {
+ &[]
+ }
+ }
+
+ /// Message body as zero-copy [`Bytes`].
+ #[must_use]
+ #[inline]
+ pub fn body_bytes(&self) -> Bytes {
+ let header_size = size_of::<H>();
+ let total_size = self.header().size() as usize;
+ if total_size > header_size {
+ self.buffer.slice(header_size..total_size)
+ } else {
+ Bytes::new()
+ }
+ }
+
+ /// Complete message bytes (header + body).
+ #[must_use]
+ #[inline]
+ pub fn as_bytes(&self) -> &[u8] {
+ let total_size = self.header().size() as usize;
+ &self.buffer[..total_size]
+ }
+
+ /// Consume into the underlying buffer.
+ #[must_use]
+ #[inline]
+ pub fn into_inner(self) -> Bytes {
+ self.buffer
+ }
+
+ /// Convert to a generic message (type-erase the header).
+ #[must_use]
+ pub fn into_generic(self) -> Message<GenericHeader> {
+ // SAFETY: Message<H> and Message<GenericHeader> have identical layout
+ // (only PhantomData differs). GenericHeader accepts any command.
+ unsafe { Message::from_buffer_unchecked(self.buffer) }
+ }
+
+ /// View as a generic message without consuming.
+ #[must_use]
+ #[allow(clippy::missing_const_for_fn)]
+ pub fn as_generic(&self) -> &Message<GenericHeader> {
+ // SAFETY: #[repr(C)] guarantees field layout. Message<H> and
+ // Message<GenericHeader> differ only in PhantomData (ZST).
+ unsafe { &*std::ptr::from_ref(self).cast::<Message<GenericHeader>>() }
+ }
+
+ /// Try to reinterpret as a different header type.
+ ///
+ /// # Errors
+ /// Returns `ConsensusError` if the header command doesn't match or
validation fails.
+ pub fn try_into_typed<T: ConsensusHeader>(self) -> Result<Message<T>,
ConsensusError> {
+ if self.buffer.len() < size_of::<T>() {
+ return Err(ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: Command2::Reserved,
+ });
+ }
+
+ let generic = self.as_generic();
+ if generic.header().command != T::COMMAND {
+ return Err(ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: generic.header().command,
+ });
+ }
+
+ let header_bytes = &self.buffer[..size_of::<T>()];
+ let header = bytemuck::checked::try_from_bytes::<T>(header_bytes)
+ .map_err(|_| ConsensusError::InvalidBitPattern)?;
+ header.validate()?;
+
+ Ok(unsafe { Message::<T>::from_buffer_unchecked(self.buffer) })
+ }
+
+ /// Transmute the header to a different type, preserving the buffer.
+ ///
+ /// The callback receives the old header (by value) and a mutable reference
+ /// to the new (zeroed) header, allowing field-by-field initialization.
+ ///
+ /// # Panics
+ /// Panics if `size_of::<H>() != size_of::<T>()`.
+ #[must_use]
+ pub fn transmute_header<T: ConsensusHeader>(self, f: impl FnOnce(H, &mut
T)) -> Message<T> {
+ assert_eq!(size_of::<H>(), size_of::<T>());
+ let old_header = *self.header();
+ let buffer = self.into_inner();
+ // TODO: This mutates through Bytes via cast_mut(), which violates
aliasing rules.
+ // The original iggy_common code has the same pattern. Once
iggy_common is migrated
+ // to use iggy_binary_protocol types, fix both by using
Bytes::try_into_mut() ->
+ // mutate BytesMut -> freeze(). Tracked as part of the consensus
migration.
+ unsafe {
+ let ptr = buffer.as_ptr().cast_mut();
+ let slice = std::slice::from_raw_parts_mut(ptr, size_of::<T>());
+ slice.fill(0);
+ let new_header =
+ bytemuck::checked::try_from_bytes_mut(slice).expect("zeroed
bytes are valid");
+ f(old_header, new_header);
+ }
+ Message {
+ buffer,
+ _marker: PhantomData,
+ }
+ }
+
+ /// Create without validation. Private.
+ ///
+ /// # Safety
+ /// Buffer must already be validated for the target header type.
+ #[inline]
+ const unsafe fn from_buffer_unchecked(buffer: Bytes) -> Self {
+ Self {
+ buffer,
+ _marker: PhantomData,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::Message;
+    use crate::consensus::{Command2, GenericHeader, Operation, RequestHeader};
+    use bytes::{Bytes, BytesMut};
+
+    /// Typed mutable view of the first 256 bytes (the header) of `raw`.
+    fn header_mut<T>(raw: &mut BytesMut) -> &mut T
+    where
+        T: bytemuck::CheckedBitPattern + bytemuck::NoUninit,
+    {
+        bytemuck::checked::try_from_bytes_mut::<T>(&mut raw[..256]).unwrap()
+    }
+
+    #[test]
+    fn generic_message_from_zeroed_buffer() {
+        let mut raw = BytesMut::zeroed(384);
+        let generic = header_mut::<GenericHeader>(&mut raw);
+        generic.size = 384;
+        generic.cluster = 42;
+
+        let message = Message::<GenericHeader>::from_bytes(raw.freeze()).unwrap();
+        assert_eq!(message.header().cluster, 42);
+        assert_eq!(message.header().size, 384);
+        // 384 total - 256 header = 128 body bytes.
+        assert_eq!(message.body().len(), 128);
+    }
+
+    #[test]
+    fn request_message_roundtrip() {
+        let mut raw = BytesMut::zeroed(320);
+        let request = header_mut::<RequestHeader>(&mut raw);
+        request.size = 320;
+        request.command = Command2::Request;
+        request.client = 12345;
+        request.operation = Operation::CreateStream;
+
+        let message = Message::<RequestHeader>::from_bytes(raw.freeze()).unwrap();
+        assert_eq!(message.header().client, 12345);
+        assert_eq!(message.header().operation, Operation::CreateStream);
+        // 320 total - 256 header = 64 body bytes.
+        assert_eq!(message.body().len(), 64);
+    }
+
+    #[test]
+    fn too_small_buffer_rejected() {
+        // 100 bytes cannot hold a 256-byte header.
+        let raw = Bytes::from(vec![0u8; 100]);
+        assert!(Message::<GenericHeader>::from_bytes(raw).is_err());
+    }
+
+    #[test]
+    fn into_generic_and_back() {
+        let mut raw = BytesMut::zeroed(256);
+        let request = header_mut::<RequestHeader>(&mut raw);
+        request.size = 256;
+        request.command = Command2::Request;
+
+        let typed = Message::<RequestHeader>::from_bytes(raw.freeze()).unwrap();
+        let erased = typed.into_generic();
+        assert_eq!(erased.header().command, Command2::Request);
+
+        let restored = erased.try_into_typed::<RequestHeader>().unwrap();
+        assert_eq!(restored.header().command, Command2::Request);
+    }
+
+    #[test]
+    fn transmute_header_generic_to_request() {
+        let mut raw = BytesMut::zeroed(320);
+        let generic = header_mut::<GenericHeader>(&mut raw);
+        generic.size = 320;
+        generic.cluster = 99;
+
+        let erased = Message::<GenericHeader>::from_bytes(raw.freeze()).unwrap();
+        let typed = erased.transmute_header(|old, new: &mut RequestHeader| {
+            new.size = old.size;
+            new.command = Command2::Request;
+            new.cluster = old.cluster;
+            new.client = 42;
+            new.operation = Operation::CreateStream;
+        });
+
+        let header = typed.header();
+        assert_eq!(header.command, Command2::Request);
+        assert_eq!(header.client, 42);
+        assert_eq!(header.cluster, 99);
+        assert_eq!(header.size, 320);
+        assert_eq!(typed.body().len(), 64);
+    }
+
+    #[test]
+    fn wrong_type_conversion_fails() {
+        let mut raw = BytesMut::zeroed(256);
+        let generic = header_mut::<GenericHeader>(&mut raw);
+        generic.size = 256;
+        generic.command = Command2::Reserved; // not Request
+
+        let erased = Message::<GenericHeader>::from_bytes(raw.freeze()).unwrap();
+        assert!(erased.try_into_typed::<RequestHeader>().is_err());
+    }
+}
diff --git a/core/binary_protocol/src/consensus/mod.rs
b/core/binary_protocol/src/consensus/mod.rs
new file mode 100644
index 000000000..11bead1ef
--- /dev/null
+++ b/core/binary_protocol/src/consensus/mod.rs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! VSR (Viewstamped Replication) consensus wire types.
+//!
+//! All headers are exactly 256 bytes with `#[repr(C)]` layout. Size is
+//! enforced at compile time. Deserialization is a pointer cast (zero-copy)
+//! via `bytemuck::checked::try_from_bytes`.
+//!
+//! ## Client-facing
+//! - [`RequestHeader`] - client -> primary
+//! - [`ReplyHeader`] - primary -> client
+//! - [`GenericHeader`] - type-erased envelope for dispatch
+//!
+//! ## Replication (server-to-server)
+//! - [`PrepareHeader`] - primary -> replicas (replicate operation)
+//! - [`PrepareOkHeader`] - replica -> primary (acknowledge)
+//! - [`CommitHeader`] - primary -> replicas (commit, header-only)
+//!
+//! ## View change (server-to-server)
+//! - [`StartViewChangeHeader`] - replica suspects primary failure
(header-only)
+//! - [`DoViewChangeHeader`] - replica -> primary candidate (header-only)
+//! - [`StartViewHeader`] - new primary -> all replicas (header-only)
+//!
+//! ## Message wrapper
+//! - [`message::Message`] - zero-copy `Bytes` wrapper with typed header access
+
+mod command;
+mod error;
+mod header;
+pub mod message;
+mod operation;
+
+pub use command::Command2;
+pub use error::ConsensusError;
+pub use header::{
+ CommitHeader, ConsensusHeader, DoViewChangeHeader, GenericHeader,
HEADER_SIZE, PrepareHeader,
+ PrepareOkHeader, ReplyHeader, RequestHeader, StartViewChangeHeader,
StartViewHeader,
+};
+pub use operation::Operation;
diff --git a/core/binary_protocol/src/consensus/operation.rs
b/core/binary_protocol/src/consensus/operation.rs
new file mode 100644
index 000000000..8bf2b5016
--- /dev/null
+++ b/core/binary_protocol/src/consensus/operation.rs
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytemuck::{CheckedBitPattern, NoUninit};
+
+/// Replicated operation discriminant. Identifies the state machine operation
+/// carried in a consensus message body.
+///
+/// The enum is `#[repr(u8)]`, so each variant occupies a single byte whose
+/// value is the discriminant listed here. `Reserved` (0) is the default and
+/// has no client command code.
+#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, NoUninit,
CheckedBitPattern)]
+#[repr(u8)]
+pub enum Operation {
+ /// Default value; maps to no client command code.
+ #[default]
+ Reserved = 0,
+
+ // Metadata operations (shard 0)
+ CreateStream = 128,
+ UpdateStream = 129,
+ DeleteStream = 130,
+ PurgeStream = 131,
+ CreateTopic = 132,
+ UpdateTopic = 133,
+ DeleteTopic = 134,
+ PurgeTopic = 135,
+ CreatePartitions = 136,
+ DeletePartitions = 137,
+ // TODO: DeleteSegments is a partition operation (is_partition() == true) but its
+ // discriminant sits in the metadata range (128-147). Should be moved to 162 once
+ // iggy_common's Operation enum is removed and wire compat is no longer a concern.
+ DeleteSegments = 138,
+ CreateConsumerGroup = 139,
+ DeleteConsumerGroup = 140,
+ CreateUser = 141,
+ UpdateUser = 142,
+ DeleteUser = 143,
+ ChangePassword = 144,
+ UpdatePermissions = 145,
+ CreatePersonalAccessToken = 146,
+ DeletePersonalAccessToken = 147,
+
+ // Partition operations (routed by namespace)
+ SendMessages = 160,
+ StoreConsumerOffset = 161,
+}
+
+impl Operation {
+    /// Metadata / control-plane operations handled by shard 0.
+    ///
+    /// `Reserved` and the partition operations (including `DeleteSegments`)
+    /// are not metadata operations.
+    #[must_use]
+    #[inline]
+    pub const fn is_metadata(&self) -> bool {
+        match self {
+            Self::CreateStream
+            | Self::UpdateStream
+            | Self::DeleteStream
+            | Self::PurgeStream
+            | Self::CreateTopic
+            | Self::UpdateTopic
+            | Self::DeleteTopic
+            | Self::PurgeTopic
+            | Self::CreatePartitions
+            | Self::DeletePartitions
+            | Self::CreateConsumerGroup
+            | Self::DeleteConsumerGroup
+            | Self::CreateUser
+            | Self::UpdateUser
+            | Self::DeleteUser
+            | Self::ChangePassword
+            | Self::UpdatePermissions
+            | Self::CreatePersonalAccessToken
+            | Self::DeletePersonalAccessToken => true,
+            Self::Reserved
+            | Self::DeleteSegments
+            | Self::SendMessages
+            | Self::StoreConsumerOffset => false,
+        }
+    }
+
+    /// Data-plane operations routed to the shard owning the partition.
+    #[must_use]
+    #[inline]
+    pub const fn is_partition(&self) -> bool {
+        match self {
+            Self::SendMessages | Self::StoreConsumerOffset | Self::DeleteSegments => true,
+            _ => false,
+        }
+    }
+
+    /// Bidirectional mapping: `Operation` -> client command code.
+    ///
+    /// Returns `None` only for `Reserved`.
+    #[must_use]
+    pub const fn to_command_code(&self) -> Option<u32> {
+        use crate::codes;
+        let code = match self {
+            Self::Reserved => return None,
+            Self::CreateStream => codes::CREATE_STREAM_CODE,
+            Self::UpdateStream => codes::UPDATE_STREAM_CODE,
+            Self::DeleteStream => codes::DELETE_STREAM_CODE,
+            Self::PurgeStream => codes::PURGE_STREAM_CODE,
+            Self::CreateTopic => codes::CREATE_TOPIC_CODE,
+            Self::UpdateTopic => codes::UPDATE_TOPIC_CODE,
+            Self::DeleteTopic => codes::DELETE_TOPIC_CODE,
+            Self::PurgeTopic => codes::PURGE_TOPIC_CODE,
+            Self::CreatePartitions => codes::CREATE_PARTITIONS_CODE,
+            Self::DeletePartitions => codes::DELETE_PARTITIONS_CODE,
+            Self::DeleteSegments => codes::DELETE_SEGMENTS_CODE,
+            Self::CreateConsumerGroup => codes::CREATE_CONSUMER_GROUP_CODE,
+            Self::DeleteConsumerGroup => codes::DELETE_CONSUMER_GROUP_CODE,
+            Self::CreateUser => codes::CREATE_USER_CODE,
+            Self::UpdateUser => codes::UPDATE_USER_CODE,
+            Self::DeleteUser => codes::DELETE_USER_CODE,
+            Self::ChangePassword => codes::CHANGE_PASSWORD_CODE,
+            Self::UpdatePermissions => codes::UPDATE_PERMISSIONS_CODE,
+            Self::CreatePersonalAccessToken => codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE,
+            Self::DeletePersonalAccessToken => codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE,
+            Self::SendMessages => codes::SEND_MESSAGES_CODE,
+            Self::StoreConsumerOffset => codes::STORE_CONSUMER_OFFSET_CODE,
+        };
+        Some(code)
+    }
+
+    /// Bidirectional mapping: client command code -> `Operation`.
+    ///
+    /// Returns `None` for any code that has no arm below.
+    #[must_use]
+    pub const fn from_command_code(code: u32) -> Option<Self> {
+        use crate::codes;
+        let operation = match code {
+            codes::CREATE_STREAM_CODE => Self::CreateStream,
+            codes::UPDATE_STREAM_CODE => Self::UpdateStream,
+            codes::DELETE_STREAM_CODE => Self::DeleteStream,
+            codes::PURGE_STREAM_CODE => Self::PurgeStream,
+            codes::CREATE_TOPIC_CODE => Self::CreateTopic,
+            codes::UPDATE_TOPIC_CODE => Self::UpdateTopic,
+            codes::DELETE_TOPIC_CODE => Self::DeleteTopic,
+            codes::PURGE_TOPIC_CODE => Self::PurgeTopic,
+            codes::CREATE_PARTITIONS_CODE => Self::CreatePartitions,
+            codes::DELETE_PARTITIONS_CODE => Self::DeletePartitions,
+            codes::DELETE_SEGMENTS_CODE => Self::DeleteSegments,
+            codes::CREATE_CONSUMER_GROUP_CODE => Self::CreateConsumerGroup,
+            codes::DELETE_CONSUMER_GROUP_CODE => Self::DeleteConsumerGroup,
+            codes::CREATE_USER_CODE => Self::CreateUser,
+            codes::UPDATE_USER_CODE => Self::UpdateUser,
+            codes::DELETE_USER_CODE => Self::DeleteUser,
+            codes::CHANGE_PASSWORD_CODE => Self::ChangePassword,
+            codes::UPDATE_PERMISSIONS_CODE => Self::UpdatePermissions,
+            codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE => Self::CreatePersonalAccessToken,
+            codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE => Self::DeletePersonalAccessToken,
+            codes::SEND_MESSAGES_CODE => Self::SendMessages,
+            codes::STORE_CONSUMER_OFFSET_CODE => Self::StoreConsumerOffset,
+            _ => return None,
+        };
+        Some(operation)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::Operation;
+
+ // Every non-Reserved operation must map to a command code and back to itself.
+ #[test]
+ fn command_code_roundtrip() {
+ let ops = [
+ Operation::CreateStream,
+ Operation::UpdateStream,
+ Operation::DeleteStream,
+ Operation::PurgeStream,
+ Operation::CreateTopic,
+ Operation::UpdateTopic,
+ Operation::DeleteTopic,
+ Operation::PurgeTopic,
+ Operation::CreatePartitions,
+ Operation::DeletePartitions,
+ Operation::DeleteSegments,
+ Operation::CreateConsumerGroup,
+ Operation::DeleteConsumerGroup,
+ Operation::CreateUser,
+ Operation::UpdateUser,
+ Operation::DeleteUser,
+ Operation::ChangePassword,
+ Operation::UpdatePermissions,
+ Operation::CreatePersonalAccessToken,
+ Operation::DeletePersonalAccessToken,
+ Operation::SendMessages,
+ Operation::StoreConsumerOffset,
+ ];
+ for op in ops {
+ let code = op
+ .to_command_code()
+ .unwrap_or_else(|| panic!("Operation {op:?} has no command
code mapping"));
+ let back = Operation::from_command_code(code)
+ .unwrap_or_else(|| panic!("command code {code} has no
Operation mapping"));
+ assert_eq!(op, back, "roundtrip failed for {op:?} (code={code})");
+ }
+ }
+
+ // `Reserved` is the only operation without a command code.
+ #[test]
+ fn reserved_has_no_code() {
+ assert_eq!(Operation::Reserved.to_command_code(), None);
+ }
+
+ // Command codes with no arm in `from_command_code` must yield `None`.
+ #[test]
+ fn read_only_commands_have_no_operation() {
+ assert!(Operation::from_command_code(crate::PING_CODE).is_none());
+ assert!(Operation::from_command_code(crate::GET_STATS_CODE).is_none());
+
assert!(Operation::from_command_code(crate::GET_STREAM_CODE).is_none());
+
assert!(Operation::from_command_code(crate::POLL_MESSAGES_CODE).is_none());
+ }
+
+ // `DeleteSegments` counts as a partition operation despite its discriminant
+ // sitting among the metadata operations.
+ #[test]
+ fn metadata_vs_partition() {
+ assert!(Operation::CreateStream.is_metadata());
+ assert!(!Operation::CreateStream.is_partition());
+ assert!(Operation::SendMessages.is_partition());
+ assert!(!Operation::SendMessages.is_metadata());
+ assert!(Operation::DeleteSegments.is_partition());
+ }
+}
diff --git a/core/binary_protocol/src/error.rs
b/core/binary_protocol/src/error.rs
new file mode 100644
index 000000000..f4f53ea56
--- /dev/null
+++ b/core/binary_protocol/src/error.rs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Protocol-local error type for wire format encode/decode failures.
+///
+/// Intentionally decoupled from `IggyError` to keep the protocol crate
+/// free of domain dependencies. Conversion to `IggyError` happens at
+/// the boundary (SDK / server).
+#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
+pub enum WireError {
+    /// The buffer ended early: `need` bytes were required at `offset`,
+    /// but only `have` were available.
+    // Keep this literal on one line; a wrapped literal would put a newline
+    // into the rendered message.
+    #[error("unexpected end of buffer at offset {offset}: need {need} bytes, have {have}")]
+    UnexpectedEof {
+        offset: usize,
+        need: usize,
+        have: usize,
+    },
+
+    /// Bytes at `offset` were expected to be UTF-8 but are not.
+    #[error("invalid utf-8 at offset {offset}")]
+    InvalidUtf8 { offset: usize },
+
+    /// The command code is not known to the protocol.
+    #[error("unknown command code: {0}")]
+    UnknownCommand(u32),
+
+    /// A one-byte discriminant `value` read at `offset` does not correspond
+    /// to any variant of the type named by `type_name`.
+    #[error("unknown discriminant {value} for {type_name} at offset {offset}")]
+    UnknownDiscriminant {
+        type_name: &'static str,
+        value: u8,
+        offset: usize,
+    },
+
+    /// A payload of `size` bytes exceeds the allowed maximum `max`.
+    #[error("payload too large: {size} bytes, max {max}")]
+    PayloadTooLarge { size: usize, max: usize },
+
+    /// A decoded or constructed value violated a protocol constraint; the
+    /// string carries the human-readable reason.
+    #[error("validation failed: {0}")]
+    Validation(String),
+}
diff --git a/core/binary_protocol/src/frame.rs
b/core/binary_protocol/src/frame.rs
new file mode 100644
index 000000000..b17d0d3f9
--- /dev/null
+++ b/core/binary_protocol/src/frame.rs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Request frame: `[length:4 LE][code:4 LE][payload:N]`
+/// `length` = size of code + payload = 4 + N
+///
+/// NOTE(review): the value 4 matches the width of the leading `length`
+/// field alone. The `code` field is already counted by `length`, which is
+/// presumably why it is not part of this constant - confirm against the
+/// framing code before relying on it.
+pub const REQUEST_HEADER_SIZE: usize = 4;
+
+/// Response frame: `[status:4 LE][length:4 LE][payload:N]`
+///
+/// 8 = 4 bytes of `status` + 4 bytes of `length`.
+pub const RESPONSE_HEADER_SIZE: usize = 8;
+
+/// Status code for a successful response.
+pub const STATUS_OK: u32 = 0;
diff --git a/core/binary_protocol/src/identifier.rs
b/core/binary_protocol/src/identifier.rs
new file mode 100644
index 000000000..79c5c03d7
--- /dev/null
+++ b/core/binary_protocol/src/identifier.rs
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_bytes, read_str, read_u8};
+use bytes::{BufMut, BytesMut};
+use std::ops::Deref;
+
+// ---------------------------------------------------------------------------
+// WireName
+// ---------------------------------------------------------------------------
+
+/// Maximum byte length for a wire name (fits in a u8 length prefix).
+pub const MAX_WIRE_NAME_LENGTH: usize = 255;
+
+/// Validated name type used by protocol request/response types.
+///
+/// Guarantees the inner string is 1-255 bytes, matching the u8
+/// length prefix used on the wire.
+#[derive(Clone, PartialEq, Eq)]
+pub struct WireName(String);
+
+impl WireName {
+    /// Create a new `WireName`, validating the length is 1-255 bytes.
+    ///
+    /// The limit is measured in bytes (`String::len`), not in characters.
+    ///
+    /// # Errors
+    /// Returns `WireError::Validation` if the name is empty or exceeds 255 bytes.
+    pub fn new(s: impl Into<String>) -> Result<Self, WireError> {
+        let s = s.into();
+        if s.is_empty() || s.len() > MAX_WIRE_NAME_LENGTH {
+            return Err(WireError::Validation(format!(
+                "wire name must be 1-{MAX_WIRE_NAME_LENGTH} bytes, got {}",
+                s.len()
+            )));
+        }
+        Ok(Self(s))
+    }
+
+    /// Borrow the validated name as a string slice.
+    #[must_use]
+    pub fn as_str(&self) -> &str {
+        &self.0
+    }
+
+    /// Length of the name in bytes.
+    #[must_use]
+    pub const fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    /// Always returns `false` - `WireName` is guaranteed non-empty by construction.
+    #[must_use]
+    pub const fn is_empty(&self) -> bool {
+        false
+    }
+}
+
+impl Deref for WireName {
+    type Target = str;
+
+    /// Lets a `WireName` be used wherever a `&str` is expected.
+    fn deref(&self) -> &Self::Target {
+        self.0.as_str()
+    }
+}
+
+impl std::fmt::Display for WireName {
+    /// Writes the bare name, without quotes or decoration.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str(self.0.as_str())
+    }
+}
+
+impl std::fmt::Debug for WireName {
+    /// Writes `WireName("<name>")`, with the name in its debug-escaped form.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let inner = &self.0;
+        write!(f, "WireName({inner:?})")
+    }
+}
+
+impl WireEncode for WireName {
+    /// One length-prefix byte followed by the name's bytes.
+    fn encoded_size(&self) -> usize {
+        self.0.len() + 1
+    }
+
+    /// Appends `[name_len:1][name:N]` to `buf`.
+    fn encode(&self, buf: &mut BytesMut) {
+        let raw = self.0.as_bytes();
+        // Length guaranteed <= 255 by construction, truncation impossible.
+        #[allow(clippy::cast_possible_truncation)]
+        let prefix = raw.len() as u8;
+        buf.put_u8(prefix);
+        buf.put_slice(raw);
+    }
+}
+
+impl WireDecode for WireName {
+    /// Decode `[name_len:1][name:N]` from the start of `buf`.
+    ///
+    /// Returns the name and the number of bytes consumed (`1 + name_len`).
+    ///
+    /// # Errors
+    /// Returns `WireError::Validation` when the length prefix is 0, and
+    /// propagates whatever `read_u8` / `read_str` report for a short buffer
+    /// or invalid UTF-8.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let name_len = usize::from(read_u8(buf, 0)?);
+        // With a u8 prefix and a 255-byte maximum the upper bound cannot
+        // trip today; it is kept so the check stays right if the constant
+        // is ever lowered.
+        if name_len == 0 || name_len > MAX_WIRE_NAME_LENGTH {
+            return Err(WireError::Validation(format!(
+                "wire name must be 1-{MAX_WIRE_NAME_LENGTH} bytes, got {name_len}"
+            )));
+        }
+        let name = read_str(buf, 1, name_len)?;
+        Ok((Self(name), 1 + name_len))
+    }
+}
+
+// ---------------------------------------------------------------------------
+// WireIdentifier
+// ---------------------------------------------------------------------------
+
+/// `kind` byte of a numeric identifier.
+const KIND_NUMERIC: u8 = 1;
+/// `kind` byte of a string identifier.
+const KIND_STRING: u8 = 2;
+/// `length` byte of a numeric identifier: a u32 is always 4 bytes.
+const NUMERIC_VALUE_LEN: u8 = 4;
+
+/// Protocol-owned identifier type. Polymorphic: numeric (u32) or string.
+///
+/// Wire format: `[kind:1][length:1][value:N]`
+/// - Numeric: kind=1, length=4, value=u32 LE
+/// - String: kind=2, length=1..255, value=UTF-8 bytes
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum WireIdentifier {
+    Numeric(u32),
+    String(WireName),
+}
+
+impl WireIdentifier {
+    /// Build a numeric identifier.
+    #[must_use]
+    pub const fn numeric(id: u32) -> Self {
+        Self::Numeric(id)
+    }
+
+    /// Build a string identifier from `name`.
+    ///
+    /// # Errors
+    /// Returns `WireError::Validation` if the name is empty or exceeds 255 bytes.
+    pub fn named(name: impl Into<String>) -> Result<Self, WireError> {
+        let wire_name = WireName::new(name)?;
+        Ok(Self::String(wire_name))
+    }
+
+    /// The numeric value, or `None` for a string identifier.
+    #[must_use]
+    pub const fn as_u32(&self) -> Option<u32> {
+        match self {
+            Self::Numeric(id) => Some(*id),
+            Self::String(_) => None,
+        }
+    }
+
+    /// The name, or `None` for a numeric identifier.
+    #[must_use]
+    pub fn as_str(&self) -> Option<&str> {
+        match self {
+            Self::Numeric(_) => None,
+            Self::String(s) => Some(s.as_str()),
+        }
+    }
+
+    /// The `kind` byte written on the wire for this variant.
+    const fn kind_code(&self) -> u8 {
+        match self {
+            Self::Numeric(_) => KIND_NUMERIC,
+            Self::String(_) => KIND_STRING,
+        }
+    }
+
+    /// The `length` byte written on the wire for this variant.
+    #[allow(clippy::cast_possible_truncation)]
+    const fn value_len(&self) -> u8 {
+        match self {
+            Self::Numeric(_) => NUMERIC_VALUE_LEN,
+            // WireName guarantees len <= 255, truncation impossible.
+            Self::String(s) => s.len() as u8,
+        }
+    }
+}
+
+impl WireEncode for WireIdentifier {
+    /// Two header bytes (`kind`, `length`) plus the value bytes.
+    fn encoded_size(&self) -> usize {
+        usize::from(self.value_len()) + 2
+    }
+
+    /// Appends `[kind:1][length:1][value:N]` to `buf`.
+    fn encode(&self, buf: &mut BytesMut) {
+        let header = [self.kind_code(), self.value_len()];
+        buf.put_slice(&header);
+        match self {
+            Self::String(name) => buf.put_slice(name.as_bytes()),
+            Self::Numeric(id) => buf.put_u32_le(*id),
+        }
+    }
+}
+
+impl WireDecode for WireIdentifier {
+    /// Decode `[kind:1][length:1][value:length]` from the start of `buf`.
+    ///
+    /// Returns the identifier and the number of bytes consumed (`2 + length`).
+    ///
+    /// # Errors
+    /// - whatever `read_u8` / `read_bytes` report when `buf` is too short,
+    /// - `WireError::Validation` for a numeric identifier whose length is
+    ///   not 4, or a string identifier of length 0,
+    /// - `WireError::InvalidUtf8` when a string value is not valid UTF-8,
+    /// - `WireError::UnknownDiscriminant` for any other `kind` byte.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let kind = read_u8(buf, 0)?;
+        let length = usize::from(read_u8(buf, 1)?);
+        // The value is sliced before `kind` is inspected, so a buffer that
+        // is too short is reported ahead of an unknown kind or a bad length.
+        let value = read_bytes(buf, 2, length)?;
+        let consumed = 2 + length;
+
+        match kind {
+            KIND_NUMERIC => {
+                if length != NUMERIC_VALUE_LEN as usize {
+                    return Err(WireError::Validation(format!(
+                        "numeric identifier must be {NUMERIC_VALUE_LEN} bytes, got {length}"
+                    )));
+                }
+                let id = u32::from_le_bytes(
+                    value
+                        .try_into()
+                        .expect("length already validated as 4 bytes"),
+                );
+                Ok((Self::Numeric(id), consumed))
+            }
+            KIND_STRING => {
+                if length == 0 {
+                    return Err(WireError::Validation(
+                        "string identifier cannot be empty".to_string(),
+                    ));
+                }
+                // The value starts right after the two header bytes, hence
+                // offset 2 in the error.
+                let s = std::str::from_utf8(value)
+                    .map_err(|_| WireError::InvalidUtf8 { offset: 2 })?;
+                let wire_name = WireName::new(s)?;
+                Ok((Self::String(wire_name), consumed))
+            }
+            _ => Err(WireError::UnknownDiscriminant {
+                type_name: "WireIdentifier",
+                value: kind,
+                offset: 0,
+            }),
+        }
+    }
+}
+
+impl std::fmt::Display for WireIdentifier {
+    /// Numeric identifiers print as the number, string identifiers as the name.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::String(name) => write!(f, "{name}"),
+            Self::Numeric(number) => write!(f, "{number}"),
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use super::{KIND_NUMERIC, KIND_STRING, WireIdentifier, WireName};
+    use crate::codec::{WireDecode, WireEncode};
+
+    // -- WireName tests --
+
+    /// A valid name is exposed unchanged through every accessor.
+    #[test]
+    fn wire_name_valid() {
+        let wire_name = WireName::new("test").unwrap();
+        assert_eq!("test", wire_name.as_str());
+        assert_eq!(4, wire_name.len());
+        assert_eq!("test", &*wire_name);
+    }
+
+    #[test]
+    fn wire_name_empty_rejected() {
+        let result = WireName::new("");
+        assert!(result.is_err());
+    }
+
+    /// 256 bytes is one past the limit.
+    #[test]
+    fn wire_name_too_long_rejected() {
+        let oversized = "a".repeat(256);
+        assert!(WireName::new(oversized).is_err());
+    }
+
+    /// 255 bytes is exactly the limit.
+    #[test]
+    fn wire_name_max_length_accepted() {
+        let longest = "a".repeat(255);
+        assert!(WireName::new(longest).is_ok());
+    }
+
+    #[test]
+    fn wire_name_roundtrip() {
+        let original = WireName::new("my-stream").unwrap();
+        let encoded = original.to_bytes();
+        // One prefix byte plus nine name bytes.
+        assert_eq!(encoded.len(), 1 + 9);
+        let (decoded, consumed) = WireName::decode(&encoded).unwrap();
+        assert_eq!(10, consumed);
+        assert_eq!(original, decoded);
+    }
+
+    #[test]
+    fn wire_name_display() {
+        let wire_name = WireName::new("hello").unwrap();
+        assert_eq!(wire_name.to_string(), "hello");
+    }
+
+    /// `str` methods are reachable through `Deref`.
+    #[test]
+    fn wire_name_deref_to_str() {
+        let wire_name = WireName::new("test").unwrap();
+        assert!(wire_name.starts_with("te"));
+    }
+
+    #[test]
+    fn wire_name_non_ascii_utf8_roundtrip() {
+        let original = WireName::new("str\u{00e9}am-\u{00fc}ser").unwrap();
+        let encoded = original.to_bytes();
+        let (decoded, _) = WireName::decode(&encoded).unwrap();
+        assert_eq!(original, decoded);
+    }
+
+    // -- WireIdentifier tests --
+
+    #[test]
+    fn roundtrip_numeric() {
+        let original = WireIdentifier::numeric(42);
+        let encoded = original.to_bytes();
+        // kind + length + four value bytes.
+        assert_eq!(6, encoded.len());
+        let (decoded, consumed) = WireIdentifier::decode(&encoded).unwrap();
+        assert_eq!(6, consumed);
+        assert_eq!(original, decoded);
+        assert_eq!(Some(42), decoded.as_u32());
+    }
+
+    #[test]
+    fn roundtrip_string() {
+        let original = WireIdentifier::named("my-stream").unwrap();
+        let encoded = original.to_bytes();
+        // kind + length + nine name bytes.
+        assert_eq!(encoded.len(), 2 + 9);
+        let (decoded, consumed) = WireIdentifier::decode(&encoded).unwrap();
+        assert_eq!(11, consumed);
+        assert_eq!(original, decoded);
+        assert_eq!(Some("my-stream"), decoded.as_str());
+    }
+
+    #[test]
+    fn empty_string_rejected() {
+        let result = WireIdentifier::named("");
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn max_length_string_accepted() {
+        let longest = "a".repeat(255);
+        let encoded = WireIdentifier::named(&longest).unwrap().to_bytes();
+        let (decoded, _) = WireIdentifier::decode(&encoded).unwrap();
+        assert_eq!(Some(longest.as_str()), decoded.as_str());
+    }
+
+    #[test]
+    fn too_long_string_rejected() {
+        let oversized = "a".repeat(256);
+        assert!(WireIdentifier::named(oversized).is_err());
+    }
+
+    /// Every strict prefix of a valid encoding must fail to decode.
+    #[test]
+    fn truncated_buffer_returns_error() {
+        let encoded = WireIdentifier::numeric(1).to_bytes();
+        for i in 0..encoded.len() {
+            let outcome = WireIdentifier::decode(&encoded[..i]);
+            assert!(
+                outcome.is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn unknown_kind_returns_error() {
+        // 0xFF is neither the numeric nor the string kind.
+        let raw = [0xFF, 0x04, 0x01, 0x00, 0x00, 0x00];
+        assert!(WireIdentifier::decode(&raw).is_err());
+    }
+
+    #[test]
+    fn numeric_with_wrong_length_returns_error() {
+        // Numeric kind, but a declared length of 3 instead of 4.
+        let raw = [KIND_NUMERIC, 0x03, 0x01, 0x00, 0x00];
+        assert!(WireIdentifier::decode(&raw).is_err());
+    }
+
+    #[test]
+    fn string_with_zero_length_returns_error() {
+        let raw = [KIND_STRING, 0x00];
+        assert!(WireIdentifier::decode(&raw).is_err());
+    }
+
+    #[test]
+    fn display_numeric() {
+        let rendered = WireIdentifier::numeric(7).to_string();
+        assert_eq!(rendered, "7");
+    }
+
+    #[test]
+    fn display_string() {
+        let rendered = WireIdentifier::named("test").unwrap().to_string();
+        assert_eq!(rendered, "test");
+    }
+
+    #[test]
+    fn non_ascii_utf8_string_roundtrip() {
+        let original = WireIdentifier::named("caf\u{00e9}-latt\u{00e9}").unwrap();
+        let encoded = original.to_bytes();
+        let (decoded, _) = WireIdentifier::decode(&encoded).unwrap();
+        assert_eq!(original, decoded);
+    }
+
+    #[test]
+    fn invalid_utf8_bytes_rejected() {
+        // 0xFF 0xFE is not a valid UTF-8 sequence.
+        let raw = [KIND_STRING, 0x02, 0xFF, 0xFE];
+        assert!(WireIdentifier::decode(&raw).is_err());
+    }
+}
diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs
index 31bd66e6e..288d468a5 100644
--- a/core/binary_protocol/src/lib.rs
+++ b/core/binary_protocol/src/lib.rs
@@ -1,17 +1,58 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Wire protocol types and codec for the Iggy binary protocol.
+//!
+//! This crate is the single source of truth for the binary wire format
+//! shared between the Iggy server and SDK. It is a pure codec - no I/O,
+//! no async, no runtime dependencies.
+//!
+//! # Design
+//!
+//! Protocol types are independent from domain types (`iggy_common`).
+//! Conversion between wire types and domain types happens at the boundary
+//! (SDK client impls, server handlers).
+//!
+//! # Wire frame format
+//!
+//! **Request** (client -> server):
+//! ```text
+//! [length:4 bytes, u32 LE][code:4 bytes, u32 LE][payload:N bytes]
+//! ```
+//!
+//! **Response** (server -> client):
+//! ```text
+//! [status:4 bytes, u32 LE][length:4 bytes, u32 LE][payload:N bytes]
+//! ```
+//!
+//! All multi-byte integers are little-endian. Strings are length-prefixed
+//! (u8 length for names, u32 length for longer strings).
+
+pub mod codec;
+pub mod codes;
+pub mod consensus;
+pub mod error;
+pub mod frame;
+pub mod identifier;
+pub mod requests;
+pub mod responses;
+
+pub use codec::{WireDecode, WireEncode};
+pub use codes::*;
+pub use error::WireError;
+pub use frame::*;
+pub use identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier, WireName};
diff --git a/core/binary_protocol/src/requests/mod.rs
b/core/binary_protocol/src/requests/mod.rs
new file mode 100644
index 000000000..e7b687f1f
--- /dev/null
+++ b/core/binary_protocol/src/requests/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod streams;
diff --git a/core/binary_protocol/src/requests/streams/create_stream.rs
b/core/binary_protocol/src/requests/streams/create_stream.rs
new file mode 100644
index 000000000..fb86fba94
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/create_stream.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode};
+use crate::identifier::WireName;
+use bytes::BytesMut;
+
+/// `CreateStream` request. Wire format: `[name_len:1][name:N]`
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CreateStreamRequest {
+    /// Name carried by the request.
+    pub name: WireName,
+}
+
+impl WireEncode for CreateStreamRequest {
+    /// The request occupies exactly as many bytes as its encoded name.
+    fn encoded_size(&self) -> usize {
+        WireEncode::encoded_size(&self.name)
+    }
+
+    /// Appends the encoded name to `buf`.
+    fn encode(&self, buf: &mut BytesMut) {
+        WireEncode::encode(&self.name, buf);
+    }
+}
+
+impl WireDecode for CreateStreamRequest {
+    /// Decodes the name from the start of `buf` and reports the bytes consumed.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        WireName::decode(buf).map(|(name, consumed)| (Self { name }, consumed))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Builds a request for `name`; panics if the name is not a valid `WireName`.
+    fn request(name: &str) -> CreateStreamRequest {
+        CreateStreamRequest {
+            name: WireName::new(name).unwrap(),
+        }
+    }
+
+    #[test]
+    fn roundtrip() {
+        let original = request("test-stream");
+        let encoded = original.to_bytes();
+        // One prefix byte plus eleven name bytes.
+        assert_eq!(encoded.len(), 1 + 11);
+        let (decoded, consumed) = CreateStreamRequest::decode(&encoded).unwrap();
+        assert_eq!(encoded.len(), consumed);
+        assert_eq!(original, decoded);
+    }
+
+    #[test]
+    fn empty_name_rejected() {
+        // A lone zero length prefix.
+        let raw = [0u8];
+        assert!(CreateStreamRequest::decode(&raw).is_err());
+    }
+
+    /// Every strict prefix of a valid encoding must fail to decode.
+    #[test]
+    fn truncated_returns_error() {
+        let encoded = request("test").to_bytes();
+        for i in 0..encoded.len() {
+            let outcome = CreateStreamRequest::decode(&encoded[..i]);
+            assert!(
+                outcome.is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn wire_compat_byte_layout() {
+        let encoded = request("test").to_bytes();
+        let expected = [4, b't', b'e', b's', b't'];
+        assert_eq!(&encoded[..], &expected);
+    }
+
+    #[test]
+    fn too_long_name_rejected_at_construction() {
+        let oversized = "a".repeat(256);
+        assert!(WireName::new(oversized).is_err());
+    }
+}
diff --git a/core/binary_protocol/src/requests/streams/delete_stream.rs
b/core/binary_protocol/src/requests/streams/delete_stream.rs
new file mode 100644
index 000000000..b30bfa882
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/delete_stream.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::WireIdentifier;
+use crate::codec::{WireDecode, WireEncode};
+use bytes::BytesMut;
+
+/// `DeleteStream` request. Wire format: `[identifier]`
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct DeleteStreamRequest {
+    /// Identifier carried by the request.
+    pub stream_id: WireIdentifier,
+}
+
+impl WireEncode for DeleteStreamRequest {
+    /// The request occupies exactly as many bytes as its encoded identifier.
+    fn encoded_size(&self) -> usize {
+        WireEncode::encoded_size(&self.stream_id)
+    }
+
+    /// Appends the encoded identifier to `buf`.
+    fn encode(&self, buf: &mut BytesMut) {
+        WireEncode::encode(&self.stream_id, buf);
+    }
+}
+
+impl WireDecode for DeleteStreamRequest {
+    /// Decodes the identifier from the start of `buf` and reports the bytes consumed.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        WireIdentifier::decode(buf).map(|(stream_id, consumed)| (Self { stream_id }, consumed))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Encoding then decoding yields the same request and consumes every byte.
+    #[test]
+    fn roundtrip() {
+        let original = DeleteStreamRequest {
+            stream_id: WireIdentifier::numeric(5),
+        };
+        let encoded = original.to_bytes();
+        let (decoded, consumed) = DeleteStreamRequest::decode(&encoded).unwrap();
+        assert_eq!(encoded.len(), consumed);
+        assert_eq!(original, decoded);
+    }
+}
diff --git a/core/binary_protocol/src/requests/streams/get_stream.rs
b/core/binary_protocol/src/requests/streams/get_stream.rs
new file mode 100644
index 000000000..e543667e4
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/get_stream.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::WireIdentifier;
+use crate::codec::{WireDecode, WireEncode};
+use bytes::BytesMut;
+
+/// `GetStream` request. Wire format: `[identifier]`
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct GetStreamRequest {
+    /// Identifier carried by the request.
+    pub stream_id: WireIdentifier,
+}
+
+impl WireEncode for GetStreamRequest {
+    /// The request occupies exactly as many bytes as its encoded identifier.
+    fn encoded_size(&self) -> usize {
+        WireEncode::encoded_size(&self.stream_id)
+    }
+
+    /// Appends the encoded identifier to `buf`.
+    fn encode(&self, buf: &mut BytesMut) {
+        WireEncode::encode(&self.stream_id, buf);
+    }
+}
+
+impl WireDecode for GetStreamRequest {
+    /// Decodes the identifier from the start of `buf` and reports the bytes consumed.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        WireIdentifier::decode(buf).map(|(stream_id, consumed)| (Self { stream_id }, consumed))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Encodes `original`, decodes it again and checks nothing was lost.
+    fn assert_roundtrip(original: &GetStreamRequest) {
+        let encoded = original.to_bytes();
+        let (decoded, consumed) = GetStreamRequest::decode(&encoded).unwrap();
+        assert_eq!(encoded.len(), consumed);
+        assert_eq!(original, &decoded);
+    }
+
+    #[test]
+    fn roundtrip_numeric() {
+        assert_roundtrip(&GetStreamRequest {
+            stream_id: WireIdentifier::numeric(42),
+        });
+    }
+
+    #[test]
+    fn roundtrip_named() {
+        assert_roundtrip(&GetStreamRequest {
+            stream_id: WireIdentifier::named("my-stream").unwrap(),
+        });
+    }
+
+    #[test]
+    fn wire_compat_numeric_byte_layout() {
+        let encoded = GetStreamRequest {
+            stream_id: WireIdentifier::numeric(1),
+        }
+        .to_bytes();
+        // kind=1, length=4, then 1 as a little-endian u32.
+        let expected = [1, 4, 1, 0, 0, 0];
+        assert_eq!(&encoded[..], &expected);
+    }
+}
diff --git a/core/binary_protocol/src/requests/streams/get_streams.rs
b/core/binary_protocol/src/requests/streams/get_streams.rs
new file mode 100644
index 000000000..897e89888
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/get_streams.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode};
+use bytes::BytesMut;
+
+/// `GetStreams` request. Wire format: empty.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct GetStreamsRequest;
+
+impl WireEncode for GetStreamsRequest {
+    /// The request has no payload, so it encodes to zero bytes.
+    fn encoded_size(&self) -> usize {
+        0
+    }
+
+    /// Writes nothing; `buf` is left untouched.
+    fn encode(&self, _buf: &mut BytesMut) {}
+}
+
+impl WireDecode for GetStreamsRequest {
+    /// Always succeeds without looking at the buffer and consumes zero bytes.
+    fn decode(_buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let consumed = 0;
+        Ok((GetStreamsRequest, consumed))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// The request encodes to nothing and decodes from nothing.
+    #[test]
+    fn roundtrip() {
+        let original = GetStreamsRequest;
+        let encoded = original.to_bytes();
+        assert!(encoded.is_empty());
+        let (decoded, consumed) = GetStreamsRequest::decode(&encoded).unwrap();
+        assert_eq!(0, consumed);
+        assert_eq!(original, decoded);
+    }
+}
diff --git a/core/binary_protocol/src/requests/streams/mod.rs
b/core/binary_protocol/src/requests/streams/mod.rs
new file mode 100644
index 000000000..8307a1d46
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/mod.rs
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod create_stream;
+pub mod delete_stream;
+pub mod get_stream;
+pub mod get_streams;
+pub mod purge_stream;
+pub mod update_stream;
+
+pub use create_stream::CreateStreamRequest;
+pub use delete_stream::DeleteStreamRequest;
+pub use get_stream::GetStreamRequest;
+pub use get_streams::GetStreamsRequest;
+pub use purge_stream::PurgeStreamRequest;
+pub use update_stream::UpdateStreamRequest;
diff --git a/core/binary_protocol/src/requests/streams/purge_stream.rs
b/core/binary_protocol/src/requests/streams/purge_stream.rs
new file mode 100644
index 000000000..bd8028b3f
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/purge_stream.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::WireIdentifier;
+use crate::codec::{WireDecode, WireEncode};
+use bytes::BytesMut;
+
+/// `PurgeStream` request. Wire format: `[identifier]`
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PurgeStreamRequest {
+    /// Identifier carried by the request.
+    pub stream_id: WireIdentifier,
+}
+
+impl WireEncode for PurgeStreamRequest {
+    /// The request occupies exactly as many bytes as its encoded identifier.
+    fn encoded_size(&self) -> usize {
+        WireEncode::encoded_size(&self.stream_id)
+    }
+
+    /// Appends the encoded identifier to `buf`.
+    fn encode(&self, buf: &mut BytesMut) {
+        WireEncode::encode(&self.stream_id, buf);
+    }
+}
+
+impl WireDecode for PurgeStreamRequest {
+    /// Decodes the identifier from the start of `buf` and reports the bytes consumed.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        WireIdentifier::decode(buf).map(|(stream_id, consumed)| (Self { stream_id }, consumed))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Encoding then decoding yields the same request and consumes every byte.
+    #[test]
+    fn roundtrip() {
+        let original = PurgeStreamRequest {
+            stream_id: WireIdentifier::numeric(1),
+        };
+        let encoded = original.to_bytes();
+        let (decoded, consumed) = PurgeStreamRequest::decode(&encoded).unwrap();
+        assert_eq!(encoded.len(), consumed);
+        assert_eq!(original, decoded);
+    }
+}
diff --git a/core/binary_protocol/src/requests/streams/update_stream.rs
b/core/binary_protocol/src/requests/streams/update_stream.rs
new file mode 100644
index 000000000..a8085f099
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/update_stream.rs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::WireIdentifier;
+use crate::codec::{WireDecode, WireEncode};
+use crate::identifier::WireName;
+use bytes::BytesMut;
+
+/// `UpdateStream` request. Wire format: `[identifier][name_len:1][name:N]`
+///
+/// The identifier is encoded first, immediately followed by the
+/// length-prefixed name.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct UpdateStreamRequest {
+ /// Identifier of the stream this request targets.
+ pub stream_id: WireIdentifier,
+ /// Name carried by the request (presumably the stream's new name; confirm
+ /// against the server-side handler).
+ pub name: WireName,
+}
+
+impl WireEncode for UpdateStreamRequest {
+    /// Total wire size: encoded identifier plus encoded name.
+    fn encoded_size(&self) -> usize {
+        let id_size = self.stream_id.encoded_size();
+        let name_size = self.name.encoded_size();
+        id_size + name_size
+    }
+
+    /// Writes the identifier, then the name, in wire order.
+    fn encode(&self, buf: &mut BytesMut) {
+        let Self { stream_id, name } = self;
+        stream_id.encode(buf);
+        name.encode(buf);
+    }
+}
+
+impl WireDecode for UpdateStreamRequest {
+    /// Decodes the identifier and then the name from the front of `buf`.
+    ///
+    /// Returns the request and the combined number of bytes both parts
+    /// occupied. An error from either part is propagated unchanged.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let (stream_id, id_len) = WireIdentifier::decode(buf)?;
+        // The name begins right where the identifier ended.
+        let (name, name_len) = WireName::decode(&buf[id_len..])?;
+        Ok((Self { stream_id, name }, id_len + name_len))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// A request with a named identifier survives an encode/decode cycle
+    /// and the decoder consumes the whole encoding.
+    #[test]
+    fn roundtrip() {
+        let original = UpdateStreamRequest {
+            stream_id: WireIdentifier::named("old-name").unwrap(),
+            name: WireName::new("new-name").unwrap(),
+        };
+        let encoded = original.to_bytes();
+        let (decoded, consumed) = UpdateStreamRequest::decode(&encoded).unwrap();
+        assert_eq!(consumed, encoded.len());
+        assert_eq!(decoded, original);
+    }
+
+    /// Every strict prefix of a valid encoding must fail to decode.
+    #[test]
+    fn truncated_returns_error() {
+        let encoded = UpdateStreamRequest {
+            stream_id: WireIdentifier::numeric(1),
+            name: WireName::new("test").unwrap(),
+        }
+        .to_bytes();
+        for i in 0..encoded.len() {
+            let prefix = &encoded[..i];
+            assert!(
+                UpdateStreamRequest::decode(prefix).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+}
diff --git a/core/binary_protocol/src/responses/mod.rs b/core/binary_protocol/src/responses/mod.rs
new file mode 100644
index 000000000..7b062a665
--- /dev/null
+++ b/core/binary_protocol/src/responses/mod.rs
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod streams;
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode};
+use bytes::BytesMut;
+
+/// Marker type for commands that return an empty response payload.
+///
+/// Encodes to zero bytes and decodes without reading anything from the
+/// input buffer.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct EmptyResponse;
+
+impl WireEncode for EmptyResponse {
+ /// An empty response occupies no bytes on the wire.
+ fn encoded_size(&self) -> usize {
+ 0
+ }
+
+ /// Intentionally a no-op: there are no fields to serialize, so `_buf`
+ /// is left untouched.
+ fn encode(&self, _buf: &mut BytesMut) {}
+}
+
+impl WireDecode for EmptyResponse {
+    /// Always succeeds and reports zero bytes consumed; the contents of the
+    /// buffer are never inspected.
+    fn decode(_buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let consumed = 0;
+        Ok((EmptyResponse, consumed))
+    }
+}
diff --git a/core/binary_protocol/src/responses/streams/create_stream.rs b/core/binary_protocol/src/responses/streams/create_stream.rs
new file mode 100644
index 000000000..7b23fa9ce
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/create_stream.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// `CreateStream` response is a [`StreamResponse`](super::StreamResponse)
+/// (stream header).
+/// A newly created stream has zero topics, zero size, zero messages.
+///
+/// Re-exported as a type alias for clarity at call sites.
+pub type CreateStreamResponse = super::StreamResponse;
diff --git a/core/binary_protocol/src/responses/streams/delete_stream.rs b/core/binary_protocol/src/responses/streams/delete_stream.rs
new file mode 100644
index 000000000..35903ae87
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/delete_stream.rs
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// `DeleteStream` response is empty.
+///
+/// Type alias of the `EmptyResponse` re-exported by the parent module.
+pub type DeleteStreamResponse = super::EmptyResponse;
diff --git a/core/binary_protocol/src/responses/streams/get_stream.rs b/core/binary_protocol/src/responses/streams/get_stream.rs
new file mode 100644
index 000000000..936690eaf
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/get_stream.rs
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le, read_u64_le};
+use crate::identifier::WireName;
+use crate::responses::streams::StreamResponse;
+use bytes::{BufMut, BytesMut};
+
+/// Topic header within a `GetStream` response.
+///
+/// Wire format (51 + `name_len` bytes):
+/// ```text
+/// [id:4][created_at:8][partitions_count:4][message_expiry:8]
+/// [compression_algorithm:1][max_topic_size:8][replication_factor:1]
+/// [size_bytes:8][messages_count:8][name_len:1][name:N]
+/// ```
+///
+/// Multi-byte integers are little-endian. The fields below are declared in
+/// the same order in which they appear on the wire.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct TopicHeader {
+ /// Wire offset 0, 4 bytes.
+ pub id: u32,
+ /// Wire offset 4, 8 bytes.
+ pub created_at: u64,
+ /// Wire offset 12, 4 bytes.
+ pub partitions_count: u32,
+ /// Wire offset 16, 8 bytes.
+ pub message_expiry: u64,
+ /// Wire offset 24, 1 byte; carried as a raw code, not interpreted here.
+ pub compression_algorithm: u8,
+ /// Wire offset 25, 8 bytes.
+ pub max_topic_size: u64,
+ /// Wire offset 33, 1 byte.
+ pub replication_factor: u8,
+ /// Wire offset 34, 8 bytes.
+ pub size_bytes: u64,
+ /// Wire offset 42, 8 bytes.
+ pub messages_count: u64,
+ /// Length-prefixed name starting at wire offset 50.
+ pub name: WireName,
+}
+
+impl TopicHeader {
+ /// Bytes taken by everything except the name's contents: the nine
+ /// fixed-width fields (50 bytes) plus the 1-byte name length prefix.
+ const FIXED_SIZE: usize = 4 + 8 + 4 + 8 + 1 + 8 + 1 + 8 + 8 + 1; // 51
+}
+
+impl WireEncode for TopicHeader {
+    /// Fixed portion (which already counts the name length byte) plus the
+    /// length of the name itself.
+    fn encoded_size(&self) -> usize {
+        let name_len = self.name.len();
+        Self::FIXED_SIZE + name_len
+    }
+
+    /// Serializes every field in wire order; integers are written
+    /// little-endian and the name is written last.
+    fn encode(&self, buf: &mut BytesMut) {
+        let Self {
+            id,
+            created_at,
+            partitions_count,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+            replication_factor,
+            size_bytes,
+            messages_count,
+            name,
+        } = self;
+
+        buf.put_u32_le(*id);
+        buf.put_u64_le(*created_at);
+        buf.put_u32_le(*partitions_count);
+        buf.put_u64_le(*message_expiry);
+        buf.put_u8(*compression_algorithm);
+        buf.put_u64_le(*max_topic_size);
+        buf.put_u8(*replication_factor);
+        buf.put_u64_le(*size_bytes);
+        buf.put_u64_le(*messages_count);
+        name.encode(buf);
+    }
+}
+
+impl WireDecode for TopicHeader {
+    /// Decodes one topic header from the front of `buf`.
+    ///
+    /// The fixed-width fields are read at absolute offsets in wire order,
+    /// then the length-prefixed name is decoded starting at byte 50.
+    /// Returns the header and the total number of bytes it occupied; errors
+    /// from any read or from the name decoder are propagated.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        // The name starts right after the nine fixed-width fields, i.e. at
+        // FIXED_SIZE minus the name's own 1-byte length prefix (= 50).
+        let mut consumed = Self::FIXED_SIZE - 1;
+
+        // Struct-literal fields are evaluated top to bottom, so all the
+        // fixed-width reads run (and may fail) before `buf` is sliced for
+        // the name.
+        let topic = Self {
+            id: read_u32_le(buf, 0)?,
+            created_at: read_u64_le(buf, 4)?,
+            partitions_count: read_u32_le(buf, 12)?,
+            message_expiry: read_u64_le(buf, 16)?,
+            compression_algorithm: read_u8(buf, 24)?,
+            max_topic_size: read_u64_le(buf, 25)?,
+            replication_factor: read_u8(buf, 33)?,
+            size_bytes: read_u64_le(buf, 34)?,
+            messages_count: read_u64_le(buf, 42)?,
+            name: {
+                let (name, name_len) = WireName::decode(&buf[consumed..])?;
+                consumed += name_len;
+                name
+            },
+        };
+
+        Ok((topic, consumed))
+    }
+}
+
+/// `GetStream` response: stream header followed by topic headers.
+///
+/// Wire format:
+/// ```text
+/// [StreamResponse][TopicHeader]*
+/// ```
+///
+/// Topics are packed sequentially with no separate count field of their
+/// own. When decoding, topic headers are read until the buffer is
+/// exhausted, and the number read must equal `stream.topics_count`;
+/// otherwise decoding fails with a validation error. Encoding writes
+/// `topics` as given and does not check it against `stream.topics_count`.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct GetStreamResponse {
+ /// Stream header that opens the payload.
+ pub stream: StreamResponse,
+ /// Topic headers in wire order.
+ pub topics: Vec<TopicHeader>,
+}
+
+impl WireEncode for GetStreamResponse {
+    /// Size of the stream header plus the sizes of all topic headers.
+    fn encoded_size(&self) -> usize {
+        let topics_size: usize = self.topics.iter().map(|topic| topic.encoded_size()).sum();
+        self.stream.encoded_size() + topics_size
+    }
+
+    /// Writes the stream header, then each topic header in order.
+    fn encode(&self, buf: &mut BytesMut) {
+        self.stream.encode(buf);
+        self.topics.iter().for_each(|topic| topic.encode(buf));
+    }
+}
+
+impl WireDecode for GetStreamResponse {
+    /// Decodes a stream header followed by topic headers until `buf` is
+    /// exhausted.
+    ///
+    /// # Errors
+    ///
+    /// Propagates any error from decoding the stream header or a topic
+    /// header, and returns `WireError::Validation` when the number of
+    /// decoded topics differs from `stream.topics_count`.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let (stream, mut pos) = StreamResponse::decode(buf)?;
+        let expected = stream.topics_count as usize;
+
+        // Reserve space up front instead of growing the vector repeatedly.
+        // The count comes from the wire, so cap it by the number of topic
+        // headers the remaining bytes could possibly hold; a bogus count can
+        // then never trigger an oversized allocation.
+        let max_possible = buf.len().saturating_sub(pos) / TopicHeader::FIXED_SIZE;
+        let mut topics = Vec::with_capacity(expected.min(max_possible));
+
+        while pos < buf.len() {
+            let (topic, consumed) = TopicHeader::decode(&buf[pos..])?;
+            pos += consumed;
+            topics.push(topic);
+        }
+        if topics.len() != expected {
+            return Err(WireError::Validation(format!(
+                "stream.topics_count={} but decoded {} topics",
+                stream.topics_count,
+                topics.len()
+            )));
+        }
+        Ok((Self { stream, topics }, pos))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Stream header fixture announcing two topics.
+    fn sample_stream() -> StreamResponse {
+        StreamResponse {
+            id: 1,
+            created_at: 1_710_000_000_000,
+            topics_count: 2,
+            size_bytes: 2048,
+            messages_count: 200,
+            name: WireName::new("my-stream").unwrap(),
+        }
+    }
+
+    /// Topic header fixture; only `id` and `name` vary between callers.
+    fn sample_topic(id: u32, name: &str) -> TopicHeader {
+        TopicHeader {
+            id,
+            created_at: 1_710_000_000_000,
+            partitions_count: 3,
+            message_expiry: 0,
+            compression_algorithm: 1,
+            max_topic_size: 0,
+            replication_factor: 1,
+            size_bytes: 1024,
+            messages_count: 100,
+            name: WireName::new(name).unwrap(),
+        }
+    }
+
+    #[test]
+    fn roundtrip_with_topics() {
+        let resp = GetStreamResponse {
+            stream: sample_stream(),
+            topics: vec![sample_topic(1, "topic-a"), sample_topic(2, "topic-b")],
+        };
+        let bytes = resp.to_bytes();
+        let (decoded, consumed) = GetStreamResponse::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, resp);
+    }
+
+    #[test]
+    fn roundtrip_no_topics() {
+        let resp = GetStreamResponse {
+            stream: StreamResponse {
+                topics_count: 0,
+                ..sample_stream()
+            },
+            topics: vec![],
+        };
+        let bytes = resp.to_bytes();
+        let (decoded, consumed) = GetStreamResponse::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, resp);
+    }
+
+    /// The header announces two topics but only one is present on the wire,
+    /// so decoding must fail with a validation error.
+    #[test]
+    fn topics_count_mismatch_returns_error() {
+        let resp = GetStreamResponse {
+            stream: sample_stream(),
+            topics: vec![sample_topic(1, "topic-a")],
+        };
+        let bytes = resp.to_bytes();
+        assert!(matches!(
+            GetStreamResponse::decode(&bytes),
+            Err(WireError::Validation(_))
+        ));
+    }
+
+    #[test]
+    fn topic_header_roundtrip() {
+        let topic = sample_topic(5, "events");
+        let bytes = topic.to_bytes();
+        assert_eq!(bytes.len(), TopicHeader::FIXED_SIZE + 6);
+        let (decoded, consumed) = TopicHeader::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, topic);
+    }
+
+    /// Every strict prefix of an encoded topic header must fail to decode.
+    #[test]
+    fn topic_header_truncated_returns_error() {
+        let topic = sample_topic(1, "t");
+        let bytes = topic.to_bytes();
+        for i in 0..bytes.len() {
+            assert!(
+                TopicHeader::decode(&bytes[..i]).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+}
diff --git a/core/binary_protocol/src/responses/streams/get_streams.rs b/core/binary_protocol/src/responses/streams/get_streams.rs
new file mode 100644
index 000000000..3221c45e4
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/get_streams.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode};
+use crate::responses::streams::StreamResponse;
+use bytes::BytesMut;
+
+/// `GetStreams` response: sequential stream headers.
+///
+/// Wire format:
+/// ```text
+/// [StreamResponse]*
+/// ```
+///
+/// Empty payload means zero streams.
+///
+/// There is no count prefix: decoding keeps reading stream headers until
+/// the input buffer is exhausted.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct GetStreamsResponse {
+ /// Stream headers in wire order.
+ pub streams: Vec<StreamResponse>,
+}
+
+impl WireEncode for GetStreamsResponse {
+    /// Sum of the encoded sizes of all contained stream headers.
+    fn encoded_size(&self) -> usize {
+        let mut total = 0usize;
+        for stream in &self.streams {
+            total += stream.encoded_size();
+        }
+        total
+    }
+
+    /// Writes each stream header back to back, in order.
+    fn encode(&self, buf: &mut BytesMut) {
+        self.streams.iter().for_each(|stream| stream.encode(buf));
+    }
+}
+
+impl WireDecode for GetStreamsResponse {
+    /// Decodes stream headers one after another until `buf` is exhausted.
+    ///
+    /// An empty buffer yields an empty list with zero bytes consumed. Any
+    /// error from decoding an individual header is propagated.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let mut streams = Vec::new();
+        let mut offset = 0usize;
+        // Keep going while there are undecoded bytes left after `offset`.
+        while let Some(rest) = buf.get(offset..).filter(|rest| !rest.is_empty()) {
+            let (stream, used) = StreamResponse::decode(rest)?;
+            offset += used;
+            streams.push(stream);
+        }
+        Ok((Self { streams }, offset))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::WireName;
+
+    /// A response without streams encodes to nothing and decodes from nothing.
+    #[test]
+    fn roundtrip_empty() {
+        let original = GetStreamsResponse { streams: vec![] };
+        let encoded = original.to_bytes();
+        assert!(encoded.is_empty());
+        let (decoded, consumed) = GetStreamsResponse::decode(&encoded).unwrap();
+        assert_eq!(consumed, 0);
+        assert_eq!(decoded, original);
+    }
+
+    /// Two streams with names of different lengths survive a roundtrip.
+    #[test]
+    fn roundtrip_multiple() {
+        let first = StreamResponse {
+            id: 1,
+            created_at: 100,
+            topics_count: 2,
+            size_bytes: 512,
+            messages_count: 50,
+            name: WireName::new("s1").unwrap(),
+        };
+        let second = StreamResponse {
+            id: 2,
+            created_at: 200,
+            topics_count: 0,
+            size_bytes: 0,
+            messages_count: 0,
+            name: WireName::new("stream-two").unwrap(),
+        };
+        let original = GetStreamsResponse {
+            streams: vec![first, second],
+        };
+        let encoded = original.to_bytes();
+        let (decoded, consumed) = GetStreamsResponse::decode(&encoded).unwrap();
+        assert_eq!(consumed, encoded.len());
+        assert_eq!(decoded, original);
+    }
+}
diff --git a/core/binary_protocol/src/responses/streams/mod.rs b/core/binary_protocol/src/responses/streams/mod.rs
new file mode 100644
index 000000000..e245a5648
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/mod.rs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Submodules of the stream responses. `get_stream` and `get_streams` are
+// public; the others are private and reachable only through the re-exports
+// below.
+mod create_stream;
+mod delete_stream;
+pub mod get_stream;
+pub mod get_streams;
+mod purge_stream;
+mod stream_response;
+mod update_stream;
+
+// Flat re-exports so callers can name every stream response type from this
+// module, including the shared `EmptyResponse` defined in the parent module.
+pub use super::EmptyResponse;
+pub use create_stream::CreateStreamResponse;
+pub use delete_stream::DeleteStreamResponse;
+pub use get_stream::{GetStreamResponse, TopicHeader};
+pub use get_streams::GetStreamsResponse;
+pub use purge_stream::PurgeStreamResponse;
+pub use stream_response::StreamResponse;
+pub use update_stream::UpdateStreamResponse;
diff --git a/core/binary_protocol/src/responses/streams/purge_stream.rs b/core/binary_protocol/src/responses/streams/purge_stream.rs
new file mode 100644
index 000000000..364e563eb
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/purge_stream.rs
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// `PurgeStream` response is empty.
+///
+/// Type alias of the `EmptyResponse` re-exported by the parent module.
+pub type PurgeStreamResponse = super::EmptyResponse;
diff --git a/core/binary_protocol/src/responses/streams/stream_response.rs b/core/binary_protocol/src/responses/streams/stream_response.rs
new file mode 100644
index 000000000..b6fbe02e9
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/stream_response.rs
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u32_le, read_u64_le};
+use crate::identifier::WireName;
+use bytes::{BufMut, BytesMut};
+
+/// Stream header on the wire. Used in both single-stream and multi-stream
+/// responses.
+///
+/// Wire format (33 + `name_len` bytes):
+/// ```text
+/// [id:4][created_at:8][topics_count:4][size_bytes:8][messages_count:8][name_len:1][name:N]
+/// ```
+///
+/// Multi-byte integers are little-endian. The fields below are declared in
+/// the same order in which they appear on the wire.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct StreamResponse {
+    /// Wire offset 0, 4 bytes.
+    pub id: u32,
+    /// Wire offset 4, 8 bytes.
+    pub created_at: u64,
+    /// Wire offset 12, 4 bytes.
+    pub topics_count: u32,
+    /// Wire offset 16, 8 bytes.
+    pub size_bytes: u64,
+    /// Wire offset 24, 8 bytes.
+    pub messages_count: u64,
+    /// Length-prefixed name starting at wire offset 32.
+    pub name: WireName,
+}
+
+impl StreamResponse {
+ /// Bytes taken by everything except the name's contents: the five
+ /// fixed-width fields (32 bytes) plus the 1-byte name length prefix.
+ const FIXED_SIZE: usize = 4 + 8 + 4 + 8 + 8 + 1; // 33
+}
+
+impl WireEncode for StreamResponse {
+    /// Fixed portion (which already counts the name length byte) plus the
+    /// length of the name itself.
+    fn encoded_size(&self) -> usize {
+        let name_len = self.name.len();
+        Self::FIXED_SIZE + name_len
+    }
+
+    /// Serializes every field in wire order; integers are written
+    /// little-endian and the name is written last.
+    fn encode(&self, buf: &mut BytesMut) {
+        let Self {
+            id,
+            created_at,
+            topics_count,
+            size_bytes,
+            messages_count,
+            name,
+        } = self;
+
+        buf.put_u32_le(*id);
+        buf.put_u64_le(*created_at);
+        buf.put_u32_le(*topics_count);
+        buf.put_u64_le(*size_bytes);
+        buf.put_u64_le(*messages_count);
+        name.encode(buf);
+    }
+}
+
+impl WireDecode for StreamResponse {
+    /// Decodes one stream header from the front of `buf`.
+    ///
+    /// The fixed-width fields are read at absolute offsets in wire order,
+    /// then the length-prefixed name is decoded starting at byte 32.
+    /// Returns the header and the total number of bytes it occupied; errors
+    /// from any read or from the name decoder are propagated.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        // The name starts right after the five fixed-width fields, i.e. at
+        // FIXED_SIZE minus the name's own 1-byte length prefix (= 32).
+        let mut consumed = Self::FIXED_SIZE - 1;
+
+        // Struct-literal fields are evaluated top to bottom, so all the
+        // fixed-width reads run (and may fail) before `buf` is sliced for
+        // the name.
+        let stream = Self {
+            id: read_u32_le(buf, 0)?,
+            created_at: read_u64_le(buf, 4)?,
+            topics_count: read_u32_le(buf, 12)?,
+            size_bytes: read_u64_le(buf, 16)?,
+            messages_count: read_u64_le(buf, 24)?,
+            name: {
+                let (name, name_len) = WireName::decode(&buf[consumed..])?;
+                consumed += name_len;
+                name
+            },
+        };
+
+        Ok((stream, consumed))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Baseline fixture; individual tests override fields as needed.
+    fn sample() -> StreamResponse {
+        StreamResponse {
+            id: 1,
+            created_at: 1_710_000_000_000,
+            topics_count: 3,
+            size_bytes: 1024,
+            messages_count: 100,
+            name: WireName::new("test-stream").unwrap(),
+        }
+    }
+
+    /// Encoded length is the fixed part plus the 11-byte name, and the
+    /// header decodes back to itself.
+    #[test]
+    fn roundtrip() {
+        let original = sample();
+        let encoded = original.to_bytes();
+        assert_eq!(encoded.len(), StreamResponse::FIXED_SIZE + 11);
+        let (decoded, consumed) = StreamResponse::decode(&encoded).unwrap();
+        assert_eq!(consumed, encoded.len());
+        assert_eq!(decoded, original);
+    }
+
+    /// Every strict prefix of a valid encoding must fail to decode.
+    #[test]
+    fn truncated_returns_error() {
+        let encoded = sample().to_bytes();
+        for i in 0..encoded.len() {
+            let prefix = &encoded[..i];
+            assert!(
+                StreamResponse::decode(prefix).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    /// Two headers written back to back can be decoded one after another,
+    /// using the first header's consumed length to locate the second.
+    #[test]
+    fn multiple_streams_sequential() {
+        let first = StreamResponse {
+            id: 1,
+            name: WireName::new("a").unwrap(),
+            ..sample()
+        };
+        let second = StreamResponse {
+            id: 2,
+            name: WireName::new("bb").unwrap(),
+            ..sample()
+        };
+        let mut buf = BytesMut::new();
+        for stream in [&first, &second] {
+            stream.encode(&mut buf);
+        }
+        let bytes = buf.freeze();
+
+        let (decoded_first, first_len) = StreamResponse::decode(&bytes).unwrap();
+        let (decoded_second, second_len) = StreamResponse::decode(&bytes[first_len..]).unwrap();
+        assert_eq!(decoded_first, first);
+        assert_eq!(decoded_second, second);
+        assert_eq!(first_len + second_len, bytes.len());
+    }
+}
diff --git a/core/binary_protocol/src/responses/streams/update_stream.rs b/core/binary_protocol/src/responses/streams/update_stream.rs
new file mode 100644
index 000000000..b002d9c86
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/update_stream.rs
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// `UpdateStream` response is empty.
+///
+/// Type alias of the `EmptyResponse` re-exported by the parent module.
+pub type UpdateStreamResponse = super::EmptyResponse;