This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch refactor-binary-4-sans-io
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit e2e309e4ccc1d00a5ac8159521d6e1d93c6df2ba
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Mar 17 15:54:28 2026 +0100

    feat(rust): add sans-IO frame codec and command dispatch table
    
    The binary_protocol crate had wire types but no framing layer.
    Server and SDK hand-rolled frame parsing independently.
    
    Add RequestFrame/ResponseFrame sans-IO codec (framing.rs) with
    pre-reserved buffers, checked encoded_size() returning Option,
    and NonZeroU32 status in encode_error() for type-level safety.
    
    Add CommandMeta dispatch table (dispatch.rs) as single source
    of truth for all 47 command codes, names, and VSR operations.
    Operation::from_command_code/to_command_code now delegate to it.
    
    Replace frame.rs with framing.rs. Remove glob re-exports from
    crate root. Change WireError::Validation to Cow<'static, str>.
    
    Add remaining system responses: ClusterMetadataResponse (cluster
    name + nodes with transport endpoints), GetSnapshotResponse (raw
    ZIP bytes), GetMeResponse (alias for ClientDetailsResponse).
    Promote all consensus types to crate root re-exports. Add
    read_u16_le codec helper.
---
 core/binary_protocol/src/codec.rs                  |  26 ++
 core/binary_protocol/src/codes.rs                  |  54 +---
 core/binary_protocol/src/consensus/operation.rs    |  68 +---
 core/binary_protocol/src/dispatch.rs               | 350 +++++++++++++++++++++
 core/binary_protocol/src/error.rs                  |   4 +-
 core/binary_protocol/src/framing.rs                | 289 +++++++++++++++++
 core/binary_protocol/src/lib.rs                    |  28 +-
 core/binary_protocol/src/message_view.rs           |  11 +-
 core/binary_protocol/src/primitives/identifier.rs  |  19 +-
 .../binary_protocol/src/primitives/partitioning.rs |  25 +-
 .../src/requests/users/create_user.rs              |   5 +-
 .../src/requests/users/update_permissions.rs       |   5 +-
 .../src/responses/streams/get_stream.rs            |   5 +-
 .../src/responses/system/get_cluster_metadata.rs   | 230 ++++++++++++++
 .../src/{frame.rs => responses/system/get_me.rs}   |  20 +-
 .../src/responses/system/get_snapshot.rs           |  71 +++++
 core/binary_protocol/src/responses/system/mod.rs   |   6 +
 .../src/responses/topics/get_topic.rs              |   5 +-
 18 files changed, 1067 insertions(+), 154 deletions(-)

diff --git a/core/binary_protocol/src/codec.rs 
b/core/binary_protocol/src/codec.rs
index 1ef155847..04ad07506 100644
--- a/core/binary_protocol/src/codec.rs
+++ b/core/binary_protocol/src/codec.rs
@@ -75,6 +75,32 @@ pub fn read_u8(buf: &[u8], offset: usize) -> Result<u8, 
WireError> {
         })
 }
 
+/// Helper to read a `u16` LE from `buf` at `offset`.
+///
+/// # Errors
+/// Returns `WireError::UnexpectedEof` if fewer than 2 bytes remain.
+#[allow(clippy::missing_panics_doc)]
+#[inline]
+pub fn read_u16_le(buf: &[u8], offset: usize) -> Result<u16, WireError> {
+    let end = offset
+        .checked_add(2)
+        .ok_or_else(|| WireError::UnexpectedEof {
+            offset,
+            need: 2,
+            have: buf.len().saturating_sub(offset),
+        })?;
+    let slice = buf
+        .get(offset..end)
+        .ok_or_else(|| WireError::UnexpectedEof {
+            offset,
+            need: 2,
+            have: buf.len().saturating_sub(offset),
+        })?;
+    Ok(u16::from_le_bytes(
+        slice.try_into().expect("slice is exactly 2 bytes"),
+    ))
+}
+
 /// Helper to read a `u32` LE from `buf` at `offset`.
 ///
 /// # Errors
diff --git a/core/binary_protocol/src/codes.rs 
b/core/binary_protocol/src/codes.rs
index f20209172..a088b71fb 100644
--- a/core/binary_protocol/src/codes.rs
+++ b/core/binary_protocol/src/codes.rs
@@ -87,58 +87,14 @@ pub const DELETE_CONSUMER_GROUP_CODE: u32 = 603;
 pub const JOIN_CONSUMER_GROUP_CODE: u32 = 604;
 pub const LEAVE_CONSUMER_GROUP_CODE: u32 = 605;
 
+/// Lookup the human-readable name for a command code.
+///
 /// # Errors
 /// Returns `WireError::UnknownCommand` if the code is not recognized.
 pub const fn command_name(code: u32) -> Result<&'static str, WireError> {
-    match code {
-        PING_CODE => Ok("ping"),
-        GET_STATS_CODE => Ok("stats"),
-        GET_SNAPSHOT_FILE_CODE => Ok("snapshot"),
-        GET_CLUSTER_METADATA_CODE => Ok("cluster.metadata"),
-        GET_ME_CODE => Ok("me"),
-        GET_CLIENT_CODE => Ok("client.get"),
-        GET_CLIENTS_CODE => Ok("client.list"),
-        GET_USER_CODE => Ok("user.get"),
-        GET_USERS_CODE => Ok("user.list"),
-        CREATE_USER_CODE => Ok("user.create"),
-        DELETE_USER_CODE => Ok("user.delete"),
-        UPDATE_USER_CODE => Ok("user.update"),
-        UPDATE_PERMISSIONS_CODE => Ok("user.permissions"),
-        CHANGE_PASSWORD_CODE => Ok("user.password"),
-        LOGIN_USER_CODE => Ok("user.login"),
-        LOGOUT_USER_CODE => Ok("user.logout"),
-        GET_PERSONAL_ACCESS_TOKENS_CODE => Ok("personal_access_token.list"),
-        CREATE_PERSONAL_ACCESS_TOKEN_CODE => 
Ok("personal_access_token.create"),
-        DELETE_PERSONAL_ACCESS_TOKEN_CODE => 
Ok("personal_access_token.delete"),
-        LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => 
Ok("personal_access_token.login"),
-        POLL_MESSAGES_CODE => Ok("message.poll"),
-        SEND_MESSAGES_CODE => Ok("message.send"),
-        FLUSH_UNSAVED_BUFFER_CODE => Ok("message.flush_unsaved_buffer"),
-        GET_CONSUMER_OFFSET_CODE => Ok("consumer_offset.get"),
-        STORE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.store"),
-        DELETE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.delete"),
-        GET_STREAM_CODE => Ok("stream.get"),
-        GET_STREAMS_CODE => Ok("stream.list"),
-        CREATE_STREAM_CODE => Ok("stream.create"),
-        DELETE_STREAM_CODE => Ok("stream.delete"),
-        UPDATE_STREAM_CODE => Ok("stream.update"),
-        PURGE_STREAM_CODE => Ok("stream.purge"),
-        GET_TOPIC_CODE => Ok("topic.get"),
-        GET_TOPICS_CODE => Ok("topic.list"),
-        CREATE_TOPIC_CODE => Ok("topic.create"),
-        DELETE_TOPIC_CODE => Ok("topic.delete"),
-        UPDATE_TOPIC_CODE => Ok("topic.update"),
-        PURGE_TOPIC_CODE => Ok("topic.purge"),
-        CREATE_PARTITIONS_CODE => Ok("partition.create"),
-        DELETE_PARTITIONS_CODE => Ok("partition.delete"),
-        DELETE_SEGMENTS_CODE => Ok("segment.delete"),
-        GET_CONSUMER_GROUP_CODE => Ok("consumer_group.get"),
-        GET_CONSUMER_GROUPS_CODE => Ok("consumer_group.list"),
-        CREATE_CONSUMER_GROUP_CODE => Ok("consumer_group.create"),
-        DELETE_CONSUMER_GROUP_CODE => Ok("consumer_group.delete"),
-        JOIN_CONSUMER_GROUP_CODE => Ok("consumer_group.join"),
-        LEAVE_CONSUMER_GROUP_CODE => Ok("consumer_group.leave"),
-        _ => Err(WireError::UnknownCommand(code)),
+    match crate::dispatch::lookup_command(code) {
+        Some(meta) => Ok(meta.name),
+        None => Err(WireError::UnknownCommand(code)),
     }
 }
 
