numinnex commented on code in PR #2389:
URL: https://github.com/apache/iggy/pull/2389#discussion_r2558896191


##########
core/common/src/types/consensus/mod.rs:
##########
@@ -19,7 +19,572 @@ use std::marker::PhantomData;
 
 use bytes::Bytes;
 
-use crate::types::consensus::header::ConsensusHeader;
+use crate::types::consensus::header::{
+    CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+};
+
+#[derive(Debug)]
+pub struct Message<H: ConsensusHeader> {
+    buffer: Bytes,
+    _marker: PhantomData<H>,
+}
+
+impl<H> Message<H>
+where
+    H: ConsensusHeader,
+{
+    /// Create a new message from a buffer.
+    ///
+    /// # Safety
+    ///
+    /// The buffer must:
+    /// - be at least `size_of::<H>()` bytes long
+    /// - contain a valid header at the start
+    /// - be properly aligned for type H
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - buffer is too small for the header
+    /// - buffer is too small for the size specified in the header
+    /// - header validation fails
+    #[allow(unused)]
+    pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
+        // verify minimum size
+        if buffer.len() < size_of::<H>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let message = Self {
+            buffer,
+            _marker: PhantomData,
+        };
+
+        // validate the header
+        message.header().validate()?;
+
+        // verify buffer size matches header.size
+        let header_size = message.header().size() as usize;
+        if message.buffer.len() < header_size {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        Ok(message)
+    }
+
+    /// Create a new message with a specific size, initializing the buffer 
with zeros.
+    ///
+    /// The header will be zeroed and must be initialized by the caller.
+    #[allow(unused)]
+    pub fn new(size: usize) -> Self {
+        assert!(size >= size_of::<H>(), "Size must be at least header size");
+        let buffer = Bytes::from(vec![0u8; size]);
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Get a reference to the header using zero-copy access.
+    ///
+    /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
+    /// without any copying or allocation.
+    #[inline]
+    #[allow(unused)]
+    pub fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+
+    /// Get a reference to the message body (everything after the header).
+    ///
+    /// Returns an empty slice if there is no body.
+    #[inline]
+    #[allow(unused)]
+    pub fn body(&self) -> &[u8] {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+
+        if total_size > header_size {
+            &self.buffer[header_size..total_size]
+        } else {
+            &[]
+        }
+    }
+
+    /// Get the complete message as bytes (header + body).
+    #[inline]
+    #[allow(unused)]
+    pub fn as_bytes(&self) -> &[u8] {
+        let total_size = self.header().size() as usize;
+        &self.buffer[..total_size]
+    }
+
+    /// Get the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn buffer(&self) -> &Bytes {
+        &self.buffer
+    }

Review Comment:
   I think the method `as_bytes` from above is enough.



##########
core/common/src/types/consensus/header.rs:
##########
@@ -15,7 +15,290 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use bytemuck::{Pod, Zeroable};
+use thiserror::Error;
+
 #[expect(unused)]
 pub struct Header {}
 
-pub trait ConsensusHeader {}
+pub trait ConsensusHeader: Sized + Pod + Zeroable {
+    const COMMAND: Command;
+
+    fn validate(&self) -> Result<(), ConsensusError>;
+    fn size(&self) -> u32;
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(u8)]
+#[expect(unused)]
+pub enum Command {
+    Reserved = 0,
+
+    Ping = 1,
+    Pong = 2,
+    PingClient = 3,
+    PongClient = 4,
+
+    Request = 5,
+    Prepare = 6,
+    PrepareOk = 7,
+    Reply = 8,
+    Commit = 9,
+
+    StartViewChange = 10,
+
+    RequstHeads = 11,
+    RequestPrepare = 12,
+    RequestReply = 13,
+
+    Headers = 14,
+
+    Eviction = 15,
+
+    RequestBlocks = 16,
+    Block = 17,

Review Comment:
   We don't need those for now; we will add them later on.



##########
core/common/src/types/consensus/mod.rs:
##########
@@ -19,7 +19,572 @@ use std::marker::PhantomData;
 
 use bytes::Bytes;
 
-use crate::types::consensus::header::ConsensusHeader;
+use crate::types::consensus::header::{
+    CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+};
+
+#[derive(Debug)]
+pub struct Message<H: ConsensusHeader> {
+    buffer: Bytes,
+    _marker: PhantomData<H>,
+}
+
+impl<H> Message<H>
+where
+    H: ConsensusHeader,
+{
+    /// Create a new message from a buffer.
+    ///
+    /// # Safety
+    ///
+    /// The buffer must:
+    /// - be at least `size_of::<H>()` bytes long
+    /// - contain a valid header at the start
+    /// - be properly aligned for type H
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - buffer is too small for the header
+    /// - buffer is too small for the size specified in the header
+    /// - header validation fails
+    #[allow(unused)]
+    pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
+        // verify minimum size
+        if buffer.len() < size_of::<H>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let message = Self {
+            buffer,
+            _marker: PhantomData,
+        };
+
+        // validate the header
+        message.header().validate()?;
+
+        // verify buffer size matches header.size
+        let header_size = message.header().size() as usize;
+        if message.buffer.len() < header_size {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        Ok(message)
+    }
+
+    /// Create a new message with a specific size, initializing the buffer 
with zeros.
+    ///
+    /// The header will be zeroed and must be initialized by the caller.
+    #[allow(unused)]
+    pub fn new(size: usize) -> Self {
+        assert!(size >= size_of::<H>(), "Size must be at least header size");
+        let buffer = Bytes::from(vec![0u8; size]);
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Get a reference to the header using zero-copy access.
+    ///
+    /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
+    /// without any copying or allocation.
+    #[inline]
+    #[allow(unused)]
+    pub fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+
+    /// Get a reference to the message body (everything after the header).
+    ///
+    /// Returns an empty slice if there is no body.
+    #[inline]
+    #[allow(unused)]
+    pub fn body(&self) -> &[u8] {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+
+        if total_size > header_size {
+            &self.buffer[header_size..total_size]
+        } else {
+            &[]
+        }
+    }
+
+    /// Get the complete message as bytes (header + body).
+    #[inline]
+    #[allow(unused)]
+    pub fn as_bytes(&self) -> &[u8] {
+        let total_size = self.header().size() as usize;
+        &self.buffer[..total_size]
+    }
+
+    /// Get the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn buffer(&self) -> &Bytes {
+        &self.buffer
+    }
+
+    /// Convert into the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn into_buffer(self) -> Bytes {
+        self.buffer
+    }
+
+    /// Create a message from a buffer without validation.
+    ///
+    /// # Safety
+    ///
+    /// This is private and skips validation. Use only when:
+    /// - The buffer is already validated
+    /// - If doing a zero-cost type conversion (like to GenericHeader)
+    #[inline]
+    #[allow(unused)]
+    fn from_buffer_unchecked(buffer: Bytes) -> Self {
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Convert to a generic message (erasing the specific header type).
+    ///
+    /// This allows treating any message as a generic message for common 
operations.
+    #[allow(unused)]
+    pub fn into_generic(self) -> Message<header::GenericHeader> {
+        Message::from_buffer_unchecked(self.buffer)
+    }
+
+    /// Get a reference to this message as a generic message.
+    #[allow(unused)]
+    pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+        // SAFETY: Message<H> and Message<GenericHeader> have the same memory 
layout
+        // because they only differ in the PhantomData type parameter
+        unsafe { &*(self as *const Self as *const 
Message<header::GenericHeader>) }
+    }
+
+    /// Try to convert this message to a different header type.
+    ///
+    /// This validates that the command in the header matches the target type's
+    /// expected command before performing the conversion.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - The command doesn't match the target type
+    /// - The target header validation fails
+    #[allow(unused)]
+    pub fn try_into_typed<T>(self) -> Result<Message<T>, 
header::ConsensusError>
+    where
+        T: ConsensusHeader,
+    {
+        if self.buffer.len() < size_of::<T>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let generic = self.as_generic();
+        if generic.header().command != T::COMMAND {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: generic.header().command,
+            });
+        }
+
+        let new_message = Message::<T>::from_buffer_unchecked(self.buffer);
+
+        new_message.header().validate()?;
+
+        Ok(new_message)
+    }
+
+    /// Try to get a reference to this message as a different header type.
+    ///
+    /// This is similar to `try_into_typed` but borrows instead of consuming.
+    #[allow(unused)]
+    pub fn try_as_typed<T>(&self) -> Result<&Message<T>, 
header::ConsensusError>
+    where
+        T: ConsensusHeader,
+    {
+        // check buffer size
+        if self.buffer.len() < size_of::<T>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        // check the command matches
+        let generic = self.as_generic();
+        if generic.header().command != T::COMMAND {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: generic.header().command,
+            });
+        }
+
+        let typed_message = unsafe { &*(self as *const Self as *const 
Message<T>) };
+
+        // validate the header
+        typed_message.header().validate()?;
+
+        Ok(typed_message)
+    }
+}
+
+#[derive(Debug)]
+pub enum MessageBag {
+    Generic(Message<GenericHeader>),
+    Prepare(Message<PrepareHeader>),
+    Commit(Message<CommitHeader>),
+    Reply(Message<ReplyHeader>),
+}
+
+impl MessageBag {
+    #[allow(unused)]
+    pub fn command(&self) -> header::Command {
+        match self {
+            MessageBag::Generic(message) => message.header().command,
+            MessageBag::Prepare(message) => message.header().command,
+            MessageBag::Commit(message) => message.header().command,
+            MessageBag::Reply(message) => message.header().command,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn size(&self) -> u32 {
+        match self {
+            MessageBag::Generic(message) => message.header().size(),
+            MessageBag::Prepare(message) => message.header().size(),
+            MessageBag::Commit(message) => message.header().size(),
+            MessageBag::Reply(message) => message.header().size(),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_prepare(&self) -> Option<&Message<header::PrepareHeader>> {
+        match self {
+            MessageBag::Prepare(m) => Some(m),
+            _ => None,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_commit(&self) -> Option<&Message<header::CommitHeader>> {
+        match self {
+            MessageBag::Commit(m) => Some(m),
+            _ => None,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_reply(&self) -> Option<&Message<header::ReplyHeader>> {
+        match self {
+            MessageBag::Reply(m) => Some(m),
+            _ => None,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_prepare(self) -> Result<Message<header::PrepareHeader>, Self> {
+        match self {
+            MessageBag::Prepare(m) => Ok(m),
+            other => Err(other),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_commit(self) -> Result<Message<header::CommitHeader>, Self> {
+        match self {
+            MessageBag::Commit(m) => Ok(m),
+            other => Err(other),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_reply(self) -> Result<Message<header::ReplyHeader>, Self> {
+        match self {
+            MessageBag::Reply(m) => Ok(m),
+            other => Err(other),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_generic(self) -> Message<header::GenericHeader> {
+        match self {
+            MessageBag::Generic(m) => m,
+            MessageBag::Prepare(m) => m.into_generic(),
+            MessageBag::Commit(m) => m.into_generic(),
+            MessageBag::Reply(m) => m.into_generic(),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+        match self {
+            MessageBag::Generic(m) => m,
+            MessageBag::Prepare(m) => m.as_generic(),
+            MessageBag::Commit(m) => m.as_generic(),
+            MessageBag::Reply(m) => m.as_generic(),
+        }
+    }

Review Comment:
   Let's not expose such methods. We want to `match` on the `MessageBag` at the 
call site and, depending on the variant, dispatch to the appropriate methods.



##########
core/common/src/types/consensus/mod.rs:
##########
@@ -19,7 +19,572 @@ use std::marker::PhantomData;
 
 use bytes::Bytes;
 
-use crate::types::consensus::header::ConsensusHeader;
+use crate::types::consensus::header::{
+    CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+};
+
+#[derive(Debug)]
+pub struct Message<H: ConsensusHeader> {
+    buffer: Bytes,
+    _marker: PhantomData<H>,
+}
+
+impl<H> Message<H>
+where
+    H: ConsensusHeader,
+{
+    /// Create a new message from a buffer.
+    ///
+    /// # Safety
+    ///
+    /// The buffer must:
+    /// - be at least `size_of::<H>()` bytes long
+    /// - contain a valid header at the start
+    /// - be properly aligned for type H
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - buffer is too small for the header
+    /// - buffer is too small for the size specified in the header
+    /// - header validation fails
+    #[allow(unused)]
+    pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
+        // verify minimum size
+        if buffer.len() < size_of::<H>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let message = Self {
+            buffer,
+            _marker: PhantomData,
+        };
+
+        // validate the header
+        message.header().validate()?;
+
+        // verify buffer size matches header.size
+        let header_size = message.header().size() as usize;
+        if message.buffer.len() < header_size {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        Ok(message)
+    }
+
+    /// Create a new message with a specific size, initializing the buffer 
with zeros.
+    ///
+    /// The header will be zeroed and must be initialized by the caller.
+    #[allow(unused)]
+    pub fn new(size: usize) -> Self {
+        assert!(size >= size_of::<H>(), "Size must be at least header size");
+        let buffer = Bytes::from(vec![0u8; size]);
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Get a reference to the header using zero-copy access.
+    ///
+    /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
+    /// without any copying or allocation.
+    #[inline]
+    #[allow(unused)]
+    pub fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+
+    /// Get a reference to the message body (everything after the header).
+    ///
+    /// Returns an empty slice if there is no body.
+    #[inline]
+    #[allow(unused)]
+    pub fn body(&self) -> &[u8] {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+
+        if total_size > header_size {
+            &self.buffer[header_size..total_size]
+        } else {
+            &[]
+        }
+    }
+
+    /// Get the complete message as bytes (header + body).
+    #[inline]
+    #[allow(unused)]
+    pub fn as_bytes(&self) -> &[u8] {
+        let total_size = self.header().size() as usize;
+        &self.buffer[..total_size]
+    }
+
+    /// Get the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn buffer(&self) -> &Bytes {
+        &self.buffer
+    }
+
+    /// Convert into the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn into_buffer(self) -> Bytes {
+        self.buffer
+    }

Review Comment:
   Maybe let's call this `into_inner`.



##########
core/common/src/types/consensus/mod.rs:
##########
@@ -19,7 +19,572 @@ use std::marker::PhantomData;
 
 use bytes::Bytes;
 
-use crate::types::consensus::header::ConsensusHeader;
+use crate::types::consensus::header::{
+    CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+};
+
+#[derive(Debug)]
+pub struct Message<H: ConsensusHeader> {
+    buffer: Bytes,
+    _marker: PhantomData<H>,
+}
+
+impl<H> Message<H>
+where
+    H: ConsensusHeader,
+{
+    /// Create a new message from a buffer.
+    ///
+    /// # Safety
+    ///
+    /// The buffer must:
+    /// - be at least `size_of::<H>()` bytes long
+    /// - contain a valid header at the start
+    /// - be properly aligned for type H
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - buffer is too small for the header
+    /// - buffer is too small for the size specified in the header
+    /// - header validation fails
+    #[allow(unused)]
+    pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
+        // verify minimum size
+        if buffer.len() < size_of::<H>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let message = Self {
+            buffer,
+            _marker: PhantomData,
+        };
+
+        // validate the header
+        message.header().validate()?;
+
+        // verify buffer size matches header.size
+        let header_size = message.header().size() as usize;
+        if message.buffer.len() < header_size {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        Ok(message)
+    }
+
+    /// Create a new message with a specific size, initializing the buffer 
with zeros.
+    ///
+    /// The header will be zeroed and must be initialized by the caller.
+    #[allow(unused)]
+    pub fn new(size: usize) -> Self {
+        assert!(size >= size_of::<H>(), "Size must be at least header size");
+        let buffer = Bytes::from(vec![0u8; size]);
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Get a reference to the header using zero-copy access.
+    ///
+    /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
+    /// without any copying or allocation.
+    #[inline]
+    #[allow(unused)]
+    pub fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+
+    /// Get a reference to the message body (everything after the header).
+    ///
+    /// Returns an empty slice if there is no body.
+    #[inline]
+    #[allow(unused)]
+    pub fn body(&self) -> &[u8] {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+
+        if total_size > header_size {
+            &self.buffer[header_size..total_size]
+        } else {
+            &[]
+        }
+    }
+
+    /// Get the complete message as bytes (header + body).
+    #[inline]
+    #[allow(unused)]
+    pub fn as_bytes(&self) -> &[u8] {
+        let total_size = self.header().size() as usize;
+        &self.buffer[..total_size]
+    }
+
+    /// Get the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn buffer(&self) -> &Bytes {
+        &self.buffer
+    }
+
+    /// Convert into the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn into_buffer(self) -> Bytes {
+        self.buffer
+    }
+
+    /// Create a message from a buffer without validation.
+    ///
+    /// # Safety
+    ///
+    /// This is private and skips validation. Use only when:
+    /// - The buffer is already validated
+    /// - If doing a zero-cost type conversion (like to GenericHeader)
+    #[inline]
+    #[allow(unused)]
+    fn from_buffer_unchecked(buffer: Bytes) -> Self {
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Convert to a generic message (erasing the specific header type).
+    ///
+    /// This allows treating any message as a generic message for common 
operations.
+    #[allow(unused)]
+    pub fn into_generic(self) -> Message<header::GenericHeader> {
+        Message::from_buffer_unchecked(self.buffer)
+    }
+
+    /// Get a reference to this message as a generic message.
+    #[allow(unused)]
+    pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+        // SAFETY: Message<H> and Message<GenericHeader> have the same memory 
layout
+        // because they only differ in the PhantomData type parameter
+        unsafe { &*(self as *const Self as *const 
Message<header::GenericHeader>) }
+    }
+
+    /// Try to convert this message to a different header type.
+    ///
+    /// This validates that the command in the header matches the target type's
+    /// expected command before performing the conversion.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - The command doesn't match the target type
+    /// - The target header validation fails
+    #[allow(unused)]
+    pub fn try_into_typed<T>(self) -> Result<Message<T>, 
header::ConsensusError>
+    where
+        T: ConsensusHeader,
+    {
+        if self.buffer.len() < size_of::<T>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let generic = self.as_generic();
+        if generic.header().command != T::COMMAND {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: generic.header().command,
+            });
+        }
+
+        let new_message = Message::<T>::from_buffer_unchecked(self.buffer);
+
+        new_message.header().validate()?;
+
+        Ok(new_message)
+    }
+
+    /// Try to get a reference to this message as a different header type.
+    ///
+    /// This is similar to `try_into_typed` but borrows instead of consuming.
+    #[allow(unused)]
+    pub fn try_as_typed<T>(&self) -> Result<&Message<T>, 
header::ConsensusError>
+    where
+        T: ConsensusHeader,
+    {
+        // check buffer size
+        if self.buffer.len() < size_of::<T>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        // check the command matches
+        let generic = self.as_generic();
+        if generic.header().command != T::COMMAND {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: generic.header().command,
+            });
+        }
+
+        let typed_message = unsafe { &*(self as *const Self as *const 
Message<T>) };
+
+        // validate the header
+        typed_message.header().validate()?;
+
+        Ok(typed_message)
+    }
+}
+
+#[derive(Debug)]
+pub enum MessageBag {
+    Generic(Message<GenericHeader>),
+    Prepare(Message<PrepareHeader>),
+    Commit(Message<CommitHeader>),
+    Reply(Message<ReplyHeader>),
+}
+
+impl MessageBag {
+    #[allow(unused)]
+    pub fn command(&self) -> header::Command {
+        match self {
+            MessageBag::Generic(message) => message.header().command,
+            MessageBag::Prepare(message) => message.header().command,
+            MessageBag::Commit(message) => message.header().command,
+            MessageBag::Reply(message) => message.header().command,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn size(&self) -> u32 {
+        match self {
+            MessageBag::Generic(message) => message.header().size(),
+            MessageBag::Prepare(message) => message.header().size(),
+            MessageBag::Commit(message) => message.header().size(),
+            MessageBag::Reply(message) => message.header().size(),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_prepare(&self) -> Option<&Message<header::PrepareHeader>> {
+        match self {
+            MessageBag::Prepare(m) => Some(m),
+            _ => None,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_commit(&self) -> Option<&Message<header::CommitHeader>> {
+        match self {
+            MessageBag::Commit(m) => Some(m),
+            _ => None,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_reply(&self) -> Option<&Message<header::ReplyHeader>> {
+        match self {
+            MessageBag::Reply(m) => Some(m),
+            _ => None,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_prepare(self) -> Result<Message<header::PrepareHeader>, Self> {
+        match self {
+            MessageBag::Prepare(m) => Ok(m),
+            other => Err(other),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_commit(self) -> Result<Message<header::CommitHeader>, Self> {
+        match self {
+            MessageBag::Commit(m) => Ok(m),
+            other => Err(other),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_reply(self) -> Result<Message<header::ReplyHeader>, Self> {
+        match self {
+            MessageBag::Reply(m) => Ok(m),
+            other => Err(other),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_generic(self) -> Message<header::GenericHeader> {
+        match self {
+            MessageBag::Generic(m) => m,
+            MessageBag::Prepare(m) => m.into_generic(),
+            MessageBag::Commit(m) => m.into_generic(),
+            MessageBag::Reply(m) => m.into_generic(),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+        match self {
+            MessageBag::Generic(m) => m,
+            MessageBag::Prepare(m) => m.as_generic(),
+            MessageBag::Commit(m) => m.as_generic(),
+            MessageBag::Reply(m) => m.as_generic(),
+        }
+    }
+}
+
+impl From<Message<header::PrepareHeader>> for MessageBag {
+    fn from(value: Message<header::PrepareHeader>) -> Self {
+        MessageBag::Prepare(value)
+    }
+}
+
+impl From<Message<header::CommitHeader>> for MessageBag {
+    fn from(value: Message<header::CommitHeader>) -> Self {
+        MessageBag::Commit(value)
+    }
+}
+
+impl From<Message<header::ReplyHeader>> for MessageBag {
+    fn from(value: Message<header::ReplyHeader>) -> Self {
+        MessageBag::Reply(value)
+    }
+}
+
+impl From<Message<header::GenericHeader>> for MessageBag {
+    fn from(value: Message<header::GenericHeader>) -> Self {
+        MessageBag::Generic(value)
+    }
+}

Review Comment:
   I think you can avoid writing out all of those conversions by doing 
something like this:
   
   ```rs
   impl<T: ConsensusHeader> From<Message<T>> for MessageBag {
       fn from(value: Message<T>) -> Self {
           match T::COMMAND {
                // Here put the code.
           }
       }
   }
   ```



##########
core/common/src/types/consensus/mod.rs:
##########
@@ -19,7 +19,572 @@ use std::marker::PhantomData;
 
 use bytes::Bytes;
 
-use crate::types::consensus::header::ConsensusHeader;
+use crate::types::consensus::header::{
+    CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+};
+
+#[derive(Debug)]
+pub struct Message<H: ConsensusHeader> {
+    buffer: Bytes,
+    _marker: PhantomData<H>,
+}
+
+impl<H> Message<H>
+where
+    H: ConsensusHeader,
+{
+    /// Create a new message from a buffer.
+    ///
+    /// # Safety
+    ///
+    /// The buffer must:
+    /// - be at least `size_of::<H>()` bytes long
+    /// - contain a valid header at the start
+    /// - be properly aligned for type H
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - buffer is too small for the header
+    /// - buffer is too small for the size specified in the header
+    /// - header validation fails
+    #[allow(unused)]
+    pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
+        // verify minimum size
+        if buffer.len() < size_of::<H>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let message = Self {
+            buffer,
+            _marker: PhantomData,
+        };
+
+        // validate the header
+        message.header().validate()?;
+
+        // verify buffer size matches header.size
+        let header_size = message.header().size() as usize;
+        if message.buffer.len() < header_size {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        Ok(message)
+    }
+
+    /// Create a new message with a specific size, initializing the buffer 
with zeros.
+    ///
+    /// The header will be zeroed and must be initialized by the caller.
+    #[allow(unused)]
+    pub fn new(size: usize) -> Self {
+        assert!(size >= size_of::<H>(), "Size must be at least header size");
+        let buffer = Bytes::from(vec![0u8; size]);
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Get a reference to the header using zero-copy access.
+    ///
+    /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
+    /// without any copying or allocation.
+    #[inline]
+    #[allow(unused)]
+    pub fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+
+    /// Get a reference to the message body (everything after the header).
+    ///
+    /// Returns an empty slice if there is no body.
+    #[inline]
+    #[allow(unused)]
+    pub fn body(&self) -> &[u8] {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+
+        if total_size > header_size {
+            &self.buffer[header_size..total_size]
+        } else {
+            &[]
+        }
+    }
+
+    /// Get the complete message as bytes (header + body).
+    #[inline]
+    #[allow(unused)]
+    pub fn as_bytes(&self) -> &[u8] {
+        let total_size = self.header().size() as usize;
+        &self.buffer[..total_size]
+    }
+
+    /// Get the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn buffer(&self) -> &Bytes {
+        &self.buffer
+    }
+
+    /// Convert into the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn into_buffer(self) -> Bytes {
+        self.buffer
+    }
+
+    /// Create a message from a buffer without validation.
+    ///
+    /// # Safety
+    ///
+    /// This is private and skips validation. Use only when:
+    /// - The buffer is already validated
+    /// - If doing a zero-cost type conversion (like to GenericHeader)
+    #[inline]
+    #[allow(unused)]
+    fn from_buffer_unchecked(buffer: Bytes) -> Self {
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }

Review Comment:
   Mark this method as `unsafe`.



##########
core/common/src/types/consensus/mod.rs:
##########
@@ -19,7 +19,572 @@ use std::marker::PhantomData;
 
 use bytes::Bytes;
 
-use crate::types::consensus::header::ConsensusHeader;
+use crate::types::consensus::header::{
+    CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+};
+
+#[derive(Debug)]
+pub struct Message<H: ConsensusHeader> {
+    buffer: Bytes,
+    _marker: PhantomData<H>,
+}
+
+impl<H> Message<H>
+where
+    H: ConsensusHeader,
+{
+    /// Create a new message from a buffer.
+    ///
+    /// # Safety
+    ///
+    /// The buffer must:
+    /// - be at least `size_of::<H>()` bytes long
+    /// - contain a valid header at the start
+    /// - be properly aligned for type H
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - buffer is too small for the header
+    /// - buffer is too small for the size specified in the header
+    /// - header validation fails
+    #[allow(unused)]
+    pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
+        // verify minimum size
+        if buffer.len() < size_of::<H>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let message = Self {
+            buffer,
+            _marker: PhantomData,
+        };
+
+        // validate the header
+        message.header().validate()?;
+
+        // verify buffer size matches header.size
+        let header_size = message.header().size() as usize;
+        if message.buffer.len() < header_size {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        Ok(message)
+    }
+
+    /// Create a new message with a specific size, initializing the buffer 
with zeros.
+    ///
+    /// The header will be zeroed and must be initialized by the caller.
+    #[allow(unused)]
+    pub fn new(size: usize) -> Self {
+        assert!(size >= size_of::<H>(), "Size must be at least header size");
+        let buffer = Bytes::from(vec![0u8; size]);
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Get a reference to the header using zero-copy access.
+    ///
+    /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
+    /// without any copying or allocation.
+    #[inline]
+    #[allow(unused)]
+    pub fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+
+    /// Get a reference to the message body (everything after the header).
+    ///
+    /// Returns an empty slice if there is no body.
+    #[inline]
+    #[allow(unused)]
+    pub fn body(&self) -> &[u8] {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+
+        if total_size > header_size {
+            &self.buffer[header_size..total_size]
+        } else {
+            &[]
+        }
+    }
+
+    /// Get the complete message as bytes (header + body).
+    #[inline]
+    #[allow(unused)]
+    pub fn as_bytes(&self) -> &[u8] {
+        let total_size = self.header().size() as usize;
+        &self.buffer[..total_size]
+    }
+
+    /// Get the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn buffer(&self) -> &Bytes {
+        &self.buffer
+    }
+
+    /// Convert into the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn into_buffer(self) -> Bytes {
+        self.buffer
+    }
+
+    /// Create a message from a buffer without validation.
+    ///
+    /// # Safety
+    ///
+    /// This is private and skips validation. Use only when:
+    /// - The buffer is already validated
+    /// - If doing a zero-cost type conversion (like to GenericHeader)
+    #[inline]
+    #[allow(unused)]
+    fn from_buffer_unchecked(buffer: Bytes) -> Self {
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Convert to a generic message (erasing the specific header type).
+    ///
+    /// This allows treating any message as a generic message for common 
operations.
+    #[allow(unused)]
+    pub fn into_generic(self) -> Message<header::GenericHeader> {
+        Message::from_buffer_unchecked(self.buffer)
+    }
+
+    /// Get a reference to this message as a generic message.
+    #[allow(unused)]
+    pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+        // SAFETY: Message<H> and Message<GenericHeader> have the same memory 
layout
+        // because they only differ in the PhantomData type parameter
+        unsafe { &*(self as *const Self as *const 
Message<header::GenericHeader>) }
+    }
+
+    /// Try to convert this message to a different header type.
+    ///
+    /// This validates that the command in the header matches the target type's
+    /// expected command before performing the conversion.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - The command doesn't match the target type
+    /// - The target header validation fails
+    #[allow(unused)]
+    pub fn try_into_typed<T>(self) -> Result<Message<T>, 
header::ConsensusError>
+    where
+        T: ConsensusHeader,
+    {
+        if self.buffer.len() < size_of::<T>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let generic = self.as_generic();
+        if generic.header().command != T::COMMAND {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: generic.header().command,
+            });
+        }
+
+        let new_message = Message::<T>::from_buffer_unchecked(self.buffer);
+
+        new_message.header().validate()?;
+
+        Ok(new_message)
+    }
+
+    /// Try to get a reference to this message as a different header type.
+    ///
+    /// This is similar to `try_into_typed` but borrows instead of consuming.
+    #[allow(unused)]
+    pub fn try_as_typed<T>(&self) -> Result<&Message<T>, 
header::ConsensusError>
+    where
+        T: ConsensusHeader,
+    {
+        // check buffer size
+        if self.buffer.len() < size_of::<T>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        // check the command matches
+        let generic = self.as_generic();
+        if generic.header().command != T::COMMAND {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: generic.header().command,
+            });
+        }
+
+        let typed_message = unsafe { &*(self as *const Self as *const 
Message<T>) };
+
+        // validate the header
+        typed_message.header().validate()?;
+
+        Ok(typed_message)
+    }
+}
+
+#[derive(Debug)]
+pub enum MessageBag {
+    Generic(Message<GenericHeader>),
+    Prepare(Message<PrepareHeader>),
+    Commit(Message<CommitHeader>),
+    Reply(Message<ReplyHeader>),
+}
+
+impl MessageBag {
+    #[allow(unused)]
+    pub fn command(&self) -> header::Command {
+        match self {
+            MessageBag::Generic(message) => message.header().command,
+            MessageBag::Prepare(message) => message.header().command,
+            MessageBag::Commit(message) => message.header().command,
+            MessageBag::Reply(message) => message.header().command,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn size(&self) -> u32 {
+        match self {
+            MessageBag::Generic(message) => message.header().size(),
+            MessageBag::Prepare(message) => message.header().size(),
+            MessageBag::Commit(message) => message.header().size(),
+            MessageBag::Reply(message) => message.header().size(),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_prepare(&self) -> Option<&Message<header::PrepareHeader>> {
+        match self {
+            MessageBag::Prepare(m) => Some(m),
+            _ => None,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_commit(&self) -> Option<&Message<header::CommitHeader>> {
+        match self {
+            MessageBag::Commit(m) => Some(m),
+            _ => None,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_reply(&self) -> Option<&Message<header::ReplyHeader>> {
+        match self {
+            MessageBag::Reply(m) => Some(m),
+            _ => None,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_prepare(self) -> Result<Message<header::PrepareHeader>, Self> {
+        match self {
+            MessageBag::Prepare(m) => Ok(m),
+            other => Err(other),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_commit(self) -> Result<Message<header::CommitHeader>, Self> {
+        match self {
+            MessageBag::Commit(m) => Ok(m),
+            other => Err(other),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_reply(self) -> Result<Message<header::ReplyHeader>, Self> {
+        match self {
+            MessageBag::Reply(m) => Ok(m),
+            other => Err(other),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_generic(self) -> Message<header::GenericHeader> {
+        match self {
+            MessageBag::Generic(m) => m,
+            MessageBag::Prepare(m) => m.into_generic(),
+            MessageBag::Commit(m) => m.into_generic(),
+            MessageBag::Reply(m) => m.into_generic(),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+        match self {
+            MessageBag::Generic(m) => m,
+            MessageBag::Prepare(m) => m.as_generic(),
+            MessageBag::Commit(m) => m.as_generic(),
+            MessageBag::Reply(m) => m.as_generic(),
+        }
+    }
+}
+
+impl From<Message<header::PrepareHeader>> for MessageBag {
+    fn from(value: Message<header::PrepareHeader>) -> Self {
+        MessageBag::Prepare(value)
+    }
+}
+
+impl From<Message<header::CommitHeader>> for MessageBag {
+    fn from(value: Message<header::CommitHeader>) -> Self {
+        MessageBag::Commit(value)
+    }
+}
+
+impl From<Message<header::ReplyHeader>> for MessageBag {
+    fn from(value: Message<header::ReplyHeader>) -> Self {
+        MessageBag::Reply(value)
+    }
+}
+
+impl From<Message<header::GenericHeader>> for MessageBag {
+    fn from(value: Message<header::GenericHeader>) -> Self {
+        MessageBag::Generic(value)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use bytes::BytesMut;
+
+    use super::*;
+
+    #[test]
+    fn test_message_creation_and_access() {
+        let mut buffer = vec![0u8; 256];
+
+        let header = bytemuck::from_bytes_mut::<header::GenericHeader>(
+            &mut buffer[..size_of::<header::GenericHeader>()],
+        );
+        header.size = 256;
+        header.command = header::Command::Reserved;
+        header.cluster = 123;
+
+        let message = 
Message::<header::GenericHeader>::from_bytes(Bytes::from(buffer)).unwrap();
+
+        assert_eq!(message.header().size, 256);
+        assert_eq!(message.header().cluster, 123);
+        assert_eq!(message.header().command, header::Command::Reserved);
+        assert_eq!(
+            message.body().len(),
+            256 - size_of::<header::GenericHeader>()
+        );
+    }
+
+    #[test]
+    fn test_message_conversion() {

Review Comment:
   I think for testing, it's only worth testing the following scenario:
   
   - Create a particular `Message` with a header and validate that the bytes 
survive a round trip unchanged: create a message, turn it into its byte 
representation, store a copy of those bytes in a variable, recreate the message 
using the zero-copy API, turn it into bytes again, and assert that the result is 
exactly the same as the bytes that were copied for reference.
   



##########
core/common/src/types/consensus/mod.rs:
##########
@@ -19,7 +19,572 @@ use std::marker::PhantomData;
 
 use bytes::Bytes;
 
-use crate::types::consensus::header::ConsensusHeader;
+use crate::types::consensus::header::{
+    CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+};
+
+#[derive(Debug)]
+pub struct Message<H: ConsensusHeader> {
+    buffer: Bytes,
+    _marker: PhantomData<H>,
+}
+
+impl<H> Message<H>
+where
+    H: ConsensusHeader,
+{
+    /// Create a new message from a buffer.
+    ///
+    /// # Safety
+    ///
+    /// The buffer must:
+    /// - be at least `size_of::<H>()` bytes long
+    /// - contain a valid header at the start
+    /// - be properly aligned for type H
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - buffer is too small for the header
+    /// - buffer is too small for the size specified in the header
+    /// - header validation fails
+    #[allow(unused)]
+    pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
+        // verify minimum size
+        if buffer.len() < size_of::<H>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let message = Self {
+            buffer,
+            _marker: PhantomData,
+        };
+
+        // validate the header
+        message.header().validate()?;
+
+        // verify buffer size matches header.size
+        let header_size = message.header().size() as usize;
+        if message.buffer.len() < header_size {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        Ok(message)
+    }
+
+    /// Create a new message with a specific size, initializing the buffer 
with zeros.
+    ///
+    /// The header will be zeroed and must be initialized by the caller.
+    #[allow(unused)]
+    pub fn new(size: usize) -> Self {
+        assert!(size >= size_of::<H>(), "Size must be at least header size");
+        let buffer = Bytes::from(vec![0u8; size]);
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Get a reference to the header using zero-copy access.
+    ///
+    /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
+    /// without any copying or allocation.
+    #[inline]
+    #[allow(unused)]
+    pub fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+
+    /// Get a reference to the message body (everything after the header).
+    ///
+    /// Returns an empty slice if there is no body.
+    #[inline]
+    #[allow(unused)]
+    pub fn body(&self) -> &[u8] {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+
+        if total_size > header_size {
+            &self.buffer[header_size..total_size]
+        } else {
+            &[]
+        }
+    }
+
+    /// Get the complete message as bytes (header + body).
+    #[inline]
+    #[allow(unused)]
+    pub fn as_bytes(&self) -> &[u8] {
+        let total_size = self.header().size() as usize;
+        &self.buffer[..total_size]
+    }
+
+    /// Get the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn buffer(&self) -> &Bytes {
+        &self.buffer
+    }
+
+    /// Convert into the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn into_buffer(self) -> Bytes {
+        self.buffer
+    }
+
+    /// Create a message from a buffer without validation.
+    ///
+    /// # Safety
+    ///
+    /// This is private and skips validation. Use only when:
+    /// - The buffer is already validated
+    /// - If doing a zero-cost type conversion (like to GenericHeader)
+    #[inline]
+    #[allow(unused)]
+    fn from_buffer_unchecked(buffer: Bytes) -> Self {
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Convert to a generic message (erasing the specific header type).
+    ///
+    /// This allows treating any message as a generic message for common 
operations.
+    #[allow(unused)]
+    pub fn into_generic(self) -> Message<header::GenericHeader> {
+        Message::from_buffer_unchecked(self.buffer)
+    }
+
+    /// Get a reference to this message as a generic message.
+    #[allow(unused)]
+    pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+        // SAFETY: Message<H> and Message<GenericHeader> have the same memory 
layout
+        // because they only differ in the PhantomData type parameter
+        unsafe { &*(self as *const Self as *const 
Message<header::GenericHeader>) }
+    }
+
+    /// Try to convert this message to a different header type.
+    ///
+    /// This validates that the command in the header matches the target type's
+    /// expected command before performing the conversion.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - The command doesn't match the target type
+    /// - The target header validation fails
+    #[allow(unused)]
+    pub fn try_into_typed<T>(self) -> Result<Message<T>, 
header::ConsensusError>
+    where
+        T: ConsensusHeader,
+    {
+        if self.buffer.len() < size_of::<T>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let generic = self.as_generic();
+        if generic.header().command != T::COMMAND {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: generic.header().command,
+            });
+        }
+
+        let new_message = Message::<T>::from_buffer_unchecked(self.buffer);
+
+        new_message.header().validate()?;
+
+        Ok(new_message)
+    }
+
+    /// Try to get a reference to this message as a different header type.
+    ///
+    /// This is similar to `try_into_typed` but borrows instead of consuming.
+    #[allow(unused)]
+    pub fn try_as_typed<T>(&self) -> Result<&Message<T>, 
header::ConsensusError>
+    where
+        T: ConsensusHeader,
+    {
+        // check buffer size
+        if self.buffer.len() < size_of::<T>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        // check the command matches
+        let generic = self.as_generic();
+        if generic.header().command != T::COMMAND {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: T::COMMAND,
+                found: generic.header().command,
+            });
+        }
+
+        let typed_message = unsafe { &*(self as *const Self as *const 
Message<T>) };
+
+        // validate the header
+        typed_message.header().validate()?;
+
+        Ok(typed_message)
+    }
+}
+
+#[derive(Debug)]
+pub enum MessageBag {
+    Generic(Message<GenericHeader>),
+    Prepare(Message<PrepareHeader>),
+    Commit(Message<CommitHeader>),
+    Reply(Message<ReplyHeader>),
+}
+
+impl MessageBag {
+    #[allow(unused)]
+    pub fn command(&self) -> header::Command {
+        match self {
+            MessageBag::Generic(message) => message.header().command,
+            MessageBag::Prepare(message) => message.header().command,
+            MessageBag::Commit(message) => message.header().command,
+            MessageBag::Reply(message) => message.header().command,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn size(&self) -> u32 {
+        match self {
+            MessageBag::Generic(message) => message.header().size(),
+            MessageBag::Prepare(message) => message.header().size(),
+            MessageBag::Commit(message) => message.header().size(),
+            MessageBag::Reply(message) => message.header().size(),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_prepare(&self) -> Option<&Message<header::PrepareHeader>> {
+        match self {
+            MessageBag::Prepare(m) => Some(m),
+            _ => None,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_commit(&self) -> Option<&Message<header::CommitHeader>> {
+        match self {
+            MessageBag::Commit(m) => Some(m),
+            _ => None,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_reply(&self) -> Option<&Message<header::ReplyHeader>> {
+        match self {
+            MessageBag::Reply(m) => Some(m),
+            _ => None,
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_prepare(self) -> Result<Message<header::PrepareHeader>, Self> {
+        match self {
+            MessageBag::Prepare(m) => Ok(m),
+            other => Err(other),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_commit(self) -> Result<Message<header::CommitHeader>, Self> {
+        match self {
+            MessageBag::Commit(m) => Ok(m),
+            other => Err(other),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_reply(self) -> Result<Message<header::ReplyHeader>, Self> {
+        match self {
+            MessageBag::Reply(m) => Ok(m),
+            other => Err(other),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn into_generic(self) -> Message<header::GenericHeader> {
+        match self {
+            MessageBag::Generic(m) => m,
+            MessageBag::Prepare(m) => m.into_generic(),
+            MessageBag::Commit(m) => m.into_generic(),
+            MessageBag::Reply(m) => m.into_generic(),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+        match self {
+            MessageBag::Generic(m) => m,
+            MessageBag::Prepare(m) => m.as_generic(),
+            MessageBag::Commit(m) => m.as_generic(),
+            MessageBag::Reply(m) => m.as_generic(),
+        }
+    }
+}
+
+impl From<Message<header::PrepareHeader>> for MessageBag {
+    fn from(value: Message<header::PrepareHeader>) -> Self {
+        MessageBag::Prepare(value)
+    }
+}
+
+impl From<Message<header::CommitHeader>> for MessageBag {
+    fn from(value: Message<header::CommitHeader>) -> Self {
+        MessageBag::Commit(value)
+    }
+}
+
+impl From<Message<header::ReplyHeader>> for MessageBag {
+    fn from(value: Message<header::ReplyHeader>) -> Self {
+        MessageBag::Reply(value)
+    }
+}
+
+impl From<Message<header::GenericHeader>> for MessageBag {
+    fn from(value: Message<header::GenericHeader>) -> Self {
+        MessageBag::Generic(value)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use bytes::BytesMut;
+
+    use super::*;
+
+    #[test]
+    fn test_message_creation_and_access() {
+        let mut buffer = vec![0u8; 256];

Review Comment:
   Start the test by creating an actual `Message` rather than using raw bytes; 
create a factory method for each message type (depending on the header).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to