This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new f428078af refactor(consensus): couple message assoc type (#2761)
f428078af is described below
commit f428078af85cd218c9a2a96163157bcd99b37c9d
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Wed Feb 18 10:05:28 2026 +0100
refactor(consensus): couple message assoc type (#2761)
Binds together message type for request/replicate/ack to particular
message type using family pattern.
---
core/consensus/src/impls.rs | 22 +++++++++++++--------
core/consensus/src/lib.rs | 35 +++++++++++++++++++++++-----------
core/consensus/src/plane_helpers.rs | 6 +++---
core/metadata/src/impls/metadata.rs | 23 +++++++++-------------
core/partitions/src/iggy_partitions.rs | 8 ++++----
5 files changed, 54 insertions(+), 40 deletions(-)
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 4b1e620c8..2b6a8c410 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -22,7 +22,7 @@ use crate::{
};
use bit_set::BitSet;
use iggy_common::header::{
- Command2, DoViewChangeHeader, PrepareHeader, PrepareOkHeader,
RequestHeader,
+ Command2, ConsensusHeader, DoViewChangeHeader, PrepareHeader,
PrepareOkHeader, RequestHeader,
StartViewChangeHeader, StartViewHeader,
};
use iggy_common::message::Message;
@@ -1093,7 +1093,9 @@ where
B: MessageBus,
P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
{
- fn project(self, consensus: &VsrConsensus<B, P>) -> Message<PrepareHeader>
{
+ type Consensus = VsrConsensus<B, P>;
+
+ fn project(self, consensus: &Self::Consensus) -> Message<PrepareHeader> {
let op = consensus.sequencer.current_sequence() + 1;
self.transmute_header(|old, new| {
@@ -1124,7 +1126,9 @@ where
B: MessageBus,
P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
{
- fn project(self, consensus: &VsrConsensus<B, P>) ->
Message<PrepareOkHeader> {
+ type Consensus = VsrConsensus<B, P>;
+
+ fn project(self, consensus: &Self::Consensus) -> Message<PrepareOkHeader> {
self.transmute_header(|old, new| {
*new = PrepareOkHeader {
command: Command2::PrepareOk,
@@ -1153,10 +1157,12 @@ where
P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
{
type MessageBus = B;
+ #[rustfmt::skip] // Scuffed formatter. TODO: Make the naming less
ambiguous for `Message`.
+ type Message<H> = Message<H> where H: ConsensusHeader;
+ type RequestHeader = RequestHeader;
+ type ReplicateHeader = PrepareHeader;
+ type AckHeader = PrepareOkHeader;
- type RequestMessage = Message<RequestHeader>;
- type ReplicateMessage = Message<PrepareHeader>;
- type AckMessage = Message<PrepareOkHeader>;
type Sequencer = LocalSequencer;
type Pipeline = P;
@@ -1166,7 +1172,7 @@ where
// This avoids serialization/queuing overhead and would also allow
// reordering to WAL-first (on_replicate before pipeline_message)
// without risking lost self-acks from dispatch timing.
- fn pipeline_message(&self, message: Self::ReplicateMessage) {
+ fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>) {
assert!(self.is_primary(), "only primary can pipeline messages");
let mut pipeline = self.pipeline.borrow_mut();
@@ -1178,7 +1184,7 @@ where
pipeline.verify();
}
- fn post_replicate_verify(&self, message: &Self::ReplicateMessage) {
+ fn post_replicate_verify(&self, message:
&Self::Message<Self::ReplicateHeader>) {
let header = message.header();
// verify the message belongs to our cluster
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 17f2fb9ea..767ee9bf8 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+use iggy_common::header::ConsensusHeader;
use message_bus::MessageBus;
pub trait Project<T, C: Consensus> {
- fn project(self, consensus: &C) -> T;
+ type Consensus: Consensus;
+ fn project(self, consensus: &Self::Consensus) -> T;
}
pub trait Pipeline {
@@ -47,20 +49,24 @@ pub trait Pipeline {
fn verify(&self);
}
+// TODO: Create type aliases for the Message types, both here and on the
`Plane` trait.
pub trait Consensus: Sized {
type MessageBus: MessageBus;
- // I am wondering, whether we should create a dedicated trait for cloning,
so it's explicit that we do ref counting.
- type RequestMessage: Project<Self::ReplicateMessage, Self> + Clone;
- type ReplicateMessage: Project<Self::AckMessage, Self> + Clone;
- type AckMessage;
+ #[rustfmt::skip] // Scuffed formatter.
+ type Message<H> where H: ConsensusHeader;
+
+ type RequestHeader: ConsensusHeader;
+ type ReplicateHeader: ConsensusHeader;
+ type AckHeader: ConsensusHeader;
+
type Sequencer: Sequencer;
- type Pipeline: Pipeline<Message = Self::ReplicateMessage>;
+ type Pipeline: Pipeline<Message = Self::Message<Self::ReplicateHeader>>;
- fn pipeline_message(&self, message: Self::ReplicateMessage);
+ fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>);
fn verify_pipeline(&self);
// TODO: Figure out how we can achieve that without exposing such methods
in the Consensus trait.
- fn post_replicate_verify(&self, message: &Self::ReplicateMessage);
+ fn post_replicate_verify(&self, message:
&Self::Message<Self::ReplicateHeader>);
fn is_follower(&self) -> bool;
fn is_normal(&self) -> bool;
@@ -77,9 +83,16 @@ pub trait Plane<C>
where
C: Consensus,
{
- fn on_request(&self, message: C::RequestMessage) -> impl Future<Output =
()>;
- fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output
= ()>;
- fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
+ fn on_request(&self, message: C::Message<C::RequestHeader>) -> impl
Future<Output = ()>
+ where
+ C::Message<C::RequestHeader>:
+ Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone;
+
+ fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) -> impl
Future<Output = ()>
+ where
+ C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C,
Consensus = C> + Clone;
+
+ fn on_ack(&self, message: C::Message<C::AckHeader>) -> impl Future<Output
= ()>;
}
mod impls;
diff --git a/core/consensus/src/plane_helpers.rs
b/core/consensus/src/plane_helpers.rs
index 60ea1dedf..4b3986067 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -26,12 +26,12 @@ use std::ops::AsyncFnOnce;
/// Shared pipeline-first request flow used by metadata and partitions.
pub async fn pipeline_prepare_common<C, F>(
consensus: &C,
- prepare: C::ReplicateMessage,
+ prepare: C::Message<C::ReplicateHeader>,
on_replicate: F,
) where
C: Consensus,
- C::ReplicateMessage: Clone,
- F: AsyncFnOnce(C::ReplicateMessage) -> (),
+ C::Message<C::ReplicateHeader>: Clone,
+ F: AsyncFnOnce(C::Message<C::ReplicateHeader>) -> (),
{
assert!(!consensus.is_follower(), "on_request: primary only");
assert!(consensus.is_normal(), "on_request: status must be normal");
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index 9a81461af..4ff74f4e2 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -23,7 +23,7 @@ use consensus::{
replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
};
use iggy_common::{
- header::{Command2, GenericHeader, PrepareHeader},
+ header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader,
RequestHeader},
message::Message,
};
use journal::{Journal, JournalHandle};
@@ -101,14 +101,10 @@ where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
J: JournalHandle,
- J::Target: Journal<
- J::Storage,
- Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage,
- Header = PrepareHeader,
- >,
+ J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header =
PrepareHeader>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
- async fn on_request(&self, message: <VsrConsensus<B, P> as
Consensus>::RequestMessage) {
+ async fn on_request(&self, message: <VsrConsensus<B, P> as
Consensus>::Message<RequestHeader>) {
let consensus = self.consensus.as_ref().unwrap();
// TODO: Bunch of asserts.
@@ -117,7 +113,10 @@ where
pipeline_prepare_common(consensus, prepare, |prepare|
self.on_replicate(prepare)).await;
}
- async fn on_replicate(&self, message: <VsrConsensus<B, P> as
Consensus>::ReplicateMessage) {
+ async fn on_replicate(
+ &self,
+ message: <VsrConsensus<B, P> as Consensus>::Message<PrepareHeader>,
+ ) {
let consensus = self.consensus.as_ref().unwrap();
let journal = self.journal.as_ref().unwrap();
@@ -170,7 +169,7 @@ where
}
}
- async fn on_ack(&self, message: <VsrConsensus<B, P> as
Consensus>::AckMessage) {
+ async fn on_ack(&self, message: <VsrConsensus<B, P> as
Consensus>::Message<PrepareOkHeader>) {
let consensus = self.consensus.as_ref().unwrap();
let header = message.header();
@@ -245,11 +244,7 @@ where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
J: JournalHandle,
- J::Target: Journal<
- J::Storage,
- Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage,
- Header = PrepareHeader,
- >,
+ J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header =
PrepareHeader>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
/// Replicate a prepare message to the next replica in the chain.
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index 54891dfdd..16ed124ab 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -28,7 +28,7 @@ use consensus::{
use iggy_common::{
INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut,
PartitionStats, PooledBuffer,
Segment, SegmentStorage,
- header::{GenericHeader, Operation, PrepareHeader},
+ header::{GenericHeader, Operation, PrepareHeader, PrepareOkHeader,
RequestHeader},
message::Message,
sharding::{IggyNamespace, LocalIdx, ShardId},
};
@@ -333,7 +333,7 @@ impl<B> Plane<VsrConsensus<B>> for
IggyPartitions<VsrConsensus<B>>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
{
- async fn on_request(&self, message: <VsrConsensus<B> as
Consensus>::RequestMessage) {
+ async fn on_request(&self, message: <VsrConsensus<B> as
Consensus>::Message<RequestHeader>) {
let consensus = self.consensus.as_ref().unwrap();
debug!("handling partition request");
@@ -341,7 +341,7 @@ where
pipeline_prepare_common(consensus, prepare, |prepare|
self.on_replicate(prepare)).await;
}
- async fn on_replicate(&self, message: <VsrConsensus<B> as
Consensus>::ReplicateMessage) {
+ async fn on_replicate(&self, message: <VsrConsensus<B> as
Consensus>::Message<PrepareHeader>) {
let consensus = self.consensus.as_ref().unwrap();
let header = message.header();
@@ -383,7 +383,7 @@ where
}
}
- async fn on_ack(&self, message: <VsrConsensus<B> as
Consensus>::AckMessage) {
+ async fn on_ack(&self, message: <VsrConsensus<B> as
Consensus>::Message<PrepareOkHeader>) {
let consensus = self.consensus.as_ref().unwrap();
let header = message.header();