diff --git a/core/binary_protocol/src/consensus/operation.rs 
b/core/binary_protocol/src/consensus/operation.rs
index 8bf2b5016..834e82003 100644
--- a/core/binary_protocol/src/consensus/operation.rs
+++ b/core/binary_protocol/src/consensus/operation.rs
@@ -95,64 +95,27 @@ impl Operation {
     }
 
     /// Bidirectional mapping: `Operation` -> client command code.
+    ///
+    /// Delegates to the dispatch table as the single source of truth.
     #[must_use]
     pub const fn to_command_code(&self) -> Option<u32> {
-        use crate::codes;
         match self {
             Self::Reserved => None,
-            Self::CreateStream => Some(codes::CREATE_STREAM_CODE),
-            Self::UpdateStream => Some(codes::UPDATE_STREAM_CODE),
-            Self::DeleteStream => Some(codes::DELETE_STREAM_CODE),
-            Self::PurgeStream => Some(codes::PURGE_STREAM_CODE),
-            Self::CreateTopic => Some(codes::CREATE_TOPIC_CODE),
-            Self::UpdateTopic => Some(codes::UPDATE_TOPIC_CODE),
-            Self::DeleteTopic => Some(codes::DELETE_TOPIC_CODE),
-            Self::PurgeTopic => Some(codes::PURGE_TOPIC_CODE),
-            Self::CreatePartitions => Some(codes::CREATE_PARTITIONS_CODE),
-            Self::DeletePartitions => Some(codes::DELETE_PARTITIONS_CODE),
-            Self::DeleteSegments => Some(codes::DELETE_SEGMENTS_CODE),
-            Self::CreateConsumerGroup => 
Some(codes::CREATE_CONSUMER_GROUP_CODE),
-            Self::DeleteConsumerGroup => 
Some(codes::DELETE_CONSUMER_GROUP_CODE),
-            Self::CreateUser => Some(codes::CREATE_USER_CODE),
-            Self::UpdateUser => Some(codes::UPDATE_USER_CODE),
-            Self::DeleteUser => Some(codes::DELETE_USER_CODE),
-            Self::ChangePassword => Some(codes::CHANGE_PASSWORD_CODE),
-            Self::UpdatePermissions => Some(codes::UPDATE_PERMISSIONS_CODE),
-            Self::CreatePersonalAccessToken => 
Some(codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE),
-            Self::DeletePersonalAccessToken => 
Some(codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE),
-            Self::SendMessages => Some(codes::SEND_MESSAGES_CODE),
-            Self::StoreConsumerOffset => 
Some(codes::STORE_CONSUMER_OFFSET_CODE),
+            _ => match crate::dispatch::lookup_by_operation(*self) {
+                Some(meta) => Some(meta.code),
+                None => None,
+            },
         }
     }
 
     /// Bidirectional mapping: client command code -> `Operation`.
+    ///
+    /// Delegates to the dispatch table as the single source of truth.
     #[must_use]
     pub const fn from_command_code(code: u32) -> Option<Self> {
-        use crate::codes;
-        match code {
-            codes::CREATE_STREAM_CODE => Some(Self::CreateStream),
-            codes::UPDATE_STREAM_CODE => Some(Self::UpdateStream),
-            codes::DELETE_STREAM_CODE => Some(Self::DeleteStream),
-            codes::PURGE_STREAM_CODE => Some(Self::PurgeStream),
-            codes::CREATE_TOPIC_CODE => Some(Self::CreateTopic),
-            codes::UPDATE_TOPIC_CODE => Some(Self::UpdateTopic),
-            codes::DELETE_TOPIC_CODE => Some(Self::DeleteTopic),
-            codes::PURGE_TOPIC_CODE => Some(Self::PurgeTopic),
-            codes::CREATE_PARTITIONS_CODE => Some(Self::CreatePartitions),
-            codes::DELETE_PARTITIONS_CODE => Some(Self::DeletePartitions),
-            codes::DELETE_SEGMENTS_CODE => Some(Self::DeleteSegments),
-            codes::CREATE_CONSUMER_GROUP_CODE => 
Some(Self::CreateConsumerGroup),
-            codes::DELETE_CONSUMER_GROUP_CODE => 
Some(Self::DeleteConsumerGroup),
-            codes::CREATE_USER_CODE => Some(Self::CreateUser),
-            codes::UPDATE_USER_CODE => Some(Self::UpdateUser),
-            codes::DELETE_USER_CODE => Some(Self::DeleteUser),
-            codes::CHANGE_PASSWORD_CODE => Some(Self::ChangePassword),
-            codes::UPDATE_PERMISSIONS_CODE => Some(Self::UpdatePermissions),
-            codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE => 
Some(Self::CreatePersonalAccessToken),
-            codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE => 
Some(Self::DeletePersonalAccessToken),
-            codes::SEND_MESSAGES_CODE => Some(Self::SendMessages),
-            codes::STORE_CONSUMER_OFFSET_CODE => 
Some(Self::StoreConsumerOffset),
-            _ => None,
+        match crate::dispatch::lookup_command(code) {
+            Some(meta) => meta.operation,
+            None => None,
         }
     }
 }
