This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 54153ee34 feat(cluster): Impl VSR `view_change` (#2546)
54153ee34 is described below
commit 54153ee34b0ef781032cb28d72ff5480080fde56
Author: Krishna Vishal <[email protected]>
AuthorDate: Tue Jan 13 22:39:09 2026 +0530
feat(cluster): Impl VSR `view_change` (#2546)
Implemented `view_change` from the Viewstamped Replication Revisited (VSR) paper.
---
core/common/src/types/consensus/header.rs | 202 ++++++++++++
core/consensus/src/impls.rs | 532 +++++++++++++++++++++++++++++-
core/consensus/src/lib.rs | 2 +
core/consensus/src/view_change_quorum.rs | 102 ++++++
core/consensus/src/vsr_timeout.rs | 7 +
5 files changed, 842 insertions(+), 3 deletions(-)
diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs
index b20d0d5bf..376b6be05 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -45,6 +45,8 @@ pub enum Command2 {
Commit = 9,
StartViewChange = 10,
+ DoViewChange = 11,
+ StartView = 12,
}
#[derive(Debug, Clone, Error, PartialEq, Eq)]
@@ -52,12 +54,18 @@ pub enum ConsensusError {
#[error("invalid command: expected {expected:?}, found {found:?}")]
InvalidCommand { expected: Command2, found: Command2 },
+ #[error("invalid size: expected {expected:?}, found {found:?}")]
+ InvalidSize { expected: u32, found: u32 },
+
#[error("invalid checksum")]
InvalidChecksum,
#[error("invalid cluster ID")]
InvalidCluster,
+ #[error("invalid field: {0}")]
+ InvalidField(String),
+
#[error("parent_padding must be 0")]
PrepareParentPaddingNonZero,
@@ -389,3 +397,197 @@ impl ConsensusHeader for ReplyHeader {
self.size
}
}
+
+/// StartViewChange message header.
+///
+/// Sent by a replica when it suspects the primary has failed.
+/// This is a header-only message with no body.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(C)]
+pub struct StartViewChangeHeader {
+    pub checksum: u128,
+    pub checksum_padding: u128,
+    pub checksum_body: u128,
+    pub checksum_body_padding: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub epoch: u32,
+    pub view: u32,
+    pub release: u32,
+    pub protocol: u16,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 12],
+
+    pub reserved: [u8; 128],
+}
+
+// NOTE(review): `Pod` promises every bit pattern is valid, but `command` is an
+// enum; this presumably relies on the bytes being validated before the cast
+// (and on `Command2` being 1 byte so the struct has no padding) — confirm.
+unsafe impl Pod for StartViewChangeHeader {}
+unsafe impl Zeroable for StartViewChangeHeader {}
+
+impl ConsensusHeader for StartViewChangeHeader {
+    const COMMAND: Command2 = Command2::StartViewChange;
+
+    /// Rejects headers whose command is not StartViewChange or whose
+    /// `release` is non-zero.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Command2::StartViewChange {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Command2::StartViewChange,
+                found: self.command,
+            });
+        }
+
+        // Same wording as the DoViewChange/StartView headers.
+        if self.release != 0 {
+            return Err(ConsensusError::InvalidField(
+                "release must be 0".to_string(),
+            ));
+        }
+        Ok(())
+    }
+
+    fn size(&self) -> u32 {
+        self.size
+    }
+}
+
+/// DoViewChange (DVC) message header.
+///
+/// Sent by a replica to the primary candidate of the new view once it has
+/// collected a quorum of StartViewChange messages. Carries the metadata the
+/// candidate uses to choose the most authoritative log.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(C)]
+pub struct DoViewChangeHeader {
+    pub checksum: u128,
+    pub checksum_padding: u128,
+    pub checksum_body: u128,
+    pub checksum_body_padding: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub epoch: u32,
+    pub view: u32,
+    pub release: u32,
+    pub protocol: u16,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 12],
+
+    /// Highest op-number in the sender's log; breaks ties between DVCs that
+    /// share the same `log_view`.
+    pub op: u64,
+
+    /// Sender's commit number (highest committed op). The new primary adopts
+    /// the maximum `commit` across all DVCs it receives.
+    pub commit: u64,
+
+    /// The view in which the sender's status was last normal. The DVC with the
+    /// highest `log_view` carries the most authoritative log.
+    pub log_view: u32,
+
+    pub reserved: [u8; 108],
+}
+
+unsafe impl Pod for DoViewChangeHeader {}
+unsafe impl Zeroable for DoViewChangeHeader {}
+
+impl ConsensusHeader for DoViewChangeHeader {
+    const COMMAND: Command2 = Command2::DoViewChange;
+
+    /// Rejects headers with the wrong command, a non-zero `release`, a
+    /// `log_view` beyond `view`, or a `commit` beyond `op`.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Self::COMMAND {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Self::COMMAND,
+                found: self.command,
+            });
+        }
+
+        let invalid = |reason: &str| Err(ConsensusError::InvalidField(reason.to_string()));
+
+        if self.release != 0 {
+            invalid("release must be 0")
+        } else if self.log_view > self.view {
+            // A replica cannot have been normal in a view it has not reached.
+            invalid("log_view cannot exceed view")
+        } else if self.commit > self.op {
+            // A replica cannot have committed an op it has not seen.
+            invalid("commit cannot exceed op")
+        } else {
+            Ok(())
+        }
+    }
+
+    fn size(&self) -> u32 {
+        self.size
+    }
+}
+
+/// StartView (SV) message header.
+///
+/// Broadcast by the new primary to every replica after it has collected a
+/// quorum of DoViewChange messages.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(C)]
+pub struct StartViewHeader {
+    pub checksum: u128,
+    pub checksum_padding: u128,
+    pub checksum_body: u128,
+    pub checksum_body_padding: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub epoch: u32,
+    pub view: u32,
+    pub release: u32,
+    pub protocol: u16,
+    pub command: Command2,
+    pub replica: u8,
+    pub reserved_frame: [u8; 12],
+
+    /// Op-number of the newest entry in the new primary's log; backups adopt
+    /// it as their own op-number.
+    pub op: u64,
+
+    /// Commit number chosen by the new primary (the maximum commit seen in
+    /// the DVC quorum); backups adopt it as their commit number.
+    pub commit: u64,
+
+    pub reserved: [u8; 112],
+}
+
+unsafe impl Pod for StartViewHeader {}
+unsafe impl Zeroable for StartViewHeader {}
+
+impl ConsensusHeader for StartViewHeader {
+    const COMMAND: Command2 = Command2::StartView;
+
+    /// Rejects headers with the wrong command, a non-zero `release`, or a
+    /// `commit` beyond `op`.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Self::COMMAND {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Self::COMMAND,
+                found: self.command,
+            });
+        }
+
+        if self.release != 0 {
+            Err(ConsensusError::InvalidField("release must be 0".to_string()))
+        } else if self.commit > self.op {
+            // The committed prefix can never extend past the announced log head.
+            Err(ConsensusError::InvalidField("commit cannot exceed op".to_string()))
+        } else {
+            Ok(())
+        }
+    }
+
+    fn size(&self) -> u32 {
+        self.size
+    }
+}
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 98d78e737..0be640933 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -15,9 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-use crate::{Consensus, Project};
+use crate::vsr_timeout::{TimeoutKind, TimeoutManager};
+use crate::{
+ Consensus, DvcQuorumArray, Project, StoredDvc, dvc_count, dvc_max_commit,
+ dvc_quorum_array_empty, dvc_record, dvc_reset, dvc_select_winner,
+};
use bit_set::BitSet;
-use iggy_common::header::{Command2, PrepareHeader, PrepareOkHeader,
RequestHeader};
+use iggy_common::header::{
+ Command2, DoViewChangeHeader, PrepareHeader, PrepareOkHeader,
RequestHeader,
+ StartViewChangeHeader, StartViewHeader,
+};
use iggy_common::message::Message;
use message_bus::IggyMessageBus;
use std::cell::{Cell, RefCell};
@@ -322,6 +329,25 @@ pub enum Status {
Recovering,
}
+/// Actions to be taken by the caller after processing a VSR event.
+///
+/// The view-change handlers and `tick` return these rather than sending
+/// messages themselves; the caller is expected to perform the sends.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum VsrAction {
+    /// Broadcast StartViewChange for `view` to all replicas.
+    SendStartViewChange { view: u32 },
+    /// Send DoViewChange to `target`, the primary candidate of `view`.
+    SendDoViewChange {
+        view: u32,
+        target: u8,
+        log_view: u32,
+        op: u64,
+        commit: u64,
+    },
+    /// Broadcast StartView to all backups (sent by the new primary).
+    SendStartView { view: u32, op: u64, commit: u64 },
+    /// Send PrepareOk for `op` in `view` to `target`, the primary.
+    SendPrepareOk { view: u32, op: u64, target: u8 },
+}
+
#[allow(unused)]
pub struct VsrConsensus {
cluster: u128,
@@ -329,6 +355,15 @@ pub struct VsrConsensus {
replica_count: u8,
view: Cell<u32>,
+
+ // The latest view where
+ // - the replica was a primary and acquired a DVC quorum, or
+ // - the replica was a backup and processed a SV message.
+ // i.e. the latest view in which this replica changed its head message.
+ // Initialized from the superblock's VSRState.
+ // Invariants:
+ // * `replica.log_view ≥ replica.log_view_durable`
+ // * `replica.log_view = 0` when replica_count=1.
log_view: Cell<u32>,
status: Cell<Status>,
commit: Cell<u64>,
@@ -342,6 +377,20 @@ pub struct VsrConsensus {
message_bus: IggyMessageBus,
// TODO: Add loopback_queue for messages to self
+ /// Tracks start view change messages received from all replicas
(including self)
+ start_view_change_from_all_replicas: RefCell<BitSet<u32>>,
+
+ /// Tracks DVC messages received (only used by primary candidate)
+ /// Stores metadata; actual log comes from message
+ do_view_change_from_all_replicas: RefCell<DvcQuorumArray>,
+ /// Whether DVC quorum has been achieved in current view change
+ do_view_change_quorum: Cell<bool>,
+ /// Whether we've sent our own SVC for current view
+ sent_own_start_view_change: Cell<bool>,
+ /// Whether we've sent our own DVC for current view
+ sent_own_do_view_change: Cell<bool>,
+
+ timeouts: RefCell<TimeoutManager>,
}
impl VsrConsensus {
@@ -364,6 +413,12 @@ impl VsrConsensus {
last_prepare_checksum: Cell::new(0),
pipeline: RefCell::new(Pipeline::new()),
message_bus: IggyMessageBus::new(replica_count as usize, replica
as u16, 0),
+ start_view_change_from_all_replicas:
RefCell::new(BitSet::with_capacity(REPLICAS_MAX)),
+ do_view_change_from_all_replicas:
RefCell::new(dvc_quorum_array_empty()),
+ do_view_change_quorum: Cell::new(false),
+ sent_own_start_view_change: Cell::new(false),
+ sent_own_do_view_change: Cell::new(false),
+ timeouts: RefCell::new(TimeoutManager::new(replica as u128)),
}
}
@@ -383,8 +438,15 @@ impl VsrConsensus {
assert!(self.commit.get() >= commit);
}
+    /// Maximum number of faulty replicas that can be tolerated.
+    ///
+    /// For a cluster of `2f + 1` replicas this returns `f`; an even-sized
+    /// cluster of `2f + 2` replicas also tolerates only `f` failures.
+    pub fn max_faulty(&self) -> usize {
+        (self.replica_count as usize).saturating_sub(1) / 2
+    }
+
+    /// Quorum size: a strict majority of the cluster, `replica_count / 2 + 1`.
+    ///
+    /// For odd-sized clusters this equals `max_faulty() + 1`. For even-sized
+    /// clusters `max_faulty() + 1` would be exactly half of the cluster (1 of 2,
+    /// 2 of 4), so two disjoint groups could each form a "quorum" and make
+    /// conflicting decisions; a strict majority guarantees any two quorums
+    /// intersect.
     pub fn quorum(&self) -> usize {
         (self.replica_count as usize / 2) + 1
     }
pub fn commit(&self) -> u64 {
@@ -408,6 +470,10 @@ impl VsrConsensus {
self.view.get()
}
+    /// Sets the current view number.
+    ///
+    /// `view` is a `Cell`, so a shared borrow suffices, matching
+    /// `set_log_view(&self, ..)`. Existing callers holding `&mut self` still
+    /// compile.
+    pub fn set_view(&self, view: u32) {
+        self.view.set(view);
+    }
+
pub fn status(&self) -> Status {
self.status.get()
}
@@ -428,6 +494,466 @@ impl VsrConsensus {
self.replica_count
}
+    /// Returns the log view: the latest view in which this replica, as primary,
+    /// acquired a DVC quorum or, as backup, processed a StartView.
+    pub fn log_view(&self) -> u32 {
+        self.log_view.get()
+    }
+
+    /// Overwrites the log view.
+    pub fn set_log_view(&self, log_view: u32) {
+        self.log_view.set(log_view);
+    }
+
+    /// Returns `true` if this replica is the primary of `view`.
+    pub fn is_primary_for_view(&self, view: u32) -> bool {
+        let primary = self.primary_index(view);
+        self.replica == primary
+    }
+
+    /// Number of StartViewChange messages recorded from replicas other than
+    /// this one.
+    fn svc_count_excluding_self(&self) -> usize {
+        let received = self.start_view_change_from_all_replicas.borrow();
+        // `own` is 1 only when our own bit is set, so the subtraction cannot underflow.
+        let own = usize::from(received.contains(self.replica as usize));
+        received.len() - own
+    }
+
+    /// Forget every StartViewChange recorded so far.
+    fn reset_svc_quorum(&self) {
+        let mut svcs = self.start_view_change_from_all_replicas.borrow_mut();
+        svcs.clear();
+    }
+
+    /// Forget every DoViewChange recorded so far and clear the quorum flag.
+    fn reset_dvc_quorum(&self) {
+        let mut dvcs = self.do_view_change_from_all_replicas.borrow_mut();
+        dvc_reset(&mut dvcs);
+        drop(dvcs);
+        self.do_view_change_quorum.set(false);
+    }
+
+    /// Clear all per-view view-change bookkeeping: the SVC/DVC tallies, the
+    /// DVC quorum flag, and the "sent our own SVC/DVC" flags.
+    fn reset_view_change_state(&self) {
+        self.reset_svc_quorum();
+        self.reset_dvc_quorum();
+        for flag in [&self.sent_own_start_view_change, &self.sent_own_do_view_change] {
+            flag.set(false);
+        }
+    }
+
+    /// Advance every VSR timer by one tick and handle the ones that fired.
+    ///
+    /// Meant to be called periodically (e.g. every 10ms). Timers are checked in
+    /// a fixed order — normal heartbeat, SVC resend, DVC resend, view-change
+    /// status — and each check observes the effects of the handlers that ran
+    /// before it. `current_op` and `current_commit` are only used when a
+    /// DoViewChange has to be resent.
+    ///
+    /// Returns the actions the caller must perform; an empty vector means
+    /// nothing needs to be sent.
+    pub fn tick(&self, current_op: u64, current_commit: u64) -> Vec<VsrAction> {
+        self.timeouts.borrow_mut().tick();
+
+        // Each query takes and immediately releases its own borrow, so the
+        // handlers below are free to borrow the timeout manager themselves.
+        let fired = |kind| self.timeouts.borrow_mut().fired(kind);
+
+        let mut actions = Vec::new();
+        if fired(TimeoutKind::NormalHeartbeat) {
+            actions.extend(self.handle_normal_heartbeat_timeout());
+        }
+        if fired(TimeoutKind::StartViewChangeMessage) {
+            actions.extend(self.handle_start_view_change_message_timeout());
+        }
+        if fired(TimeoutKind::DoViewChangeMessage) {
+            actions.extend(
+                self.handle_do_view_change_message_timeout(current_op, current_commit),
+            );
+        }
+        if fired(TimeoutKind::ViewChangeStatus) {
+            actions.extend(self.handle_view_change_status_timeout());
+        }
+        actions
+    }
+
+    /// Called when the normal-heartbeat timeout fires.
+    ///
+    /// A backup that has not heard from the primary moves to the next view,
+    /// enters view-change status, counts its own StartViewChange, and arms the
+    /// SVC-resend and view-change-status timers. Primaries, and replicas that
+    /// are already in view change, do nothing.
+    fn handle_normal_heartbeat_timeout(&self) -> Vec<VsrAction> {
+        if self.is_primary() || self.status.get() == Status::ViewChange {
+            return Vec::new();
+        }
+
+        let view = self.view.get() + 1;
+        self.view.set(view);
+        self.status.set(Status::ViewChange);
+        self.reset_view_change_state();
+
+        // Our own SVC counts towards the quorum for the new view.
+        self.sent_own_start_view_change.set(true);
+        self.start_view_change_from_all_replicas
+            .borrow_mut()
+            .insert(self.replica as usize);
+
+        let mut timeouts = self.timeouts.borrow_mut();
+        timeouts.stop(TimeoutKind::NormalHeartbeat);
+        timeouts.start(TimeoutKind::StartViewChangeMessage);
+        timeouts.start(TimeoutKind::ViewChangeStatus);
+        drop(timeouts);
+
+        vec![VsrAction::SendStartViewChange { view }]
+    }
+
+    /// Called when the SVC-resend timeout fires: re-arm the timer and
+    /// rebroadcast our StartViewChange for the current view.
+    ///
+    /// Does nothing if this replica has not sent its own SVC.
+    fn handle_start_view_change_message_timeout(&self) -> Vec<VsrAction> {
+        if self.sent_own_start_view_change.get() {
+            self.timeouts
+                .borrow_mut()
+                .reset(TimeoutKind::StartViewChangeMessage);
+            vec![VsrAction::SendStartViewChange {
+                view: self.view.get(),
+            }]
+        } else {
+            Vec::new()
+        }
+    }
+
+    /// Called when the DVC-resend timeout fires: re-arm the timer and resend
+    /// our DoViewChange to the primary candidate of the current view.
+    ///
+    /// Nothing is sent unless this replica is in view change and has already
+    /// sent a DVC. A primary candidate that already holds a DVC quorum also
+    /// sends nothing.
+    fn handle_do_view_change_message_timeout(
+        &self,
+        current_op: u64,
+        current_commit: u64,
+    ) -> Vec<VsrAction> {
+        let in_view_change = self.status.get() == Status::ViewChange;
+        let dvc_sent = self.sent_own_do_view_change.get();
+        if !in_view_change
+            || !dvc_sent
+            || (self.is_primary() && self.do_view_change_quorum.get())
+        {
+            return Vec::new();
+        }
+
+        self.timeouts
+            .borrow_mut()
+            .reset(TimeoutKind::DoViewChangeMessage);
+
+        let view = self.view.get();
+        vec![VsrAction::SendDoViewChange {
+            view,
+            target: self.primary_index(view),
+            log_view: self.log_view.get(),
+            op: current_op,
+            commit: current_commit,
+        }]
+    }
+
+    /// Called when the view-change-status timeout fires.
+    ///
+    /// If the view change has not completed in time, escalate to the next
+    /// view. Bump the view, clear the per-view quorum state, count our own
+    /// StartViewChange for the new view, and broadcast it.
+    ///
+    /// The DVC-resend timer belongs to the abandoned view: `reset_view_change_state`
+    /// clears `sent_own_do_view_change`, so leaving the timer armed would only
+    /// make it fire with nothing to resend. It is stopped here and re-armed when
+    /// a DVC is sent for the new view. The SVC-resend timer is restarted so the
+    /// new view's SVC is rebroadcast on its own schedule.
+    fn handle_view_change_status_timeout(&self) -> Vec<VsrAction> {
+        if self.status.get() != Status::ViewChange {
+            return Vec::new();
+        }
+
+        // Escalate: try next view.
+        let next_view = self.view.get() + 1;
+
+        self.view.set(next_view);
+        self.reset_view_change_state();
+        self.sent_own_start_view_change.set(true);
+        self.start_view_change_from_all_replicas
+            .borrow_mut()
+            .insert(self.replica as usize);
+
+        {
+            let mut timeouts = self.timeouts.borrow_mut();
+            timeouts.stop(TimeoutKind::DoViewChangeMessage);
+            timeouts.start(TimeoutKind::StartViewChangeMessage);
+            timeouts.reset(TimeoutKind::ViewChangeStatus);
+        }
+
+        vec![VsrAction::SendStartViewChange { view: next_view }]
+    }
+
+    /// Handle a received StartViewChange (SVC) message.
+    ///
+    /// "When replica i receives STARTVIEWCHANGE messages for its view-number
+    /// from f OTHER replicas, it sends a DOVIEWCHANGE message to the node
+    /// that will be the primary in the new view."
+    ///
+    /// An SVC for a higher view moves this replica into view-change status for
+    /// that view and queues its own SVC broadcast. The message is ignored when:
+    /// - the sender index is not a replica of this cluster,
+    /// - it is for an older view, or
+    /// - it is for the current view while this replica is not in view-change
+    ///   status (e.g. a delayed SVC arriving after the view change completed).
+    ///   Counting it would make a replica in normal status send a DVC.
+    ///
+    /// Returns the SVC/DVC sends to perform, plus a StartView broadcast when
+    /// this replica is the primary candidate and its own DVC completes the
+    /// quorum.
+    pub fn handle_start_view_change(&self, header: &StartViewChangeHeader) -> Vec<VsrAction> {
+        let from_replica = header.replica;
+        let msg_view = header.view;
+        let current_view = self.view.get();
+
+        // Senders outside the cluster cannot contribute to a quorum.
+        if from_replica >= self.replica_count {
+            return Vec::new();
+        }
+
+        // Ignore SVCs for old views.
+        if msg_view < current_view {
+            return Vec::new();
+        }
+
+        // Ignore SVCs for the current view unless a view change is in progress.
+        if msg_view == current_view && self.status.get() != Status::ViewChange {
+            return Vec::new();
+        }
+
+        let mut actions = Vec::new();
+
+        // If the SVC is for a higher view, advance to that view.
+        if msg_view > current_view {
+            self.view.set(msg_view);
+            self.status.set(Status::ViewChange);
+            self.reset_view_change_state();
+            self.sent_own_start_view_change.set(true);
+            self.start_view_change_from_all_replicas
+                .borrow_mut()
+                .insert(self.replica as usize);
+
+            {
+                let mut timeouts = self.timeouts.borrow_mut();
+                timeouts.stop(TimeoutKind::NormalHeartbeat);
+                timeouts.start(TimeoutKind::StartViewChangeMessage);
+                timeouts.start(TimeoutKind::ViewChangeStatus);
+            }
+
+            // Send our own SVC.
+            actions.push(VsrAction::SendStartViewChange { view: msg_view });
+        }
+
+        // Record the SVC from the sender.
+        self.start_view_change_from_all_replicas
+            .borrow_mut()
+            .insert(from_replica as usize);
+
+        // Once f OTHER replicas have sent an SVC for this view, send our DVC
+        // (at most once per view).
+        if !self.sent_own_do_view_change.get()
+            && self.svc_count_excluding_self() >= self.max_faulty()
+        {
+            self.sent_own_do_view_change.set(true);
+
+            let view = self.view.get();
+            let primary_candidate = self.primary_index(view);
+            let current_op = self.sequencer.current_sequence();
+            let current_commit = self.commit.get();
+
+            // Arm the DVC resend timer.
+            self.timeouts
+                .borrow_mut()
+                .start(TimeoutKind::DoViewChangeMessage);
+
+            actions.push(VsrAction::SendDoViewChange {
+                view,
+                target: primary_candidate,
+                log_view: self.log_view.get(),
+                op: current_op,
+                commit: current_commit,
+            });
+
+            // The primary candidate counts its own DVC directly.
+            if primary_candidate == self.replica {
+                let own_dvc = StoredDvc {
+                    replica: self.replica,
+                    log_view: self.log_view.get(),
+                    op: current_op,
+                    commit: current_commit,
+                };
+                dvc_record(
+                    &mut self.do_view_change_from_all_replicas.borrow_mut(),
+                    own_dvc,
+                );
+
+                // Check if we now have quorum.
+                if dvc_count(&self.do_view_change_from_all_replicas.borrow()) >= self.quorum() {
+                    self.do_view_change_quorum.set(true);
+                    actions.extend(self.complete_view_change_as_primary());
+                }
+            }
+        }
+
+        actions
+    }
+
+    /// Handle a received DoViewChange (DVC) message. Only the primary candidate
+    /// of the current view tallies DVCs.
+    ///
+    /// "When the new primary receives f + 1 DOVIEWCHANGE messages from different
+    /// replicas (including itself), it sets its view-number to that in the messages
+    /// and selects as the new log the one contained in the message with the
+    /// largest v'..."
+    ///
+    /// DVCs whose sender index is not a replica of this cluster are dropped.
+    /// They cannot belong to a quorum, and the sender index is used to address
+    /// a fixed-size per-replica array.
+    pub fn handle_do_view_change(&self, header: &DoViewChangeHeader) -> Vec<VsrAction> {
+        let from_replica = header.replica;
+        let msg_view = header.view;
+        let msg_log_view = header.log_view;
+        let msg_op = header.op;
+        let msg_commit = header.commit;
+
+        // Senders outside the cluster cannot contribute to a quorum.
+        if from_replica >= self.replica_count {
+            return Vec::new();
+        }
+
+        // Ignore DVCs for old views.
+        if msg_view < self.view.get() {
+            return Vec::new();
+        }
+
+        let mut actions = Vec::new();
+
+        // If the DVC is for a higher view, advance to that view.
+        if msg_view > self.view.get() {
+            self.view.set(msg_view);
+            self.status.set(Status::ViewChange);
+            self.reset_view_change_state();
+            self.sent_own_start_view_change.set(true);
+            self.start_view_change_from_all_replicas
+                .borrow_mut()
+                .insert(self.replica as usize);
+
+            {
+                let mut timeouts = self.timeouts.borrow_mut();
+                timeouts.stop(TimeoutKind::NormalHeartbeat);
+                timeouts.start(TimeoutKind::StartViewChangeMessage);
+                timeouts.start(TimeoutKind::ViewChangeStatus);
+            }
+
+            // Send our own SVC.
+            actions.push(VsrAction::SendStartViewChange { view: msg_view });
+        }
+
+        // Only the primary candidate processes DVCs for quorum.
+        if !self.is_primary_for_view(self.view.get()) {
+            return actions;
+        }
+
+        // Must be in view change to process DVCs.
+        if self.status.get() != Status::ViewChange {
+            return actions;
+        }
+
+        let current_op = self.sequencer.current_sequence();
+        let current_commit = self.commit.get();
+
+        // If we haven't contributed our own DVC yet, record it.
+        if !self.sent_own_do_view_change.get() {
+            self.sent_own_do_view_change.set(true);
+
+            let own_dvc = StoredDvc {
+                replica: self.replica,
+                log_view: self.log_view.get(),
+                op: current_op,
+                commit: current_commit,
+            };
+            dvc_record(
+                &mut self.do_view_change_from_all_replicas.borrow_mut(),
+                own_dvc,
+            );
+        }
+
+        // Record the received DVC (duplicates from the same replica are ignored).
+        let dvc = StoredDvc {
+            replica: from_replica,
+            log_view: msg_log_view,
+            op: msg_op,
+            commit: msg_commit,
+        };
+        dvc_record(&mut self.do_view_change_from_all_replicas.borrow_mut(), dvc);
+
+        // Check if quorum achieved.
+        if !self.do_view_change_quorum.get()
+            && dvc_count(&self.do_view_change_from_all_replicas.borrow()) >= self.quorum()
+        {
+            self.do_view_change_quorum.set(true);
+            actions.extend(self.complete_view_change_as_primary());
+        }
+
+        actions
+    }
+
+    /// Handle a received StartView (SV) message (backups only).
+    ///
+    /// "When other replicas receive the STARTVIEW message, they replace their log
+    /// with the one in the message, set their op-number to that of the latest entry
+    /// in the log, set their view-number to the view number in the message, change
+    /// their status to normal, and send PrepareOK for any uncommitted ops."
+    ///
+    /// The message is ignored when it:
+    /// - does not come from the primary of its view,
+    /// - is for an older view,
+    /// - is for the view this replica is already running in normal status, or
+    /// - was sent by this replica.
+    ///
+    /// A duplicate SV would otherwise reset the sequencer back to `msg_op`.
+    ///
+    /// On acceptance every view-change timer, including the SVC-resend timer,
+    /// is stopped and the normal heartbeat timer is started.
+    pub fn handle_start_view(&self, header: &StartViewHeader) -> Vec<VsrAction> {
+        let from_replica = header.replica;
+        let msg_view = header.view;
+        let msg_op = header.op;
+        let msg_commit = header.commit;
+
+        // Verify sender is the primary for this view.
+        if self.primary_index(msg_view) != from_replica {
+            return Vec::new();
+        }
+
+        // Ignore old views.
+        if msg_view < self.view.get() {
+            return Vec::new();
+        }
+
+        // Already running this view normally: the SV is a duplicate.
+        if msg_view == self.view.get() && self.status.get() == Status::Normal {
+            return Vec::new();
+        }
+
+        // We shouldn't process our own StartView.
+        if from_replica == self.replica {
+            return Vec::new();
+        }
+
+        // Accept the StartView and transition to normal.
+        self.view.set(msg_view);
+        self.log_view.set(msg_view);
+        self.status.set(Status::Normal);
+        self.advance_commit_number(msg_commit);
+        self.reset_view_change_state();
+
+        // Update our op to match the new primary's log.
+        self.sequencer.set_sequence(msg_op);
+
+        // Swap the view-change timers for normal backup operation.
+        {
+            let mut timeouts = self.timeouts.borrow_mut();
+            timeouts.stop(TimeoutKind::ViewChangeStatus);
+            timeouts.stop(TimeoutKind::StartViewChangeMessage);
+            timeouts.stop(TimeoutKind::DoViewChangeMessage);
+            timeouts.stop(TimeoutKind::RequestStartViewMessage);
+            timeouts.start(TimeoutKind::NormalHeartbeat);
+        }
+
+        // Send PrepareOK for uncommitted ops (commit+1 ..= op) to the new primary.
+        ((msg_commit + 1)..=msg_op)
+            .map(|op| VsrAction::SendPrepareOk {
+                view: msg_view,
+                op,
+                target: from_replica,
+            })
+            .collect()
+    }
+
+    /// Complete the view change as the new primary once a DVC quorum exists.
+    ///
+    /// This method:
+    /// - selects the winning DVC (highest `log_view`, then highest `op`) and
+    ///   adopts its op-number;
+    /// - raises the commit number to the maximum commit across all recorded DVCs;
+    /// - switches to normal status with `log_view = view`;
+    /// - swaps the view-change timers for the primary's commit timer;
+    /// - returns a StartView broadcast.
+    ///
+    /// The op-number is adopted here just as backups do on StartView
+    /// (`set_sequence(msg_op)`), so the primary's sequencer agrees with the op
+    /// it announces. Returns no actions if no DVC has been recorded.
+    fn complete_view_change_as_primary(&self) -> Vec<VsrAction> {
+        // Copy the selection results out so the RefCell borrow ends before any
+        // state is mutated.
+        let (new_op, max_commit) = {
+            let dvc_array = self.do_view_change_from_all_replicas.borrow();
+            let Some(winner) = dvc_select_winner(&dvc_array) else {
+                return Vec::new();
+            };
+            (winner.op, dvc_max_commit(&dvc_array))
+        };
+
+        let view = self.view.get();
+
+        // Update state.
+        self.log_view.set(view);
+        self.status.set(Status::Normal);
+        self.advance_commit_number(max_commit);
+        // NOTE(review): only the winner's op-number is adopted here; installing
+        // the winner's actual log entries is not handled in this method — TODO confirm.
+        self.sequencer.set_sequence(new_op);
+
+        // Swap the view-change timers for normal primary operation.
+        {
+            let mut timeouts = self.timeouts.borrow_mut();
+            timeouts.stop(TimeoutKind::ViewChangeStatus);
+            timeouts.stop(TimeoutKind::DoViewChangeMessage);
+            timeouts.stop(TimeoutKind::StartViewChangeMessage);
+            timeouts.start(TimeoutKind::CommitMessage);
+        }
+
+        vec![VsrAction::SendStartView {
+            view,
+            op: new_op,
+            commit: max_commit,
+        }]
+    }
+
/// Handle a prepare_ok message from a follower.
/// Called on the primary when a follower acknowledges a prepare.
///
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 8ff864ffd..e77c575ce 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -43,4 +43,6 @@ pub trait Consensus {
mod impls;
pub use impls::*;
+mod view_change_quorum;
+pub use view_change_quorum::*;
mod vsr_timeout;
diff --git a/core/consensus/src/view_change_quorum.rs b/core/consensus/src/view_change_quorum.rs
new file mode 100644
index 000000000..2a8e01e9b
--- /dev/null
+++ b/core/consensus/src/view_change_quorum.rs
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::REPLICAS_MAX;
+
+/// Metadata retained from a DoViewChange message for log selection.
+#[derive(Debug, Clone, Copy)]
+pub struct StoredDvc {
+    /// Index of the replica that sent the DVC.
+    pub replica: u8,
+    /// The view when the replica's status was last normal.
+    pub log_view: u32,
+    /// Highest op-number in the sender's log.
+    pub op: u64,
+    /// The sender's commit number.
+    pub commit: u64,
+}
+
+impl StoredDvc {
+    /// Returns `true` if this DVC's log should be preferred over `other`'s.
+    ///
+    /// The higher `log_view` wins and `op` breaks ties. Two DVCs with equal
+    /// `(log_view, op)` are not "better" than each other.
+    pub fn is_better_than(&self, other: &StoredDvc) -> bool {
+        (self.log_view, self.op) > (other.log_view, other.op)
+    }
+}
+
+/// Per-replica DVC slots, indexed by replica number. `None` means no DVC has
+/// been received from that replica.
+pub type DvcQuorumArray = [Option<StoredDvc>; REPLICAS_MAX];
+
+/// Returns a [`DvcQuorumArray`] with every slot empty.
+pub const fn dvc_quorum_array_empty() -> DvcQuorumArray {
+    [None; REPLICAS_MAX]
+}
+
+/// Record a DVC in the array.
+///
+/// Returns `true` if this is a new entry. Returns `false` if a DVC from the
+/// same replica was already recorded (the first one is kept), or if
+/// `dvc.replica` is outside the array (`>= REPLICAS_MAX`). In the latter case
+/// the DVC is dropped instead of panicking on an out-of-range index, since
+/// the replica number comes from a received message header.
+pub fn dvc_record(array: &mut DvcQuorumArray, dvc: StoredDvc) -> bool {
+    let Some(slot) = array.get_mut(dvc.replica as usize) else {
+        return false; // Replica index out of range.
+    };
+    if slot.is_some() {
+        return false; // Duplicate.
+    }
+    *slot = Some(dvc);
+    true
+}
+
+/// Number of replicas that have a recorded DVC.
+pub fn dvc_count(array: &DvcQuorumArray) -> usize {
+    array.iter().flatten().count()
+}
+
+/// Returns `true` if `replica` has a recorded DVC. Out-of-range replica
+/// indices report `false`.
+pub fn dvc_has_from(array: &DvcQuorumArray, replica: u8) -> bool {
+    matches!(array.get(usize::from(replica)), Some(Some(_)))
+}
+
+/// Select the winning DVC (most authoritative log) among those recorded.
+///
+/// Orders by `log_view`, then `op`. Among equal maxima the entry with the
+/// highest replica index (the last one in the array) wins. Returns `None` when
+/// no DVC is recorded.
+pub fn dvc_select_winner(array: &DvcQuorumArray) -> Option<&StoredDvc> {
+    array
+        .iter()
+        .flatten()
+        .max_by_key(|dvc| (dvc.log_view, dvc.op))
+}
+
+/// Highest commit number among the recorded DVCs, or 0 if none are recorded.
+pub fn dvc_max_commit(array: &DvcQuorumArray) -> u64 {
+    array
+        .iter()
+        .flatten()
+        .fold(0, |best, dvc| best.max(dvc.commit))
+}
+
+/// Clear every slot of the DVC quorum array.
+pub fn dvc_reset(array: &mut DvcQuorumArray) {
+    array.fill(None);
+}
+
+/// Iterate over the recorded DVCs in replica-index order.
+pub fn dvc_iter(array: &DvcQuorumArray) -> impl Iterator<Item = &StoredDvc> {
+    array.iter().flatten()
+}
diff --git a/core/consensus/src/vsr_timeout.rs b/core/consensus/src/vsr_timeout.rs
index d572bb65e..140d40f02 100644
--- a/core/consensus/src/vsr_timeout.rs
+++ b/core/consensus/src/vsr_timeout.rs
@@ -102,6 +102,7 @@ pub enum TimeoutKind {
CommitMessage,
NormalHeartbeat,
StartViewChangeMessage,
+ ViewChangeStatus,
DoViewChangeMessage,
RequestStartViewMessage,
}
@@ -115,6 +116,7 @@ pub struct TimeoutManager {
normal_heartbeat: Timeout,
start_view_change_message: Timeout,
do_view_change_message: Timeout,
+ view_change_status: Timeout,
request_start_view_message: Timeout,
prng: Xoshiro256Plus,
}
@@ -128,6 +130,7 @@ impl TimeoutManager {
const COMMIT_MESSAGE_TICKS: u64 = 50;
const NORMAL_HEARTBEAT_TICKS: u64 = 500;
const START_VIEW_CHANGE_MESSAGE_TICKS: u64 = 50;
+ const VIEW_CHANGE_STATUS_TICKS: u64 = 500;
const DO_VIEW_CHANGE_MESSAGE_TICKS: u64 = 50;
const REQUEST_START_VIEW_MESSAGE_TICKS: u64 = 100;
@@ -142,6 +145,7 @@ impl TimeoutManager {
Self::START_VIEW_CHANGE_MESSAGE_TICKS,
),
do_view_change_message: Timeout::new(replica_id,
Self::DO_VIEW_CHANGE_MESSAGE_TICKS),
+ view_change_status: Timeout::new(replica_id,
Self::VIEW_CHANGE_STATUS_TICKS),
request_start_view_message: Timeout::new(
replica_id,
Self::REQUEST_START_VIEW_MESSAGE_TICKS,
@@ -174,6 +178,7 @@ impl TimeoutManager {
TimeoutKind::CommitMessage => &self.commit_message,
TimeoutKind::NormalHeartbeat => &self.normal_heartbeat,
TimeoutKind::StartViewChangeMessage =>
&self.start_view_change_message,
+ TimeoutKind::ViewChangeStatus => &self.view_change_status,
TimeoutKind::DoViewChangeMessage => &self.do_view_change_message,
TimeoutKind::RequestStartViewMessage =>
&self.request_start_view_message,
}
@@ -186,6 +191,7 @@ impl TimeoutManager {
TimeoutKind::CommitMessage => &mut self.commit_message,
TimeoutKind::NormalHeartbeat => &mut self.normal_heartbeat,
TimeoutKind::StartViewChangeMessage => &mut
self.start_view_change_message,
+ TimeoutKind::ViewChangeStatus => &mut self.view_change_status,
TimeoutKind::DoViewChangeMessage => &mut
self.do_view_change_message,
TimeoutKind::RequestStartViewMessage => &mut
self.request_start_view_message,
}
@@ -210,6 +216,7 @@ impl TimeoutManager {
TimeoutKind::CommitMessage => &mut self.commit_message,
TimeoutKind::NormalHeartbeat => &mut self.normal_heartbeat,
TimeoutKind::StartViewChangeMessage => &mut
self.start_view_change_message,
+ TimeoutKind::ViewChangeStatus => &mut self.view_change_status,
TimeoutKind::DoViewChangeMessage => &mut
self.do_view_change_message,
TimeoutKind::RequestStartViewMessage => &mut
self.request_start_view_message,
};