krishvishal commented on code in PR #2389:
URL: https://github.com/apache/iggy/pull/2389#discussion_r2559748702
##########
core/common/src/types/consensus/mod.rs:
##########
@@ -19,7 +19,572 @@ use std::marker::PhantomData;
use bytes::Bytes;
-use crate::types::consensus::header::ConsensusHeader;
+use crate::types::consensus::header::{
+ CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+};
+
+#[derive(Debug)]
+pub struct Message<H: ConsensusHeader> {
+ buffer: Bytes,
+ _marker: PhantomData<H>,
+}
+
+impl<H> Message<H>
+where
+ H: ConsensusHeader,
+{
+ /// Create a new message from a buffer.
+ ///
+ /// # Safety
+ ///
+ /// The buffer must:
+ /// - be at least `size_of::<H>()` bytes long
+ /// - contain a valid header at the start
+ /// - be properly aligned for type H
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if:
+ /// - buffer is too small for the header
+ /// - buffer is too small for the size specified in the header
+ /// - header validation fails
+ #[allow(unused)]
+ pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
+ // verify minimum size
+ if buffer.len() < size_of::<H>() {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: H::COMMAND,
+ found: header::Command::Reserved,
+ });
+ }
+
+ let message = Self {
+ buffer,
+ _marker: PhantomData,
+ };
+
+ // validate the header
+ message.header().validate()?;
+
+ // verify buffer size matches header.size
+ let header_size = message.header().size() as usize;
+ if message.buffer.len() < header_size {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: H::COMMAND,
+ found: header::Command::Reserved,
+ });
+ }
+
+ Ok(message)
+ }
+
+ /// Create a new message with a specific size, initializing the buffer
with zeros.
+ ///
+ /// The header will be zeroed and must be initialized by the caller.
+ #[allow(unused)]
+ pub fn new(size: usize) -> Self {
+ assert!(size >= size_of::<H>(), "Size must be at least header size");
+ let buffer = Bytes::from(vec![0u8; size]);
+ Self {
+ buffer,
+ _marker: PhantomData,
+ }
+ }
+
+ /// Get a reference to the header using zero-copy access.
+ ///
+ /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
+ /// without any copying or allocation.
+ #[inline]
+ #[allow(unused)]
+ pub fn header(&self) -> &H {
+ let header_bytes = &self.buffer[..size_of::<H>()];
+ bytemuck::from_bytes(header_bytes)
+ }
+
+ /// Get a reference to the message body (everything after the header).
+ ///
+ /// Returns an empty slice if there is no body.
+ #[inline]
+ #[allow(unused)]
+ pub fn body(&self) -> &[u8] {
+ let header_size = size_of::<H>();
+ let total_size = self.header().size() as usize;
+
+ if total_size > header_size {
+ &self.buffer[header_size..total_size]
+ } else {
+ &[]
+ }
+ }
+
+ /// Get the complete message as bytes (header + body).
+ #[inline]
+ #[allow(unused)]
+ pub fn as_bytes(&self) -> &[u8] {
+ let total_size = self.header().size() as usize;
+ &self.buffer[..total_size]
+ }
+
+ /// Get the underlying buffer.
+ #[inline]
+ #[allow(unused)]
+ pub fn buffer(&self) -> &Bytes {
+ &self.buffer
+ }
+
+ /// Convert into the underlying buffer.
+ #[inline]
+ #[allow(unused)]
+ pub fn into_buffer(self) -> Bytes {
+ self.buffer
+ }
+
+ /// Create a message from a buffer without validation.
+ ///
+ /// # Safety
+ ///
+ /// This is private and skips validation. Use only when:
+ /// - The buffer is already validated
+ /// - If doing a zero-cost type conversion (like to GenericHeader)
+ #[inline]
+ #[allow(unused)]
+ fn from_buffer_unchecked(buffer: Bytes) -> Self {
+ Self {
+ buffer,
+ _marker: PhantomData,
+ }
+ }
+
+ /// Convert to a generic message (erasing the specific header type).
+ ///
+ /// This allows treating any message as a generic message for common
operations.
+ #[allow(unused)]
+ pub fn into_generic(self) -> Message<header::GenericHeader> {
+ Message::from_buffer_unchecked(self.buffer)
+ }
+
+ /// Get a reference to this message as a generic message.
+ #[allow(unused)]
+ pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+ // SAFETY: Message<H> and Message<GenericHeader> have the same memory
layout
+ // because they only differ in the PhantomData type parameter
+ unsafe { &*(self as *const Self as *const
Message<header::GenericHeader>) }
+ }
+
+ /// Try to convert this message to a different header type.
+ ///
+ /// This validates that the command in the header matches the target type's
+ /// expected command before performing the conversion.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if:
+ /// - The command doesn't match the target type
+ /// - The target header validation fails
+ #[allow(unused)]
+ pub fn try_into_typed<T>(self) -> Result<Message<T>,
header::ConsensusError>
+ where
+ T: ConsensusHeader,
+ {
+ if self.buffer.len() < size_of::<T>() {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: header::Command::Reserved,
+ });
+ }
+
+ let generic = self.as_generic();
+ if generic.header().command != T::COMMAND {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: generic.header().command,
+ });
+ }
+
+ let new_message = Message::<T>::from_buffer_unchecked(self.buffer);
+
+ new_message.header().validate()?;
+
+ Ok(new_message)
+ }
+
+ /// Try to get a reference to this message as a different header type.
+ ///
+ /// This is similar to `try_into_typed` but borrows instead of consuming.
+ #[allow(unused)]
+ pub fn try_as_typed<T>(&self) -> Result<&Message<T>,
header::ConsensusError>
+ where
+ T: ConsensusHeader,
+ {
+ // check buffer size
+ if self.buffer.len() < size_of::<T>() {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: header::Command::Reserved,
+ });
+ }
+
+ // check the command matches
+ let generic = self.as_generic();
+ if generic.header().command != T::COMMAND {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: generic.header().command,
+ });
+ }
+
+ let typed_message = unsafe { &*(self as *const Self as *const
Message<T>) };
+
+ // validate the header
+ typed_message.header().validate()?;
+
+ Ok(typed_message)
+ }
+}
+
+#[derive(Debug)]
+pub enum MessageBag {
+ Generic(Message<GenericHeader>),
+ Prepare(Message<PrepareHeader>),
+ Commit(Message<CommitHeader>),
+ Reply(Message<ReplyHeader>),
+}
+
+impl MessageBag {
+ #[allow(unused)]
+ pub fn command(&self) -> header::Command {
+ match self {
+ MessageBag::Generic(message) => message.header().command,
+ MessageBag::Prepare(message) => message.header().command,
+ MessageBag::Commit(message) => message.header().command,
+ MessageBag::Reply(message) => message.header().command,
+ }
+ }
+
+ #[allow(unused)]
+ pub fn size(&self) -> u32 {
+ match self {
+ MessageBag::Generic(message) => message.header().size(),
+ MessageBag::Prepare(message) => message.header().size(),
+ MessageBag::Commit(message) => message.header().size(),
+ MessageBag::Reply(message) => message.header().size(),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn as_prepare(&self) -> Option<&Message<header::PrepareHeader>> {
+ match self {
+ MessageBag::Prepare(m) => Some(m),
+ _ => None,
+ }
+ }
+
+ #[allow(unused)]
+ pub fn as_commit(&self) -> Option<&Message<header::CommitHeader>> {
+ match self {
+ MessageBag::Commit(m) => Some(m),
+ _ => None,
+ }
+ }
+
+ #[allow(unused)]
+ pub fn as_reply(&self) -> Option<&Message<header::ReplyHeader>> {
+ match self {
+ MessageBag::Reply(m) => Some(m),
+ _ => None,
+ }
+ }
+
+ #[allow(unused)]
+ pub fn into_prepare(self) -> Result<Message<header::PrepareHeader>, Self> {
+ match self {
+ MessageBag::Prepare(m) => Ok(m),
+ other => Err(other),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn into_commit(self) -> Result<Message<header::CommitHeader>, Self> {
+ match self {
+ MessageBag::Commit(m) => Ok(m),
+ other => Err(other),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn into_reply(self) -> Result<Message<header::ReplyHeader>, Self> {
+ match self {
+ MessageBag::Reply(m) => Ok(m),
+ other => Err(other),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn into_generic(self) -> Message<header::GenericHeader> {
+ match self {
+ MessageBag::Generic(m) => m,
+ MessageBag::Prepare(m) => m.into_generic(),
+ MessageBag::Commit(m) => m.into_generic(),
+ MessageBag::Reply(m) => m.into_generic(),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+ match self {
+ MessageBag::Generic(m) => m,
+ MessageBag::Prepare(m) => m.as_generic(),
+ MessageBag::Commit(m) => m.as_generic(),
+ MessageBag::Reply(m) => m.as_generic(),
+ }
+ }
+}
+
+impl From<Message<header::PrepareHeader>> for MessageBag {
+ fn from(value: Message<header::PrepareHeader>) -> Self {
+ MessageBag::Prepare(value)
+ }
+}
+
+impl From<Message<header::CommitHeader>> for MessageBag {
+ fn from(value: Message<header::CommitHeader>) -> Self {
+ MessageBag::Commit(value)
+ }
+}
+
+impl From<Message<header::ReplyHeader>> for MessageBag {
+ fn from(value: Message<header::ReplyHeader>) -> Self {
+ MessageBag::Reply(value)
+ }
+}
+
+impl From<Message<header::GenericHeader>> for MessageBag {
+ fn from(value: Message<header::GenericHeader>) -> Self {
+ MessageBag::Generic(value)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use bytes::BytesMut;
+
+ use super::*;
+
+ #[test]
+ fn test_message_creation_and_access() {
+ let mut buffer = vec![0u8; 256];
+
+ let header = bytemuck::from_bytes_mut::<header::GenericHeader>(
+ &mut buffer[..size_of::<header::GenericHeader>()],
+ );
+ header.size = 256;
+ header.command = header::Command::Reserved;
+ header.cluster = 123;
+
+ let message =
Message::<header::GenericHeader>::from_bytes(Bytes::from(buffer)).unwrap();
+
+ assert_eq!(message.header().size, 256);
+ assert_eq!(message.header().cluster, 123);
+ assert_eq!(message.header().command, header::Command::Reserved);
+ assert_eq!(
+ message.body().len(),
+ 256 - size_of::<header::GenericHeader>()
+ );
+ }
+
+ #[test]
+ fn test_message_conversion() {
Review Comment:
`test_message_conversion` tests this scenario.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]