This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch refactor-binary-2-wire-types
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 7b9d17c4688e02f75c4f04b39175d21e2e630c83
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 16 15:51:05 2026 +0100

    feat(rust): add wire protocol codec and types to binary_protocol crate
    
    Pure codec layer (no I/O, no async) with protocol-owned
    types decoupled from iggy_common. Adds WireEncode/WireDecode
    traits, WireName validated newtype, WireIdentifier, command
    codes, stream request/response types, VSR consensus headers
    with bytemuck zero-copy, and Operation-to-CommandCode mapping.
    62 tests. First step - remaining domains to follow.
---
 Cargo.lock                                         |   6 +
 core/binary_protocol/Cargo.toml                    |  11 +-
 core/binary_protocol/src/codec.rs                  | 174 +++++
 core/binary_protocol/src/codes.rs                  | 218 ++++++
 core/binary_protocol/src/consensus/command.rs      |  67 ++
 core/binary_protocol/src/consensus/error.rs        |  64 ++
 core/binary_protocol/src/consensus/header.rs       | 744 +++++++++++++++++++++
 core/binary_protocol/src/consensus/message.rs      | 331 +++++++++
 core/binary_protocol/src/consensus/mod.rs          |  54 ++
 core/binary_protocol/src/consensus/operation.rs    | 218 ++++++
 core/binary_protocol/src/error.rs                  |  50 ++
 core/binary_protocol/src/frame.rs                  |  26 +
 core/binary_protocol/src/identifier.rs             | 252 +++++++
 core/binary_protocol/src/lib.rs                    |  77 ++-
 core/binary_protocol/src/requests/mod.rs           |  18 +
 .../src/requests/streams/create_stream.rs          |  96 +++
 .../src/requests/streams/delete_stream.rs          |  60 ++
 .../src/requests/streams/get_stream.rs             |  80 +++
 .../src/requests/streams/get_streams.rs            |  53 ++
 core/binary_protocol/src/requests/streams/mod.rs   |  30 +
 .../src/requests/streams/purge_stream.rs           |  60 ++
 .../src/requests/streams/update_stream.rs          |  81 +++
 core/binary_protocol/src/responses/mod.rs          |  40 ++
 .../src/responses/streams/create_stream.rs         |  22 +
 .../src/responses/streams/delete_stream.rs         |  19 +
 .../src/responses/streams/get_stream.rs            | 232 +++++++
 .../src/responses/streams/get_streams.rs           | 103 +++
 core/binary_protocol/src/responses/streams/mod.rs  |  33 +
 .../src/responses/streams/purge_stream.rs          |  19 +
 .../src/responses/streams/stream_response.rs       | 142 ++++
 .../src/responses/streams/update_stream.rs         |  19 +
 core/binary_protocol/src/wire_name.rs              | 170 +++++
 32 files changed, 3551 insertions(+), 18 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 3afb4cadd..e1e5efb8c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5323,6 +5323,12 @@ dependencies = [
 [[package]]
 name = "iggy_binary_protocol"
 version = "0.9.2-edge.1"
+dependencies = [
+ "bytemuck",
+ "bytes",
+ "enumset",
+ "thiserror 2.0.18",
+]
 
 [[package]]
 name = "iggy_common"
diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml
index 2d809084d..12ed184fd 100644
--- a/core/binary_protocol/Cargo.toml
+++ b/core/binary_protocol/Cargo.toml
@@ -18,7 +18,7 @@
 [package]
 name = "iggy_binary_protocol"
 version = "0.9.2-edge.1"
-description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
+description = "Wire protocol types and codec for the Iggy binary protocol. Shared between server and SDK."
 edition = "2024"
 license = "Apache-2.0"
 keywords = ["iggy", "messaging", "streaming"]
@@ -29,3 +29,12 @@ repository = "https://github.com/apache/iggy"
 readme = "../../README.md"
 
 [dependencies]
+bytemuck = { workspace = true }
+bytes = { workspace = true }
+enumset = { workspace = true }
+thiserror = { workspace = true }
+
+[lints.clippy]
+enum_glob_use = "deny"
+nursery = "warn"
+pedantic = "deny"
diff --git a/core/binary_protocol/src/codec.rs b/core/binary_protocol/src/codec.rs
new file mode 100644
index 000000000..3aedd9c5e
--- /dev/null
+++ b/core/binary_protocol/src/codec.rs
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use bytes::{Bytes, BytesMut};
+
+/// Encode a wire type into a caller-owned buffer.
+///
+/// Buffer-first design: the caller controls allocation. The `to_bytes()`
+/// convenience method allocates when needed, but hot paths can reuse
+/// buffers via `encode()` directly.
+pub trait WireEncode {
+    /// Write the encoded representation into `buf`.
+    fn encode(&self, buf: &mut BytesMut);
+
+    /// Return the exact encoded size in bytes.
+    fn encoded_size(&self) -> usize;
+
+    /// Convenience: allocate a new [`Bytes`] and encode into it.
+    #[must_use]
+    fn to_bytes(&self) -> Bytes {
+        let mut buf = BytesMut::with_capacity(self.encoded_size());
+        self.encode(&mut buf);
+        buf.freeze()
+    }
+}
+
+/// Decode a wire type from a byte slice.
+///
+/// Takes `&[u8]` instead of [`Bytes`] to avoid requiring reference-counted
+/// ownership at the decode boundary.
+pub trait WireDecode: Sized {
+    /// Decode from `buf`, consuming exactly the bytes needed.
+    /// Returns the decoded value and the number of bytes consumed.
+    ///
+    /// # Errors
+    /// Returns `WireError` if the buffer is too short or contains invalid data.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError>;
+
+    /// Convenience: decode from the entire buffer, ignoring trailing bytes.
+    ///
+    /// # Errors
+    /// Returns `WireError` if decoding fails.
+    fn decode_from(buf: &[u8]) -> Result<Self, WireError> {
+        Self::decode(buf).map(|(val, _)| val)
+    }
+}
+
+/// Helper to read a `u8` from `buf` at `offset`.
+///
+/// # Errors
+/// Returns `WireError::UnexpectedEof` if `offset` is out of bounds.
+#[inline]
+pub fn read_u8(buf: &[u8], offset: usize) -> Result<u8, WireError> {
+    buf.get(offset)
+        .copied()
+        .ok_or_else(|| WireError::UnexpectedEof {
+            offset,
+            need: 1,
+            have: buf.len().saturating_sub(offset),
+        })
+}
+
+/// Helper to read a `u32` LE from `buf` at `offset`.
+///
+/// # Errors
+/// Returns `WireError::UnexpectedEof` if fewer than 4 bytes remain.
+#[allow(clippy::missing_panics_doc)]
+#[inline]
+pub fn read_u32_le(buf: &[u8], offset: usize) -> Result<u32, WireError> {
+    let end = offset
+        .checked_add(4)
+        .ok_or_else(|| WireError::UnexpectedEof {
+            offset,
+            need: 4,
+            have: buf.len().saturating_sub(offset),
+        })?;
+    let slice = buf
+        .get(offset..end)
+        .ok_or_else(|| WireError::UnexpectedEof {
+            offset,
+            need: 4,
+            have: buf.len().saturating_sub(offset),
+        })?;
+    Ok(u32::from_le_bytes(
+        slice.try_into().expect("slice is exactly 4 bytes"),
+    ))
+}
+
+/// Helper to read a `u64` LE from `buf` at `offset`.
+///
+/// # Errors
+/// Returns `WireError::UnexpectedEof` if fewer than 8 bytes remain.
+#[allow(clippy::missing_panics_doc)]
+#[inline]
+pub fn read_u64_le(buf: &[u8], offset: usize) -> Result<u64, WireError> {
+    let end = offset
+        .checked_add(8)
+        .ok_or_else(|| WireError::UnexpectedEof {
+            offset,
+            need: 8,
+            have: buf.len().saturating_sub(offset),
+        })?;
+    let slice = buf
+        .get(offset..end)
+        .ok_or_else(|| WireError::UnexpectedEof {
+            offset,
+            need: 8,
+            have: buf.len().saturating_sub(offset),
+        })?;
+    Ok(u64::from_le_bytes(
+        slice.try_into().expect("slice is exactly 8 bytes"),
+    ))
+}
+
+/// Helper to read a UTF-8 string of `len` bytes from `buf` at `offset`.
+///
+/// # Errors
+/// Returns `WireError::UnexpectedEof` or `WireError::InvalidUtf8` on failure.
+#[inline]
+pub fn read_str(buf: &[u8], offset: usize, len: usize) -> Result<String, WireError> {
+    let end = offset
+        .checked_add(len)
+        .ok_or_else(|| WireError::UnexpectedEof {
+            offset,
+            need: len,
+            have: buf.len().saturating_sub(offset),
+        })?;
+    let slice = buf
+        .get(offset..end)
+        .ok_or_else(|| WireError::UnexpectedEof {
+            offset,
+            need: len,
+            have: buf.len().saturating_sub(offset),
+        })?;
+    std::str::from_utf8(slice)
+        .map(str::to_string)
+        .map_err(|_| WireError::InvalidUtf8 { offset })
+}
+
+/// Helper to read a byte slice of `len` bytes from `buf` at `offset`.
+///
+/// # Errors
+/// Returns `WireError::UnexpectedEof` if fewer than `len` bytes remain.
+#[inline]
+pub fn read_bytes(buf: &[u8], offset: usize, len: usize) -> Result<&[u8], WireError> {
+    let end = offset
+        .checked_add(len)
+        .ok_or_else(|| WireError::UnexpectedEof {
+            offset,
+            need: len,
+            have: buf.len().saturating_sub(offset),
+        })?;
+    buf.get(offset..end)
+        .ok_or_else(|| WireError::UnexpectedEof {
+            offset,
+            need: len,
+            have: buf.len().saturating_sub(offset),
+        })
+}
diff --git a/core/binary_protocol/src/codes.rs b/core/binary_protocol/src/codes.rs
new file mode 100644
index 000000000..254cd210b
--- /dev/null
+++ b/core/binary_protocol/src/codes.rs
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+
+// -- System --
+pub const PING_CODE: u32 = 1;
+pub const GET_STATS_CODE: u32 = 10;
+pub const GET_SNAPSHOT_FILE_CODE: u32 = 11;
+pub const GET_CLUSTER_METADATA_CODE: u32 = 12;
+pub const GET_ME_CODE: u32 = 20;
+pub const GET_CLIENT_CODE: u32 = 21;
+pub const GET_CLIENTS_CODE: u32 = 22;
+
+// -- Users --
+pub const GET_USER_CODE: u32 = 31;
+pub const GET_USERS_CODE: u32 = 32;
+pub const CREATE_USER_CODE: u32 = 33;
+pub const DELETE_USER_CODE: u32 = 34;
+pub const UPDATE_USER_CODE: u32 = 35;
+pub const UPDATE_PERMISSIONS_CODE: u32 = 36;
+pub const CHANGE_PASSWORD_CODE: u32 = 37;
+pub const LOGIN_USER_CODE: u32 = 38;
+pub const LOGOUT_USER_CODE: u32 = 39;
+
+// -- Personal Access Tokens --
+pub const GET_PERSONAL_ACCESS_TOKENS_CODE: u32 = 41;
+pub const CREATE_PERSONAL_ACCESS_TOKEN_CODE: u32 = 42;
+pub const DELETE_PERSONAL_ACCESS_TOKEN_CODE: u32 = 43;
+pub const LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE: u32 = 44;
+
+// -- Messages --
+pub const POLL_MESSAGES_CODE: u32 = 100;
+pub const SEND_MESSAGES_CODE: u32 = 101;
+pub const FLUSH_UNSAVED_BUFFER_CODE: u32 = 102;
+
+// -- Consumer Offsets --
+pub const GET_CONSUMER_OFFSET_CODE: u32 = 120;
+pub const STORE_CONSUMER_OFFSET_CODE: u32 = 121;
+pub const DELETE_CONSUMER_OFFSET_CODE: u32 = 122;
+
+// -- Streams --
+pub const GET_STREAM_CODE: u32 = 200;
+pub const GET_STREAMS_CODE: u32 = 201;
+pub const CREATE_STREAM_CODE: u32 = 202;
+pub const DELETE_STREAM_CODE: u32 = 203;
+pub const UPDATE_STREAM_CODE: u32 = 204;
+pub const PURGE_STREAM_CODE: u32 = 205;
+
+// -- Topics --
+pub const GET_TOPIC_CODE: u32 = 300;
+pub const GET_TOPICS_CODE: u32 = 301;
+pub const CREATE_TOPIC_CODE: u32 = 302;
+pub const DELETE_TOPIC_CODE: u32 = 303;
+pub const UPDATE_TOPIC_CODE: u32 = 304;
+pub const PURGE_TOPIC_CODE: u32 = 305;
+
+// -- Partitions --
+pub const CREATE_PARTITIONS_CODE: u32 = 402;
+pub const DELETE_PARTITIONS_CODE: u32 = 403;
+
+// -- Segments --
+pub const DELETE_SEGMENTS_CODE: u32 = 503;
+
+// -- Consumer Groups --
+pub const GET_CONSUMER_GROUP_CODE: u32 = 600;
+pub const GET_CONSUMER_GROUPS_CODE: u32 = 601;
+pub const CREATE_CONSUMER_GROUP_CODE: u32 = 602;
+pub const DELETE_CONSUMER_GROUP_CODE: u32 = 603;
+pub const JOIN_CONSUMER_GROUP_CODE: u32 = 604;
+pub const LEAVE_CONSUMER_GROUP_CODE: u32 = 605;
+
+/// # Errors
+/// Returns `WireError::UnknownCommand` if the code is not recognized.
+pub const fn command_name(code: u32) -> Result<&'static str, WireError> {
+    match code {
+        PING_CODE => Ok("ping"),
+        GET_STATS_CODE => Ok("stats"),
+        GET_SNAPSHOT_FILE_CODE => Ok("snapshot"),
+        GET_CLUSTER_METADATA_CODE => Ok("cluster.metadata"),
+        GET_ME_CODE => Ok("me"),
+        GET_CLIENT_CODE => Ok("client.get"),
+        GET_CLIENTS_CODE => Ok("client.list"),
+        GET_USER_CODE => Ok("user.get"),
+        GET_USERS_CODE => Ok("user.list"),
+        CREATE_USER_CODE => Ok("user.create"),
+        DELETE_USER_CODE => Ok("user.delete"),
+        UPDATE_USER_CODE => Ok("user.update"),
+        UPDATE_PERMISSIONS_CODE => Ok("user.permissions"),
+        CHANGE_PASSWORD_CODE => Ok("user.password"),
+        LOGIN_USER_CODE => Ok("user.login"),
+        LOGOUT_USER_CODE => Ok("user.logout"),
+        GET_PERSONAL_ACCESS_TOKENS_CODE => Ok("personal_access_token.list"),
+        CREATE_PERSONAL_ACCESS_TOKEN_CODE => Ok("personal_access_token.create"),
+        DELETE_PERSONAL_ACCESS_TOKEN_CODE => Ok("personal_access_token.delete"),
+        LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => Ok("personal_access_token.login"),
+        POLL_MESSAGES_CODE => Ok("message.poll"),
+        SEND_MESSAGES_CODE => Ok("message.send"),
+        FLUSH_UNSAVED_BUFFER_CODE => Ok("message.flush_unsaved_buffer"),
+        GET_CONSUMER_OFFSET_CODE => Ok("consumer_offset.get"),
+        STORE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.store"),
+        DELETE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.delete"),
+        GET_STREAM_CODE => Ok("stream.get"),
+        GET_STREAMS_CODE => Ok("stream.list"),
+        CREATE_STREAM_CODE => Ok("stream.create"),
+        DELETE_STREAM_CODE => Ok("stream.delete"),
+        UPDATE_STREAM_CODE => Ok("stream.update"),
+        PURGE_STREAM_CODE => Ok("stream.purge"),
+        GET_TOPIC_CODE => Ok("topic.get"),
+        GET_TOPICS_CODE => Ok("topic.list"),
+        CREATE_TOPIC_CODE => Ok("topic.create"),
+        DELETE_TOPIC_CODE => Ok("topic.delete"),
+        UPDATE_TOPIC_CODE => Ok("topic.update"),
+        PURGE_TOPIC_CODE => Ok("topic.purge"),
+        CREATE_PARTITIONS_CODE => Ok("partition.create"),
+        DELETE_PARTITIONS_CODE => Ok("partition.delete"),
+        DELETE_SEGMENTS_CODE => Ok("segment.delete"),
+        GET_CONSUMER_GROUP_CODE => Ok("consumer_group.get"),
+        GET_CONSUMER_GROUPS_CODE => Ok("consumer_group.list"),
+        CREATE_CONSUMER_GROUP_CODE => Ok("consumer_group.create"),
+        DELETE_CONSUMER_GROUP_CODE => Ok("consumer_group.delete"),
+        JOIN_CONSUMER_GROUP_CODE => Ok("consumer_group.join"),
+        LEAVE_CONSUMER_GROUP_CODE => Ok("consumer_group.leave"),
+        _ => Err(WireError::UnknownCommand(code)),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    const ALL_CODES: &[u32] = &[
+        PING_CODE,
+        GET_STATS_CODE,
+        GET_SNAPSHOT_FILE_CODE,
+        GET_CLUSTER_METADATA_CODE,
+        GET_ME_CODE,
+        GET_CLIENT_CODE,
+        GET_CLIENTS_CODE,
+        GET_USER_CODE,
+        GET_USERS_CODE,
+        CREATE_USER_CODE,
+        DELETE_USER_CODE,
+        UPDATE_USER_CODE,
+        UPDATE_PERMISSIONS_CODE,
+        CHANGE_PASSWORD_CODE,
+        LOGIN_USER_CODE,
+        LOGOUT_USER_CODE,
+        GET_PERSONAL_ACCESS_TOKENS_CODE,
+        CREATE_PERSONAL_ACCESS_TOKEN_CODE,
+        DELETE_PERSONAL_ACCESS_TOKEN_CODE,
+        LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
+        POLL_MESSAGES_CODE,
+        SEND_MESSAGES_CODE,
+        FLUSH_UNSAVED_BUFFER_CODE,
+        GET_CONSUMER_OFFSET_CODE,
+        STORE_CONSUMER_OFFSET_CODE,
+        DELETE_CONSUMER_OFFSET_CODE,
+        GET_STREAM_CODE,
+        GET_STREAMS_CODE,
+        CREATE_STREAM_CODE,
+        DELETE_STREAM_CODE,
+        UPDATE_STREAM_CODE,
+        PURGE_STREAM_CODE,
+        GET_TOPIC_CODE,
+        GET_TOPICS_CODE,
+        CREATE_TOPIC_CODE,
+        DELETE_TOPIC_CODE,
+        UPDATE_TOPIC_CODE,
+        PURGE_TOPIC_CODE,
+        CREATE_PARTITIONS_CODE,
+        DELETE_PARTITIONS_CODE,
+        DELETE_SEGMENTS_CODE,
+        GET_CONSUMER_GROUP_CODE,
+        GET_CONSUMER_GROUPS_CODE,
+        CREATE_CONSUMER_GROUP_CODE,
+        DELETE_CONSUMER_GROUP_CODE,
+        JOIN_CONSUMER_GROUP_CODE,
+        LEAVE_CONSUMER_GROUP_CODE,
+    ];
+
+    #[test]
+    fn every_code_has_a_name() {
+        for &code in ALL_CODES {
+            assert!(
+                command_name(code).is_ok(),
+                "missing name for command code {code}"
+            );
+        }
+    }
+
+    #[test]
+    fn no_duplicate_codes() {
+        let mut seen = std::collections::HashSet::new();
+        for &code in ALL_CODES {
+            assert!(seen.insert(code), "duplicate command code: {code}");
+        }
+    }
+
+    #[test]
+    fn unknown_code_returns_error() {
+        assert!(command_name(9999).is_err());
+    }
+}
diff --git a/core/binary_protocol/src/consensus/command.rs b/core/binary_protocol/src/consensus/command.rs
new file mode 100644
index 000000000..8eea606bc
--- /dev/null
+++ b/core/binary_protocol/src/consensus/command.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytemuck::{CheckedBitPattern, NoUninit};
+use enumset::EnumSetType;
+
+/// VSR message type discriminant.
+#[derive(Default, Debug, EnumSetType)]
+#[repr(u8)]
+pub enum Command2 {
+    #[default]
+    Reserved = 0,
+
+    Ping = 1,
+    Pong = 2,
+    PingClient = 3,
+    PongClient = 4,
+
+    Request = 5,
+    Prepare = 6,
+    PrepareOk = 7,
+    Reply = 8,
+    Commit = 9,
+
+    StartViewChange = 10,
+    DoViewChange = 11,
+    StartView = 12,
+}
+
+// SAFETY: Command2 is #[repr(u8)] with no padding bytes.
+unsafe impl NoUninit for Command2 {}
+
+// SAFETY: Command2 is #[repr(u8)]; is_valid_bit_pattern matches all defined discriminants.
+unsafe impl CheckedBitPattern for Command2 {
+    type Bits = u8;
+
+    fn is_valid_bit_pattern(bits: &u8) -> bool {
+        *bits <= 12
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::consensus::GenericHeader;
+
+    #[test]
+    fn invalid_bit_pattern_rejected() {
+        let mut buf = bytes::BytesMut::zeroed(256);
+        buf[60] = 99;
+        let result = bytemuck::checked::try_from_bytes::<GenericHeader>(&buf);
+        assert!(result.is_err());
+    }
+}
diff --git a/core/binary_protocol/src/consensus/error.rs b/core/binary_protocol/src/consensus/error.rs
new file mode 100644
index 000000000..cb7838128
--- /dev/null
+++ b/core/binary_protocol/src/consensus/error.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::command::Command2;
+use thiserror::Error;
+
+#[derive(Debug, Clone, Error, PartialEq, Eq)]
+pub enum ConsensusError {
+    #[error("invalid command: expected {expected:?}, found {found:?}")]
+    InvalidCommand { expected: Command2, found: Command2 },
+
+    #[error("invalid size: expected {expected:?}, found {found:?}")]
+    InvalidSize { expected: u32, found: u32 },
+
+    #[error("invalid checksum")]
+    InvalidChecksum,
+
+    #[error("invalid cluster ID")]
+    InvalidCluster,
+
+    #[error("invalid field: {0}")]
+    InvalidField(String),
+
+    // -- Reserved for future consensus validation --
+    // These variants exist in the original iggy_common implementation
+    // and will be used when the consensus crate migrates to this crate.
+    #[error("parent_padding must be 0")]
+    PrepareParentPaddingNonZero,
+
+    #[error("request_checksum_padding must be 0")]
+    PrepareRequestChecksumPaddingNonZero,
+
+    #[error("command must be Commit")]
+    CommitInvalidCommand2,
+
+    #[error("size must be 256, found {0}")]
+    CommitInvalidSize(u32),
+
+    #[error("command must be Reply")]
+    ReplyInvalidCommand2,
+
+    #[error("request_checksum_padding must be 0")]
+    ReplyRequestChecksumPaddingNonZero,
+
+    #[error("context_padding must be 0")]
+    ReplyContextPaddingNonZero,
+
+    #[error("invalid bit pattern in header (enum discriminant out of range)")]
+    InvalidBitPattern,
+}
diff --git a/core/binary_protocol/src/consensus/header.rs b/core/binary_protocol/src/consensus/header.rs
new file mode 100644
index 000000000..648aae6ef
--- /dev/null
+++ b/core/binary_protocol/src/consensus/header.rs
@@ -0,0 +1,744 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! All consensus headers are exactly 256 bytes with `#[repr(C)]` layout.
+//! Size and field offsets are enforced at compile time. Deserialization
+//! is a pointer cast (zero-copy) via `bytemuck::try_from_bytes`.
+
+use super::{Command2, ConsensusError, Operation};
+use bytemuck::{CheckedBitPattern, NoUninit};
+use std::mem::offset_of;
+
+pub const HEADER_SIZE: usize = 256;
+
+/// Trait implemented by all consensus header types.
+///
+/// Every header is exactly [`HEADER_SIZE`] bytes, `#[repr(C)]`, and supports
+/// zero-copy deserialization via `bytemuck`.
+pub trait ConsensusHeader: Sized + CheckedBitPattern + NoUninit {
+    const COMMAND: Command2;
+
+    /// # Errors
+    /// Returns `ConsensusError` if the header fields are inconsistent.
+    fn validate(&self) -> Result<(), ConsensusError>;
+    fn operation(&self) -> Operation;
+    fn command(&self) -> Command2;
+    fn size(&self) -> u32;
+}
+
+// ---------------------------------------------------------------------------
+// GenericHeader - type-erased dispatch
+// ---------------------------------------------------------------------------
+
+/// Type-erased 256-byte header for initial message dispatch.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
+pub struct GenericHeader {
+    pub checksum: u128,
+    pub checksum_body: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub view: u32,
+    pub release: u32,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 66],
+    pub reserved_command: [u8; 128],
+}
+const _: () = {
+    assert!(size_of::<GenericHeader>() == HEADER_SIZE);
+    assert!(
+        offset_of!(GenericHeader, reserved_command)
+            == offset_of!(GenericHeader, reserved_frame) + size_of::<[u8; 66]>()
+    );
+    assert!(offset_of!(GenericHeader, reserved_command) + size_of::<[u8; 128]>() == HEADER_SIZE);
+};
+
+impl ConsensusHeader for GenericHeader {
+    const COMMAND: Command2 = Command2::Reserved;
+    fn operation(&self) -> Operation {
+        Operation::Reserved
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+    fn validate(&self) -> Result<(), ConsensusError> {
+        Ok(())
+    }
+    fn size(&self) -> u32 {
+        self.size
+    }
+}
+
+// ---------------------------------------------------------------------------
+// RequestHeader - client -> primary
+// ---------------------------------------------------------------------------
+
+/// Client -> primary request header. 256 bytes.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
+pub struct RequestHeader {
+    pub checksum: u128,
+    pub checksum_body: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub view: u32,
+    pub release: u32,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 66],
+
+    pub client: u128,
+    pub request_checksum: u128,
+    pub timestamp: u64,
+    pub request: u64,
+    pub operation: Operation,
+    pub operation_padding: [u8; 7],
+    pub namespace: u64,
+    pub reserved: [u8; 64],
+}
+const _: () = {
+    assert!(size_of::<RequestHeader>() == HEADER_SIZE);
+    assert!(
+        offset_of!(RequestHeader, client)
+            == offset_of!(RequestHeader, reserved_frame) + size_of::<[u8; 66]>()
+    );
+    assert!(offset_of!(RequestHeader, reserved) + size_of::<[u8; 64]>() == HEADER_SIZE);
+};
+
+impl Default for RequestHeader {
+    fn default() -> Self {
+        Self {
+            checksum: 0,
+            checksum_body: 0,
+            cluster: 0,
+            size: 0,
+            view: 0,
+            release: 0,
+            command: Command2::Reserved,
+            replica: 0,
+            reserved_frame: [0; 66],
+            client: 0,
+            request_checksum: 0,
+            timestamp: 0,
+            request: 0,
+            operation: Operation::Reserved,
+            operation_padding: [0; 7],
+            namespace: 0,
+            reserved: [0; 64],
+        }
+    }
+}
+
+impl ConsensusHeader for RequestHeader {
+    const COMMAND: Command2 = Command2::Request;
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+    fn size(&self) -> u32 {
+        self.size
+    }
+
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Command2::Request {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Command2::Request,
+                found: self.command,
+            });
+        }
+        Ok(())
+    }
+}
+
+// ---------------------------------------------------------------------------
+// ReplyHeader - primary -> client
+// ---------------------------------------------------------------------------
+
+/// Primary -> client reply header. 256 bytes.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
+pub struct ReplyHeader {
+    pub checksum: u128,
+    pub checksum_body: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub view: u32,
+    pub release: u32,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 66],
+
+    pub request_checksum: u128,
+    pub context: u128,
+    pub op: u64,
+    pub commit: u64,
+    pub timestamp: u64,
+    pub request: u64,
+    pub operation: Operation,
+    pub operation_padding: [u8; 7],
+    pub namespace: u64,
+    pub reserved: [u8; 48],
+}
+const _: () = {
+    assert!(size_of::<ReplyHeader>() == HEADER_SIZE);
+    assert!(
+        offset_of!(ReplyHeader, request_checksum)
+            == offset_of!(ReplyHeader, reserved_frame) + size_of::<[u8; 66]>()
+    );
+    assert!(offset_of!(ReplyHeader, reserved) + size_of::<[u8; 48]>() == HEADER_SIZE);
+};
+
+impl Default for ReplyHeader {
+    fn default() -> Self {
+        Self {
+            checksum: 0,
+            checksum_body: 0,
+            cluster: 0,
+            size: 0,
+            view: 0,
+            release: 0,
+            command: Command2::Reserved,
+            replica: 0,
+            reserved_frame: [0; 66],
+            request_checksum: 0,
+            context: 0,
+            op: 0,
+            commit: 0,
+            timestamp: 0,
+            request: 0,
+            operation: Operation::Reserved,
+            operation_padding: [0; 7],
+            namespace: 0,
+            reserved: [0; 48],
+        }
+    }
+}
+
+impl ConsensusHeader for ReplyHeader {
+    const COMMAND: Command2 = Command2::Reply;
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+    fn size(&self) -> u32 {
+        self.size
+    }
+
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Command2::Reply {
+            return Err(ConsensusError::ReplyInvalidCommand2);
+        }
+        Ok(())
+    }
+}
+
+// ---------------------------------------------------------------------------
+// PrepareHeader - primary -> replicas (replication)
+// ---------------------------------------------------------------------------
+
+/// Primary -> replicas: replicate this operation.
+///
+/// `#[repr(C)]` keeps the fields in declaration order. The compile-time
+/// assertions after the struct pin the total size to `HEADER_SIZE`, so fields
+/// must not be reordered or resized without updating them.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
+pub struct PrepareHeader {
+    pub checksum: u128,
+    pub checksum_body: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub view: u32,
+    pub release: u32,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 66],
+
+    pub client: u128,
+    pub parent: u128,
+    pub request_checksum: u128,
+    pub op: u64,
+    pub commit: u64,
+    pub timestamp: u64,
+    pub request: u64,
+    pub operation: Operation,
+    // Explicit padding after the one-byte `operation` (presumably so the derived
+    // `NoUninit` sees no implicit padding before `namespace` - confirm).
+    pub operation_padding: [u8; 7],
+    pub namespace: u64,
+    pub reserved: [u8; 32],
+}
+// Compile-time layout checks: total size equals `HEADER_SIZE`, `client` starts
+// immediately after `reserved_frame`, and `reserved` ends exactly at the end of
+// the header.
+const _: () = {
+    assert!(size_of::<PrepareHeader>() == HEADER_SIZE);
+    assert!(
+        offset_of!(PrepareHeader, client)
+            == offset_of!(PrepareHeader, reserved_frame) + size_of::<[u8; 
66]>()
+    );
+    assert!(offset_of!(PrepareHeader, reserved) + size_of::<[u8; 32]>() == 
HEADER_SIZE);
+};
+
+/// All-zero `PrepareHeader`: numeric fields are 0, byte arrays are zero-filled
+/// and `command` / `operation` take their `Reserved` variants.
+///
+/// The default `command` is `Command2::Reserved`, not `Command2::Prepare`, so a
+/// default header is rejected by `validate()` until `command` is set.
+impl Default for PrepareHeader {
+    fn default() -> Self {
+        Self {
+            checksum: 0,
+            checksum_body: 0,
+            cluster: 0,
+            size: 0,
+            view: 0,
+            release: 0,
+            command: Command2::Reserved,
+            replica: 0,
+            reserved_frame: [0; 66],
+            client: 0,
+            parent: 0,
+            request_checksum: 0,
+            op: 0,
+            commit: 0,
+            timestamp: 0,
+            request: 0,
+            operation: Operation::Reserved,
+            operation_padding: [0; 7],
+            namespace: 0,
+            reserved: [0; 32],
+        }
+    }
+}
+
+impl ConsensusHeader for PrepareHeader {
+    const COMMAND: Command2 = Command2::Prepare;
+
+    /// Operation stored in the header.
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+
+    /// Command discriminant stored in the header.
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
+    /// Value of the header's `size` field.
+    fn size(&self) -> u32 {
+        self.size
+    }
+
+    /// Accepts the header only when `command` is `Command2::Prepare`; otherwise
+    /// reports the expected and the found command.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        let found = self.command;
+        if found == Self::COMMAND {
+            return Ok(());
+        }
+        Err(ConsensusError::InvalidCommand {
+            expected: Self::COMMAND,
+            found,
+        })
+    }
+}
+
+// ---------------------------------------------------------------------------
+// PrepareOkHeader - replica -> primary (acknowledgement)
+// ---------------------------------------------------------------------------
+
+/// Replica -> primary: acknowledge a Prepare.
+///
+/// `#[repr(C)]` keeps the fields in declaration order. The compile-time
+/// assertions after the struct pin the total size to `HEADER_SIZE`, so fields
+/// must not be reordered or resized without updating them.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
+pub struct PrepareOkHeader {
+    pub checksum: u128,
+    pub checksum_body: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub view: u32,
+    pub release: u32,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 66],
+
+    pub parent: u128,
+    pub prepare_checksum: u128,
+    pub op: u64,
+    pub commit: u64,
+    pub timestamp: u64,
+    pub request: u64,
+    pub operation: Operation,
+    // Explicit padding after the one-byte `operation` (presumably so the derived
+    // `NoUninit` sees no implicit padding before `namespace` - confirm).
+    pub operation_padding: [u8; 7],
+    pub namespace: u64,
+    pub reserved: [u8; 48],
+}
+// Compile-time layout checks: total size equals `HEADER_SIZE`, `parent` starts
+// immediately after `reserved_frame`, and `reserved` ends exactly at the end of
+// the header.
+const _: () = {
+    assert!(size_of::<PrepareOkHeader>() == HEADER_SIZE);
+    assert!(
+        offset_of!(PrepareOkHeader, parent)
+            == offset_of!(PrepareOkHeader, reserved_frame) + size_of::<[u8; 
66]>()
+    );
+    assert!(offset_of!(PrepareOkHeader, reserved) + size_of::<[u8; 48]>() == 
HEADER_SIZE);
+};
+
+/// All-zero `PrepareOkHeader`: numeric fields are 0, byte arrays are zero-filled
+/// and `command` / `operation` take their `Reserved` variants.
+///
+/// The default `command` is `Command2::Reserved`, not `Command2::PrepareOk`, so
+/// a default header is rejected by `validate()` until `command` is set.
+impl Default for PrepareOkHeader {
+    fn default() -> Self {
+        Self {
+            checksum: 0,
+            checksum_body: 0,
+            cluster: 0,
+            size: 0,
+            view: 0,
+            release: 0,
+            command: Command2::Reserved,
+            replica: 0,
+            reserved_frame: [0; 66],
+            parent: 0,
+            prepare_checksum: 0,
+            op: 0,
+            commit: 0,
+            timestamp: 0,
+            request: 0,
+            operation: Operation::Reserved,
+            operation_padding: [0; 7],
+            namespace: 0,
+            reserved: [0; 48],
+        }
+    }
+}
+
+impl ConsensusHeader for PrepareOkHeader {
+    const COMMAND: Command2 = Command2::PrepareOk;
+
+    /// Operation stored in the header.
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+
+    /// Command discriminant stored in the header.
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
+    /// Value of the header's `size` field.
+    fn size(&self) -> u32 {
+        self.size
+    }
+
+    /// Accepts the header only when `command` is `Command2::PrepareOk`;
+    /// otherwise reports the expected and the found command.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        let found = self.command;
+        if found == Self::COMMAND {
+            return Ok(());
+        }
+        Err(ConsensusError::InvalidCommand {
+            expected: Self::COMMAND,
+            found,
+        })
+    }
+}
+
+// ---------------------------------------------------------------------------
+// CommitHeader - primary -> replicas (commit, header-only)
+// ---------------------------------------------------------------------------
+
+/// Primary -> replicas: commit up to this op. Header-only (no body).
+///
+/// `#[repr(C)]` keeps the fields in declaration order. The compile-time
+/// assertions after the struct pin the total size to `HEADER_SIZE`, so fields
+/// must not be reordered or resized without updating them.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, CheckedBitPattern, NoUninit)]
+pub struct CommitHeader {
+    pub checksum: u128,
+    pub checksum_body: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub view: u32,
+    pub release: u32,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 66],
+
+    pub commit_checksum: u128,
+    pub timestamp_monotonic: u64,
+    pub commit: u64,
+    pub checkpoint_op: u64,
+    pub namespace: u64,
+    pub reserved: [u8; 80],
+}
+// Compile-time layout checks: total size equals `HEADER_SIZE`,
+// `commit_checksum` starts immediately after `reserved_frame`, and `reserved`
+// ends exactly at the end of the header.
+const _: () = {
+    assert!(size_of::<CommitHeader>() == HEADER_SIZE);
+    assert!(
+        offset_of!(CommitHeader, commit_checksum)
+            == offset_of!(CommitHeader, reserved_frame) + size_of::<[u8; 66]>()
+    );
+    assert!(offset_of!(CommitHeader, reserved) + size_of::<[u8; 80]>() == 
HEADER_SIZE);
+};
+
+impl ConsensusHeader for CommitHeader {
+    const COMMAND: Command2 = Command2::Commit;
+
+    /// This header has no `operation` field; always `Operation::Reserved`.
+    fn operation(&self) -> Operation {
+        Operation::Reserved
+    }
+
+    /// Command discriminant stored in the header.
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
+    /// Value of the header's `size` field.
+    fn size(&self) -> u32 {
+        self.size
+    }
+
+    /// Accepts the header only when `command` is `Command2::Commit` and the
+    /// declared `size` is exactly one header (a commit carries no body).
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Command2::Commit {
+            return Err(ConsensusError::CommitInvalidCommand2);
+        }
+        // Compare against the shared constant instead of a literal 256 so this
+        // check cannot drift from the layout assertions on the struct.
+        if self.size as usize != HEADER_SIZE {
+            return Err(ConsensusError::CommitInvalidSize(self.size));
+        }
+        Ok(())
+    }
+}
+
+// ---------------------------------------------------------------------------
+// StartViewChangeHeader - failure detection (header-only)
+// ---------------------------------------------------------------------------
+
+/// Replica suspects primary failure. Header-only.
+///
+/// `#[repr(C)]` keeps the fields in declaration order. The compile-time
+/// assertions after the struct pin the total size to `HEADER_SIZE`, so fields
+/// must not be reordered or resized without updating them.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
+#[repr(C)]
+pub struct StartViewChangeHeader {
+    pub checksum: u128,
+    pub checksum_body: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub view: u32,
+    pub release: u32,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 66],
+
+    pub namespace: u64,
+    pub reserved: [u8; 120],
+}
+// Compile-time layout checks: total size equals `HEADER_SIZE`, `namespace`
+// starts immediately after `reserved_frame`, and `reserved` ends exactly at the
+// end of the header.
+const _: () = {
+    assert!(size_of::<StartViewChangeHeader>() == HEADER_SIZE);
+    assert!(
+        offset_of!(StartViewChangeHeader, namespace)
+            == offset_of!(StartViewChangeHeader, reserved_frame) + 
size_of::<[u8; 66]>()
+    );
+    assert!(offset_of!(StartViewChangeHeader, reserved) + size_of::<[u8; 
120]>() == HEADER_SIZE);
+};
+
+impl ConsensusHeader for StartViewChangeHeader {
+    const COMMAND: Command2 = Command2::StartViewChange;
+
+    /// This header has no `operation` field; always `Operation::Reserved`.
+    fn operation(&self) -> Operation {
+        Operation::Reserved
+    }
+
+    /// Command discriminant stored in the header.
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
+    /// Value of the header's `size` field.
+    fn size(&self) -> u32 {
+        self.size
+    }
+
+    /// Requires `command` to be `Command2::StartViewChange` and `release` to
+    /// be 0, checked in that order.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        let found = self.command;
+        if found != Self::COMMAND {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Self::COMMAND,
+                found,
+            });
+        }
+        if self.release == 0 {
+            Ok(())
+        } else {
+            Err(ConsensusError::InvalidField("release != 0".to_string()))
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// DoViewChangeHeader - view change vote (header-only)
+// ---------------------------------------------------------------------------
+
+/// Replica -> primary candidate: vote for view change. Header-only.
+///
+/// `#[repr(C)]` keeps the fields in declaration order. The compile-time
+/// assertions after the struct pin the total size to `HEADER_SIZE`, so fields
+/// must not be reordered or resized without updating them.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
+#[repr(C)]
+pub struct DoViewChangeHeader {
+    pub checksum: u128,
+    pub checksum_body: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub view: u32,
+    pub release: u32,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 66],
+
+    /// Highest op-number in this replica's log.
+    pub op: u64,
+    /// Highest committed op.
+    pub commit: u64,
+    pub namespace: u64,
+    /// View when status was last normal (key for log selection).
+    pub log_view: u32,
+    pub reserved: [u8; 100],
+}
+// Compile-time layout checks: total size equals `HEADER_SIZE`, `op` starts
+// immediately after `reserved_frame`, and `reserved` ends exactly at the end of
+// the header.
+const _: () = {
+    assert!(size_of::<DoViewChangeHeader>() == HEADER_SIZE);
+    assert!(
+        offset_of!(DoViewChangeHeader, op)
+            == offset_of!(DoViewChangeHeader, reserved_frame) + size_of::<[u8; 
66]>()
+    );
+    assert!(offset_of!(DoViewChangeHeader, reserved) + size_of::<[u8; 100]>() 
== HEADER_SIZE);
+};
+
+impl ConsensusHeader for DoViewChangeHeader {
+    const COMMAND: Command2 = Command2::DoViewChange;
+
+    /// This header has no `operation` field; always `Operation::Reserved`.
+    fn operation(&self) -> Operation {
+        Operation::Reserved
+    }
+
+    /// Command discriminant stored in the header.
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
+    /// Value of the header's `size` field.
+    fn size(&self) -> u32 {
+        self.size
+    }
+
+    /// Requires, in this order: `command` is `Command2::DoViewChange`,
+    /// `release` is 0, `log_view <= view`, and `commit <= op`. The first
+    /// violated rule determines the returned error.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        let found = self.command;
+        if found != Self::COMMAND {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Self::COMMAND,
+                found,
+            });
+        }
+
+        let violation = if self.release != 0 {
+            Some("release must be 0")
+        } else if self.log_view > self.view {
+            Some("log_view cannot exceed view")
+        } else if self.commit > self.op {
+            Some("commit cannot exceed op")
+        } else {
+            None
+        };
+
+        match violation {
+            Some(reason) => Err(ConsensusError::InvalidField(reason.to_string())),
+            None => Ok(()),
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// StartViewHeader - new view announcement (header-only)
+// ---------------------------------------------------------------------------
+
+/// New primary -> all replicas: start new view. Header-only.
+///
+/// `#[repr(C)]` keeps the fields in declaration order. The compile-time
+/// assertions after the struct pin the total size to `HEADER_SIZE`, so fields
+/// must not be reordered or resized without updating them.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, CheckedBitPattern, NoUninit)]
+#[repr(C)]
+pub struct StartViewHeader {
+    pub checksum: u128,
+    pub checksum_body: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub view: u32,
+    pub release: u32,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 66],
+
+    /// Highest op in the new primary's log.
+    pub op: u64,
+    /// max(commit) from all DVCs.
+    pub commit: u64,
+    pub namespace: u64,
+    pub reserved: [u8; 104],
+}
+// Compile-time layout checks: total size equals `HEADER_SIZE`, `op` starts
+// immediately after `reserved_frame`, and `reserved` ends exactly at the end of
+// the header.
+const _: () = {
+    assert!(size_of::<StartViewHeader>() == HEADER_SIZE);
+    assert!(
+        offset_of!(StartViewHeader, op)
+            == offset_of!(StartViewHeader, reserved_frame) + size_of::<[u8; 
66]>()
+    );
+    assert!(offset_of!(StartViewHeader, reserved) + size_of::<[u8; 104]>() == 
HEADER_SIZE);
+};
+
+impl ConsensusHeader for StartViewHeader {
+    const COMMAND: Command2 = Command2::StartView;
+
+    /// This header has no `operation` field; always `Operation::Reserved`.
+    fn operation(&self) -> Operation {
+        Operation::Reserved
+    }
+
+    /// Command discriminant stored in the header.
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
+    /// Value of the header's `size` field.
+    fn size(&self) -> u32 {
+        self.size
+    }
+
+    /// Requires, in this order: `command` is `Command2::StartView`, `release`
+    /// is 0, and `commit <= op`. The first violated rule determines the error.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        let found = self.command;
+        if found != Self::COMMAND {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Self::COMMAND,
+                found,
+            });
+        }
+
+        let violation = if self.release != 0 {
+            Some("release must be 0")
+        } else if self.commit > self.op {
+            Some("commit cannot exceed op")
+        } else {
+            None
+        };
+
+        match violation {
+            Some(reason) => Err(ConsensusError::InvalidField(reason.to_string())),
+            None => Ok(()),
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use super::{
+        Command2, CommitHeader, ConsensusHeader, DoViewChangeHeader, GenericHeader, PrepareHeader,
+        PrepareOkHeader, ReplyHeader, RequestHeader, StartViewChangeHeader, StartViewHeader,
+    };
+
+    /// Returns `size` zeroed bytes whose first byte is aligned for the
+    /// consensus headers.
+    ///
+    /// `BytesMut::zeroed` only guarantees `u8` alignment, while
+    /// `bytemuck::checked::try_from_bytes` rejects misaligned input. The buffer
+    /// is over-allocated by one alignment unit and the unaligned prefix is
+    /// split off, so the casts below do not depend on what the allocator
+    /// happens to return.
+    fn aligned_zeroed(size: usize) -> bytes::BytesMut {
+        let align = std::mem::align_of::<GenericHeader>();
+        let mut buf = bytes::BytesMut::zeroed(size + align);
+        let padding = (align - (buf.as_ptr() as usize) % align) % align;
+        let _unaligned_prefix = buf.split_to(padding);
+        buf.truncate(size);
+        buf
+    }
+
+    // Runtime mirror of the compile-time size assertions.
+    #[test]
+    fn all_headers_are_256_bytes() {
+        assert_eq!(size_of::<GenericHeader>(), 256);
+        assert_eq!(size_of::<RequestHeader>(), 256);
+        assert_eq!(size_of::<ReplyHeader>(), 256);
+        assert_eq!(size_of::<PrepareHeader>(), 256);
+        assert_eq!(size_of::<PrepareOkHeader>(), 256);
+        assert_eq!(size_of::<CommitHeader>(), 256);
+        assert_eq!(size_of::<StartViewChangeHeader>(), 256);
+        assert_eq!(size_of::<DoViewChangeHeader>(), 256);
+        assert_eq!(size_of::<StartViewHeader>(), 256);
+    }
+
+    // An all-zero buffer must be a valid `GenericHeader` bit pattern.
+    #[test]
+    fn generic_header_zero_copy() {
+        let buf = aligned_zeroed(256);
+        let header: &GenericHeader = bytemuck::checked::try_from_bytes(&buf).unwrap();
+        assert_eq!(header.command, Command2::Reserved);
+        assert_eq!(header.size, 0);
+    }
+
+    // Byte 60 is where `command` lives in these tests' header layout.
+    #[test]
+    fn request_header_zero_copy() {
+        let mut buf = aligned_zeroed(256);
+        buf[60] = Command2::Request as u8;
+        let header: &RequestHeader = bytemuck::checked::try_from_bytes(&buf).unwrap();
+        assert_eq!(header.command, Command2::Request);
+        assert!(header.validate().is_ok());
+    }
+
+    // A zeroed header has `command == Reserved`, which `RequestHeader` rejects.
+    #[test]
+    fn request_header_wrong_command_fails_validation() {
+        let buf = aligned_zeroed(256);
+        let header: &RequestHeader = bytemuck::checked::try_from_bytes(&buf).unwrap();
+        assert!(header.validate().is_err());
+    }
+
+    #[test]
+    fn reply_header_zero_copy() {
+        let mut buf = aligned_zeroed(256);
+        buf[60] = Command2::Reply as u8;
+        let header: &ReplyHeader = bytemuck::checked::try_from_bytes(&buf).unwrap();
+        assert_eq!(header.command, Command2::Reply);
+        assert!(header.validate().is_ok());
+    }
+}
diff --git a/core/binary_protocol/src/consensus/message.rs 
b/core/binary_protocol/src/consensus/message.rs
new file mode 100644
index 000000000..c5785b5f0
--- /dev/null
+++ b/core/binary_protocol/src/consensus/message.rs
@@ -0,0 +1,331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Zero-copy consensus message wrapper.
+//!
+//! `Message<H>` wraps a `Bytes` buffer and provides typed access to the
+//! header via `bytemuck` pointer cast (zero allocation, zero copy).
+
+use crate::consensus::{Command2, ConsensusError, ConsensusHeader, 
GenericHeader};
+use bytes::Bytes;
+use std::marker::PhantomData;
+
+/// Zero-copy message wrapping a `Bytes` buffer with a typed header.
+///
+/// The header is accessed via `bytemuck::checked::try_from_bytes` - a checked
+/// pointer cast, not a deserialization step. The body is the slice after the
+/// header, sized according to `header.size()`.
+///
+/// `H` appears only in `PhantomData`, so it does not affect the stored data.
+/// `#[repr(C)]` is relied upon by `as_generic`, which reinterprets
+/// `&Message<H>` as `&Message<GenericHeader>`.
+#[repr(C)]
+#[derive(Debug, Clone)]
+pub struct Message<H: ConsensusHeader> {
+    buffer: Bytes,
+    _marker: PhantomData<H>,
+}
+
+impl<H: ConsensusHeader> Message<H> {
+    /// Create a message from a buffer, validating the header.
+    ///
+    /// Checks, in order: the buffer holds at least one full header, the header
+    /// bytes cast to `H`, `H::validate` passes, the declared `size` covers at
+    /// least the header itself, and the buffer holds at least `size` bytes.
+    ///
+    /// # Errors
+    /// Returns `ConsensusError` if the buffer is too small, the header cannot be
+    /// cast (invalid bit pattern or misaligned buffer), header validation fails,
+    /// or the declared size is smaller than the header.
+    pub fn from_bytes(buffer: Bytes) -> Result<Self, ConsensusError> {
+        let header_len = size_of::<H>();
+        if buffer.len() < header_len {
+            return Err(ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: Command2::Reserved,
+            });
+        }
+
+        // TODO: bytemuck::checked::try_from_bytes requires the buffer to be
+        // aligned to the target type's alignment. Consensus headers contain u128
+        // fields (16-byte alignment), but Bytes from Vec<u8> only guarantees
+        // 8-byte alignment. The checked variant returns Err on misalignment (no
+        // UB), but production code can fail on misaligned buffers. The original
+        // iggy_common code has the same limitation. Fix by either using
+        // pod_read_unaligned (copies header) or ensuring aligned allocation at
+        // the network receive layer.
+        let header = bytemuck::checked::try_from_bytes::<H>(&buffer[..header_len])
+            .map_err(|_| ConsensusError::InvalidBitPattern)?;
+
+        header.validate()?;
+
+        // `size` counts header + body. A value below the header size would make
+        // `as_bytes()` return a truncated header, so reject it up front.
+        let total_size = header.size() as usize;
+        if total_size < header_len {
+            return Err(ConsensusError::InvalidField(format!(
+                "size {total_size} is smaller than the {header_len}-byte header"
+            )));
+        }
+        if buffer.len() < total_size {
+            return Err(ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: Command2::Reserved,
+            });
+        }
+
+        Ok(Self {
+            buffer,
+            _marker: PhantomData,
+        })
+    }
+
+    /// Create a zero-initialized message of the given size.
+    /// The caller must initialize the header fields.
+    ///
+    /// No header validation is performed here. Every byte, including the
+    /// header's `size` field, starts out as 0, so `body()` and `as_bytes()`
+    /// are empty until the header has been filled in.
+    ///
+    /// # Panics
+    /// Panics if `size` is smaller than the header size.
+    #[must_use]
+    pub fn new(size: usize) -> Self {
+        assert!(
+            size >= size_of::<H>(),
+            "size must be at least header size ({})",
+            size_of::<H>()
+        );
+        Self {
+            buffer: Bytes::from(vec![0u8; size]),
+            _marker: PhantomData,
+        }
+    }
+
+    /// Access the typed header (zero-copy pointer cast).
+    ///
+    /// The cast is re-checked on every call and panics if it fails. Messages
+    /// built by `from_bytes` have already passed the same cast; messages built
+    /// by `new` have not been checked beforehand.
+    #[must_use]
+    #[allow(clippy::missing_panics_doc)]
+    #[inline]
+    pub fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::checked::try_from_bytes(header_bytes)
+            .expect("header validated at construction time")
+    }
+
+    /// Bytes following the header, up to the size declared in the header.
+    ///
+    /// Empty when the declared size does not exceed the header size.
+    #[must_use]
+    #[inline]
+    pub fn body(&self) -> &[u8] {
+        let body_start = size_of::<H>();
+        let body_end = self.header().size() as usize;
+        if body_end <= body_start {
+            return &[];
+        }
+        &self.buffer[body_start..body_end]
+    }
+
+    /// Same range as [`Message::body`], returned as a [`Bytes`] slice of the
+    /// underlying buffer instead of a borrowed slice.
+    ///
+    /// Empty when the declared size does not exceed the header size.
+    #[must_use]
+    #[inline]
+    pub fn body_bytes(&self) -> Bytes {
+        let body_start = size_of::<H>();
+        let body_end = self.header().size() as usize;
+        if body_end <= body_start {
+            return Bytes::new();
+        }
+        self.buffer.slice(body_start..body_end)
+    }
+
+    /// Complete message bytes (header + body).
+    ///
+    /// Returns the first `header.size()` bytes of the buffer; any bytes beyond
+    /// the declared size are not included. The slice operation panics if the
+    /// declared size exceeds the buffer length.
+    #[must_use]
+    #[inline]
+    pub fn as_bytes(&self) -> &[u8] {
+        let total_size = self.header().size() as usize;
+        &self.buffer[..total_size]
+    }
+
+    /// Consume into the underlying buffer.
+    ///
+    /// Returns the whole buffer as stored, not just the first `header.size()`
+    /// bytes.
+    #[must_use]
+    #[inline]
+    pub fn into_inner(self) -> Bytes {
+        self.buffer
+    }
+
+    /// Convert to a generic message (type-erase the header).
+    ///
+    /// The buffer is moved as is; no bytes are copied or re-validated.
+    #[must_use]
+    pub fn into_generic(self) -> Message<GenericHeader> {
+        // SAFETY: Message<H> and Message<GenericHeader> have identical layout
+        // (only PhantomData differs). GenericHeader accepts any command.
+        // NOTE(review): `from_buffer_unchecked` requires the buffer to already
+        // be valid for the target header; this relies on every `H` sharing the
+        // byte layout that `GenericHeader` reads - confirm in header.rs.
+        unsafe { Message::from_buffer_unchecked(self.buffer) }
+    }
+
+    /// View as a generic message without consuming.
+    ///
+    /// Reinterprets `&Message<H>` as `&Message<GenericHeader>` through a raw
+    /// pointer cast; the underlying buffer is neither copied nor re-validated.
+    #[must_use]
+    #[allow(clippy::missing_const_for_fn)]
+    pub fn as_generic(&self) -> &Message<GenericHeader> {
+        // SAFETY: #[repr(C)] guarantees field layout. Message<H> and
+        // Message<GenericHeader> differ only in PhantomData (ZST).
+        unsafe { &*std::ptr::from_ref(self).cast::<Message<GenericHeader>>() }
+    }
+
+    /// Try to reinterpret as a different header type.
+    ///
+    /// Checks, in order: the buffer is at least `size_of::<T>()` bytes, the
+    /// command read through the generic view equals `T::COMMAND`, the header
+    /// bytes cast to `T`, and `T::validate` passes. The buffer itself is moved
+    /// into the result unchanged.
+    ///
+    /// # Errors
+    /// Returns `ConsensusError` if the buffer is too small, the header command
+    /// doesn't match, the header bytes cannot be cast to `T`, or validation
+    /// fails.
+    pub fn try_into_typed<T: ConsensusHeader>(self) -> Result<Message<T>, 
ConsensusError> {
+        if self.buffer.len() < size_of::<T>() {
+            return Err(ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: Command2::Reserved,
+            });
+        }
+
+        // Cheap command check first, so a mismatch reports the found command.
+        let generic = self.as_generic();
+        if generic.header().command != T::COMMAND {
+            return Err(ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: generic.header().command,
+            });
+        }
+
+        let header_bytes = &self.buffer[..size_of::<T>()];
+        let header = bytemuck::checked::try_from_bytes::<T>(header_bytes)
+            .map_err(|_| ConsensusError::InvalidBitPattern)?;
+        header.validate()?;
+
+        // SAFETY: the header bytes were just cast to `T` and passed
+        // `T::validate`, which is the precondition of `from_buffer_unchecked`.
+        Ok(unsafe { Message::<T>::from_buffer_unchecked(self.buffer) })
+    }
+
+    /// Transmute the header to a different type, preserving the body bytes.
+    ///
+    /// The callback receives the old header (by value) and a mutable reference
+    /// to the new (zeroed) header, allowing field-by-field initialization.
+    ///
+    /// The header is rewritten in place when this message is the only owner of
+    /// its buffer. If the buffer is shared (for example with a clone of this
+    /// message), it is copied first so other handles never observe the write.
+    ///
+    /// # Panics
+    /// Panics if `size_of::<H>() != size_of::<T>()`.
+    #[must_use]
+    pub fn transmute_header<T: ConsensusHeader>(self, f: impl FnOnce(H, &mut T)) -> Message<T> {
+        assert_eq!(size_of::<H>(), size_of::<T>());
+        let header_len = size_of::<T>();
+        let old_header = *self.header();
+
+        // `Bytes` is an immutable, possibly shared view; writing through
+        // `as_ptr().cast_mut()` would violate aliasing rules. Recover unique
+        // ownership instead, falling back to a private copy.
+        let mut buffer = match self.into_inner().try_into_mut() {
+            Ok(unique) => unique,
+            Err(shared) => {
+                // Place the copy at an offset aligned for `T`, so the checked
+                // cast below cannot fail because of where the allocator put it.
+                let align = std::mem::align_of::<T>();
+                let mut copy = bytes::BytesMut::with_capacity(shared.len() + align);
+                let padding = (align - (copy.as_ptr() as usize) % align) % align;
+                copy.resize(padding, 0);
+                copy.extend_from_slice(&shared);
+                let _alignment_prefix = copy.split_to(padding);
+                copy
+            }
+        };
+
+        let header_bytes = &mut buffer[..header_len];
+        header_bytes.fill(0);
+        let new_header = bytemuck::checked::try_from_bytes_mut::<T>(header_bytes)
+            .expect("zeroed bytes are valid");
+        f(old_header, new_header);
+
+        Message {
+            buffer: buffer.freeze(),
+            _marker: PhantomData,
+        }
+    }
+
+    /// Create without validation. Private.
+    ///
+    /// Only wraps the buffer; no length, alignment, bit-pattern or `validate`
+    /// check is performed here.
+    ///
+    /// # Safety
+    /// Buffer must already be validated for the target header type.
+    #[inline]
+    const unsafe fn from_buffer_unchecked(buffer: Bytes) -> Self {
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::Message;
+    use crate::consensus::{Command2, GenericHeader, Operation, RequestHeader};
+    use bytes::{Bytes, BytesMut};
+
+    /// Returns `size` zeroed bytes whose first byte is aligned for the
+    /// consensus headers.
+    ///
+    /// `BytesMut::zeroed` only guarantees `u8` alignment, while the checked
+    /// bytemuck casts reject misaligned input. The buffer is over-allocated by
+    /// one alignment unit and the unaligned prefix is split off, so these tests
+    /// do not depend on what the allocator happens to return.
+    fn aligned_zeroed(size: usize) -> BytesMut {
+        let align = std::mem::align_of::<GenericHeader>();
+        let mut buf = BytesMut::zeroed(size + align);
+        let padding = (align - (buf.as_ptr() as usize) % align) % align;
+        let _unaligned_prefix = buf.split_to(padding);
+        buf.truncate(size);
+        buf
+    }
+
+    // 384-byte message: 256-byte header + 128-byte body.
+    #[test]
+    fn generic_message_from_zeroed_buffer() {
+        let mut buf = aligned_zeroed(384);
+        let header =
+            bytemuck::checked::try_from_bytes_mut::<GenericHeader>(&mut buf[..256]).unwrap();
+        header.size = 384;
+        header.cluster = 42;
+
+        let msg = Message::<GenericHeader>::from_bytes(buf.freeze()).unwrap();
+        assert_eq!(msg.header().cluster, 42);
+        assert_eq!(msg.header().size, 384);
+        assert_eq!(msg.body().len(), 128);
+    }
+
+    // 320-byte message: 256-byte header + 64-byte body.
+    #[test]
+    fn request_message_roundtrip() {
+        let mut buf = aligned_zeroed(320);
+        let header =
+            bytemuck::checked::try_from_bytes_mut::<RequestHeader>(&mut buf[..256]).unwrap();
+        header.size = 320;
+        header.command = Command2::Request;
+        header.client = 12345;
+        header.operation = Operation::CreateStream;
+
+        let msg = Message::<RequestHeader>::from_bytes(buf.freeze()).unwrap();
+        assert_eq!(msg.header().client, 12345);
+        assert_eq!(msg.header().operation, Operation::CreateStream);
+        assert_eq!(msg.body().len(), 64);
+    }
+
+    // Rejected by the length check before any cast, so alignment is irrelevant.
+    #[test]
+    fn too_small_buffer_rejected() {
+        let buf = Bytes::from(vec![0u8; 100]); // < 256
+        assert!(Message::<GenericHeader>::from_bytes(buf).is_err());
+    }
+
+    #[test]
+    fn into_generic_and_back() {
+        let mut buf = aligned_zeroed(256);
+        let header =
+            bytemuck::checked::try_from_bytes_mut::<RequestHeader>(&mut buf[..256]).unwrap();
+        header.size = 256;
+        header.command = Command2::Request;
+
+        let msg = Message::<RequestHeader>::from_bytes(buf.freeze()).unwrap();
+        let generic = msg.into_generic();
+        assert_eq!(generic.header().command, Command2::Request);
+
+        let back = generic.try_into_typed::<RequestHeader>().unwrap();
+        assert_eq!(back.header().command, Command2::Request);
+    }
+
+    // The body length must survive the header rewrite.
+    #[test]
+    fn transmute_header_generic_to_request() {
+        let mut buf = aligned_zeroed(320);
+        let header =
+            bytemuck::checked::try_from_bytes_mut::<GenericHeader>(&mut buf[..256]).unwrap();
+        header.size = 320;
+        header.cluster = 99;
+
+        let generic = Message::<GenericHeader>::from_bytes(buf.freeze()).unwrap();
+        let request = generic.transmute_header(|old, new: &mut RequestHeader| {
+            new.size = old.size;
+            new.command = Command2::Request;
+            new.cluster = old.cluster;
+            new.client = 42;
+            new.operation = Operation::CreateStream;
+        });
+        assert_eq!(request.header().command, Command2::Request);
+        assert_eq!(request.header().client, 42);
+        assert_eq!(request.header().cluster, 99);
+        assert_eq!(request.header().size, 320);
+        assert_eq!(request.body().len(), 64);
+    }
+
+    #[test]
+    fn wrong_type_conversion_fails() {
+        let mut buf = aligned_zeroed(256);
+        let header =
+            bytemuck::checked::try_from_bytes_mut::<GenericHeader>(&mut buf[..256]).unwrap();
+        header.size = 256;
+        header.command = Command2::Reserved; // not Request
+
+        let msg = Message::<GenericHeader>::from_bytes(buf.freeze()).unwrap();
+        assert!(msg.try_into_typed::<RequestHeader>().is_err());
+    }
+}
diff --git a/core/binary_protocol/src/consensus/mod.rs 
b/core/binary_protocol/src/consensus/mod.rs
new file mode 100644
index 000000000..11bead1ef
--- /dev/null
+++ b/core/binary_protocol/src/consensus/mod.rs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! VSR (Viewstamped Replication) consensus wire types.
+//!
+//! All headers are exactly 256 bytes with `#[repr(C)]` layout. Size is
+//! enforced at compile time. Deserialization is a checked pointer cast
+//! (zero-copy) via `bytemuck::checked::try_from_bytes`.
+//!
+//! ## Client-facing
+//! - [`RequestHeader`] - client -> primary
+//! - [`ReplyHeader`] - primary -> client
+//! - [`GenericHeader`] - type-erased envelope for dispatch
+//!
+//! ## Replication (server-to-server)
+//! - [`PrepareHeader`] - primary -> replicas (replicate operation)
+//! - [`PrepareOkHeader`] - replica -> primary (acknowledge)
+//! - [`CommitHeader`] - primary -> replicas (commit, header-only)
+//!
+//! ## View change (server-to-server)
+//! - [`StartViewChangeHeader`] - replica suspects primary failure 
(header-only)
+//! - [`DoViewChangeHeader`] - replica -> primary candidate (header-only)
+//! - [`StartViewHeader`] - new primary -> all replicas (header-only)
+//!
+//! ## Message wrapper
+//! - [`message::Message`] - zero-copy `Bytes` wrapper with typed header access
+
+mod command;
+mod error;
+mod header;
+pub mod message;
+mod operation;
+
+pub use command::Command2;
+pub use error::ConsensusError;
+pub use header::{
+    CommitHeader, ConsensusHeader, DoViewChangeHeader, GenericHeader, 
HEADER_SIZE, PrepareHeader,
+    PrepareOkHeader, ReplyHeader, RequestHeader, StartViewChangeHeader, 
StartViewHeader,
+};
+pub use operation::Operation;
diff --git a/core/binary_protocol/src/consensus/operation.rs 
b/core/binary_protocol/src/consensus/operation.rs
new file mode 100644
index 000000000..018ea27a2
--- /dev/null
+++ b/core/binary_protocol/src/consensus/operation.rs
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytemuck::{CheckedBitPattern, NoUninit};
+
+/// Replicated operation discriminant (`#[repr(u8)]`). Identifies the state machine
+/// operation carried in a consensus message body.
+///
+/// `Reserved` (0) is the `Default` value and has no client command code.
+#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, NoUninit, CheckedBitPattern)]
+#[repr(u8)]
+pub enum Operation {
+    #[default]
+    Reserved = 0,
+
+    // Metadata operations (shard 0)
+    CreateStream = 128,
+    UpdateStream = 129,
+    DeleteStream = 130,
+    PurgeStream = 131,
+    CreateTopic = 132,
+    UpdateTopic = 133,
+    DeleteTopic = 134,
+    PurgeTopic = 135,
+    CreatePartitions = 136,
+    DeletePartitions = 137,
+    // Numbered inside the metadata range, but `is_partition()` (not
+    // `is_metadata()`) returns true for it.
+    DeleteSegments = 138,
+    CreateConsumerGroup = 139,
+    DeleteConsumerGroup = 140,
+    CreateUser = 141,
+    UpdateUser = 142,
+    DeleteUser = 143,
+    ChangePassword = 144,
+    UpdatePermissions = 145,
+    CreatePersonalAccessToken = 146,
+    DeletePersonalAccessToken = 147,
+
+    // Partition operations (routed by namespace)
+    SendMessages = 160,
+    StoreConsumerOffset = 161,
+}
+
+impl Operation {
+    /// Metadata / control-plane operations handled by shard 0.
+    ///
+    /// Returns `false` for `Reserved`, for the partition operations, and for
+    /// `DeleteSegments` (which `is_partition` claims instead).
+    #[must_use]
+    #[inline]
+    pub const fn is_metadata(&self) -> bool {
+        matches!(
+            self,
+            Self::CreateStream
+                | Self::UpdateStream
+                | Self::DeleteStream
+                | Self::PurgeStream
+                | Self::CreateTopic
+                | Self::UpdateTopic
+                | Self::DeleteTopic
+                | Self::PurgeTopic
+                | Self::CreatePartitions
+                | Self::DeletePartitions
+                | Self::CreateConsumerGroup
+                | Self::DeleteConsumerGroup
+                | Self::CreateUser
+                | Self::UpdateUser
+                | Self::DeleteUser
+                | Self::ChangePassword
+                | Self::UpdatePermissions
+                | Self::CreatePersonalAccessToken
+                | Self::DeletePersonalAccessToken
+        )
+    }
+
+    /// Data-plane operations routed to the shard owning the partition.
+    ///
+    /// True for `SendMessages`, `StoreConsumerOffset` and `DeleteSegments`.
+    #[must_use]
+    #[inline]
+    pub const fn is_partition(&self) -> bool {
+        matches!(
+            self,
+            Self::SendMessages | Self::StoreConsumerOffset | Self::DeleteSegments
+        )
+    }
+
+    /// Bidirectional mapping: `Operation` -> client command code.
+    ///
+    /// Returns `None` only for `Reserved`. The match is exhaustive on purpose,
+    /// so adding a variant fails to compile until it is mapped here.
+    #[must_use]
+    pub const fn to_command_code(&self) -> Option<u32> {
+        use crate::codes;
+        match self {
+            Self::Reserved => None,
+            Self::CreateStream => Some(codes::CREATE_STREAM_CODE),
+            Self::UpdateStream => Some(codes::UPDATE_STREAM_CODE),
+            Self::DeleteStream => Some(codes::DELETE_STREAM_CODE),
+            Self::PurgeStream => Some(codes::PURGE_STREAM_CODE),
+            Self::CreateTopic => Some(codes::CREATE_TOPIC_CODE),
+            Self::UpdateTopic => Some(codes::UPDATE_TOPIC_CODE),
+            Self::DeleteTopic => Some(codes::DELETE_TOPIC_CODE),
+            Self::PurgeTopic => Some(codes::PURGE_TOPIC_CODE),
+            Self::CreatePartitions => Some(codes::CREATE_PARTITIONS_CODE),
+            Self::DeletePartitions => Some(codes::DELETE_PARTITIONS_CODE),
+            Self::DeleteSegments => Some(codes::DELETE_SEGMENTS_CODE),
+            Self::CreateConsumerGroup => Some(codes::CREATE_CONSUMER_GROUP_CODE),
+            Self::DeleteConsumerGroup => Some(codes::DELETE_CONSUMER_GROUP_CODE),
+            Self::CreateUser => Some(codes::CREATE_USER_CODE),
+            Self::UpdateUser => Some(codes::UPDATE_USER_CODE),
+            Self::DeleteUser => Some(codes::DELETE_USER_CODE),
+            Self::ChangePassword => Some(codes::CHANGE_PASSWORD_CODE),
+            Self::UpdatePermissions => Some(codes::UPDATE_PERMISSIONS_CODE),
+            Self::CreatePersonalAccessToken => Some(codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE),
+            Self::DeletePersonalAccessToken => Some(codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE),
+            Self::SendMessages => Some(codes::SEND_MESSAGES_CODE),
+            Self::StoreConsumerOffset => Some(codes::STORE_CONSUMER_OFFSET_CODE),
+        }
+    }
+
+    /// Bidirectional mapping: client command code -> `Operation`.
+    ///
+    /// Returns `None` for any code that is not listed below. `Reserved` is
+    /// never produced.
+    #[must_use]
+    pub const fn from_command_code(code: u32) -> Option<Self> {
+        use crate::codes;
+        match code {
+            codes::CREATE_STREAM_CODE => Some(Self::CreateStream),
+            codes::UPDATE_STREAM_CODE => Some(Self::UpdateStream),
+            codes::DELETE_STREAM_CODE => Some(Self::DeleteStream),
+            codes::PURGE_STREAM_CODE => Some(Self::PurgeStream),
+            codes::CREATE_TOPIC_CODE => Some(Self::CreateTopic),
+            codes::UPDATE_TOPIC_CODE => Some(Self::UpdateTopic),
+            codes::DELETE_TOPIC_CODE => Some(Self::DeleteTopic),
+            codes::PURGE_TOPIC_CODE => Some(Self::PurgeTopic),
+            codes::CREATE_PARTITIONS_CODE => Some(Self::CreatePartitions),
+            codes::DELETE_PARTITIONS_CODE => Some(Self::DeletePartitions),
+            codes::DELETE_SEGMENTS_CODE => Some(Self::DeleteSegments),
+            codes::CREATE_CONSUMER_GROUP_CODE => Some(Self::CreateConsumerGroup),
+            codes::DELETE_CONSUMER_GROUP_CODE => Some(Self::DeleteConsumerGroup),
+            codes::CREATE_USER_CODE => Some(Self::CreateUser),
+            codes::UPDATE_USER_CODE => Some(Self::UpdateUser),
+            codes::DELETE_USER_CODE => Some(Self::DeleteUser),
+            codes::CHANGE_PASSWORD_CODE => Some(Self::ChangePassword),
+            codes::UPDATE_PERMISSIONS_CODE => Some(Self::UpdatePermissions),
+            codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE => Some(Self::CreatePersonalAccessToken),
+            codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE => Some(Self::DeletePersonalAccessToken),
+            codes::SEND_MESSAGES_CODE => Some(Self::SendMessages),
+            codes::STORE_CONSUMER_OFFSET_CODE => Some(Self::StoreConsumerOffset),
+            _ => None,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::Operation;
+
+    /// Every non-reserved operation must map to a code that maps back to it.
+    #[test]
+    fn command_code_roundtrip() {
+        let ops = [
+            Operation::CreateStream,
+            Operation::UpdateStream,
+            Operation::DeleteStream,
+            Operation::PurgeStream,
+            Operation::CreateTopic,
+            Operation::UpdateTopic,
+            Operation::DeleteTopic,
+            Operation::PurgeTopic,
+            Operation::CreatePartitions,
+            Operation::DeletePartitions,
+            Operation::DeleteSegments,
+            Operation::CreateConsumerGroup,
+            Operation::DeleteConsumerGroup,
+            Operation::CreateUser,
+            Operation::UpdateUser,
+            Operation::DeleteUser,
+            Operation::ChangePassword,
+            Operation::UpdatePermissions,
+            Operation::CreatePersonalAccessToken,
+            Operation::DeletePersonalAccessToken,
+            Operation::SendMessages,
+            Operation::StoreConsumerOffset,
+        ];
+        for op in ops {
+            let code = op
+                .to_command_code()
+                .unwrap_or_else(|| panic!("Operation {op:?} has no command code mapping"));
+            let back = Operation::from_command_code(code)
+                .unwrap_or_else(|| panic!("command code {code} has no Operation mapping"));
+            assert_eq!(op, back, "roundtrip failed for {op:?} (code={code})");
+        }
+    }
+
+    #[test]
+    fn reserved_has_no_code() {
+        assert_eq!(Operation::Reserved.to_command_code(), None);
+    }
+
+    #[test]
+    fn read_only_commands_have_no_operation() {
+        assert!(Operation::from_command_code(crate::PING_CODE).is_none());
+        assert!(Operation::from_command_code(crate::GET_STATS_CODE).is_none());
+        assert!(Operation::from_command_code(crate::GET_STREAM_CODE).is_none());
+        assert!(Operation::from_command_code(crate::POLL_MESSAGES_CODE).is_none());
+    }
+
+    #[test]
+    fn metadata_vs_partition() {
+        assert!(Operation::CreateStream.is_metadata());
+        assert!(!Operation::CreateStream.is_partition());
+        assert!(Operation::SendMessages.is_partition());
+        assert!(!Operation::SendMessages.is_metadata());
+        // DeleteSegments is numbered with the metadata operations but is
+        // classified as a partition operation only.
+        assert!(Operation::DeleteSegments.is_partition());
+        assert!(!Operation::DeleteSegments.is_metadata());
+        // Reserved belongs to neither class.
+        assert!(!Operation::Reserved.is_metadata());
+        assert!(!Operation::Reserved.is_partition());
+    }
+}
diff --git a/core/binary_protocol/src/error.rs 
b/core/binary_protocol/src/error.rs
new file mode 100644
index 000000000..f4f53ea56
--- /dev/null
+++ b/core/binary_protocol/src/error.rs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Protocol-local error type for wire format encode/decode failures.
+///
+/// Intentionally decoupled from `IggyError` to keep the protocol crate
+/// free of domain dependencies. Conversion to `IggyError` happens at
+/// the boundary (SDK / server).
+#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
+pub enum WireError {
+    /// A read of `need` bytes at `offset` ran past the end of the buffer;
+    /// `have` is the byte count the reader reported as available.
+    #[error("unexpected end of buffer at offset {offset}: need {need} bytes, have {have}")]
+    UnexpectedEof {
+        offset: usize,
+        need: usize,
+        have: usize,
+    },
+
+    /// Bytes that must be UTF-8 text were not valid UTF-8.
+    #[error("invalid utf-8 at offset {offset}")]
+    InvalidUtf8 { offset: usize },
+
+    /// A command code that the decoder does not recognise.
+    #[error("unknown command code: {0}")]
+    UnknownCommand(u32),
+
+    /// A tag byte (`value`) that is not a valid discriminant of `type_name`.
+    #[error("unknown discriminant {value} for {type_name} at offset {offset}")]
+    UnknownDiscriminant {
+        type_name: &'static str,
+        value: u8,
+        offset: usize,
+    },
+
+    /// A payload of `size` bytes exceeds the permitted `max`.
+    #[error("payload too large: {size} bytes, max {max}")]
+    PayloadTooLarge { size: usize, max: usize },
+
+    /// A value violated a protocol rule; the message says which one.
+    #[error("validation failed: {0}")]
+    Validation(String),
+}
diff --git a/core/binary_protocol/src/frame.rs 
b/core/binary_protocol/src/frame.rs
new file mode 100644
index 000000000..b17d0d3f9
--- /dev/null
+++ b/core/binary_protocol/src/frame.rs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Request frame: `[length:4 LE][code:4 LE][payload:N]`
+/// `length` = size of code + payload = 4 + N. NOTE(review): 4 is one u32 field; confirm which.
+pub const REQUEST_HEADER_SIZE: usize = 4;
+
+/// Response frame: `[status:4 LE][length:4 LE][payload:N]`; 8 = status (4) + length (4).
+pub const RESPONSE_HEADER_SIZE: usize = 8;
+
+/// Status code for a successful response.
+pub const STATUS_OK: u32 = 0;
diff --git a/core/binary_protocol/src/identifier.rs 
b/core/binary_protocol/src/identifier.rs
new file mode 100644
index 000000000..e2b356e0c
--- /dev/null
+++ b/core/binary_protocol/src/identifier.rs
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_bytes, read_u8};
+use crate::wire_name::WireName;
+use bytes::{BufMut, BytesMut};
+
+const KIND_NUMERIC: u8 = 1; // `kind` byte written for `WireIdentifier::Numeric`
+const KIND_STRING: u8 = 2; // `kind` byte written for `WireIdentifier::String`
+const NUMERIC_VALUE_LEN: u8 = 4; // `length` byte of a numeric identifier (a u32 is 4 bytes)
+
+/// Protocol-owned identifier type. Polymorphic: numeric (u32) or string.
+///
+/// Wire format: `[kind:1][length:1][value:N]`
+/// - Numeric: kind=1, length=4, value=u32 LE
+/// - String:  kind=2, length=1..255, value=UTF-8 bytes
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum WireIdentifier {
+    Numeric(u32), // addressed by a raw u32 id
+    String(WireName), // addressed by a name already validated as a `WireName`
+}
+
+impl WireIdentifier {
+    /// Builds a numeric identifier from a raw `u32`.
+    #[must_use]
+    pub const fn numeric(id: u32) -> Self {
+        Self::Numeric(id)
+    }
+
+    /// Builds a string identifier; the name is validated by `WireName::new`.
+    ///
+    /// # Errors
+    /// Returns `WireError::Validation` if the name is empty or exceeds 255 bytes.
+    pub fn named(name: impl Into<String>) -> Result<Self, WireError> {
+        WireName::new(name).map(Self::String)
+    }
+
+    /// Returns the numeric id, or `None` for a string identifier.
+    #[must_use]
+    pub const fn as_u32(&self) -> Option<u32> {
+        match self {
+            Self::Numeric(id) => Some(*id),
+            Self::String(_) => None,
+        }
+    }
+
+    /// Returns the name, or `None` for a numeric identifier.
+    #[must_use]
+    pub fn as_str(&self) -> Option<&str> {
+        match self {
+            Self::Numeric(_) => None,
+            Self::String(s) => Some(s.as_str()),
+        }
+    }
+
+    /// The `kind` byte written on the wire for this variant.
+    const fn kind_code(&self) -> u8 {
+        match self {
+            Self::Numeric(_) => KIND_NUMERIC,
+            Self::String(_) => KIND_STRING,
+        }
+    }
+
+    /// The `length` byte written on the wire: 4 for numeric, the name's byte
+    /// length for string.
+    #[allow(clippy::cast_possible_truncation)]
+    const fn value_len(&self) -> u8 {
+        match self {
+            Self::Numeric(_) => NUMERIC_VALUE_LEN,
+            // WireName guarantees len <= 255, truncation impossible.
+            Self::String(s) => s.len() as u8,
+        }
+    }
+}
+
+impl WireEncode for WireIdentifier {
+    /// Encoded length: the `kind` and `length` bytes plus the value bytes.
+    fn encoded_size(&self) -> usize {
+        usize::from(self.value_len()) + 2
+    }
+
+    /// Appends `[kind:1][length:1][value:N]` to `buf`.
+    fn encode(&self, buf: &mut BytesMut) {
+        let (kind, len) = (self.kind_code(), self.value_len());
+        buf.put_u8(kind);
+        buf.put_u8(len);
+        match self {
+            Self::String(name) => buf.put_slice(name.as_bytes()),
+            Self::Numeric(value) => buf.put_u32_le(*value),
+        }
+    }
+}
+
+impl WireDecode for WireIdentifier {
+    /// Decodes `[kind:1][length:1][value:N]` from the start of `buf`.
+    ///
+    /// On success returns the identifier and the bytes consumed (`2 + length`).
+    ///
+    /// # Errors
+    /// - any error from `read_u8` / `read_bytes` (e.g. when `buf` is too short),
+    /// - `WireError::Validation` when a numeric identifier's length is not 4 or
+    ///   a string identifier is empty, plus any error from `WireName::new`,
+    /// - `WireError::InvalidUtf8` when the string bytes are not valid UTF-8,
+    /// - `WireError::UnknownDiscriminant` for any other `kind` byte.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let kind = read_u8(buf, 0)?;
+        let length = read_u8(buf, 1)? as usize;
+        // The value is read before `kind` is inspected, so a read failure is
+        // returned ahead of an unknown-kind error.
+        let value = read_bytes(buf, 2, length)?;
+        let consumed = 2 + length;
+
+        match kind {
+            KIND_NUMERIC => {
+                if length != NUMERIC_VALUE_LEN as usize {
+                    return Err(WireError::Validation(format!(
+                        "numeric identifier must be {NUMERIC_VALUE_LEN} bytes, got {length}"
+                    )));
+                }
+                let id = u32::from_le_bytes(
+                    value
+                        .try_into()
+                        .expect("length already validated as 4 bytes"),
+                );
+                Ok((Self::Numeric(id), consumed))
+            }
+            KIND_STRING => {
+                if length == 0 {
+                    return Err(WireError::Validation(
+                        "string identifier cannot be empty".to_string(),
+                    ));
+                }
+                // Offset 2 is where the value starts, after the kind and length bytes.
+                let s =
+                    std::str::from_utf8(value).map_err(|_| WireError::InvalidUtf8 { offset: 2 })?;
+                let wire_name = WireName::new(s)?;
+                Ok((Self::String(wire_name), consumed))
+            }
+            _ => Err(WireError::UnknownDiscriminant {
+                type_name: "WireIdentifier",
+                value: kind,
+                offset: 0,
+            }),
+        }
+    }
+}
+
+impl std::fmt::Display for WireIdentifier {
+    /// Writes the decimal id for a numeric identifier, or the name's own
+    /// `Display` output for a string identifier.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::String(name) => write!(f, "{name}"),
+            Self::Numeric(value) => write!(f, "{value}"),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn roundtrip_numeric() {
+        let id = WireIdentifier::numeric(42);
+        let bytes = id.to_bytes();
+        assert_eq!(bytes.len(), 6); // 1 + 1 + 4
+        let (decoded, consumed) = WireIdentifier::decode(&bytes).unwrap();
+        assert_eq!(consumed, 6);
+        assert_eq!(decoded, id);
+        assert_eq!(decoded.as_u32(), Some(42));
+    }
+
+    #[test]
+    fn roundtrip_string() {
+        let id = WireIdentifier::named("my-stream").unwrap();
+        let bytes = id.to_bytes();
+        assert_eq!(bytes.len(), 2 + 9); // 1 + 1 + 9
+        let (decoded, consumed) = WireIdentifier::decode(&bytes).unwrap();
+        assert_eq!(consumed, 11);
+        assert_eq!(decoded, id);
+        assert_eq!(decoded.as_str(), Some("my-stream"));
+    }
+
+    #[test]
+    fn empty_string_rejected() {
+        assert!(WireIdentifier::named("").is_err());
+    }
+
+    #[test]
+    fn max_length_string_accepted() {
+        let name = "a".repeat(255);
+        let id = WireIdentifier::named(&name).unwrap();
+        let bytes = id.to_bytes();
+        let (decoded, _) = WireIdentifier::decode(&bytes).unwrap();
+        assert_eq!(decoded.as_str(), Some(name.as_str()));
+    }
+
+    #[test]
+    fn too_long_string_rejected() {
+        let name = "a".repeat(256);
+        assert!(WireIdentifier::named(&name).is_err());
+    }
+
+    #[test]
+    fn truncated_buffer_returns_error() {
+        let id = WireIdentifier::numeric(1);
+        let bytes = id.to_bytes();
+        // Every strict prefix of a valid encoding must fail to decode.
+        for i in 0..bytes.len() {
+            assert!(
+                WireIdentifier::decode(&bytes[..i]).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn unknown_kind_returns_error() {
+        let buf = [0xFF, 0x04, 0x01, 0x00, 0x00, 0x00];
+        assert!(WireIdentifier::decode(&buf).is_err());
+    }
+
+    #[test]
+    fn numeric_with_wrong_length_returns_error() {
+        let buf = [KIND_NUMERIC, 0x03, 0x01, 0x00, 0x00]; // length=3, should be 4
+        assert!(WireIdentifier::decode(&buf).is_err());
+    }
+
+    #[test]
+    fn string_with_zero_length_returns_error() {
+        let buf = [KIND_STRING, 0x00];
+        assert!(WireIdentifier::decode(&buf).is_err());
+    }
+
+    #[test]
+    fn display_numeric() {
+        let id = WireIdentifier::numeric(7);
+        assert_eq!(format!("{id}"), "7");
+    }
+
+    #[test]
+    fn display_string() {
+        let id = WireIdentifier::named("test").unwrap();
+        assert_eq!(format!("{id}"), "test");
+    }
+
+    #[test]
+    fn non_ascii_utf8_string_roundtrip() {
+        let id = WireIdentifier::named("str\u{00e9}am-\u{00fc}ser").unwrap();
+        let bytes = id.to_bytes();
+        let (decoded, _) = WireIdentifier::decode(&bytes).unwrap();
+        assert_eq!(decoded, id);
+    }
+
+    #[test]
+    fn invalid_utf8_bytes_rejected() {
+        // kind=STRING, length=2, then 2 bytes of invalid UTF-8
+        let buf = [KIND_STRING, 0x02, 0xFF, 0xFE];
+        let result = WireIdentifier::decode(&buf);
+        assert!(result.is_err());
+    }
+}
diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs
index 31bd66e6e..874596247 100644
--- a/core/binary_protocol/src/lib.rs
+++ b/core/binary_protocol/src/lib.rs
@@ -1,17 +1,60 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Wire protocol types and codec for the Iggy binary protocol.
+//!
+//! This crate is the single source of truth for the binary wire format
+//! shared between the Iggy server and SDK. It is a pure codec - no I/O,
+//! no async, no runtime dependencies.
+//!
+//! # Design
+//!
+//! Protocol types are independent from domain types (`iggy_common`).
+//! Conversion between wire types and domain types happens at the boundary
+//! (SDK client impls, server handlers).
+//!
+//! # Wire frame format
+//!
+//! **Request** (client -> server):
+//! ```text
+//! [length:4 bytes, u32 LE][code:4 bytes, u32 LE][payload:N bytes]
+//! ```
+//!
+//! **Response** (server -> client):
+//! ```text
+//! [status:4 bytes, u32 LE][length:4 bytes, u32 LE][payload:N bytes]
+//! ```
+//!
+//! All multi-byte integers are little-endian. Strings are length-prefixed
+//! (u8 length for names, u32 length for longer strings).
+
+pub mod codec;
+pub mod codes;
+pub mod consensus;
+pub mod error;
+pub mod frame;
+pub mod identifier;
+pub mod requests;
+pub mod responses;
+pub mod wire_name;
+
+pub use codec::{WireDecode, WireEncode};
+pub use codes::*;
+pub use error::WireError;
+pub use frame::*;
+pub use identifier::WireIdentifier;
+pub use wire_name::{MAX_WIRE_NAME_LENGTH, WireName};
diff --git a/core/binary_protocol/src/requests/mod.rs 
b/core/binary_protocol/src/requests/mod.rs
new file mode 100644
index 000000000..e7b687f1f
--- /dev/null
+++ b/core/binary_protocol/src/requests/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod streams;
diff --git a/core/binary_protocol/src/requests/streams/create_stream.rs 
b/core/binary_protocol/src/requests/streams/create_stream.rs
new file mode 100644
index 000000000..c6d339040
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/create_stream.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode};
+use crate::wire_name::WireName;
+use bytes::BytesMut;
+
+/// `CreateStream` request. Wire format: `[name_len:1][name:N]`
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CreateStreamRequest {
+    pub name: WireName, // the only field; encoding and decoding are delegated to `WireName`
+}
+
+impl WireEncode for CreateStreamRequest {
+    /// Encoded size of the request, which is exactly that of its name.
+    fn encoded_size(&self) -> usize {
+        let Self { name } = self;
+        name.encoded_size()
+    }
+
+    /// Writes the name, the request's only field, into `buf`.
+    fn encode(&self, buf: &mut BytesMut) {
+        let Self { name } = self;
+        name.encode(buf);
+    }
+}
+
+impl WireDecode for CreateStreamRequest {
+    /// Decodes the name from the start of `buf`, returning the request and
+    /// the number of bytes `WireName::decode` consumed.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        WireName::decode(buf).map(|(name, consumed)| (Self { name }, consumed))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn roundtrip() {
+        let req = CreateStreamRequest {
+            name: WireName::new("test-stream").unwrap(),
+        };
+        let bytes = req.to_bytes();
+        assert_eq!(bytes.len(), 1 + 11); // length byte + 11 name bytes
+        let (decoded, consumed) = CreateStreamRequest::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, req);
+    }
+
+    #[test]
+    fn empty_name_rejected() {
+        let buf = [0u8]; // name_len = 0 with no name bytes
+        assert!(CreateStreamRequest::decode(&buf).is_err());
+    }
+
+    #[test]
+    fn truncated_returns_error() {
+        let req = CreateStreamRequest {
+            name: WireName::new("test").unwrap(),
+        };
+        let bytes = req.to_bytes();
+        for i in 0..bytes.len() { // every strict prefix of the encoding must fail
+            assert!(
+                CreateStreamRequest::decode(&bytes[..i]).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn wire_compat_byte_layout() {
+        let req = CreateStreamRequest {
+            name: WireName::new("test").unwrap(),
+        };
+        let bytes = req.to_bytes();
+        assert_eq!(&bytes[..], &[4, b't', b'e', b's', b't']); // [name_len][name bytes]
+    }
+
+    #[test]
+    fn too_long_name_rejected_at_construction() {
+        let long = "a".repeat(256); // one byte over the 255 that fit in a u8 length
+        assert!(WireName::new(long).is_err());
+    }
+}
diff --git a/core/binary_protocol/src/requests/streams/delete_stream.rs 
b/core/binary_protocol/src/requests/streams/delete_stream.rs
new file mode 100644
index 000000000..b30bfa882
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/delete_stream.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::WireIdentifier;
+use crate::codec::{WireDecode, WireEncode};
+use bytes::BytesMut;
+
+/// `DeleteStream` request. Wire format: `[identifier]`
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct DeleteStreamRequest {
+    pub stream_id: WireIdentifier, // the only field; encoded exactly as a `WireIdentifier`
+}
+
+impl WireEncode for DeleteStreamRequest {
+    /// Encoded size of the request, which is exactly that of its identifier.
+    fn encoded_size(&self) -> usize {
+        let Self { stream_id } = self;
+        stream_id.encoded_size()
+    }
+
+    /// Writes the stream identifier, the request's only field, into `buf`.
+    fn encode(&self, buf: &mut BytesMut) {
+        let Self { stream_id } = self;
+        stream_id.encode(buf);
+    }
+}
+
+impl WireDecode for DeleteStreamRequest {
+    /// Decodes the stream identifier from the start of `buf`, returning the
+    /// request and the number of bytes the identifier occupied.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        WireIdentifier::decode(buf).map(|(stream_id, consumed)| (Self { stream_id }, consumed))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn roundtrip() {
+        let req = DeleteStreamRequest {
+            stream_id: WireIdentifier::numeric(5),
+        };
+        let bytes = req.to_bytes();
+        let (decoded, consumed) = DeleteStreamRequest::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len()); // decode must consume the whole encoding
+        assert_eq!(decoded, req);
+    }
+}
diff --git a/core/binary_protocol/src/requests/streams/get_stream.rs 
b/core/binary_protocol/src/requests/streams/get_stream.rs
new file mode 100644
index 000000000..e543667e4
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/get_stream.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::WireIdentifier;
+use crate::codec::{WireDecode, WireEncode};
+use bytes::BytesMut;
+
+/// `GetStream` request. Wire format: `[identifier]`
+///
+/// The encoded payload consists solely of the stream identifier.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct GetStreamRequest {
+    /// Identifier of the stream this request refers to.
+    pub stream_id: WireIdentifier,
+}
+
+impl WireEncode for GetStreamRequest {
+    /// Encoded length of the request, which equals the encoded length of
+    /// its stream identifier.
+    fn encoded_size(&self) -> usize {
+        let identifier = &self.stream_id;
+        identifier.encoded_size()
+    }
+
+    /// Writes the stream identifier into `buf` and nothing more.
+    fn encode(&self, buf: &mut BytesMut) {
+        let identifier = &self.stream_id;
+        identifier.encode(buf);
+    }
+}
+
+impl WireDecode for GetStreamRequest {
+    /// Decodes a stream identifier from the start of `buf`.
+    ///
+    /// Returns the request plus the number of bytes the identifier occupied.
+    /// Errors from `WireIdentifier::decode` are propagated to the caller.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let (stream_id, read) = WireIdentifier::decode(buf)?;
+        let request = Self { stream_id };
+        Ok((request, read))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Round-trips a request addressed by numeric id.
+    #[test]
+    fn roundtrip_numeric() {
+        let original = GetStreamRequest {
+            stream_id: WireIdentifier::numeric(42),
+        };
+        let wire = original.to_bytes();
+        let (parsed, read) = GetStreamRequest::decode(&wire).unwrap();
+        assert_eq!(read, wire.len());
+        assert_eq!(parsed, original);
+    }
+
+    /// Round-trips a request addressed by name.
+    #[test]
+    fn roundtrip_named() {
+        let stream_id = WireIdentifier::named("my-stream").unwrap();
+        let original = GetStreamRequest { stream_id };
+        let wire = original.to_bytes();
+        let (parsed, read) = GetStreamRequest::decode(&wire).unwrap();
+        assert_eq!(read, wire.len());
+        assert_eq!(parsed, original);
+    }
+
+    /// Pins the exact bytes produced for numeric id `1`.
+    #[test]
+    fn wire_compat_numeric_byte_layout() {
+        let stream_id = WireIdentifier::numeric(1);
+        let request = GetStreamRequest { stream_id };
+        let wire = request.to_bytes();
+        // NOTE(review): presumably `[kind, length, id as u32 LE]`; confirm
+        // against the `WireIdentifier` encoding.
+        assert_eq!(&wire[..], &[1, 4, 1, 0, 0, 0]);
+    }
+}
diff --git a/core/binary_protocol/src/requests/streams/get_streams.rs b/core/binary_protocol/src/requests/streams/get_streams.rs
new file mode 100644
index 000000000..897e89888
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/get_streams.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode};
+use bytes::BytesMut;
+
+/// `GetStreams` request. Wire format: empty.
+///
+/// A unit struct: the request has no fields and encodes to zero bytes.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct GetStreamsRequest;
+
+impl WireEncode for GetStreamsRequest {
+    /// Always `0`: the request has no payload.
+    fn encoded_size(&self) -> usize {
+        0
+    }
+
+    /// Writes nothing; the output buffer is left untouched.
+    fn encode(&self, _out: &mut BytesMut) {
+        // Intentionally empty: there are no fields to serialize.
+    }
+}
+
+impl WireDecode for GetStreamsRequest {
+    /// Succeeds for any input without inspecting it and reports zero bytes
+    /// consumed.
+    fn decode(_input: &[u8]) -> Result<(Self, usize), WireError> {
+        let consumed = 0;
+        Ok((GetStreamsRequest, consumed))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// The request encodes to an empty buffer and decodes back from it,
+    /// consuming zero bytes.
+    #[test]
+    fn roundtrip() {
+        let original = GetStreamsRequest;
+        let wire = original.to_bytes();
+        assert!(wire.is_empty());
+        let (parsed, read) = GetStreamsRequest::decode(&wire).unwrap();
+        assert_eq!(read, 0);
+        assert_eq!(parsed, original);
+    }
+}
diff --git a/core/binary_protocol/src/requests/streams/mod.rs b/core/binary_protocol/src/requests/streams/mod.rs
new file mode 100644
index 000000000..8307a1d46
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/mod.rs
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod create_stream;
+pub mod delete_stream;
+pub mod get_stream;
+pub mod get_streams;
+pub mod purge_stream;
+pub mod update_stream;
+
+pub use create_stream::CreateStreamRequest;
+pub use delete_stream::DeleteStreamRequest;
+pub use get_stream::GetStreamRequest;
+pub use get_streams::GetStreamsRequest;
+pub use purge_stream::PurgeStreamRequest;
+pub use update_stream::UpdateStreamRequest;
diff --git a/core/binary_protocol/src/requests/streams/purge_stream.rs b/core/binary_protocol/src/requests/streams/purge_stream.rs
new file mode 100644
index 000000000..bd8028b3f
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/purge_stream.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::WireIdentifier;
+use crate::codec::{WireDecode, WireEncode};
+use bytes::BytesMut;
+
+/// `PurgeStream` request. Wire format: `[identifier]`
+///
+/// The encoded payload consists solely of the stream identifier.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PurgeStreamRequest {
+    /// Identifier of the stream this request refers to.
+    pub stream_id: WireIdentifier,
+}
+
+impl WireEncode for PurgeStreamRequest {
+    /// Size in bytes of the encoded request, i.e. of the encoded stream
+    /// identifier.
+    fn encoded_size(&self) -> usize {
+        let Self { stream_id } = self;
+        stream_id.encoded_size()
+    }
+
+    /// Serializes the stream identifier into `buf`; it is the only field.
+    fn encode(&self, buf: &mut BytesMut) {
+        let Self { stream_id } = self;
+        stream_id.encode(buf);
+    }
+}
+
+impl WireDecode for PurgeStreamRequest {
+    /// Parses the stream identifier at the beginning of `buf`.
+    ///
+    /// Yields the request and the count of bytes read. Errors from
+    /// `WireIdentifier::decode` are propagated to the caller.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let (id, read) = WireIdentifier::decode(buf)?;
+        let request = Self { stream_id: id };
+        Ok((request, read))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// A request with numeric id `1` survives an encode/decode round trip
+    /// and the decoder consumes every encoded byte.
+    #[test]
+    fn roundtrip() {
+        let stream_id = WireIdentifier::numeric(1);
+        let original = PurgeStreamRequest { stream_id };
+        let wire = original.to_bytes();
+        let (parsed, read) = PurgeStreamRequest::decode(&wire).unwrap();
+        assert_eq!(read, wire.len());
+        assert_eq!(parsed, original);
+    }
+}
diff --git a/core/binary_protocol/src/requests/streams/update_stream.rs b/core/binary_protocol/src/requests/streams/update_stream.rs
new file mode 100644
index 000000000..5592a2ed2
--- /dev/null
+++ b/core/binary_protocol/src/requests/streams/update_stream.rs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::WireIdentifier;
+use crate::codec::{WireDecode, WireEncode};
+use crate::wire_name::WireName;
+use bytes::BytesMut;
+
+/// `UpdateStream` request. Wire format: `[identifier][name_len:1][name:N]`
+///
+/// The identifier is encoded first and the name immediately after it.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct UpdateStreamRequest {
+    /// Identifier of the stream this request refers to.
+    pub stream_id: WireIdentifier,
+    /// Name carried by the request, encoded after the identifier.
+    pub name: WireName,
+}
+
+impl WireEncode for UpdateStreamRequest {
+    /// Sum of the encoded sizes of the identifier and the name.
+    fn encoded_size(&self) -> usize {
+        let Self { stream_id, name } = self;
+        stream_id.encoded_size() + name.encoded_size()
+    }
+
+    /// Appends the identifier and then the name to `buf`, in that order.
+    fn encode(&self, buf: &mut BytesMut) {
+        let Self { stream_id, name } = self;
+        stream_id.encode(buf);
+        name.encode(buf);
+    }
+}
+
+impl WireDecode for UpdateStreamRequest {
+    /// Decodes the identifier from the start of `buf`, then the name from
+    /// the bytes that follow it.
+    ///
+    /// Returns the request and the combined number of bytes consumed by both
+    /// parts. An error from either sub-decoder is propagated.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let (stream_id, id_len) = WireIdentifier::decode(buf)?;
+        // The name starts right where the identifier ended.
+        let (name, name_len) = WireName::decode(&buf[id_len..])?;
+        let total = id_len + name_len;
+        Ok((Self { stream_id, name }, total))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// A request with a named identifier and a new name round-trips and the
+    /// decoder consumes the whole buffer.
+    #[test]
+    fn roundtrip() {
+        let stream_id = WireIdentifier::named("old-name").unwrap();
+        let name = WireName::new("new-name").unwrap();
+        let original = UpdateStreamRequest { stream_id, name };
+        let wire = original.to_bytes();
+        let (parsed, read) = UpdateStreamRequest::decode(&wire).unwrap();
+        assert_eq!(read, wire.len());
+        assert_eq!(parsed, original);
+    }
+
+    /// Every strict prefix of a valid encoding must be rejected.
+    #[test]
+    fn truncated_returns_error() {
+        let stream_id = WireIdentifier::numeric(1);
+        let name = WireName::new("test").unwrap();
+        let wire = UpdateStreamRequest { stream_id, name }.to_bytes();
+        for i in 0..wire.len() {
+            let truncated = &wire[..i];
+            assert!(
+                UpdateStreamRequest::decode(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+}
diff --git a/core/binary_protocol/src/responses/mod.rs b/core/binary_protocol/src/responses/mod.rs
new file mode 100644
index 000000000..7b062a665
--- /dev/null
+++ b/core/binary_protocol/src/responses/mod.rs
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod streams;
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode};
+use bytes::BytesMut;
+
+/// Marker type for commands that return an empty response payload.
+///
+/// A unit struct: it has no fields and encodes to zero bytes.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct EmptyResponse;
+
+impl WireEncode for EmptyResponse {
+    /// Always `0`: an empty response has no payload bytes.
+    fn encoded_size(&self) -> usize {
+        0
+    }
+
+    /// Writes nothing; the output buffer is left untouched.
+    fn encode(&self, _out: &mut BytesMut) {
+        // Intentionally empty: there is nothing to serialize.
+    }
+}
+
+impl WireDecode for EmptyResponse {
+    /// Succeeds for any input without inspecting it and reports zero bytes
+    /// consumed.
+    fn decode(_input: &[u8]) -> Result<(Self, usize), WireError> {
+        let consumed = 0;
+        Ok((EmptyResponse, consumed))
+    }
+}
diff --git a/core/binary_protocol/src/responses/streams/create_stream.rs b/core/binary_protocol/src/responses/streams/create_stream.rs
new file mode 100644
index 000000000..7b23fa9ce
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/create_stream.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// `CreateStream` response is a [`StreamResponse`](super::StreamResponse) (stream header).
+/// A newly created stream has zero topics, zero size, zero messages.
+///
+/// Re-exported as a type alias for clarity at call sites.
+pub type CreateStreamResponse = super::StreamResponse;
diff --git a/core/binary_protocol/src/responses/streams/delete_stream.rs b/core/binary_protocol/src/responses/streams/delete_stream.rs
new file mode 100644
index 000000000..35903ae87
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/delete_stream.rs
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// `DeleteStream` response is empty.
+///
+/// Type alias for `EmptyResponse`, which encodes to zero bytes.
+pub type DeleteStreamResponse = super::EmptyResponse;
diff --git a/core/binary_protocol/src/responses/streams/get_stream.rs b/core/binary_protocol/src/responses/streams/get_stream.rs
new file mode 100644
index 000000000..fc6aed76f
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/get_stream.rs
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le, read_u64_le};
+use crate::responses::streams::StreamResponse;
+use crate::wire_name::WireName;
+use bytes::{BufMut, BytesMut};
+
+/// Topic header within a `GetStream` response.
+///
+/// Wire format (51 + `name_len` bytes):
+/// ```text
+/// [id:4][created_at:8][partitions_count:4][message_expiry:8]
+/// [compression_algorithm:1][max_topic_size:8][replication_factor:1]
+/// [size_bytes:8][messages_count:8][name_len:1][name:N]
+/// ```
+///
+/// All multi-byte integers are little-endian. The numeric values are copied
+/// to and from the wire verbatim; this type does not validate or interpret
+/// them.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct TopicHeader {
+    /// Topic id; 4 bytes at offset 0.
+    pub id: u32,
+    /// Creation timestamp as sent by the peer (units are not defined by this
+    /// type); 8 bytes at offset 4.
+    pub created_at: u64,
+    /// Partition count; 4 bytes at offset 12.
+    pub partitions_count: u32,
+    /// Raw message-expiry value (units are not defined by this type);
+    /// 8 bytes at offset 16.
+    pub message_expiry: u64,
+    /// Raw compression algorithm code, not mapped to an enum here;
+    /// 1 byte at offset 24.
+    pub compression_algorithm: u8,
+    /// Raw maximum topic size value; 8 bytes at offset 25.
+    pub max_topic_size: u64,
+    /// Replication factor; 1 byte at offset 33.
+    pub replication_factor: u8,
+    /// Topic size in bytes; 8 bytes at offset 34.
+    pub size_bytes: u64,
+    /// Message count; 8 bytes at offset 42.
+    pub messages_count: u64,
+    /// Topic name, starting at offset 50 as a one-byte length followed by
+    /// the name bytes.
+    pub name: WireName,
+}
+
+impl TopicHeader {
+    /// Encoded size of everything except the name bytes: the nine
+    /// fixed-width fields (50 bytes) plus the one-byte name length prefix.
+    const FIXED_SIZE: usize = 4 + 8 + 4 + 8 + 1 + 8 + 1 + 8 + 8 + 1; // 51
+}
+
+impl WireEncode for TopicHeader {
+    /// Total encoded length: the fixed 51-byte portion plus the name bytes.
+    fn encoded_size(&self) -> usize {
+        self.name.len() + Self::FIXED_SIZE
+    }
+
+    /// Appends every field to `buf` in wire order, integers little-endian,
+    /// finishing with the name as encoded by `WireName`.
+    fn encode(&self, buf: &mut BytesMut) {
+        let Self {
+            id,
+            created_at,
+            partitions_count,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+            replication_factor,
+            size_bytes,
+            messages_count,
+            name,
+        } = self;
+
+        buf.put_u32_le(*id);
+        buf.put_u64_le(*created_at);
+        buf.put_u32_le(*partitions_count);
+        buf.put_u64_le(*message_expiry);
+        buf.put_u8(*compression_algorithm);
+        buf.put_u64_le(*max_topic_size);
+        buf.put_u8(*replication_factor);
+        buf.put_u64_le(*size_bytes);
+        buf.put_u64_le(*messages_count);
+        name.encode(buf);
+    }
+}
+
+impl WireDecode for TopicHeader {
+    /// Decodes one topic header from the start of `buf`.
+    ///
+    /// Fields are read front to back in wire order, so a truncated buffer
+    /// fails at the first field that does not fit. Returns the header and
+    /// the number of bytes it occupied (50 fixed bytes plus the encoded
+    /// name).
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        // Running offset of the next field; advanced by each field's width.
+        let mut at = 0usize;
+
+        let id = read_u32_le(buf, at)?;
+        at += 4;
+        let created_at = read_u64_le(buf, at)?;
+        at += 8;
+        let partitions_count = read_u32_le(buf, at)?;
+        at += 4;
+        let message_expiry = read_u64_le(buf, at)?;
+        at += 8;
+        let compression_algorithm = read_u8(buf, at)?;
+        at += 1;
+        let max_topic_size = read_u64_le(buf, at)?;
+        at += 8;
+        let replication_factor = read_u8(buf, at)?;
+        at += 1;
+        let size_bytes = read_u64_le(buf, at)?;
+        at += 8;
+        let messages_count = read_u64_le(buf, at)?;
+        at += 8;
+
+        // `at` is now 50. The reads above succeeded, so `buf` holds at least
+        // 50 bytes and the slice below cannot go out of bounds.
+        let (name, name_len) = WireName::decode(&buf[at..])?;
+
+        let header = Self {
+            id,
+            created_at,
+            partitions_count,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+            replication_factor,
+            size_bytes,
+            messages_count,
+            name,
+        };
+        Ok((header, at + name_len))
+    }
+}
+
+/// `GetStream` response: stream header followed by topic headers.
+///
+/// Wire format:
+/// ```text
+/// [StreamResponse][TopicHeader]*
+/// ```
+///
+/// Topic headers are packed back to back with no separate count prefix.
+/// Decoding reads topic headers until the buffer is exhausted and then
+/// rejects the payload if the number read differs from
+/// `stream.topics_count`.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct GetStreamResponse {
+    /// Stream header that opens the payload.
+    pub stream: StreamResponse,
+    /// Topic headers in the order they appear on the wire.
+    pub topics: Vec<TopicHeader>,
+}
+
+impl WireEncode for GetStreamResponse {
+    /// Encoded size of the stream header plus that of every topic header.
+    fn encoded_size(&self) -> usize {
+        let topics_len: usize = self
+            .topics
+            .iter()
+            .map(|topic| topic.encoded_size())
+            .sum();
+        self.stream.encoded_size() + topics_len
+    }
+
+    /// Appends the stream header and then each topic header, in order.
+    fn encode(&self, buf: &mut BytesMut) {
+        let Self { stream, topics } = self;
+        stream.encode(buf);
+        for topic in topics {
+            topic.encode(buf);
+        }
+    }
+}
+
+impl WireDecode for GetStreamResponse {
+    /// Decodes the stream header, then topic headers until `buf` is
+    /// exhausted.
+    ///
+    /// # Errors
+    ///
+    /// Propagates any error from decoding the stream header or a topic
+    /// header, and returns `WireError::Validation` when the number of
+    /// decoded topics differs from `stream.topics_count`.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let (stream, header_len) = StreamResponse::decode(buf)?;
+
+        let mut cursor = header_len;
+        let mut topics = Vec::new();
+        while cursor < buf.len() {
+            let (topic, topic_len) = TopicHeader::decode(&buf[cursor..])?;
+            cursor += topic_len;
+            topics.push(topic);
+        }
+
+        // The header's count must agree with what was actually on the wire.
+        let declared = stream.topics_count;
+        let decoded = topics.len();
+        if decoded == declared as usize {
+            Ok((Self { stream, topics }, cursor))
+        } else {
+            Err(WireError::Validation(format!(
+                "stream.topics_count={} but decoded {} topics",
+                declared, decoded
+            )))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Stream header fixture that advertises two topics.
+    fn sample_stream() -> StreamResponse {
+        let name = WireName::new("my-stream").unwrap();
+        StreamResponse {
+            id: 1,
+            created_at: 1_710_000_000_000,
+            topics_count: 2,
+            size_bytes: 2048,
+            messages_count: 200,
+            name,
+        }
+    }
+
+    /// Topic header fixture; only `id` and `name` vary between callers.
+    fn sample_topic(id: u32, name: &str) -> TopicHeader {
+        let name = WireName::new(name).unwrap();
+        TopicHeader {
+            id,
+            created_at: 1_710_000_000_000,
+            partitions_count: 3,
+            message_expiry: 0,
+            compression_algorithm: 1,
+            max_topic_size: 0,
+            replication_factor: 1,
+            size_bytes: 1024,
+            messages_count: 100,
+            name,
+        }
+    }
+
+    /// A response with two topics round-trips and is fully consumed.
+    #[test]
+    fn roundtrip_with_topics() {
+        let topics = vec![sample_topic(1, "topic-a"), sample_topic(2, "topic-b")];
+        let original = GetStreamResponse {
+            stream: sample_stream(),
+            topics,
+        };
+        let wire = original.to_bytes();
+        let (parsed, read) = GetStreamResponse::decode(&wire).unwrap();
+        assert_eq!(read, wire.len());
+        assert_eq!(parsed, original);
+    }
+
+    /// A response whose header declares zero topics round-trips with an
+    /// empty topic list.
+    #[test]
+    fn roundtrip_no_topics() {
+        let stream = StreamResponse {
+            topics_count: 0,
+            ..sample_stream()
+        };
+        let original = GetStreamResponse {
+            stream,
+            topics: Vec::new(),
+        };
+        let wire = original.to_bytes();
+        let (parsed, read) = GetStreamResponse::decode(&wire).unwrap();
+        assert_eq!(read, wire.len());
+        assert_eq!(parsed, original);
+    }
+
+    /// A single topic header round-trips and its encoded length is the fixed
+    /// part plus the name length.
+    #[test]
+    fn topic_header_roundtrip() {
+        let name = "events";
+        let original = sample_topic(5, name);
+        let wire = original.to_bytes();
+        assert_eq!(wire.len(), TopicHeader::FIXED_SIZE + name.len());
+        let (parsed, read) = TopicHeader::decode(&wire).unwrap();
+        assert_eq!(read, wire.len());
+        assert_eq!(parsed, original);
+    }
+
+    /// Every strict prefix of a valid topic header must be rejected.
+    #[test]
+    fn topic_header_truncated_returns_error() {
+        let wire = sample_topic(1, "t").to_bytes();
+        for i in 0..wire.len() {
+            let truncated = &wire[..i];
+            assert!(
+                TopicHeader::decode(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+}
diff --git a/core/binary_protocol/src/responses/streams/get_streams.rs b/core/binary_protocol/src/responses/streams/get_streams.rs
new file mode 100644
index 000000000..3221c45e4
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/get_streams.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode};
+use crate::responses::streams::StreamResponse;
+use bytes::BytesMut;
+
+/// `GetStreams` response: sequential stream headers.
+///
+/// Wire format:
+/// ```text
+/// [StreamResponse]*
+/// ```
+///
+/// Empty payload means zero streams. There is no count prefix: decoding
+/// keeps reading stream headers until the input is exhausted.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct GetStreamsResponse {
+    /// Stream headers in the order they appear on the wire.
+    pub streams: Vec<StreamResponse>,
+}
+
+impl WireEncode for GetStreamsResponse {
+    /// Sum of the encoded sizes of all stream headers (zero when empty).
+    fn encoded_size(&self) -> usize {
+        let mut total = 0;
+        for stream in &self.streams {
+            total += stream.encoded_size();
+        }
+        total
+    }
+
+    /// Appends each stream header to `buf` in list order.
+    fn encode(&self, buf: &mut BytesMut) {
+        let Self { streams } = self;
+        for header in streams {
+            header.encode(buf);
+        }
+    }
+}
+
+impl WireDecode for GetStreamsResponse {
+    /// Decodes stream headers back to back until `buf` is exhausted.
+    ///
+    /// An empty `buf` yields an empty list. Any error from decoding a stream
+    /// header is propagated. Returns the list and the number of bytes read.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let mut offset = 0;
+        let mut streams = Vec::new();
+        // Continue while at least one unread byte remains past `offset`.
+        while let Some(rest) = buf.get(offset..).filter(|rest| !rest.is_empty()) {
+            let (stream, used) = StreamResponse::decode(rest)?;
+            offset += used;
+            streams.push(stream);
+        }
+        Ok((Self { streams }, offset))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::WireName;
+
+    /// An empty stream list encodes to zero bytes and decodes back from
+    /// them.
+    #[test]
+    fn roundtrip_empty() {
+        let original = GetStreamsResponse {
+            streams: Vec::new(),
+        };
+        let wire = original.to_bytes();
+        assert!(wire.is_empty());
+        let (parsed, read) = GetStreamsResponse::decode(&wire).unwrap();
+        assert_eq!(read, 0);
+        assert_eq!(parsed, original);
+    }
+
+    /// Two stream headers with names of different lengths round-trip and
+    /// are fully consumed.
+    #[test]
+    fn roundtrip_multiple() {
+        let first = StreamResponse {
+            id: 1,
+            created_at: 100,
+            topics_count: 2,
+            size_bytes: 512,
+            messages_count: 50,
+            name: WireName::new("s1").unwrap(),
+        };
+        let second = StreamResponse {
+            id: 2,
+            created_at: 200,
+            topics_count: 0,
+            size_bytes: 0,
+            messages_count: 0,
+            name: WireName::new("stream-two").unwrap(),
+        };
+        let original = GetStreamsResponse {
+            streams: vec![first, second],
+        };
+        let wire = original.to_bytes();
+        let (parsed, read) = GetStreamsResponse::decode(&wire).unwrap();
+        assert_eq!(read, wire.len());
+        assert_eq!(parsed, original);
+    }
+}
diff --git a/core/binary_protocol/src/responses/streams/mod.rs b/core/binary_protocol/src/responses/streams/mod.rs
new file mode 100644
index 000000000..e245a5648
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/mod.rs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod create_stream;
+mod delete_stream;
+pub mod get_stream;
+pub mod get_streams;
+mod purge_stream;
+mod stream_response;
+mod update_stream;
+
+pub use super::EmptyResponse;
+pub use create_stream::CreateStreamResponse;
+pub use delete_stream::DeleteStreamResponse;
+pub use get_stream::{GetStreamResponse, TopicHeader};
+pub use get_streams::GetStreamsResponse;
+pub use purge_stream::PurgeStreamResponse;
+pub use stream_response::StreamResponse;
+pub use update_stream::UpdateStreamResponse;
diff --git a/core/binary_protocol/src/responses/streams/purge_stream.rs b/core/binary_protocol/src/responses/streams/purge_stream.rs
new file mode 100644
index 000000000..364e563eb
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/purge_stream.rs
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// `PurgeStream` response is empty.
+///
+/// Type alias for `EmptyResponse`, which encodes to zero bytes.
+pub type PurgeStreamResponse = super::EmptyResponse;
diff --git a/core/binary_protocol/src/responses/streams/stream_response.rs b/core/binary_protocol/src/responses/streams/stream_response.rs
new file mode 100644
index 000000000..d62d135e5
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/stream_response.rs
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u32_le, read_u64_le};
+use crate::wire_name::WireName;
+use bytes::{BufMut, BytesMut};
+
+/// Stream header on the wire. Used in both single-stream and multi-stream
+/// responses.
+///
+/// Wire format (33 + `name_len` bytes), integers little-endian:
+/// ```text
+/// [id:4][created_at:8][topics_count:4][size_bytes:8][messages_count:8][name_len:1][name:N]
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct StreamResponse {
+    /// Stream identifier.
+    pub id: u32,
+    /// Creation timestamp; the unit is not specified in this file.
+    pub created_at: u64,
+    /// Topic count reported for the stream.
+    pub topics_count: u32,
+    /// Size reported for the stream, in bytes.
+    pub size_bytes: u64,
+    /// Message count reported for the stream.
+    pub messages_count: u64,
+    /// Stream name, encoded as a one-byte length followed by the name bytes.
+    pub name: WireName,
+}
+
+impl StreamResponse {
+    /// Bytes occupied by everything except the name itself: the five
+    /// integer fields plus the one-byte name length prefix.
+    const FIXED_SIZE: usize = 4 + 8 + 4 + 8 + 8 + 1; // 33
+}
+
+impl WireEncode for StreamResponse {
+    /// Fixed-width portion plus the variable-length name bytes.
+    fn encoded_size(&self) -> usize {
+        self.name.len() + Self::FIXED_SIZE
+    }
+
+    /// Appends the fields to `buf` in wire order, integers little-endian.
+    fn encode(&self, buf: &mut BytesMut) {
+        // Exhaustive destructuring: a field added to the struct but not
+        // handled here becomes a compile error instead of a silent omission.
+        let Self {
+            id,
+            created_at,
+            topics_count,
+            size_bytes,
+            messages_count,
+            name,
+        } = self;
+
+        buf.put_u32_le(*id);
+        buf.put_u64_le(*created_at);
+        buf.put_u32_le(*topics_count);
+        buf.put_u64_le(*size_bytes);
+        buf.put_u64_le(*messages_count);
+        // The name writes its own one-byte length prefix.
+        name.encode(buf);
+    }
+}
+
+impl WireDecode for StreamResponse {
+    /// Decodes one stream header from the start of `buf`.
+    ///
+    /// Returns the decoded value and the number of bytes consumed, so a
+    /// caller can continue decoding at `buf[consumed..]`.
+    ///
+    /// # Errors
+    /// Propagates any `WireError` from the integer readers or from
+    /// `WireName::decode`.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        // `FIXED_SIZE` counts the one-byte name length prefix, which
+        // `WireName::decode` reads itself, so the name starts one byte
+        // before the end of the fixed portion.
+        let name_offset = Self::FIXED_SIZE - 1;
+
+        let id = read_u32_le(buf, 0)?;
+        let created_at = read_u64_le(buf, 4)?;
+        let topics_count = read_u32_le(buf, 12)?;
+        let size_bytes = read_u64_le(buf, 16)?;
+        let messages_count = read_u64_le(buf, 24)?;
+
+        // NOTE(review): this slice assumes the read at offset 24 has already
+        // rejected buffers shorter than 32 bytes - confirm in `codec`.
+        let (name, name_len) = WireName::decode(&buf[name_offset..])?;
+
+        let response = Self {
+            id,
+            created_at,
+            topics_count,
+            size_bytes,
+            messages_count,
+            name,
+        };
+        Ok((response, name_offset + name_len))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Fixture shared by the tests below.
+    fn sample() -> StreamResponse {
+        let name = WireName::new("test-stream").unwrap();
+        StreamResponse {
+            id: 1,
+            created_at: 1_710_000_000_000,
+            topics_count: 3,
+            size_bytes: 1024,
+            messages_count: 100,
+            name,
+        }
+    }
+
+    /// Encoding then decoding yields the same value and consumes every byte.
+    #[test]
+    fn roundtrip() {
+        let original = sample();
+        let bytes = original.to_bytes();
+        assert_eq!(
+            bytes.len(),
+            StreamResponse::FIXED_SIZE + "test-stream".len()
+        );
+
+        let (decoded, consumed) = StreamResponse::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, original);
+    }
+
+    /// Every strict prefix of a valid encoding must be rejected.
+    #[test]
+    fn truncated_returns_error() {
+        let bytes = sample().to_bytes();
+        for i in 0..bytes.len() {
+            let truncated = &bytes[..i];
+            assert!(
+                StreamResponse::decode(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    /// Two headers written back to back decode using the consumed counts.
+    #[test]
+    fn multiple_streams_sequential() {
+        let first = StreamResponse {
+            id: 1,
+            name: WireName::new("a").unwrap(),
+            ..sample()
+        };
+        let second = StreamResponse {
+            id: 2,
+            name: WireName::new("bb").unwrap(),
+            ..sample()
+        };
+
+        let mut buf = BytesMut::new();
+        for stream in [&first, &second] {
+            stream.encode(&mut buf);
+        }
+        let bytes = buf.freeze();
+
+        let (decoded_first, first_len) = StreamResponse::decode(&bytes).unwrap();
+        let (decoded_second, second_len) =
+            StreamResponse::decode(&bytes[first_len..]).unwrap();
+        assert_eq!(decoded_first, first);
+        assert_eq!(decoded_second, second);
+        assert_eq!(first_len + second_len, bytes.len());
+    }
+}
diff --git a/core/binary_protocol/src/responses/streams/update_stream.rs b/core/binary_protocol/src/responses/streams/update_stream.rs
new file mode 100644
index 000000000..b002d9c86
--- /dev/null
+++ b/core/binary_protocol/src/responses/streams/update_stream.rs
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Response to an `UpdateStream` command.
+///
+/// `UpdateStream` response is empty, so this is an alias for the
+/// `EmptyResponse` type reachable through the parent module.
+pub type UpdateStreamResponse = super::EmptyResponse;
diff --git a/core/binary_protocol/src/wire_name.rs b/core/binary_protocol/src/wire_name.rs
new file mode 100644
index 000000000..fe56733e9
--- /dev/null
+++ b/core/binary_protocol/src/wire_name.rs
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_str, read_u8};
+use bytes::{BufMut, BytesMut};
+use std::ops::Deref;
+
+/// Largest name, in bytes, that the one-byte (`u8`) length prefix can
+/// describe.
+pub const MAX_WIRE_NAME_LENGTH: usize = u8::MAX as usize;
+
+/// Validated name type used by protocol request/response types.
+///
+/// Guarantees the inner string is 1-255 bytes, matching the u8
+/// length prefix used on the wire. The field is private; in this file
+/// values are only built by `WireName::new` and by the `WireDecode`
+/// implementation, both of which reject lengths outside that range.
+// `Debug` is implemented by hand further down rather than derived.
+#[derive(Clone, PartialEq, Eq)]
+pub struct WireName(String);
+
+impl WireName {
+    /// Create a new `WireName`, validating the length is 1-255 bytes.
+    ///
+    /// The length checked is the UTF-8 byte length (`String::len`), not the
+    /// number of characters.
+    ///
+    /// # Errors
+    /// Returns `WireError::Validation` if the name is empty or exceeds
+    /// 255 bytes.
+    pub fn new(s: impl Into<String>) -> Result<Self, WireError> {
+        let s = s.into();
+        if s.is_empty() || s.len() > MAX_WIRE_NAME_LENGTH {
+            return Err(WireError::Validation(format!(
+                "wire name must be 1-{MAX_WIRE_NAME_LENGTH} bytes, got {}",
+                s.len()
+            )));
+        }
+        Ok(Self(s))
+    }
+
+    /// Borrows the name as a string slice.
+    #[must_use]
+    pub fn as_str(&self) -> &str {
+        &self.0
+    }
+
+    /// Length of the name in bytes.
+    #[must_use]
+    pub const fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    /// Always returns `false` - `WireName` is guaranteed non-empty by
+    /// construction.
+    #[must_use]
+    pub const fn is_empty(&self) -> bool {
+        false
+    }
+}
+
+impl Deref for WireName {
+    type Target = str;
+
+    /// Exposes the name as a `&str`, so `str` methods can be called on a
+    /// `WireName` directly.
+    fn deref(&self) -> &Self::Target {
+        self.0.as_str()
+    }
+}
+
+impl std::fmt::Display for WireName {
+    /// Writes the name verbatim. `write_str` is called directly, so
+    /// formatter options such as width or padding are not applied.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str(self.as_str())
+    }
+}
+
+impl std::fmt::Debug for WireName {
+    /// Renders as `WireName("...")`, with the inner text in its `Debug`
+    /// (quoted and escaped) form.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let inner = self.as_str();
+        write!(f, "WireName({:?})", inner)
+    }
+}
+
+impl WireEncode for WireName {
+    /// One length byte followed by the name bytes.
+    fn encoded_size(&self) -> usize {
+        self.0.len() + 1
+    }
+
+    /// Appends `[len:1][name:len]` to `buf`.
+    fn encode(&self, buf: &mut BytesMut) {
+        let name_bytes = self.0.as_bytes();
+        // Length guaranteed <= 255 by construction, truncation impossible.
+        #[allow(clippy::cast_possible_truncation)]
+        let len_prefix = name_bytes.len() as u8;
+        buf.put_u8(len_prefix);
+        buf.put_slice(name_bytes);
+    }
+}
+
+impl WireDecode for WireName {
+    /// Decodes `[len:1][name:len]` from the start of `buf`.
+    ///
+    /// Returns the name together with the number of bytes consumed
+    /// (`1 + len`).
+    ///
+    /// # Errors
+    /// Returns `WireError::Validation` when the length prefix is zero, and
+    /// propagates any error from `read_u8` or `read_str`.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let name_len = read_u8(buf, 0)? as usize;
+        // The upper-bound test cannot fire while the limit is 255 (the
+        // largest value a `u8` prefix can hold); it keeps the check correct
+        // should the limit ever be lowered.
+        if name_len == 0 || name_len > MAX_WIRE_NAME_LENGTH {
+            return Err(WireError::Validation(format!(
+                "wire name must be 1-{MAX_WIRE_NAME_LENGTH} bytes, got {name_len}"
+            )));
+        }
+        let name = read_str(buf, 1, name_len)?;
+        Ok((Self(name), 1 + name_len))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Builds a name the calling test expects to be valid.
+    #[track_caller]
+    fn named(text: &str) -> WireName {
+        WireName::new(text).unwrap()
+    }
+
+    /// A short ASCII name is accepted and exposed unchanged.
+    #[test]
+    fn valid_name() {
+        let name = named("test");
+        assert_eq!(name.as_str(), "test");
+        assert_eq!(name.len(), 4);
+        assert_eq!(&*name, "test");
+    }
+
+    /// The empty string violates the lower bound.
+    #[test]
+    fn empty_rejected() {
+        let result = WireName::new("");
+        assert!(result.is_err());
+    }
+
+    /// One byte over the limit is rejected.
+    #[test]
+    fn too_long_rejected() {
+        let long = "a".repeat(MAX_WIRE_NAME_LENGTH + 1);
+        assert!(WireName::new(long).is_err());
+    }
+
+    /// Exactly the limit is accepted.
+    #[test]
+    fn max_length_accepted() {
+        let longest = "a".repeat(MAX_WIRE_NAME_LENGTH);
+        assert!(WireName::new(longest).is_ok());
+    }
+
+    /// Encoding then decoding yields the same name and byte count.
+    #[test]
+    fn roundtrip() {
+        let name = named("my-stream");
+        let bytes = name.to_bytes();
+        assert_eq!(bytes.len(), 1 + "my-stream".len());
+
+        let (decoded, consumed) = WireName::decode(&bytes).unwrap();
+        assert_eq!(consumed, 10);
+        assert_eq!(decoded, name);
+    }
+
+    /// `Display` prints the bare name.
+    #[test]
+    fn display() {
+        let name = named("hello");
+        assert_eq!(name.to_string(), "hello");
+    }
+
+    /// `str` methods are reachable through `Deref`.
+    #[test]
+    fn deref_to_str() {
+        let name = named("test");
+        assert!(name.starts_with("te"));
+    }
+
+    /// Multi-byte UTF-8 names survive an encode/decode cycle.
+    #[test]
+    fn non_ascii_utf8_roundtrip() {
+        let name = named("str\u{00e9}am-\u{00fc}ser");
+        let bytes = name.to_bytes();
+        let (decoded, _) = WireName::decode(&bytes).unwrap();
+        assert_eq!(decoded, name);
+    }
+}

Reply via email to