krishvishal commented on code in PR #2389:
URL: https://github.com/apache/iggy/pull/2389#discussion_r2559744961


##########
core/common/src/types/consensus/mod.rs:
##########
@@ -19,7 +19,572 @@ use std::marker::PhantomData;
 
 use bytes::Bytes;
 
-use crate::types::consensus::header::ConsensusHeader;
+use crate::types::consensus::header::{
+    CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+};
+
+#[derive(Debug)]
+pub struct Message<H: ConsensusHeader> {
+    buffer: Bytes,
+    _marker: PhantomData<H>,
+}
+
+impl<H> Message<H>
+where
+    H: ConsensusHeader,
+{
+    /// Create a new message from a buffer.
+    ///
+    /// # Safety
+    ///
+    /// The buffer must:
+    /// - be at least `size_of::<H>()` bytes long
+    /// - contain a valid header at the start
+    /// - be properly aligned for type H
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - buffer is too small for the header
+    /// - buffer is too small for the size specified in the header
+    /// - header validation fails
+    #[allow(unused)]
+    pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
+        // verify minimum size
+        if buffer.len() < size_of::<H>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let message = Self {
+            buffer,
+            _marker: PhantomData,
+        };
+
+        // validate the header
+        message.header().validate()?;
+
+        // verify buffer size matches header.size
+        let header_size = message.header().size() as usize;
+        if message.buffer.len() < header_size {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        Ok(message)
+    }
+
+    /// Create a new message with a specific size, initializing the buffer 
with zeros.
+    ///
+    /// The header will be zeroed and must be initialized by the caller.
+    #[allow(unused)]
+    pub fn new(size: usize) -> Self {
+        assert!(size >= size_of::<H>(), "Size must be at least header size");
+        let buffer = Bytes::from(vec![0u8; size]);
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Get a reference to the header using zero-copy access.
+    ///
+    /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
+    /// without any copying or allocation.
+    #[inline]
+    #[allow(unused)]
+    pub fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+
+    /// Get a reference to the message body (everything after the header).
+    ///
+    /// Returns an empty slice if there is no body.
+    #[inline]
+    #[allow(unused)]
+    pub fn body(&self) -> &[u8] {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+
+        if total_size > header_size {
+            &self.buffer[header_size..total_size]
+        } else {
+            &[]
+        }
+    }
+
+    /// Get the complete message as bytes (header + body).
+    #[inline]
+    #[allow(unused)]
+    pub fn as_bytes(&self) -> &[u8] {
+        let total_size = self.header().size() as usize;
+        &self.buffer[..total_size]
+    }
+
+    /// Get the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn buffer(&self) -> &Bytes {
+        &self.buffer
+    }
+
+    /// Convert into the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn into_buffer(self) -> Bytes {
+        self.buffer
+    }

Review Comment:
   Done.



##########
core/common/src/types/consensus/mod.rs:
##########
@@ -19,7 +19,572 @@ use std::marker::PhantomData;
 
 use bytes::Bytes;
 
-use crate::types::consensus::header::ConsensusHeader;
+use crate::types::consensus::header::{
+    CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+};
+
+#[derive(Debug)]
+pub struct Message<H: ConsensusHeader> {
+    buffer: Bytes,
+    _marker: PhantomData<H>,
+}
+
+impl<H> Message<H>
+where
+    H: ConsensusHeader,
+{
+    /// Create a new message from a buffer.
+    ///
+    /// # Safety
+    ///
+    /// The buffer must:
+    /// - be at least `size_of::<H>()` bytes long
+    /// - contain a valid header at the start
+    /// - be properly aligned for type H
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - buffer is too small for the header
+    /// - buffer is too small for the size specified in the header
+    /// - header validation fails
+    #[allow(unused)]
+    pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
+        // verify minimum size
+        if buffer.len() < size_of::<H>() {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        let message = Self {
+            buffer,
+            _marker: PhantomData,
+        };
+
+        // validate the header
+        message.header().validate()?;
+
+        // verify buffer size matches header.size
+        let header_size = message.header().size() as usize;
+        if message.buffer.len() < header_size {
+            return Err(header::ConsensusError::InvalidCommand {
+                expected: H::COMMAND,
+                found: header::Command::Reserved,
+            });
+        }
+
+        Ok(message)
+    }
+
+    /// Create a new message with a specific size, initializing the buffer 
with zeros.
+    ///
+    /// The header will be zeroed and must be initialized by the caller.
+    #[allow(unused)]
+    pub fn new(size: usize) -> Self {
+        assert!(size >= size_of::<H>(), "Size must be at least header size");
+        let buffer = Bytes::from(vec![0u8; size]);
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Get a reference to the header using zero-copy access.
+    ///
+    /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
+    /// without any copying or allocation.
+    #[inline]
+    #[allow(unused)]
+    pub fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+
+    /// Get a reference to the message body (everything after the header).
+    ///
+    /// Returns an empty slice if there is no body.
+    #[inline]
+    #[allow(unused)]
+    pub fn body(&self) -> &[u8] {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+
+        if total_size > header_size {
+            &self.buffer[header_size..total_size]
+        } else {
+            &[]
+        }
+    }
+
+    /// Get the complete message as bytes (header + body).
+    #[inline]
+    #[allow(unused)]
+    pub fn as_bytes(&self) -> &[u8] {
+        let total_size = self.header().size() as usize;
+        &self.buffer[..total_size]
+    }
+
+    /// Get the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn buffer(&self) -> &Bytes {
+        &self.buffer
+    }
+
+    /// Convert into the underlying buffer.
+    #[inline]
+    #[allow(unused)]
+    pub fn into_buffer(self) -> Bytes {
+        self.buffer
+    }
+
+    /// Create a message from a buffer without validation.
+    ///
+    /// # Safety
+    ///
+    /// This is private and skips validation. Use only when:
+    /// - The buffer is already validated
+    /// - If doing a zero-cost type conversion (like to GenericHeader)
+    #[inline]
+    #[allow(unused)]
+    fn from_buffer_unchecked(buffer: Bytes) -> Self {
+        Self {
+            buffer,
+            _marker: PhantomData,
+        }
+    }

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to