This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 54a67f837 feat(cluster): Add `on_replicate` and related machinery
(#2469)
54a67f837 is described below
commit 54a67f8372e5eb8d30b2443738536a6634d238de
Author: Krishna Vishal <[email protected]>
AuthorDate: Thu Dec 11 18:15:01 2025 +0530
feat(cluster): Add `on_replicate` and related machinery (#2469)
I've added an initial implementation of `on_replicate` and related
machinery in `Journal` etc.
`on_replicate` is the same as `on_prepare` in VSR.
I didn't implement `send_ack` (`send_prepare_ok`) because it needs
`MessageBus`, and I wasn't sure about the design (where to add it).
Co-authored-by: Grzegorz Koszyk
<[email protected]>
---
core/consensus/src/impls.rs | 30 +++++++++
core/consensus/src/lib.rs | 1 +
core/journal/src/lib.rs | 8 +++
core/metadata/src/impls/metadata.rs | 124 +++++++++++++++++++++++++++++++++---
4 files changed, 155 insertions(+), 8 deletions(-)
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index d830c5a93..c8f6a9503 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -296,6 +296,7 @@ impl Pipeline {
}
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
Normal,
ViewChange,
@@ -362,6 +363,31 @@ impl VsrConsensus {
pub fn quorum(&self) -> usize {
(self.replica_count as usize / 2) + 1
}
+
+ pub fn commit(&self) -> u64 {
+ self.commit.get()
+ }
+
+ pub fn is_syncing(&self) -> bool {
+ // For now, return false. We have to add syncing-related setup to VsrConsensus to make this work.
+ false
+ }
+
+ pub fn replica(&self) -> u8 {
+ self.replica
+ }
+
+ pub fn sequencer(&self) -> &LocalSequencer {
+ &self.sequencer
+ }
+
+ pub fn view(&self) -> u32 {
+ self.view.get()
+ }
+
+ pub fn status(&self) -> Status {
+ self.status.get()
+ }
}
impl Project<Message<PrepareHeader>> for Message<RequestHeader> {
@@ -471,4 +497,8 @@ impl Consensus for VsrConsensus {
fn is_follower(&self) -> bool {
!self.is_primary()
}
+
+ fn is_syncing(&self) -> bool {
+ self.is_syncing()
+ }
}
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 1db81e9eb..8ff864ffd 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -37,6 +37,7 @@ pub trait Consensus {
fn post_replicate_verify(&self, message: &Self::ReplicateMessage);
fn is_follower(&self) -> bool;
+ fn is_syncing(&self) -> bool;
}
mod impls;
diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index 460173ab7..2aa693c68 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -19,5 +19,13 @@
// But the interface was designed for partition log, not a generic journal.
pub trait Journal {
type Entry;
+ type Header;
+
+ fn has_prepare(&self, header: &Self::Header) -> bool;
+
+ fn previous_entry(&self, header: &Self::Header) -> Option<Self::Header>;
+
+ fn set_header_as_dirty(&self, header: &Self::Header);
+
fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>;
}
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index 718844e54..383e45289 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,8 +14,11 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-use consensus::{Consensus, Project, VsrConsensus};
-use iggy_common::{header::PrepareHeader, message::Message};
+use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
+use iggy_common::{
+ header::{Command2, PrepareHeader},
+ message::Message,
+};
use journal::Journal;
use tracing::{debug, warn};
@@ -25,7 +28,10 @@ trait Metadata {
type Consensus: Consensus;
type Journal: Journal<Entry = <Self::Consensus as
Consensus>::ReplicateMessage>;
+ /// Handle a client request message (Request in VSR).
fn on_request(&self, message: <Self::Consensus as
Consensus>::RequestMessage);
+
+ /// Handle a replicate message (Prepare in VSR).
fn on_replicate(
&self,
message: <Self::Consensus as Consensus>::ReplicateMessage,
@@ -43,7 +49,7 @@ struct IggyMetadata<M, J, S> {
impl<M, J, S> Metadata for IggyMetadata<M, J, S>
where
- J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage>,
+ J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header =
PrepareHeader>,
{
type Consensus = VsrConsensus;
type Journal = J;
@@ -55,28 +61,99 @@ where
}
async fn on_replicate(&self, message: <Self::Consensus as
Consensus>::ReplicateMessage) {
+ let header = message.header();
+
+ assert_eq!(header.command, Command2::Prepare);
+
if !self.fence_old_prepare(&message) {
self.replicate(message.clone());
} else {
warn!("received old prepare, not replicating");
}
+ // If syncing, ignore the replicate message.
+ if self.consensus.is_syncing() {
+ warn!(
+ replica = self.consensus.replica(),
+ "on_replicate: ignoring (sync)"
+ );
+ return;
+ }
+
+ let current_op = self.consensus.sequencer().current_sequence();
+
+ // Old message (handle as repair). Not replicating.
+ if header.view < self.consensus.view()
+ || (self.consensus.status() == Status::Normal
+ && header.view == self.consensus.view()
+ && header.op <= current_op)
+ {
+ debug!(
+ replica = self.consensus.replica(),
+ "on_replicate: ignoring (repair)"
+ );
+ self.on_repair(message);
+ return;
+ }
+
+ // If status is not normal, ignore the replicate.
+ if self.consensus.status() != Status::Normal {
+ warn!(
+ replica = self.consensus.replica(),
+ "on_replicate: ignoring (not normal state)"
+ );
+ return;
+ }
+
+ // If the message is from a future view, ignore the replicate.
+ if header.view > self.consensus.view() {
+ warn!(
+ replica = self.consensus.replica(),
+ "on_replicate: ignoring (newer view)"
+ );
+ return;
+ }
+
+ // TODO add assertions for valid state here.
+
+ // If we are a follower, we advance the commit number.
if self.consensus.is_follower() {
self.consensus
.advance_commit_number(message.header().commit);
}
- //self.consensus.update_op(header.op());
+
+ // TODO verify that the current prepare fits in the WAL.
+
+ // TODO handle gap in ops.
+
+ // Verify hash chain integrity.
+ if let Some(previous) = self.journal.previous_entry(header) {
+ self.panic_if_hash_chain_would_break_in_same_view(&previous,
header);
+ }
+
+ assert_eq!(header.op, current_op + 1);
+
+ self.consensus.sequencer().set_sequence(header.op);
+ self.journal.set_header_as_dirty(header);
+
+ // Append to journal.
self.journal.append(message).await;
+
+ // If follower, commit any newly committable entries.
+ if self.consensus.is_follower() {
+ self.commit_journal();
+ }
}
fn on_ack(&self, _message: <Self::Consensus as Consensus>::AckMessage) {
+ // TODO: Implement on_prepare_ok logic
todo!()
}
}
impl<M, J, S> IggyMetadata<M, J, S>
where
- J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage>,
+ J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header =
PrepareHeader>,
{
#[expect(unused)]
fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
@@ -88,12 +165,43 @@ where
self.consensus.post_replicate_verify(&prepare);
}
- fn fence_old_prepare(&self, _prepare: &Message<PrepareHeader>) -> bool {
- // TODO
- false
+ fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
+ let header = prepare.header();
+ header.op <= self.consensus.commit() ||
self.journal.has_prepare(header)
}
fn replicate(&self, _prepare: Message<PrepareHeader>) {
+ // TODO Forward prepare to next replica in chain.
+ todo!()
+ }
+
+ fn on_repair(&self, _message: Message<PrepareHeader>) {
+ todo!()
+ }
+
+ /// Verify hash chain would not break if we add this header.
+ fn panic_if_hash_chain_would_break_in_same_view(
+ &self,
+ previous: &PrepareHeader,
+ current: &PrepareHeader,
+ ) {
+ // If both headers are in the same view, parent must chain correctly
+ if previous.view == current.view {
+ assert_eq!(
+ current.parent, previous.checksum,
+ "hash chain broken in same view: op={} parent={} expected={}",
+ current.op, current.parent, previous.checksum
+ );
+ }
+ }
+
+ // TODO: Implement jump_to_newer_op
+ // fn jump_to_newer_op(&self, header: &PrepareHeader) {}
+
+ fn commit_journal(&self) {
+ // TODO: Implement commit logic
+ // Walk through journal from last committed to current commit number
+ // Apply each entry to the state machine
todo!()
}
}