This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new e6a38e2fe refactor(consensus): decouple pipeline from VsrConsensus
internals (#2740)
e6a38e2fe is described below
commit e6a38e2fe582f7ad9115bf5a67492800f01efd65
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Feb 16 14:29:55 2026 +0100
refactor(consensus): decouple pipeline from VsrConsensus internals (#2740)
VsrConsensus hardcoded LocalPipeline, and PipelineEntry stored
the full Message<PrepareHeader> (header + body), even though only
the header is needed for quorum tracking. This coupled the
consensus pipeline to a specific implementation and wasted
memory on message bodies that already live in the journal.
Parameterize VsrConsensus<B, P = LocalPipeline> over the
Pipeline trait so implementations can be swapped.
Slim PipelineEntry to store only PrepareHeader; at commit
time, on_ack fetches the full message from the journal.
---
core/consensus/src/impls.rs | 89 +++++++++++++++++++++-------------
core/metadata/src/impls/metadata.rs | 57 ++++++++++++++--------
core/partitions/src/iggy_partitions.rs | 5 +-
core/simulator/src/deps.rs | 3 ++
core/simulator/src/replica.rs | 4 +-
5 files changed, 98 insertions(+), 60 deletions(-)
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 288c40472..3fd146cd4 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -86,7 +86,7 @@ pub const REPLICAS_MAX: usize = 32;
#[derive(Debug)]
pub struct PipelineEntry {
- pub message: Message<PrepareHeader>,
+ pub header: PrepareHeader,
/// Bitmap of replicas that have acknowledged this prepare.
pub ok_from_replicas: BitSet<u32>,
/// Whether we've received a quorum of prepare_ok messages.
@@ -94,9 +94,9 @@ pub struct PipelineEntry {
}
impl PipelineEntry {
- pub fn new(message: Message<PrepareHeader>) -> Self {
+ pub fn new(header: PrepareHeader) -> Self {
Self {
- message,
+ header,
ok_from_replicas: BitSet::with_capacity(REPLICAS_MAX),
ok_quorum_received: false,
}
@@ -180,11 +180,10 @@ impl LocalPipeline {
pub fn push_message(&mut self, message: Message<PrepareHeader>) {
assert!(!self.prepare_queue_full(), "prepare queue is full");
- let header = message.header();
+ let header = *message.header();
- // Verify hash chain if there's a previous entry
if let Some(tail) = self.prepare_queue.back() {
- let tail_header = tail.message.header();
+ let tail_header = &tail.header;
assert_eq!(
header.op,
tail_header.op + 1,
@@ -199,7 +198,7 @@ impl LocalPipeline {
assert!(header.view >= tail_header.view, "view cannot go
backwards");
}
- self.prepare_queue.push_back(PipelineEntry::new(message));
+ self.prepare_queue.push_back(PipelineEntry::new(header));
}
/// Pop the oldest message (after it's been committed).
@@ -224,8 +223,8 @@ impl LocalPipeline {
/// Find a message by op number and checksum (immutable).
pub fn message_by_op_and_checksum(&self, op: u64, checksum: u128) ->
Option<&PipelineEntry> {
- let head_op = self.prepare_queue.front()?.message.header().op;
- let tail_op = self.prepare_queue.back()?.message.header().op;
+ let head_op = self.prepare_queue.front()?.header.op;
+ let tail_op = self.prepare_queue.back()?.header.op;
// Verify consecutive ops invariant
debug_assert_eq!(
@@ -241,9 +240,9 @@ impl LocalPipeline {
let index = (op - head_op) as usize;
let entry = self.prepare_queue.get(index)?;
- debug_assert_eq!(entry.message.header().op, op);
+ debug_assert_eq!(entry.header.op, op);
- if entry.message.header().checksum == checksum {
+ if entry.header.checksum == checksum {
Some(entry)
} else {
None
@@ -252,7 +251,7 @@ impl LocalPipeline {
/// Find a message by op number only.
pub fn message_by_op(&self, op: u64) -> Option<&PipelineEntry> {
- let head_op = self.prepare_queue.front()?.message.header().op;
+ let head_op = self.prepare_queue.front()?.header.op;
if op < head_op {
return None;
@@ -265,7 +264,7 @@ impl LocalPipeline {
/// Get mutable reference to a message entry by op number.
/// Returns None if op is not in the pipeline.
pub fn message_by_op_mut(&mut self, op: u64) -> Option<&mut PipelineEntry>
{
- let head_op = self.prepare_queue.front()?.message.header().op;
+ let head_op = self.prepare_queue.front()?.header.op;
if op < head_op {
return None;
}
@@ -286,9 +285,7 @@ impl LocalPipeline {
/// If there are multiple messages (possible in prepare_queue after view
change),
/// returns the latest one.
pub fn has_message_from_client(&self, client: u128) -> bool {
- self.prepare_queue
- .iter()
- .any(|p| p.message.header().client == client)
+ self.prepare_queue.iter().any(|p| p.header.client == client)
}
/// Verify pipeline invariants.
@@ -301,11 +298,11 @@ impl LocalPipeline {
// Verify prepare queue hash chain
if let Some(head) = self.prepare_queue.front() {
- let mut expected_op = head.message.header().op;
- let mut expected_parent = head.message.header().parent;
+ let mut expected_op = head.header.op;
+ let mut expected_parent = head.header.parent;
for entry in &self.prepare_queue {
- let header = entry.message.header();
+ let header = &entry.header;
assert_eq!(header.op, expected_op, "ops must be sequential");
assert_eq!(header.parent, expected_parent, "must be
hash-chained");
@@ -324,7 +321,7 @@ impl LocalPipeline {
/// Extract and remove a message by op number.
/// Returns None if op is not in the pipeline.
pub fn extract_by_op(&mut self, op: u64) -> Option<PipelineEntry> {
- let head_op = self.prepare_queue.front()?.message.header().op;
+ let head_op = self.prepare_queue.front()?.header.op;
if op < head_op {
return None;
}
@@ -409,9 +406,10 @@ pub enum VsrAction {
#[allow(unused)]
#[derive(Debug)]
-pub struct VsrConsensus<B = IggyMessageBus>
+pub struct VsrConsensus<B = IggyMessageBus, P = LocalPipeline>
where
B: MessageBus,
+ P: Pipeline,
{
cluster: u128,
replica: u8,
@@ -436,7 +434,7 @@ where
last_timestamp: Cell<u64>,
last_prepare_checksum: Cell<u128>,
- pipeline: RefCell<LocalPipeline>,
+ pipeline: RefCell<P>,
message_bus: B,
// TODO: Add loopback_queue for messages to self
@@ -456,8 +454,8 @@ where
timeouts: RefCell<TimeoutManager>,
}
-impl<B: MessageBus> VsrConsensus<B> {
- pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B)
-> Self {
+impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
+ pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B,
pipeline: P) -> Self {
assert!(
replica < replica_count,
"replica index must be < replica_count"
@@ -474,7 +472,7 @@ impl<B: MessageBus> VsrConsensus<B> {
commit: Cell::new(0),
last_timestamp: Cell::new(0),
last_prepare_checksum: Cell::new(0),
- pipeline: RefCell::new(LocalPipeline::new()),
+ pipeline: RefCell::new(pipeline),
message_bus,
start_view_change_from_all_replicas:
RefCell::new(BitSet::with_capacity(REPLICAS_MAX)),
do_view_change_from_all_replicas:
RefCell::new(dvc_quorum_array_empty()),
@@ -546,11 +544,14 @@ impl<B: MessageBus> VsrConsensus<B> {
self.status.get()
}
- pub fn pipeline(&self) -> &RefCell<LocalPipeline> {
+ // TODO(hubcio): returning &RefCell<P> leaks interior mutability - callers
+ // could hold a Ref/RefMut across an .await and cause a runtime panic.
+ // We had this problem with slab + ECS.
+ pub fn pipeline(&self) -> &RefCell<P> {
&self.pipeline
}
- pub fn pipeline_mut(&mut self) -> &mut RefCell<LocalPipeline> {
+ pub fn pipeline_mut(&mut self) -> &mut RefCell<P> {
&mut self.pipeline
}
@@ -1060,7 +1061,7 @@ impl<B: MessageBus> VsrConsensus<B> {
};
// Verify checksum matches
- if entry.message.header().checksum != header.prepare_checksum {
+ if entry.header.checksum != header.prepare_checksum {
return false;
}
@@ -1087,8 +1088,12 @@ impl<B: MessageBus> VsrConsensus<B> {
}
}
-impl<B: MessageBus> Project<Message<PrepareHeader>, VsrConsensus<B>> for
Message<RequestHeader> {
- fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareHeader> {
+impl<B, P> Project<Message<PrepareHeader>, VsrConsensus<B, P>> for
Message<RequestHeader>
+where
+ B: MessageBus,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+ fn project(self, consensus: &VsrConsensus<B, P>) -> Message<PrepareHeader>
{
let op = consensus.sequencer.current_sequence() + 1;
self.transmute_header(|old, new| {
@@ -1114,8 +1119,12 @@ impl<B: MessageBus> Project<Message<PrepareHeader>,
VsrConsensus<B>> for Message
}
}
-impl<B: MessageBus> Project<Message<PrepareOkHeader>, VsrConsensus<B>> for
Message<PrepareHeader> {
- fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareOkHeader> {
+impl<B, P> Project<Message<PrepareOkHeader>, VsrConsensus<B, P>> for
Message<PrepareHeader>
+where
+ B: MessageBus,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+ fn project(self, consensus: &VsrConsensus<B, P>) ->
Message<PrepareOkHeader> {
self.transmute_header(|old, new| {
*new = PrepareOkHeader {
command: Command2::PrepareOk,
@@ -1138,15 +1147,25 @@ impl<B: MessageBus> Project<Message<PrepareOkHeader>,
VsrConsensus<B>> for Messa
}
}
-impl<B: MessageBus> Consensus for VsrConsensus<B> {
+impl<B, P> Consensus for VsrConsensus<B, P>
+where
+ B: MessageBus,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
type MessageBus = B;
type RequestMessage = Message<RequestHeader>;
type ReplicateMessage = Message<PrepareHeader>;
type AckMessage = Message<PrepareOkHeader>;
type Sequencer = LocalSequencer;
- type Pipeline = LocalPipeline;
-
+ type Pipeline = P;
+
+ // TODO(hubcio): maybe we could record the primary's own ack here
+ // (entry.add_ack(self.replica)) instead of round-tripping through
+ // the message bus via send_prepare_ok.
+ // This avoids serialization/queuing overhead and would also allow
+ // reordering to WAL-first (on_replicate before pipeline_message)
+ // without risking lost self-acks from dispatch timing.
fn pipeline_message(&self, message: Self::ReplicateMessage) {
assert!(self.is_primary(), "only primary can pipeline messages");
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index f3176f71f..77a174e39 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::stm::StateMachine;
use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot,
SnapshotError};
-use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
+use consensus::{Consensus, Pipeline, PipelineEntry, Project, Sequencer,
Status, VsrConsensus};
use iggy_common::{
header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader,
ReplyHeader},
message::Message,
@@ -105,18 +105,19 @@ pub struct IggyMetadata<C, J, S, M> {
pub mux_stm: M,
}
-impl<B, J, S, M> Metadata<VsrConsensus<B>> for IggyMetadata<VsrConsensus<B>,
J, S, M>
+impl<B, P, J, S, M> Metadata<VsrConsensus<B, P>> for
IggyMetadata<VsrConsensus<B, P>, J, S, M>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
J: JournalHandle,
J::Target: Journal<
J::Storage,
- Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage,
+ Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage,
Header = PrepareHeader,
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
- async fn on_request(&self, message: <VsrConsensus<B> as
Consensus>::RequestMessage) {
+ async fn on_request(&self, message: <VsrConsensus<B, P> as
Consensus>::RequestMessage) {
let consensus = self.consensus.as_ref().unwrap();
// TODO: Bunch of asserts.
@@ -125,7 +126,7 @@ where
self.pipeline_prepare(prepare).await;
}
- async fn on_replicate(&self, message: <VsrConsensus<B> as
Consensus>::ReplicateMessage) {
+ async fn on_replicate(&self, message: <VsrConsensus<B, P> as
Consensus>::ReplicateMessage) {
let consensus = self.consensus.as_ref().unwrap();
let journal = self.journal.as_ref().unwrap();
@@ -200,7 +201,7 @@ where
}
}
- async fn on_ack(&self, message: <VsrConsensus<B> as
Consensus>::AckMessage) {
+ async fn on_ack(&self, message: <VsrConsensus<B, P> as
Consensus>::AckMessage) {
let consensus = self.consensus.as_ref().unwrap();
let header = message.header();
@@ -214,28 +215,25 @@ where
return;
}
- // Verify checksum by checking pipeline entry exists
{
let pipeline = consensus.pipeline().borrow();
- let Some(entry) =
- pipeline.message_by_op_and_checksum(header.op,
header.prepare_checksum)
- else {
+ if pipeline
+ .message_by_op_and_checksum(header.op, header.prepare_checksum)
+ .is_none()
+ {
debug!("on_ack: prepare not in pipeline op={}", header.op);
return;
- };
-
- if entry.message.header().checksum != header.prepare_checksum {
- warn!("on_ack: checksum mismatch");
- return;
}
}
// Let consensus handle the ack increment and quorum check
if consensus.handle_prepare_ok(header) {
+ let journal = self.journal.as_ref().unwrap();
+
debug!("on_ack: quorum received for op={}", header.op);
consensus.advance_commit_number(header.op);
- // Extract the prepare message from the pipeline by op
+ // Extract the header from the pipeline, fetch the full message
from journal
// TODO: Commit from the head. ALWAYS
let entry =
consensus.pipeline().borrow_mut().extract_by_op(header.op);
let Some(entry) = entry else {
@@ -243,8 +241,20 @@ where
return;
};
- let prepare = entry.message;
- let prepare_header = *prepare.header();
+ let prepare_header = entry.header;
+ // TODO(hubcio): should we replace this with graceful fallback
(warn + return)?
+ // When journal compaction is implemented compaction could race
+ // with this lookup if it removes entries below the commit number.
+ let prepare = journal
+ .handle()
+ .entry(&prepare_header)
+ .await
+ .unwrap_or_else(|| {
+ panic!(
+ "on_ack: committed prepare op={} checksum={} must be
in journal",
+ prepare_header.op, prepare_header.checksum
+ )
+ });
// Apply the state (consumes prepare)
// TODO: Handle appending result to response
@@ -292,13 +302,14 @@ where
}
}
-impl<B, J, S, M> IggyMetadata<VsrConsensus<B>, J, S, M>
+impl<B, P, J, S, M> IggyMetadata<VsrConsensus<B, P>, J, S, M>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
J: JournalHandle,
J::Target: Journal<
J::Storage,
- Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage,
+ Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage,
Header = PrepareHeader,
>,
M: StateMachine<Input = Message<PrepareHeader>>,
@@ -308,9 +319,13 @@ where
debug!("inserting prepare into metadata pipeline");
consensus.verify_pipeline();
+ // Pipeline-first ordering is safe only because message
+ // processing is cooperative (single-task, RefCell-based).
+ // If on_replicate ever early-returns (syncing, status change)
+ // the entry would be in the pipeline without journal backing.
consensus.pipeline_message(prepare.clone());
-
self.on_replicate(prepare.clone()).await;
+
consensus.post_replicate_verify(&prepare);
}
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index 1fa1d7d9e..b3cdf414c 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -457,7 +457,7 @@ where
return;
};
- if entry.message.header().checksum != header.prepare_checksum {
+ if entry.header.checksum != header.prepare_checksum {
warn!("on_ack: checksum mismatch");
return;
}
@@ -476,8 +476,7 @@ where
return;
};
- let prepare = entry.message;
- let prepare_header = *prepare.header();
+ let prepare_header = entry.header;
// Data was already appended to the partition journal during
// on_replicate. Now that quorum is reached, update the partition's
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 6a798b4aa..25175ea8e 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -91,6 +91,9 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for
SimJournal<S> {
where
Self: 'a;
+ // TODO(hubcio): validate that the caller's checksum matches the stored
+ // header - currently this looks up by op only, ignoring the checksum.
+ // A real journal implementation must reject mismatches.
async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
let headers = unsafe { &*self.headers.get() };
let offsets = unsafe { &*self.offsets.get() };
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index a0d0fb54c..ae4631e95 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -19,7 +19,7 @@ use crate::bus::{MemBus, SharedMemBus};
use crate::deps::{
MemStorage, ReplicaPartitions, SimJournal, SimMetadata,
SimMuxStateMachine, SimSnapshot,
};
-use consensus::VsrConsensus;
+use consensus::{LocalPipeline, VsrConsensus};
use iggy_common::IggyByteSize;
use iggy_common::sharding::ShardId;
use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
@@ -50,6 +50,7 @@ impl Replica {
id,
replica_count,
SharedMemBus(Arc::clone(&bus)),
+ LocalPipeline::new(),
);
metadata_consensus.init();
@@ -59,6 +60,7 @@ impl Replica {
id,
replica_count,
SharedMemBus(Arc::clone(&bus)),
+ LocalPipeline::new(),
);
partitions_consensus.init();