This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch refactor-binary-4-sans-io in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 9748b3a3ff16110eecc8b7f343214b19c57ec4cb Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Mar 17 15:54:28 2026 +0100 feat(rust): add sans-IO frame codec and command dispatch table The binary_protocol crate had wire types but no framing layer. Server and SDK hand-rolled frame parsing independently. Add RequestFrame/ResponseFrame sans-IO codec (framing.rs) with pre-reserved buffers, checked encoded_size() returning Option, and NonZeroU32 status in encode_error() for type-level safety. Add CommandMeta dispatch table (dispatch.rs) as single source of truth for all 47 command codes, names, and VSR operations. Operation::from_command_code/to_command_code now delegate to it. Replace frame.rs with framing.rs. Remove glob re-exports from crate root. Change WireError::Validation to Cow<'static, str>. --- core/binary_protocol/src/codes.rs | 54 +--- core/binary_protocol/src/consensus/operation.rs | 68 +--- core/binary_protocol/src/dispatch.rs | 350 +++++++++++++++++++++ core/binary_protocol/src/error.rs | 4 +- core/binary_protocol/src/frame.rs | 32 -- core/binary_protocol/src/framing.rs | 289 +++++++++++++++++ core/binary_protocol/src/lib.rs | 28 +- core/binary_protocol/src/message_view.rs | 11 +- core/binary_protocol/src/primitives/identifier.rs | 19 +- .../binary_protocol/src/primitives/partitioning.rs | 25 +- .../src/requests/users/create_user.rs | 5 +- .../src/requests/users/update_permissions.rs | 5 +- .../src/responses/streams/get_stream.rs | 5 +- .../src/responses/topics/get_topic.rs | 5 +- 14 files changed, 729 insertions(+), 171 deletions(-) diff --git a/core/binary_protocol/src/codes.rs b/core/binary_protocol/src/codes.rs index f20209172..a088b71fb 100644 --- a/core/binary_protocol/src/codes.rs +++ b/core/binary_protocol/src/codes.rs @@ -87,58 +87,14 @@ pub const DELETE_CONSUMER_GROUP_CODE: u32 = 603; pub const JOIN_CONSUMER_GROUP_CODE: u32 = 604; pub const LEAVE_CONSUMER_GROUP_CODE: u32 = 605; +/// Lookup the human-readable name for a command code. +/// /// # Errors /// Returns `WireError::UnknownCommand` if the code is not recognized. pub const fn command_name(code: u32) -> Result<&'static str, WireError> { - match code { - PING_CODE => Ok("ping"), - GET_STATS_CODE => Ok("stats"), - GET_SNAPSHOT_FILE_CODE => Ok("snapshot"), - GET_CLUSTER_METADATA_CODE => Ok("cluster.metadata"), - GET_ME_CODE => Ok("me"), - GET_CLIENT_CODE => Ok("client.get"), - GET_CLIENTS_CODE => Ok("client.list"), - GET_USER_CODE => Ok("user.get"), - GET_USERS_CODE => Ok("user.list"), - CREATE_USER_CODE => Ok("user.create"), - DELETE_USER_CODE => Ok("user.delete"), - UPDATE_USER_CODE => Ok("user.update"), - UPDATE_PERMISSIONS_CODE => Ok("user.permissions"), - CHANGE_PASSWORD_CODE => Ok("user.password"), - LOGIN_USER_CODE => Ok("user.login"), - LOGOUT_USER_CODE => Ok("user.logout"), - GET_PERSONAL_ACCESS_TOKENS_CODE => Ok("personal_access_token.list"), - CREATE_PERSONAL_ACCESS_TOKEN_CODE => Ok("personal_access_token.create"), - DELETE_PERSONAL_ACCESS_TOKEN_CODE => Ok("personal_access_token.delete"), - LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => Ok("personal_access_token.login"), - POLL_MESSAGES_CODE => Ok("message.poll"), - SEND_MESSAGES_CODE => Ok("message.send"), - FLUSH_UNSAVED_BUFFER_CODE => Ok("message.flush_unsaved_buffer"), - GET_CONSUMER_OFFSET_CODE => Ok("consumer_offset.get"), - STORE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.store"), - DELETE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.delete"), - GET_STREAM_CODE => Ok("stream.get"), - GET_STREAMS_CODE => Ok("stream.list"), - CREATE_STREAM_CODE => Ok("stream.create"), - DELETE_STREAM_CODE => Ok("stream.delete"), - UPDATE_STREAM_CODE => Ok("stream.update"), - PURGE_STREAM_CODE => Ok("stream.purge"), - GET_TOPIC_CODE => Ok("topic.get"), - GET_TOPICS_CODE => Ok("topic.list"), - CREATE_TOPIC_CODE => Ok("topic.create"), - DELETE_TOPIC_CODE => Ok("topic.delete"), - UPDATE_TOPIC_CODE => Ok("topic.update"), - PURGE_TOPIC_CODE => Ok("topic.purge"), - CREATE_PARTITIONS_CODE => Ok("partition.create"), - DELETE_PARTITIONS_CODE => Ok("partition.delete"), - DELETE_SEGMENTS_CODE => Ok("segment.delete"), - GET_CONSUMER_GROUP_CODE => Ok("consumer_group.get"), - GET_CONSUMER_GROUPS_CODE => Ok("consumer_group.list"), - CREATE_CONSUMER_GROUP_CODE => Ok("consumer_group.create"), - DELETE_CONSUMER_GROUP_CODE => Ok("consumer_group.delete"), - JOIN_CONSUMER_GROUP_CODE => Ok("consumer_group.join"), - LEAVE_CONSUMER_GROUP_CODE => Ok("consumer_group.leave"), - _ => Err(WireError::UnknownCommand(code)), + match crate::dispatch::lookup_command(code) { + Some(meta) => Ok(meta.name), + None => Err(WireError::UnknownCommand(code)), } } diff --git a/core/binary_protocol/src/consensus/operation.rs b/core/binary_protocol/src/consensus/operation.rs index 8bf2b5016..834e82003 100644 --- a/core/binary_protocol/src/consensus/operation.rs +++ b/core/binary_protocol/src/consensus/operation.rs @@ -95,64 +95,27 @@ impl Operation { } /// Bidirectional mapping: `Operation` -> client command code. + /// + /// Delegates to the dispatch table as the single source of truth. #[must_use] pub const fn to_command_code(&self) -> Option<u32> { - use crate::codes; match self { Self::Reserved => None, - Self::CreateStream => Some(codes::CREATE_STREAM_CODE), - Self::UpdateStream => Some(codes::UPDATE_STREAM_CODE), - Self::DeleteStream => Some(codes::DELETE_STREAM_CODE), - Self::PurgeStream => Some(codes::PURGE_STREAM_CODE), - Self::CreateTopic => Some(codes::CREATE_TOPIC_CODE), - Self::UpdateTopic => Some(codes::UPDATE_TOPIC_CODE), - Self::DeleteTopic => Some(codes::DELETE_TOPIC_CODE), - Self::PurgeTopic => Some(codes::PURGE_TOPIC_CODE), - Self::CreatePartitions => Some(codes::CREATE_PARTITIONS_CODE), - Self::DeletePartitions => Some(codes::DELETE_PARTITIONS_CODE), - Self::DeleteSegments => Some(codes::DELETE_SEGMENTS_CODE), - Self::CreateConsumerGroup => Some(codes::CREATE_CONSUMER_GROUP_CODE), - Self::DeleteConsumerGroup => Some(codes::DELETE_CONSUMER_GROUP_CODE), - Self::CreateUser => Some(codes::CREATE_USER_CODE), - Self::UpdateUser => Some(codes::UPDATE_USER_CODE), - Self::DeleteUser => Some(codes::DELETE_USER_CODE), - Self::ChangePassword => Some(codes::CHANGE_PASSWORD_CODE), - Self::UpdatePermissions => Some(codes::UPDATE_PERMISSIONS_CODE), - Self::CreatePersonalAccessToken => Some(codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE), - Self::DeletePersonalAccessToken => Some(codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE), - Self::SendMessages => Some(codes::SEND_MESSAGES_CODE), - Self::StoreConsumerOffset => Some(codes::STORE_CONSUMER_OFFSET_CODE), + _ => match crate::dispatch::lookup_by_operation(*self) { + Some(meta) => Some(meta.code), + None => None, + }, } } /// Bidirectional mapping: client command code -> `Operation`. + /// + /// Delegates to the dispatch table as the single source of truth. #[must_use] pub const fn from_command_code(code: u32) -> Option<Self> { - use crate::codes; - match code { - codes::CREATE_STREAM_CODE => Some(Self::CreateStream), - codes::UPDATE_STREAM_CODE => Some(Self::UpdateStream), - codes::DELETE_STREAM_CODE => Some(Self::DeleteStream), - codes::PURGE_STREAM_CODE => Some(Self::PurgeStream), - codes::CREATE_TOPIC_CODE => Some(Self::CreateTopic), - codes::UPDATE_TOPIC_CODE => Some(Self::UpdateTopic), - codes::DELETE_TOPIC_CODE => Some(Self::DeleteTopic), - codes::PURGE_TOPIC_CODE => Some(Self::PurgeTopic), - codes::CREATE_PARTITIONS_CODE => Some(Self::CreatePartitions), - codes::DELETE_PARTITIONS_CODE => Some(Self::DeletePartitions), - codes::DELETE_SEGMENTS_CODE => Some(Self::DeleteSegments), - codes::CREATE_CONSUMER_GROUP_CODE => Some(Self::CreateConsumerGroup), - codes::DELETE_CONSUMER_GROUP_CODE => Some(Self::DeleteConsumerGroup), - codes::CREATE_USER_CODE => Some(Self::CreateUser), - codes::UPDATE_USER_CODE => Some(Self::UpdateUser), - codes::DELETE_USER_CODE => Some(Self::DeleteUser), - codes::CHANGE_PASSWORD_CODE => Some(Self::ChangePassword), - codes::UPDATE_PERMISSIONS_CODE => Some(Self::UpdatePermissions), - codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE => Some(Self::CreatePersonalAccessToken), - codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE => Some(Self::DeletePersonalAccessToken), - codes::SEND_MESSAGES_CODE => Some(Self::SendMessages), - codes::STORE_CONSUMER_OFFSET_CODE => Some(Self::StoreConsumerOffset), - _ => None, + match crate::dispatch::lookup_command(code) { + Some(meta) => meta.operation, + None => None, } } } @@ -204,10 +167,11 @@ mod tests { #[test] fn read_only_commands_have_no_operation() { - assert!(Operation::from_command_code(crate::PING_CODE).is_none()); - assert!(Operation::from_command_code(crate::GET_STATS_CODE).is_none()); - assert!(Operation::from_command_code(crate::GET_STREAM_CODE).is_none()); - assert!(Operation::from_command_code(crate::POLL_MESSAGES_CODE).is_none()); + use crate::codes::{GET_STATS_CODE, GET_STREAM_CODE, PING_CODE, POLL_MESSAGES_CODE}; + assert!(Operation::from_command_code(PING_CODE).is_none()); + assert!(Operation::from_command_code(GET_STATS_CODE).is_none()); + assert!(Operation::from_command_code(GET_STREAM_CODE).is_none()); + assert!(Operation::from_command_code(POLL_MESSAGES_CODE).is_none()); } #[test] diff --git a/core/binary_protocol/src/dispatch.rs b/core/binary_protocol/src/dispatch.rs new file mode 100644 index 000000000..ac755d61d --- /dev/null +++ b/core/binary_protocol/src/dispatch.rs @@ -0,0 +1,350 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Command dispatch table mapping codes and operations to metadata. +//! +//! This is the protocol's identity registry. Every command has an entry +//! with its numeric code, human-readable name, and optional VSR operation. +//! +//! Two lookup paths: +//! - `lookup_command(code)`: current framing reads `[length][code][payload]`, looks up by code +//! - `lookup_by_operation(op)`: future VSR framing reads 256-byte header, looks up by operation + +#[allow(clippy::wildcard_imports)] +use crate::codes::*; +use crate::consensus::Operation; + +/// Metadata for a single protocol command. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct CommandMeta { + pub code: u32, + pub name: &'static str, + /// VSR operation for replicated commands. `None` for non-replicated + /// commands that bypass consensus. + pub operation: Option<Operation>, +} + +impl CommandMeta { + /// Returns `true` if this command is replicated through VSR consensus. + #[must_use] + pub const fn is_replicated(&self) -> bool { + self.operation.is_some() + } + + const fn new(code: u32, name: &'static str, operation: Option<Operation>) -> Self { + Self { + code, + name, + operation, + } + } + + const fn non_replicated(code: u32, name: &'static str) -> Self { + Self::new(code, name, None) + } + + const fn replicated(code: u32, name: &'static str, op: Operation) -> Self { + Self::new(code, name, Some(op)) + } +} + +/// All known command metadata entries. +pub const COMMAND_TABLE: &[CommandMeta] = &[ + // System + CommandMeta::non_replicated(PING_CODE, "ping"), + CommandMeta::non_replicated(GET_STATS_CODE, "stats"), + CommandMeta::non_replicated(GET_SNAPSHOT_FILE_CODE, "snapshot"), + CommandMeta::non_replicated(GET_CLUSTER_METADATA_CODE, "cluster.metadata"), + CommandMeta::non_replicated(GET_ME_CODE, "me"), + CommandMeta::non_replicated(GET_CLIENT_CODE, "client.get"), + CommandMeta::non_replicated(GET_CLIENTS_CODE, "client.list"), + // Users + CommandMeta::non_replicated(GET_USER_CODE, "user.get"), + CommandMeta::non_replicated(GET_USERS_CODE, "user.list"), + CommandMeta::replicated(CREATE_USER_CODE, "user.create", Operation::CreateUser), + CommandMeta::replicated(DELETE_USER_CODE, "user.delete", Operation::DeleteUser), + CommandMeta::replicated(UPDATE_USER_CODE, "user.update", Operation::UpdateUser), + CommandMeta::replicated( + UPDATE_PERMISSIONS_CODE, + "user.permissions", + Operation::UpdatePermissions, + ), + CommandMeta::replicated( + CHANGE_PASSWORD_CODE, + "user.password", + Operation::ChangePassword, + ), + CommandMeta::non_replicated(LOGIN_USER_CODE, "user.login"), + CommandMeta::non_replicated(LOGOUT_USER_CODE, "user.logout"), + // Personal Access Tokens + CommandMeta::non_replicated( + GET_PERSONAL_ACCESS_TOKENS_CODE, + "personal_access_token.list", + ), + CommandMeta::replicated( + CREATE_PERSONAL_ACCESS_TOKEN_CODE, + "personal_access_token.create", + Operation::CreatePersonalAccessToken, + ), + CommandMeta::replicated( + DELETE_PERSONAL_ACCESS_TOKEN_CODE, + "personal_access_token.delete", + Operation::DeletePersonalAccessToken, + ), + CommandMeta::non_replicated( + LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, + "personal_access_token.login", + ), + // Messages + CommandMeta::non_replicated(POLL_MESSAGES_CODE, "message.poll"), + CommandMeta::replicated(SEND_MESSAGES_CODE, "message.send", Operation::SendMessages), + CommandMeta::non_replicated(FLUSH_UNSAVED_BUFFER_CODE, "message.flush_unsaved_buffer"), + // Consumer Offsets + CommandMeta::non_replicated(GET_CONSUMER_OFFSET_CODE, "consumer_offset.get"), + CommandMeta::replicated( + STORE_CONSUMER_OFFSET_CODE, + "consumer_offset.store", + Operation::StoreConsumerOffset, + ), + CommandMeta::non_replicated(DELETE_CONSUMER_OFFSET_CODE, "consumer_offset.delete"), + // Streams + CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"), + CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"), + CommandMeta::replicated(CREATE_STREAM_CODE, "stream.create", Operation::CreateStream), + CommandMeta::replicated(DELETE_STREAM_CODE, "stream.delete", Operation::DeleteStream), + CommandMeta::replicated(UPDATE_STREAM_CODE, "stream.update", Operation::UpdateStream), + CommandMeta::replicated(PURGE_STREAM_CODE, "stream.purge", Operation::PurgeStream), + // Topics + CommandMeta::non_replicated(GET_TOPIC_CODE, "topic.get"), + CommandMeta::non_replicated(GET_TOPICS_CODE, "topic.list"), + CommandMeta::replicated(CREATE_TOPIC_CODE, "topic.create", Operation::CreateTopic), + CommandMeta::replicated(DELETE_TOPIC_CODE, "topic.delete", Operation::DeleteTopic), + CommandMeta::replicated(UPDATE_TOPIC_CODE, "topic.update", Operation::UpdateTopic), + CommandMeta::replicated(PURGE_TOPIC_CODE, "topic.purge", Operation::PurgeTopic), + // Partitions + CommandMeta::replicated( + CREATE_PARTITIONS_CODE, + "partition.create", + Operation::CreatePartitions, + ), + CommandMeta::replicated( + DELETE_PARTITIONS_CODE, + "partition.delete", + Operation::DeletePartitions, + ), + // Segments + CommandMeta::replicated( + DELETE_SEGMENTS_CODE, + "segment.delete", + Operation::DeleteSegments, + ), + // Consumer Groups + CommandMeta::non_replicated(GET_CONSUMER_GROUP_CODE, "consumer_group.get"), + CommandMeta::non_replicated(GET_CONSUMER_GROUPS_CODE, "consumer_group.list"), + CommandMeta::replicated( + CREATE_CONSUMER_GROUP_CODE, + "consumer_group.create", + Operation::CreateConsumerGroup, + ), + CommandMeta::replicated( + DELETE_CONSUMER_GROUP_CODE, + "consumer_group.delete", + Operation::DeleteConsumerGroup, + ), + CommandMeta::non_replicated(JOIN_CONSUMER_GROUP_CODE, "consumer_group.join"), + CommandMeta::non_replicated(LEAVE_CONSUMER_GROUP_CODE, "consumer_group.leave"), +]; + +/// Lookup command metadata by command code. +#[must_use] +pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> { + let mut i = 0; + while i < COMMAND_TABLE.len() { + if COMMAND_TABLE[i].code == code { + return Some(&COMMAND_TABLE[i]); + } + i += 1; + } + None +} + +/// Lookup command metadata by VSR operation. +/// +/// Returns `None` for `Operation::Reserved` and for non-replicated commands +/// that have no operation mapping. +#[must_use] +pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta> { + let mut i = 0; + while i < COMMAND_TABLE.len() { + if let Some(table_op) = COMMAND_TABLE[i].operation + && table_op as u8 == op as u8 + { + return Some(&COMMAND_TABLE[i]); + } + i += 1; + } + None +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn every_code_has_entry() { + let all_codes = [ + PING_CODE, + GET_STATS_CODE, + GET_SNAPSHOT_FILE_CODE, + GET_CLUSTER_METADATA_CODE, + GET_ME_CODE, + GET_CLIENT_CODE, + GET_CLIENTS_CODE, + GET_USER_CODE, + GET_USERS_CODE, + CREATE_USER_CODE, + DELETE_USER_CODE, + UPDATE_USER_CODE, + UPDATE_PERMISSIONS_CODE, + CHANGE_PASSWORD_CODE, + LOGIN_USER_CODE, + LOGOUT_USER_CODE, + GET_PERSONAL_ACCESS_TOKENS_CODE, + CREATE_PERSONAL_ACCESS_TOKEN_CODE, + DELETE_PERSONAL_ACCESS_TOKEN_CODE, + LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, + POLL_MESSAGES_CODE, + SEND_MESSAGES_CODE, + FLUSH_UNSAVED_BUFFER_CODE, + GET_CONSUMER_OFFSET_CODE, + STORE_CONSUMER_OFFSET_CODE, + DELETE_CONSUMER_OFFSET_CODE, + GET_STREAM_CODE, + GET_STREAMS_CODE, + CREATE_STREAM_CODE, + DELETE_STREAM_CODE, + UPDATE_STREAM_CODE, + PURGE_STREAM_CODE, + GET_TOPIC_CODE, + GET_TOPICS_CODE, + CREATE_TOPIC_CODE, + DELETE_TOPIC_CODE, + UPDATE_TOPIC_CODE, + PURGE_TOPIC_CODE, + CREATE_PARTITIONS_CODE, + DELETE_PARTITIONS_CODE, + DELETE_SEGMENTS_CODE, + GET_CONSUMER_GROUP_CODE, + GET_CONSUMER_GROUPS_CODE, + CREATE_CONSUMER_GROUP_CODE, + DELETE_CONSUMER_GROUP_CODE, + JOIN_CONSUMER_GROUP_CODE, + LEAVE_CONSUMER_GROUP_CODE, + ]; + for code in all_codes { + assert!( + lookup_command(code).is_some(), + "missing dispatch entry for code {code}" + ); + } + } + + #[test] + fn no_duplicate_codes_in_table() { + let mut seen = std::collections::HashSet::new(); + for entry in COMMAND_TABLE { + assert!( + seen.insert(entry.code), + "duplicate code {} ({}) in COMMAND_TABLE", + entry.code, + entry.name + ); + } + } + + #[test] + fn unknown_code_returns_none() { + assert!(lookup_command(9999).is_none()); + } + + #[test] + fn names_are_non_empty() { + for entry in COMMAND_TABLE { + assert!(!entry.name.is_empty(), "empty name for code {}", entry.code); + } + } + + #[test] + fn lookup_by_operation_roundtrips_with_lookup_command() { + let replicated_ops = [ + Operation::CreateStream, + Operation::UpdateStream, + Operation::DeleteStream, + Operation::PurgeStream, + Operation::CreateTopic, + Operation::UpdateTopic, + Operation::DeleteTopic, + Operation::PurgeTopic, + Operation::CreatePartitions, + Operation::DeletePartitions, + Operation::DeleteSegments, + Operation::CreateConsumerGroup, + Operation::DeleteConsumerGroup, + Operation::CreateUser, + Operation::UpdateUser, + Operation::DeleteUser, + Operation::ChangePassword, + Operation::UpdatePermissions, + Operation::CreatePersonalAccessToken, + Operation::DeletePersonalAccessToken, + Operation::SendMessages, + Operation::StoreConsumerOffset, + ]; + for op in replicated_ops { + let meta = lookup_by_operation(op) + .unwrap_or_else(|| panic!("no dispatch entry for operation {op:?}")); + + let by_code = lookup_command(meta.code) + .unwrap_or_else(|| panic!("no dispatch entry for code {}", meta.code)); + + assert_eq!( + meta.code, by_code.code, + "lookup_by_operation and lookup disagree for {op:?}" + ); + } + } + + #[test] + fn reserved_operation_returns_none() { + assert!(lookup_by_operation(Operation::Reserved).is_none()); + } + + #[test] + fn no_duplicate_operations_in_table() { + let mut seen = std::collections::HashSet::new(); + for entry in COMMAND_TABLE { + if let Some(op) = entry.operation { + assert!( + seen.insert(op as u8), + "duplicate operation {:?} ({}) in COMMAND_TABLE", + op, + entry.name + ); + } + } + } +} diff --git a/core/binary_protocol/src/error.rs b/core/binary_protocol/src/error.rs index f4f53ea56..82d5cb2b5 100644 --- a/core/binary_protocol/src/error.rs +++ b/core/binary_protocol/src/error.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::borrow::Cow; + /// Protocol-local error type for wire format encode/decode failures. /// /// Intentionally decoupled from `IggyError` to keep the protocol crate @@ -46,5 +48,5 @@ pub enum WireError { PayloadTooLarge { size: usize, max: usize }, #[error("validation failed: {0}")] - Validation(String), + Validation(Cow<'static, str>), } diff --git a/core/binary_protocol/src/frame.rs b/core/binary_protocol/src/frame.rs deleted file mode 100644 index 77a5f2b3e..000000000 --- a/core/binary_protocol/src/frame.rs +++ /dev/null @@ -1,32 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// TODO(hubcio): Legacy framing constants for the current binary protocol. -// Once VSR consensus is integrated, both client-server and -// replica-replica traffic will use the unified 256-byte -// consensus header (`consensus::header::HEADER_SIZE`). -// These constants will be removed at that point. - -/// Request frame: `[length:4 LE][code:4 LE][payload:N]` -/// `length` = size of code + payload = 4 + N -pub const REQUEST_HEADER_SIZE: usize = 4; - -/// Response frame: `[status:4 LE][length:4 LE][payload:N]` -pub const RESPONSE_HEADER_SIZE: usize = 8; - -/// Status code for a successful response. -pub const STATUS_OK: u32 = 0; diff --git a/core/binary_protocol/src/framing.rs b/core/binary_protocol/src/framing.rs new file mode 100644 index 000000000..981ead667 --- /dev/null +++ b/core/binary_protocol/src/framing.rs @@ -0,0 +1,289 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Sans-IO frame codec for the Iggy binary protocol. +//! +//! Encodes and decodes complete request/response frames without any I/O. +//! The transport layer (TCP, QUIC, WebSocket) reads bytes into a buffer, +//! then hands the buffer to these types for zero-copy parsing. +//! +//! When VSR consensus replaces this framing, the transport layer will +//! switch to `consensus::header::GenericHeader` (256-byte fixed header) +//! while the command payload codec stays the same. + +use crate::codec::{read_bytes, read_u32_le}; +use crate::error::WireError; +use bytes::{BufMut, BytesMut}; +use std::borrow::Cow; +use std::num::NonZeroU32; + +/// Status code for a successful response. +pub const STATUS_OK: u32 = 0; + +/// Decoded request frame. Borrows the payload from the input buffer. +/// +/// Wire format: `[length:4 LE][code:4 LE][payload:N]` +/// where `length` = 4 (code size) + N (payload size). +#[derive(Debug)] +pub struct RequestFrame<'a> { + pub code: u32, + pub payload: &'a [u8], +} + +impl<'a> RequestFrame<'a> { + /// Size of the frame header: `[length:4][code:4]`. + pub const HEADER_SIZE: usize = 8; + + /// Decode a request frame from a complete buffer. + /// + /// # Errors + /// Returns `WireError::UnexpectedEof` if the buffer is too short. + pub fn decode(buf: &'a [u8]) -> Result<(Self, usize), WireError> { + let length = read_u32_le(buf, 0)? as usize; + if length < 4 { + return Err(WireError::Validation(Cow::Borrowed( + "request frame length must be at least 4 (code size)", + ))); + } + let code = read_u32_le(buf, 4)?; + let payload_len = length - 4; + let payload = read_bytes(buf, Self::HEADER_SIZE, payload_len)?; + let total = Self::HEADER_SIZE + payload_len; + Ok((Self { code, payload }, total)) + } + + /// Encode a request frame into `out`. + /// + /// Writes `[length:4 LE][code:4 LE][payload]` where length includes + /// the 4-byte code field. + /// + /// # Errors + /// Returns `WireError::PayloadTooLarge` if payload exceeds u32 capacity. + pub fn encode(code: u32, payload: &[u8], out: &mut BytesMut) -> Result<(), WireError> { + let length = payload + .len() + .checked_add(4) + .and_then(|n| u32::try_from(n).ok()) + .ok_or(WireError::PayloadTooLarge { + size: payload.len(), + max: u32::MAX as usize - 4, + })?; + out.reserve(Self::HEADER_SIZE + payload.len()); + out.put_u32_le(length); + out.put_u32_le(code); + out.put_slice(payload); + Ok(()) + } + + /// Total encoded size for a given payload length. + /// + /// Returns `None` if `HEADER_SIZE + payload_len` overflows `usize`. + #[must_use] + pub const fn encoded_size(payload_len: usize) -> Option<usize> { + Self::HEADER_SIZE.checked_add(payload_len) + } +} + +/// Decoded response frame. Borrows the payload from the input buffer. +/// +/// Wire format: `[status:4 LE][length:4 LE][payload:N]` +/// where `status` = 0 for success, non-zero for error code. +#[derive(Debug)] +pub struct ResponseFrame<'a> { + pub status: u32, + pub payload: &'a [u8], +} + +impl<'a> ResponseFrame<'a> { + /// Size of the frame header: `[status:4][length:4]`. + pub const HEADER_SIZE: usize = 8; + + /// Decode a response frame from a complete buffer. + /// + /// # Errors + /// Returns `WireError::UnexpectedEof` if the buffer is too short. + pub fn decode(buf: &'a [u8]) -> Result<(Self, usize), WireError> { + let status = read_u32_le(buf, 0)?; + let length = read_u32_le(buf, 4)? as usize; + let payload = read_bytes(buf, Self::HEADER_SIZE, length)?; + let total = Self::HEADER_SIZE + length; + Ok((Self { status, payload }, total)) + } + + /// Encode a successful response with payload. + /// + /// # Errors + /// Returns `WireError::PayloadTooLarge` if payload exceeds u32 capacity. + pub fn encode_ok(payload: &[u8], out: &mut BytesMut) -> Result<(), WireError> { + let length = u32::try_from(payload.len()).map_err(|_| WireError::PayloadTooLarge { + size: payload.len(), + max: u32::MAX as usize, + })?; + out.reserve(Self::HEADER_SIZE + payload.len()); + out.put_u32_le(STATUS_OK); + out.put_u32_le(length); + out.put_slice(payload); + Ok(()) + } + + /// Encode an error response (status code, empty payload). + pub fn encode_error(status: NonZeroU32, out: &mut BytesMut) { + out.reserve(Self::HEADER_SIZE); + out.put_u32_le(status.get()); + out.put_u32_le(0); + } + + /// Returns `true` if this is a success response. + #[must_use] + pub const fn is_ok(&self) -> bool { + self.status == STATUS_OK + } + + /// Total encoded size for a given payload length. + /// + /// Returns `None` if `HEADER_SIZE + payload_len` overflows `usize`. + #[must_use] + pub const fn encoded_size(payload_len: usize) -> Option<usize> { + Self::HEADER_SIZE.checked_add(payload_len) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn request_roundtrip() { + let payload = b"hello world"; + let mut buf = BytesMut::with_capacity(RequestFrame::encoded_size(payload.len()).unwrap()); + RequestFrame::encode(42, payload, &mut buf).unwrap(); + + let (frame, consumed) = RequestFrame::decode(&buf).unwrap(); + assert_eq!(consumed, buf.len()); + assert_eq!(frame.code, 42); + assert_eq!(frame.payload, payload); + } + + #[test] + fn request_empty_payload() { + let mut buf = BytesMut::with_capacity(RequestFrame::HEADER_SIZE); + RequestFrame::encode(1, &[], &mut buf).unwrap(); + + let (frame, consumed) = RequestFrame::decode(&buf).unwrap(); + assert_eq!(consumed, 8); + assert_eq!(frame.code, 1); + assert!(frame.payload.is_empty()); + } + + #[test] + fn request_length_field_includes_code() { + let payload = b"test"; + let mut buf = BytesMut::new(); + RequestFrame::encode(99, payload, &mut buf).unwrap(); + + let length = u32::from_le_bytes(buf[0..4].try_into().unwrap()); + assert_eq!(length, 4 + 4); // code(4) + payload(4) + } + + #[test] + fn request_truncated_header() { + let buf = [0u8; 7]; // less than HEADER_SIZE + assert!(RequestFrame::decode(&buf).is_err()); + } + + #[test] + fn request_truncated_payload() { + let mut buf = BytesMut::new(); + buf.put_u32_le(104); // length = 104 (code + 100 bytes payload) + buf.put_u32_le(1); // code + buf.put_slice(&[0u8; 50]); // only 50 of 100 bytes + assert!(RequestFrame::decode(&buf).is_err()); + } + + #[test] + fn request_length_too_small() { + let mut buf = BytesMut::new(); + buf.put_u32_le(3); // length < 4 (must include code) + buf.put_u32_le(1); + assert!(RequestFrame::decode(&buf).is_err()); + } + + #[test] + fn request_encoded_size() { + assert_eq!(RequestFrame::encoded_size(0), Some(8)); + assert_eq!(RequestFrame::encoded_size(100), Some(108)); + assert_eq!(RequestFrame::encoded_size(usize::MAX), None); + } + + #[test] + fn response_ok_roundtrip() { + let payload = b"response data"; + let mut buf = BytesMut::with_capacity(ResponseFrame::encoded_size(payload.len()).unwrap()); + ResponseFrame::encode_ok(payload, &mut buf).unwrap(); + + let (frame, consumed) = ResponseFrame::decode(&buf).unwrap(); + assert_eq!(consumed, buf.len()); + assert!(frame.is_ok()); + assert_eq!(frame.status, 0); + assert_eq!(frame.payload, payload); + } + + #[test] + fn response_ok_empty_payload() { + let mut buf = BytesMut::new(); + ResponseFrame::encode_ok(&[], &mut buf).unwrap(); + + let (frame, consumed) = ResponseFrame::decode(&buf).unwrap(); + assert_eq!(consumed, 8); + assert!(frame.is_ok()); + assert!(frame.payload.is_empty()); + } + + #[test] + fn response_error_roundtrip() { + let mut buf = BytesMut::new(); + ResponseFrame::encode_error(NonZeroU32::new(1001).unwrap(), &mut buf); + + let (frame, consumed) = ResponseFrame::decode(&buf).unwrap(); + assert_eq!(consumed, 8); + assert!(!frame.is_ok()); + assert_eq!(frame.status, 1001); + assert!(frame.payload.is_empty()); + } + + #[test] + fn response_truncated_header() { + let buf = [0u8; 7]; + assert!(ResponseFrame::decode(&buf).is_err()); + } + + #[test] + fn response_truncated_payload() { + let mut buf = BytesMut::new(); + buf.put_u32_le(0); // status OK + buf.put_u32_le(100); // length = 100 + buf.put_slice(&[0u8; 50]); // only 50 bytes + assert!(ResponseFrame::decode(&buf).is_err()); + } + + #[test] + fn response_encoded_size() { + assert_eq!(ResponseFrame::encoded_size(0), Some(8)); + assert_eq!(ResponseFrame::encoded_size(256), Some(264)); + assert_eq!(ResponseFrame::encoded_size(usize::MAX), None); + } +} diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs index 9610abf7e..7ed4fdad0 100644 --- a/core/binary_protocol/src/lib.rs +++ b/core/binary_protocol/src/lib.rs @@ -41,12 +41,25 @@ //! //! All multi-byte integers are little-endian. Strings are length-prefixed //! (u8 length for names, u32 length for longer strings). +//! +//! # VSR consensus framing +//! +//! All consensus headers are 256 bytes with `#[repr(C)]` layout. +//! Deserialization is zero-copy via `bytemuck`. The [`Message`] type +//! wraps a `Bytes` buffer with typed header access. +//! +//! - Client-facing: [`RequestHeader`], [`ReplyHeader`] +//! - Replication: [`PrepareHeader`], [`PrepareOkHeader`], [`CommitHeader`] +//! - View change: [`StartViewChangeHeader`], [`DoViewChangeHeader`], +//! [`StartViewHeader`] +//! - Dispatch: [`GenericHeader`] for type-erased initial parsing pub mod codec; pub mod codes; pub mod consensus; +pub mod dispatch; pub mod error; -pub mod frame; +pub mod framing; pub mod message_layout; pub mod message_view; pub mod primitives; @@ -54,10 +67,17 @@ pub mod requests; pub mod responses; pub use codec::{WireDecode, WireEncode}; -pub use codes::*; +pub use consensus::{ + Command2, CommitHeader, ConsensusError, ConsensusHeader, DoViewChangeHeader, GenericHeader, + HEADER_SIZE, Operation, PrepareHeader, PrepareOkHeader, ReplyHeader, RequestHeader, + StartViewChangeHeader, StartViewHeader, message::Message, +}; +pub use dispatch::{COMMAND_TABLE, CommandMeta, lookup_by_operation, lookup_command}; pub use error::WireError; -pub use frame::*; -pub use message_layout::*; +pub use framing::{RequestFrame, ResponseFrame, STATUS_OK}; +pub use message_view::{ + WireMessageIterator, WireMessageIteratorMut, WireMessageView, WireMessageViewMut, +}; pub use primitives::consumer::WireConsumer; pub use primitives::identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier, WireName}; pub use primitives::partitioning::{MAX_MESSAGES_KEY_LENGTH, WirePartitioning}; diff --git a/core/binary_protocol/src/message_view.rs b/core/binary_protocol/src/message_view.rs index c291d9a7e..3159dac5b 100644 --- a/core/binary_protocol/src/message_view.rs +++ b/core/binary_protocol/src/message_view.rs @@ -26,6 +26,7 @@ use crate::message_layout::{ MSG_PAYLOAD_LEN_OFFSET, MSG_TIMESTAMP_OFFSET, MSG_USER_HEADERS_LEN_OFFSET, WIRE_MESSAGE_HEADER_SIZE, }; +use std::borrow::Cow; // Private helpers for infallible reads on validated buffers @@ -72,7 +73,9 @@ fn validate_frame(buf: &[u8]) -> Result<(usize, usize, usize), WireError> { let total = WIRE_MESSAGE_HEADER_SIZE .checked_add(payload_len) .and_then(|s| s.checked_add(user_headers_len)) - .ok_or_else(|| WireError::Validation("message frame size overflow".to_string()))?; + .ok_or(WireError::Validation(Cow::Borrowed( + "message frame size overflow", + )))?; if buf.len() < total { return Err(WireError::UnexpectedEof { @@ -371,9 +374,9 @@ impl<'a> WireMessageIteratorMut<'a> { .and_then(|s| s.checked_add(user_headers_len)) else { self.remaining = 0; - return Some(Err(WireError::Validation( - "message frame size overflow".to_string(), - ))); + return Some(Err(WireError::Validation(Cow::Borrowed( + "message frame size overflow", + )))); }; (total, rest.len()) }; diff --git a/core/binary_protocol/src/primitives/identifier.rs b/core/binary_protocol/src/primitives/identifier.rs index af25c189c..f28614eba 100644 --- a/core/binary_protocol/src/primitives/identifier.rs +++ b/core/binary_protocol/src/primitives/identifier.rs @@ -18,6 +18,7 @@ use crate::WireError; use crate::codec::{WireDecode, WireEncode, read_bytes, read_str, read_u8}; use bytes::{BufMut, BytesMut}; +use std::borrow::Cow; use std::ops::Deref; // WireName @@ -40,10 +41,10 @@ impl WireName { pub fn new(s: impl Into<String>) -> Result<Self, WireError> { let s = s.into(); if s.is_empty() || s.len() > MAX_WIRE_NAME_LENGTH { - return Err(WireError::Validation(format!( + return Err(WireError::Validation(Cow::Owned(format!( "wire name must be 1-{MAX_WIRE_NAME_LENGTH} bytes, got {}", s.len() - ))); + )))); } Ok(Self(s)) } @@ -102,9 +103,9 @@ impl WireDecode for WireName { fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { let name_len = read_u8(buf, 0)? as usize; if name_len == 0 || name_len > MAX_WIRE_NAME_LENGTH { - return Err(WireError::Validation(format!( + return Err(WireError::Validation(Cow::Owned(format!( "wire name must be 1-{MAX_WIRE_NAME_LENGTH} bytes, got {name_len}" - ))); + )))); } let name = read_str(buf, 1, name_len)?; Ok((Self(name), 1 + name_len)) @@ -199,9 +200,9 @@ impl WireDecode for WireIdentifier { match kind { KIND_NUMERIC => { if length != NUMERIC_VALUE_LEN as usize { - return Err(WireError::Validation(format!( + return Err(WireError::Validation(Cow::Owned(format!( "numeric identifier must be {NUMERIC_VALUE_LEN} bytes, got {length}" - ))); + )))); } let id = u32::from_le_bytes( value @@ -212,9 +213,9 @@ impl WireDecode for WireIdentifier { } KIND_STRING => { if length == 0 { - return Err(WireError::Validation( - "string identifier cannot be empty".to_string(), - )); + return Err(WireError::Validation(Cow::Borrowed( + "string identifier cannot be empty", + ))); } let s = std::str::from_utf8(value).map_err(|_| WireError::InvalidUtf8 { offset: 2 })?; diff --git a/core/binary_protocol/src/primitives/partitioning.rs b/core/binary_protocol/src/primitives/partitioning.rs index 4bfbf5f58..6e3b277de 100644 --- a/core/binary_protocol/src/primitives/partitioning.rs +++ b/core/binary_protocol/src/primitives/partitioning.rs @@ -18,6 +18,7 @@ use crate::WireError; use crate::codec::{WireDecode, WireEncode, read_bytes, read_u8, read_u32_le}; use bytes::{BufMut, BytesMut}; +use std::borrow::Cow; const KIND_BALANCED: u8 = 1; const KIND_PARTITION_ID: u8 = 2; @@ -49,15 +50,15 @@ impl WirePartitioning { /// Returns `WireError::Validation` if `key` is empty or exceeds 255 bytes. pub fn messages_key(key: Vec<u8>) -> Result<Self, WireError> { if key.is_empty() { - return Err(WireError::Validation( - "messages_key partitioning cannot have empty key".to_string(), - )); + return Err(WireError::Validation(Cow::Borrowed( + "messages_key partitioning cannot have empty key", + ))); } if key.len() > MAX_MESSAGES_KEY_LENGTH { - return Err(WireError::Validation(format!( + return Err(WireError::Validation(Cow::Owned(format!( "messages_key length {} exceeds maximum {MAX_MESSAGES_KEY_LENGTH}", key.len() - ))); + )))); } Ok(Self::MessagesKey(key)) } @@ -107,26 +108,26 @@ impl WireDecode for WirePartitioning { match kind { KIND_BALANCED => { if length != 0 { - return Err(WireError::Validation(format!( + return Err(WireError::Validation(Cow::Owned(format!( "balanced partitioning must have length 0, got {length}" - ))); + )))); } Ok((Self::Balanced, 2)) } KIND_PARTITION_ID => { if length != 4 { - return Err(WireError::Validation(format!( + return Err(WireError::Validation(Cow::Owned(format!( "partition_id partitioning must have length 4, got {length}" - ))); + )))); } let id = read_u32_le(buf, 2)?; Ok((Self::PartitionId(id), 6)) } KIND_MESSAGES_KEY => { if length == 0 { - return Err(WireError::Validation( - "messages_key partitioning cannot have empty key".to_string(), - )); + return Err(WireError::Validation(Cow::Borrowed( + "messages_key partitioning cannot have empty key", + ))); } let key = read_bytes(buf, 2, length)?; Ok((Self::MessagesKey(key.to_vec()), 2 + length)) diff --git a/core/binary_protocol/src/requests/users/create_user.rs b/core/binary_protocol/src/requests/users/create_user.rs index 4348426e9..632e077d9 100644 --- a/core/binary_protocol/src/requests/users/create_user.rs +++ b/core/binary_protocol/src/requests/users/create_user.rs @@ -20,6 +20,7 @@ use crate::codec::{WireDecode, WireEncode, read_str, read_u8, read_u32_le}; use crate::primitives::identifier::WireName; use crate::primitives::permissions::WirePermissions; use bytes::{BufMut, BytesMut}; +use std::borrow::Cow; /// `CreateUser` request. /// @@ -85,9 +86,9 @@ impl WireDecode for CreateUserRequest { let permissions = if has_permissions == 1 && perm_len > 0 { let (perms, consumed) = WirePermissions::decode(&buf[pos..])?; if consumed != perm_len { - return Err(WireError::Validation(format!( + return Err(WireError::Validation(Cow::Owned(format!( "permissions length mismatch: header says {perm_len}, decoded {consumed}" - ))); + )))); } pos += consumed; Some(perms) diff --git a/core/binary_protocol/src/requests/users/update_permissions.rs b/core/binary_protocol/src/requests/users/update_permissions.rs index 6b5bcbbc4..16de85737 100644 --- a/core/binary_protocol/src/requests/users/update_permissions.rs +++ b/core/binary_protocol/src/requests/users/update_permissions.rs @@ -20,6 +20,7 @@ use crate::WireIdentifier; use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le}; use crate::primitives::permissions::WirePermissions; use bytes::{BufMut, BytesMut}; +use std::borrow::Cow; /// `UpdatePermissions` request. /// @@ -67,9 +68,9 @@ impl WireDecode for UpdatePermissionsRequest { let permissions = if has_permissions == 1 && perm_len > 0 { let (perms, consumed) = WirePermissions::decode(&buf[pos..])?; if consumed != perm_len { - return Err(WireError::Validation(format!( + return Err(WireError::Validation(Cow::Owned(format!( "permissions length mismatch: header says {perm_len}, decoded {consumed}" - ))); + )))); } pos += consumed; Some(perms) diff --git a/core/binary_protocol/src/responses/streams/get_stream.rs b/core/binary_protocol/src/responses/streams/get_stream.rs index 3470e3743..25aae5177 100644 --- a/core/binary_protocol/src/responses/streams/get_stream.rs +++ b/core/binary_protocol/src/responses/streams/get_stream.rs @@ -20,6 +20,7 @@ use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le, read_u64_le}; use crate::primitives::identifier::WireName; use crate::responses::streams::StreamResponse; use bytes::{BufMut, BytesMut}; +use std::borrow::Cow; /// Topic header within a `GetStream` response. /// @@ -141,11 +142,11 @@ impl WireDecode for GetStreamResponse { topics.push(topic); } if topics.len() != stream.topics_count as usize { - return Err(WireError::Validation(format!( + return Err(WireError::Validation(Cow::Owned(format!( "stream.topics_count={} but decoded {} topics", stream.topics_count, topics.len() - ))); + )))); } Ok((Self { stream, topics }, pos)) } diff --git a/core/binary_protocol/src/responses/topics/get_topic.rs b/core/binary_protocol/src/responses/topics/get_topic.rs index 80b510a9c..b90e52156 100644 --- a/core/binary_protocol/src/responses/topics/get_topic.rs +++ b/core/binary_protocol/src/responses/topics/get_topic.rs @@ -19,6 +19,7 @@ use crate::WireError; use crate::codec::{WireDecode, WireEncode, read_u32_le, read_u64_le}; use crate::responses::streams::get_stream::TopicHeader; use bytes::{BufMut, BytesMut}; +use std::borrow::Cow; /// Partition details within a `GetTopic` response. /// @@ -120,11 +121,11 @@ impl WireDecode for GetTopicResponse { partitions.push(partition); } if partitions.len() != topic.partitions_count as usize { - return Err(WireError::Validation(format!( + return Err(WireError::Validation(Cow::Owned(format!( "topic.partitions_count={} but decoded {} partitions", topic.partitions_count, partitions.len() - ))); + )))); } Ok((Self { topic, partitions }, pos)) }
