This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 30f329604 feat(cluster): Impl `on_ack`, `send_prepare_ok` and
`replicate` (#2493)
30f329604 is described below
commit 30f329604567fdf3dff66e1b738798a5687a5f2c
Author: Krishna Vishal <[email protected]>
AuthorDate: Sun Dec 21 16:03:22 2025 +0530
feat(cluster): Impl `on_ack`, `send_prepare_ok` and `replicate` (#2493)
---
Cargo.lock | 1 +
core/common/src/types/consensus/message.rs | 5 +-
core/consensus/src/impls.rs | 117 +++++++++++++++++
core/metadata/Cargo.toml | 1 +
core/metadata/src/impls/metadata.rs | 204 +++++++++++++++++++++++++++--
5 files changed, 318 insertions(+), 10 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index c69a46130..adc4d5afe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5641,6 +5641,7 @@ dependencies = [
"consensus",
"iggy_common",
"journal",
+ "message_bus",
"tracing",
]
diff --git a/core/common/src/types/consensus/message.rs
b/core/common/src/types/consensus/message.rs
index f6c638b0c..977b06e32 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::types::consensus::header::{
-    self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+    self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader,
};
use bytes::Bytes;
use std::marker::PhantomData;
@@ -263,6 +263,7 @@ where
pub enum MessageBag {
Generic(Message<GenericHeader>),
Prepare(Message<PrepareHeader>),
+ PrepareOk(Message<PrepareOkHeader>),
Commit(Message<CommitHeader>),
Reply(Message<ReplyHeader>),
}
@@ -273,6 +274,7 @@ impl MessageBag {
match self {
MessageBag::Generic(message) => message.header().command,
MessageBag::Prepare(message) => message.header().command,
+ MessageBag::PrepareOk(message) => message.header().command,
MessageBag::Commit(message) => message.header().command,
MessageBag::Reply(message) => message.header().command,
}
@@ -283,6 +285,7 @@ impl MessageBag {
match self {
MessageBag::Generic(message) => message.header().size(),
MessageBag::Prepare(message) => message.header().size(),
+ MessageBag::PrepareOk(message) => message.header().size(),
MessageBag::Commit(message) => message.header().size(),
MessageBag::Reply(message) => message.header().size(),
}
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index c8f6a9503..48248b13d 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -255,6 +255,25 @@ impl Pipeline {
self.prepare_queue.get(index)
}
+ /// Get mutable reference to a prepare entry by op number.
+ /// Returns None if op is not in the pipeline.
+    pub fn prepare_by_op_mut(&mut self, op: u64) -> Option<&mut PipelineEntry> {
+ let head_op = self.prepare_queue.front()?.message.header().op;
+ if op < head_op {
+ return None;
+ }
+ let index = (op - head_op) as usize;
+ if index >= self.prepare_queue.len() {
+ return None;
+ }
+ self.prepare_queue.get_mut(index)
+ }
+
+ /// Get the entry at the head of the prepare queue (oldest uncommitted).
+ pub fn head(&self) -> Option<&PipelineEntry> {
+ self.prepare_queue.front()
+ }
+
/// Search prepare queue for a message from the given client.
///
/// If there are multiple messages (possible in prepare_queue after view
change),
@@ -320,6 +339,9 @@ pub struct VsrConsensus {
last_prepare_checksum: Cell<u128>,
pipeline: RefCell<Pipeline>,
+
+ message_bus: IggyMessageBus,
+ // TODO: Add loopback_queue for messages to self
}
impl VsrConsensus {
@@ -341,6 +363,7 @@ impl VsrConsensus {
last_timestamp: Cell::new(0),
last_prepare_checksum: Cell::new(0),
pipeline: RefCell::new(Pipeline::new()),
+            message_bus: IggyMessageBus::new(replica_count as usize, replica as u16, 0),
}
}
@@ -388,6 +411,100 @@ impl VsrConsensus {
pub fn status(&self) -> Status {
self.status.get()
}
+
+ pub fn pipeline(&self) -> &RefCell<Pipeline> {
+ &self.pipeline
+ }
+
+ pub fn pipeline_mut(&mut self) -> &mut RefCell<Pipeline> {
+ &mut self.pipeline
+ }
+
+ pub fn cluster(&self) -> u128 {
+ self.cluster
+ }
+
+ pub fn replica_count(&self) -> u8 {
+ self.replica_count
+ }
+
+ /// Handle a prepare_ok message from a follower.
+ /// Called on the primary when a follower acknowledges a prepare.
+ ///
+ /// Returns true if quorum was just reached for this op.
+    pub fn handle_prepare_ok(&self, message: Message<PrepareOkHeader>) -> bool {
+ let header = message.header();
+
+ assert_eq!(header.command, Command2::PrepareOk);
+ assert!(
+ header.replica < self.replica_count,
+ "handle_prepare_ok: invalid replica {}",
+ header.replica
+ );
+
+ // Ignore if not in normal status
+ if self.status() != Status::Normal {
+ return false;
+ }
+
+ // Ignore if from older view
+ if header.view < self.view() {
+ return false;
+ }
+
+ // Ignore if from newer view. This shouldn't happen if we're primary
+ if header.view > self.view() {
+ return false;
+ }
+
+ // We must be primary to process prepare_ok
+ if !self.is_primary() {
+ return false;
+ }
+
+ // Ignore if syncing
+ if self.is_syncing() {
+ return false;
+ }
+
+ // Find the prepare in our pipeline
+ let mut pipeline = self.pipeline.borrow_mut();
+
+ let Some(entry) = pipeline.prepare_by_op_mut(header.op) else {
+ // Not in pipeline - could be old/duplicate or already committed
+ return false;
+ };
+
+ // Verify checksum matches
+ if entry.message.header().checksum != header.prepare_checksum {
+ return false;
+ }
+
+ // Verify the prepare is for a valid op range
+ let _commit = self.commit();
+
+ // Check for duplicate ack
+ if entry.has_ack(header.replica) {
+ return false;
+ }
+
+ // Record the ack from this replica
+ let ack_count = entry.add_ack(header.replica);
+ let quorum = self.quorum();
+
+ // Check if we've reached quorum
+ if ack_count >= quorum && !entry.ok_quorum_received {
+ entry.ok_quorum_received = true;
+
+ return true;
+ }
+
+ false
+ }
+
+ pub fn message_bus(&self) -> &IggyMessageBus {
+ &self.message_bus
+ }
}
impl Project<Message<PrepareHeader>> for Message<RequestHeader> {
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index 1e8f32970..a6ba9a43f 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -31,4 +31,5 @@ readme = "../../../README.md"
consensus = { path = "../consensus" }
iggy_common = { path = "../common" }
journal = { path = "../journal" }
+message_bus = { path = "../message_bus" }
tracing = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index 383e45289..cbf50cbe5 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -16,10 +16,11 @@
// under the License.
use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
use iggy_common::{
- header::{Command2, PrepareHeader},
+ header::{Command2, PrepareHeader, PrepareOkHeader},
message::Message,
};
use journal::Journal;
+use message_bus::MessageBus;
use tracing::{debug, warn};
// TODO: Define a trait (probably in some external crate)
@@ -66,7 +67,7 @@ where
assert_eq!(header.command, Command2::Prepare);
if !self.fence_old_prepare(&message) {
- self.replicate(message.clone());
+ self.replicate(message.clone()).await;
} else {
warn!("received old prepare, not replicating");
}
@@ -137,7 +138,10 @@ where
self.journal.set_header_as_dirty(header);
// Append to journal.
- self.journal.append(message).await;
+ self.journal.append(message.clone()).await;
+
+ // After successful journal write, send prepare_ok to primary.
+ self.send_prepare_ok(header).await;
// If follower, commit any newly committable entries.
if self.consensus.is_follower() {
@@ -145,9 +149,49 @@ where
}
}
- fn on_ack(&self, _message: <Self::Consensus as Consensus>::AckMessage) {
- // TODO: Implement on_prepare_ok logic
- todo!()
+ fn on_ack(&self, message: <Self::Consensus as Consensus>::AckMessage) {
+ let header = message.header();
+
+ if !self.consensus.is_primary() {
+ warn!("on_ack: ignoring (not primary)");
+ return;
+ }
+
+ if self.consensus.status() != Status::Normal {
+ warn!("on_ack: ignoring (not normal)");
+ return;
+ }
+
+ // Find the prepare in pipeline
+        let Some(mut pipeline) = self.consensus.pipeline().try_borrow_mut().ok() else {
+            warn!("on_ack: could not borrow pipeline (already mutably borrowed)");
+            return;
+        };
+
+        let Some(entry) = pipeline.prepare_by_op_and_checksum(header.op, header.prepare_checksum)
+        else {
+            debug!("on_ack: prepare not in pipeline op={}", header.op);
+            return;
+        };
+
+ // Verify checksum matches
+ if entry.message.header().checksum != header.prepare_checksum {
+ warn!("on_ack: checksum mismatch");
+ return;
+ }
+
+ // Record ack
+ let count = entry.add_ack(header.replica);
+
+ // Check quorum
+ if count >= self.consensus.quorum() && !entry.ok_quorum_received {
+ entry.ok_quorum_received = true;
+ debug!("on_ack: quorum received for op={}", header.op);
+
+ // Advance commit number and trigger commit journal
+ self.consensus.advance_commit_number(header.op);
+ self.commit_journal();
+ }
}
}
@@ -170,9 +214,49 @@ where
header.op <= self.consensus.commit() ||
self.journal.has_prepare(header)
}
- fn replicate(&self, _prepare: Message<PrepareHeader>) {
- // TODO Forward prepare to next replica in chain.
- todo!()
+ /// Replicate a prepare message to the next replica in the chain.
+ ///
+ /// Chain replication pattern:
+ /// - Primary sends to first backup
+ /// - Each backup forwards to the next
+ /// - Stops when we would forward back to primary
+ async fn replicate(&self, message: Message<PrepareHeader>) {
+ let header = message.header();
+
+ assert_eq!(header.command, Command2::Prepare);
+ assert!(
+ !self.journal.has_prepare(header),
+ "replicate: must not already have prepare"
+ );
+ assert!(header.op > self.consensus.commit());
+
+        let next = (self.consensus.replica() + 1) % self.consensus.replica_count();
+
+ let primary = self.consensus.primary_index(header.view);
+ if next == primary {
+ debug!(
+ replica = self.consensus.replica(),
+ op = header.op,
+ "replicate: not replicating (ring complete)"
+ );
+ return;
+ }
+
+ assert_ne!(next, self.consensus.replica());
+
+ debug!(
+ replica = self.consensus.replica(),
+ to = next,
+ op = header.op,
+ "replicate: forwarding"
+ );
+
+ let message = message.into_generic();
+ self.consensus
+ .message_bus()
+ .send_to_replica(next, message)
+ .await
+ .unwrap();
}
fn on_repair(&self, _message: Message<PrepareHeader>) {
@@ -204,6 +288,108 @@ where
// Apply each entry to the state machine
todo!()
}
+
+ /// Send a prepare_ok message to the primary.
+ /// Called after successfully writing a prepare to the journal.
+ async fn send_prepare_ok(&self, header: &PrepareHeader) {
+ assert_eq!(header.command, Command2::Prepare);
+
+ if self.consensus.status() != Status::Normal {
+ debug!(
+ replica = self.consensus.replica(),
+ status = ?self.consensus.status(),
+ "send_prepare_ok: not sending (not normal)"
+ );
+ return;
+ }
+
+ if self.consensus.is_syncing() {
+ debug!(
+ replica = self.consensus.replica(),
+ "send_prepare_ok: not sending (syncing)"
+ );
+ return;
+ }
+
+ // Verify we have the prepare and it's persisted (not dirty).
+ if !self.journal.has_prepare(header) {
+ debug!(
+ replica = self.consensus.replica(),
+ op = header.op,
+ "send_prepare_ok: not sending (not persisted or missing)"
+ );
+ return;
+ }
+
+ assert!(
+ header.view <= self.consensus.view(),
+ "send_prepare_ok: prepare view {} > our view {}",
+ header.view,
+ self.consensus.view()
+ );
+
+ if header.op > self.consensus.sequencer().current_sequence() {
+ debug!(
+ replica = self.consensus.replica(),
+ op = header.op,
+ our_op = self.consensus.sequencer().current_sequence(),
+ "send_prepare_ok: not sending (op ahead)"
+ );
+ return;
+ }
+
+ debug!(
+ replica = self.consensus.replica(),
+ op = header.op,
+ checksum = header.checksum,
+ "send_prepare_ok: sending"
+ );
+
+ // Use current view, not the prepare's view.
+ let prepare_ok_header = PrepareOkHeader {
+ command: Command2::PrepareOk,
+ cluster: self.consensus.cluster(),
+ replica: self.consensus.replica(),
+ view: self.consensus.view(),
+ epoch: header.epoch,
+ op: header.op,
+ commit: self.consensus.commit(),
+ timestamp: header.timestamp,
+ parent: header.parent,
+ prepare_checksum: header.checksum,
+ request: header.request,
+ operation: header.operation,
+ size: std::mem::size_of::<PrepareOkHeader>() as u32,
+ ..Default::default()
+ };
+
+        let message: Message<PrepareOkHeader> =
+            Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
+                .replace_header(|_prev: &PrepareOkHeader| prepare_ok_header);
+ let generic_message = message.into_generic();
+ let primary = self.consensus.primary_index(self.consensus.view());
+
+ if primary == self.consensus.replica() {
+ debug!(
+ replica = self.consensus.replica(),
+ "send_prepare_ok: loopback to self"
+ );
+            // TODO: Queue for self-processing or call handle_prepare_ok directly
+ } else {
+ debug!(
+ replica = self.consensus.replica(),
+ to = primary,
+ op = header.op,
+ "send_prepare_ok: sending to primary"
+ );
+
+ self.consensus
+ .message_bus()
+ .send_to_replica(primary, generic_message)
+ .await
+ .unwrap();
+ }
+ }
}
// TODO: Hide with associated types all of those generics, so they are not
leaking to the upper layer, or maybe even make of the `Metadata` trait itself.