This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 54a67f837 feat(cluster): Add `on_replicate` and related machinery 
(#2469)
54a67f837 is described below

commit 54a67f8372e5eb8d30b2443738536a6634d238de
Author: Krishna Vishal <[email protected]>
AuthorDate: Thu Dec 11 18:15:01 2025 +0530

    feat(cluster): Add `on_replicate` and related machinery (#2469)
    
    I've added an initial implementation of `on_replicate` and related
    machinery in `Journal` etc.
    
    `on_replicate` is the same as `on_prepare` in VSR.
    
    Didn't implement `send_ack` (`send_prepare_ok`) because it needed
    `MessageBus`. I wasn't sure about the design (where to add it).
    
    Co-authored-by: Grzegorz Koszyk 
<[email protected]>
---
 core/consensus/src/impls.rs         |  30 +++++++++
 core/consensus/src/lib.rs           |   1 +
 core/journal/src/lib.rs             |   8 +++
 core/metadata/src/impls/metadata.rs | 124 +++++++++++++++++++++++++++++++++---
 4 files changed, 155 insertions(+), 8 deletions(-)

diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index d830c5a93..c8f6a9503 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -296,6 +296,7 @@ impl Pipeline {
     }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum Status {
     Normal,
     ViewChange,
@@ -362,6 +363,31 @@ impl VsrConsensus {
     pub fn quorum(&self) -> usize {
         (self.replica_count as usize / 2) + 1
     }
+
+    pub fn commit(&self) -> u64 {
+        self.commit.get()
+    }
+
+    pub fn is_syncing(&self) -> bool {
+        // for now return false. we have to add syncing related setup to 
VsrConsensus to make this work.
+        false
+    }
+
+    pub fn replica(&self) -> u8 {
+        self.replica
+    }
+
+    pub fn sequencer(&self) -> &LocalSequencer {
+        &self.sequencer
+    }
+
+    pub fn view(&self) -> u32 {
+        self.view.get()
+    }
+
+    pub fn status(&self) -> Status {
+        self.status.get()
+    }
 }
 
 impl Project<Message<PrepareHeader>> for Message<RequestHeader> {
@@ -471,4 +497,8 @@ impl Consensus for VsrConsensus {
     fn is_follower(&self) -> bool {
         !self.is_primary()
     }
+
+    fn is_syncing(&self) -> bool {
+        self.is_syncing()
+    }
 }
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 1db81e9eb..8ff864ffd 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -37,6 +37,7 @@ pub trait Consensus {
     fn post_replicate_verify(&self, message: &Self::ReplicateMessage);
 
     fn is_follower(&self) -> bool;
+    fn is_syncing(&self) -> bool;
 }
 
 mod impls;
diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index 460173ab7..2aa693c68 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -19,5 +19,13 @@
 // But the interface was designed for a partition log, not a generic journal.
 pub trait Journal {
     type Entry;
+    type Header;
+
+    fn has_prepare(&self, header: &Self::Header) -> bool;
+
+    fn previous_entry(&self, header: &Self::Header) -> Option<Self::Header>;
+
+    fn set_header_as_dirty(&self, header: &Self::Header);
+
     fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>;
 }
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 718844e54..383e45289 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,8 +14,11 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-use consensus::{Consensus, Project, VsrConsensus};
-use iggy_common::{header::PrepareHeader, message::Message};
+use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
+use iggy_common::{
+    header::{Command2, PrepareHeader},
+    message::Message,
+};
 use journal::Journal;
 use tracing::{debug, warn};
 
@@ -25,7 +28,10 @@ trait Metadata {
     type Consensus: Consensus;
     type Journal: Journal<Entry = <Self::Consensus as 
Consensus>::ReplicateMessage>;
 
+    /// Handle a client request message.
     fn on_request(&self, message: <Self::Consensus as 
Consensus>::RequestMessage);
+
+    /// Handle a replicate message (Prepare in VSR).
     fn on_replicate(
         &self,
         message: <Self::Consensus as Consensus>::ReplicateMessage,
@@ -43,7 +49,7 @@ struct IggyMetadata<M, J, S> {
 
 impl<M, J, S> Metadata for IggyMetadata<M, J, S>
 where
-    J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage>,
+    J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header = 
PrepareHeader>,
 {
     type Consensus = VsrConsensus;
     type Journal = J;
@@ -55,28 +61,99 @@ where
     }
 
     async fn on_replicate(&self, message: <Self::Consensus as 
Consensus>::ReplicateMessage) {
+        let header = message.header();
+
+        assert_eq!(header.command, Command2::Prepare);
+
         if !self.fence_old_prepare(&message) {
             self.replicate(message.clone());
         } else {
             warn!("received old prepare, not replicating");
         }
 
+        // If syncing, ignore the replicate message.
+        if self.consensus.is_syncing() {
+            warn!(
+                replica = self.consensus.replica(),
+                "on_replicate: ignoring (sync)"
+            );
+            return;
+        }
+
+        let current_op = self.consensus.sequencer().current_sequence();
+
+        // Old message (handle as repair). Not replicating.
+        if header.view < self.consensus.view()
+            || (self.consensus.status() == Status::Normal
+                && header.view == self.consensus.view()
+                && header.op <= current_op)
+        {
+            debug!(
+                replica = self.consensus.replica(),
+                "on_replicate: ignoring (repair)"
+            );
+            self.on_repair(message);
+            return;
+        }
+
+        // If status is not normal, ignore the replicate.
+        if self.consensus.status() != Status::Normal {
+            warn!(
+                replica = self.consensus.replica(),
+                "on_replicate: ignoring (not normal state)"
+            );
+            return;
+        }
+
+        // If the message is from a future view, we ignore the replicate.
+        if header.view > self.consensus.view() {
+            warn!(
+                replica = self.consensus.replica(),
+                "on_replicate: ignoring (newer view)"
+            );
+            return;
+        }
+
+        // TODO add assertions for valid state here.
+
+        // If we are a follower, we advance the commit number.
         if self.consensus.is_follower() {
             self.consensus
                 .advance_commit_number(message.header().commit);
         }
-        //self.consensus.update_op(header.op());
+
+        // TODO verify that the current prepare fits in the WAL.
+
+        // TODO handle gap in ops.
+
+        // Verify hash chain integrity.
+        if let Some(previous) = self.journal.previous_entry(header) {
+            self.panic_if_hash_chain_would_break_in_same_view(&previous, 
header);
+        }
+
+        assert_eq!(header.op, current_op + 1);
+
+        self.consensus.sequencer().set_sequence(header.op);
+        self.journal.set_header_as_dirty(header);
+
+        // Append to journal.
         self.journal.append(message).await;
+
+        // If follower, commit any newly committable entries.
+        if self.consensus.is_follower() {
+            self.commit_journal();
+        }
     }
 
     fn on_ack(&self, _message: <Self::Consensus as Consensus>::AckMessage) {
+        // TODO: Implement on_prepare_ok logic
         todo!()
     }
 }
 
 impl<M, J, S> IggyMetadata<M, J, S>
 where
-    J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage>,
+    J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header = 
PrepareHeader>,
 {
     #[expect(unused)]
     fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
@@ -88,12 +165,43 @@ where
         self.consensus.post_replicate_verify(&prepare);
     }
 
-    fn fence_old_prepare(&self, _prepare: &Message<PrepareHeader>) -> bool {
-        // TODO
-        false
+    fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
+        let header = prepare.header();
+        header.op <= self.consensus.commit() || 
self.journal.has_prepare(header)
     }
 
     fn replicate(&self, _prepare: Message<PrepareHeader>) {
+        // TODO Forward prepare to next replica in chain.
+        todo!()
+    }
+
+    fn on_repair(&self, _message: Message<PrepareHeader>) {
+        todo!()
+    }
+
+    /// Verify hash chain would not break if we add this header.
+    fn panic_if_hash_chain_would_break_in_same_view(
+        &self,
+        previous: &PrepareHeader,
+        current: &PrepareHeader,
+    ) {
+        // If both headers are in the same view, parent must chain correctly
+        if previous.view == current.view {
+            assert_eq!(
+                current.parent, previous.checksum,
+                "hash chain broken in same view: op={} parent={} expected={}",
+                current.op, current.parent, previous.checksum
+            );
+        }
+    }
+
+    // TODO: Implement jump_to_newer_op
+    // fn jump_to_newer_op(&self, header: &PrepareHeader) {}
+
+    fn commit_journal(&self) {
+        // TODO: Implement commit logic
+        // Walk through journal from last committed to current commit number
+        // Apply each entry to the state machine
         todo!()
     }
 }

Reply via email to