numinnex commented on code in PR #2389:
URL: https://github.com/apache/iggy/pull/2389#discussion_r2559778752
##########
core/common/src/types/consensus/mod.rs:
##########
@@ -19,7 +19,572 @@ use std::marker::PhantomData;
use bytes::Bytes;
-use crate::types::consensus::header::ConsensusHeader;
+use crate::types::consensus::header::{
+ CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+};
+
+#[derive(Debug)]
+pub struct Message<H: ConsensusHeader> {
+ buffer: Bytes,
+ _marker: PhantomData<H>,
+}
+
+impl<H> Message<H>
+where
+ H: ConsensusHeader,
+{
+ /// Create a new message from a buffer.
+ ///
+ /// # Safety
+ ///
+ /// The buffer must:
+ /// - be at least `size_of::<H>()` bytes long
+ /// - contain a valid header at the start
+ /// - be properly aligned for type H
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if:
+ /// - buffer is too small for the header
+ /// - buffer is too small for the size specified in the header
+ /// - header validation fails
+ #[allow(unused)]
+ pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
+ // verify minimum size
+ if buffer.len() < size_of::<H>() {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: H::COMMAND,
+ found: header::Command::Reserved,
+ });
+ }
+
+ let message = Self {
+ buffer,
+ _marker: PhantomData,
+ };
+
+ // validate the header
+ message.header().validate()?;
+
+ // verify buffer size matches header.size
+ let header_size = message.header().size() as usize;
+ if message.buffer.len() < header_size {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: H::COMMAND,
+ found: header::Command::Reserved,
+ });
+ }
+
+ Ok(message)
+ }
+
+ /// Create a new message with a specific size, initializing the buffer
with zeros.
+ ///
+ /// The header will be zeroed and must be initialized by the caller.
+ #[allow(unused)]
+ pub fn new(size: usize) -> Self {
+ assert!(size >= size_of::<H>(), "Size must be at least header size");
+ let buffer = Bytes::from(vec![0u8; size]);
+ Self {
+ buffer,
+ _marker: PhantomData,
+ }
+ }
+
+ /// Get a reference to the header using zero-copy access.
+ ///
+ /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
+ /// without any copying or allocation.
+ #[inline]
+ #[allow(unused)]
+ pub fn header(&self) -> &H {
+ let header_bytes = &self.buffer[..size_of::<H>()];
+ bytemuck::from_bytes(header_bytes)
+ }
+
+ /// Get a reference to the message body (everything after the header).
+ ///
+ /// Returns an empty slice if there is no body.
+ #[inline]
+ #[allow(unused)]
+ pub fn body(&self) -> &[u8] {
+ let header_size = size_of::<H>();
+ let total_size = self.header().size() as usize;
+
+ if total_size > header_size {
+ &self.buffer[header_size..total_size]
+ } else {
+ &[]
+ }
+ }
+
+ /// Get the complete message as bytes (header + body).
+ #[inline]
+ #[allow(unused)]
+ pub fn as_bytes(&self) -> &[u8] {
+ let total_size = self.header().size() as usize;
+ &self.buffer[..total_size]
+ }
+
+ /// Get the underlying buffer.
+ #[inline]
+ #[allow(unused)]
+ pub fn buffer(&self) -> &Bytes {
+ &self.buffer
+ }
+
+ /// Convert into the underlying buffer.
+ #[inline]
+ #[allow(unused)]
+ pub fn into_buffer(self) -> Bytes {
+ self.buffer
+ }
+
+ /// Create a message from a buffer without validation.
+ ///
+ /// # Safety
+ ///
+ /// This is private and skips validation. Use only when:
+ /// - The buffer is already validated
+ /// - If doing a zero-cost type conversion (like to GenericHeader)
+ #[inline]
+ #[allow(unused)]
+ fn from_buffer_unchecked(buffer: Bytes) -> Self {
+ Self {
+ buffer,
+ _marker: PhantomData,
+ }
+ }
+
+ /// Convert to a generic message (erasing the specific header type).
+ ///
+ /// This allows treating any message as a generic message for common
operations.
+ #[allow(unused)]
+ pub fn into_generic(self) -> Message<header::GenericHeader> {
+ Message::from_buffer_unchecked(self.buffer)
+ }
+
+ /// Get a reference to this message as a generic message.
+ #[allow(unused)]
+ pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+ // SAFETY: Message<H> and Message<GenericHeader> have the same memory
layout
+ // because they only differ in the PhantomData type parameter
+ unsafe { &*(self as *const Self as *const
Message<header::GenericHeader>) }
+ }
+
+ /// Try to convert this message to a different header type.
+ ///
+ /// This validates that the command in the header matches the target type's
+ /// expected command before performing the conversion.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if:
+ /// - The command doesn't match the target type
+ /// - The target header validation fails
+ #[allow(unused)]
+ pub fn try_into_typed<T>(self) -> Result<Message<T>,
header::ConsensusError>
+ where
+ T: ConsensusHeader,
+ {
+ if self.buffer.len() < size_of::<T>() {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: header::Command::Reserved,
+ });
+ }
+
+ let generic = self.as_generic();
+ if generic.header().command != T::COMMAND {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: generic.header().command,
+ });
+ }
+
+ let new_message = Message::<T>::from_buffer_unchecked(self.buffer);
+
+ new_message.header().validate()?;
+
+ Ok(new_message)
+ }
+
+ /// Try to get a reference to this message as a different header type.
+ ///
+ /// This is similar to `try_into_typed` but borrows instead of consuming.
+ #[allow(unused)]
+ pub fn try_as_typed<T>(&self) -> Result<&Message<T>,
header::ConsensusError>
+ where
+ T: ConsensusHeader,
+ {
+ // check buffer size
+ if self.buffer.len() < size_of::<T>() {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: header::Command::Reserved,
+ });
+ }
+
+ // check the command matches
+ let generic = self.as_generic();
+ if generic.header().command != T::COMMAND {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: generic.header().command,
+ });
+ }
+
+ let typed_message = unsafe { &*(self as *const Self as *const
Message<T>) };
+
+ // validate the header
+ typed_message.header().validate()?;
+
+ Ok(typed_message)
+ }
+}
+
+#[derive(Debug)]
+pub enum MessageBag {
+ Generic(Message<GenericHeader>),
+ Prepare(Message<PrepareHeader>),
+ Commit(Message<CommitHeader>),
+ Reply(Message<ReplyHeader>),
+}
+
+impl MessageBag {
+ #[allow(unused)]
+ pub fn command(&self) -> header::Command {
+ match self {
+ MessageBag::Generic(message) => message.header().command,
+ MessageBag::Prepare(message) => message.header().command,
+ MessageBag::Commit(message) => message.header().command,
+ MessageBag::Reply(message) => message.header().command,
+ }
+ }
+
+ #[allow(unused)]
+ pub fn size(&self) -> u32 {
+ match self {
+ MessageBag::Generic(message) => message.header().size(),
+ MessageBag::Prepare(message) => message.header().size(),
+ MessageBag::Commit(message) => message.header().size(),
+ MessageBag::Reply(message) => message.header().size(),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn as_prepare(&self) -> Option<&Message<header::PrepareHeader>> {
+ match self {
+ MessageBag::Prepare(m) => Some(m),
+ _ => None,
+ }
+ }
+
+ #[allow(unused)]
+ pub fn as_commit(&self) -> Option<&Message<header::CommitHeader>> {
+ match self {
+ MessageBag::Commit(m) => Some(m),
+ _ => None,
+ }
+ }
+
+ #[allow(unused)]
+ pub fn as_reply(&self) -> Option<&Message<header::ReplyHeader>> {
+ match self {
+ MessageBag::Reply(m) => Some(m),
+ _ => None,
+ }
+ }
+
+ #[allow(unused)]
+ pub fn into_prepare(self) -> Result<Message<header::PrepareHeader>, Self> {
+ match self {
+ MessageBag::Prepare(m) => Ok(m),
+ other => Err(other),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn into_commit(self) -> Result<Message<header::CommitHeader>, Self> {
+ match self {
+ MessageBag::Commit(m) => Ok(m),
+ other => Err(other),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn into_reply(self) -> Result<Message<header::ReplyHeader>, Self> {
+ match self {
+ MessageBag::Reply(m) => Ok(m),
+ other => Err(other),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn into_generic(self) -> Message<header::GenericHeader> {
+ match self {
+ MessageBag::Generic(m) => m,
+ MessageBag::Prepare(m) => m.into_generic(),
+ MessageBag::Commit(m) => m.into_generic(),
+ MessageBag::Reply(m) => m.into_generic(),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+ match self {
+ MessageBag::Generic(m) => m,
+ MessageBag::Prepare(m) => m.as_generic(),
+ MessageBag::Commit(m) => m.as_generic(),
+ MessageBag::Reply(m) => m.as_generic(),
+ }
+ }
+}
+
+impl From<Message<header::PrepareHeader>> for MessageBag {
+ fn from(value: Message<header::PrepareHeader>) -> Self {
+ MessageBag::Prepare(value)
+ }
+}
+
+impl From<Message<header::CommitHeader>> for MessageBag {
+ fn from(value: Message<header::CommitHeader>) -> Self {
+ MessageBag::Commit(value)
+ }
+}
+
+impl From<Message<header::ReplyHeader>> for MessageBag {
+ fn from(value: Message<header::ReplyHeader>) -> Self {
+ MessageBag::Reply(value)
+ }
+}
+
+impl From<Message<header::GenericHeader>> for MessageBag {
+ fn from(value: Message<header::GenericHeader>) -> Self {
+ MessageBag::Generic(value)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use bytes::BytesMut;
+
+ use super::*;
+
+ #[test]
+ fn test_message_creation_and_access() {
+ let mut buffer = vec![0u8; 256];
+
+ let header = bytemuck::from_bytes_mut::<header::GenericHeader>(
+ &mut buffer[..size_of::<header::GenericHeader>()],
+ );
+ header.size = 256;
+ header.command = header::Command::Reserved;
+ header.cluster = 123;
+
+ let message =
Message::<header::GenericHeader>::from_bytes(Bytes::from(buffer)).unwrap();
+
+ assert_eq!(message.header().size, 256);
+ assert_eq!(message.header().cluster, 123);
+ assert_eq!(message.header().command, header::Command::Reserved);
+ assert_eq!(
+ message.body().len(),
+ 256 - size_of::<header::GenericHeader>()
+ );
+ }
+
+ #[test]
+ fn test_message_conversion() {
Review Comment:
It does not seem to be doing that; also, we don't want to test conversions
like this. There is a `Project` trait that we will use for conversions between
different header types.
What you have to do is start from a message (not bytes), assign some values
to that message's header (size/command/op/etc.), convert it into bytes and store
a copy of those bytes in a variable, reconstruct the message from those bytes,
convert it into bytes again, and compare those bytes with the copy stored in the variable.
##########
core/common/src/types/consensus/mod.rs:
##########
@@ -19,7 +19,572 @@ use std::marker::PhantomData;
use bytes::Bytes;
-use crate::types::consensus::header::ConsensusHeader;
+use crate::types::consensus::header::{
+ CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+};
+
+#[derive(Debug)]
+pub struct Message<H: ConsensusHeader> {
+ buffer: Bytes,
+ _marker: PhantomData<H>,
+}
+
+impl<H> Message<H>
+where
+ H: ConsensusHeader,
+{
+ /// Create a new message from a buffer.
+ ///
+ /// # Safety
+ ///
+ /// The buffer must:
+ /// - be at least `size_of::<H>()` bytes long
+ /// - contain a valid header at the start
+ /// - be properly aligned for type H
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if:
+ /// - buffer is too small for the header
+ /// - buffer is too small for the size specified in the header
+ /// - header validation fails
+ #[allow(unused)]
+ pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
+ // verify minimum size
+ if buffer.len() < size_of::<H>() {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: H::COMMAND,
+ found: header::Command::Reserved,
+ });
+ }
+
+ let message = Self {
+ buffer,
+ _marker: PhantomData,
+ };
+
+ // validate the header
+ message.header().validate()?;
+
+ // verify buffer size matches header.size
+ let header_size = message.header().size() as usize;
+ if message.buffer.len() < header_size {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: H::COMMAND,
+ found: header::Command::Reserved,
+ });
+ }
+
+ Ok(message)
+ }
+
+ /// Create a new message with a specific size, initializing the buffer
with zeros.
+ ///
+ /// The header will be zeroed and must be initialized by the caller.
+ #[allow(unused)]
+ pub fn new(size: usize) -> Self {
+ assert!(size >= size_of::<H>(), "Size must be at least header size");
+ let buffer = Bytes::from(vec![0u8; size]);
+ Self {
+ buffer,
+ _marker: PhantomData,
+ }
+ }
+
+ /// Get a reference to the header using zero-copy access.
+ ///
+ /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
+ /// without any copying or allocation.
+ #[inline]
+ #[allow(unused)]
+ pub fn header(&self) -> &H {
+ let header_bytes = &self.buffer[..size_of::<H>()];
+ bytemuck::from_bytes(header_bytes)
+ }
+
+ /// Get a reference to the message body (everything after the header).
+ ///
+ /// Returns an empty slice if there is no body.
+ #[inline]
+ #[allow(unused)]
+ pub fn body(&self) -> &[u8] {
+ let header_size = size_of::<H>();
+ let total_size = self.header().size() as usize;
+
+ if total_size > header_size {
+ &self.buffer[header_size..total_size]
+ } else {
+ &[]
+ }
+ }
+
+ /// Get the complete message as bytes (header + body).
+ #[inline]
+ #[allow(unused)]
+ pub fn as_bytes(&self) -> &[u8] {
+ let total_size = self.header().size() as usize;
+ &self.buffer[..total_size]
+ }
+
+ /// Get the underlying buffer.
+ #[inline]
+ #[allow(unused)]
+ pub fn buffer(&self) -> &Bytes {
+ &self.buffer
+ }
+
+ /// Convert into the underlying buffer.
+ #[inline]
+ #[allow(unused)]
+ pub fn into_buffer(self) -> Bytes {
+ self.buffer
+ }
+
+ /// Create a message from a buffer without validation.
+ ///
+ /// # Safety
+ ///
+ /// This is private and skips validation. Use only when:
+ /// - The buffer is already validated
+ /// - If doing a zero-cost type conversion (like to GenericHeader)
+ #[inline]
+ #[allow(unused)]
+ fn from_buffer_unchecked(buffer: Bytes) -> Self {
+ Self {
+ buffer,
+ _marker: PhantomData,
+ }
+ }
+
+ /// Convert to a generic message (erasing the specific header type).
+ ///
+ /// This allows treating any message as a generic message for common
operations.
+ #[allow(unused)]
+ pub fn into_generic(self) -> Message<header::GenericHeader> {
+ Message::from_buffer_unchecked(self.buffer)
+ }
+
+ /// Get a reference to this message as a generic message.
+ #[allow(unused)]
+ pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+ // SAFETY: Message<H> and Message<GenericHeader> have the same memory
layout
+ // because they only differ in the PhantomData type parameter
+ unsafe { &*(self as *const Self as *const
Message<header::GenericHeader>) }
+ }
+
+ /// Try to convert this message to a different header type.
+ ///
+ /// This validates that the command in the header matches the target type's
+ /// expected command before performing the conversion.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if:
+ /// - The command doesn't match the target type
+ /// - The target header validation fails
+ #[allow(unused)]
+ pub fn try_into_typed<T>(self) -> Result<Message<T>,
header::ConsensusError>
+ where
+ T: ConsensusHeader,
+ {
+ if self.buffer.len() < size_of::<T>() {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: header::Command::Reserved,
+ });
+ }
+
+ let generic = self.as_generic();
+ if generic.header().command != T::COMMAND {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: generic.header().command,
+ });
+ }
+
+ let new_message = Message::<T>::from_buffer_unchecked(self.buffer);
+
+ new_message.header().validate()?;
+
+ Ok(new_message)
+ }
+
+ /// Try to get a reference to this message as a different header type.
+ ///
+ /// This is similar to `try_into_typed` but borrows instead of consuming.
+ #[allow(unused)]
+ pub fn try_as_typed<T>(&self) -> Result<&Message<T>,
header::ConsensusError>
+ where
+ T: ConsensusHeader,
+ {
+ // check buffer size
+ if self.buffer.len() < size_of::<T>() {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: header::Command::Reserved,
+ });
+ }
+
+ // check the command matches
+ let generic = self.as_generic();
+ if generic.header().command != T::COMMAND {
+ return Err(header::ConsensusError::InvalidCommand {
+ expected: T::COMMAND,
+ found: generic.header().command,
+ });
+ }
+
+ let typed_message = unsafe { &*(self as *const Self as *const
Message<T>) };
+
+ // validate the header
+ typed_message.header().validate()?;
+
+ Ok(typed_message)
+ }
+}
+
+#[derive(Debug)]
+pub enum MessageBag {
+ Generic(Message<GenericHeader>),
+ Prepare(Message<PrepareHeader>),
+ Commit(Message<CommitHeader>),
+ Reply(Message<ReplyHeader>),
+}
+
+impl MessageBag {
+ #[allow(unused)]
+ pub fn command(&self) -> header::Command {
+ match self {
+ MessageBag::Generic(message) => message.header().command,
+ MessageBag::Prepare(message) => message.header().command,
+ MessageBag::Commit(message) => message.header().command,
+ MessageBag::Reply(message) => message.header().command,
+ }
+ }
+
+ #[allow(unused)]
+ pub fn size(&self) -> u32 {
+ match self {
+ MessageBag::Generic(message) => message.header().size(),
+ MessageBag::Prepare(message) => message.header().size(),
+ MessageBag::Commit(message) => message.header().size(),
+ MessageBag::Reply(message) => message.header().size(),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn as_prepare(&self) -> Option<&Message<header::PrepareHeader>> {
+ match self {
+ MessageBag::Prepare(m) => Some(m),
+ _ => None,
+ }
+ }
+
+ #[allow(unused)]
+ pub fn as_commit(&self) -> Option<&Message<header::CommitHeader>> {
+ match self {
+ MessageBag::Commit(m) => Some(m),
+ _ => None,
+ }
+ }
+
+ #[allow(unused)]
+ pub fn as_reply(&self) -> Option<&Message<header::ReplyHeader>> {
+ match self {
+ MessageBag::Reply(m) => Some(m),
+ _ => None,
+ }
+ }
+
+ #[allow(unused)]
+ pub fn into_prepare(self) -> Result<Message<header::PrepareHeader>, Self> {
+ match self {
+ MessageBag::Prepare(m) => Ok(m),
+ other => Err(other),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn into_commit(self) -> Result<Message<header::CommitHeader>, Self> {
+ match self {
+ MessageBag::Commit(m) => Ok(m),
+ other => Err(other),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn into_reply(self) -> Result<Message<header::ReplyHeader>, Self> {
+ match self {
+ MessageBag::Reply(m) => Ok(m),
+ other => Err(other),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn into_generic(self) -> Message<header::GenericHeader> {
+ match self {
+ MessageBag::Generic(m) => m,
+ MessageBag::Prepare(m) => m.into_generic(),
+ MessageBag::Commit(m) => m.into_generic(),
+ MessageBag::Reply(m) => m.into_generic(),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn as_generic(&self) -> &Message<header::GenericHeader> {
+ match self {
+ MessageBag::Generic(m) => m,
+ MessageBag::Prepare(m) => m.as_generic(),
+ MessageBag::Commit(m) => m.as_generic(),
+ MessageBag::Reply(m) => m.as_generic(),
+ }
+ }
+}
+
+impl From<Message<header::PrepareHeader>> for MessageBag {
+ fn from(value: Message<header::PrepareHeader>) -> Self {
+ MessageBag::Prepare(value)
+ }
+}
+
+impl From<Message<header::CommitHeader>> for MessageBag {
+ fn from(value: Message<header::CommitHeader>) -> Self {
+ MessageBag::Commit(value)
+ }
+}
+
+impl From<Message<header::ReplyHeader>> for MessageBag {
+ fn from(value: Message<header::ReplyHeader>) -> Self {
+ MessageBag::Reply(value)
+ }
+}
+
+impl From<Message<header::GenericHeader>> for MessageBag {
+ fn from(value: Message<header::GenericHeader>) -> Self {
+ MessageBag::Generic(value)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use bytes::BytesMut;
+
+ use super::*;
+
+ #[test]
+ fn test_message_creation_and_access() {
+ let mut buffer = vec![0u8; 256];
+
+ let header = bytemuck::from_bytes_mut::<header::GenericHeader>(
+ &mut buffer[..size_of::<header::GenericHeader>()],
+ );
+ header.size = 256;
+ header.command = header::Command::Reserved;
+ header.cluster = 123;
+
+ let message =
Message::<header::GenericHeader>::from_bytes(Bytes::from(buffer)).unwrap();
+
+ assert_eq!(message.header().size, 256);
+ assert_eq!(message.header().cluster, 123);
+ assert_eq!(message.header().command, header::Command::Reserved);
+ assert_eq!(
+ message.body().len(),
+ 256 - size_of::<header::GenericHeader>()
+ );
+ }
+
+ #[test]
+ fn test_message_conversion() {
+ let mut buffer = vec![0u8; 256];
+
+ let header = bytemuck::from_bytes_mut::<header::PrepareHeader>(
+ &mut buffer[..size_of::<header::PrepareHeader>()],
+ );
+ header.size = 256;
+ header.command = header::Command::Prepare;
+ header.cluster = 456;
+ header.op = 100;
+ header.view = 1;
+ header.operation = header::Operation::CreateStream;
+
+ let prepare_message =
+
Message::<header::PrepareHeader>::from_bytes(Bytes::from(buffer)).unwrap();
+
+ let generic_message = prepare_message.into_generic();
+ assert_eq!(generic_message.header().command, header::Command::Prepare);
+
+ let prepare_again: Message<header::PrepareHeader> =
+ generic_message.try_into_typed().unwrap();
+ assert_eq!(prepare_again.header().op, 100);
+ assert_eq!(prepare_again.header().view, 1);
+ }
+
+ #[test]
+ fn test_message_body() {
+ let mut buffer = vec![0u8; 256];
+
+ let header = bytemuck::from_bytes_mut::<header::GenericHeader>(
+ &mut buffer[..size_of::<header::GenericHeader>()],
+ );
+ header.size = 256;
+ header.command = header::Command::Reserved;
+
+ for (i, item) in buffer
+ .iter_mut()
+ .enumerate()
+ .take(256)
+ .skip(size_of::<header::GenericHeader>())
+ {
+ *item = (i % 256) as u8;
+ }
+
+ let message =
Message::<header::GenericHeader>::from_bytes(Bytes::from(buffer)).unwrap();
+
+ let body = message.body();
+ assert_eq!(body.len(), 256 - size_of::<header::GenericHeader>());
+
+ for (i, &byte) in body.iter().enumerate() {
+ let expected = ((i + size_of::<header::GenericHeader>()) % 256) as
u8;
+ assert_eq!(byte, expected);
+ }
+ }
+
+ fn create_test_prepare(op: u64, cluster: u128, view: u32) ->
Message<header::PrepareHeader> {
+ let header_size = size_of::<header::PrepareHeader>();
+ let body_size = 64;
+ let total_size = header_size + body_size;
+
+ let mut buffer = BytesMut::zeroed(total_size);
+
+ let header = bytemuck::from_bytes_mut::<header::PrepareHeader>(&mut
buffer[..header_size]);
+
+ header.checksum = 123456;
+ header.checksum_body = 789012;
+ header.cluster = cluster;
+ header.size = total_size as u32;
+ header.view = view;
+ header.command = header::Command::Prepare;
+ header.replica = 1;
+ header.op = op;
+ header.commit = op.saturating_sub(1);
+ header.timestamp = 1234567890;
+ header.operation = header::Operation::CreateStream;
+
+ Message::<header::PrepareHeader>::from_bytes(buffer.freeze()).unwrap()
+ }
+
+ fn create_test_commit(
+ commit_num: u64,
+ cluster: u128,
+ view: u32,
+ ) -> Message<header::CommitHeader> {
+ let header_size = size_of::<header::CommitHeader>();
+ let total_size = 256; // CommitHeader must be exactly 256 bytes
+
+ let mut buffer = BytesMut::zeroed(total_size);
+
+ let header = bytemuck::from_bytes_mut::<header::CommitHeader>(&mut
buffer[..header_size]);
+
+ header.checksum = 123456;
+ header.cluster = cluster;
+ header.size = 256;
+ header.view = view;
+ header.command = header::Command::Commit;
+ header.replica = 2;
+ header.commit = commit_num;
+
+ Message::<header::CommitHeader>::from_bytes(buffer.freeze()).unwrap()
+ }
+
+ fn create_test_reply(op: u64, cluster: u128, view: u32) ->
Message<header::ReplyHeader> {
+ let header_size = size_of::<header::ReplyHeader>();
+ let body_size = 32;
+ let total_size = header_size + body_size;
+
+ let mut buffer = BytesMut::zeroed(total_size);
+
+ let header = bytemuck::from_bytes_mut::<header::ReplyHeader>(&mut
buffer[..header_size]);
+
+ header.checksum = 123456;
+ header.cluster = cluster;
+ header.size = total_size as u32;
+ header.view = view;
+ header.command = header::Command::Reply;
+ header.replica = 3;
+ header.op = op;
+ header.commit = op.saturating_sub(1);
+ header.operation = header::Operation::CreateStream;
+
+ Message::<header::ReplyHeader>::from_bytes(buffer.freeze()).unwrap()
+ }
+
+ #[test]
+ fn test_message_bag_from_prepare() {
+ let prepare = create_test_prepare(100, 12345, 1);
+ let bag = MessageBag::from(prepare);
+
+ assert_eq!(bag.command(), header::Command::Prepare);
+ assert!(matches!(bag, MessageBag::Prepare(_)));
+ assert!(!matches!(bag, MessageBag::Commit(_)));
+ assert!(!matches!(bag, MessageBag::Reply(_)));
+ assert!(!matches!(bag, MessageBag::Generic(_)));
+ }
+
+ #[test]
+ fn test_message_bag_from_commit() {
+ let commit = create_test_commit(50, 12345, 1);
+ let bag = MessageBag::from(commit);
+
+ assert_eq!(bag.command(), header::Command::Commit);
+ assert!(!matches!(bag, MessageBag::Prepare(_)));
+ assert!(matches!(bag, MessageBag::Commit(_)));
+ assert!(!matches!(bag, MessageBag::Reply(_)));
+ assert!(!matches!(bag, MessageBag::Generic(_)));
+ }
+
+ #[test]
+ fn test_message_bag_from_reply() {
+ let reply = create_test_reply(100, 12345, 1);
+ let bag = MessageBag::from(reply);
+
+ assert_eq!(bag.command(), header::Command::Reply);
+ assert!(!matches!(bag, MessageBag::Prepare(_)));
+ assert!(!matches!(bag, MessageBag::Commit(_)));
+ assert!(matches!(bag, MessageBag::Reply(_)));
+ assert!(!matches!(bag, MessageBag::Generic(_)));
+ }
+
+ #[test]
+ fn test_message_bag_as_generic() {
+ let prepare = create_test_prepare(100, 12345, 1);
+ let bag = MessageBag::from(prepare);
+
+ let generic = bag.as_generic();
+ assert_eq!(generic.header().command, header::Command::Prepare);
+ assert_eq!(generic.header().cluster, 12345);
+ }
+
+ #[test]
+ fn test_message_bag_into_prepare() {
+ let prepare = create_test_prepare(100, 12345, 1);
+ let bag = MessageBag::from(prepare);
+
+ let prepare_back = bag.into_prepare().unwrap();
+ assert_eq!(prepare_back.header().op, 100);
+ }
+
+ #[test]
+ fn test_message_bag_into_wrong_type() {
+ let prepare = create_test_prepare(100, 12345, 1);
+ let bag = MessageBag::from(prepare);
+
+ let result = bag.into_commit();
+ assert!(result.is_err(), "Should fail to unwrap as Commit");
+
+ let bag_again = result.unwrap_err();
+ assert!(matches!(bag_again, MessageBag::Prepare(_)));
Review Comment:
I think we don't need those tests, as `MessageBag` is just a temporary
placeholder.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]