This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 471973ddd feat(cluster): Add initial support for `VsrConsensus` and
`Pipeline` (#2426)
471973ddd is described below
commit 471973ddd33c7a60eb42b4de35212bc064a7f72f
Author: Krishna Vishal <[email protected]>
AuthorDate: Tue Dec 2 20:01:53 2025 +0530
feat(cluster): Add initial support for `VsrConsensus` and `Pipeline` (#2426)
This PR adds initial support for `VsrConsensus` and `Pipeline`.
Also adds a `Sequencer` trait to allow for asynchronous sequencer
implementations in the future.
---------
Co-authored-by: Grzegorz Koszyk
<[email protected]>
Co-authored-by: Piotr Gankiewicz <[email protected]>
---
Cargo.lock | 16 ++
Cargo.toml | 1 +
DEPENDENCIES.md | 2 +
core/common/src/types/consensus/header.rs | 2 +
core/consensus/Cargo.toml | 3 +-
core/consensus/src/impls.rs | 417 ++++++++++++++++++++++++++++--
core/consensus/src/lib.rs | 1 +
core/metadata/src/impls/metadata.rs | 3 +-
8 files changed, 415 insertions(+), 30 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 1998d3f78..59712c563 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1130,6 +1130,21 @@ dependencies = [
"virtue",
]
+[[package]]
+name = "bit-set"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08807e080ed7f9d5433fa9b275196cfc35414f66a0c79d864dc51a0d825231a3"
+dependencies = [
+ "bit-vec",
+]
+
+[[package]]
+name = "bit-vec"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7"
+
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -1919,6 +1934,7 @@ dependencies = [
name = "consensus"
version = "0.1.0"
dependencies = [
+ "bit-set",
"iggy_common",
"message_bus",
]
diff --git a/Cargo.toml b/Cargo.toml
index 355be9d16..a851ec3b4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -81,6 +81,7 @@ bench-dashboard-shared = { path =
"core/bench/dashboard/shared" }
bench-report = { path = "core/bench/report" }
bench-runner = { path = "core/bench/runner" }
bincode = { version = "2.0.1", features = ["serde"] }
+bit-set = "0.8.0"
blake3 = "1.8.2"
bon = "3.8.1"
byte-unit = { version = "5.2.0", default-features = false, features = [
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 239ffed5a..6be507bd2 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -93,6 +93,8 @@ bimap: 0.6.3, "Apache-2.0 OR MIT",
bincode: 1.3.3, "MIT",
bincode: 2.0.1, "MIT",
bincode_derive: 2.0.1, "MIT",
+bit-set: 0.8.0, "Apache-2.0 OR MIT",
+bit-vec: 0.8.0, "Apache-2.0 OR MIT",
bitflags: 1.3.2, "Apache-2.0 OR MIT",
bitflags: 2.10.0, "Apache-2.0 OR MIT",
bitvec: 1.0.1, "MIT",
diff --git a/core/common/src/types/consensus/header.rs
b/core/common/src/types/consensus/header.rs
index f069bbc4f..b20d0d5bf 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -156,6 +156,7 @@ pub struct RequestHeader {
pub replica: u8,
pub reserved_frame: [u8; 12],
+ pub client: u128,
pub request_checksum: u128,
pub timestamp: u64,
pub request: u64,
@@ -200,6 +201,7 @@ pub struct PrepareHeader {
pub replica: u8,
pub reserved_frame: [u8; 12],
+ pub client: u128,
pub parent: u128,
pub parent_padding: u128,
pub request_checksum: u128,
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 0b1a9c02f..d84f8b411 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -28,7 +28,6 @@ repository = "https://github.com/apache/iggy"
readme = "../../../README.md"
[dependencies]
+bit-set = { workspace = true }
iggy_common = { path = "../common" }
message_bus = { path = "../message_bus" }
-
-
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 19bafa65d..d830c5a93 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -16,44 +16,375 @@
// under the License.
use crate::{Consensus, Project};
+use bit_set::BitSet;
use iggy_common::header::{Command2, PrepareHeader, PrepareOkHeader,
RequestHeader};
use iggy_common::message::Message;
use message_bus::IggyMessageBus;
-use std::cell::Cell;
+use std::cell::{Cell, RefCell};
+use std::collections::VecDeque;
-pub struct VsrConsensus {
+pub trait Sequencer {
+ type Sequence;
+ /// Get the current sequence number
+ fn current_sequence(&self) -> Self::Sequence;
+
+ /// Allocate the next sequence number.
+ /// TODO Should this return a Future<Output = u64>? for async case?
+ fn next_sequence(&self) -> Self::Sequence;
+
+ /// Update the current sequence number.
+ fn set_sequence(&self, sequence: Self::Sequence);
+}
+
+pub struct LocalSequencer {
op: Cell<u64>,
}
+impl LocalSequencer {
+ pub fn new(initial_op: u64) -> Self {
+ Self {
+ op: Cell::new(initial_op),
+ }
+ }
+}
+
+impl Sequencer for LocalSequencer {
+ type Sequence = u64;
+
+ fn current_sequence(&self) -> Self::Sequence {
+ self.op.get()
+ }
+
+ fn next_sequence(&self) -> Self::Sequence {
+ let current = self.current_sequence();
+ let next = current.checked_add(1).expect("sequence number overflow");
+ self.set_sequence(next);
+ next
+ }
+
+ fn set_sequence(&self, sequence: Self::Sequence) {
+ self.op.set(sequence);
+ }
+}
+
+/// TODO The below numbers need to be added to a consensus config.
+/// TODO understand how to configure these numbers.
+/// Maximum number of prepares that can be in-flight in the pipeline.
+pub const PIPELINE_PREPARE_QUEUE_MAX: usize = 8;
+
+/// Maximum number of replicas in a cluster.
+pub const REPLICAS_MAX: usize = 32;
+
+pub struct PipelineEntry {
+ pub message: Message<PrepareHeader>,
+ /// Bitmap of replicas that have acknowledged this prepare.
+ pub ok_from_replicas: BitSet<u32>,
+ /// Whether we've received a quorum of prepare_ok messages.
+ pub ok_quorum_received: bool,
+}
+
+impl PipelineEntry {
+ pub fn new(message: Message<PrepareHeader>) -> Self {
+ Self {
+ message,
+ ok_from_replicas: BitSet::with_capacity(REPLICAS_MAX),
+ ok_quorum_received: false,
+ }
+ }
+
+ /// Record a prepare_ok from the given replica.
+ /// Returns the new count of acknowledgments.
+ pub fn add_ack(&mut self, replica: u8) -> usize {
+ self.ok_from_replicas.insert(replica as usize);
+ self.ok_from_replicas.len()
+ }
+
+ /// Check if we have an ack from the given replica.
+ pub fn has_ack(&self, replica: u8) -> bool {
+ self.ok_from_replicas.contains(replica as usize)
+ }
+
+ /// Get the number of acks received.
+ pub fn ack_count(&self) -> usize {
+ self.ok_from_replicas.len()
+ }
+}
+
+/// A request message waiting to be prepared.
+pub struct RequestEntry {
+ pub message: Message<RequestHeader>,
+ /// Timestamp when the request was received (for ordering/timeout).
+    pub received_at: i64, //TODO figure out the correct way to do this
+}
+
+impl RequestEntry {
+ pub fn new(message: Message<RequestHeader>) -> Self {
+ Self {
+ message,
+            received_at: 0, //TODO figure out the correct way to do this
+ }
+ }
+}
+
+pub struct Pipeline {
+ /// Messages being prepared (uncommitted and being replicated).
+ prepare_queue: VecDeque<PipelineEntry>,
+}
+
+impl Default for Pipeline {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl Pipeline {
+ pub fn new() -> Self {
+ Self {
+ prepare_queue: VecDeque::with_capacity(PIPELINE_PREPARE_QUEUE_MAX),
+ }
+ }
+
+ pub fn prepare_count(&self) -> usize {
+ self.prepare_queue.len()
+ }
+
+ pub fn prepare_queue_full(&self) -> bool {
+ self.prepare_queue.len() >= PIPELINE_PREPARE_QUEUE_MAX
+ }
+
+ /// Returns true if prepare queue is full.
+ pub fn is_full(&self) -> bool {
+ self.prepare_queue_full()
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.prepare_queue.is_empty()
+ }
+
+ /// Push a new prepare to the pipeline.
+ ///
+ /// # Panics
+ /// - If prepare queue is full.
+ /// - If the prepare doesn't chain correctly to the previous entry.
+ pub fn push_prepare(&mut self, message: Message<PrepareHeader>) {
+ assert!(!self.prepare_queue_full(), "prepare queue is full");
+
+ let header = message.header();
+
+ // Verify hash chain if there's a previous entry
+ if let Some(tail) = self.prepare_queue.back() {
+ let tail_header = tail.message.header();
+ assert_eq!(
+ header.op,
+ tail_header.op + 1,
+ "sequence must be sequential: expected {}, got {}",
+ tail_header.op + 1,
+ header.op
+ );
+ assert_eq!(
+ header.parent, tail_header.checksum,
+ "parent must chain to previous checksum"
+ );
+ assert!(header.view >= tail_header.view, "view cannot go
backwards");
+ }
+
+ self.prepare_queue.push_back(PipelineEntry::new(message));
+ }
+
+ /// Pop the oldest prepare (after it's been committed).
+ ///
+ pub fn pop_prepare(&mut self) -> Option<PipelineEntry> {
+ self.prepare_queue.pop_front()
+ }
+
+ /// Get the head (oldest) prepare.
+ pub fn prepare_head(&self) -> Option<&PipelineEntry> {
+ self.prepare_queue.front()
+ }
+
+ pub fn prepare_head_mut(&mut self) -> Option<&mut PipelineEntry> {
+ self.prepare_queue.front_mut()
+ }
+
+ /// Get the tail (newest) prepare.
+ pub fn prepare_tail(&self) -> Option<&PipelineEntry> {
+ self.prepare_queue.back()
+ }
+
+ /// Find a prepare by op number and checksum.
+ pub fn prepare_by_op_and_checksum(
+ &mut self,
+ op: u64,
+ checksum: u128,
+ ) -> Option<&mut PipelineEntry> {
+ let head_op = self.prepare_queue.front()?.message.header().op;
+ let tail_op = self.prepare_queue.back()?.message.header().op;
+
+ // Verify consecutive ops invariant
+ debug_assert_eq!(
+ tail_op,
+ head_op + self.prepare_queue.len() as u64 - 1,
+ "prepare queue ops not consecutive"
+ );
+
+ if op < head_op || op > tail_op {
+ return None;
+ }
+
+ let index = (op - head_op) as usize;
+ let entry = self.prepare_queue.get_mut(index)?;
+
+ debug_assert_eq!(entry.message.header().op, op);
+
+ if entry.message.header().checksum == checksum {
+ Some(entry)
+ } else {
+ None
+ }
+ }
+
+ /// Find a prepare by op number only.
+ pub fn prepare_by_op(&self, op: u64) -> Option<&PipelineEntry> {
+ let head_op = self.prepare_queue.front()?.message.header().op;
+
+ if op < head_op {
+ return None;
+ }
+
+ let index = (op - head_op) as usize;
+ self.prepare_queue.get(index)
+ }
+
+ /// Search prepare queue for a message from the given client.
+ ///
+    /// Multiple messages from the same client are possible in the prepare
+    /// queue after a view change; returns `true` if any are present.
+ pub fn has_message_from_client(&self, client: u128) -> bool {
+ self.prepare_queue
+ .iter()
+ .any(|p| p.message.header().client == client)
+ }
+
+ /// Verify pipeline invariants.
+ ///
+ /// # Panics
+ /// If any invariant is violated.
+ pub fn verify(&self) {
+ // Check capacity limits
+ assert!(self.prepare_queue.len() <= PIPELINE_PREPARE_QUEUE_MAX);
+
+ // Verify prepare queue hash chain
+ if let Some(head) = self.prepare_queue.front() {
+ let mut expected_op = head.message.header().op;
+ let mut expected_parent = head.message.header().parent;
+
+ for entry in &self.prepare_queue {
+ let header = entry.message.header();
+
+ assert_eq!(header.op, expected_op, "ops must be sequential");
+ assert_eq!(header.parent, expected_parent, "must be
hash-chained");
+
+ expected_parent = header.checksum;
+ expected_op += 1;
+ }
+ }
+ }
+
+ /// Clear prepare queue.
+ pub fn clear(&mut self) {
+ self.prepare_queue.clear();
+ }
+}
+
+pub enum Status {
+ Normal,
+ ViewChange,
+ Recovering,
+}
+
+#[allow(unused)]
+pub struct VsrConsensus {
+ cluster: u128,
+ replica: u8,
+ replica_count: u8,
+
+ view: Cell<u32>,
+ log_view: Cell<u32>,
+ status: Cell<Status>,
+ commit: Cell<u64>,
+
+ sequencer: LocalSequencer,
+
+ last_timestamp: Cell<u64>,
+ last_prepare_checksum: Cell<u128>,
+
+ pipeline: RefCell<Pipeline>,
+}
+
impl VsrConsensus {
- pub fn advance_commit_number(&self) {}
+ pub fn new(cluster: u128, replica: u8, replica_count: u8) -> Self {
+ assert!(
+ replica < replica_count,
+ "replica index must be < replica_count"
+ );
+ assert!(replica_count >= 1, "need at least 1 replica");
+ Self {
+ cluster,
+ replica,
+ replica_count,
+ view: Cell::new(0),
+ log_view: Cell::new(0),
+ status: Cell::new(Status::Recovering),
+ sequencer: LocalSequencer::new(0),
+ commit: Cell::new(0),
+ last_timestamp: Cell::new(0),
+ last_prepare_checksum: Cell::new(0),
+ pipeline: RefCell::new(Pipeline::new()),
+ }
+ }
- pub fn update_op(&self, op: u64) {
- self.op.set(op);
+ pub fn primary_index(&self, view: u32) -> u8 {
+ view as u8 % self.replica_count
}
- pub fn op(&self) -> u64 {
- self.op.get()
+ pub fn is_primary(&self) -> bool {
+ self.primary_index(self.view.get()) == self.replica
+ }
+
+ pub fn advance_commit_number(&self, commit: u64) {
+ if commit > self.commit.get() {
+ self.commit.set(commit);
+ }
+
+ assert!(self.commit.get() >= commit);
+ }
+
+ pub fn quorum(&self) -> usize {
+ (self.replica_count as usize / 2) + 1
}
}
impl Project<Message<PrepareHeader>> for Message<RequestHeader> {
type Consensus = VsrConsensus;
- fn project(self, _consensus: &Self::Consensus) -> Message<PrepareHeader> {
+
+ fn project(self, consensus: &Self::Consensus) -> Message<PrepareHeader> {
+ let op = consensus.sequencer.current_sequence() + 1;
+
self.replace_header(|prev| {
PrepareHeader {
- cluster: 0, // TODO: consesus.cluster
+ cluster: consensus.cluster,
size: prev.size,
- view: 0, // TODO: consesus view
+ epoch: 0,
+ view: consensus.view.get(),
release: prev.release,
command: Command2::Prepare,
- replica: 0, // TODO: consesus replica
- parent: 0, // TODO: Get this from the previous entry in the
journal (figure out how to pass that ctx here)
+ replica: consensus.replica,
+ parent: 0, // TODO: Get parent checksum from the previous
entry in the journal (figure out how to pass that ctx here)
request_checksum: prev.request_checksum,
request: prev.request,
- commit: 0, // TODO: consensus.commit
- op: 0, // TODO: consensus.op
- timestamp: 0, // TODO: consensus timestamp
+ commit: consensus.commit.get(),
+ op,
+ timestamp: 0, // 0 for now. Implement correct way to get
timestamp later
operation: prev.operation,
..Default::default()
}
@@ -63,20 +394,20 @@ impl Project<Message<PrepareHeader>> for
Message<RequestHeader> {
impl Project<Message<PrepareOkHeader>> for Message<PrepareHeader> {
type Consensus = VsrConsensus;
- fn project(self, _consensus: &Self::Consensus) -> Message<PrepareOkHeader>
{
+ fn project(self, consensus: &Self::Consensus) -> Message<PrepareOkHeader> {
self.replace_header(|prev| {
PrepareOkHeader {
command: Command2::PrepareOk,
parent: prev.parent,
prepare_checksum: prev.checksum,
request: prev.request,
- cluster: 0, // TODO: consensus.cluster
- replica: 0, // TODO: consensus replica
- epoch: 0, // TODO: consensus.epoch
+ cluster: consensus.cluster,
+ replica: consensus.replica,
+ epoch: 0, // TODO: consensus.epoch
// It's important to use the view of the replica, not the
received prepare!
- view: 0, // TODO: consensus.view
+ view: consensus.view.get(),
op: prev.op,
- commit: 0, // TODO: consensus.commit
+ commit: consensus.commit.get(),
timestamp: prev.timestamp,
operation: prev.operation,
// PrepareOks are only header no body
@@ -92,20 +423,52 @@ impl Consensus for VsrConsensus {
type RequestMessage = Message<RequestHeader>;
type ReplicateMessage = Message<PrepareHeader>;
type AckMessage = Message<PrepareOkHeader>;
+ type Sequencer = LocalSequencer;
+
+ fn pipeline_message(&self, message: Self::ReplicateMessage) {
+ assert!(self.is_primary(), "only primary can pipeline messages");
- fn pipeline_message(&self, _message: Self::ReplicateMessage) {
- todo!()
+ let mut pipeline = self.pipeline.borrow_mut();
+ pipeline.push_prepare(message);
}
fn verify_pipeline(&self) {
- todo!()
+ let pipeline = self.pipeline.borrow();
+ pipeline.verify();
}
- fn post_replicate_verify(&self, _message: &Self::ReplicateMessage) {
- todo!()
+ fn post_replicate_verify(&self, message: &Self::ReplicateMessage) {
+ let header = message.header();
+
+ // verify the message belongs to our cluster
+ assert_eq!(header.cluster, self.cluster, "cluster mismatch");
+
+ // verify view is not from the future
+ assert!(
+ header.view <= self.view.get(),
+ "prepare view {} is ahead of replica view {}",
+ header.view,
+ self.view.get()
+ );
+
+ // verify op is sequential
+ assert_eq!(
+ header.op,
+ self.sequencer.current_sequence() + 1,
+ "op must be sequential: expected {}, got {}",
+ self.sequencer.current_sequence() + 1,
+ header.op
+ );
+
+ // verify hash chain
+ assert_eq!(
+ header.parent,
+ self.last_prepare_checksum.get(),
+ "parent checksum mismatch"
+ );
}
fn is_follower(&self) -> bool {
- todo!()
+ !self.is_primary()
}
}
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 2998bdfe9..400fcfa3e 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -28,6 +28,7 @@ pub trait Consensus {
type RequestMessage: Project<Self::ReplicateMessage, Consensus = Self> +
Clone;
type ReplicateMessage: Project<Self::AckMessage, Consensus = Self> + Clone;
type AckMessage;
+ type Sequencer: Sequencer;
fn pipeline_message(&self, message: Self::ReplicateMessage);
fn verify_pipeline(&self);
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index 140fa7e4c..718844e54 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -62,7 +62,8 @@ where
}
if self.consensus.is_follower() {
- self.consensus.advance_commit_number();
+ self.consensus
+ .advance_commit_number(message.header().commit);
}
//self.consensus.update_op(header.op());
self.journal.append(message).await;