This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 30f329604 feat(cluster): Impl `on_ack`, `send_prepare_ok` and `replicate` (#2493)
30f329604 is described below

commit 30f329604567fdf3dff66e1b738798a5687a5f2c
Author: Krishna Vishal <[email protected]>
AuthorDate: Sun Dec 21 16:03:22 2025 +0530

    feat(cluster): Impl `on_ack`, `send_prepare_ok` and `replicate` (#2493)
---
 Cargo.lock                                 |   1 +
 core/common/src/types/consensus/message.rs |   5 +-
 core/consensus/src/impls.rs                | 117 +++++++++++++++++
 core/metadata/Cargo.toml                   |   1 +
 core/metadata/src/impls/metadata.rs        | 204 +++++++++++++++++++++++++++--
 5 files changed, 318 insertions(+), 10 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c69a46130..adc4d5afe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5641,6 +5641,7 @@ dependencies = [
  "consensus",
  "iggy_common",
  "journal",
+ "message_bus",
  "tracing",
 ]
 
diff --git a/core/common/src/types/consensus/message.rs b/core/common/src/types/consensus/message.rs
index f6c638b0c..977b06e32 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::types::consensus::header::{
-    self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, ReplyHeader,
+    self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader,
 };
 use bytes::Bytes;
 use std::marker::PhantomData;
@@ -263,6 +263,7 @@ where
 pub enum MessageBag {
     Generic(Message<GenericHeader>),
     Prepare(Message<PrepareHeader>),
+    PrepareOk(Message<PrepareOkHeader>),
     Commit(Message<CommitHeader>),
     Reply(Message<ReplyHeader>),
 }
@@ -273,6 +274,7 @@ impl MessageBag {
         match self {
             MessageBag::Generic(message) => message.header().command,
             MessageBag::Prepare(message) => message.header().command,
+            MessageBag::PrepareOk(message) => message.header().command,
             MessageBag::Commit(message) => message.header().command,
             MessageBag::Reply(message) => message.header().command,
         }
@@ -283,6 +285,7 @@ impl MessageBag {
         match self {
             MessageBag::Generic(message) => message.header().size(),
             MessageBag::Prepare(message) => message.header().size(),
+            MessageBag::PrepareOk(message) => message.header().size(),
             MessageBag::Commit(message) => message.header().size(),
             MessageBag::Reply(message) => message.header().size(),
         }
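
For context, a minimal sketch of how a receiver might route the new
PrepareOk variant to the handler added below in core/consensus/src/impls.rs.
The dispatch function, its location, and the `consensus` binding are
illustrative assumptions, not part of this commit:

    // Hypothetical dispatch site; only the PrepareOk arm is new.
    fn dispatch(consensus: &VsrConsensus, bag: MessageBag) {
        match bag {
            MessageBag::PrepareOk(message) => {
                // On the primary, count this ack toward the prepare's quorum.
                let _quorum_reached = consensus.handle_prepare_ok(message);
            }
            // Generic, Prepare, Commit and Reply are handled as before.
            _ => {}
        }
    }
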
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index c8f6a9503..48248b13d 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -255,6 +255,25 @@ impl Pipeline {
         self.prepare_queue.get(index)
     }
 
+    /// Get mutable reference to a prepare entry by op number.
+    /// Returns None if op is not in the pipeline.
+    pub fn prepare_by_op_mut(&mut self, op: u64) -> Option<&mut PipelineEntry> {
+        let head_op = self.prepare_queue.front()?.message.header().op;
+        if op < head_op {
+            return None;
+        }
+        let index = (op - head_op) as usize;
+        if index >= self.prepare_queue.len() {
+            return None;
+        }
+        self.prepare_queue.get_mut(index)
+    }
+
+    /// Get the entry at the head of the prepare queue (oldest uncommitted).
+    pub fn head(&self) -> Option<&PipelineEntry> {
+        self.prepare_queue.front()
+    }
+
     /// Search prepare queue for a message from the given client.
     ///
     /// If there are multiple messages (possible in prepare_queue after view change),
@@ -320,6 +339,9 @@ pub struct VsrConsensus {
     last_prepare_checksum: Cell<u128>,
 
     pipeline: RefCell<Pipeline>,
+
+    message_bus: IggyMessageBus,
+    // TODO: Add loopback_queue for messages to self
 }
 
 impl VsrConsensus {
@@ -341,6 +363,7 @@ impl VsrConsensus {
             last_timestamp: Cell::new(0),
             last_prepare_checksum: Cell::new(0),
             pipeline: RefCell::new(Pipeline::new()),
+            message_bus: IggyMessageBus::new(replica_count as usize, replica as u16, 0),
         }
     }
 
@@ -388,6 +411,100 @@ impl VsrConsensus {
     pub fn status(&self) -> Status {
         self.status.get()
     }
+
+    pub fn pipeline(&self) -> &RefCell<Pipeline> {
+        &self.pipeline
+    }
+
+    pub fn pipeline_mut(&mut self) -> &mut RefCell<Pipeline> {
+        &mut self.pipeline
+    }
+
+    pub fn cluster(&self) -> u128 {
+        self.cluster
+    }
+
+    pub fn replica_count(&self) -> u8 {
+        self.replica_count
+    }
+
+    /// Handle a prepare_ok message from a follower.
+    /// Called on the primary when a follower acknowledges a prepare.
+    ///
+    /// Returns true if quorum was just reached for this op.
+    pub fn handle_prepare_ok(&self, message: Message<PrepareOkHeader>) -> bool {
+        let header = message.header();
+
+        assert_eq!(header.command, Command2::PrepareOk);
+        assert!(
+            header.replica < self.replica_count,
+            "handle_prepare_ok: invalid replica {}",
+            header.replica
+        );
+
+        // Ignore if not in normal status
+        if self.status() != Status::Normal {
+            return false;
+        }
+
+        // Ignore if from older view
+        if header.view < self.view() {
+            return false;
+        }
+
+        // Ignore if from newer view. This shouldn't happen if we're primary
+        if header.view > self.view() {
+            return false;
+        }
+
+        // We must be primary to process prepare_ok
+        if !self.is_primary() {
+            return false;
+        }
+
+        // Ignore if syncing
+        if self.is_syncing() {
+            return false;
+        }
+
+        // Find the prepare in our pipeline
+        let mut pipeline = self.pipeline.borrow_mut();
+
+        let Some(entry) = pipeline.prepare_by_op_mut(header.op) else {
+            // Not in pipeline - could be old/duplicate or already committed
+            return false;
+        };
+
+        // Verify checksum matches
+        if entry.message.header().checksum != header.prepare_checksum {
+            return false;
+        }
+
+        // Verify the prepare is for a valid op range
+        let _commit = self.commit();
+
+        // Check for duplicate ack
+        if entry.has_ack(header.replica) {
+            return false;
+        }
+
+        // Record the ack from this replica
+        let ack_count = entry.add_ack(header.replica);
+        let quorum = self.quorum();
+
+        // Check if we've reached quorum
+        if ack_count >= quorum && !entry.ok_quorum_received {
+            entry.ok_quorum_received = true;
+
+            return true;
+        }
+
+        false
+    }
+
+    pub fn message_bus(&self) -> &IggyMessageBus {
+        &self.message_bus
+    }
 }
 
 impl Project<Message<PrepareHeader>> for Message<RequestHeader> {
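
The ack counting in handle_prepare_ok above caps out at a fixed quorum per
entry. A self-contained sketch of that arithmetic, assuming quorum() computes
a simple majority (its implementation is not shown in this diff):

    // Illustration only: majority quorum plus duplicate-ack suppression.
    fn majority_quorum(replica_count: u8) -> usize {
        // Assumption: simple majority, e.g. 3 replicas -> 2, 5 -> 3.
        replica_count as usize / 2 + 1
    }

    fn main() {
        let replica_count = 3u8;
        let quorum = majority_quorum(replica_count);
        let mut acked = vec![false; replica_count as usize];
        let mut ack_count = 0usize;
        // prepare_ok arrives from replicas 1 and 2 (replica 0 is primary).
        for replica in [1usize, 2] {
            // Mirrors entry.has_ack / entry.add_ack above.
            if !acked[replica] {
                acked[replica] = true;
                ack_count += 1;
            }
        }
        assert!(ack_count >= quorum); // the op is now committable
    }
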
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index 1e8f32970..a6ba9a43f 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -31,4 +31,5 @@ readme = "../../../README.md"
 consensus = { path = "../consensus" }
 iggy_common = { path = "../common" }
 journal = { path = "../journal" }
+message_bus = { path = "../message_bus" }
 tracing = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index 383e45289..cbf50cbe5 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -16,10 +16,11 @@
 // under the License.
 use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
 use iggy_common::{
-    header::{Command2, PrepareHeader},
+    header::{Command2, PrepareHeader, PrepareOkHeader},
     message::Message,
 };
 use journal::Journal;
+use message_bus::MessageBus;
 use tracing::{debug, warn};
 
 // TODO: Define a trait (probably in some external crate)
@@ -66,7 +67,7 @@ where
         assert_eq!(header.command, Command2::Prepare);
 
         if !self.fence_old_prepare(&message) {
-            self.replicate(message.clone());
+            self.replicate(message.clone()).await;
         } else {
             warn!("received old prepare, not replicating");
         }
@@ -137,7 +138,10 @@ where
         self.journal.set_header_as_dirty(header);
 
         // Append to journal.
-        self.journal.append(message).await;
+        self.journal.append(message.clone()).await;
+
+        // After successful journal write, send prepare_ok to primary.
+        self.send_prepare_ok(header).await;
 
         // If follower, commit any newly committable entries.
         if self.consensus.is_follower() {
@@ -145,9 +149,49 @@ where
         }
     }
 
-    fn on_ack(&self, _message: <Self::Consensus as Consensus>::AckMessage) {
-        // TODO: Implement on_prepare_ok logic
-        todo!()
+    fn on_ack(&self, message: <Self::Consensus as Consensus>::AckMessage) {
+        let header = message.header();
+
+        if !self.consensus.is_primary() {
+            warn!("on_ack: ignoring (not primary)");
+            return;
+        }
+
+        if self.consensus.status() != Status::Normal {
+            warn!("on_ack: ignoring (not normal)");
+            return;
+        }
+
+        // Find the prepare in pipeline
+        let Some(mut pipeline) = self.consensus.pipeline().try_borrow_mut().ok() else {
+            warn!("on_ack: could not borrow pipeline (already mutably borrowed)");
+            return;
+        };
+
+        let Some(entry) = pipeline.prepare_by_op_and_checksum(header.op, header.prepare_checksum)
+        else {
+            debug!("on_ack: prepare not in pipeline op={}", header.op);
+            return;
+        };
+
+        // Verify checksum matches
+        if entry.message.header().checksum != header.prepare_checksum {
+            warn!("on_ack: checksum mismatch");
+            return;
+        }
+
+        // Record ack
+        let count = entry.add_ack(header.replica);
+
+        // Check quorum
+        if count >= self.consensus.quorum() && !entry.ok_quorum_received {
+            entry.ok_quorum_received = true;
+            debug!("on_ack: quorum received for op={}", header.op);
+
+            // Advance commit number and trigger commit journal
+            self.consensus.advance_commit_number(header.op);
+            self.commit_journal();
+        }
     }
 }
 
@@ -170,9 +214,49 @@ where
         header.op <= self.consensus.commit() || self.journal.has_prepare(header)
     }
 
-    fn replicate(&self, _prepare: Message<PrepareHeader>) {
-        // TODO Forward prepare to next replica in chain.
-        todo!()
+    /// Replicate a prepare message to the next replica in the chain.
+    ///
+    /// Chain replication pattern:
+    /// - Primary sends to first backup
+    /// - Each backup forwards to the next
+    /// - Stops when we would forward back to primary
+    async fn replicate(&self, message: Message<PrepareHeader>) {
+        let header = message.header();
+
+        assert_eq!(header.command, Command2::Prepare);
+        assert!(
+            !self.journal.has_prepare(header),
+            "replicate: must not already have prepare"
+        );
+        assert!(header.op > self.consensus.commit());
+
+        let next = (self.consensus.replica() + 1) % self.consensus.replica_count();
+
+        let primary = self.consensus.primary_index(header.view);
+        if next == primary {
+            debug!(
+                replica = self.consensus.replica(),
+                op = header.op,
+                "replicate: not replicating (ring complete)"
+            );
+            return;
+        }
+
+        assert_ne!(next, self.consensus.replica());
+
+        debug!(
+            replica = self.consensus.replica(),
+            to = next,
+            op = header.op,
+            "replicate: forwarding"
+        );
+
+        let message = message.into_generic();
+        self.consensus
+            .message_bus()
+            .send_to_replica(next, message)
+            .await
+            .unwrap();
     }
 
     fn on_repair(&self, _message: Message<PrepareHeader>) {
@@ -204,6 +288,108 @@ where
         // Apply each entry to the state machine
         todo!()
     }
+
+    /// Send a prepare_ok message to the primary.
+    /// Called after successfully writing a prepare to the journal.
+    async fn send_prepare_ok(&self, header: &PrepareHeader) {
+        assert_eq!(header.command, Command2::Prepare);
+
+        if self.consensus.status() != Status::Normal {
+            debug!(
+                replica = self.consensus.replica(),
+                status = ?self.consensus.status(),
+                "send_prepare_ok: not sending (not normal)"
+            );
+            return;
+        }
+
+        if self.consensus.is_syncing() {
+            debug!(
+                replica = self.consensus.replica(),
+                "send_prepare_ok: not sending (syncing)"
+            );
+            return;
+        }
+
+        // Verify we have the prepare and it's persisted (not dirty).
+        if !self.journal.has_prepare(header) {
+            debug!(
+                replica = self.consensus.replica(),
+                op = header.op,
+                "send_prepare_ok: not sending (not persisted or missing)"
+            );
+            return;
+        }
+
+        assert!(
+            header.view <= self.consensus.view(),
+            "send_prepare_ok: prepare view {} > our view {}",
+            header.view,
+            self.consensus.view()
+        );
+
+        if header.op > self.consensus.sequencer().current_sequence() {
+            debug!(
+                replica = self.consensus.replica(),
+                op = header.op,
+                our_op = self.consensus.sequencer().current_sequence(),
+                "send_prepare_ok: not sending (op ahead)"
+            );
+            return;
+        }
+
+        debug!(
+            replica = self.consensus.replica(),
+            op = header.op,
+            checksum = header.checksum,
+            "send_prepare_ok: sending"
+        );
+
+        // Use current view, not the prepare's view.
+        let prepare_ok_header = PrepareOkHeader {
+            command: Command2::PrepareOk,
+            cluster: self.consensus.cluster(),
+            replica: self.consensus.replica(),
+            view: self.consensus.view(),
+            epoch: header.epoch,
+            op: header.op,
+            commit: self.consensus.commit(),
+            timestamp: header.timestamp,
+            parent: header.parent,
+            prepare_checksum: header.checksum,
+            request: header.request,
+            operation: header.operation,
+            size: std::mem::size_of::<PrepareOkHeader>() as u32,
+            ..Default::default()
+        };
+
+        let message: Message<PrepareOkHeader> =
+            Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
+                .replace_header(|_prev: &PrepareOkHeader| prepare_ok_header);
+        let generic_message = message.into_generic();
+        let primary = self.consensus.primary_index(self.consensus.view());
+
+        if primary == self.consensus.replica() {
+            debug!(
+                replica = self.consensus.replica(),
+                "send_prepare_ok: loopback to self"
+            );
+            // TODO: Queue for self-processing or call handle_prepare_ok directly
+        } else {
+            debug!(
+                replica = self.consensus.replica(),
+                to = primary,
+                op = header.op,
+                "send_prepare_ok: sending to primary"
+            );
+
+            self.consensus
+                .message_bus()
+                .send_to_replica(primary, generic_message)
+                .await
+                .unwrap();
+        }
+    }
 }
 
// TODO: Hide all of those generics behind associated types, so they do not leak to the upper layer, or maybe even make them part of the `Metadata` trait itself.
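
A standalone sketch of the ring-forwarding rule that replicate implements
above: each replica forwards the prepare to (replica + 1) % replica_count and
stops once the next hop would be the view's primary. Function names here are
illustrative:

    // Illustration only: next hop in the chain, None once the ring is complete.
    fn next_hop(replica: u8, replica_count: u8, primary: u8) -> Option<u8> {
        let next = (replica + 1) % replica_count;
        if next == primary { None } else { Some(next) }
    }

    fn main() {
        // 3 replicas, primary 0: a prepare flows 0 -> 1 -> 2, then stops.
        assert_eq!(next_hop(0, 3, 0), Some(1));
        assert_eq!(next_hop(1, 3, 0), Some(2));
        assert_eq!(next_hop(2, 3, 0), None);
    }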
