This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 971753124 feat(metadata): use new consensus messages in metadata
module (#2414)
971753124 is described below
commit 9717531248f983a72c88356c1de449c93468d89a
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Thu Nov 27 10:42:27 2025 +0100
feat(metadata): use new consensus messages in metadata module (#2414)
---
Cargo.lock | 2 +
core/common/src/lib.rs | 1 +
core/common/src/types/consensus/header.rs | 154 ++++-
.../src/types/consensus/{mod.rs => message.rs} | 94 +--
core/common/src/types/consensus/mod.rs | 655 +--------------------
core/consensus/Cargo.toml | 1 +
core/consensus/src/impls.rs | 64 +-
core/metadata/Cargo.toml | 1 +
core/metadata/src/impls/metadata.rs | 13 +-
9 files changed, 242 insertions(+), 743 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 970bb914c..02e9ff060 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1921,6 +1921,7 @@ dependencies = [
name = "consensus"
version = "0.1.0"
dependencies = [
+ "iggy_common",
"message_bus",
]
@@ -5767,6 +5768,7 @@ name = "metadata"
version = "0.1.0"
dependencies = [
"consensus",
+ "iggy_common",
"journal",
"tracing",
]
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index ababdfbb1..032ea4bb9 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -70,6 +70,7 @@ pub use
types::configuration::websocket_config::websocket_client_config::*;
pub use
types::configuration::websocket_config::websocket_client_config_builder::*;
pub use
types::configuration::websocket_config::websocket_client_reconnection_config::*;
pub use
types::configuration::websocket_config::websocket_connection_string_options::*;
+pub use types::consensus::*;
pub use types::consumer::consumer_group::*;
pub use types::consumer::consumer_kind::*;
pub use types::consumer::consumer_offset_info::*;
diff --git a/core/common/src/types/consensus/header.rs
b/core/common/src/types/consensus/header.rs
index 926c1cf11..f069bbc4f 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -18,20 +18,19 @@
use bytemuck::{Pod, Zeroable};
use thiserror::Error;
-#[expect(unused)]
pub struct Header {}
pub trait ConsensusHeader: Sized + Pod + Zeroable {
- const COMMAND: Command;
+ const COMMAND: Command2;
fn validate(&self) -> Result<(), ConsensusError>;
fn size(&self) -> u32;
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
-#[expect(unused)]
-pub enum Command {
+pub enum Command2 {
+ #[default]
Reserved = 0,
Ping = 1,
@@ -49,10 +48,9 @@ pub enum Command {
}
#[derive(Debug, Clone, Error, PartialEq, Eq)]
-#[expect(unused)]
pub enum ConsensusError {
#[error("invalid command: expected {expected:?}, found {found:?}")]
- InvalidCommand { expected: Command, found: Command },
+ InvalidCommand { expected: Command2, found: Command2 },
#[error("invalid checksum")]
InvalidChecksum,
@@ -67,14 +65,14 @@ pub enum ConsensusError {
PrepareRequestChecksumPaddingNonZero,
#[error("command must be Commit")]
- CommitInvalidCommand,
+ CommitInvalidCommand2,
#[error("size must be 256, found {0}")]
CommitInvalidSize(u32),
// ReplyHeader specific
#[error("command must be Reply")]
- ReplyInvalidCommand,
+ ReplyInvalidCommand2,
#[error("request_checksum_padding must be 0")]
ReplyRequestChecksumPaddingNonZero,
@@ -83,10 +81,11 @@ pub enum ConsensusError {
ReplyContextPaddingNonZero,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
-#[expect(unused)]
pub enum Operation {
+ #[default]
+ Default = 0,
CreateStream = 128,
UpdateStream = 129,
DeleteStream = 130,
@@ -120,7 +119,7 @@ pub struct GenericHeader {
pub view: u32,
pub release: u32,
pub protocol: u16,
- pub command: Command,
+ pub command: Command2,
pub replica: u8,
pub reserved_frame: [u8; 12],
@@ -131,7 +130,7 @@ unsafe impl Pod for GenericHeader {}
unsafe impl Zeroable for GenericHeader {}
impl ConsensusHeader for GenericHeader {
- const COMMAND: Command = Command::Reserved;
+ const COMMAND: Command2 = Command2::Reserved;
fn validate(&self) -> Result<(), ConsensusError> {
Ok(())
@@ -144,6 +143,50 @@ impl ConsensusHeader for GenericHeader {
#[repr(C)]
#[derive(Debug, Clone, Copy)]
+pub struct RequestHeader {
+ pub checksum: u128,
+ pub checksum_body: u128,
+ pub cluster: u128,
+ pub size: u32,
+ pub epoch: u32,
+ pub view: u32,
+ pub release: u32,
+ pub protocol: u16,
+ pub command: Command2,
+ pub replica: u8,
+ pub reserved_frame: [u8; 12],
+
+ pub request_checksum: u128,
+ pub timestamp: u64,
+ pub request: u64,
+ pub operation: Operation,
+ pub reserved: [u8; 95],
+}
+
+unsafe impl Pod for RequestHeader {}
+unsafe impl Zeroable for RequestHeader {}
+
+impl ConsensusHeader for RequestHeader {
+ const COMMAND: Command2 = Command2::Request;
+
+ fn validate(&self) -> Result<(), ConsensusError> {
+ if self.command != Command2::Request {
+ return Err(ConsensusError::InvalidCommand {
+ expected: Command2::Request,
+ found: self.command,
+ });
+ }
+ Ok(())
+ }
+
+ fn size(&self) -> u32 {
+ self.size
+ }
+}
+
+// TODO: Manually impl default (and use a const for the `release`)
+#[repr(C)]
+#[derive(Default, Debug, Clone, Copy)]
pub struct PrepareHeader {
pub checksum: u128,
pub checksum_body: u128,
@@ -153,7 +196,7 @@ pub struct PrepareHeader {
pub view: u32,
pub release: u32,
pub protocol: u16,
- pub command: Command,
+ pub command: Command2,
pub replica: u8,
pub reserved_frame: [u8; 12],
@@ -161,25 +204,24 @@ pub struct PrepareHeader {
pub parent_padding: u128,
pub request_checksum: u128,
pub request_checksum_padding: u128,
- pub checkpoint_id: u128,
pub op: u64,
pub commit: u64,
pub timestamp: u64,
pub request: u64,
pub operation: Operation,
- pub reserved: [u8; 3],
+ pub reserved: [u8; 19],
}
unsafe impl Pod for PrepareHeader {}
unsafe impl Zeroable for PrepareHeader {}
impl ConsensusHeader for PrepareHeader {
- const COMMAND: Command = Command::Prepare;
+ const COMMAND: Command2 = Command2::Prepare;
fn validate(&self) -> Result<(), ConsensusError> {
- if self.command != Command::Prepare {
+ if self.command != Command2::Prepare {
return Err(ConsensusError::InvalidCommand {
- expected: Command::Prepare,
+ expected: Command2::Prepare,
found: self.command,
});
}
@@ -197,6 +239,61 @@ impl ConsensusHeader for PrepareHeader {
}
}
+// TODO: Manually impl default (and use a const for the `release`)
+#[repr(C)]
+#[derive(Default, Debug, Clone, Copy)]
+pub struct PrepareOkHeader {
+ pub checksum: u128,
+ pub checksum_body: u128,
+ pub cluster: u128,
+ pub size: u32,
+ pub epoch: u32,
+ pub view: u32,
+ pub release: u32,
+ pub protocol: u16,
+ pub command: Command2,
+ pub replica: u8,
+ pub reserved_frame: [u8; 12],
+
+ pub parent: u128,
+ pub parent_padding: u128,
+ pub prepare_checksum: u128,
+ pub prepare_checksum_padding: u128,
+ pub op: u64,
+ pub commit: u64,
+ pub timestamp: u64,
+ pub request: u64,
+ pub operation: Operation,
+ pub reserved: [u8; 19],
+}
+
+unsafe impl Pod for PrepareOkHeader {}
+unsafe impl Zeroable for PrepareOkHeader {}
+
+impl ConsensusHeader for PrepareOkHeader {
+ const COMMAND: Command2 = Command2::PrepareOk;
+
+ fn validate(&self) -> Result<(), ConsensusError> {
+ if self.command != Command2::PrepareOk {
+ return Err(ConsensusError::InvalidCommand {
+ expected: Command2::PrepareOk,
+ found: self.command,
+ });
+ }
+ if self.parent_padding != 0 {
+ return Err(ConsensusError::PrepareParentPaddingNonZero);
+ }
+ if self.prepare_checksum_padding != 0 {
+ return Err(ConsensusError::PrepareRequestChecksumPaddingNonZero);
+ }
+ Ok(())
+ }
+
+ fn size(&self) -> u32 {
+ self.size
+ }
+}
+
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct CommitHeader {
@@ -208,27 +305,26 @@ pub struct CommitHeader {
pub view: u32,
pub release: u32,
pub protocol: u16,
- pub command: Command,
+ pub command: Command2,
pub replica: u8,
pub reserved_frame: [u8; 12],
pub commit_checksum: u128,
pub timestamp_monotonic: u64,
- pub checkpoint_id: u128,
pub commit: u64,
pub checkpoint_op: u64,
- pub reserved: [u8; 80],
+ pub reserved: [u8; 96],
}
unsafe impl Pod for CommitHeader {}
unsafe impl Zeroable for CommitHeader {}
impl ConsensusHeader for CommitHeader {
- const COMMAND: Command = Command::Commit;
+ const COMMAND: Command2 = Command2::Commit;
fn validate(&self) -> Result<(), ConsensusError> {
- if self.command != Command::Commit {
- return Err(ConsensusError::CommitInvalidCommand);
+ if self.command != Command2::Commit {
+ return Err(ConsensusError::CommitInvalidCommand2);
}
if self.size != 256 {
return Err(ConsensusError::CommitInvalidSize(self.size));
@@ -252,7 +348,7 @@ pub struct ReplyHeader {
pub view: u32,
pub release: u32,
pub protocol: u16,
- pub command: Command,
+ pub command: Command2,
pub replica: u8,
pub reserved_frame: [u8; 12],
@@ -272,11 +368,11 @@ unsafe impl Pod for ReplyHeader {}
unsafe impl Zeroable for ReplyHeader {}
impl ConsensusHeader for ReplyHeader {
- const COMMAND: Command = Command::Reply;
+ const COMMAND: Command2 = Command2::Reply;
fn validate(&self) -> Result<(), ConsensusError> {
- if self.command != Command::Reply {
- return Err(ConsensusError::ReplyInvalidCommand);
+ if self.command != Command2::Reply {
+ return Err(ConsensusError::ReplyInvalidCommand2);
}
if self.request_checksum_padding != 0 {
return Err(ConsensusError::ReplyRequestChecksumPaddingNonZero);
diff --git a/core/common/src/types/consensus/mod.rs
b/core/common/src/types/consensus/message.rs
similarity index 88%
copy from core/common/src/types/consensus/mod.rs
copy to core/common/src/types/consensus/message.rs
index 9d98bf0e8..f6c638b0c 100644
--- a/core/common/src/types/consensus/mod.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -15,15 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-use std::marker::PhantomData;
-
-use bytes::Bytes;
-
use crate::types::consensus::header::{
- CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+ self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader,
ReplyHeader,
};
+use bytes::Bytes;
+use std::marker::PhantomData;
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct Message<H: ConsensusHeader> {
buffer: Bytes,
_marker: PhantomData<H>,
@@ -54,7 +52,7 @@ where
if buffer.len() < size_of::<H>() {
return Err(header::ConsensusError::InvalidCommand {
expected: H::COMMAND,
- found: header::Command::Reserved,
+ found: header::Command2::Reserved,
});
}
@@ -71,7 +69,7 @@ where
if message.buffer.len() < header_size {
return Err(header::ConsensusError::InvalidCommand {
expected: H::COMMAND,
- found: header::Command::Reserved,
+ found: header::Command2::Reserved,
});
}
@@ -91,6 +89,30 @@ where
}
}
+ /// Replace the header with a new one, using the provided function to
generate it.
+ pub fn replace_header<T: ConsensusHeader>(self, f: impl FnOnce(&H) -> T)
-> Message<T> {
+ assert_eq!(size_of::<H>(), size_of::<T>());
+ let prev = self.header();
+ let header = f(prev);
+
+ let header_bytes = bytemuck::bytes_of(&header);
+ let buffer = self.into_inner();
+ // Safety: We ensured that size_of::<H>() == size_of::<T>()
+ // On top of that, there is going to be only one reference to buffer
during this function call
+ // so no other references can observe the mutation.
+ // In the future we can replace the `Bytes` buffer with something that
does not allow sharing between different threads.
+ unsafe {
+ let ptr = buffer.as_ptr() as *mut u8;
+ let slice = std::slice::from_raw_parts_mut(ptr, buffer.len());
+ slice[..size_of::<H>()].copy_from_slice(header_bytes);
+ }
+ // TODO: Recalculate checksums
+ Message {
+ buffer,
+ _marker: PhantomData,
+ }
+ }
+
/// Get a reference to the header using zero-copy access.
///
/// This uses `bytemuck::from_bytes` to cast the buffer to the header type
@@ -183,7 +205,7 @@ where
if self.buffer.len() < size_of::<T>() {
return Err(header::ConsensusError::InvalidCommand {
expected: T::COMMAND,
- found: header::Command::Reserved,
+ found: header::Command2::Reserved,
});
}
@@ -214,7 +236,7 @@ where
if self.buffer.len() < size_of::<T>() {
return Err(header::ConsensusError::InvalidCommand {
expected: T::COMMAND,
- found: header::Command::Reserved,
+ found: header::Command2::Reserved,
});
}
@@ -247,7 +269,7 @@ pub enum MessageBag {
impl MessageBag {
#[allow(unused)]
- pub fn command(&self) -> header::Command {
+ pub fn command(&self) -> header::Command2 {
match self {
MessageBag::Generic(message) => message.header().command,
MessageBag::Prepare(message) => message.header().command,
@@ -279,21 +301,21 @@ where
// SAFETY: All Message<H> types have identical memory layout (only
PhantomData differs).
// We've validated the command when the original message was created.
match command {
- header::Command::Prepare => {
+ header::Command2::Prepare => {
let msg =
unsafe {
Message::<header::PrepareHeader>::from_buffer_unchecked(buffer) };
MessageBag::Prepare(msg)
}
- header::Command::Commit => {
+ header::Command2::Commit => {
let msg = unsafe {
Message::<header::CommitHeader>::from_buffer_unchecked(buffer) };
MessageBag::Commit(msg)
}
- header::Command::Reply => {
+ header::Command2::Reply => {
let msg = unsafe {
Message::<header::ReplyHeader>::from_buffer_unchecked(buffer) };
MessageBag::Reply(msg)
}
_ => unreachable!(
- "For now we only support Prepare, Commit, and Reply. In the
future we will support more commands. Command: {command:?}"
+ "For now we only support Prepare, Commit, and Reply. In the
future we will support more commands. Command2: {command:?}"
),
}
}
@@ -322,7 +344,7 @@ mod tests {
header.checksum = 123456;
header.cluster = 12345;
header.size = total_size as u32;
- header.command = header::Command::Reserved;
+ header.command = header::Command2::Reserved;
for (i, item) in buffer
.iter_mut()
@@ -352,7 +374,7 @@ mod tests {
header.cluster = 12345;
header.size = total_size as u32;
header.view = 1;
- header.command = header::Command::Prepare;
+ header.command = header::Command2::Prepare;
header.replica = 1;
header.op = 100;
header.commit = 99;
@@ -376,7 +398,7 @@ mod tests {
header.cluster = 12345;
header.size = 256;
header.view = 1;
- header.command = header::Command::Commit;
+ header.command = header::Command2::Commit;
header.replica = 2;
header.commit = 50;
@@ -398,7 +420,7 @@ mod tests {
header.cluster = 12345;
header.size = total_size as u32;
header.view = 1;
- header.command = header::Command::Reply;
+ header.command = header::Command2::Reply;
header.replica = 3;
header.op = 100;
header.commit = 99;
@@ -413,7 +435,7 @@ mod tests {
let message = header::GenericHeader::create_test();
assert_eq!(message.header().cluster, 12345);
- assert_eq!(message.header().command, header::Command::Reserved);
+ assert_eq!(message.header().command, header::Command2::Reserved);
assert_eq!(
message.body().len(),
message.header().size() as usize -
size_of::<header::GenericHeader>()
@@ -434,7 +456,7 @@ mod tests {
let original_bytes = prepare_message.as_bytes().to_vec();
let generic_message = prepare_message.into_generic();
- assert_eq!(generic_message.header().command, header::Command::Prepare);
+ assert_eq!(generic_message.header().command,
header::Command2::Prepare);
let prepare_again: Message<header::PrepareHeader> =
generic_message.try_into_typed().unwrap();
@@ -456,7 +478,7 @@ mod tests {
let prepare = header::PrepareHeader::create_test();
let bag = MessageBag::from(prepare);
- assert_eq!(bag.command(), header::Command::Prepare);
+ assert_eq!(bag.command(), header::Command2::Prepare);
assert!(matches!(bag, MessageBag::Prepare(_)));
assert!(!matches!(bag, MessageBag::Commit(_)));
assert!(!matches!(bag, MessageBag::Reply(_)));
@@ -468,7 +490,7 @@ mod tests {
let commit = header::CommitHeader::create_test();
let bag = MessageBag::from(commit);
- assert_eq!(bag.command(), header::Command::Commit);
+ assert_eq!(bag.command(), header::Command2::Commit);
assert!(!matches!(bag, MessageBag::Prepare(_)));
assert!(matches!(bag, MessageBag::Commit(_)));
assert!(!matches!(bag, MessageBag::Reply(_)));
@@ -480,7 +502,7 @@ mod tests {
let reply = header::ReplyHeader::create_test();
let bag = MessageBag::from(reply);
- assert_eq!(bag.command(), header::Command::Reply);
+ assert_eq!(bag.command(), header::Command2::Reply);
assert!(!matches!(bag, MessageBag::Prepare(_)));
assert!(!matches!(bag, MessageBag::Commit(_)));
assert!(matches!(bag, MessageBag::Reply(_)));
@@ -500,17 +522,17 @@ pub struct Message<H: Header = GenericHeader> {
// Trait that all headers must implement
pub trait Header: Sized {
- const COMMAND: Command;
+ const COMMAND: Command2;
fn size(&self) -> u32;
- fn command(&self) -> Command;
+ fn command(&self) -> Command2;
fn checksum(&self) -> u128;
}
-// Command enum (simplified)
+// Command2 enum (simplified)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
-pub enum Command {
+pub enum Command2 {
reserved = 0,
ping = 1,
@@ -532,7 +554,7 @@ pub enum Command {
#[repr(C)]
pub struct GenericHeader {
checksum: u128,
- command: Command,
+ command: Command2,
size: u32,
// ... other fields
}
@@ -541,10 +563,10 @@ pub struct GenericHeader {
// second 128 bytes specific header (PrepareHeader, CommitHeader, etc....)
impl Header for GenericHeader {
- const COMMAND: Command = Command::Reserved;
+ const COMMAND: Command2 = Command::Reserved;
fn size(&self) -> u32 { self.size }
- fn command(&self) -> Command { self.command }
+ fn command(&self) -> Command2 { self.command }
fn checksum(&self) -> u128 { self.checksum }
}
@@ -552,7 +574,7 @@ impl Header for GenericHeader {
#[repr(C)]
pub struct PrepareHeader {
checksum: u128,
- command: Command,
+ command: Command2,
size: u32,
// ... prepare-specific fields
view: u32,
@@ -561,10 +583,10 @@ pub struct PrepareHeader {
}
impl Header for PrepareHeader {
- const COMMAND: Command = Command::Prepare;
+ const COMMAND: Command2 = Command::Prepare;
fn size(&self) -> u32 { self.size }
- fn command(&self) -> Command { self.command }
+ fn command(&self) -> Command2 { self.command }
fn checksum(&self) -> u128 { self.checksum }
}
*/
@@ -638,7 +660,7 @@ pub enum Operation {
#[repr(C)]
pub struct PrepareHeader {
checksum: u128,
- command: Command,
+ command: Command2,
size: u32,
// ... prepare-specific fields
view: u32,
@@ -666,5 +688,3 @@ pub struct PrepareHeader {
// MessageBag::Void
// }
// }
-
-pub(crate) mod header;
diff --git a/core/common/src/types/consensus/mod.rs
b/core/common/src/types/consensus/mod.rs
index 9d98bf0e8..61bfb7bcb 100644
--- a/core/common/src/types/consensus/mod.rs
+++ b/core/common/src/types/consensus/mod.rs
@@ -15,656 +15,5 @@
// specific language governing permissions and limitations
// under the License.
-use std::marker::PhantomData;
-
-use bytes::Bytes;
-
-use crate::types::consensus::header::{
- CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
-};
-
-#[derive(Debug)]
-pub struct Message<H: ConsensusHeader> {
- buffer: Bytes,
- _marker: PhantomData<H>,
-}
-
-impl<H> Message<H>
-where
- H: ConsensusHeader,
-{
- /// Create a new message from a buffer.
- ///
- /// # Safety
- ///
- /// The buffer must:
- /// - be at least `size_of::<H>()` bytes long
- /// - contain a valid header at the start
- /// - be properly aligned for type H
- ///
- /// # Errors
- ///
- /// Returns an error if:
- /// - buffer is too small for the header
- /// - buffer is too small for the size specified in the header
- /// - header validation fails
- #[allow(unused)]
- pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
- // verify minimum size
- if buffer.len() < size_of::<H>() {
- return Err(header::ConsensusError::InvalidCommand {
- expected: H::COMMAND,
- found: header::Command::Reserved,
- });
- }
-
- let message = Self {
- buffer,
- _marker: PhantomData,
- };
-
- // validate the header
- message.header().validate()?;
-
- // verify buffer size matches header.size
- let header_size = message.header().size() as usize;
- if message.buffer.len() < header_size {
- return Err(header::ConsensusError::InvalidCommand {
- expected: H::COMMAND,
- found: header::Command::Reserved,
- });
- }
-
- Ok(message)
- }
-
- /// Create a new message with a specific size, initializing the buffer
with zeros.
- ///
- /// The header will be zeroed and must be initialized by the caller.
- #[allow(unused)]
- pub fn new(size: usize) -> Self {
- assert!(size >= size_of::<H>(), "Size must be at least header size");
- let buffer = Bytes::from(vec![0u8; size]);
- Self {
- buffer,
- _marker: PhantomData,
- }
- }
-
- /// Get a reference to the header using zero-copy access.
- ///
- /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
- /// without any copying or allocation.
- #[inline]
- #[allow(unused)]
- pub fn header(&self) -> &H {
- let header_bytes = &self.buffer[..size_of::<H>()];
- bytemuck::from_bytes(header_bytes)
- }
-
- /// Get a reference to the message body (everything after the header).
- ///
- /// Returns an empty slice if there is no body.
- #[inline]
- #[allow(unused)]
- pub fn body(&self) -> &[u8] {
- let header_size = size_of::<H>();
- let total_size = self.header().size() as usize;
-
- if total_size > header_size {
- &self.buffer[header_size..total_size]
- } else {
- &[]
- }
- }
-
- /// Get the complete message as bytes (header + body).
- #[inline]
- #[allow(unused)]
- pub fn as_bytes(&self) -> &[u8] {
- let total_size = self.header().size() as usize;
- &self.buffer[..total_size]
- }
-
- /// Convert into the underlying buffer.
- #[inline]
- #[allow(unused)]
- pub fn into_inner(self) -> Bytes {
- self.buffer
- }
-
- /// Create a message from a buffer without validation.
- ///
- /// # Safety
- ///
- /// This is private and skips validation. Use only when:
- /// - The buffer is already validated
- /// - If doing a zero-cost type conversion (like to GenericHeader)
- #[inline]
- #[allow(unused)]
- unsafe fn from_buffer_unchecked(buffer: Bytes) -> Self {
- Self {
- buffer,
- _marker: PhantomData,
- }
- }
-
- /// Convert to a generic message (erasing the specific header type).
- ///
- /// This allows treating any message as a generic message for common
operations.
- #[allow(unused)]
- pub fn into_generic(self) -> Message<header::GenericHeader> {
- unsafe { Message::from_buffer_unchecked(self.buffer) }
- }
-
- /// Get a reference to this message as a generic message.
- #[allow(unused)]
- pub fn as_generic(&self) -> &Message<header::GenericHeader> {
- // SAFETY: Message<H> and Message<GenericHeader> have the same memory
layout
- // because they only differ in the PhantomData type parameter
- unsafe { &*(self as *const Self as *const
Message<header::GenericHeader>) }
- }
-
- /// Try to convert this message to a different header type.
- ///
- /// This validates that the command in the header matches the target type's
- /// expected command before performing the conversion.
- ///
- /// # Errors
- ///
- /// Returns an error if:
- /// - The command doesn't match the target type
- /// - The target header validation fails
- #[allow(unused)]
- pub fn try_into_typed<T>(self) -> Result<Message<T>,
header::ConsensusError>
- where
- T: ConsensusHeader,
- {
- if self.buffer.len() < size_of::<T>() {
- return Err(header::ConsensusError::InvalidCommand {
- expected: T::COMMAND,
- found: header::Command::Reserved,
- });
- }
-
- let generic = self.as_generic();
- if generic.header().command != T::COMMAND {
- return Err(header::ConsensusError::InvalidCommand {
- expected: T::COMMAND,
- found: generic.header().command,
- });
- }
-
- let new_message = unsafe {
Message::<T>::from_buffer_unchecked(self.buffer) };
-
- new_message.header().validate()?;
-
- Ok(new_message)
- }
-
- /// Try to get a reference to this message as a different header type.
- ///
- /// This is similar to `try_into_typed` but borrows instead of consuming.
- #[allow(unused)]
- pub fn try_as_typed<T>(&self) -> Result<&Message<T>,
header::ConsensusError>
- where
- T: ConsensusHeader,
- {
- // check buffer size
- if self.buffer.len() < size_of::<T>() {
- return Err(header::ConsensusError::InvalidCommand {
- expected: T::COMMAND,
- found: header::Command::Reserved,
- });
- }
-
- // check the command matches
- let generic = self.as_generic();
- if generic.header().command != T::COMMAND {
- return Err(header::ConsensusError::InvalidCommand {
- expected: T::COMMAND,
- found: generic.header().command,
- });
- }
-
- let typed_message = unsafe { &*(self as *const Self as *const
Message<T>) };
-
- // validate the header
- typed_message.header().validate()?;
-
- Ok(typed_message)
- }
-}
-
-#[derive(Debug)]
-#[allow(unused)]
-pub enum MessageBag {
- Generic(Message<GenericHeader>),
- Prepare(Message<PrepareHeader>),
- Commit(Message<CommitHeader>),
- Reply(Message<ReplyHeader>),
-}
-
-impl MessageBag {
- #[allow(unused)]
- pub fn command(&self) -> header::Command {
- match self {
- MessageBag::Generic(message) => message.header().command,
- MessageBag::Prepare(message) => message.header().command,
- MessageBag::Commit(message) => message.header().command,
- MessageBag::Reply(message) => message.header().command,
- }
- }
-
- #[allow(unused)]
- pub fn size(&self) -> u32 {
- match self {
- MessageBag::Generic(message) => message.header().size(),
- MessageBag::Prepare(message) => message.header().size(),
- MessageBag::Commit(message) => message.header().size(),
- MessageBag::Reply(message) => message.header().size(),
- }
- }
-}
-
-impl<T> From<Message<T>> for MessageBag
-where
- T: ConsensusHeader,
-{
- fn from(value: Message<T>) -> Self {
- let command = value.as_generic().header().command;
-
- let buffer = value.into_inner();
-
- // SAFETY: All Message<H> types have identical memory layout (only
PhantomData differs).
- // We've validated the command when the original message was created.
- match command {
- header::Command::Prepare => {
- let msg =
- unsafe {
Message::<header::PrepareHeader>::from_buffer_unchecked(buffer) };
- MessageBag::Prepare(msg)
- }
- header::Command::Commit => {
- let msg = unsafe {
Message::<header::CommitHeader>::from_buffer_unchecked(buffer) };
- MessageBag::Commit(msg)
- }
- header::Command::Reply => {
- let msg = unsafe {
Message::<header::ReplyHeader>::from_buffer_unchecked(buffer) };
- MessageBag::Reply(msg)
- }
- _ => unreachable!(
- "For now we only support Prepare, Commit, and Reply. In the
future we will support more commands. Command: {command:?}"
- ),
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use bytes::BytesMut;
-
- use super::*;
-
- trait MessageFactory: ConsensusHeader + Sized {
- fn create_test() -> Message<Self>;
- }
-
- impl MessageFactory for header::GenericHeader {
- fn create_test() -> Message<Self> {
- let header_size = size_of::<Self>();
- let body_size = 128;
- let total_size = header_size + body_size;
-
- let mut buffer = BytesMut::zeroed(total_size);
-
- let header = bytemuck::from_bytes_mut::<Self>(&mut
buffer[..header_size]);
-
- header.checksum = 123456;
- header.cluster = 12345;
- header.size = total_size as u32;
- header.command = header::Command::Reserved;
-
- for (i, item) in buffer
- .iter_mut()
- .enumerate()
- .take(total_size)
- .skip(header_size)
- {
- *item = (i % 256) as u8;
- }
-
- Message::<Self>::from_bytes(buffer.freeze()).unwrap()
- }
- }
-
- impl MessageFactory for header::PrepareHeader {
- fn create_test() -> Message<Self> {
- let header_size = size_of::<Self>();
- let body_size = 64;
- let total_size = header_size + body_size;
-
- let mut buffer = BytesMut::zeroed(total_size);
-
- let header = bytemuck::from_bytes_mut::<Self>(&mut
buffer[..header_size]);
-
- header.checksum = 123456;
- header.checksum_body = 789012;
- header.cluster = 12345;
- header.size = total_size as u32;
- header.view = 1;
- header.command = header::Command::Prepare;
- header.replica = 1;
- header.op = 100;
- header.commit = 99;
- header.timestamp = 1234567890;
- header.operation = header::Operation::CreateStream;
-
- Message::<Self>::from_bytes(buffer.freeze()).unwrap()
- }
- }
-
- impl MessageFactory for header::CommitHeader {
- fn create_test() -> Message<Self> {
- let header_size = size_of::<Self>();
- let total_size = 256;
-
- let mut buffer = BytesMut::zeroed(total_size);
-
- let header = bytemuck::from_bytes_mut::<Self>(&mut
buffer[..header_size]);
-
- header.checksum = 123456;
- header.cluster = 12345;
- header.size = 256;
- header.view = 1;
- header.command = header::Command::Commit;
- header.replica = 2;
- header.commit = 50;
-
- Message::<Self>::from_bytes(buffer.freeze()).unwrap()
- }
- }
-
- impl MessageFactory for header::ReplyHeader {
- fn create_test() -> Message<Self> {
- let header_size = size_of::<Self>();
- let body_size = 32;
- let total_size = header_size + body_size;
-
- let mut buffer = BytesMut::zeroed(total_size);
-
- let header = bytemuck::from_bytes_mut::<Self>(&mut
buffer[..header_size]);
-
- header.checksum = 123456;
- header.cluster = 12345;
- header.size = total_size as u32;
- header.view = 1;
- header.command = header::Command::Reply;
- header.replica = 3;
- header.op = 100;
- header.commit = 99;
- header.operation = header::Operation::CreateStream;
-
- Message::<Self>::from_bytes(buffer.freeze()).unwrap()
- }
- }
-
- #[test]
- fn test_message_creation_and_access() {
- let message = header::GenericHeader::create_test();
-
- assert_eq!(message.header().cluster, 12345);
- assert_eq!(message.header().command, header::Command::Reserved);
- assert_eq!(
- message.body().len(),
- message.header().size() as usize -
size_of::<header::GenericHeader>()
- );
-
- let body = message.body();
- let header_size = size_of::<header::GenericHeader>();
- for (i, &byte) in body.iter().enumerate() {
- let expected = ((i + header_size) % 256) as u8;
- assert_eq!(byte, expected);
- }
- }
-
- #[test]
- fn test_message_conversion() {
- let prepare_message = header::PrepareHeader::create_test();
-
- let original_bytes = prepare_message.as_bytes().to_vec();
-
- let generic_message = prepare_message.into_generic();
- assert_eq!(generic_message.header().command, header::Command::Prepare);
-
- let prepare_again: Message<header::PrepareHeader> =
- generic_message.try_into_typed().unwrap();
-
- assert_eq!(prepare_again.header().op, 100);
- assert_eq!(prepare_again.header().view, 1);
- assert_eq!(prepare_again.header().cluster, 12345);
-
- let roundtrip_bytes = prepare_again.as_bytes().to_vec();
-
- assert_eq!(
- original_bytes, roundtrip_bytes,
- "Bytes should be identical after round-trip conversion"
- );
- }
-
- #[test]
- fn test_message_bag_from_prepare() {
- let prepare = header::PrepareHeader::create_test();
- let bag = MessageBag::from(prepare);
-
- assert_eq!(bag.command(), header::Command::Prepare);
- assert!(matches!(bag, MessageBag::Prepare(_)));
- assert!(!matches!(bag, MessageBag::Commit(_)));
- assert!(!matches!(bag, MessageBag::Reply(_)));
- assert!(!matches!(bag, MessageBag::Generic(_)));
- }
-
- #[test]
- fn test_message_bag_from_commit() {
- let commit = header::CommitHeader::create_test();
- let bag = MessageBag::from(commit);
-
- assert_eq!(bag.command(), header::Command::Commit);
- assert!(!matches!(bag, MessageBag::Prepare(_)));
- assert!(matches!(bag, MessageBag::Commit(_)));
- assert!(!matches!(bag, MessageBag::Reply(_)));
- assert!(!matches!(bag, MessageBag::Generic(_)));
- }
-
- #[test]
- fn test_message_bag_from_reply() {
- let reply = header::ReplyHeader::create_test();
- let bag = MessageBag::from(reply);
-
- assert_eq!(bag.command(), header::Command::Reply);
- assert!(!matches!(bag, MessageBag::Prepare(_)));
- assert!(!matches!(bag, MessageBag::Commit(_)));
- assert!(matches!(bag, MessageBag::Reply(_)));
- assert!(!matches!(bag, MessageBag::Generic(_)));
- }
-}
-
-// TODO: Header generic
-// TODO: We will have to impl something like this (NOT ONE TO ONE JUST A
SKETCH) as we will use `bytemuck`:
-/*
-// Generic Message type
-#[repr(C)]
-pub struct Message<H: Header = GenericHeader> {
- buffer: AlignedBuffer<ALIGNED_TO_HEADER_SIZE>,
- _phantom: PhantomData<H>,
-}
-
-// Trait that all headers must implement
-pub trait Header: Sized {
- const COMMAND: Command;
-
- fn size(&self) -> u32;
- fn command(&self) -> Command;
- fn checksum(&self) -> u128;
-}
-
-// Command enum (simplified)
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-#[repr(u8)]
-pub enum Command {
- reserved = 0,
-
- ping = 1,
- pong = 2,
-
- request = 5,
- prepare = 6,
- prepare_ok = 7,
- reply = 8,
- commit = 9,
-
- start_view_change = 10,
- do_view_change = 11,
- start_view = 24,
- // ... etc
-}
-
-// Generic header for base Message
-#[repr(C)]
-pub struct GenericHeader {
- checksum: u128,
- command: Command,
- size: u32,
- // ... other fields
-}
-
-// first 128 bytes GenericHeader (checksum, command, size, etc....)
-// second 128 bytes specific header (PrepareHeader, CommitHeader, etc....)
-
-impl Header for GenericHeader {
- const COMMAND: Command = Command::Reserved;
-
- fn size(&self) -> u32 { self.size }
- fn command(&self) -> Command { self.command }
- fn checksum(&self) -> u128 { self.checksum }
-}
-
-// Specific header types
-#[repr(C)]
-pub struct PrepareHeader {
- checksum: u128,
- command: Command,
- size: u32,
- // ... prepare-specific fields
- view: u32,
- op: u64,
- commit: u64,
-}
-
-impl Header for PrepareHeader {
- const COMMAND: Command = Command::Prepare;
-
- fn size(&self) -> u32 { self.size }
- fn command(&self) -> Command { self.command }
- fn checksum(&self) -> u128 { self.checksum }
-}
-*/
-
-// And then for Message impl
-/*
-impl<H: Header> Message<H> {
- // Access header (no stored pointer needed!)
- pub fn header(&self) -> &H {
- assert!(size_of::<H>() <= ALIGNED_TO_HEADER_SIZE);
- unsafe { &*(self.buffer.as_ptr() as *const H) }
- }
-
- pub fn header_mut(&mut self) -> &mut H {
- unsafe { &mut *(self.buffer.as_mut_ptr() as *mut H) }
- }
-
- pub fn body(&self) -> &[u8] {
- let header_size = size_of::<H>();
- let total_size = self.header().size() as usize;
- &self.buffer[header_size..total_size]
- }
-
- pub fn body_mut(&mut self) -> &mut [u8] {
- let header_size = size_of::<H>();
- let total_size = self.header().size() as usize;
- &mut self.buffer.as_mut()[header_size..total_size]
- }
-
- pub fn as_bytes(&self) -> &[u8] {
- &self.buffer[..self.header().size() as usize]
- }
-
- pub fn base(&self) -> &Message<GenericHeader> {
- unsafe { &*(self as *const Self as *const Message<GenericHeader>) }
- }
-
- pub fn base_mut(&mut self) -> &mut Message<GenericHeader> {
- unsafe { &mut *(self as *mut Self as *mut Message<GenericHeader>) }
- }
-
- pub fn try_into<T: Header>(self) -> Option<Message<T>> {
- if self.header().command() == T::COMMAND {
- Some(unsafe { std::mem::transmute(self) })
- } else {
- None
- }
- }
-}
-*/
-
-// Then for the `Request` header we will have to store an `Operation` enum
-/*
- Define your operation enum
-#[repr(u8)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum Operation {
- // Metadata operations
- CreateTopic = 1,
- DeleteTopic = 2,
- ListTopics = 3,
- CreatePartition = 4,
-
- // Partition operations
- Produce = 100,
- Consume = 101,
- Fetch = 102,
-}
-
-// Specific header types
-#[repr(C)]
-pub struct PrepareHeader {
- checksum: u128,
- command: Command,
- size: u32,
- // ... prepare-specific fields
- view: u32,
- op: u64,
- commit: u64,
-
- // second 128 bytes
- operation: Operation,
-}
-*/
-
-// Which will have an method that returns discriminator between Metadata and
Partition requests
-
-// TODO: Fill this enum
-// #[expect(unused)]
-// pub enum MessageBag {
-// Void,
-// }
-
-// impl<H> From<Message<H>> for MessageBag
-// where
-// H: ConsensusHeader,
-// {
-// fn from(_value: Message<H>) -> Self {
-// MessageBag::Void
-// }
-// }
-
-pub(crate) mod header;
+pub mod header;
+pub mod message;
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 513b817d4..0b1a9c02f 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -28,6 +28,7 @@ repository = "https://github.com/apache/iggy"
readme = "../../../README.md"
[dependencies]
+iggy_common = { path = "../common" }
message_bus = { path = "../message_bus" }
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index cc6084115..19bafa65d 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -16,6 +16,8 @@
// under the License.
use crate::{Consensus, Project};
+use iggy_common::header::{Command2, PrepareHeader, PrepareOkHeader, RequestHeader};
+use iggy_common::message::Message;
use message_bus::IggyMessageBus;
use std::cell::Cell;
@@ -35,35 +37,61 @@ impl VsrConsensus {
}
}
-#[derive(Clone)]
-pub struct Request;
-
-impl Project<Prepare> for Request {
+impl Project<Message<PrepareHeader>> for Message<RequestHeader> {
type Consensus = VsrConsensus;
- fn project(self, _consensus: &Self::Consensus) -> Prepare {
- Prepare
+ fn project(self, _consensus: &Self::Consensus) -> Message<PrepareHeader> {
+ self.replace_header(|prev| {
+ PrepareHeader {
+ cluster: 0, // TODO: consensus.cluster
+ size: prev.size,
+ view: 0, // TODO: consensus view
+ release: prev.release,
+ command: Command2::Prepare,
+ replica: 0, // TODO: consensus replica
+ parent: 0, // TODO: Get this from the previous entry in the journal (figure out how to pass that ctx here)
+ request_checksum: prev.request_checksum,
+ request: prev.request,
+ commit: 0, // TODO: consensus.commit
+ op: 0, // TODO: consensus.op
+ timestamp: 0, // TODO: consensus timestamp
+ operation: prev.operation,
+ ..Default::default()
+ }
+ })
}
}
-#[derive(Clone)]
-pub struct Prepare;
-
-impl Project<PrepareOk> for Prepare {
+impl Project<Message<PrepareOkHeader>> for Message<PrepareHeader> {
type Consensus = VsrConsensus;
- fn project(self, _consensus: &Self::Consensus) -> PrepareOk {
- PrepareOk
+ fn project(self, _consensus: &Self::Consensus) -> Message<PrepareOkHeader> {
+ self.replace_header(|prev| {
+ PrepareOkHeader {
+ command: Command2::PrepareOk,
+ parent: prev.parent,
+ prepare_checksum: prev.checksum,
+ request: prev.request,
+ cluster: 0, // TODO: consensus.cluster
+ replica: 0, // TODO: consensus replica
+ epoch: 0, // TODO: consensus.epoch
+ // It's important to use the view of the replica, not the received prepare!
+ view: 0, // TODO: consensus.view
+ op: prev.op,
+ commit: 0, // TODO: consensus.commit
+ timestamp: prev.timestamp,
+ operation: prev.operation,
+ // PrepareOk messages consist of a header only; they carry no body
+ ..Default::default()
+ }
+ })
}
}
-#[derive(Clone)]
-pub struct PrepareOk;
-
impl Consensus for VsrConsensus {
type MessageBus = IggyMessageBus;
- type RequestMessage = Request;
- type ReplicateMessage = Prepare;
- type AckMessage = PrepareOk;
+ type RequestMessage = Message<RequestHeader>;
+ type ReplicateMessage = Message<PrepareHeader>;
+ type AckMessage = Message<PrepareOkHeader>;
fn pipeline_message(&self, _message: Self::ReplicateMessage) {
todo!()
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index 5d8e71a00..1e8f32970 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -29,5 +29,6 @@ readme = "../../../README.md"
[dependencies]
consensus = { path = "../consensus" }
+iggy_common = { path = "../common" }
journal = { path = "../journal" }
tracing = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index 7f508c347..140fa7e4c 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,7 +14,8 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-use consensus::{Consensus, Prepare, Project, VsrConsensus};
+use consensus::{Consensus, Project, VsrConsensus};
+use iggy_common::{header::PrepareHeader, message::Message};
use journal::Journal;
use tracing::{debug, warn};
@@ -42,7 +43,7 @@ struct IggyMetadata<M, J, S> {
impl<M, J, S> Metadata for IggyMetadata<M, J, S>
where
- J: Journal<Entry = Prepare>,
+ J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage>,
{
type Consensus = VsrConsensus;
type Journal = J;
@@ -74,10 +75,10 @@ where
impl<M, J, S> IggyMetadata<M, J, S>
where
- J: Journal<Entry = Prepare>,
+ J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage>,
{
#[expect(unused)]
- fn pipeline_prepare(&self, prepare: Prepare) {
+ fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
debug!("inserting prepare into metadata pipeline");
self.consensus.verify_pipeline();
self.consensus.pipeline_message(prepare.clone());
@@ -86,12 +87,12 @@ where
self.consensus.post_replicate_verify(&prepare);
}
- fn fence_old_prepare(&self, _prepare: &Prepare) -> bool {
+ fn fence_old_prepare(&self, _prepare: &Message<PrepareHeader>) -> bool {
// TODO
false
}
- fn replicate(&self, _prepare: Prepare) {
+ fn replicate(&self, _prepare: Message<PrepareHeader>) {
todo!()
}
}