@@ -204,10 +167,11 @@ mod tests {
 
     #[test]
     fn read_only_commands_have_no_operation() {
-        assert!(Operation::from_command_code(crate::PING_CODE).is_none());
-        assert!(Operation::from_command_code(crate::GET_STATS_CODE).is_none());
-        
assert!(Operation::from_command_code(crate::GET_STREAM_CODE).is_none());
-        
assert!(Operation::from_command_code(crate::POLL_MESSAGES_CODE).is_none());
+        use crate::codes::{GET_STATS_CODE, GET_STREAM_CODE, PING_CODE, 
POLL_MESSAGES_CODE};
+        assert!(Operation::from_command_code(PING_CODE).is_none());
+        assert!(Operation::from_command_code(GET_STATS_CODE).is_none());
+        assert!(Operation::from_command_code(GET_STREAM_CODE).is_none());
+        assert!(Operation::from_command_code(POLL_MESSAGES_CODE).is_none());
     }
 
     #[test]
diff --git a/core/binary_protocol/src/dispatch.rs 
b/core/binary_protocol/src/dispatch.rs
new file mode 100644
index 000000000..ac755d61d
--- /dev/null
+++ b/core/binary_protocol/src/dispatch.rs
@@ -0,0 +1,350 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Command dispatch table mapping codes and operations to metadata.
+//!
+//! This is the protocol's identity registry. Every command has an entry
+//! with its numeric code, human-readable name, and optional VSR operation.
+//!
+//! Two lookup paths:
+//! - `lookup_command(code)`: current framing reads `[length][code][payload]`, 
looks up by code
+//! - `lookup_by_operation(op)`: future VSR framing reads 256-byte header, 
looks up by operation
+
+#[allow(clippy::wildcard_imports)]
+use crate::codes::*;
+use crate::consensus::Operation;
+
+/// Metadata for a single protocol command.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct CommandMeta {
+    pub code: u32,
+    pub name: &'static str,
+    /// VSR operation for replicated commands. `None` for non-replicated
+    /// commands that bypass consensus.
+    pub operation: Option<Operation>,
+}
+
+impl CommandMeta {
+    /// Returns `true` if this command is replicated through VSR consensus.
+    #[must_use]
+    pub const fn is_replicated(&self) -> bool {
+        self.operation.is_some()
+    }
+
+    const fn new(code: u32, name: &'static str, operation: Option<Operation>) 
-> Self {
+        Self {
+            code,
+            name,
+            operation,
+        }
+    }
+
+    const fn non_replicated(code: u32, name: &'static str) -> Self {
+        Self::new(code, name, None)
+    }
+
+    const fn replicated(code: u32, name: &'static str, op: Operation) -> Self {
+        Self::new(code, name, Some(op))
+    }
+}
+
+/// All known command metadata entries.
+pub const COMMAND_TABLE: &[CommandMeta] = &[
+    // System
+    CommandMeta::non_replicated(PING_CODE, "ping"),
+    CommandMeta::non_replicated(GET_STATS_CODE, "stats"),
+    CommandMeta::non_replicated(GET_SNAPSHOT_FILE_CODE, "snapshot"),
+    CommandMeta::non_replicated(GET_CLUSTER_METADATA_CODE, "cluster.metadata"),
+    CommandMeta::non_replicated(GET_ME_CODE, "me"),
+    CommandMeta::non_replicated(GET_CLIENT_CODE, "client.get"),
+    CommandMeta::non_replicated(GET_CLIENTS_CODE, "client.list"),
+    // Users
+    CommandMeta::non_replicated(GET_USER_CODE, "user.get"),
+    CommandMeta::non_replicated(GET_USERS_CODE, "user.list"),
+    CommandMeta::replicated(CREATE_USER_CODE, "user.create", 
Operation::CreateUser),
+    CommandMeta::replicated(DELETE_USER_CODE, "user.delete", 
Operation::DeleteUser),
+    CommandMeta::replicated(UPDATE_USER_CODE, "user.update", 
Operation::UpdateUser),
+    CommandMeta::replicated(
+        UPDATE_PERMISSIONS_CODE,
+        "user.permissions",
+        Operation::UpdatePermissions,
+    ),
+    CommandMeta::replicated(
+        CHANGE_PASSWORD_CODE,
+        "user.password",
+        Operation::ChangePassword,
+    ),
+    CommandMeta::non_replicated(LOGIN_USER_CODE, "user.login"),
+    CommandMeta::non_replicated(LOGOUT_USER_CODE, "user.logout"),
+    // Personal Access Tokens
+    CommandMeta::non_replicated(
+        GET_PERSONAL_ACCESS_TOKENS_CODE,
+        "personal_access_token.list",
+    ),
+    CommandMeta::replicated(
+        CREATE_PERSONAL_ACCESS_TOKEN_CODE,
+        "personal_access_token.create",
+        Operation::CreatePersonalAccessToken,
+    ),
+    CommandMeta::replicated(
+        DELETE_PERSONAL_ACCESS_TOKEN_CODE,
+        "personal_access_token.delete",
+        Operation::DeletePersonalAccessToken,
+    ),
+    CommandMeta::non_replicated(
+        LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
+        "personal_access_token.login",
+    ),
+    // Messages
+    CommandMeta::non_replicated(POLL_MESSAGES_CODE, "message.poll"),
+    CommandMeta::replicated(SEND_MESSAGES_CODE, "message.send", 
Operation::SendMessages),
+    CommandMeta::non_replicated(FLUSH_UNSAVED_BUFFER_CODE, 
"message.flush_unsaved_buffer"),
+    // Consumer Offsets
+    CommandMeta::non_replicated(GET_CONSUMER_OFFSET_CODE, 
"consumer_offset.get"),
+    CommandMeta::replicated(
+        STORE_CONSUMER_OFFSET_CODE,
+        "consumer_offset.store",
+        Operation::StoreConsumerOffset,
+    ),
+    CommandMeta::non_replicated(DELETE_CONSUMER_OFFSET_CODE, 
"consumer_offset.delete"),
+    // Streams
+    CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"),
+    CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"),
+    CommandMeta::replicated(CREATE_STREAM_CODE, "stream.create", 
Operation::CreateStream),
+    CommandMeta::replicated(DELETE_STREAM_CODE, "stream.delete", 
Operation::DeleteStream),
+    CommandMeta::replicated(UPDATE_STREAM_CODE, "stream.update", 
Operation::UpdateStream),
+    CommandMeta::replicated(PURGE_STREAM_CODE, "stream.purge", 
Operation::PurgeStream),
+    // Topics
+    CommandMeta::non_replicated(GET_TOPIC_CODE, "topic.get"),
+    CommandMeta::non_replicated(GET_TOPICS_CODE, "topic.list"),
+    CommandMeta::replicated(CREATE_TOPIC_CODE, "topic.create", 
Operation::CreateTopic),
+    CommandMeta::replicated(DELETE_TOPIC_CODE, "topic.delete", 
Operation::DeleteTopic),
+    CommandMeta::replicated(UPDATE_TOPIC_CODE, "topic.update", 
Operation::UpdateTopic),
+    CommandMeta::replicated(PURGE_TOPIC_CODE, "topic.purge", 
Operation::PurgeTopic),
+    // Partitions
+    CommandMeta::replicated(
+        CREATE_PARTITIONS_CODE,
+        "partition.create",
+        Operation::CreatePartitions,
+    ),
+    CommandMeta::replicated(
+        DELETE_PARTITIONS_CODE,
+        "partition.delete",
+        Operation::DeletePartitions,
+    ),
+    // Segments
+    CommandMeta::replicated(
+        DELETE_SEGMENTS_CODE,
+        "segment.delete",
+        Operation::DeleteSegments,
+    ),
+    // Consumer Groups
+    CommandMeta::non_replicated(GET_CONSUMER_GROUP_CODE, "consumer_group.get"),
+    CommandMeta::non_replicated(GET_CONSUMER_GROUPS_CODE, 
"consumer_group.list"),
+    CommandMeta::replicated(
+        CREATE_CONSUMER_GROUP_CODE,
+        "consumer_group.create",
+        Operation::CreateConsumerGroup,
+    ),
+    CommandMeta::replicated(
+        DELETE_CONSUMER_GROUP_CODE,
+        "consumer_group.delete",
+        Operation::DeleteConsumerGroup,
+    ),
+    CommandMeta::non_replicated(JOIN_CONSUMER_GROUP_CODE, 
"consumer_group.join"),
+    CommandMeta::non_replicated(LEAVE_CONSUMER_GROUP_CODE, 
"consumer_group.leave"),
+];
+
+/// Lookup command metadata by command code.
+#[must_use]
+pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> {
+    let mut i = 0;
+    while i < COMMAND_TABLE.len() {
+        if COMMAND_TABLE[i].code == code {
+            return Some(&COMMAND_TABLE[i]);
+        }
+        i += 1;
+    }
+    None
+}
+
+/// Lookup command metadata by VSR operation.
+///
+/// Returns `None` for `Operation::Reserved` and for non-replicated commands
+/// that have no operation mapping.
+#[must_use]
+pub const fn lookup_by_operation(op: Operation) -> Option<&'static 
CommandMeta> {
+    let mut i = 0;
+    while i < COMMAND_TABLE.len() {
+        if let Some(table_op) = COMMAND_TABLE[i].operation
+            && table_op as u8 == op as u8
+        {
+            return Some(&COMMAND_TABLE[i]);
+        }
+        i += 1;
+    }
+    None
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn every_code_has_entry() {
+        let all_codes = [
+            PING_CODE,
+            GET_STATS_CODE,
+            GET_SNAPSHOT_FILE_CODE,
+            GET_CLUSTER_METADATA_CODE,
+            GET_ME_CODE,
+            GET_CLIENT_CODE,
+            GET_CLIENTS_CODE,
+            GET_USER_CODE,
+            GET_USERS_CODE,
+            CREATE_USER_CODE,
+            DELETE_USER_CODE,
+            UPDATE_USER_CODE,
+            UPDATE_PERMISSIONS_CODE,
+            CHANGE_PASSWORD_CODE,
+            LOGIN_USER_CODE,
+            LOGOUT_USER_CODE,
+            GET_PERSONAL_ACCESS_TOKENS_CODE,
+            CREATE_PERSONAL_ACCESS_TOKEN_CODE,
+            DELETE_PERSONAL_ACCESS_TOKEN_CODE,
+            LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
+            POLL_MESSAGES_CODE,
+            SEND_MESSAGES_CODE,
+            FLUSH_UNSAVED_BUFFER_CODE,
+            GET_CONSUMER_OFFSET_CODE,
+            STORE_CONSUMER_OFFSET_CODE,
+            DELETE_CONSUMER_OFFSET_CODE,
+            GET_STREAM_CODE,
+            GET_STREAMS_CODE,
+            CREATE_STREAM_CODE,
+            DELETE_STREAM_CODE,
+            UPDATE_STREAM_CODE,
+            PURGE_STREAM_CODE,
+            GET_TOPIC_CODE,
+            GET_TOPICS_CODE,
+            CREATE_TOPIC_CODE,
+            DELETE_TOPIC_CODE,
+            UPDATE_TOPIC_CODE,
+            PURGE_TOPIC_CODE,
+            CREATE_PARTITIONS_CODE,
+            DELETE_PARTITIONS_CODE,
+            DELETE_SEGMENTS_CODE,
+            GET_CONSUMER_GROUP_CODE,
+            GET_CONSUMER_GROUPS_CODE,
+            CREATE_CONSUMER_GROUP_CODE,
+            DELETE_CONSUMER_GROUP_CODE,
+            JOIN_CONSUMER_GROUP_CODE,
+            LEAVE_CONSUMER_GROUP_CODE,
+        ];
+        for code in all_codes {
+            assert!(
+                lookup_command(code).is_some(),
+                "missing dispatch entry for code {code}"
+            );
+        }
+    }
+
+    #[test]
+    fn no_duplicate_codes_in_table() {
+        let mut seen = std::collections::HashSet::new();
+        for entry in COMMAND_TABLE {
+            assert!(
+                seen.insert(entry.code),
+                "duplicate code {} ({}) in COMMAND_TABLE",
+                entry.code,
+                entry.name
+            );
+        }
+    }
+
+    #[test]
+    fn unknown_code_returns_none() {
+        assert!(lookup_command(9999).is_none());
+    }
+
+    #[test]
+    fn names_are_non_empty() {
+        for entry in COMMAND_TABLE {
+            assert!(!entry.name.is_empty(), "empty name for code {}", 
entry.code);
+        }
+    }
+
+    #[test]
+    fn lookup_by_operation_roundtrips_with_lookup_command() {
+        let replicated_ops = [
+            Operation::CreateStream,
+            Operation::UpdateStream,
+            Operation::DeleteStream,
+            Operation::PurgeStream,
+            Operation::CreateTopic,
+            Operation::UpdateTopic,
+            Operation::DeleteTopic,
+            Operation::PurgeTopic,
+            Operation::CreatePartitions,
+            Operation::DeletePartitions,
+            Operation::DeleteSegments,
+            Operation::CreateConsumerGroup,
+            Operation::DeleteConsumerGroup,
+            Operation::CreateUser,
+            Operation::UpdateUser,
+            Operation::DeleteUser,
+            Operation::ChangePassword,
+            Operation::UpdatePermissions,
+            Operation::CreatePersonalAccessToken,
+            Operation::DeletePersonalAccessToken,
+            Operation::SendMessages,
+            Operation::StoreConsumerOffset,
+        ];
+        for op in replicated_ops {
+            let meta = lookup_by_operation(op)
+                .unwrap_or_else(|| panic!("no dispatch entry for operation 
{op:?}"));
+
+            let by_code = lookup_command(meta.code)
+                .unwrap_or_else(|| panic!("no dispatch entry for code {}", 
meta.code));
+
+            assert_eq!(
+                meta.code, by_code.code,
+                "lookup_by_operation and lookup disagree for {op:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn reserved_operation_returns_none() {
+        assert!(lookup_by_operation(Operation::Reserved).is_none());
+    }
+
+    #[test]
+    fn no_duplicate_operations_in_table() {
+        let mut seen = std::collections::HashSet::new();
+        for entry in COMMAND_TABLE {
+            if let Some(op) = entry.operation {
+                assert!(
+                    seen.insert(op as u8),
+                    "duplicate operation {:?} ({}) in COMMAND_TABLE",
+                    op,
+                    entry.name
+                );
+            }
+        }
+    }
+}
diff --git a/core/binary_protocol/src/error.rs 
b/core/binary_protocol/src/error.rs
index f4f53ea56..82d5cb2b5 100644
--- a/core/binary_protocol/src/error.rs
+++ b/core/binary_protocol/src/error.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::borrow::Cow;
+
 /// Protocol-local error type for wire format encode/decode failures.
 ///
 /// Intentionally decoupled from `IggyError` to keep the protocol crate
@@ -46,5 +48,5 @@ pub enum WireError {
     PayloadTooLarge { size: usize, max: usize },
 
     #[error("validation failed: {0}")]
-    Validation(String),
+    Validation(Cow<'static, str>),
 }
diff --git a/core/binary_protocol/src/framing.rs 
b/core/binary_protocol/src/framing.rs
new file mode 100644
index 000000000..981ead667
--- /dev/null
+++ b/core/binary_protocol/src/framing.rs
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sans-IO frame codec for the Iggy binary protocol.
+//!
+//! Encodes and decodes complete request/response frames without any I/O.
+//! The transport layer (TCP, QUIC, WebSocket) reads bytes into a buffer,
+//! then hands the buffer to these types for zero-copy parsing.
+//!
+//! When VSR consensus replaces this framing, the transport layer will
+//! switch to `consensus::header::GenericHeader` (256-byte fixed header)
+//! while the command payload codec stays the same.
+
+use crate::codec::{read_bytes, read_u32_le};
+use crate::error::WireError;
+use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
+use std::num::NonZeroU32;
+
+/// Status code for a successful response.
+pub const STATUS_OK: u32 = 0;
+
+/// Decoded request frame. Borrows the payload from the input buffer.
+///
+/// Wire format: `[length:4 LE][code:4 LE][payload:N]`
+/// where `length` = 4 (code size) + N (payload size).
+#[derive(Debug)]
+pub struct RequestFrame<'a> {
+    pub code: u32,
+    pub payload: &'a [u8],
+}
+
+impl<'a> RequestFrame<'a> {
+    /// Size of the frame header: `[length:4][code:4]`.
+    pub const HEADER_SIZE: usize = 8;
+
+    /// Decode a request frame from a complete buffer.
+    ///
+    /// # Errors
+    /// Returns `WireError::UnexpectedEof` if the buffer is too short.
+    pub fn decode(buf: &'a [u8]) -> Result<(Self, usize), WireError> {
+        let length = read_u32_le(buf, 0)? as usize;
+        if length < 4 {
+            return Err(WireError::Validation(Cow::Borrowed(
+                "request frame length must be at least 4 (code size)",
+            )));
+        }
+        let code = read_u32_le(buf, 4)?;
+        let payload_len = length - 4;
+        let payload = read_bytes(buf, Self::HEADER_SIZE, payload_len)?;
+        let total = Self::HEADER_SIZE + payload_len;
+        Ok((Self { code, payload }, total))
+    }
+
+    /// Encode a request frame into `out`.
+    ///
+    /// Writes `[length:4 LE][code:4 LE][payload]` where length includes
+    /// the 4-byte code field.
+    ///
+    /// # Errors
+    /// Returns `WireError::PayloadTooLarge` if payload exceeds u32 capacity.
+    pub fn encode(code: u32, payload: &[u8], out: &mut BytesMut) -> Result<(), 
WireError> {
+        let length = payload
+            .len()
+            .checked_add(4)
+            .and_then(|n| u32::try_from(n).ok())
+            .ok_or(WireError::PayloadTooLarge {
+                size: payload.len(),
+                max: u32::MAX as usize - 4,
+            })?;
+        out.reserve(Self::HEADER_SIZE + payload.len());
+        out.put_u32_le(length);
+        out.put_u32_le(code);
+        out.put_slice(payload);
+        Ok(())
+    }
+
+    /// Total encoded size for a given payload length.
+    ///
+    /// Returns `None` if `HEADER_SIZE + payload_len` overflows `usize`.
+    #[must_use]
+    pub const fn encoded_size(payload_len: usize) -> Option<usize> {
+        Self::HEADER_SIZE.checked_add(payload_len)
+    }
+}
+
+/// Decoded response frame. Borrows the payload from the input buffer.
+///
+/// Wire format: `[status:4 LE][length:4 LE][payload:N]`
+/// where `status` = 0 for success, non-zero for error code.
+#[derive(Debug)]
+pub struct ResponseFrame<'a> {
+    pub status: u32,
+    pub payload: &'a [u8],
+}
+
+impl<'a> ResponseFrame<'a> {
+    /// Size of the frame header: `[status:4][length:4]`.
+    pub const HEADER_SIZE: usize = 8;
+
+    /// Decode a response frame from a complete buffer.
+    ///
+    /// # Errors
+    /// Returns `WireError::UnexpectedEof` if the buffer is too short.
+    pub fn decode(buf: &'a [u8]) -> Result<(Self, usize), WireError> {
+        let status = read_u32_le(buf, 0)?;
+        let length = read_u32_le(buf, 4)? as usize;
+        let payload = read_bytes(buf, Self::HEADER_SIZE, length)?;
+        let total = Self::HEADER_SIZE + length;
+        Ok((Self { status, payload }, total))
+    }
+
+    /// Encode a successful response with payload.
+    ///
+    /// # Errors
+    /// Returns `WireError::PayloadTooLarge` if payload exceeds u32 capacity.
+    pub fn encode_ok(payload: &[u8], out: &mut BytesMut) -> Result<(), 
WireError> {
+        let length = u32::try_from(payload.len()).map_err(|_| 
WireError::PayloadTooLarge {
+            size: payload.len(),
+            max: u32::MAX as usize,
+        })?;
+        out.reserve(Self::HEADER_SIZE + payload.len());
+        out.put_u32_le(STATUS_OK);
+        out.put_u32_le(length);
+        out.put_slice(payload);
+        Ok(())
+    }
+
+    /// Encode an error response (status code, empty payload).
+    pub fn encode_error(status: NonZeroU32, out: &mut BytesMut) {
+        out.reserve(Self::HEADER_SIZE);
+        out.put_u32_le(status.get());
+        out.put_u32_le(0);
+    }
+
+    /// Returns `true` if this is a success response.
+    #[must_use]
+    pub const fn is_ok(&self) -> bool {
+        self.status == STATUS_OK
+    }
+
+    /// Total encoded size for a given payload length.
+    ///
+    /// Returns `None` if `HEADER_SIZE + payload_len` overflows `usize`.
+    #[must_use]
+    pub const fn encoded_size(payload_len: usize) -> Option<usize> {
+        Self::HEADER_SIZE.checked_add(payload_len)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn request_roundtrip() {
+        let payload = b"hello world";
+        let mut buf = 
BytesMut::with_capacity(RequestFrame::encoded_size(payload.len()).unwrap());
+        RequestFrame::encode(42, payload, &mut buf).unwrap();
+
+        let (frame, consumed) = RequestFrame::decode(&buf).unwrap();
+        assert_eq!(consumed, buf.len());
+        assert_eq!(frame.code, 42);
+        assert_eq!(frame.payload, payload);
+    }
+
+    #[test]
+    fn request_empty_payload() {
+        let mut buf = BytesMut::with_capacity(RequestFrame::HEADER_SIZE);
+        RequestFrame::encode(1, &[], &mut buf).unwrap();
+
+        let (frame, consumed) = RequestFrame::decode(&buf).unwrap();
+        assert_eq!(consumed, 8);
+        assert_eq!(frame.code, 1);
+        assert!(frame.payload.is_empty());
+    }
+
+    #[test]
+    fn request_length_field_includes_code() {
+        let payload = b"test";
+        let mut buf = BytesMut::new();
+        RequestFrame::encode(99, payload, &mut buf).unwrap();
+
+        let length = u32::from_le_bytes(buf[0..4].try_into().unwrap());
+        assert_eq!(length, 4 + 4); // code(4) + payload(4)
+    }
+
+    #[test]
+    fn request_truncated_header() {
+        let buf = [0u8; 7]; // less than HEADER_SIZE
+        assert!(RequestFrame::decode(&buf).is_err());
+    }
+
+    #[test]
+    fn request_truncated_payload() {
+        let mut buf = BytesMut::new();
+        buf.put_u32_le(104); // length = 104 (code + 100 bytes payload)
+        buf.put_u32_le(1); // code
+        buf.put_slice(&[0u8; 50]); // only 50 of 100 bytes
+        assert!(RequestFrame::decode(&buf).is_err());
+    }
+
+    #[test]
+    fn request_length_too_small() {
+        let mut buf = BytesMut::new();
+        buf.put_u32_le(3); // length < 4 (must include code)
+        buf.put_u32_le(1);
+        assert!(RequestFrame::decode(&buf).is_err());
+    }
+
+    #[test]
+    fn request_encoded_size() {
+        assert_eq!(RequestFrame::encoded_size(0), Some(8));
+        assert_eq!(RequestFrame::encoded_size(100), Some(108));
+        assert_eq!(RequestFrame::encoded_size(usize::MAX), None);
+    }
+
+    #[test]
+    fn response_ok_roundtrip() {
+        let payload = b"response data";
+        let mut buf = 
BytesMut::with_capacity(ResponseFrame::encoded_size(payload.len()).unwrap());
+        ResponseFrame::encode_ok(payload, &mut buf).unwrap();
+
+        let (frame, consumed) = ResponseFrame::decode(&buf).unwrap();
+        assert_eq!(consumed, buf.len());
+        assert!(frame.is_ok());
+        assert_eq!(frame.status, 0);
+        assert_eq!(frame.payload, payload);
+    }
+
+    #[test]
+    fn response_ok_empty_payload() {
+        let mut buf = BytesMut::new();
+        ResponseFrame::encode_ok(&[], &mut buf).unwrap();
+
+        let (frame, consumed) = ResponseFrame::decode(&buf).unwrap();
+        assert_eq!(consumed, 8);
+        assert!(frame.is_ok());
+        assert!(frame.payload.is_empty());
+    }
+
+    #[test]
+    fn response_error_roundtrip() {
+        let mut buf = BytesMut::new();
+        ResponseFrame::encode_error(NonZeroU32::new(1001).unwrap(), &mut buf);
+
+        let (frame, consumed) = ResponseFrame::decode(&buf).unwrap();
+        assert_eq!(consumed, 8);
+        assert!(!frame.is_ok());
+        assert_eq!(frame.status, 1001);
+        assert!(frame.payload.is_empty());
+    }
+
+    #[test]
+    fn response_truncated_header() {
+        let buf = [0u8; 7];
+        assert!(ResponseFrame::decode(&buf).is_err());
+    }
+
+    #[test]
+    fn response_truncated_payload() {
+        let mut buf = BytesMut::new();
+        buf.put_u32_le(0); // status OK
+        buf.put_u32_le(100); // length = 100
+        buf.put_slice(&[0u8; 50]); // only 50 bytes
+        assert!(ResponseFrame::decode(&buf).is_err());
+    }
+
+    #[test]
+    fn response_encoded_size() {
+        assert_eq!(ResponseFrame::encoded_size(0), Some(8));
+        assert_eq!(ResponseFrame::encoded_size(256), Some(264));
+        assert_eq!(ResponseFrame::encoded_size(usize::MAX), None);
+    }
+}
diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs
index 9610abf7e..7ed4fdad0 100644
--- a/core/binary_protocol/src/lib.rs
+++ b/core/binary_protocol/src/lib.rs
@@ -41,12 +41,25 @@
 //!
 //! All multi-byte integers are little-endian. Strings are length-prefixed
 //! (u8 length for names, u32 length for longer strings).
+//!
+//! # VSR consensus framing
+//!
+//! All consensus headers are 256 bytes with `#[repr(C)]` layout.
+//! Deserialization is zero-copy via `bytemuck`. The [`Message`] type
+//! wraps a `Bytes` buffer with typed header access.
+//!
+//! - Client-facing: [`RequestHeader`], [`ReplyHeader`]
+//! - Replication: [`PrepareHeader`], [`PrepareOkHeader`], [`CommitHeader`]
+//! - View change: [`StartViewChangeHeader`], [`DoViewChangeHeader`],
+//!   [`StartViewHeader`]
+//! - Dispatch: [`GenericHeader`] for type-erased initial parsing
 
 pub mod codec;
 pub mod codes;
 pub mod consensus;
+pub mod dispatch;
 pub mod error;
-pub mod frame;
+pub mod framing;
 pub mod message_layout;
 pub mod message_view;
 pub mod primitives;
@@ -54,10 +67,17 @@ pub mod requests;
 pub mod responses;
 
 pub use codec::{WireDecode, WireEncode};
-pub use codes::*;
+pub use consensus::{
+    Command2, CommitHeader, ConsensusError, ConsensusHeader, 
DoViewChangeHeader, GenericHeader,
+    HEADER_SIZE, Operation, PrepareHeader, PrepareOkHeader, ReplyHeader, 
RequestHeader,
+    StartViewChangeHeader, StartViewHeader, message::Message,
+};
+pub use dispatch::{COMMAND_TABLE, CommandMeta, lookup_by_operation, 
lookup_command};
 pub use error::WireError;
-pub use frame::*;
-pub use message_layout::*;
+pub use framing::{RequestFrame, ResponseFrame, STATUS_OK};
+pub use message_view::{
+    WireMessageIterator, WireMessageIteratorMut, WireMessageView, 
WireMessageViewMut,
+};
 pub use primitives::consumer::WireConsumer;
 pub use primitives::identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier, 
WireName};
 pub use primitives::partitioning::{MAX_MESSAGES_KEY_LENGTH, WirePartitioning};
diff --git a/core/binary_protocol/src/message_view.rs 
b/core/binary_protocol/src/message_view.rs
index c291d9a7e..3159dac5b 100644
--- a/core/binary_protocol/src/message_view.rs
+++ b/core/binary_protocol/src/message_view.rs
@@ -26,6 +26,7 @@ use crate::message_layout::{
     MSG_PAYLOAD_LEN_OFFSET, MSG_TIMESTAMP_OFFSET, MSG_USER_HEADERS_LEN_OFFSET,
     WIRE_MESSAGE_HEADER_SIZE,
 };
+use std::borrow::Cow;
 
 // Private helpers for infallible reads on validated buffers
 
@@ -72,7 +73,9 @@ fn validate_frame(buf: &[u8]) -> Result<(usize, usize, 
usize), WireError> {
     let total = WIRE_MESSAGE_HEADER_SIZE
         .checked_add(payload_len)
         .and_then(|s| s.checked_add(user_headers_len))
-        .ok_or_else(|| WireError::Validation("message frame size 
overflow".to_string()))?;
+        .ok_or(WireError::Validation(Cow::Borrowed(
+            "message frame size overflow",
+        )))?;
 
     if buf.len() < total {
         return Err(WireError::UnexpectedEof {
@@ -371,9 +374,9 @@ impl<'a> WireMessageIteratorMut<'a> {
                 .and_then(|s| s.checked_add(user_headers_len))
             else {
                 self.remaining = 0;
-                return Some(Err(WireError::Validation(
-                    "message frame size overflow".to_string(),
-                )));
+                return Some(Err(WireError::Validation(Cow::Borrowed(
+                    "message frame size overflow",
+                ))));
             };
             (total, rest.len())
         };
diff --git a/core/binary_protocol/src/primitives/identifier.rs 
b/core/binary_protocol/src/primitives/identifier.rs
index af25c189c..f28614eba 100644
--- a/core/binary_protocol/src/primitives/identifier.rs
+++ b/core/binary_protocol/src/primitives/identifier.rs
@@ -18,6 +18,7 @@
 use crate::WireError;
 use crate::codec::{WireDecode, WireEncode, read_bytes, read_str, read_u8};
 use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
 use std::ops::Deref;
 
 // WireName
@@ -40,10 +41,10 @@ impl WireName {
     pub fn new(s: impl Into<String>) -> Result<Self, WireError> {
         let s = s.into();
         if s.is_empty() || s.len() > MAX_WIRE_NAME_LENGTH {
-            return Err(WireError::Validation(format!(
+            return Err(WireError::Validation(Cow::Owned(format!(
                 "wire name must be 1-{MAX_WIRE_NAME_LENGTH} bytes, got {}",
                 s.len()
-            )));
+            ))));
         }
         Ok(Self(s))
     }
@@ -102,9 +103,9 @@ impl WireDecode for WireName {
     fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
         let name_len = read_u8(buf, 0)? as usize;
         if name_len == 0 || name_len > MAX_WIRE_NAME_LENGTH {
-            return Err(WireError::Validation(format!(
+            return Err(WireError::Validation(Cow::Owned(format!(
                 "wire name must be 1-{MAX_WIRE_NAME_LENGTH} bytes, got 
{name_len}"
-            )));
+            ))));
         }
         let name = read_str(buf, 1, name_len)?;
         Ok((Self(name), 1 + name_len))
@@ -199,9 +200,9 @@ impl WireDecode for WireIdentifier {
         match kind {
             KIND_NUMERIC => {
                 if length != NUMERIC_VALUE_LEN as usize {
-                    return Err(WireError::Validation(format!(
+                    return Err(WireError::Validation(Cow::Owned(format!(
                         "numeric identifier must be {NUMERIC_VALUE_LEN} bytes, 
got {length}"
-                    )));
+                    ))));
                 }
                 let id = u32::from_le_bytes(
                     value
@@ -212,9 +213,9 @@ impl WireDecode for WireIdentifier {
             }
             KIND_STRING => {
                 if length == 0 {
-                    return Err(WireError::Validation(
-                        "string identifier cannot be empty".to_string(),
-                    ));
+                    return Err(WireError::Validation(Cow::Borrowed(
+                        "string identifier cannot be empty",
+                    )));
                 }
                 let s =
                     std::str::from_utf8(value).map_err(|_| 
WireError::InvalidUtf8 { offset: 2 })?;
diff --git a/core/binary_protocol/src/primitives/partitioning.rs 
b/core/binary_protocol/src/primitives/partitioning.rs
index 4bfbf5f58..6e3b277de 100644
--- a/core/binary_protocol/src/primitives/partitioning.rs
+++ b/core/binary_protocol/src/primitives/partitioning.rs
@@ -18,6 +18,7 @@
 use crate::WireError;
 use crate::codec::{WireDecode, WireEncode, read_bytes, read_u8, read_u32_le};
 use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
 
 const KIND_BALANCED: u8 = 1;
 const KIND_PARTITION_ID: u8 = 2;
@@ -49,15 +50,15 @@ impl WirePartitioning {
     /// Returns `WireError::Validation` if `key` is empty or exceeds 255 bytes.
     pub fn messages_key(key: Vec<u8>) -> Result<Self, WireError> {
         if key.is_empty() {
-            return Err(WireError::Validation(
-                "messages_key partitioning cannot have empty key".to_string(),
-            ));
+            return Err(WireError::Validation(Cow::Borrowed(
+                "messages_key partitioning cannot have empty key",
+            )));
         }
         if key.len() > MAX_MESSAGES_KEY_LENGTH {
-            return Err(WireError::Validation(format!(
+            return Err(WireError::Validation(Cow::Owned(format!(
                 "messages_key length {} exceeds maximum 
{MAX_MESSAGES_KEY_LENGTH}",
                 key.len()
-            )));
+            ))));
         }
         Ok(Self::MessagesKey(key))
     }
@@ -107,26 +108,26 @@ impl WireDecode for WirePartitioning {
         match kind {
             KIND_BALANCED => {
                 if length != 0 {
-                    return Err(WireError::Validation(format!(
+                    return Err(WireError::Validation(Cow::Owned(format!(
                         "balanced partitioning must have length 0, got 
{length}"
-                    )));
+                    ))));
                 }
                 Ok((Self::Balanced, 2))
             }
             KIND_PARTITION_ID => {
                 if length != 4 {
-                    return Err(WireError::Validation(format!(
+                    return Err(WireError::Validation(Cow::Owned(format!(
                         "partition_id partitioning must have length 4, got 
{length}"
-                    )));
+                    ))));
                 }
                 let id = read_u32_le(buf, 2)?;
                 Ok((Self::PartitionId(id), 6))
             }
             KIND_MESSAGES_KEY => {
                 if length == 0 {
-                    return Err(WireError::Validation(
-                        "messages_key partitioning cannot have empty 
key".to_string(),
-                    ));
+                    return Err(WireError::Validation(Cow::Borrowed(
+                        "messages_key partitioning cannot have empty key",
+                    )));
                 }
                 let key = read_bytes(buf, 2, length)?;
                 Ok((Self::MessagesKey(key.to_vec()), 2 + length))
diff --git a/core/binary_protocol/src/requests/users/create_user.rs 
b/core/binary_protocol/src/requests/users/create_user.rs
index 4348426e9..632e077d9 100644
--- a/core/binary_protocol/src/requests/users/create_user.rs
+++ b/core/binary_protocol/src/requests/users/create_user.rs
@@ -20,6 +20,7 @@ use crate::codec::{WireDecode, WireEncode, read_str, read_u8, 
read_u32_le};
 use crate::primitives::identifier::WireName;
 use crate::primitives::permissions::WirePermissions;
 use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
 
 /// `CreateUser` request.
 ///
@@ -85,9 +86,9 @@ impl WireDecode for CreateUserRequest {
         let permissions = if has_permissions == 1 && perm_len > 0 {
             let (perms, consumed) = WirePermissions::decode(&buf[pos..])?;
             if consumed != perm_len {
-                return Err(WireError::Validation(format!(
+                return Err(WireError::Validation(Cow::Owned(format!(
                     "permissions length mismatch: header says {perm_len}, 
decoded {consumed}"
-                )));
+                ))));
             }
             pos += consumed;
             Some(perms)
diff --git a/core/binary_protocol/src/requests/users/update_permissions.rs 
b/core/binary_protocol/src/requests/users/update_permissions.rs
index 6b5bcbbc4..16de85737 100644
--- a/core/binary_protocol/src/requests/users/update_permissions.rs
+++ b/core/binary_protocol/src/requests/users/update_permissions.rs
@@ -20,6 +20,7 @@ use crate::WireIdentifier;
 use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le};
 use crate::primitives::permissions::WirePermissions;
 use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
 
 /// `UpdatePermissions` request.
 ///
@@ -67,9 +68,9 @@ impl WireDecode for UpdatePermissionsRequest {
         let permissions = if has_permissions == 1 && perm_len > 0 {
             let (perms, consumed) = WirePermissions::decode(&buf[pos..])?;
             if consumed != perm_len {
-                return Err(WireError::Validation(format!(
+                return Err(WireError::Validation(Cow::Owned(format!(
                     "permissions length mismatch: header says {perm_len}, 
decoded {consumed}"
-                )));
+                ))));
             }
             pos += consumed;
             Some(perms)
diff --git a/core/binary_protocol/src/responses/streams/get_stream.rs 
b/core/binary_protocol/src/responses/streams/get_stream.rs
index 3470e3743..25aae5177 100644
--- a/core/binary_protocol/src/responses/streams/get_stream.rs
+++ b/core/binary_protocol/src/responses/streams/get_stream.rs
@@ -20,6 +20,7 @@ use crate::codec::{WireDecode, WireEncode, read_u8, 
read_u32_le, read_u64_le};
 use crate::primitives::identifier::WireName;
 use crate::responses::streams::StreamResponse;
 use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
 
 /// Topic header within a `GetStream` response.
 ///
@@ -141,11 +142,11 @@ impl WireDecode for GetStreamResponse {
             topics.push(topic);
         }
         if topics.len() != stream.topics_count as usize {
-            return Err(WireError::Validation(format!(
+            return Err(WireError::Validation(Cow::Owned(format!(
                 "stream.topics_count={} but decoded {} topics",
                 stream.topics_count,
                 topics.len()
-            )));
+            ))));
         }
         Ok((Self { stream, topics }, pos))
     }
diff --git a/core/binary_protocol/src/responses/system/get_cluster_metadata.rs 
b/core/binary_protocol/src/responses/system/get_cluster_metadata.rs
new file mode 100644
index 000000000..c717f5139
--- /dev/null
+++ b/core/binary_protocol/src/responses/system/get_cluster_metadata.rs
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_str, read_u8, read_u16_le, 
read_u32_le};
+use bytes::{BufMut, BytesMut};
+
+/// `GetClusterMetadata` response: cluster name and list of nodes.
+///
+/// Wire format:
+/// ```text
+/// [name_len:4 LE][name:N][nodes_count:4 LE][ClusterNodeResponse]*
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ClusterMetadataResponse {
+    pub name: String,
+    pub nodes: Vec<ClusterNodeResponse>,
+}
+
+/// A single node within the cluster metadata.
+///
+/// Wire format:
+/// ```text
+/// [name_len:4 LE][name:N][ip_len:4 LE][ip:N]
+/// [tcp:2 LE][quic:2 LE][http:2 LE][websocket:2 LE]
+/// [role:1][status:1]
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ClusterNodeResponse {
+    pub name: String,
+    pub ip: String,
+    pub tcp_port: u16,
+    pub quic_port: u16,
+    pub http_port: u16,
+    pub websocket_port: u16,
+    pub role: u8,
+    pub status: u8,
+}
+
+const NODE_FIXED_SIZE: usize = 4 + 4 + 8 + 1 + 1; // name_len + ip_len + ports 
+ role + status
+
+impl WireEncode for ClusterNodeResponse {
+    fn encoded_size(&self) -> usize {
+        NODE_FIXED_SIZE + self.name.len() + self.ip.len()
+    }
+
+    #[allow(clippy::cast_possible_truncation)]
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_u32_le(self.name.len() as u32);
+        buf.put_slice(self.name.as_bytes());
+        buf.put_u32_le(self.ip.len() as u32);
+        buf.put_slice(self.ip.as_bytes());
+        buf.put_u16_le(self.tcp_port);
+        buf.put_u16_le(self.quic_port);
+        buf.put_u16_le(self.http_port);
+        buf.put_u16_le(self.websocket_port);
+        buf.put_u8(self.role);
+        buf.put_u8(self.status);
+    }
+}
+
+impl WireDecode for ClusterNodeResponse {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let name_len = read_u32_le(buf, 0)? as usize;
+        let name = read_str(buf, 4, name_len)?;
+        let pos = 4 + name_len;
+
+        let ip_len = read_u32_le(buf, pos)? as usize;
+        let ip = read_str(buf, pos + 4, ip_len)?;
+        let pos = pos + 4 + ip_len;
+
+        let tcp_port = read_u16_le(buf, pos)?;
+        let quic_port = read_u16_le(buf, pos + 2)?;
+        let http_port = read_u16_le(buf, pos + 4)?;
+        let websocket_port = read_u16_le(buf, pos + 6)?;
+        let role = read_u8(buf, pos + 8)?;
+        let status = read_u8(buf, pos + 9)?;
+
+        Ok((
+            Self {
+                name,
+                ip,
+                tcp_port,
+                quic_port,
+                http_port,
+                websocket_port,
+                role,
+                status,
+            },
+            pos + 10,
+        ))
+    }
+}
+
+impl WireEncode for ClusterMetadataResponse {
+    fn encoded_size(&self) -> usize {
+        4 + self.name.len()
+            + 4
+            + self
+                .nodes
+                .iter()
+                .map(WireEncode::encoded_size)
+                .sum::<usize>()
+    }
+
+    #[allow(clippy::cast_possible_truncation)]
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_u32_le(self.name.len() as u32);
+        buf.put_slice(self.name.as_bytes());
+        buf.put_u32_le(self.nodes.len() as u32);
+        for node in &self.nodes {
+            node.encode(buf);
+        }
+    }
+}
+
+impl WireDecode for ClusterMetadataResponse {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let name_len = read_u32_le(buf, 0)? as usize;
+        let name = read_str(buf, 4, name_len)?;
+        let mut pos = 4 + name_len;
+
+        let nodes_count = read_u32_le(buf, pos)? as usize;
+        pos += 4;
+
+        let mut nodes = Vec::with_capacity(nodes_count);
+        for _ in 0..nodes_count {
+            let (node, consumed) = ClusterNodeResponse::decode(&buf[pos..])?;
+            pos += consumed;
+            nodes.push(node);
+        }
+
+        Ok((Self { name, nodes }, pos))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn sample_node(name: &str, ip: &str) -> ClusterNodeResponse {
+        ClusterNodeResponse {
+            name: name.to_string(),
+            ip: ip.to_string(),
+            tcp_port: 8090,
+            quic_port: 8091,
+            http_port: 3000,
+            websocket_port: 3001,
+            role: 1,
+            status: 1,
+        }
+    }
+
+    #[test]
+    fn node_roundtrip() {
+        let node = sample_node("node-1", "192.168.1.1");
+        let bytes = node.to_bytes();
+        let (decoded, consumed) = ClusterNodeResponse::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, node);
+    }
+
+    #[test]
+    fn roundtrip_no_nodes() {
+        let resp = ClusterMetadataResponse {
+            name: "test-cluster".to_string(),
+            nodes: vec![],
+        };
+        let bytes = resp.to_bytes();
+        let (decoded, consumed) = 
ClusterMetadataResponse::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, resp);
+    }
+
+    #[test]
+    fn roundtrip_with_nodes() {
+        let resp = ClusterMetadataResponse {
+            name: "prod-cluster".to_string(),
+            nodes: vec![
+                sample_node("node-1", "10.0.0.1"),
+                sample_node("node-2", "10.0.0.2"),
+                sample_node("node-3", "10.0.0.3"),
+            ],
+        };
+        let bytes = resp.to_bytes();
+        let (decoded, consumed) = 
ClusterMetadataResponse::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, resp);
+    }
+
+    #[test]
+    fn truncated_returns_error() {
+        let resp = ClusterMetadataResponse {
+            name: "c".to_string(),
+            nodes: vec![sample_node("n", "1.2.3.4")],
+        };
+        let bytes = resp.to_bytes();
+        for i in 0..bytes.len() {
+            assert!(
+                ClusterMetadataResponse::decode(&bytes[..i]).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn empty_cluster_name() {
+        let resp = ClusterMetadataResponse {
+            name: String::new(),
+            nodes: vec![],
+        };
+        let bytes = resp.to_bytes();
+        let (decoded, _) = ClusterMetadataResponse::decode(&bytes).unwrap();
+        assert_eq!(decoded.name, "");
+    }
+}
diff --git a/core/binary_protocol/src/frame.rs 
b/core/binary_protocol/src/responses/system/get_me.rs
similarity index 56%
rename from core/binary_protocol/src/frame.rs
rename to core/binary_protocol/src/responses/system/get_me.rs
index 77a5f2b3e..5b2be1091 100644
--- a/core/binary_protocol/src/frame.rs
+++ b/core/binary_protocol/src/responses/system/get_me.rs
@@ -15,18 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// TODO(hubcio): Legacy framing constants for the current binary protocol.
-// Once VSR consensus is integrated, both client-server and
-// replica-replica traffic will use the unified 256-byte
-// consensus header (`consensus::header::HEADER_SIZE`).
-// These constants will be removed at that point.
-
-/// Request frame: `[length:4 LE][code:4 LE][payload:N]`
-/// `length` = size of code + payload = 4 + N
-pub const REQUEST_HEADER_SIZE: usize = 4;
-
-/// Response frame: `[status:4 LE][length:4 LE][payload:N]`
-pub const RESPONSE_HEADER_SIZE: usize = 8;
-
-/// Status code for a successful response.
-pub const STATUS_OK: u32 = 0;
+/// `GetMe` response: same wire format as `ClientDetailsResponse`.
+///
+/// The server returns the authenticated client's own info using the
+/// same `map_client` serialization as `GetClient`.
+pub type GetMeResponse = 
crate::responses::clients::get_client::ClientDetailsResponse;
diff --git a/core/binary_protocol/src/responses/system/get_snapshot.rs 
b/core/binary_protocol/src/responses/system/get_snapshot.rs
new file mode 100644
index 000000000..2fcbb9291
--- /dev/null
+++ b/core/binary_protocol/src/responses/system/get_snapshot.rs
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode};
+use bytes::{BufMut, BytesMut};
+
+/// `GetSnapshot` response: raw snapshot data (ZIP archive).
+///
+/// The server produces a ZIP file containing diagnostic data.
+/// The payload is the complete archive with no additional framing.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct GetSnapshotResponse {
+    pub data: Vec<u8>,
+}
+
+impl WireEncode for GetSnapshotResponse {
+    fn encoded_size(&self) -> usize {
+        self.data.len()
+    }
+
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_slice(&self.data);
+    }
+}
+
+impl WireDecode for GetSnapshotResponse {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        Ok((Self { data: buf.to_vec() }, buf.len()))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn roundtrip() {
+        let resp = GetSnapshotResponse {
+            data: vec![0x50, 0x4B, 0x03, 0x04, 1, 2, 3, 4],
+        };
+        let bytes = resp.to_bytes();
+        let (decoded, consumed) = GetSnapshotResponse::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, resp);
+    }
+
+    #[test]
+    fn empty_snapshot() {
+        let resp = GetSnapshotResponse { data: vec![] };
+        let bytes = resp.to_bytes();
+        assert!(bytes.is_empty());
+        let (decoded, consumed) = GetSnapshotResponse::decode(&bytes).unwrap();
+        assert_eq!(consumed, 0);
+        assert_eq!(decoded, resp);
+    }
+}
diff --git a/core/binary_protocol/src/responses/system/mod.rs 
b/core/binary_protocol/src/responses/system/mod.rs
index 7394548b6..8b3cafc70 100644
--- a/core/binary_protocol/src/responses/system/mod.rs
+++ b/core/binary_protocol/src/responses/system/mod.rs
@@ -15,9 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod get_cluster_metadata;
+pub mod get_me;
+pub mod get_snapshot;
 pub mod get_stats;
 mod ping;
 
 pub use super::EmptyResponse;
+pub use get_cluster_metadata::{ClusterMetadataResponse, ClusterNodeResponse};
+pub use get_me::GetMeResponse;
+pub use get_snapshot::GetSnapshotResponse;
 pub use get_stats::StatsResponse;
 pub use ping::PingResponse;
diff --git a/core/binary_protocol/src/responses/topics/get_topic.rs 
b/core/binary_protocol/src/responses/topics/get_topic.rs
index 80b510a9c..b90e52156 100644
--- a/core/binary_protocol/src/responses/topics/get_topic.rs
+++ b/core/binary_protocol/src/responses/topics/get_topic.rs
@@ -19,6 +19,7 @@ use crate::WireError;
 use crate::codec::{WireDecode, WireEncode, read_u32_le, read_u64_le};
 use crate::responses::streams::get_stream::TopicHeader;
 use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
 
 /// Partition details within a `GetTopic` response.
 ///
@@ -120,11 +121,11 @@ impl WireDecode for GetTopicResponse {
             partitions.push(partition);
         }
         if partitions.len() != topic.partitions_count as usize {
-            return Err(WireError::Validation(format!(
+            return Err(WireError::Validation(Cow::Owned(format!(
                 "topic.partitions_count={} but decoded {} partitions",
                 topic.partitions_count,
                 partitions.len()
-            )));
+            ))));
         }
         Ok((Self { topic, partitions }, pos))
     }

Reply via email to