This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch consensus_trait_refactor
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit dfd5abffa43e162a1d9b418d4aee7f41fa4c3c77
Author: numinex <[email protected]>
AuthorDate: Tue Feb 17 22:08:05 2026 +0100

    refactor(consensus): refactor assoc types for messages
---
 core/consensus/src/impls.rs            | 80 ++++++++++++++++++++++------------
 core/consensus/src/lib.rs              | 43 +++++++++++++-----
 core/consensus/src/plane_helpers.rs    |  6 +--
 core/metadata/src/impls/metadata.rs    | 23 ++++------
 core/partitions/src/iggy_partitions.rs |  8 ++--
 5 files changed, 100 insertions(+), 60 deletions(-)

diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 4b1e620c8..7c480d99c 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -22,7 +22,7 @@ use crate::{
 };
 use bit_set::BitSet;
 use iggy_common::header::{
-    Command2, DoViewChangeHeader, PrepareHeader, PrepareOkHeader, 
RequestHeader,
+    Command2, ConsensusHeader, DoViewChangeHeader, PrepareHeader, 
PrepareOkHeader, RequestHeader,
     StartViewChangeHeader, StartViewHeader,
 };
 use iggy_common::message::Message;
@@ -1088,27 +1088,26 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
     }
 }
 
-impl<B, P> Project<Message<PrepareHeader>, VsrConsensus<B, P>> for 
Message<RequestHeader>
-where
-    B: MessageBus,
-    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
-{
-    fn project(self, consensus: &VsrConsensus<B, P>) -> Message<PrepareHeader> 
{
-        let op = consensus.sequencer.current_sequence() + 1;
+impl Project<Message<PrepareHeader>> for Message<RequestHeader> {
+    fn project<C>(self, consensus: &C) -> Message<PrepareHeader>
+    where
+        C: Consensus,
+    {
+        let op = consensus.current_sequence() + 1;
 
         self.transmute_header(|old, new| {
             *new = PrepareHeader {
-                cluster: consensus.cluster,
+                cluster: consensus.cluster(),
                 size: old.size,
-                view: consensus.view.get(),
+                view: consensus.view(),
                 release: old.release,
                 command: Command2::Prepare,
-                replica: consensus.replica,
+                replica: consensus.replica(),
                 client: old.client,
                 parent: 0, // TODO: Get parent checksum from the previous 
entry in the journal (figure out how to pass that ctx here)
                 request_checksum: old.request_checksum,
                 request: old.request,
-                commit: consensus.commit.get(),
+                commit: consensus.commit(),
                 op,
                 timestamp: 0, // 0 for now. Implement correct way to get 
timestamp later
                 operation: old.operation,
@@ -1119,24 +1118,23 @@ where
     }
 }
 
-impl<B, P> Project<Message<PrepareOkHeader>, VsrConsensus<B, P>> for 
Message<PrepareHeader>
-where
-    B: MessageBus,
-    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
-{
-    fn project(self, consensus: &VsrConsensus<B, P>) -> 
Message<PrepareOkHeader> {
+impl Project<Message<PrepareOkHeader>> for Message<PrepareHeader> {
+    fn project<C>(self, consensus: &C) -> Message<PrepareOkHeader>
+    where
+        C: Consensus,
+    {
         self.transmute_header(|old, new| {
             *new = PrepareOkHeader {
                 command: Command2::PrepareOk,
                 parent: old.parent,
                 prepare_checksum: old.checksum,
                 request: old.request,
-                cluster: consensus.cluster,
-                replica: consensus.replica,
+                cluster: consensus.cluster(),
+                replica: consensus.replica(),
                 // It's important to use the view of the replica, not the 
received prepare!
-                view: consensus.view.get(),
+                view: consensus.view(),
                 op: old.op,
-                commit: consensus.commit.get(),
+                commit: consensus.commit(),
                 timestamp: old.timestamp,
                 operation: old.operation,
                 namespace: old.namespace,
@@ -1154,9 +1152,13 @@ where
 {
     type MessageBus = B;
 
-    type RequestMessage = Message<RequestHeader>;
-    type ReplicateMessage = Message<PrepareHeader>;
-    type AckMessage = Message<PrepareOkHeader>;
+    type Message<H>
+        = iggy_common::message::Message<H>
+    where
+        H: ConsensusHeader;
+    type RequestHeader = RequestHeader;
+    type ReplicateHeader = PrepareHeader;
+    type AckHeader = PrepareOkHeader;
     type Sequencer = LocalSequencer;
     type Pipeline = P;
 
@@ -1166,7 +1168,7 @@ where
     // This avoids serialization/queuing overhead and would also allow
     // reordering to WAL-first (on_replicate before pipeline_message)
     // without risking lost self-acks from dispatch timing.
-    fn pipeline_message(&self, message: Self::ReplicateMessage) {
+    fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>) {
         assert!(self.is_primary(), "only primary can pipeline messages");
 
         let mut pipeline = self.pipeline.borrow_mut();
@@ -1178,7 +1180,7 @@ where
         pipeline.verify();
     }
 
-    fn post_replicate_verify(&self, message: &Self::ReplicateMessage) {
+    fn post_replicate_verify(&self, message: 
&Self::Message<Self::ReplicateHeader>) {
         let header = message.header();
 
         // verify the message belongs to our cluster
@@ -1220,4 +1222,28 @@ where
     fn is_syncing(&self) -> bool {
         self.is_syncing()
     }
+
+    fn sequencer(&self) -> &Self::Sequencer {
+        &self.sequencer
+    }
+
+    fn current_sequence(&self) -> u64 {
+        self.sequencer.current_sequence()
+    }
+
+    fn cluster(&self) -> u128 {
+        self.cluster
+    }
+
+    fn view(&self) -> u32 {
+        self.view.get()
+    }
+
+    fn replica(&self) -> u8 {
+        self.replica
+    }
+
+    fn commit(&self) -> u64 {
+        self.commit.get()
+    }
 }
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 17f2fb9ea..3c80865f9 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -15,10 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use iggy_common::header::ConsensusHeader;
 use message_bus::MessageBus;
 
-pub trait Project<T, C: Consensus> {
-    fn project(self, consensus: &C) -> T;
+pub trait Project<T> {
+    fn project<C>(self, consensus: &C) -> T
+    where
+        C: Consensus;
 }
 
 pub trait Pipeline {
@@ -49,18 +52,28 @@ pub trait Pipeline {
 
 pub trait Consensus: Sized {
     type MessageBus: MessageBus;
-    // I am wondering, whether we should create a dedicated trait for cloning, 
so it's explicit that we do ref counting.
-    type RequestMessage: Project<Self::ReplicateMessage, Self> + Clone;
-    type ReplicateMessage: Project<Self::AckMessage, Self> + Clone;
-    type AckMessage;
+    #[rustfmt::skip] // Scuffed formatter.
+    type Message<H> where H: ConsensusHeader;
+
+    type RequestHeader: ConsensusHeader;
+    type ReplicateHeader: ConsensusHeader;
+    type AckHeader: ConsensusHeader;
+
     type Sequencer: Sequencer;
-    type Pipeline: Pipeline<Message = Self::ReplicateMessage>;
+    type Pipeline: Pipeline<Message = Self::Message<Self::ReplicateHeader>>;
 
-    fn pipeline_message(&self, message: Self::ReplicateMessage);
+    fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>);
     fn verify_pipeline(&self);
 
     // TODO: Figure out how we can achieve that without exposing such methods 
in the Consensus trait.
-    fn post_replicate_verify(&self, message: &Self::ReplicateMessage);
+    fn post_replicate_verify(&self, message: 
&Self::Message<Self::ReplicateHeader>);
+
+    fn sequencer(&self) -> &Self::Sequencer;
+    fn current_sequence(&self) -> u64;
+    fn cluster(&self) -> u128;
+    fn view(&self) -> u32;
+    fn replica(&self) -> u8;
+    fn commit(&self) -> u64;
 
     fn is_follower(&self) -> bool;
     fn is_normal(&self) -> bool;
@@ -77,9 +90,15 @@ pub trait Plane<C>
 where
     C: Consensus,
 {
-    fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = 
()>;
-    fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output 
= ()>;
-    fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
+    fn on_request(&self, message: C::Message<C::RequestHeader>) -> impl 
Future<Output = ()>
+    where
+        C::Message<C::RequestHeader>: Project<C::Message<C::ReplicateHeader>> 
+ Clone;
+
+    fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) -> impl 
Future<Output = ()>
+    where
+        C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>> + 
Clone;
+
+    fn on_ack(&self, message: C::Message<C::AckHeader>) -> impl Future<Output 
= ()>;
 }
 
 mod impls;
diff --git a/core/consensus/src/plane_helpers.rs 
b/core/consensus/src/plane_helpers.rs
index 60ea1dedf..4b3986067 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -26,12 +26,12 @@ use std::ops::AsyncFnOnce;
 /// Shared pipeline-first request flow used by metadata and partitions.
 pub async fn pipeline_prepare_common<C, F>(
     consensus: &C,
-    prepare: C::ReplicateMessage,
+    prepare: C::Message<C::ReplicateHeader>,
     on_replicate: F,
 ) where
     C: Consensus,
-    C::ReplicateMessage: Clone,
-    F: AsyncFnOnce(C::ReplicateMessage) -> (),
+    C::Message<C::ReplicateHeader>: Clone,
+    F: AsyncFnOnce(C::Message<C::ReplicateHeader>) -> (),
 {
     assert!(!consensus.is_follower(), "on_request: primary only");
     assert!(consensus.is_normal(), "on_request: status must be normal");
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 9a81461af..4ff74f4e2 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -23,7 +23,7 @@ use consensus::{
     replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_common::{
-    header::{Command2, GenericHeader, PrepareHeader},
+    header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, 
RequestHeader},
     message::Message,
 };
 use journal::{Journal, JournalHandle};
@@ -101,14 +101,10 @@ where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
     J: JournalHandle,
-    J::Target: Journal<
-            J::Storage,
-            Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage,
-            Header = PrepareHeader,
-        >,
+    J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = 
PrepareHeader>,
     M: StateMachine<Input = Message<PrepareHeader>>,
 {
-    async fn on_request(&self, message: <VsrConsensus<B, P> as 
Consensus>::RequestMessage) {
+    async fn on_request(&self, message: <VsrConsensus<B, P> as 
Consensus>::Message<RequestHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
 
         // TODO: Bunch of asserts.
@@ -117,7 +113,10 @@ where
         pipeline_prepare_common(consensus, prepare, |prepare| 
self.on_replicate(prepare)).await;
     }
 
-    async fn on_replicate(&self, message: <VsrConsensus<B, P> as 
Consensus>::ReplicateMessage) {
+    async fn on_replicate(
+        &self,
+        message: <VsrConsensus<B, P> as Consensus>::Message<PrepareHeader>,
+    ) {
         let consensus = self.consensus.as_ref().unwrap();
         let journal = self.journal.as_ref().unwrap();
 
@@ -170,7 +169,7 @@ where
         }
     }
 
-    async fn on_ack(&self, message: <VsrConsensus<B, P> as 
Consensus>::AckMessage) {
+    async fn on_ack(&self, message: <VsrConsensus<B, P> as 
Consensus>::Message<PrepareOkHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
 
@@ -245,11 +244,7 @@ where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
     J: JournalHandle,
-    J::Target: Journal<
-            J::Storage,
-            Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage,
-            Header = PrepareHeader,
-        >,
+    J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = 
PrepareHeader>,
     M: StateMachine<Input = Message<PrepareHeader>>,
 {
     /// Replicate a prepare message to the next replica in the chain.
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index 54891dfdd..16ed124ab 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -28,7 +28,7 @@ use consensus::{
 use iggy_common::{
     INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, 
PartitionStats, PooledBuffer,
     Segment, SegmentStorage,
-    header::{GenericHeader, Operation, PrepareHeader},
+    header::{GenericHeader, Operation, PrepareHeader, PrepareOkHeader, 
RequestHeader},
     message::Message,
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
@@ -333,7 +333,7 @@ impl<B> Plane<VsrConsensus<B>> for 
IggyPartitions<VsrConsensus<B>>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
 {
-    async fn on_request(&self, message: <VsrConsensus<B> as 
Consensus>::RequestMessage) {
+    async fn on_request(&self, message: <VsrConsensus<B> as 
Consensus>::Message<RequestHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
 
         debug!("handling partition request");
@@ -341,7 +341,7 @@ where
         pipeline_prepare_common(consensus, prepare, |prepare| 
self.on_replicate(prepare)).await;
     }
 
-    async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::ReplicateMessage) {
+    async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::Message<PrepareHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
 
         let header = message.header();
@@ -383,7 +383,7 @@ where
         }
     }
 
-    async fn on_ack(&self, message: <VsrConsensus<B> as 
Consensus>::AckMessage) {
+    async fn on_ack(&self, message: <VsrConsensus<B> as 
Consensus>::Message<PrepareOkHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
 

Reply via email to