This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 971753124 feat(metadata): use new consensus messages in metadata module (#2414)
971753124 is described below

commit 9717531248f983a72c88356c1de449c93468d89a
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Thu Nov 27 10:42:27 2025 +0100

    feat(metadata): use new consensus messages in metadata module (#2414)
---
 Cargo.lock                                         |   2 +
 core/common/src/lib.rs                             |   1 +
 core/common/src/types/consensus/header.rs          | 154 ++++-
 .../src/types/consensus/{mod.rs => message.rs}     |  94 +--
 core/common/src/types/consensus/mod.rs             | 655 +--------------------
 core/consensus/Cargo.toml                          |   1 +
 core/consensus/src/impls.rs                        |  64 +-
 core/metadata/Cargo.toml                           |   1 +
 core/metadata/src/impls/metadata.rs                |  13 +-
 9 files changed, 242 insertions(+), 743 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 970bb914c..02e9ff060 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1921,6 +1921,7 @@ dependencies = [
 name = "consensus"
 version = "0.1.0"
 dependencies = [
+ "iggy_common",
  "message_bus",
 ]
 
@@ -5767,6 +5768,7 @@ name = "metadata"
 version = "0.1.0"
 dependencies = [
  "consensus",
+ "iggy_common",
  "journal",
  "tracing",
 ]
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index ababdfbb1..032ea4bb9 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -70,6 +70,7 @@ pub use types::configuration::websocket_config::websocket_client_config::*;
 pub use types::configuration::websocket_config::websocket_client_config_builder::*;
 pub use types::configuration::websocket_config::websocket_client_reconnection_config::*;
 pub use types::configuration::websocket_config::websocket_connection_string_options::*;
+pub use types::consensus::*;
 pub use types::consumer::consumer_group::*;
 pub use types::consumer::consumer_kind::*;
 pub use types::consumer::consumer_offset_info::*;
diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs
index 926c1cf11..f069bbc4f 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -18,20 +18,19 @@
 use bytemuck::{Pod, Zeroable};
 use thiserror::Error;
 
-#[expect(unused)]
 pub struct Header {}
 
 pub trait ConsensusHeader: Sized + Pod + Zeroable {
-    const COMMAND: Command;
+    const COMMAND: Command2;
 
     fn validate(&self) -> Result<(), ConsensusError>;
     fn size(&self) -> u32;
 }
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
 #[repr(u8)]
-#[expect(unused)]
-pub enum Command {
+pub enum Command2 {
+    #[default]
     Reserved = 0,
 
     Ping = 1,
@@ -49,10 +48,9 @@ pub enum Command {
 }
 
 #[derive(Debug, Clone, Error, PartialEq, Eq)]
-#[expect(unused)]
 pub enum ConsensusError {
     #[error("invalid command: expected {expected:?}, found {found:?}")]
-    InvalidCommand { expected: Command, found: Command },
+    InvalidCommand { expected: Command2, found: Command2 },
 
     #[error("invalid checksum")]
     InvalidChecksum,
@@ -67,14 +65,14 @@ pub enum ConsensusError {
     PrepareRequestChecksumPaddingNonZero,
 
     #[error("command must be Commit")]
-    CommitInvalidCommand,
+    CommitInvalidCommand2,
 
     #[error("size must be 256, found {0}")]
     CommitInvalidSize(u32),
 
     // ReplyHeader specific
     #[error("command must be Reply")]
-    ReplyInvalidCommand,
+    ReplyInvalidCommand2,
 
     #[error("request_checksum_padding must be 0")]
     ReplyRequestChecksumPaddingNonZero,
@@ -83,10 +81,11 @@ pub enum ConsensusError {
     ReplyContextPaddingNonZero,
 }
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
 #[repr(u8)]
-#[expect(unused)]
 pub enum Operation {
+    #[default]
+    Default = 0,
     CreateStream = 128,
     UpdateStream = 129,
     DeleteStream = 130,
@@ -120,7 +119,7 @@ pub struct GenericHeader {
     pub view: u32,
     pub release: u32,
     pub protocol: u16,
-    pub command: Command,
+    pub command: Command2,
     pub replica: u8,
     pub reserved_frame: [u8; 12],
 
@@ -131,7 +130,7 @@ unsafe impl Pod for GenericHeader {}
 unsafe impl Zeroable for GenericHeader {}
 
 impl ConsensusHeader for GenericHeader {
-    const COMMAND: Command = Command::Reserved;
+    const COMMAND: Command2 = Command2::Reserved;
 
     fn validate(&self) -> Result<(), ConsensusError> {
         Ok(())
@@ -144,6 +143,50 @@ impl ConsensusHeader for GenericHeader {
 
 #[repr(C)]
 #[derive(Debug, Clone, Copy)]
+pub struct RequestHeader {
+    pub checksum: u128,
+    pub checksum_body: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub epoch: u32,
+    pub view: u32,
+    pub release: u32,
+    pub protocol: u16,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 12],
+
+    pub request_checksum: u128,
+    pub timestamp: u64,
+    pub request: u64,
+    pub operation: Operation,
+    pub reserved: [u8; 95],
+}
+
+unsafe impl Pod for RequestHeader {}
+unsafe impl Zeroable for RequestHeader {}
+
+impl ConsensusHeader for RequestHeader {
+    const COMMAND: Command2 = Command2::Request;
+
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Command2::Request {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Command2::Request,
+                found: self.command,
+            });
+        }
+        Ok(())
+    }
+
+    fn size(&self) -> u32 {
+        self.size
+    }
+}
+
+// TODO: Manually impl default (and use a const for the `release`)
+#[repr(C)]
+#[derive(Default, Debug, Clone, Copy)]
 pub struct PrepareHeader {
     pub checksum: u128,
     pub checksum_body: u128,
@@ -153,7 +196,7 @@ pub struct PrepareHeader {
     pub view: u32,
     pub release: u32,
     pub protocol: u16,
-    pub command: Command,
+    pub command: Command2,
     pub replica: u8,
     pub reserved_frame: [u8; 12],
 
@@ -161,25 +204,24 @@ pub struct PrepareHeader {
     pub parent_padding: u128,
     pub request_checksum: u128,
     pub request_checksum_padding: u128,
-    pub checkpoint_id: u128,
     pub op: u64,
     pub commit: u64,
     pub timestamp: u64,
     pub request: u64,
     pub operation: Operation,
-    pub reserved: [u8; 3],
+    pub reserved: [u8; 19],
 }
 
 unsafe impl Pod for PrepareHeader {}
 unsafe impl Zeroable for PrepareHeader {}
 
 impl ConsensusHeader for PrepareHeader {
-    const COMMAND: Command = Command::Prepare;
+    const COMMAND: Command2 = Command2::Prepare;
 
     fn validate(&self) -> Result<(), ConsensusError> {
-        if self.command != Command::Prepare {
+        if self.command != Command2::Prepare {
             return Err(ConsensusError::InvalidCommand {
-                expected: Command::Prepare,
+                expected: Command2::Prepare,
                 found: self.command,
             });
         }
@@ -197,6 +239,61 @@ impl ConsensusHeader for PrepareHeader {
     }
 }
 
+// TODO: Manually impl default (and use a const for the `release`)
+#[repr(C)]
+#[derive(Default, Debug, Clone, Copy)]
+pub struct PrepareOkHeader {
+    pub checksum: u128,
+    pub checksum_body: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub epoch: u32,
+    pub view: u32,
+    pub release: u32,
+    pub protocol: u16,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 12],
+
+    pub parent: u128,
+    pub parent_padding: u128,
+    pub prepare_checksum: u128,
+    pub prepare_checksum_padding: u128,
+    pub op: u64,
+    pub commit: u64,
+    pub timestamp: u64,
+    pub request: u64,
+    pub operation: Operation,
+    pub reserved: [u8; 19],
+}
+
+unsafe impl Pod for PrepareOkHeader {}
+unsafe impl Zeroable for PrepareOkHeader {}
+
+impl ConsensusHeader for PrepareOkHeader {
+    const COMMAND: Command2 = Command2::PrepareOk;
+
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Command2::PrepareOk {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Command2::PrepareOk,
+                found: self.command,
+            });
+        }
+        if self.parent_padding != 0 {
+            return Err(ConsensusError::PrepareParentPaddingNonZero);
+        }
+        if self.prepare_checksum_padding != 0 {
+            return Err(ConsensusError::PrepareRequestChecksumPaddingNonZero);
+        }
+        Ok(())
+    }
+
+    fn size(&self) -> u32 {
+        self.size
+    }
+}
+
 #[repr(C)]
 #[derive(Debug, Clone, Copy)]
 pub struct CommitHeader {
@@ -208,27 +305,26 @@ pub struct CommitHeader {
     pub view: u32,
     pub release: u32,
     pub protocol: u16,
-    pub command: Command,
+    pub command: Command2,
     pub replica: u8,
     pub reserved_frame: [u8; 12],
 
     pub commit_checksum: u128,
     pub timestamp_monotonic: u64,
-    pub checkpoint_id: u128,
     pub commit: u64,
     pub checkpoint_op: u64,
-    pub reserved: [u8; 80],
+    pub reserved: [u8; 96],
 }
 
 unsafe impl Pod for CommitHeader {}
 unsafe impl Zeroable for CommitHeader {}
 
 impl ConsensusHeader for CommitHeader {
-    const COMMAND: Command = Command::Commit;
+    const COMMAND: Command2 = Command2::Commit;
 
     fn validate(&self) -> Result<(), ConsensusError> {
-        if self.command != Command::Commit {
-            return Err(ConsensusError::CommitInvalidCommand);
+        if self.command != Command2::Commit {
+            return Err(ConsensusError::CommitInvalidCommand2);
         }
         if self.size != 256 {
             return Err(ConsensusError::CommitInvalidSize(self.size));
@@ -252,7 +348,7 @@ pub struct ReplyHeader {
     pub view: u32,
     pub release: u32,
     pub protocol: u16,
-    pub command: Command,
+    pub command: Command2,
     pub replica: u8,
     pub reserved_frame: [u8; 12],
 
@@ -272,11 +368,11 @@ unsafe impl Pod for ReplyHeader {}
 unsafe impl Zeroable for ReplyHeader {}
 
 impl ConsensusHeader for ReplyHeader {
-    const COMMAND: Command = Command::Reply;
+    const COMMAND: Command2 = Command2::Reply;
 
     fn validate(&self) -> Result<(), ConsensusError> {
-        if self.command != Command::Reply {
-            return Err(ConsensusError::ReplyInvalidCommand);
+        if self.command != Command2::Reply {
+            return Err(ConsensusError::ReplyInvalidCommand2);
         }
         if self.request_checksum_padding != 0 {
             return Err(ConsensusError::ReplyRequestChecksumPaddingNonZero);
diff --git a/core/common/src/types/consensus/mod.rs b/core/common/src/types/consensus/message.rs
similarity index 88%
copy from core/common/src/types/consensus/mod.rs
copy to core/common/src/types/consensus/message.rs
index 9d98bf0e8..f6c638b0c 100644
--- a/core/common/src/types/consensus/mod.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -15,15 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::marker::PhantomData;
-
-use bytes::Bytes;
-
 use crate::types::consensus::header::{
-    CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+    self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
 };
+use bytes::Bytes;
+use std::marker::PhantomData;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Message<H: ConsensusHeader> {
     buffer: Bytes,
     _marker: PhantomData<H>,
@@ -54,7 +52,7 @@ where
         if buffer.len() < size_of::<H>() {
             return Err(header::ConsensusError::InvalidCommand {
                 expected: H::COMMAND,
-                found: header::Command::Reserved,
+                found: header::Command2::Reserved,
             });
         }
 
@@ -71,7 +69,7 @@ where
         if message.buffer.len() < header_size {
             return Err(header::ConsensusError::InvalidCommand {
                 expected: H::COMMAND,
-                found: header::Command::Reserved,
+                found: header::Command2::Reserved,
             });
         }
 
@@ -91,6 +89,30 @@ where
         }
     }
 
+    /// Replace the header with a new one, using the provided function to generate it.
+    pub fn replace_header<T: ConsensusHeader>(self, f: impl FnOnce(&H) -> T) -> Message<T> {
+        assert_eq!(size_of::<H>(), size_of::<T>());
+        let prev = self.header();
+        let header = f(prev);
+
+        let header_bytes = bytemuck::bytes_of(&header);
+        let buffer = self.into_inner();
+        // Safety: We ensured that size_of::<H>() == size_of::<T>()
+        // On top of that, there is going to be only one reference to buffer during this function call
+        // so no other references can observe the mutation.
+        // In the future we can replace the `Bytes` buffer with something that does not allow sharing between different threads.
+        unsafe {
+            let ptr = buffer.as_ptr() as *mut u8;
+            let slice = std::slice::from_raw_parts_mut(ptr, buffer.len());
+            slice[..size_of::<H>()].copy_from_slice(header_bytes);
+        }
+        // TODO: Recalculate checksums
+        Message {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
     /// Get a reference to the header using zero-copy access.
     ///
     /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
@@ -183,7 +205,7 @@ where
         if self.buffer.len() < size_of::<T>() {
             return Err(header::ConsensusError::InvalidCommand {
                 expected: T::COMMAND,
-                found: header::Command::Reserved,
+                found: header::Command2::Reserved,
             });
         }
 
@@ -214,7 +236,7 @@ where
         if self.buffer.len() < size_of::<T>() {
             return Err(header::ConsensusError::InvalidCommand {
                 expected: T::COMMAND,
-                found: header::Command::Reserved,
+                found: header::Command2::Reserved,
             });
         }
 
@@ -247,7 +269,7 @@ pub enum MessageBag {
 
 impl MessageBag {
     #[allow(unused)]
-    pub fn command(&self) -> header::Command {
+    pub fn command(&self) -> header::Command2 {
         match self {
             MessageBag::Generic(message) => message.header().command,
             MessageBag::Prepare(message) => message.header().command,
@@ -279,21 +301,21 @@ where
         // SAFETY: All Message<H> types have identical memory layout (only PhantomData differs).
         // We've validated the command when the original message was created.
         match command {
-            header::Command::Prepare => {
+            header::Command2::Prepare => {
                 let msg =
                     unsafe { Message::<header::PrepareHeader>::from_buffer_unchecked(buffer) };
                 MessageBag::Prepare(msg)
             }
-            header::Command::Commit => {
+            header::Command2::Commit => {
                 let msg = unsafe { Message::<header::CommitHeader>::from_buffer_unchecked(buffer) };
                 MessageBag::Commit(msg)
             }
-            header::Command::Reply => {
+            header::Command2::Reply => {
                 let msg = unsafe { Message::<header::ReplyHeader>::from_buffer_unchecked(buffer) };
                 MessageBag::Reply(msg)
             }
             _ => unreachable!(
-                "For now we only support Prepare, Commit, and Reply. In the future we will support more commands. Command: {command:?}"
+                "For now we only support Prepare, Commit, and Reply. In the future we will support more commands. Command2: {command:?}"
             ),
         }
     }
@@ -322,7 +344,7 @@ mod tests {
             header.checksum = 123456;
             header.cluster = 12345;
             header.size = total_size as u32;
-            header.command = header::Command::Reserved;
+            header.command = header::Command2::Reserved;
 
             for (i, item) in buffer
                 .iter_mut()
@@ -352,7 +374,7 @@ mod tests {
             header.cluster = 12345;
             header.size = total_size as u32;
             header.view = 1;
-            header.command = header::Command::Prepare;
+            header.command = header::Command2::Prepare;
             header.replica = 1;
             header.op = 100;
             header.commit = 99;
@@ -376,7 +398,7 @@ mod tests {
             header.cluster = 12345;
             header.size = 256;
             header.view = 1;
-            header.command = header::Command::Commit;
+            header.command = header::Command2::Commit;
             header.replica = 2;
             header.commit = 50;
 
@@ -398,7 +420,7 @@ mod tests {
             header.cluster = 12345;
             header.size = total_size as u32;
             header.view = 1;
-            header.command = header::Command::Reply;
+            header.command = header::Command2::Reply;
             header.replica = 3;
             header.op = 100;
             header.commit = 99;
@@ -413,7 +435,7 @@ mod tests {
         let message = header::GenericHeader::create_test();
 
         assert_eq!(message.header().cluster, 12345);
-        assert_eq!(message.header().command, header::Command::Reserved);
+        assert_eq!(message.header().command, header::Command2::Reserved);
         assert_eq!(
             message.body().len(),
             message.header().size() as usize - size_of::<header::GenericHeader>()
@@ -434,7 +456,7 @@ mod tests {
         let original_bytes = prepare_message.as_bytes().to_vec();
 
         let generic_message = prepare_message.into_generic();
-        assert_eq!(generic_message.header().command, header::Command::Prepare);
+        assert_eq!(generic_message.header().command, header::Command2::Prepare);
 
         let prepare_again: Message<header::PrepareHeader> =
             generic_message.try_into_typed().unwrap();
@@ -456,7 +478,7 @@ mod tests {
         let prepare = header::PrepareHeader::create_test();
         let bag = MessageBag::from(prepare);
 
-        assert_eq!(bag.command(), header::Command::Prepare);
+        assert_eq!(bag.command(), header::Command2::Prepare);
         assert!(matches!(bag, MessageBag::Prepare(_)));
         assert!(!matches!(bag, MessageBag::Commit(_)));
         assert!(!matches!(bag, MessageBag::Reply(_)));
@@ -468,7 +490,7 @@ mod tests {
         let commit = header::CommitHeader::create_test();
         let bag = MessageBag::from(commit);
 
-        assert_eq!(bag.command(), header::Command::Commit);
+        assert_eq!(bag.command(), header::Command2::Commit);
         assert!(!matches!(bag, MessageBag::Prepare(_)));
         assert!(matches!(bag, MessageBag::Commit(_)));
         assert!(!matches!(bag, MessageBag::Reply(_)));
@@ -480,7 +502,7 @@ mod tests {
         let reply = header::ReplyHeader::create_test();
         let bag = MessageBag::from(reply);
 
-        assert_eq!(bag.command(), header::Command::Reply);
+        assert_eq!(bag.command(), header::Command2::Reply);
         assert!(!matches!(bag, MessageBag::Prepare(_)));
         assert!(!matches!(bag, MessageBag::Commit(_)));
         assert!(matches!(bag, MessageBag::Reply(_)));
@@ -500,17 +522,17 @@ pub struct Message<H: Header = GenericHeader> {
 
 // Trait that all headers must implement
 pub trait Header: Sized {
-    const COMMAND: Command;
+    const COMMAND: Command2;
 
     fn size(&self) -> u32;
-    fn command(&self) -> Command;
+    fn command(&self) -> Command2;
     fn checksum(&self) -> u128;
 }
 
-// Command enum (simplified)
+// Command2 enum (simplified)
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 #[repr(u8)]
-pub enum Command {
+pub enum Command2 {
     reserved = 0,
 
     ping = 1,
@@ -532,7 +554,7 @@ pub enum Command {
 #[repr(C)]
 pub struct GenericHeader {
     checksum: u128,
-    command: Command,
+    command: Command2,
     size: u32,
     // ... other fields
 }
@@ -541,10 +563,10 @@ pub struct GenericHeader {
 // second 128 bytes specific header (PrepareHeader, CommitHeader, etc....)
 
 impl Header for GenericHeader {
-    const COMMAND: Command = Command::Reserved;
+    const COMMAND: Command2 = Command::Reserved;
 
     fn size(&self) -> u32 { self.size }
-    fn command(&self) -> Command { self.command }
+    fn command(&self) -> Command2 { self.command }
     fn checksum(&self) -> u128 { self.checksum }
 }
 
@@ -552,7 +574,7 @@ impl Header for GenericHeader {
 #[repr(C)]
 pub struct PrepareHeader {
     checksum: u128,
-    command: Command,
+    command: Command2,
     size: u32,
     // ... prepare-specific fields
     view: u32,
@@ -561,10 +583,10 @@ pub struct PrepareHeader {
 }
 
 impl Header for PrepareHeader {
-    const COMMAND: Command = Command::Prepare;
+    const COMMAND: Command2 = Command::Prepare;
 
     fn size(&self) -> u32 { self.size }
-    fn command(&self) -> Command { self.command }
+    fn command(&self) -> Command2 { self.command }
     fn checksum(&self) -> u128 { self.checksum }
 }
 */
@@ -638,7 +660,7 @@ pub enum Operation {
 #[repr(C)]
 pub struct PrepareHeader {
     checksum: u128,
-    command: Command,
+    command: Command2,
     size: u32,
     // ... prepare-specific fields
     view: u32,
@@ -666,5 +688,3 @@ pub struct PrepareHeader {
 //         MessageBag::Void
 //     }
 // }
-
-pub(crate) mod header;
diff --git a/core/common/src/types/consensus/mod.rs b/core/common/src/types/consensus/mod.rs
index 9d98bf0e8..61bfb7bcb 100644
--- a/core/common/src/types/consensus/mod.rs
+++ b/core/common/src/types/consensus/mod.rs
@@ -15,656 +15,5 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::marker::PhantomData;
-
-use bytes::Bytes;
-
-use crate::types::consensus::header::{
-    CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
-};
-
-#[derive(Debug)]
-pub struct Message<H: ConsensusHeader> {
-    buffer: Bytes,
-    _marker: PhantomData<H>,
-}
-
-impl<H> Message<H>
-where
-    H: ConsensusHeader,
-{
-    /// Create a new message from a buffer.
-    ///
-    /// # Safety
-    ///
-    /// The buffer must:
-    /// - be at least `size_of::<H>()` bytes long
-    /// - contain a valid header at the start
-    /// - be properly aligned for type H
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if:
-    /// - buffer is too small for the header
-    /// - buffer is too small for the size specified in the header
-    /// - header validation fails
-    #[allow(unused)]
-    pub fn from_bytes(buffer: Bytes) -> Result<Self, header::ConsensusError> {
-        // verify minimum size
-        if buffer.len() < size_of::<H>() {
-            return Err(header::ConsensusError::InvalidCommand {
-                expected: H::COMMAND,
-                found: header::Command::Reserved,
-            });
-        }
-
-        let message = Self {
-            buffer,
-            _marker: PhantomData,
-        };
-
-        // validate the header
-        message.header().validate()?;
-
-        // verify buffer size matches header.size
-        let header_size = message.header().size() as usize;
-        if message.buffer.len() < header_size {
-            return Err(header::ConsensusError::InvalidCommand {
-                expected: H::COMMAND,
-                found: header::Command::Reserved,
-            });
-        }
-
-        Ok(message)
-    }
-
-    /// Create a new message with a specific size, initializing the buffer with zeros.
-    ///
-    /// The header will be zeroed and must be initialized by the caller.
-    #[allow(unused)]
-    pub fn new(size: usize) -> Self {
-        assert!(size >= size_of::<H>(), "Size must be at least header size");
-        let buffer = Bytes::from(vec![0u8; size]);
-        Self {
-            buffer,
-            _marker: PhantomData,
-        }
-    }
-
-    /// Get a reference to the header using zero-copy access.
-    ///
-    /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
-    /// without any copying or allocation.
-    #[inline]
-    #[allow(unused)]
-    pub fn header(&self) -> &H {
-        let header_bytes = &self.buffer[..size_of::<H>()];
-        bytemuck::from_bytes(header_bytes)
-    }
-
-    /// Get a reference to the message body (everything after the header).
-    ///
-    /// Returns an empty slice if there is no body.
-    #[inline]
-    #[allow(unused)]
-    pub fn body(&self) -> &[u8] {
-        let header_size = size_of::<H>();
-        let total_size = self.header().size() as usize;
-
-        if total_size > header_size {
-            &self.buffer[header_size..total_size]
-        } else {
-            &[]
-        }
-    }
-
-    /// Get the complete message as bytes (header + body).
-    #[inline]
-    #[allow(unused)]
-    pub fn as_bytes(&self) -> &[u8] {
-        let total_size = self.header().size() as usize;
-        &self.buffer[..total_size]
-    }
-
-    /// Convert into the underlying buffer.
-    #[inline]
-    #[allow(unused)]
-    pub fn into_inner(self) -> Bytes {
-        self.buffer
-    }
-
-    /// Create a message from a buffer without validation.
-    ///
-    /// # Safety
-    ///
-    /// This is private and skips validation. Use only when:
-    /// - The buffer is already validated
-    /// - If doing a zero-cost type conversion (like to GenericHeader)
-    #[inline]
-    #[allow(unused)]
-    unsafe fn from_buffer_unchecked(buffer: Bytes) -> Self {
-        Self {
-            buffer,
-            _marker: PhantomData,
-        }
-    }
-
-    /// Convert to a generic message (erasing the specific header type).
-    ///
-    /// This allows treating any message as a generic message for common operations.
-    #[allow(unused)]
-    pub fn into_generic(self) -> Message<header::GenericHeader> {
-        unsafe { Message::from_buffer_unchecked(self.buffer) }
-    }
-
-    /// Get a reference to this message as a generic message.
-    #[allow(unused)]
-    pub fn as_generic(&self) -> &Message<header::GenericHeader> {
-        // SAFETY: Message<H> and Message<GenericHeader> have the same memory layout
-        // because they only differ in the PhantomData type parameter
-        unsafe { &*(self as *const Self as *const Message<header::GenericHeader>) }
-    }
-
-    /// Try to convert this message to a different header type.
-    ///
-    /// This validates that the command in the header matches the target type's
-    /// expected command before performing the conversion.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if:
-    /// - The command doesn't match the target type
-    /// - The target header validation fails
-    #[allow(unused)]
-    pub fn try_into_typed<T>(self) -> Result<Message<T>, header::ConsensusError>
-    where
-        T: ConsensusHeader,
-    {
-        if self.buffer.len() < size_of::<T>() {
-            return Err(header::ConsensusError::InvalidCommand {
-                expected: T::COMMAND,
-                found: header::Command::Reserved,
-            });
-        }
-
-        let generic = self.as_generic();
-        if generic.header().command != T::COMMAND {
-            return Err(header::ConsensusError::InvalidCommand {
-                expected: T::COMMAND,
-                found: generic.header().command,
-            });
-        }
-
-        let new_message = unsafe { Message::<T>::from_buffer_unchecked(self.buffer) };
-
-        new_message.header().validate()?;
-
-        Ok(new_message)
-    }
-
-    /// Try to get a reference to this message as a different header type.
-    ///
-    /// This is similar to `try_into_typed` but borrows instead of consuming.
-    #[allow(unused)]
-    pub fn try_as_typed<T>(&self) -> Result<&Message<T>, header::ConsensusError>
-    where
-        T: ConsensusHeader,
-    {
-        // check buffer size
-        if self.buffer.len() < size_of::<T>() {
-            return Err(header::ConsensusError::InvalidCommand {
-                expected: T::COMMAND,
-                found: header::Command::Reserved,
-            });
-        }
-
-        // check the command matches
-        let generic = self.as_generic();
-        if generic.header().command != T::COMMAND {
-            return Err(header::ConsensusError::InvalidCommand {
-                expected: T::COMMAND,
-                found: generic.header().command,
-            });
-        }
-
-        let typed_message = unsafe { &*(self as *const Self as *const Message<T>) };
-
-        // validate the header
-        typed_message.header().validate()?;
-
-        Ok(typed_message)
-    }
-}
-
-#[derive(Debug)]
-#[allow(unused)]
-pub enum MessageBag {
-    Generic(Message<GenericHeader>),
-    Prepare(Message<PrepareHeader>),
-    Commit(Message<CommitHeader>),
-    Reply(Message<ReplyHeader>),
-}
-
-impl MessageBag {
-    #[allow(unused)]
-    pub fn command(&self) -> header::Command {
-        match self {
-            MessageBag::Generic(message) => message.header().command,
-            MessageBag::Prepare(message) => message.header().command,
-            MessageBag::Commit(message) => message.header().command,
-            MessageBag::Reply(message) => message.header().command,
-        }
-    }
-
-    #[allow(unused)]
-    pub fn size(&self) -> u32 {
-        match self {
-            MessageBag::Generic(message) => message.header().size(),
-            MessageBag::Prepare(message) => message.header().size(),
-            MessageBag::Commit(message) => message.header().size(),
-            MessageBag::Reply(message) => message.header().size(),
-        }
-    }
-}
-
-impl<T> From<Message<T>> for MessageBag
-where
-    T: ConsensusHeader,
-{
-    fn from(value: Message<T>) -> Self {
-        let command = value.as_generic().header().command;
-
-        let buffer = value.into_inner();
-
-        // SAFETY: All Message<H> types have identical memory layout (only PhantomData differs).
-        // We've validated the command when the original message was created.
-        match command {
-            header::Command::Prepare => {
-                let msg =
-                    unsafe { Message::<header::PrepareHeader>::from_buffer_unchecked(buffer) };
-                MessageBag::Prepare(msg)
-            }
-            header::Command::Commit => {
-                let msg = unsafe { Message::<header::CommitHeader>::from_buffer_unchecked(buffer) };
-                MessageBag::Commit(msg)
-            }
-            header::Command::Reply => {
-                let msg = unsafe { Message::<header::ReplyHeader>::from_buffer_unchecked(buffer) };
-                MessageBag::Reply(msg)
-            }
-            _ => unreachable!(
-                "For now we only support Prepare, Commit, and Reply. In the future we will support more commands. Command: {command:?}"
-            ),
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use bytes::BytesMut;
-
-    use super::*;
-
-    trait MessageFactory: ConsensusHeader + Sized {
-        fn create_test() -> Message<Self>;
-    }
-
-    impl MessageFactory for header::GenericHeader {
-        fn create_test() -> Message<Self> {
-            let header_size = size_of::<Self>();
-            let body_size = 128;
-            let total_size = header_size + body_size;
-
-            let mut buffer = BytesMut::zeroed(total_size);
-
-            let header = bytemuck::from_bytes_mut::<Self>(&mut buffer[..header_size]);
-
-            header.checksum = 123456;
-            header.cluster = 12345;
-            header.size = total_size as u32;
-            header.command = header::Command::Reserved;
-
-            for (i, item) in buffer
-                .iter_mut()
-                .enumerate()
-                .take(total_size)
-                .skip(header_size)
-            {
-                *item = (i % 256) as u8;
-            }
-
-            Message::<Self>::from_bytes(buffer.freeze()).unwrap()
-        }
-    }
-
-    impl MessageFactory for header::PrepareHeader {
-        fn create_test() -> Message<Self> {
-            let header_size = size_of::<Self>();
-            let body_size = 64;
-            let total_size = header_size + body_size;
-
-            let mut buffer = BytesMut::zeroed(total_size);
-
-            let header = bytemuck::from_bytes_mut::<Self>(&mut buffer[..header_size]);
-
-            header.checksum = 123456;
-            header.checksum_body = 789012;
-            header.cluster = 12345;
-            header.size = total_size as u32;
-            header.view = 1;
-            header.command = header::Command::Prepare;
-            header.replica = 1;
-            header.op = 100;
-            header.commit = 99;
-            header.timestamp = 1234567890;
-            header.operation = header::Operation::CreateStream;
-
-            Message::<Self>::from_bytes(buffer.freeze()).unwrap()
-        }
-    }
-
-    impl MessageFactory for header::CommitHeader {
-        fn create_test() -> Message<Self> {
-            let header_size = size_of::<Self>();
-            let total_size = 256;
-
-            let mut buffer = BytesMut::zeroed(total_size);
-
-            let header = bytemuck::from_bytes_mut::<Self>(&mut buffer[..header_size]);
-
-            header.checksum = 123456;
-            header.cluster = 12345;
-            header.size = 256;
-            header.view = 1;
-            header.command = header::Command::Commit;
-            header.replica = 2;
-            header.commit = 50;
-
-            Message::<Self>::from_bytes(buffer.freeze()).unwrap()
-        }
-    }
-
-    impl MessageFactory for header::ReplyHeader {
-        fn create_test() -> Message<Self> {
-            let header_size = size_of::<Self>();
-            let body_size = 32;
-            let total_size = header_size + body_size;
-
-            let mut buffer = BytesMut::zeroed(total_size);
-
-            let header = bytemuck::from_bytes_mut::<Self>(&mut buffer[..header_size]);
-
-            header.checksum = 123456;
-            header.cluster = 12345;
-            header.size = total_size as u32;
-            header.view = 1;
-            header.command = header::Command::Reply;
-            header.replica = 3;
-            header.op = 100;
-            header.commit = 99;
-            header.operation = header::Operation::CreateStream;
-
-            Message::<Self>::from_bytes(buffer.freeze()).unwrap()
-        }
-    }
-
-    #[test]
-    fn test_message_creation_and_access() {
-        let message = header::GenericHeader::create_test();
-
-        assert_eq!(message.header().cluster, 12345);
-        assert_eq!(message.header().command, header::Command::Reserved);
-        assert_eq!(
-            message.body().len(),
-            message.header().size() as usize - size_of::<header::GenericHeader>()
-        );
-
-        let body = message.body();
-        let header_size = size_of::<header::GenericHeader>();
-        for (i, &byte) in body.iter().enumerate() {
-            let expected = ((i + header_size) % 256) as u8;
-            assert_eq!(byte, expected);
-        }
-    }
-
-    #[test]
-    fn test_message_conversion() {
-        let prepare_message = header::PrepareHeader::create_test();
-
-        let original_bytes = prepare_message.as_bytes().to_vec();
-
-        let generic_message = prepare_message.into_generic();
-        assert_eq!(generic_message.header().command, header::Command::Prepare);
-
-        let prepare_again: Message<header::PrepareHeader> =
-            generic_message.try_into_typed().unwrap();
-
-        assert_eq!(prepare_again.header().op, 100);
-        assert_eq!(prepare_again.header().view, 1);
-        assert_eq!(prepare_again.header().cluster, 12345);
-
-        let roundtrip_bytes = prepare_again.as_bytes().to_vec();
-
-        assert_eq!(
-            original_bytes, roundtrip_bytes,
-            "Bytes should be identical after round-trip conversion"
-        );
-    }
-
-    #[test]
-    fn test_message_bag_from_prepare() {
-        let prepare = header::PrepareHeader::create_test();
-        let bag = MessageBag::from(prepare);
-
-        assert_eq!(bag.command(), header::Command::Prepare);
-        assert!(matches!(bag, MessageBag::Prepare(_)));
-        assert!(!matches!(bag, MessageBag::Commit(_)));
-        assert!(!matches!(bag, MessageBag::Reply(_)));
-        assert!(!matches!(bag, MessageBag::Generic(_)));
-    }
-
-    #[test]
-    fn test_message_bag_from_commit() {
-        let commit = header::CommitHeader::create_test();
-        let bag = MessageBag::from(commit);
-
-        assert_eq!(bag.command(), header::Command::Commit);
-        assert!(!matches!(bag, MessageBag::Prepare(_)));
-        assert!(matches!(bag, MessageBag::Commit(_)));
-        assert!(!matches!(bag, MessageBag::Reply(_)));
-        assert!(!matches!(bag, MessageBag::Generic(_)));
-    }
-
-    #[test]
-    fn test_message_bag_from_reply() {
-        let reply = header::ReplyHeader::create_test();
-        let bag = MessageBag::from(reply);
-
-        assert_eq!(bag.command(), header::Command::Reply);
-        assert!(!matches!(bag, MessageBag::Prepare(_)));
-        assert!(!matches!(bag, MessageBag::Commit(_)));
-        assert!(matches!(bag, MessageBag::Reply(_)));
-        assert!(!matches!(bag, MessageBag::Generic(_)));
-    }
-}
-
-// TODO: Header generic
-// TODO: We will have to impl something like this (NOT ONE TO ONE JUST A SKETCH) as we will use `bytemuck`:
-/*
-// Generic Message type
-#[repr(C)]
-pub struct Message<H: Header = GenericHeader> {
-    buffer: AlignedBuffer<ALIGNED_TO_HEADER_SIZE>,
-    _phantom: PhantomData<H>,
-}
-
-// Trait that all headers must implement
-pub trait Header: Sized {
-    const COMMAND: Command;
-
-    fn size(&self) -> u32;
-    fn command(&self) -> Command;
-    fn checksum(&self) -> u128;
-}
-
-// Command enum (simplified)
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-#[repr(u8)]
-pub enum Command {
-    reserved = 0,
-
-    ping = 1,
-    pong = 2,
-
-    request = 5,
-    prepare = 6,
-    prepare_ok = 7,
-    reply = 8,
-    commit = 9,
-
-    start_view_change = 10,
-    do_view_change = 11,
-    start_view = 24,
-    // ... etc
-}
-
-// Generic header for base Message
-#[repr(C)]
-pub struct GenericHeader {
-    checksum: u128,
-    command: Command,
-    size: u32,
-    // ... other fields
-}
-
-// first 128 bytes GenericHeader (checksum, command, size, etc....)
-// second 128 bytes specific header (PrepareHeader, CommitHeader, etc....)
-
-impl Header for GenericHeader {
-    const COMMAND: Command = Command::Reserved;
-
-    fn size(&self) -> u32 { self.size }
-    fn command(&self) -> Command { self.command }
-    fn checksum(&self) -> u128 { self.checksum }
-}
-
-// Specific header types
-#[repr(C)]
-pub struct PrepareHeader {
-    checksum: u128,
-    command: Command,
-    size: u32,
-    // ... prepare-specific fields
-    view: u32,
-    op: u64,
-    commit: u64,
-}
-
-impl Header for PrepareHeader {
-    const COMMAND: Command = Command::Prepare;
-
-    fn size(&self) -> u32 { self.size }
-    fn command(&self) -> Command { self.command }
-    fn checksum(&self) -> u128 { self.checksum }
-}
-*/
-
-// And then for Message impl
-/*
-impl<H: Header> Message<H> {
-    // Access header (no stored pointer needed!)
-    pub fn header(&self) -> &H {
-        assert!(size_of::<H>() <= ALIGNED_TO_HEADER_SIZE);
-        unsafe { &*(self.buffer.as_ptr() as *const H) }
-    }
-
-    pub fn header_mut(&mut self) -> &mut H {
-        unsafe { &mut *(self.buffer.as_mut_ptr() as *mut H) }
-    }
-
-    pub fn body(&self) -> &[u8] {
-        let header_size = size_of::<H>();
-        let total_size = self.header().size() as usize;
-        &self.buffer[header_size..total_size]
-    }
-
-    pub fn body_mut(&mut self) -> &mut [u8] {
-        let header_size = size_of::<H>();
-        let total_size = self.header().size() as usize;
-        &mut self.buffer.as_mut()[header_size..total_size]
-    }
-
-    pub fn as_bytes(&self) -> &[u8] {
-        &self.buffer[..self.header().size() as usize]
-    }
-
-    pub fn base(&self) -> &Message<GenericHeader> {
-        unsafe { &*(self as *const Self as *const Message<GenericHeader>) }
-    }
-
-    pub fn base_mut(&mut self) -> &mut Message<GenericHeader> {
-        unsafe { &mut *(self as *mut Self as *mut Message<GenericHeader>) }
-    }
-
-    pub fn try_into<T: Header>(self) -> Option<Message<T>> {
-        if self.header().command() == T::COMMAND {
-            Some(unsafe { std::mem::transmute(self) })
-        } else {
-            None
-        }
-    }
-}
-*/
-
-// Then for the `Request` header we will have to store an `Operation` enum
-/*
- Define your operation enum
-#[repr(u8)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum Operation {
-    // Metadata operations
-    CreateTopic = 1,
-    DeleteTopic = 2,
-    ListTopics = 3,
-    CreatePartition = 4,
-
-    // Partition operations
-    Produce = 100,
-    Consume = 101,
-    Fetch = 102,
-}
-
-// Specific header types
-#[repr(C)]
-pub struct PrepareHeader {
-    checksum: u128,
-    command: Command,
-    size: u32,
-    // ... prepare-specific fields
-    view: u32,
-    op: u64,
-    commit: u64,
-
-    // second 128 bytes
-    operation: Operation,
-}
-*/
-
-// Which will have an method that returns discriminator between Metadata and Partition requests
-
-// TODO: Fill this enum
-// #[expect(unused)]
-// pub enum MessageBag {
-//     Void,
-// }
-
-// impl<H> From<Message<H>> for MessageBag
-// where
-//     H: ConsensusHeader,
-// {
-//     fn from(_value: Message<H>) -> Self {
-//         MessageBag::Void
-//     }
-// }
-
-pub(crate) mod header;
+pub mod header;
+pub mod message;
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 513b817d4..0b1a9c02f 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -28,6 +28,7 @@ repository = "https://github.com/apache/iggy";
 readme = "../../../README.md"
 
 [dependencies]
+iggy_common = { path = "../common" }
 message_bus = { path = "../message_bus" }
 
 
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index cc6084115..19bafa65d 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -16,6 +16,8 @@
 // under the License.
 
 use crate::{Consensus, Project};
+use iggy_common::header::{Command2, PrepareHeader, PrepareOkHeader, RequestHeader};
+use iggy_common::message::Message;
 use message_bus::IggyMessageBus;
 use std::cell::Cell;
 
@@ -35,35 +37,61 @@ impl VsrConsensus {
     }
 }
 
-#[derive(Clone)]
-pub struct Request;
-
-impl Project<Prepare> for Request {
+impl Project<Message<PrepareHeader>> for Message<RequestHeader> {
     type Consensus = VsrConsensus;
-    fn project(self, _consensus: &Self::Consensus) -> Prepare {
-        Prepare
+    fn project(self, _consensus: &Self::Consensus) -> Message<PrepareHeader> {
+        self.replace_header(|prev| {
+            PrepareHeader {
+                cluster: 0, // TODO: consesus.cluster
+                size: prev.size,
+                view: 0, // TODO: consesus view
+                release: prev.release,
+                command: Command2::Prepare,
+                replica: 0, // TODO: consesus replica
+                parent: 0, // TODO: Get this from the previous entry in the journal (figure out how to pass that ctx here)
+                request_checksum: prev.request_checksum,
+                request: prev.request,
+                commit: 0,    // TODO: consensus.commit
+                op: 0,        // TODO: consensus.op
+                timestamp: 0, // TODO: consensus timestamp
+                operation: prev.operation,
+                ..Default::default()
+            }
+        })
     }
 }
 
-#[derive(Clone)]
-pub struct Prepare;
-
-impl Project<PrepareOk> for Prepare {
+impl Project<Message<PrepareOkHeader>> for Message<PrepareHeader> {
     type Consensus = VsrConsensus;
-    fn project(self, _consensus: &Self::Consensus) -> PrepareOk {
-        PrepareOk
+    fn project(self, _consensus: &Self::Consensus) -> Message<PrepareOkHeader> {
+        self.replace_header(|prev| {
+            PrepareOkHeader {
+                command: Command2::PrepareOk,
+                parent: prev.parent,
+                prepare_checksum: prev.checksum,
+                request: prev.request,
+                cluster: 0, // TODO: consensus.cluster
+                replica: 0, // TODO: consensus replica
+                epoch: 0,   // TODO: consensus.epoch
+                // It's important to use the view of the replica, not the received prepare!
+                view: 0, // TODO: consensus.view
+                op: prev.op,
+                commit: 0, // TODO: consensus.commit
+                timestamp: prev.timestamp,
+                operation: prev.operation,
+                // PrepareOks are only header no body
+                ..Default::default()
+            }
+        })
     }
 }
 
-#[derive(Clone)]
-pub struct PrepareOk;
-
 impl Consensus for VsrConsensus {
     type MessageBus = IggyMessageBus;
 
-    type RequestMessage = Request;
-    type ReplicateMessage = Prepare;
-    type AckMessage = PrepareOk;
+    type RequestMessage = Message<RequestHeader>;
+    type ReplicateMessage = Message<PrepareHeader>;
+    type AckMessage = Message<PrepareOkHeader>;
 
     fn pipeline_message(&self, _message: Self::ReplicateMessage) {
         todo!()
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index 5d8e71a00..1e8f32970 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -29,5 +29,6 @@ readme = "../../../README.md"
 
 [dependencies]
 consensus = { path = "../consensus" }
+iggy_common = { path = "../common" }
 journal = { path = "../journal" }
 tracing = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index 7f508c347..140fa7e4c 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,7 +14,8 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-use consensus::{Consensus, Prepare, Project, VsrConsensus};
+use consensus::{Consensus, Project, VsrConsensus};
+use iggy_common::{header::PrepareHeader, message::Message};
 use journal::Journal;
 use tracing::{debug, warn};
 
@@ -42,7 +43,7 @@ struct IggyMetadata<M, J, S> {
 
 impl<M, J, S> Metadata for IggyMetadata<M, J, S>
 where
-    J: Journal<Entry = Prepare>,
+    J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage>,
 {
     type Consensus = VsrConsensus;
     type Journal = J;
@@ -74,10 +75,10 @@ where
 
 impl<M, J, S> IggyMetadata<M, J, S>
 where
-    J: Journal<Entry = Prepare>,
+    J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage>,
 {
     #[expect(unused)]
-    fn pipeline_prepare(&self, prepare: Prepare) {
+    fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
         debug!("inserting prepare into metadata pipeline");
         self.consensus.verify_pipeline();
         self.consensus.pipeline_message(prepare.clone());
@@ -86,12 +87,12 @@ where
         self.consensus.post_replicate_verify(&prepare);
     }
 
-    fn fence_old_prepare(&self, _prepare: &Prepare) -> bool {
+    fn fence_old_prepare(&self, _prepare: &Message<PrepareHeader>) -> bool {
         // TODO
         false
     }
 
-    fn replicate(&self, _prepare: Prepare) {
+    fn replicate(&self, _prepare: Message<PrepareHeader>) {
         todo!()
     }
 }


Reply via email to