This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new ae484ec7d feat(consensus): enable independent commit progress across
namespaces (#2765)
ae484ec7d is described below
commit ae484ec7d53ab64e377e878294032c2715b818d6
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Feb 20 14:56:48 2026 +0100
feat(consensus): enable independent commit progress across namespaces
(#2765)
LocalPipeline's single VecDeque blocked independent commit
progress across namespaces. NamespacedPipeline uses per-namespace
VecDeques under one global op sequence and hash chain, so
drain_committable_all drains each namespace independently.
Additional fixes: wire last_prepare_checksum through Prepare
and on_replicate instead of a hardcoded 0; walk consecutive ops
in ack_quorum_reached to prevent premature drain; propagate
namespace in build_reply_message; enforce namespace registration
before push; add namespace field to view change headers; replace
magic-number dispatch with Operation enum in simulator.
---
core/common/src/types/consensus/header.rs | 26 +-
core/consensus/src/impls.rs | 147 ++++----
core/consensus/src/lib.rs | 10 +-
core/consensus/src/namespaced_pipeline.rs | 540 ++++++++++++++++++++++++++++++
core/consensus/src/plane_helpers.rs | 159 ++++++++-
core/consensus/src/vsr_timeout.rs | 1 +
core/metadata/src/impls/metadata.rs | 89 ++---
core/partitions/src/iggy_partitions.rs | 206 +++++++-----
core/simulator/src/deps.rs | 18 +-
core/simulator/src/lib.rs | 28 +-
core/simulator/src/replica.rs | 57 ++--
11 files changed, 1035 insertions(+), 246 deletions(-)
diff --git a/core/common/src/types/consensus/header.rs
b/core/common/src/types/consensus/header.rs
index c8d311fa7..2991f89c3 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -21,6 +21,8 @@ use thiserror::Error;
const HEADER_SIZE: usize = 256;
pub trait ConsensusHeader: Sized + Pod + Zeroable {
const COMMAND: Command2;
+ // TODO: Trait consts are never evaluated unless explicitly accessed (e.g.
`<T as ConsensusHeader>::_SIZE_CHECK`).
+ // This means the size invariant is not actually verified at compile time;
consider adding a static_assert in each impl to enforce it.
const _SIZE_CHECK: () = assert!(std::mem::size_of::<Self>() ==
HEADER_SIZE);
fn validate(&self) -> Result<(), ConsensusError>;
@@ -135,8 +137,7 @@ pub struct GenericHeader {
pub replica: u8,
pub reserved_frame: [u8; 66],
- pub namespace: u64,
- pub reserved_command: [u8; 120],
+ pub reserved_command: [u8; 128],
}
unsafe impl Pod for GenericHeader {}
@@ -489,18 +490,17 @@ impl Default for ReplyHeader {
#[repr(C)]
pub struct StartViewChangeHeader {
pub checksum: u128,
- pub checksum_padding: u128,
pub checksum_body: u128,
- pub checksum_body_padding: u128,
pub cluster: u128,
pub size: u32,
pub view: u32,
pub release: u32,
pub command: Command2,
pub replica: u8,
- pub reserved_frame: [u8; 42],
+ pub reserved_frame: [u8; 66],
- pub reserved: [u8; 128],
+ pub namespace: u64,
+ pub reserved: [u8; 120],
}
unsafe impl Pod for StartViewChangeHeader {}
@@ -536,16 +536,14 @@ impl ConsensusHeader for StartViewChangeHeader {
#[repr(C)]
pub struct DoViewChangeHeader {
pub checksum: u128,
- pub checksum_padding: u128,
pub checksum_body: u128,
- pub checksum_body_padding: u128,
pub cluster: u128,
pub size: u32,
pub view: u32,
pub release: u32,
pub command: Command2,
pub replica: u8,
- pub reserved_frame: [u8; 42],
+ pub reserved_frame: [u8; 66],
/// The highest op-number in this replica's log.
/// Used to select the most complete log when log_view values are equal.
@@ -553,11 +551,12 @@ pub struct DoViewChangeHeader {
/// The replica's commit number (highest committed op).
/// The new primary sets its commit to max(commit) across all DVCs.
pub commit: u64,
+ pub namespace: u64,
/// The view number when this replica's status was last normal.
/// This is the key field for log selection: the replica with the
/// highest log_view has the most authoritative log.
pub log_view: u32,
- pub reserved: [u8; 108],
+ pub reserved: [u8; 100],
}
unsafe impl Pod for DoViewChangeHeader {}
@@ -609,16 +608,14 @@ impl ConsensusHeader for DoViewChangeHeader {
#[repr(C)]
pub struct StartViewHeader {
pub checksum: u128,
- pub checksum_padding: u128,
pub checksum_body: u128,
- pub checksum_body_padding: u128,
pub cluster: u128,
pub size: u32,
pub view: u32,
pub release: u32,
pub command: Command2,
pub replica: u8,
- pub reserved_frame: [u8; 42],
+ pub reserved_frame: [u8; 66],
/// The op-number of the highest entry in the new primary's log.
/// Backups set their op to this value.
@@ -627,7 +624,8 @@ pub struct StartViewHeader {
/// This is max(commit) from all DVCs received by the primary.
/// Backups set their commit to this value.
pub commit: u64,
- pub reserved: [u8; 112],
+ pub namespace: u64,
+ pub reserved: [u8; 104],
}
unsafe impl Pod for StartViewHeader {}
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 2b6a8c410..500a522c7 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -317,20 +317,6 @@ impl LocalPipeline {
pub fn clear(&mut self) {
self.prepare_queue.clear();
}
-
- /// Extract and remove a message by op number.
- /// Returns None if op is not in the pipeline.
- pub fn extract_by_op(&mut self, op: u64) -> Option<PipelineEntry> {
- let head_op = self.prepare_queue.front()?.header.op;
- if op < head_op {
- return None;
- }
- let index = (op - head_op) as usize;
- if index >= self.prepare_queue.len() {
- return None;
- }
- self.prepare_queue.remove(index)
- }
}
impl Pipeline for LocalPipeline {
@@ -345,10 +331,6 @@ impl Pipeline for LocalPipeline {
LocalPipeline::pop_message(self)
}
- fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry> {
- LocalPipeline::extract_by_op(self, op)
- }
-
fn clear(&mut self) {
LocalPipeline::clear(self)
}
@@ -365,6 +347,10 @@ impl Pipeline for LocalPipeline {
LocalPipeline::message_by_op_and_checksum(self, op, checksum)
}
+ fn head(&self) -> Option<&Self::Entry> {
+ LocalPipeline::head(self)
+ }
+
fn is_full(&self) -> bool {
LocalPipeline::is_full(self)
}
@@ -389,7 +375,7 @@ pub enum Status {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VsrAction {
/// Send StartViewChange to all replicas.
- SendStartViewChange { view: u32 },
+ SendStartViewChange { view: u32, namespace: u64 },
/// Send DoViewChange to primary.
SendDoViewChange {
view: u32,
@@ -397,11 +383,22 @@ pub enum VsrAction {
log_view: u32,
op: u64,
commit: u64,
+ namespace: u64,
},
/// Send StartView to all backups (as new primary).
- SendStartView { view: u32, op: u64, commit: u64 },
+ SendStartView {
+ view: u32,
+ op: u64,
+ commit: u64,
+ namespace: u64,
+ },
/// Send PrepareOK to primary.
- SendPrepareOk { view: u32, op: u64, target: u8 },
+ SendPrepareOk {
+ view: u32,
+ op: u64,
+ target: u8,
+ namespace: u64,
+ },
}
#[allow(unused)]
@@ -414,6 +411,7 @@ where
cluster: u128,
replica: u8,
replica_count: u8,
+ namespace: u64,
view: Cell<u32>,
@@ -455,16 +453,28 @@ where
}
impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
- pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B,
pipeline: P) -> Self {
+ pub fn new(
+ cluster: u128,
+ replica: u8,
+ replica_count: u8,
+ namespace: u64,
+ message_bus: B,
+ pipeline: P,
+ ) -> Self {
assert!(
replica < replica_count,
"replica index must be < replica_count"
);
assert!(replica_count >= 1, "need at least 1 replica");
+ // TODO: Verify that XOR-based seeding provides sufficient jitter
diversity
+ // across groups. Consider using a proper hash (e.g., Murmur3) of
+ // (replica_id, namespace) for production.
+ let timeout_seed = replica as u128 ^ namespace as u128;
Self {
cluster,
replica,
replica_count,
+ namespace,
view: Cell::new(0),
log_view: Cell::new(0),
status: Cell::new(Status::Recovering),
@@ -479,7 +489,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
do_view_change_quorum: Cell::new(false),
sent_own_start_view_change: Cell::new(false),
sent_own_do_view_change: Cell::new(false),
- timeouts: RefCell::new(TimeoutManager::new(replica as u128)),
+ timeouts: RefCell::new(TimeoutManager::new(timeout_seed)),
}
}
@@ -489,7 +499,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
}
pub fn primary_index(&self, view: u32) -> u8 {
- view as u8 % self.replica_count
+ (view % self.replica_count as u32) as u8
}
pub fn is_primary(&self) -> bool {
@@ -563,6 +573,18 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
self.replica_count
}
+ pub fn namespace(&self) -> u64 {
+ self.namespace
+ }
+
+ pub fn last_prepare_checksum(&self) -> u128 {
+ self.last_prepare_checksum.get()
+ }
+
+ pub fn set_last_prepare_checksum(&self, checksum: u128) {
+ self.last_prepare_checksum.set(checksum);
+ }
+
pub fn log_view(&self) -> u32 {
self.log_view.get()
}
@@ -678,7 +700,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
timeouts.start(TimeoutKind::ViewChangeStatus);
}
- vec![VsrAction::SendStartViewChange { view: new_view }]
+ vec![VsrAction::SendStartViewChange {
+ view: new_view,
+ namespace: self.namespace,
+ }]
}
/// Resend SVC message if we've started view change.
@@ -693,6 +718,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
vec![VsrAction::SendStartViewChange {
view: self.view.get(),
+ namespace: self.namespace,
}]
}
@@ -725,6 +751,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
log_view: self.log_view.get(),
op: current_op,
commit: current_commit,
+ namespace: self.namespace,
}]
}
@@ -748,7 +775,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
.borrow_mut()
.reset(TimeoutKind::ViewChangeStatus);
- vec![VsrAction::SendStartViewChange { view: next_view }]
+ vec![VsrAction::SendStartViewChange {
+ view: next_view,
+ namespace: self.namespace,
+ }]
}
/// Handle a received StartViewChange message.
@@ -757,6 +787,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
/// from f OTHER replicas, it sends a DOVIEWCHANGE message to the node
/// that will be the primary in the new view."
pub fn handle_start_view_change(&self, header: &StartViewChangeHeader) ->
Vec<VsrAction> {
+ assert_eq!(
+ header.namespace, self.namespace,
+ "SVC routed to wrong group"
+ );
let from_replica = header.replica;
let msg_view = header.view;
@@ -786,7 +820,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
}
// Send our own SVC
- actions.push(VsrAction::SendStartViewChange { view: msg_view });
+ actions.push(VsrAction::SendStartViewChange {
+ view: msg_view,
+ namespace: self.namespace,
+ });
}
// Record the SVC from sender
@@ -816,6 +853,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
log_view: self.log_view.get(),
op: current_op,
commit: current_commit,
+ namespace: self.namespace,
});
// If we are the primary candidate, record our own DVC
@@ -848,6 +886,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
/// replicas (including itself), it sets its view-number to that in the
messages
/// and selects as the new log the one contained in the message with the
largest v'..."
pub fn handle_do_view_change(&self, header: &DoViewChangeHeader) ->
Vec<VsrAction> {
+ assert_eq!(
+ header.namespace, self.namespace,
+ "DVC routed to wrong group"
+ );
let from_replica = header.replica;
let msg_view = header.view;
let msg_log_view = header.log_view;
@@ -880,7 +922,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
}
// Send our own SVC
- actions.push(VsrAction::SendStartViewChange { view: msg_view });
+ actions.push(VsrAction::SendStartViewChange {
+ view: msg_view,
+ namespace: self.namespace,
+ });
}
// Only the primary candidate processes DVCs for quorum
@@ -939,6 +984,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
/// in the log, set their view-number to the view number in the message,
change
/// their status to normal, and send PrepareOK for any uncommitted ops."
pub fn handle_start_view(&self, header: &StartViewHeader) ->
Vec<VsrAction> {
+ assert_eq!(header.namespace, self.namespace, "SV routed to wrong
group");
let from_replica = header.replica;
let msg_view = header.view;
let msg_op = header.op;
@@ -966,6 +1012,9 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
self.advance_commit_number(msg_commit);
self.reset_view_change_state();
+ // Stale pipeline entries from the old view must be discarded
+ self.pipeline.borrow_mut().clear();
+
// Update our op to match the new primary's log
self.sequencer.set_sequence(msg_op);
@@ -984,7 +1033,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
actions.push(VsrAction::SendPrepareOk {
view: msg_view,
op: op_num,
- target: from_replica, // Send to new primary
+ target: from_replica,
+ namespace: self.namespace,
});
}
@@ -1006,6 +1056,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
self.log_view.set(self.view.get());
self.status.set(Status::Normal);
self.advance_commit_number(max_commit);
+ self.sequencer.set_sequence(new_op);
+
+ // Stale pipeline entries from the old view are invalid in the new
view.
+ // Log reconciliation replays from the journal, not the pipeline.
+ self.pipeline.borrow_mut().clear();
// Update timeouts for normal primary operation
{
@@ -1020,6 +1075,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
view: self.view.get(),
op: new_op,
commit: max_commit,
+ namespace: self.namespace,
}]
}
@@ -1107,7 +1163,7 @@ where
command: Command2::Prepare,
replica: consensus.replica,
client: old.client,
- parent: 0, // TODO: Get parent checksum from the previous
entry in the journal (figure out how to pass that ctx here)
+ parent: consensus.last_prepare_checksum(),
request_checksum: old.request_checksum,
request: old.request,
commit: consensus.commit.get(),
@@ -1184,37 +1240,6 @@ where
pipeline.verify();
}
- fn post_replicate_verify(&self, message:
&Self::Message<Self::ReplicateHeader>) {
- let header = message.header();
-
- // verify the message belongs to our cluster
- assert_eq!(header.cluster, self.cluster, "cluster mismatch");
-
- // verify view is not from the future
- assert!(
- header.view <= self.view.get(),
- "prepare view {} is ahead of replica view {}",
- header.view,
- self.view.get()
- );
-
- // verify op is sequential
- assert_eq!(
- header.op,
- self.sequencer.current_sequence(),
- "op must be sequential: expected {}, got {}",
- self.sequencer.current_sequence(),
- header.op
- );
-
- // verify hash chain
- assert_eq!(
- header.parent,
- self.last_prepare_checksum.get(),
- "parent checksum mismatch"
- );
- }
-
fn is_follower(&self) -> bool {
!self.is_primary()
}
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 767ee9bf8..0ee87eab6 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -31,9 +31,6 @@ pub trait Pipeline {
fn pop_message(&mut self) -> Option<Self::Entry>;
- /// Extract and remove a message by op number.
- fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry>;
-
fn clear(&mut self);
fn message_by_op(&self, op: u64) -> Option<&Self::Entry>;
@@ -42,6 +39,8 @@ pub trait Pipeline {
fn message_by_op_and_checksum(&self, op: u64, checksum: u128) ->
Option<&Self::Entry>;
+ fn head(&self) -> Option<&Self::Entry>;
+
fn is_full(&self) -> bool;
fn is_empty(&self) -> bool;
@@ -65,9 +64,6 @@ pub trait Consensus: Sized {
fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>);
fn verify_pipeline(&self);
- // TODO: Figure out how we can achieve that without exposing such methods
in the Consensus trait.
- fn post_replicate_verify(&self, message:
&Self::Message<Self::ReplicateHeader>);
-
fn is_follower(&self) -> bool;
fn is_normal(&self) -> bool;
fn is_syncing(&self) -> bool;
@@ -97,6 +93,8 @@ where
mod impls;
pub use impls::*;
+mod namespaced_pipeline;
+pub use namespaced_pipeline::*;
mod plane_helpers;
pub use plane_helpers::*;
diff --git a/core/consensus/src/namespaced_pipeline.rs
b/core/consensus/src/namespaced_pipeline.rs
new file mode 100644
index 000000000..05d47d80f
--- /dev/null
+++ b/core/consensus/src/namespaced_pipeline.rs
@@ -0,0 +1,540 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Pipeline;
+use crate::impls::{PIPELINE_PREPARE_QUEUE_MAX, PipelineEntry};
+use iggy_common::header::PrepareHeader;
+use iggy_common::message::Message;
+use std::collections::{HashMap, VecDeque};
+
+/// Pipeline that partitions entries by namespace for independent commit
draining.
+///
+/// A single global op sequence and hash chain spans all namespaces, but
entries
+/// are stored in per-namespace VecDeques. Each namespace tracks its own commit
+/// frontier so `drain_committable_all` drains quorum'd entries per-namespace
+/// without waiting for the global commit to advance past unrelated namespaces.
+///
+/// The global commit (on `VsrConsensus`) remains a conservative lower bound
+/// for the VSR protocol (view change, follower commit piggybacking). It only
+/// advances when all ops up to that point are drained. Per-namespace draining
+/// can run ahead of the global commit.
+///
+/// An alternative (simpler) approach would drain purely by per-entry quorum
+/// flag without tracking per-namespace commit numbers, relying solely on
+/// `global_commit_frontier` for the protocol commit. We track per-namespace
+/// commits explicitly for observability and to make the independence model
+/// visible in the data structure.
+#[derive(Debug)]
+pub struct NamespacedPipeline {
+ queues: HashMap<u64, VecDeque<PipelineEntry>>,
+ /// Per-namespace commit frontier: highest drained op per namespace.
+ pub(crate) ns_commits: HashMap<u64, u64>,
+ pub(crate) total_count: usize,
+ last_push_checksum: u128,
+ last_push_op: u64,
+ /// Lower bound of ops pushed to this pipeline instance.
+ /// Used by `global_commit_frontier` to distinguish "never pushed" from
"drained."
+ first_push_op: u64,
+}
+
+impl Default for NamespacedPipeline {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl NamespacedPipeline {
+ pub fn new() -> Self {
+ Self {
+ queues: HashMap::new(),
+ ns_commits: HashMap::new(),
+ total_count: 0,
+ last_push_checksum: 0,
+ last_push_op: 0,
+ first_push_op: 0,
+ }
+ }
+
+ pub fn register_namespace(&mut self, ns: u64) {
+ self.queues.entry(ns).or_default();
+ self.ns_commits.entry(ns).or_insert(0);
+ }
+
+ /// Per-namespace commit frontier for the given namespace.
+ pub fn ns_commit(&self, ns: u64) -> Option<u64> {
+ self.ns_commits.get(&ns).copied()
+ }
+
+ /// Drain entries that have achieved quorum, independently per namespace.
+ ///
+ /// For each namespace queue, drains from the front while entries have
+ /// `ok_quorum_received == true`. Returns entries sorted by global op
+ /// for deterministic processing.
+ pub fn drain_committable_all(&mut self) -> Vec<PipelineEntry> {
+ let mut drained = Vec::new();
+
+ let Self {
+ queues,
+ ns_commits,
+ total_count,
+ ..
+ } = self;
+
+ for (ns, queue) in queues.iter_mut() {
+ while let Some(front) = queue.front() {
+ if !front.ok_quorum_received {
+ break;
+ }
+ let entry = queue.pop_front().expect("front exists");
+ *total_count -= 1;
+ if let Some(ns_commit) = ns_commits.get_mut(ns) {
+ *ns_commit = entry.header.op;
+ }
+ drained.push(entry);
+ }
+ }
+
+ drained.sort_by_key(|entry| entry.header.op);
+ drained
+ }
+
+ /// Compute the global commit frontier after draining.
+ ///
+ /// Walks forward from `current_commit + 1`, treating any op not found
+ /// in the pipeline (already drained) as committed. Stops at the first
+ /// op still present in a queue or past `last_push_op`.
+ pub fn global_commit_frontier(&self, current_commit: u64) -> u64 {
+ let mut commit = current_commit;
+ loop {
+ let next = commit + 1;
+ if next > self.last_push_op {
+ break;
+ }
+ // Ops below first_push_op were never in this pipeline instance
+ // and must not be mistaken for drained entries.
+ if next < self.first_push_op {
+ break;
+ }
+ // Still in a queue means not yet drained
+ if self.message_by_op(next).is_some() {
+ break;
+ }
+ commit = next;
+ }
+ commit
+ }
+}
+
+impl Pipeline for NamespacedPipeline {
+ type Message = Message<PrepareHeader>;
+ type Entry = PipelineEntry;
+
+ fn push_message(&mut self, message: Self::Message) {
+ assert!(
+ self.total_count < PIPELINE_PREPARE_QUEUE_MAX,
+ "namespaced pipeline full"
+ );
+
+ let header = *message.header();
+ let ns = header.namespace;
+
+ if self.total_count > 0 {
+ assert_eq!(
+ header.op,
+ self.last_push_op + 1,
+ "global ops must be sequential: expected {}, got {}",
+ self.last_push_op + 1,
+ header.op
+ );
+ assert_eq!(
+ header.parent, self.last_push_checksum,
+ "parent must chain to previous global checksum"
+ );
+ } else {
+ self.first_push_op = header.op;
+ }
+
+ let queue = self
+ .queues
+ .get_mut(&ns)
+ .expect("push_message: namespace not registered");
+ if let Some(tail) = queue.back() {
+ assert!(
+ header.op > tail.header.op,
+ "op must increase within namespace queue"
+ );
+ }
+
+ queue.push_back(PipelineEntry::new(header));
+ self.total_count += 1;
+ self.last_push_checksum = header.checksum;
+ self.last_push_op = header.op;
+ }
+
+ fn pop_message(&mut self) -> Option<Self::Entry> {
+ let min_ns = self
+ .queues
+ .iter()
+ .filter_map(|(ns, q)| q.front().map(|entry| (*ns,
entry.header.op)))
+ .min_by_key(|(_, op)| *op)
+ .map(|(ns, _)| ns)?;
+
+ let entry = self.queues.get_mut(&min_ns)?.pop_front()?;
+ self.total_count -= 1;
+ Some(entry)
+ }
+
+ fn clear(&mut self) {
+ for queue in self.queues.values_mut() {
+ queue.clear();
+ }
+ self.total_count = 0;
+ self.last_push_checksum = 0;
+ self.last_push_op = 0;
+ self.first_push_op = 0;
+ }
+
+ /// Linear scan all queues. Ops are globally unique; max 8 entries total.
+ fn message_by_op(&self, op: u64) -> Option<&Self::Entry> {
+ for queue in self.queues.values() {
+ for entry in queue {
+ if entry.header.op == op {
+ return Some(entry);
+ }
+ }
+ }
+ None
+ }
+
+ fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry> {
+ for queue in self.queues.values_mut() {
+ for entry in queue.iter_mut() {
+ if entry.header.op == op {
+ return Some(entry);
+ }
+ }
+ }
+ None
+ }
+
+ fn message_by_op_and_checksum(&self, op: u64, checksum: u128) ->
Option<&Self::Entry> {
+ let entry = self.message_by_op(op)?;
+ if entry.header.checksum == checksum {
+ Some(entry)
+ } else {
+ None
+ }
+ }
+
+ fn head(&self) -> Option<&Self::Entry> {
+ self.queues
+ .values()
+ .filter_map(|q| q.front())
+ .min_by_key(|entry| entry.header.op)
+ }
+
+ fn is_full(&self) -> bool {
+ self.total_count >= PIPELINE_PREPARE_QUEUE_MAX
+ }
+
+ fn is_empty(&self) -> bool {
+ self.total_count == 0
+ }
+
+ fn verify(&self) {
+ assert!(self.total_count <= PIPELINE_PREPARE_QUEUE_MAX);
+
+ let actual_count: usize = self.queues.values().map(|q| q.len()).sum();
+ assert_eq!(actual_count, self.total_count, "total_count mismatch");
+
+ // Per-namespace: ops must be monotonically increasing
+ for queue in self.queues.values() {
+ let mut prev_op = None;
+ for entry in queue {
+ if let Some(prev) = prev_op {
+ assert!(
+ entry.header.op > prev,
+ "ops must increase within namespace queue"
+ );
+ }
+ prev_op = Some(entry.header.op);
+ }
+ }
+
+ // Global: collect all entries, sort by op, verify sequential ops and
hash chain
+ let mut all_entries: Vec<&PipelineEntry> =
+ self.queues.values().flat_map(|q| q.iter()).collect();
+ all_entries.sort_by_key(|e| e.header.op);
+
+ for window in all_entries.windows(2) {
+ let prev = &window[0].header;
+ let curr = &window[1].header;
+ assert_eq!(
+ curr.op,
+ prev.op + 1,
+ "global ops must be sequential: {} -> {}",
+ prev.op,
+ curr.op
+ );
+ assert_eq!(
+ curr.parent, prev.checksum,
+ "global hash chain broken at op {}: parent={} expected={}",
+ curr.op, curr.parent, prev.checksum
+ );
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use iggy_common::header::Command2;
+
+ fn make_prepare(
+ op: u64,
+ parent: u128,
+ checksum: u128,
+ namespace: u64,
+ ) -> Message<PrepareHeader> {
+
Message::<PrepareHeader>::new(std::mem::size_of::<PrepareHeader>()).transmute_header(
+ |_, new| {
+ *new = PrepareHeader {
+ command: Command2::Prepare,
+ op,
+ parent,
+ checksum,
+ namespace,
+ ..Default::default()
+ };
+ },
+ )
+ }
+
+ fn mark_quorum(pipeline: &mut NamespacedPipeline, op: u64) {
+ pipeline
+ .message_by_op_mut(op)
+ .expect("mark_quorum: op not in pipeline")
+ .ok_quorum_received = true;
+ }
+
+ #[test]
+ fn multi_namespace_push_pop() {
+ let mut pipeline = NamespacedPipeline::new();
+ pipeline.register_namespace(100);
+ pipeline.register_namespace(200);
+
+ pipeline.push_message(make_prepare(1, 0, 10, 100));
+ pipeline.push_message(make_prepare(2, 10, 20, 200));
+ pipeline.push_message(make_prepare(3, 20, 30, 100));
+ pipeline.push_message(make_prepare(4, 30, 40, 200));
+
+ assert_eq!(pipeline.total_count, 4);
+ assert!(!pipeline.is_empty());
+
+ // head is the entry with the smallest op
+ assert_eq!(pipeline.head().unwrap().header.op, 1);
+
+ // pop returns entries in global op order
+ assert_eq!(pipeline.pop_message().unwrap().header.op, 1);
+ assert_eq!(pipeline.pop_message().unwrap().header.op, 2);
+ assert_eq!(pipeline.pop_message().unwrap().header.op, 3);
+ assert_eq!(pipeline.pop_message().unwrap().header.op, 4);
+ assert!(pipeline.is_empty());
+ }
+
+ #[test]
+ fn drain_committable_all() {
+ let mut pipeline = NamespacedPipeline::new();
+ pipeline.register_namespace(100);
+ pipeline.register_namespace(200);
+
+ // Interleaved ops across two namespaces: [ns_a:1, ns_b:2, ns_a:3,
ns_b:4]
+ pipeline.push_message(make_prepare(1, 0, 10, 100));
+ pipeline.push_message(make_prepare(2, 10, 20, 200));
+ pipeline.push_message(make_prepare(3, 20, 30, 100));
+ pipeline.push_message(make_prepare(4, 30, 40, 200));
+
+ // Mark ops 1,2,3 as quorum'd (not 4)
+ mark_quorum(&mut pipeline, 1);
+ mark_quorum(&mut pipeline, 2);
+ mark_quorum(&mut pipeline, 3);
+
+ // ns_100 drains [1,3], ns_200 drains [2] (stops at non-quorum'd 4)
+ let drained = pipeline.drain_committable_all();
+ let drained_ops: Vec<_> = drained.iter().map(|e|
e.header.op).collect();
+ assert_eq!(drained_ops, vec![1, 2, 3]);
+
+ assert_eq!(pipeline.total_count, 1);
+ assert_eq!(pipeline.head().unwrap().header.op, 4);
+
+ // Per-namespace commits track highest drained op
+ assert_eq!(pipeline.ns_commit(100), Some(3));
+ assert_eq!(pipeline.ns_commit(200), Some(2));
+
+ // Global commit advances past contiguously drained ops
+ assert_eq!(pipeline.global_commit_frontier(0), 3);
+ }
+
+ #[test]
+ fn drain_committable_all_full() {
+ let mut pipeline = NamespacedPipeline::new();
+ pipeline.register_namespace(100);
+ pipeline.register_namespace(200);
+
+ pipeline.push_message(make_prepare(1, 0, 10, 100));
+ pipeline.push_message(make_prepare(2, 10, 20, 200));
+ pipeline.push_message(make_prepare(3, 20, 30, 100));
+ pipeline.push_message(make_prepare(4, 30, 40, 200));
+
+ mark_quorum(&mut pipeline, 1);
+ mark_quorum(&mut pipeline, 2);
+ mark_quorum(&mut pipeline, 3);
+ mark_quorum(&mut pipeline, 4);
+
+ let drained = pipeline.drain_committable_all();
+ assert_eq!(drained.len(), 4);
+ assert!(pipeline.is_empty());
+ assert_eq!(pipeline.global_commit_frontier(0), 4);
+ }
+
+ #[test]
+ fn independent_namespace_progress() {
+ let mut pipeline = NamespacedPipeline::new();
+ pipeline.register_namespace(100);
+ pipeline.register_namespace(200);
+
+ // [ns_a:1, ns_b:2, ns_a:3, ns_b:4]
+ pipeline.push_message(make_prepare(1, 0, 10, 100));
+ pipeline.push_message(make_prepare(2, 10, 20, 200));
+ pipeline.push_message(make_prepare(3, 20, 30, 100));
+ pipeline.push_message(make_prepare(4, 30, 40, 200));
+
+ // Only ns_a (ops 1,3) gets quorum, ns_b (ops 2,4) does not
+ mark_quorum(&mut pipeline, 1);
+ mark_quorum(&mut pipeline, 3);
+
+ let drained = pipeline.drain_committable_all();
+ let drained_ops: Vec<_> = drained.iter().map(|e|
e.header.op).collect();
+ assert_eq!(drained_ops, vec![1, 3]);
+
+ // ns_a progressed independently, ns_b untouched
+ assert_eq!(pipeline.ns_commit(100), Some(3));
+ assert_eq!(pipeline.ns_commit(200), Some(0));
+ assert_eq!(pipeline.total_count, 2);
+
+ // Global commit only advances to 1 (can't skip ns_b's op 2)
+ assert_eq!(pipeline.global_commit_frontier(0), 1);
+
+ // Now ns_b gets quorum
+ mark_quorum(&mut pipeline, 2);
+ mark_quorum(&mut pipeline, 4);
+
+ let drained = pipeline.drain_committable_all();
+ let drained_ops: Vec<_> = drained.iter().map(|e|
e.header.op).collect();
+ assert_eq!(drained_ops, vec![2, 4]);
+
+ // Global commit jumps to 4 (ops 1,3 already drained, 2,4 just drained)
+ assert_eq!(pipeline.global_commit_frontier(1), 4);
+ assert_eq!(pipeline.ns_commit(200), Some(4));
+ }
+
+ #[test]
+ fn message_by_op_cross_namespace() {
+ let mut pipeline = NamespacedPipeline::new();
+ pipeline.register_namespace(100);
+ pipeline.register_namespace(200);
+ pipeline.register_namespace(300);
+
+ pipeline.push_message(make_prepare(1, 0, 10, 100));
+ pipeline.push_message(make_prepare(2, 10, 20, 200));
+ pipeline.push_message(make_prepare(3, 20, 30, 300));
+
+ assert_eq!(pipeline.message_by_op(1).unwrap().header.namespace, 100);
+ assert_eq!(pipeline.message_by_op(2).unwrap().header.namespace, 200);
+ assert_eq!(pipeline.message_by_op(3).unwrap().header.namespace, 300);
+ assert!(pipeline.message_by_op(4).is_none());
+ }
+
+ #[test]
+ fn message_by_op_and_checksum() {
+ let mut pipeline = NamespacedPipeline::new();
+ pipeline.register_namespace(100);
+ pipeline.push_message(make_prepare(1, 0, 10, 100));
+
+ assert!(pipeline.message_by_op_and_checksum(1, 10).is_some());
+ assert!(pipeline.message_by_op_and_checksum(1, 99).is_none());
+ assert!(pipeline.message_by_op_and_checksum(2, 10).is_none());
+ }
+
+ #[test]
+ fn verify_passes() {
+ let mut pipeline = NamespacedPipeline::new();
+ pipeline.register_namespace(100);
+ pipeline.register_namespace(200);
+ pipeline.push_message(make_prepare(1, 0, 10, 100));
+ pipeline.push_message(make_prepare(2, 10, 20, 200));
+ pipeline.push_message(make_prepare(3, 20, 30, 100));
+ pipeline.verify();
+ }
+
+ #[test]
+ fn is_full() {
+ let mut pipeline = NamespacedPipeline::new();
+ pipeline.register_namespace(0);
+ pipeline.register_namespace(1);
+ for i in 0..PIPELINE_PREPARE_QUEUE_MAX as u128 {
+ let parent = if i == 0 { 0 } else { i * 10 };
+ let checksum = (i + 1) * 10;
+ pipeline.push_message(make_prepare(i as u64 + 1, parent, checksum,
i as u64 % 2));
+ }
+ assert!(pipeline.is_full());
+ }
+
+ #[test]
+ #[should_panic(expected = "namespaced pipeline full")]
+ fn push_when_full_panics() {
+ let mut pipeline = NamespacedPipeline::new();
+ pipeline.register_namespace(0);
+ for i in 0..PIPELINE_PREPARE_QUEUE_MAX as u128 {
+ let parent = if i == 0 { 0 } else { i * 10 };
+ let checksum = (i + 1) * 10;
+ pipeline.push_message(make_prepare(i as u64 + 1, parent, checksum,
0));
+ }
+ pipeline.push_message(make_prepare(100, 80, 1000, 0));
+ }
+
+ #[test]
+ fn clear_preserves_ns_commits() {
+ let mut pipeline = NamespacedPipeline::new();
+ pipeline.register_namespace(100);
+ pipeline.register_namespace(200);
+ pipeline.push_message(make_prepare(1, 0, 10, 100));
+ pipeline.push_message(make_prepare(2, 10, 20, 200));
+
+ // Mark op 1 as committed in ns 100 before clearing
+ pipeline.ns_commits.insert(100, 1);
+
+ pipeline.clear();
+ assert!(pipeline.is_empty());
+ assert_eq!(pipeline.total_count, 0);
+
+ // ns_commits must survive clear -- they represent durable knowledge
+ // about already-drained ops, not pipeline state
+ assert_eq!(pipeline.ns_commits.get(&100), Some(&1));
+ assert_eq!(pipeline.ns_commits.get(&200), Some(&0));
+ }
+}
diff --git a/core/consensus/src/plane_helpers.rs
b/core/consensus/src/plane_helpers.rs
index 4b3986067..2307accb1 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -40,7 +40,6 @@ pub async fn pipeline_prepare_common<C, F>(
consensus.verify_pipeline();
consensus.pipeline_message(prepare.clone());
on_replicate(prepare.clone()).await;
- consensus.post_replicate_verify(&prepare);
}
/// Shared commit-based old-prepare fence.
@@ -77,6 +76,7 @@ pub async fn replicate_to_next_in_chain<B, P>(
assert_ne!(next, consensus.replica());
+ // TODO: Propagate send error instead of panicking; requires bus error
design.
consensus
.message_bus()
.send_to_replica(next, message.into_generic())
@@ -135,7 +135,11 @@ where
Ok(())
}
-/// Shared quorum + extraction flow for ack handling.
+/// Shared quorum tracking flow for ack handling.
+///
+/// After recording the ack, walks forward from `current_commit + 1` advancing
+/// the commit number only while consecutive ops have achieved quorum. This
+/// prevents committing ops that have gaps in quorum acknowledgment.
pub fn ack_quorum_reached<B, P>(consensus: &VsrConsensus<B, P>, ack:
&PrepareOkHeader) -> bool
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -145,8 +149,49 @@ where
return false;
}
- consensus.advance_commit_number(ack.op);
- true
+ let pipeline = consensus.pipeline().borrow();
+ let mut new_commit = consensus.commit();
+ while let Some(entry) = pipeline.message_by_op(new_commit + 1) {
+ if !entry.ok_quorum_received {
+ break;
+ }
+ new_commit += 1;
+ }
+ drop(pipeline);
+
+ if new_commit > consensus.commit() {
+ consensus.advance_commit_number(new_commit);
+ return true;
+ }
+
+ false
+}
+
+/// Drain and return committable prepares from the pipeline head.
+///
+/// Entries are drained only from the head and only while their op is covered
+/// by the current commit frontier.
+pub fn drain_committable_prefix<B, P>(consensus: &VsrConsensus<B, P>) ->
Vec<PipelineEntry>
+where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+ let commit = consensus.commit();
+ let mut drained = Vec::new();
+ let mut pipeline = consensus.pipeline().borrow_mut();
+
+ while let Some(head_op) = pipeline.head().map(|entry| entry.header.op) {
+ if head_op > commit {
+ break;
+ }
+
+ let entry = pipeline
+ .pop_message()
+ .expect("drain_committable_prefix: head exists");
+ drained.push(entry);
+ }
+
+ drained
}
/// Shared reply-message construction for committed prepare.
@@ -176,6 +221,7 @@ where
timestamp: prepare_header.timestamp,
request: prepare_header.request,
operation: prepare_header.operation,
+ namespace: prepare_header.namespace,
..Default::default()
};
})
@@ -253,6 +299,7 @@ pub async fn send_prepare_ok<B, P>(
let generic_message = message.into_generic();
let primary = consensus.primary_index(consensus.view());
+ // TODO: Propagate send errors instead of panicking; requires bus error
design.
if primary == consensus.replica() {
// TODO: Queue for self-processing or call handle_prepare_ok directly.
// TODO: This is temporary, to test simulator, but we should send
message to ourselves properly.
@@ -269,3 +316,107 @@ pub async fn send_prepare_ok<B, P>(
.unwrap();
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::{Consensus, LocalPipeline};
+ use iggy_common::IggyError;
+
+ #[derive(Debug, Default)]
+ struct NoopBus;
+
+ impl MessageBus for NoopBus {
+ type Client = u128;
+ type Replica = u8;
+ type Data = Message<GenericHeader>;
+ type Sender = ();
+
+ fn add_client(&mut self, _client: Self::Client, _sender: Self::Sender)
-> bool {
+ true
+ }
+
+ fn remove_client(&mut self, _client: Self::Client) -> bool {
+ true
+ }
+
+ fn add_replica(&mut self, _replica: Self::Replica) -> bool {
+ true
+ }
+
+ fn remove_replica(&mut self, _replica: Self::Replica) -> bool {
+ true
+ }
+
+ async fn send_to_client(
+ &self,
+ _client_id: Self::Client,
+ _data: Self::Data,
+ ) -> Result<(), IggyError> {
+ Ok(())
+ }
+
+ async fn send_to_replica(
+ &self,
+ _replica: Self::Replica,
+ _data: Self::Data,
+ ) -> Result<(), IggyError> {
+ Ok(())
+ }
+ }
+
+ fn prepare_message(op: u64, parent: u128, checksum: u128) ->
Message<PrepareHeader> {
+
Message::<PrepareHeader>::new(std::mem::size_of::<PrepareHeader>()).transmute_header(
+ |_, new| {
+ *new = PrepareHeader {
+ command: Command2::Prepare,
+ op,
+ parent,
+ checksum,
+ ..Default::default()
+ };
+ },
+ )
+ }
+
+ #[test]
+ fn drains_head_prefix_by_commit_frontier() {
+ let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus,
LocalPipeline::new());
+ consensus.init();
+
+ consensus.pipeline_message(prepare_message(1, 0, 10));
+ consensus.pipeline_message(prepare_message(2, 10, 20));
+ consensus.pipeline_message(prepare_message(3, 20, 30));
+
+ consensus.advance_commit_number(3);
+
+ let drained = drain_committable_prefix(&consensus);
+ let drained_ops: Vec<_> = drained.into_iter().map(|entry|
entry.header.op).collect();
+ assert_eq!(drained_ops, vec![1, 2, 3]);
+ assert!(consensus.pipeline().borrow().is_empty());
+ }
+
+ #[test]
+ fn drains_only_up_to_commit_frontier_even_without_quorum_flags() {
+ let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus,
LocalPipeline::new());
+ consensus.init();
+
+ consensus.pipeline_message(prepare_message(5, 0, 50));
+ consensus.pipeline_message(prepare_message(6, 50, 60));
+ consensus.pipeline_message(prepare_message(7, 60, 70));
+
+ consensus.advance_commit_number(6);
+ let drained = drain_committable_prefix(&consensus);
+ let drained_ops: Vec<_> = drained.into_iter().map(|entry|
entry.header.op).collect();
+
+ assert_eq!(drained_ops, vec![5, 6]);
+ assert_eq!(
+ consensus
+ .pipeline()
+ .borrow()
+ .head()
+ .map(|entry| entry.header.op),
+ Some(7)
+ );
+ }
+}
diff --git a/core/consensus/src/vsr_timeout.rs
b/core/consensus/src/vsr_timeout.rs
index de08a44dd..f301ec7d2 100644
--- a/core/consensus/src/vsr_timeout.rs
+++ b/core/consensus/src/vsr_timeout.rs
@@ -166,6 +166,7 @@ impl TimeoutManager {
self.start_view_change_message.tick();
self.do_view_change_message.tick();
self.request_start_view_message.tick();
+ self.view_change_status.tick();
}
pub fn fired(&self, kind: TimeoutKind) -> bool {
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index 4ff74f4e2..35386b182 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -18,7 +18,7 @@ use crate::stm::StateMachine;
use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot,
SnapshotError};
use consensus::{
Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer,
VsrConsensus, ack_preflight,
- ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
+ ack_quorum_reached, build_reply_message, drain_committable_prefix,
fence_old_prepare_by_commit,
panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common,
replicate_preflight,
replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
};
@@ -156,6 +156,7 @@ where
assert_eq!(header.op, current_op + 1);
consensus.sequencer().set_sequence(header.op);
+ consensus.set_last_prepare_checksum(header.checksum);
// Append to journal.
journal.handle().append(message.clone()).await;
@@ -194,47 +195,51 @@ where
debug!("on_ack: quorum received for op={}", header.op);
- // Extract the header from the pipeline, fetch the full message
from journal
- // TODO: Commit from the head. ALWAYS
- let entry =
consensus.pipeline().borrow_mut().extract_by_op(header.op);
- let Some(entry) = entry else {
- warn!("on_ack: prepare not found in pipeline for op={}",
header.op);
- return;
- };
-
- let prepare_header = entry.header;
- // TODO(hubcio): should we replace this with graceful fallback
(warn + return)?
- // When journal compaction is implemented compaction could race
- // with this lookup if it removes entries below the commit number.
- let prepare = journal
- .handle()
- .entry(&prepare_header)
- .await
- .unwrap_or_else(|| {
- panic!(
- "on_ack: committed prepare op={} checksum={} must be
in journal",
- prepare_header.op, prepare_header.checksum
- )
- });
-
- // Apply the state (consumes prepare)
- // TODO: Handle appending result to response
- let _result = self.mux_stm.update(prepare);
- debug!("on_ack: state applied for op={}", prepare_header.op);
-
- // Send reply to client
- let generic_reply = build_reply_message(consensus,
&prepare_header).into_generic();
- debug!(
- "on_ack: sending reply to client={} for op={}",
- prepare_header.client, prepare_header.op
- );
-
- // TODO: Error handling
- consensus
- .message_bus()
- .send_to_client(prepare_header.client, generic_reply)
- .await
- .unwrap()
+ let drained = drain_committable_prefix(consensus);
+ if let (Some(first), Some(last)) = (drained.first(),
drained.last()) {
+ debug!(
+ "on_ack: draining committed prefix ops=[{}..={}] count={}",
+ first.header.op,
+ last.header.op,
+ drained.len()
+ );
+ }
+
+ for entry in drained {
+ let prepare_header = entry.header;
+ // TODO(hubcio): should we replace this with graceful fallback
(warn + return)?
+ // When journal compaction is implemented compaction could race
+ // with this lookup if it removes entries below the commit
number.
+ let prepare = journal
+ .handle()
+ .entry(&prepare_header)
+ .await
+ .unwrap_or_else(|| {
+ panic!(
+ "on_ack: committed prepare op={} checksum={} must
be in journal",
+ prepare_header.op, prepare_header.checksum
+ )
+ });
+
+ // Apply the state (consumes prepare)
+ // TODO: Handle appending result to response
+ let _result = self.mux_stm.update(prepare);
+ debug!("on_ack: state applied for op={}", prepare_header.op);
+
+ // Send reply to client
+ let generic_reply = build_reply_message(consensus,
&prepare_header).into_generic();
+ debug!(
+ "on_ack: sending reply to client={} for op={}",
+ prepare_header.client, prepare_header.op
+ );
+
+ // TODO: Propagate send error instead of panicking; requires
bus error design.
+ consensus
+ .message_bus()
+ .send_to_client(prepare_header.client, generic_reply)
+ .await
+ .unwrap()
+ }
}
}
}
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index 16ed124ab..2c646072d 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -21,9 +21,10 @@ use crate::IggyPartition;
use crate::Partition;
use crate::types::PartitionsConfig;
use consensus::{
- Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus,
ack_preflight,
- ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
pipeline_prepare_common,
- replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as
send_prepare_ok_common,
+ Consensus, NamespacedPipeline, Pipeline, PipelineEntry, Plane, Project,
Sequencer,
+ VsrConsensus, ack_preflight, build_reply_message,
fence_old_prepare_by_commit,
+ pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
+ send_prepare_ok as send_prepare_ok_common,
};
use iggy_common::{
INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut,
PartitionStats, PooledBuffer,
@@ -34,7 +35,7 @@ use iggy_common::{
};
use message_bus::MessageBus;
use std::cell::UnsafeCell;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tracing::{debug, warn};
@@ -58,33 +59,27 @@ pub struct IggyPartitions<C> {
/// mutate partition state (segments, offsets, journal).
partitions: UnsafeCell<Vec<IggyPartition>>,
namespace_to_local: HashMap<IggyNamespace, LocalIdx>,
- /// Some on shard0, None on other shards
- pub consensus: Option<C>,
+ consensus: Option<C>,
}
impl<C> IggyPartitions<C> {
- pub fn new(shard_id: ShardId, config: PartitionsConfig, consensus:
Option<C>) -> Self {
+ pub fn new(shard_id: ShardId, config: PartitionsConfig) -> Self {
Self {
shard_id,
config,
partitions: UnsafeCell::new(Vec::new()),
namespace_to_local: HashMap::new(),
- consensus,
+ consensus: None,
}
}
- pub fn with_capacity(
- shard_id: ShardId,
- config: PartitionsConfig,
- consensus: Option<C>,
- capacity: usize,
- ) -> Self {
+ pub fn with_capacity(shard_id: ShardId, config: PartitionsConfig,
capacity: usize) -> Self {
Self {
shard_id,
config,
partitions: UnsafeCell::new(Vec::with_capacity(capacity)),
namespace_to_local: HashMap::with_capacity(capacity),
- consensus,
+ consensus: None,
}
}
@@ -231,16 +226,23 @@ impl<C> IggyPartitions<C> {
&mut self.partitions_mut()[idx]
}
+ pub fn consensus(&self) -> Option<&C> {
+ self.consensus.as_ref()
+ }
+
+ pub fn set_consensus(&mut self, consensus: C) {
+ self.consensus = Some(consensus);
+ }
+
/// Initialize a new partition with in-memory storage (for
testing/simulation).
///
- /// This is a simplified version that doesn't create file-backed storage.
- /// Use `init_partition()` for production use with real files.
+ /// Idempotent: subsequent calls for the same namespace are no-ops
returning
+ /// the existing index. Consensus must be set separately via
`set_consensus`.
///
/// TODO: Make the log generic over its storage backend to support both
/// in-memory (for testing) and file-backed (for production) storage
without
/// needing separate initialization methods.
pub fn init_partition_in_memory(&mut self, namespace: IggyNamespace) ->
LocalIdx {
- // Check if already initialized
if let Some(idx) = self.local_idx(&namespace) {
return idx;
}
@@ -261,7 +263,6 @@ impl<C> IggyPartitions<C> {
partition.should_increment_offset = false;
partition.stats.increment_segments_count(1);
- // Insert and return local index
self.insert(namespace, partition)
}
@@ -275,9 +276,9 @@ impl<C> IggyPartitions<C> {
/// 2. Control plane: broadcast to shards (SKIPPED in this method)
/// 3. Data plane: INITIATE PARTITION (THIS METHOD)
///
- /// Idempotent - returns existing LocalIdx if partition already exists.
+ /// Idempotent: subsequent calls for the same namespace are no-ops.
+ /// Consensus must be set separately via `set_consensus`.
pub async fn init_partition(&mut self, namespace: IggyNamespace) ->
LocalIdx {
- // Check if already initialized
if let Some(idx) = self.local_idx(&namespace) {
return idx;
}
@@ -324,27 +325,38 @@ impl<C> IggyPartitions<C> {
partition.should_increment_offset = false;
partition.stats.increment_segments_count(1);
- // Insert and return local index
self.insert(namespace, partition)
}
}
-impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>>
+impl<B> Plane<VsrConsensus<B, NamespacedPipeline>>
+ for IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
{
- async fn on_request(&self, message: <VsrConsensus<B> as
Consensus>::Message<RequestHeader>) {
- let consensus = self.consensus.as_ref().unwrap();
+ async fn on_request(
+ &self,
+ message: <VsrConsensus<B, NamespacedPipeline> as
Consensus>::Message<RequestHeader>,
+ ) {
+ let namespace = IggyNamespace::from_raw(message.header().namespace);
+ let consensus = self
+ .consensus()
+ .expect("on_request: consensus not initialized");
- debug!("handling partition request");
+ debug!(?namespace, "handling partition request");
let prepare = message.project(consensus);
pipeline_prepare_common(consensus, prepare, |prepare|
self.on_replicate(prepare)).await;
}
- async fn on_replicate(&self, message: <VsrConsensus<B> as
Consensus>::Message<PrepareHeader>) {
- let consensus = self.consensus.as_ref().unwrap();
-
+ async fn on_replicate(
+ &self,
+ message: <VsrConsensus<B, NamespacedPipeline> as
Consensus>::Message<PrepareHeader>,
+ ) {
let header = message.header();
+ let namespace = IggyNamespace::from_raw(header.namespace);
+ let consensus = self
+ .consensus()
+ .expect("on_replicate: consensus not initialized");
let current_op = match replicate_preflight(consensus, header) {
Ok(current_op) => current_op,
@@ -367,25 +379,26 @@ where
// TODO: Make those assertions be toggleable through a feature flag,
so they can be used only by simulator/tests.
debug_assert_eq!(header.op, current_op + 1);
consensus.sequencer().set_sequence(header.op);
+ consensus.set_last_prepare_checksum(header.checksum);
// TODO: Figure out the flow of the partition operations.
// In metadata layer we assume that when an `on_request` or
`on_replicate` is called, it's called from correct shard.
// I think we need to do the same here, which means that the code from
below is unfallable, the partition should always exist by now!
- let namespace = IggyNamespace::from_raw(header.namespace);
- self.apply_replicated_operation(&message, &namespace).await;
+ self.apply_replicated_operation(&namespace, &message).await;
- // After successful journal write, send prepare_ok to primary.
self.send_prepare_ok(header).await;
- // If follower, commit any newly committable entries.
if consensus.is_follower() {
- self.commit_journal();
+ self.commit_journal(&namespace);
}
}
- async fn on_ack(&self, message: <VsrConsensus<B> as
Consensus>::Message<PrepareOkHeader>) {
- let consensus = self.consensus.as_ref().unwrap();
+ async fn on_ack(
+ &self,
+ message: <VsrConsensus<B, NamespacedPipeline> as
Consensus>::Message<PrepareOkHeader>,
+ ) {
let header = message.header();
+ let consensus = self.consensus().expect("on_ack: consensus not
initialized");
if let Err(reason) = ack_preflight(consensus) {
warn!("on_ack: ignoring ({reason})");
@@ -394,42 +407,74 @@ where
{
let pipeline = consensus.pipeline().borrow();
- let Some(entry) =
- pipeline.message_by_op_and_checksum(header.op,
header.prepare_checksum)
- else {
+ if pipeline
+ .message_by_op_and_checksum(header.op, header.prepare_checksum)
+ .is_none()
+ {
debug!("on_ack: prepare not in pipeline op={}", header.op);
return;
- };
-
- if entry.header.checksum != header.prepare_checksum {
- warn!("on_ack: checksum mismatch");
- return;
}
}
- if ack_quorum_reached(consensus, header) {
- debug!("on_ack: quorum received for op={}", header.op);
-
- // Extract the prepare message from the pipeline by op
- // TODO: Commit from the head. ALWAYS
- let entry =
consensus.pipeline().borrow_mut().extract_by_op(header.op);
- let Some(PipelineEntry {
- header: prepare_header,
- ..
- }) = entry
- else {
- warn!("on_ack: prepare not found in pipeline for op={}",
header.op);
- return;
- };
+ consensus.handle_prepare_ok(header);
+
+ // SAFETY(IGGY-66): Per-namespace drain independent of global commit.
+ //
+ // drain_committable_all() drains each namespace queue independently by
+ // quorum flag, so ns_a ops can be drained and replied to clients while
+ // ns_b ops block the global commit (e.g., ns_a ops 1,3 drain while
+ // ns_b op 2 is pending). This is intentional for partition
independence.
+ //
+ // View change risk: if a view change occurs before the global commit
+ // covers a drained op, the new primary replays from max_commit+1 and
+ // re-executes it. append_messages is NOT idempotent -- re-execution
+ // produces duplicate partition data.
+ //
+ // Before this path handles real traffic, two guards are required:
+ // 1. Op-based dedup in apply_replicated_operation: skip append if
+ // the partition journal already contains data for this op.
+ // 2. Client reply dedup by (client_id, request_id): prevent
+ // duplicate replies after view change re-execution.
+ let drained = {
+ let mut pipeline = consensus.pipeline().borrow_mut();
+ pipeline.drain_committable_all()
+ };
+
+ if drained.is_empty() {
+ return;
+ }
+
+ // Advance global commit for VSR protocol correctness
+ {
+ let pipeline = consensus.pipeline().borrow();
+ let new_commit =
pipeline.global_commit_frontier(consensus.commit());
+ drop(pipeline);
+ consensus.advance_commit_number(new_commit);
+ }
+
+ if let (Some(first), Some(last)) = (drained.first(), drained.last()) {
+ debug!(
+ "on_ack: draining committed ops=[{}..={}] count={}",
+ first.header.op,
+ last.header.op,
+ drained.len()
+ );
+ }
- // Data was already appended to the partition journal during
- // on_replicate. Now that quorum is reached, update the partition's
- // current offset and check whether the journal needs flushing.
- let namespace = IggyNamespace::from_raw(prepare_header.namespace);
+ let mut committed_ns: HashSet<IggyNamespace> = HashSet::new();
+
+ for PipelineEntry {
+ header: prepare_header,
+ ..
+ } in drained
+ {
+ let entry_namespace =
IggyNamespace::from_raw(prepare_header.namespace);
match prepare_header.operation {
Operation::SendMessages => {
- self.commit_messages(&namespace).await;
+ if committed_ns.insert(entry_namespace) {
+ self.commit_messages(&entry_namespace).await;
+ }
debug!("on_ack: messages committed for op={}",
prepare_header.op,);
}
Operation::StoreConsumerOffset => {
@@ -447,14 +492,13 @@ where
}
}
- // Send reply to client
let generic_reply = build_reply_message(consensus,
&prepare_header).into_generic();
debug!(
"on_ack: sending reply to client={} for op={}",
prepare_header.client, prepare_header.op
);
- // TODO: Error handling
+ // TODO: Propagate send error instead of panicking; requires bus
error design.
consensus
.message_bus()
.send_to_client(prepare_header.client, generic_reply)
@@ -464,10 +508,18 @@ where
}
}
-impl<B> IggyPartitions<VsrConsensus<B>>
+impl<B> IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
{
+ pub fn register_namespace_in_pipeline(&self, ns: u64) {
+ self.consensus()
+ .expect("register_namespace_in_pipeline: consensus not
initialized")
+ .pipeline()
+ .borrow_mut()
+ .register_namespace(ns);
+ }
+
+ // TODO: Move this elsewhere, also do not reallocate; we reallocate
now because we use PooledBuffer for the batch body
// but `Bytes` for `Message` payload.
fn batch_from_body(body: &[u8]) -> IggyMessagesBatchMut {
@@ -488,10 +540,12 @@ where
async fn apply_replicated_operation(
&self,
- message: &Message<PrepareHeader>,
namespace: &IggyNamespace,
+ message: &Message<PrepareHeader>,
) {
- let consensus = self.consensus.as_ref().unwrap();
+ let consensus = self
+ .consensus()
+ .expect("apply_replicated_operation: consensus not initialized");
let header = message.header();
match header.operation {
@@ -552,16 +606,16 @@ where
/// Replicate a prepare message to the next replica in the chain.
///
- /// Chain replication pattern:
- /// - Primary sends to first backup
- /// - Each backup forwards to the next
- /// - Stops when we would forward back to primary
+ /// Chain replication: primary -> first backup -> ... -> last backup.
+ /// Stops when the next replica would be the primary.
async fn replicate(&self, message: Message<PrepareHeader>) {
- let consensus = self.consensus.as_ref().unwrap();
+ let consensus = self
+ .consensus()
+ .expect("replicate: consensus not initialized");
replicate_to_next_in_chain(consensus, message).await;
}
- fn commit_journal(&self) {
+ fn commit_journal(&self, _namespace: &IggyNamespace) {
// TODO: Implement commit logic for followers.
// Walk through journal from last committed to current commit number
// Apply each entry to the partition state
@@ -787,7 +841,9 @@ where
}
async fn send_prepare_ok(&self, header: &PrepareHeader) {
- let consensus = self.consensus.as_ref().unwrap();
+ let consensus = self
+ .consensus()
+ .expect("send_prepare_ok: consensus not initialized");
// TODO: Verify the prepare is persisted in the partition journal.
// The partition journal uses MessageLookup headers, so we cannot
// check by PrepareHeader.op directly. For now, skip this check.
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 25175ea8e..c4afc1bcf 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -17,7 +17,7 @@
use crate::bus::SharedMemBus;
use bytes::Bytes;
-use consensus::VsrConsensus;
+use consensus::{NamespacedPipeline, VsrConsensus};
use iggy_common::header::PrepareHeader;
use iggy_common::message::Message;
use journal::{Journal, JournalHandle, Storage};
@@ -61,6 +61,7 @@ pub struct SimJournal<S: Storage> {
storage: S,
headers: UnsafeCell<HashMap<u64, PrepareHeader>>,
offsets: UnsafeCell<HashMap<u64, usize>>,
+ write_offset: Cell<usize>,
}
impl<S: Storage + Default> Default for SimJournal<S> {
@@ -69,6 +70,7 @@ impl<S: Storage + Default> Default for SimJournal<S> {
storage: S::default(),
headers: UnsafeCell::new(HashMap::new()),
offsets: UnsafeCell::new(HashMap::new()),
+ write_offset: Cell::new(0),
}
}
}
@@ -79,6 +81,7 @@ impl<S: Storage> std::fmt::Debug for SimJournal<S> {
.field("storage", &"<Storage>")
.field("headers", &"<UnsafeCell>")
.field("offsets", &"<UnsafeCell>")
+ .field("write_offset", &self.write_offset.get())
.finish()
}
}
@@ -121,14 +124,10 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for
SimJournal<S> {
let bytes_written = self.storage.write(message_bytes.to_vec()).await;
- let current_offset = unsafe { &mut *self.offsets.get() }
- .values()
- .last()
- .cloned()
- .unwrap_or_default();
-
+ let offset = self.write_offset.get();
unsafe { &mut *self.headers.get() }.insert(header.op, header);
- unsafe { &mut *self.offsets.get() }.insert(header.op, current_offset +
bytes_written);
+ unsafe { &mut *self.offsets.get() }.insert(header.op, offset);
+ self.write_offset.set(offset + bytes_written);
}
fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> {
@@ -159,4 +158,5 @@ pub type SimMetadata = IggyMetadata<
>;
/// Type alias for simulator partitions
-pub type ReplicaPartitions =
partitions::IggyPartitions<VsrConsensus<SharedMemBus>>;
+pub type ReplicaPartitions =
+ partitions::IggyPartitions<VsrConsensus<SharedMemBus, NamespacedPipeline>>;
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 44e063f78..332b7fc31 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -22,7 +22,7 @@ pub mod replica;
use bus::MemBus;
use consensus::Plane;
-use iggy_common::header::{GenericHeader, ReplyHeader};
+use iggy_common::header::{GenericHeader, Operation, ReplyHeader};
use iggy_common::message::{Message, MessageBag};
use message_bus::MessageBus;
use replica::Replica;
@@ -34,10 +34,10 @@ pub struct Simulator {
}
impl Simulator {
- /// Initialize a partition on all replicas (in-memory for simulation)
+ /// Initialize a partition with its own consensus group on all replicas.
pub fn init_partition(&mut self, namespace:
iggy_common::sharding::IggyNamespace) {
for replica in &mut self.replicas {
- replica.partitions.init_partition_in_memory(namespace);
+ replica.init_partition(namespace);
}
}
@@ -120,13 +120,16 @@ impl Simulator {
MessageBag::Request(message) => message.header().operation,
MessageBag::Prepare(message) => message.header().operation,
MessageBag::PrepareOk(message) => message.header().operation,
- } as u8;
+ };
- if operation < 200 {
- self.dispatch_to_metadata_on_replica(replica, message).await;
- } else {
- self.dispatch_to_partition_on_replica(replica, message)
- .await;
+ match operation {
+ Operation::SendMessages | Operation::StoreConsumerOffset => {
+ self.dispatch_to_partition_on_replica(replica, message)
+ .await;
+ }
+ _ => {
+ self.dispatch_to_metadata_on_replica(replica, message).await;
+ }
}
}
@@ -158,3 +161,10 @@ impl Simulator {
}
}
}
+
+// TODO(IGGY-66): Add acceptance test for per-partition consensus independence.
+// Setup: 3-replica simulator, two partitions (ns_a, ns_b).
+// 1. Fill ns_a's pipeline to PIPELINE_PREPARE_QUEUE_MAX without delivering
acks.
+// 2. Send a request to ns_b, step until ns_b reply arrives.
+// 3. Assert ns_b committed while ns_a pipeline is still full.
+// Requires namespace-aware stepping (filter bus by namespace) or two-phase
delivery.
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index ae4631e95..9f8081656 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -19,9 +19,9 @@ use crate::bus::{MemBus, SharedMemBus};
use crate::deps::{
MemStorage, ReplicaPartitions, SimJournal, SimMetadata,
SimMuxStateMachine, SimSnapshot,
};
-use consensus::{LocalPipeline, VsrConsensus};
+use consensus::{LocalPipeline, NamespacedPipeline, VsrConsensus};
use iggy_common::IggyByteSize;
-use iggy_common::sharding::ShardId;
+use iggy_common::sharding::{IggyNamespace, ShardId};
use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
use metadata::stm::stream::{Streams, StreamsInner};
use metadata::stm::user::{Users, UsersInner};
@@ -29,9 +29,13 @@ use metadata::{IggyMetadata, variadic};
use partitions::PartitionsConfig;
use std::sync::Arc;
+// TODO: Make configurable
+const CLUSTER_ID: u128 = 1;
+
pub struct Replica {
pub id: u8,
pub name: String,
+ pub replica_count: u8,
pub metadata: SimMetadata,
pub partitions: ReplicaPartitions,
pub bus: Arc<MemBus>,
@@ -44,48 +48,43 @@ impl Replica {
let consumer_groups: ConsumerGroups =
ConsumerGroupsInner::new().into();
let mux = SimMuxStateMachine::new(variadic!(users, streams,
consumer_groups));
- let cluster_id: u128 = 1; // TODO: Make configurable
+ // Metadata uses namespace=0 (not partition-scoped)
let metadata_consensus = VsrConsensus::new(
- cluster_id,
+ CLUSTER_ID,
id,
replica_count,
+ 0,
SharedMemBus(Arc::clone(&bus)),
LocalPipeline::new(),
);
metadata_consensus.init();
- // Create separate consensus instance for partitions
- let partitions_consensus = VsrConsensus::new(
- cluster_id,
- id,
- replica_count,
- SharedMemBus(Arc::clone(&bus)),
- LocalPipeline::new(),
- );
- partitions_consensus.init();
-
- // Configure partitions
let partitions_config = PartitionsConfig {
messages_required_to_save: 1000,
size_of_messages_required_to_save: IggyByteSize::from(4 * 1024 *
1024),
enforce_fsync: false, // Disable fsync for simulation
- segment_size: IggyByteSize::from(1024 * 1024 * 1024), // 1GB
segments
+ segment_size: IggyByteSize::from(1024 * 1024 * 1024), // 1GiB
segments
};
- // Only replica 0 gets consensus (primary shard for now)
- let partitions = if id == 0 {
- ReplicaPartitions::new(
- ShardId::new(id as u16),
- partitions_config,
- Some(partitions_consensus),
- )
- } else {
- ReplicaPartitions::new(ShardId::new(id as u16), partitions_config,
None)
- };
+ let mut partitions = ReplicaPartitions::new(ShardId::new(id as u16),
partitions_config);
+
+ // TODO: namespace=0 collides with metadata consensus. Safe for now
because the simulator
+ // routes by Operation type, but a shared view change bus would
produce namespace collisions.
+ let partition_consensus = VsrConsensus::new(
+ CLUSTER_ID,
+ id,
+ replica_count,
+ 0,
+ SharedMemBus(Arc::clone(&bus)),
+ NamespacedPipeline::new(),
+ );
+ partition_consensus.init();
+ partitions.set_consensus(partition_consensus);
Self {
id,
name,
+ replica_count,
metadata: IggyMetadata {
consensus: Some(metadata_consensus),
journal: Some(SimJournal::<MemStorage>::default()),
@@ -96,4 +95,10 @@ impl Replica {
bus,
}
}
+
+ pub fn init_partition(&mut self, namespace: IggyNamespace) {
+ self.partitions.init_partition_in_memory(namespace);
+ self.partitions
+ .register_namespace_in_pipeline(namespace.inner());
+ }
}