This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 471973ddd feat(cluster): Add initial support for `VsrConsensus` and 
`Pipeline` (#2426)
471973ddd is described below

commit 471973ddd33c7a60eb42b4de35212bc064a7f72f
Author: Krishna Vishal <[email protected]>
AuthorDate: Tue Dec 2 20:01:53 2025 +0530

    feat(cluster): Add initial support for `VsrConsensus` and `Pipeline` (#2426)
    
    This PR adds initial support for `VsrConsensus` and `Pipeline`.
    
    Also adds a `Sequencer` trait for flexibility of async sequencers in the
    future.
    
    ---------
    
    Co-authored-by: Grzegorz Koszyk 
<[email protected]>
    Co-authored-by: Piotr Gankiewicz <[email protected]>
---
 Cargo.lock                                |  16 ++
 Cargo.toml                                |   1 +
 DEPENDENCIES.md                           |   2 +
 core/common/src/types/consensus/header.rs |   2 +
 core/consensus/Cargo.toml                 |   3 +-
 core/consensus/src/impls.rs               | 417 ++++++++++++++++++++++++++++--
 core/consensus/src/lib.rs                 |   1 +
 core/metadata/src/impls/metadata.rs       |   3 +-
 8 files changed, 415 insertions(+), 30 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1998d3f78..59712c563 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1130,6 +1130,21 @@ dependencies = [
  "virtue",
 ]
 
+[[package]]
+name = "bit-set"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08807e080ed7f9d5433fa9b275196cfc35414f66a0c79d864dc51a0d825231a3"
+dependencies = [
+ "bit-vec",
+]
+
+[[package]]
+name = "bit-vec"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7"
+
 [[package]]
 name = "bitflags"
 version = "1.3.2"
@@ -1919,6 +1934,7 @@ dependencies = [
 name = "consensus"
 version = "0.1.0"
 dependencies = [
+ "bit-set",
  "iggy_common",
  "message_bus",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 355be9d16..a851ec3b4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -81,6 +81,7 @@ bench-dashboard-shared = { path = 
"core/bench/dashboard/shared" }
 bench-report = { path = "core/bench/report" }
 bench-runner = { path = "core/bench/runner" }
 bincode = { version = "2.0.1", features = ["serde"] }
+bit-set = "0.8.0"
 blake3 = "1.8.2"
 bon = "3.8.1"
 byte-unit = { version = "5.2.0", default-features = false, features = [
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 239ffed5a..6be507bd2 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -93,6 +93,8 @@ bimap: 0.6.3, "Apache-2.0 OR MIT",
 bincode: 1.3.3, "MIT",
 bincode: 2.0.1, "MIT",
 bincode_derive: 2.0.1, "MIT",
+bit-set: 0.8.0, "Apache-2.0 OR MIT",
+bit-vec: 0.8.0, "Apache-2.0 OR MIT",
 bitflags: 1.3.2, "Apache-2.0 OR MIT",
 bitflags: 2.10.0, "Apache-2.0 OR MIT",
 bitvec: 1.0.1, "MIT",
diff --git a/core/common/src/types/consensus/header.rs 
b/core/common/src/types/consensus/header.rs
index f069bbc4f..b20d0d5bf 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -156,6 +156,7 @@ pub struct RequestHeader {
     pub replica: u8,
     pub reserved_frame: [u8; 12],
 
+    pub client: u128,
     pub request_checksum: u128,
     pub timestamp: u64,
     pub request: u64,
@@ -200,6 +201,7 @@ pub struct PrepareHeader {
     pub replica: u8,
     pub reserved_frame: [u8; 12],
 
+    pub client: u128,
     pub parent: u128,
     pub parent_padding: u128,
     pub request_checksum: u128,
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 0b1a9c02f..d84f8b411 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -28,7 +28,6 @@ repository = "https://github.com/apache/iggy"
 readme = "../../../README.md"
 
 [dependencies]
+bit-set = { workspace = true }
 iggy_common = { path = "../common" }
 message_bus = { path = "../message_bus" }
-
-
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 19bafa65d..d830c5a93 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -16,44 +16,375 @@
 // under the License.
 
 use crate::{Consensus, Project};
+use bit_set::BitSet;
 use iggy_common::header::{Command2, PrepareHeader, PrepareOkHeader, 
RequestHeader};
 use iggy_common::message::Message;
 use message_bus::IggyMessageBus;
-use std::cell::Cell;
+use std::cell::{Cell, RefCell};
+use std::collections::VecDeque;
 
-pub struct VsrConsensus {
+pub trait Sequencer {
+    type Sequence;
+    /// Get the current sequence number
+    fn current_sequence(&self) -> Self::Sequence;
+
+    /// Allocate the next sequence number.
+    /// TODO Should this return a Future<Output = u64> for the async case?
+    fn next_sequence(&self) -> Self::Sequence;
+
+    /// Update the current sequence number.
+    fn set_sequence(&self, sequence: Self::Sequence);
+}
+
+pub struct LocalSequencer {
     op: Cell<u64>,
 }
 
+impl LocalSequencer {
+    pub fn new(initial_op: u64) -> Self {
+        Self {
+            op: Cell::new(initial_op),
+        }
+    }
+}
+
+impl Sequencer for LocalSequencer {
+    type Sequence = u64;
+
+    fn current_sequence(&self) -> Self::Sequence {
+        self.op.get()
+    }
+
+    fn next_sequence(&self) -> Self::Sequence {
+        let current = self.current_sequence();
+        let next = current.checked_add(1).expect("sequence number overflow");
+        self.set_sequence(next);
+        next
+    }
+
+    fn set_sequence(&self, sequence: Self::Sequence) {
+        self.op.set(sequence);
+    }
+}
+
+/// TODO The numbers below need to be added to a consensus config.
+/// TODO understand how to configure these numbers.
+/// Maximum number of prepares that can be in-flight in the pipeline.
+pub const PIPELINE_PREPARE_QUEUE_MAX: usize = 8;
+
+/// Maximum number of replicas in a cluster.
+pub const REPLICAS_MAX: usize = 32;
+
+pub struct PipelineEntry {
+    pub message: Message<PrepareHeader>,
+    /// Bitmap of replicas that have acknowledged this prepare.
+    pub ok_from_replicas: BitSet<u32>,
+    /// Whether we've received a quorum of prepare_ok messages.
+    pub ok_quorum_received: bool,
+}
+
+impl PipelineEntry {
+    pub fn new(message: Message<PrepareHeader>) -> Self {
+        Self {
+            message,
+            ok_from_replicas: BitSet::with_capacity(REPLICAS_MAX),
+            ok_quorum_received: false,
+        }
+    }
+
+    /// Record a prepare_ok from the given replica.
+    /// Returns the new count of acknowledgments.
+    pub fn add_ack(&mut self, replica: u8) -> usize {
+        self.ok_from_replicas.insert(replica as usize);
+        self.ok_from_replicas.len()
+    }
+
+    /// Check if we have an ack from the given replica.
+    pub fn has_ack(&self, replica: u8) -> bool {
+        self.ok_from_replicas.contains(replica as usize)
+    }
+
+    /// Get the number of acks received.
+    pub fn ack_count(&self) -> usize {
+        self.ok_from_replicas.len()
+    }
+}
+
+/// A request message waiting to be prepared.
+pub struct RequestEntry {
+    pub message: Message<RequestHeader>,
+    /// Timestamp when the request was received (for ordering/timeout).
+    pub received_at: i64, //TODO figure out the correct way to do this
+}
+
+impl RequestEntry {
+    pub fn new(message: Message<RequestHeader>) -> Self {
+        Self {
+            message,
+            received_at: 0, //TODO figure out the correct way to do this
+        }
+    }
+}
+
+pub struct Pipeline {
+    /// Messages being prepared (uncommitted and being replicated).
+    prepare_queue: VecDeque<PipelineEntry>,
+}
+
+impl Default for Pipeline {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Pipeline {
+    pub fn new() -> Self {
+        Self {
+            prepare_queue: VecDeque::with_capacity(PIPELINE_PREPARE_QUEUE_MAX),
+        }
+    }
+
+    pub fn prepare_count(&self) -> usize {
+        self.prepare_queue.len()
+    }
+
+    pub fn prepare_queue_full(&self) -> bool {
+        self.prepare_queue.len() >= PIPELINE_PREPARE_QUEUE_MAX
+    }
+
+    /// Returns true if prepare queue is full.
+    pub fn is_full(&self) -> bool {
+        self.prepare_queue_full()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.prepare_queue.is_empty()
+    }
+
+    /// Push a new prepare to the pipeline.
+    ///
+    /// # Panics
+    /// - If prepare queue is full.
+    /// - If the prepare doesn't chain correctly to the previous entry.
+    pub fn push_prepare(&mut self, message: Message<PrepareHeader>) {
+        assert!(!self.prepare_queue_full(), "prepare queue is full");
+
+        let header = message.header();
+
+        // Verify hash chain if there's a previous entry
+        if let Some(tail) = self.prepare_queue.back() {
+            let tail_header = tail.message.header();
+            assert_eq!(
+                header.op,
+                tail_header.op + 1,
+                "sequence must be sequential: expected {}, got {}",
+                tail_header.op + 1,
+                header.op
+            );
+            assert_eq!(
+                header.parent, tail_header.checksum,
+                "parent must chain to previous checksum"
+            );
+            assert!(header.view >= tail_header.view, "view cannot go 
backwards");
+        }
+
+        self.prepare_queue.push_back(PipelineEntry::new(message));
+    }
+
+    /// Pop the oldest prepare (after it's been committed).
+    ///
+    pub fn pop_prepare(&mut self) -> Option<PipelineEntry> {
+        self.prepare_queue.pop_front()
+    }
+
+    /// Get the head (oldest) prepare.
+    pub fn prepare_head(&self) -> Option<&PipelineEntry> {
+        self.prepare_queue.front()
+    }
+
+    pub fn prepare_head_mut(&mut self) -> Option<&mut PipelineEntry> {
+        self.prepare_queue.front_mut()
+    }
+
+    /// Get the tail (newest) prepare.
+    pub fn prepare_tail(&self) -> Option<&PipelineEntry> {
+        self.prepare_queue.back()
+    }
+
+    /// Find a prepare by op number and checksum.
+    pub fn prepare_by_op_and_checksum(
+        &mut self,
+        op: u64,
+        checksum: u128,
+    ) -> Option<&mut PipelineEntry> {
+        let head_op = self.prepare_queue.front()?.message.header().op;
+        let tail_op = self.prepare_queue.back()?.message.header().op;
+
+        // Verify consecutive ops invariant
+        debug_assert_eq!(
+            tail_op,
+            head_op + self.prepare_queue.len() as u64 - 1,
+            "prepare queue ops not consecutive"
+        );
+
+        if op < head_op || op > tail_op {
+            return None;
+        }
+
+        let index = (op - head_op) as usize;
+        let entry = self.prepare_queue.get_mut(index)?;
+
+        debug_assert_eq!(entry.message.header().op, op);
+
+        if entry.message.header().checksum == checksum {
+            Some(entry)
+        } else {
+            None
+        }
+    }
+
+    /// Find a prepare by op number only.
+    pub fn prepare_by_op(&self, op: u64) -> Option<&PipelineEntry> {
+        let head_op = self.prepare_queue.front()?.message.header().op;
+
+        if op < head_op {
+            return None;
+        }
+
+        let index = (op - head_op) as usize;
+        self.prepare_queue.get(index)
+    }
+
+    /// Returns true if the prepare queue contains any message from the
+    /// given client (multiple matches are possible after a view change).
+    pub fn has_message_from_client(&self, client: u128) -> bool {
+        self.prepare_queue
+            .iter()
+            .any(|p| p.message.header().client == client)
+    }
+
+    /// Verify pipeline invariants.
+    ///
+    /// # Panics
+    /// If any invariant is violated.
+    pub fn verify(&self) {
+        // Check capacity limits
+        assert!(self.prepare_queue.len() <= PIPELINE_PREPARE_QUEUE_MAX);
+
+        // Verify prepare queue hash chain
+        if let Some(head) = self.prepare_queue.front() {
+            let mut expected_op = head.message.header().op;
+            let mut expected_parent = head.message.header().parent;
+
+            for entry in &self.prepare_queue {
+                let header = entry.message.header();
+
+                assert_eq!(header.op, expected_op, "ops must be sequential");
+                assert_eq!(header.parent, expected_parent, "must be 
hash-chained");
+
+                expected_parent = header.checksum;
+                expected_op += 1;
+            }
+        }
+    }
+
+    /// Clear prepare queue.
+    pub fn clear(&mut self) {
+        self.prepare_queue.clear();
+    }
+}
+
+pub enum Status {
+    Normal,
+    ViewChange,
+    Recovering,
+}
+
+#[allow(unused)]
+pub struct VsrConsensus {
+    cluster: u128,
+    replica: u8,
+    replica_count: u8,
+
+    view: Cell<u32>,
+    log_view: Cell<u32>,
+    status: Cell<Status>,
+    commit: Cell<u64>,
+
+    sequencer: LocalSequencer,
+
+    last_timestamp: Cell<u64>,
+    last_prepare_checksum: Cell<u128>,
+
+    pipeline: RefCell<Pipeline>,
+}
+
 impl VsrConsensus {
-    pub fn advance_commit_number(&self) {}
+    pub fn new(cluster: u128, replica: u8, replica_count: u8) -> Self {
+        assert!(
+            replica < replica_count,
+            "replica index must be < replica_count"
+        );
+        assert!(replica_count >= 1, "need at least 1 replica");
+        Self {
+            cluster,
+            replica,
+            replica_count,
+            view: Cell::new(0),
+            log_view: Cell::new(0),
+            status: Cell::new(Status::Recovering),
+            sequencer: LocalSequencer::new(0),
+            commit: Cell::new(0),
+            last_timestamp: Cell::new(0),
+            last_prepare_checksum: Cell::new(0),
+            pipeline: RefCell::new(Pipeline::new()),
+        }
+    }
 
-    pub fn update_op(&self, op: u64) {
-        self.op.set(op);
+    pub fn primary_index(&self, view: u32) -> u8 {
+        view as u8 % self.replica_count
     }
 
-    pub fn op(&self) -> u64 {
-        self.op.get()
+    pub fn is_primary(&self) -> bool {
+        self.primary_index(self.view.get()) == self.replica
+    }
+
+    pub fn advance_commit_number(&self, commit: u64) {
+        if commit > self.commit.get() {
+            self.commit.set(commit);
+        }
+
+        assert!(self.commit.get() >= commit);
+    }
+
+    pub fn quorum(&self) -> usize {
+        (self.replica_count as usize / 2) + 1
     }
 }
 
 impl Project<Message<PrepareHeader>> for Message<RequestHeader> {
     type Consensus = VsrConsensus;
-    fn project(self, _consensus: &Self::Consensus) -> Message<PrepareHeader> {
+
+    fn project(self, consensus: &Self::Consensus) -> Message<PrepareHeader> {
+        let op = consensus.sequencer.current_sequence() + 1;
+
         self.replace_header(|prev| {
             PrepareHeader {
-                cluster: 0, // TODO: consesus.cluster
+                cluster: consensus.cluster,
                 size: prev.size,
-                view: 0, // TODO: consesus view
+                epoch: 0,
+                view: consensus.view.get(),
                 release: prev.release,
                 command: Command2::Prepare,
-                replica: 0, // TODO: consesus replica
-                parent: 0, // TODO: Get this from the previous entry in the 
journal (figure out how to pass that ctx here)
+                replica: consensus.replica,
+                parent: 0, // TODO: Get parent checksum from the previous 
entry in the journal (figure out how to pass that ctx here)
                 request_checksum: prev.request_checksum,
                 request: prev.request,
-                commit: 0,    // TODO: consensus.commit
-                op: 0,        // TODO: consensus.op
-                timestamp: 0, // TODO: consensus timestamp
+                commit: consensus.commit.get(),
+                op,
+                timestamp: 0, // 0 for now. Implement correct way to get 
timestamp later
                 operation: prev.operation,
                 ..Default::default()
             }
@@ -63,20 +394,20 @@ impl Project<Message<PrepareHeader>> for 
Message<RequestHeader> {
 
 impl Project<Message<PrepareOkHeader>> for Message<PrepareHeader> {
     type Consensus = VsrConsensus;
-    fn project(self, _consensus: &Self::Consensus) -> Message<PrepareOkHeader> 
{
+    fn project(self, consensus: &Self::Consensus) -> Message<PrepareOkHeader> {
         self.replace_header(|prev| {
             PrepareOkHeader {
                 command: Command2::PrepareOk,
                 parent: prev.parent,
                 prepare_checksum: prev.checksum,
                 request: prev.request,
-                cluster: 0, // TODO: consensus.cluster
-                replica: 0, // TODO: consensus replica
-                epoch: 0,   // TODO: consensus.epoch
+                cluster: consensus.cluster,
+                replica: consensus.replica,
+                epoch: 0, // TODO: consensus.epoch
                 // It's important to use the view of the replica, not the 
received prepare!
-                view: 0, // TODO: consensus.view
+                view: consensus.view.get(),
                 op: prev.op,
-                commit: 0, // TODO: consensus.commit
+                commit: consensus.commit.get(),
                 timestamp: prev.timestamp,
                 operation: prev.operation,
                 // PrepareOks are only header no body
@@ -92,20 +423,52 @@ impl Consensus for VsrConsensus {
     type RequestMessage = Message<RequestHeader>;
     type ReplicateMessage = Message<PrepareHeader>;
     type AckMessage = Message<PrepareOkHeader>;
+    type Sequencer = LocalSequencer;
+
+    fn pipeline_message(&self, message: Self::ReplicateMessage) {
+        assert!(self.is_primary(), "only primary can pipeline messages");
 
-    fn pipeline_message(&self, _message: Self::ReplicateMessage) {
-        todo!()
+        let mut pipeline = self.pipeline.borrow_mut();
+        pipeline.push_prepare(message);
     }
 
     fn verify_pipeline(&self) {
-        todo!()
+        let pipeline = self.pipeline.borrow();
+        pipeline.verify();
     }
 
-    fn post_replicate_verify(&self, _message: &Self::ReplicateMessage) {
-        todo!()
+    fn post_replicate_verify(&self, message: &Self::ReplicateMessage) {
+        let header = message.header();
+
+        // verify the message belongs to our cluster
+        assert_eq!(header.cluster, self.cluster, "cluster mismatch");
+
+        // verify view is not from the future
+        assert!(
+            header.view <= self.view.get(),
+            "prepare view {} is ahead of replica view {}",
+            header.view,
+            self.view.get()
+        );
+
+        // verify op is sequential
+        assert_eq!(
+            header.op,
+            self.sequencer.current_sequence() + 1,
+            "op must be sequential: expected {}, got {}",
+            self.sequencer.current_sequence() + 1,
+            header.op
+        );
+
+        // verify hash chain
+        assert_eq!(
+            header.parent,
+            self.last_prepare_checksum.get(),
+            "parent checksum mismatch"
+        );
     }
 
     fn is_follower(&self) -> bool {
-        todo!()
+        !self.is_primary()
     }
 }
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 2998bdfe9..400fcfa3e 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -28,6 +28,7 @@ pub trait Consensus {
     type RequestMessage: Project<Self::ReplicateMessage, Consensus = Self> + 
Clone;
     type ReplicateMessage: Project<Self::AckMessage, Consensus = Self> + Clone;
     type AckMessage;
+    type Sequencer: Sequencer;
 
     fn pipeline_message(&self, message: Self::ReplicateMessage);
     fn verify_pipeline(&self);
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 140fa7e4c..718844e54 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -62,7 +62,8 @@ where
         }
 
         if self.consensus.is_follower() {
-            self.consensus.advance_commit_number();
+            self.consensus
+                .advance_commit_number(message.header().commit);
         }
         //self.consensus.update_op(header.op());
         self.journal.append(message).await;

Reply via email to