This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 54153ee34 feat(cluster): Impl VSR `view_change` (#2546)
54153ee34 is described below

commit 54153ee34b0ef781032cb28d72ff5480080fde56
Author: Krishna Vishal <[email protected]>
AuthorDate: Tue Jan 13 22:39:09 2026 +0530

    feat(cluster): Impl VSR `view_change` (#2546)
    
    Implemented the VSR view-change protocol (`view_change`) described in the "Viewstamped Replication Revisited" paper.
---
 core/common/src/types/consensus/header.rs | 202 ++++++++++++
 core/consensus/src/impls.rs               | 532 +++++++++++++++++++++++++++++-
 core/consensus/src/lib.rs                 |   2 +
 core/consensus/src/view_change_quorum.rs  | 102 ++++++
 core/consensus/src/vsr_timeout.rs         |   7 +
 5 files changed, 842 insertions(+), 3 deletions(-)

diff --git a/core/common/src/types/consensus/header.rs 
b/core/common/src/types/consensus/header.rs
index b20d0d5bf..376b6be05 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -45,6 +45,8 @@ pub enum Command2 {
     Commit = 9,
 
     StartViewChange = 10,
+    DoViewChange = 11,
+    StartView = 12,
 }
 
 #[derive(Debug, Clone, Error, PartialEq, Eq)]
@@ -52,12 +54,18 @@ pub enum ConsensusError {
     #[error("invalid command: expected {expected:?}, found {found:?}")]
     InvalidCommand { expected: Command2, found: Command2 },
 
+    #[error("invalid size: expected {expected:?}, found {found:?}")]
+    InvalidSize { expected: u32, found: u32 },
+
     #[error("invalid checksum")]
     InvalidChecksum,
 
     #[error("invalid cluster ID")]
     InvalidCluster,
 
+    #[error("invalid field: {0}")]
+    InvalidField(String),
+
     #[error("parent_padding must be 0")]
     PrepareParentPaddingNonZero,
 
@@ -389,3 +397,197 @@ impl ConsensusHeader for ReplyHeader {
         self.size
     }
 }
+
+/// StartViewChange message header.
+///
+/// Sent by a replica when it suspects the primary has failed.
+/// This is a header-only message with no body.
+///
+/// The struct is `#[repr(C)]` and implements `Pod`/`Zeroable`, so field order
+/// and field sizes define its byte layout. NOTE(review): by field sizes
+/// (assuming `Command2` is one byte) this header is 240 bytes — confirm that
+/// matches the header size every other command uses.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(C)]
+pub struct StartViewChangeHeader {
+    pub checksum: u128,
+    pub checksum_padding: u128,
+    pub checksum_body: u128,
+    pub checksum_body_padding: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub epoch: u32,
+    /// The view the sender is trying to change to.
+    pub view: u32,
+    /// Must be 0 (enforced by `validate`).
+    pub release: u32,
+    pub protocol: u16,
+    pub command: Command2,
+    /// Index of the sending replica.
+    pub replica: u8,
+    pub reserved_frame: [u8; 12],
+
+    pub reserved: [u8; 128],
+}
+
+// NOTE(review): `Pod` promises that every bit pattern is a valid value, but
+// `command` is a Rust enum (`Command2`) whose discriminants do not cover every
+// byte value. Casting untrusted bytes to this type is only sound if the command
+// byte is checked first — confirm how callers decode incoming headers.
+unsafe impl Pod for StartViewChangeHeader {}
+unsafe impl Zeroable for StartViewChangeHeader {}
+
+impl ConsensusHeader for StartViewChangeHeader {
+    const COMMAND: Command2 = Command2::StartViewChange;
+
+    /// Checks that this is a well-formed StartViewChange header.
+    ///
+    /// # Errors
+    /// - [`ConsensusError::InvalidCommand`] if `command` is not `StartViewChange`.
+    /// - [`ConsensusError::InvalidField`] if `release` is non-zero.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Command2::StartViewChange {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Command2::StartViewChange,
+                found: self.command,
+            });
+        }
+
+        // Same wording as the DoViewChange/StartView validators, and kept on a
+        // single line so the message contains no embedded newline.
+        if self.release != 0 {
+            return Err(ConsensusError::InvalidField(
+                "release must be 0".to_string(),
+            ));
+        }
+        Ok(())
+    }
+
+    fn size(&self) -> u32 {
+        self.size
+    }
+}
+
+/// DoViewChange message header.
+///
+/// Sent by replicas to the primary candidate after collecting a quorum of
+/// StartViewChange messages.
+///
+/// The struct is `#[repr(C)]` and implements `Pod`/`Zeroable`, so field order
+/// and field sizes define its byte layout. NOTE(review): by field sizes
+/// (assuming `Command2` is one byte) this header is 240 bytes — confirm that
+/// matches the header size every other command uses.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(C)]
+pub struct DoViewChangeHeader {
+    pub checksum: u128,
+    pub checksum_padding: u128,
+    pub checksum_body: u128,
+    pub checksum_body_padding: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub epoch: u32,
+    /// The view being changed to; the primary of this view is the receiver.
+    pub view: u32,
+    /// Must be 0 (enforced by `validate`).
+    pub release: u32,
+    pub protocol: u16,
+    pub command: Command2,
+    /// Index of the sending replica.
+    pub replica: u8,
+    pub reserved_frame: [u8; 12],
+
+    /// The highest op-number in this replica's log.
+    /// Used to select the most complete log when log_view values are equal.
+    pub op: u64,
+
+    /// The replica's commit number (highest committed op).
+    /// The new primary sets its commit to max(commit) across all DVCs.
+    pub commit: u64,
+
+    /// The view number when this replica's status was last normal.
+    /// This is the key field for log selection: the replica with the
+    /// highest log_view has the most authoritative log.
+    pub log_view: u32,
+
+    pub reserved: [u8; 108],
+}
+
+// NOTE(review): `Pod` promises that every bit pattern is a valid value, but
+// `command` is a Rust enum (`Command2`) whose discriminants do not cover every
+// byte value. Casting untrusted bytes to this type is only sound if the command
+// byte is checked first — confirm how callers decode incoming headers.
+unsafe impl Pod for DoViewChangeHeader {}
+unsafe impl Zeroable for DoViewChangeHeader {}
+
+impl ConsensusHeader for DoViewChangeHeader {
+    const COMMAND: Command2 = Command2::DoViewChange;
+
+    /// Rejects DoViewChange headers that are malformed or internally
+    /// inconsistent.
+    ///
+    /// Checks run in a fixed order and the first failure is returned:
+    /// command, `release == 0`, `log_view <= view`, `commit <= op`.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        let field_error = |reason: &str| Err(ConsensusError::InvalidField(reason.to_string()));
+
+        if self.command != Self::COMMAND {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Self::COMMAND,
+                found: self.command,
+            });
+        }
+
+        if self.release != 0 {
+            field_error("release must be 0")
+        } else if self.log_view > self.view {
+            // A replica cannot have been normal in a view it has not reached yet.
+            field_error("log_view cannot exceed view")
+        } else if self.commit > self.op {
+            // A replica cannot have committed an op it has not received.
+            field_error("commit cannot exceed op")
+        } else {
+            Ok(())
+        }
+    }
+
+    fn size(&self) -> u32 {
+        self.size
+    }
+}
+
+/// StartView message header.
+///
+/// Sent by the new primary to all replicas after collecting a quorum of
+/// DoViewChange messages.
+///
+/// The struct is `#[repr(C)]` and implements `Pod`/`Zeroable`, so field order
+/// and field sizes define its byte layout. NOTE(review): by field sizes
+/// (assuming `Command2` is one byte) this header is 240 bytes — confirm that
+/// matches the header size every other command uses.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(C)]
+pub struct StartViewHeader {
+    pub checksum: u128,
+    pub checksum_padding: u128,
+    pub checksum_body: u128,
+    pub checksum_body_padding: u128,
+    pub cluster: u128,
+    pub size: u32,
+    pub epoch: u32,
+    /// The view being started; the sender must be its primary.
+    pub view: u32,
+    /// Must be 0 (enforced by `validate`).
+    pub release: u32,
+    pub protocol: u16,
+    pub command: Command2,
+    /// Index of the sending replica (the new primary).
+    pub replica: u8,
+    pub reserved_frame: [u8; 12],
+
+    /// The op-number of the highest entry in the new primary's log.
+    /// Backups set their op to this value.
+    pub op: u64,
+
+    /// The commit number.
+    /// This is max(commit) from all DVCs received by the primary.
+    /// Backups set their commit to this value.
+    pub commit: u64,
+
+    pub reserved: [u8; 112],
+}
+
+// NOTE(review): `Pod` promises that every bit pattern is a valid value, but
+// `command` is a Rust enum (`Command2`) whose discriminants do not cover every
+// byte value. Casting untrusted bytes to this type is only sound if the command
+// byte is checked first — confirm how callers decode incoming headers.
+unsafe impl Pod for StartViewHeader {}
+unsafe impl Zeroable for StartViewHeader {}
+
+impl ConsensusHeader for StartViewHeader {
+    const COMMAND: Command2 = Command2::StartView;
+
+    /// Rejects StartView headers that are malformed or inconsistent.
+    ///
+    /// Checks run in order (command, `release == 0`, `commit <= op`) and the
+    /// first failure is returned.
+    fn validate(&self) -> Result<(), ConsensusError> {
+        if self.command != Self::COMMAND {
+            return Err(ConsensusError::InvalidCommand {
+                expected: Self::COMMAND,
+                found: self.command,
+            });
+        }
+
+        let problem = if self.release != 0 {
+            Some("release must be 0")
+        } else if self.commit > self.op {
+            // The primary cannot have committed ops past the end of its log.
+            Some("commit cannot exceed op")
+        } else {
+            None
+        };
+
+        match problem {
+            Some(reason) => Err(ConsensusError::InvalidField(reason.to_string())),
+            None => Ok(()),
+        }
+    }
+
+    fn size(&self) -> u32 {
+        self.size
+    }
+}
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 98d78e737..0be640933 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -15,9 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{Consensus, Project};
+use crate::vsr_timeout::{TimeoutKind, TimeoutManager};
+use crate::{
+    Consensus, DvcQuorumArray, Project, StoredDvc, dvc_count, dvc_max_commit,
+    dvc_quorum_array_empty, dvc_record, dvc_reset, dvc_select_winner,
+};
 use bit_set::BitSet;
-use iggy_common::header::{Command2, PrepareHeader, PrepareOkHeader, 
RequestHeader};
+use iggy_common::header::{
+    Command2, DoViewChangeHeader, PrepareHeader, PrepareOkHeader, 
RequestHeader,
+    StartViewChangeHeader, StartViewHeader,
+};
 use iggy_common::message::Message;
 use message_bus::IggyMessageBus;
 use std::cell::{Cell, RefCell};
@@ -322,6 +329,25 @@ pub enum Status {
     Recovering,
 }
 
+/// Actions to be taken by the caller after processing a VSR event.
+///
+/// The view-change handlers and `tick` return these values instead of sending
+/// messages themselves, so the caller can perform the actual I/O.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum VsrAction {
+    /// Send StartViewChange for `view` to all replicas.
+    SendStartViewChange { view: u32 },
+    /// Send DoViewChange to primary.
+    SendDoViewChange {
+        view: u32,
+        /// Index of the primary of `view` (the DoViewChange receiver).
+        target: u8,
+        /// The sender's `log_view`.
+        log_view: u32,
+        /// The sender's highest op number.
+        op: u64,
+        /// The sender's commit number.
+        commit: u64,
+    },
+    /// Send StartView to all backups (as new primary).
+    /// `op` is the winning DVC's op; `commit` is the max commit across DVCs.
+    SendStartView { view: u32, op: u64, commit: u64 },
+    /// Send PrepareOK for `op` in `view` to `target` (the new primary).
+    SendPrepareOk { view: u32, op: u64, target: u8 },
+}
+
 #[allow(unused)]
 pub struct VsrConsensus {
     cluster: u128,
@@ -329,6 +355,15 @@ pub struct VsrConsensus {
     replica_count: u8,
 
     view: Cell<u32>,
+
+    // The latest view where
+    // - the replica was a primary and acquired a DVC quorum, or
+    // - the replica was a backup and processed a SV message.
+    // i.e. the latest view in which this replica changed its head message.
+    // Initialized from the superblock's VSRState.
+    // Invariants:
+    // * `replica.log_view ≥ replica.log_view_durable`
+    // * `replica.log_view = 0` when replica_count=1.
     log_view: Cell<u32>,
     status: Cell<Status>,
     commit: Cell<u64>,
@@ -342,6 +377,20 @@ pub struct VsrConsensus {
 
     message_bus: IggyMessageBus,
     // TODO: Add loopback_queue for messages to self
+    /// Tracks start view change messages received from all replicas 
(including self)
+    start_view_change_from_all_replicas: RefCell<BitSet<u32>>,
+
+    /// Tracks DVC messages received (only used by primary candidate)
+    /// Stores metadata; actual log comes from message
+    do_view_change_from_all_replicas: RefCell<DvcQuorumArray>,
+    /// Whether DVC quorum has been achieved in current view change
+    do_view_change_quorum: Cell<bool>,
+    /// Whether we've sent our own SVC for current view
+    sent_own_start_view_change: Cell<bool>,
+    /// Whether we've sent our own DVC for current view
+    sent_own_do_view_change: Cell<bool>,
+
+    timeouts: RefCell<TimeoutManager>,
 }
 
 impl VsrConsensus {
@@ -364,6 +413,12 @@ impl VsrConsensus {
             last_prepare_checksum: Cell::new(0),
             pipeline: RefCell::new(Pipeline::new()),
             message_bus: IggyMessageBus::new(replica_count as usize, replica 
as u16, 0),
+            start_view_change_from_all_replicas: 
RefCell::new(BitSet::with_capacity(REPLICAS_MAX)),
+            do_view_change_from_all_replicas: 
RefCell::new(dvc_quorum_array_empty()),
+            do_view_change_quorum: Cell::new(false),
+            sent_own_start_view_change: Cell::new(false),
+            sent_own_do_view_change: Cell::new(false),
+            timeouts: RefCell::new(TimeoutManager::new(replica as u128)),
         }
     }
 
@@ -383,8 +438,15 @@ impl VsrConsensus {
         assert!(self.commit.get() >= commit);
     }
 
+    /// Maximum number of faulty replicas that can be tolerated.
+    ///
+    /// For a cluster of `2f + 1` replicas this returns `f`; an even cluster of
+    /// `2f + 2` replicas also tolerates only `f` failures.
+    pub fn max_faulty(&self) -> usize {
+        (self.replica_count as usize).saturating_sub(1) / 2
+    }
+
+    /// Majority quorum size: `replica_count - max_faulty()`.
+    ///
+    /// For odd clusters this equals `f + 1`. For even clusters `f + 1` is NOT a
+    /// majority (with 4 replicas, two disjoint pairs could each form a
+    /// "quorum"), so the size is `replica_count - f`. That equals
+    /// `replica_count / 2 + 1` for every cluster size.
     pub fn quorum(&self) -> usize {
-        (self.replica_count as usize / 2) + 1
+        self.replica_count as usize - self.max_faulty()
     }
 
     pub fn commit(&self) -> u64 {
@@ -408,6 +470,10 @@ impl VsrConsensus {
         self.view.get()
     }
 
+    /// Overwrites the current view number.
+    ///
+    /// `view` is a `Cell`, so a shared borrow suffices. This matches
+    /// `set_log_view`, and callers holding `&mut self` still compile. Only the
+    /// view changes; status, `log_view` and view-change bookkeeping are untouched.
+    pub fn set_view(&self, view: u32) {
+        self.view.set(view);
+    }
+
     pub fn status(&self) -> Status {
         self.status.get()
     }
@@ -428,6 +494,466 @@ impl VsrConsensus {
         self.replica_count
     }
 
+    /// Returns the latest view in which this replica's log was installed
+    /// (the `log_view` field).
+    pub fn log_view(&self) -> u32 {
+        self.log_view.get()
+    }
+
+    /// Overwrites `log_view` without touching any other state.
+    pub fn set_log_view(&self, log_view: u32) {
+        self.log_view.set(log_view);
+    }
+
+    /// Returns `true` when this replica is the primary of `view`.
+    pub fn is_primary_for_view(&self, view: u32) -> bool {
+        let primary = self.primary_index(view);
+        self.replica == primary
+    }
+
+    /// Number of StartViewChange votes recorded from replicas other than this one.
+    fn svc_count_excluding_self(&self) -> usize {
+        let votes = self.start_view_change_from_all_replicas.borrow();
+        let own_vote = usize::from(votes.contains(self.replica as usize));
+        votes.len() - own_vote
+    }
+
+    /// Forgets every StartViewChange vote collected so far.
+    fn reset_svc_quorum(&self) {
+        let mut votes = self.start_view_change_from_all_replicas.borrow_mut();
+        votes.clear();
+    }
+
+    /// Forgets every DoViewChange collected so far and clears the quorum flag.
+    fn reset_dvc_quorum(&self) {
+        {
+            let mut dvcs = self.do_view_change_from_all_replicas.borrow_mut();
+            dvc_reset(&mut dvcs);
+        }
+        self.do_view_change_quorum.set(false);
+    }
+
+    /// Clears all per-view-change bookkeeping: SVC votes, recorded DVCs, the
+    /// DVC-quorum flag, and the "already sent our own SVC/DVC" flags.
+    fn reset_view_change_state(&self) {
+        self.reset_svc_quorum();
+        self.reset_dvc_quorum();
+        for sent_flag in [&self.sent_own_start_view_change, &self.sent_own_do_view_change] {
+            sent_flag.set(false);
+        }
+    }
+
+    /// Advances every VSR timeout by one tick, then runs the handler of each
+    /// timeout that has fired. Call this periodically (e.g., every 10ms).
+    ///
+    /// Handlers run in a fixed order: normal heartbeat, StartViewChange resend,
+    /// DoViewChange resend, view-change status. `current_op` and
+    /// `current_commit` are only used when a DoViewChange is resent.
+    ///
+    /// Returns the actions the caller must perform; an empty vec means nothing
+    /// to do.
+    pub fn tick(&self, current_op: u64, current_commit: u64) -> Vec<VsrAction> {
+        self.timeouts.borrow_mut().tick();
+
+        // Each check takes and releases its own borrow, because the handlers
+        // below borrow `self.timeouts` mutably themselves.
+        let fired = |kind: TimeoutKind| self.timeouts.borrow_mut().fired(kind);
+
+        let mut actions = Vec::new();
+        if fired(TimeoutKind::NormalHeartbeat) {
+            actions.extend(self.handle_normal_heartbeat_timeout());
+        }
+        if fired(TimeoutKind::StartViewChangeMessage) {
+            actions.extend(self.handle_start_view_change_message_timeout());
+        }
+        if fired(TimeoutKind::DoViewChangeMessage) {
+            let resend = self.handle_do_view_change_message_timeout(current_op, current_commit);
+            actions.extend(resend);
+        }
+        if fired(TimeoutKind::ViewChangeStatus) {
+            actions.extend(self.handle_view_change_status_timeout());
+        }
+        actions
+    }
+
+    /// Handles a fired `NormalHeartbeat` timeout.
+    ///
+    /// A backup that is not already in a view change and has not heard from
+    /// the primary moves to `view + 1` and enters view-change status. It counts
+    /// its own vote for the new view and asks the caller to broadcast
+    /// StartViewChange. Primaries, and replicas already in view change, do nothing.
+    fn handle_normal_heartbeat_timeout(&self) -> Vec<VsrAction> {
+        if self.is_primary() || self.status.get() == Status::ViewChange {
+            return Vec::new();
+        }
+
+        let target_view = self.view.get() + 1;
+        self.view.set(target_view);
+        self.status.set(Status::ViewChange);
+        self.reset_view_change_state();
+
+        // Our own StartViewChange counts as a vote for the new view.
+        self.sent_own_start_view_change.set(true);
+        let own_index = self.replica as usize;
+        self.start_view_change_from_all_replicas
+            .borrow_mut()
+            .insert(own_index);
+
+        // Leave normal-status timing and arm the view-change timers.
+        let mut timeouts = self.timeouts.borrow_mut();
+        timeouts.stop(TimeoutKind::NormalHeartbeat);
+        timeouts.start(TimeoutKind::StartViewChangeMessage);
+        timeouts.start(TimeoutKind::ViewChangeStatus);
+        drop(timeouts);
+
+        vec![VsrAction::SendStartViewChange { view: target_view }]
+    }
+
+    /// Handles a fired `StartViewChangeMessage` timeout.
+    ///
+    /// If this replica has already sent its own StartViewChange, the timeout is
+    /// re-armed and the SVC for the current view is broadcast again.
+    fn handle_start_view_change_message_timeout(&self) -> Vec<VsrAction> {
+        if self.sent_own_start_view_change.get() {
+            self.timeouts
+                .borrow_mut()
+                .reset(TimeoutKind::StartViewChangeMessage);
+            let view = self.view.get();
+            vec![VsrAction::SendStartViewChange { view }]
+        } else {
+            Vec::new()
+        }
+    }
+
+    /// Handles a fired `DoViewChangeMessage` timeout by resending this
+    /// replica's DoViewChange to the primary of the current view.
+    ///
+    /// Nothing is sent when the replica is not in view-change status, when it
+    /// has not sent its DVC yet, or when it is the primary and already holds a
+    /// DVC quorum. `current_op` and `current_commit` are reported as given.
+    fn handle_do_view_change_message_timeout(
+        &self,
+        current_op: u64,
+        current_commit: u64,
+    ) -> Vec<VsrAction> {
+        let should_resend = self.status.get() == Status::ViewChange
+            && self.sent_own_do_view_change.get()
+            && !(self.is_primary() && self.do_view_change_quorum.get());
+        if !should_resend {
+            return Vec::new();
+        }
+
+        self.timeouts
+            .borrow_mut()
+            .reset(TimeoutKind::DoViewChangeMessage);
+
+        let view = self.view.get();
+        vec![VsrAction::SendDoViewChange {
+            view,
+            target: self.primary_index(view),
+            log_view: self.log_view.get(),
+            op: current_op,
+            commit: current_commit,
+        }]
+    }
+
+    /// Handles a fired `ViewChangeStatus` timeout.
+    ///
+    /// A view change that has not completed in time is abandoned in favour of
+    /// the next view. The replica clears its view-change bookkeeping, counts
+    /// its own vote for `view + 1`, re-arms this timeout and asks the caller to
+    /// broadcast StartViewChange. Outside view-change status nothing happens.
+    fn handle_view_change_status_timeout(&self) -> Vec<VsrAction> {
+        if self.status.get() != Status::ViewChange {
+            return Vec::new();
+        }
+
+        let escalated_view = self.view.get() + 1;
+        self.view.set(escalated_view);
+        self.reset_view_change_state();
+
+        self.sent_own_start_view_change.set(true);
+        let own_index = self.replica as usize;
+        self.start_view_change_from_all_replicas
+            .borrow_mut()
+            .insert(own_index);
+
+        self.timeouts
+            .borrow_mut()
+            .reset(TimeoutKind::ViewChangeStatus);
+
+        vec![VsrAction::SendStartViewChange {
+            view: escalated_view,
+        }]
+    }
+
+    /// Handles a received StartViewChange (SVC) message.
+    ///
+    /// "When replica i receives STARTVIEWCHANGE messages for its view-number
+    /// from f OTHER replicas, it sends a DOVIEWCHANGE message to the node
+    /// that will be the primary in the new view."
+    ///
+    /// The message is ignored in three cases:
+    /// - it is for an older view;
+    /// - it comes from a replica index outside the cluster;
+    /// - it is for the current view while this replica is not in view-change
+    ///   status.
+    ///
+    /// An SVC for a newer view moves this replica into that view change and
+    /// broadcasts its own SVC.
+    ///
+    /// Returns the actions the caller must perform: an SVC broadcast, a DVC to
+    /// the primary candidate, and a StartView broadcast when this replica is the
+    /// candidate and its own DVC completes the quorum.
+    pub fn handle_start_view_change(&self, header: &StartViewChangeHeader) -> Vec<VsrAction> {
+        let from_replica = header.replica;
+        let msg_view = header.view;
+
+        // Ignore SVCs for old views.
+        if msg_view < self.view.get() {
+            return Vec::new();
+        }
+
+        // A replica index outside the cluster must never count as a vote.
+        if from_replica >= self.replica_count {
+            return Vec::new();
+        }
+
+        // An SVC for the current view only matters while that view change is in
+        // progress. Once this replica is normal in the view (a StartView cleared
+        // `sent_own_do_view_change`), a late SVC must not trigger another DVC.
+        if msg_view == self.view.get() && self.status.get() != Status::ViewChange {
+            return Vec::new();
+        }
+
+        let mut actions = Vec::new();
+
+        // If SVC is for a higher view, advance to that view.
+        if msg_view > self.view.get() {
+            self.view.set(msg_view);
+            self.status.set(Status::ViewChange);
+            self.reset_view_change_state();
+            self.sent_own_start_view_change.set(true);
+            self.start_view_change_from_all_replicas
+                .borrow_mut()
+                .insert(self.replica as usize);
+
+            // Update timeouts
+            {
+                let mut timeouts = self.timeouts.borrow_mut();
+                timeouts.stop(TimeoutKind::NormalHeartbeat);
+                timeouts.start(TimeoutKind::StartViewChangeMessage);
+                timeouts.start(TimeoutKind::ViewChangeStatus);
+            }
+
+            // Send our own SVC
+            actions.push(VsrAction::SendStartViewChange { view: msg_view });
+        }
+
+        // Record the SVC from sender
+        self.start_view_change_from_all_replicas
+            .borrow_mut()
+            .insert(from_replica as usize);
+
+        // Once f OTHER replicas have voted for this view, send our DVC (once).
+        if !self.sent_own_do_view_change.get()
+            && self.svc_count_excluding_self() >= self.max_faulty()
+        {
+            self.sent_own_do_view_change.set(true);
+
+            let primary_candidate = self.primary_index(self.view.get());
+            let current_op = self.sequencer.current_sequence();
+            let current_commit = self.commit.get();
+
+            // Start DVC timeout
+            self.timeouts
+                .borrow_mut()
+                .start(TimeoutKind::DoViewChangeMessage);
+
+            actions.push(VsrAction::SendDoViewChange {
+                view: self.view.get(),
+                target: primary_candidate,
+                log_view: self.log_view.get(),
+                op: current_op,
+                commit: current_commit,
+            });
+
+            // If we are the primary candidate, record our own DVC
+            if primary_candidate == self.replica {
+                let own_dvc = StoredDvc {
+                    replica: self.replica,
+                    log_view: self.log_view.get(),
+                    op: current_op,
+                    commit: current_commit,
+                };
+                dvc_record(
+                    &mut self.do_view_change_from_all_replicas.borrow_mut(),
+                    own_dvc,
+                );
+
+                // Check if we now have quorum
+                if dvc_count(&self.do_view_change_from_all_replicas.borrow()) >= self.quorum() {
+                    self.do_view_change_quorum.set(true);
+                    actions.extend(self.complete_view_change_as_primary());
+                }
+            }
+        }
+
+        actions
+    }
+
+    /// Handle a received DoViewChange message (only relevant for primary 
candidate).
+    ///
+    /// "When the new primary receives f + 1 DOVIEWCHANGE messages from 
different
+    /// replicas (including itself), it sets its view-number to that in the 
messages
+    /// and selects as the new log the one contained in the message with the 
largest v'..."
+    pub fn handle_do_view_change(&self, header: &DoViewChangeHeader) -> 
Vec<VsrAction> {
+        let from_replica = header.replica;
+        let msg_view = header.view;
+        let msg_log_view = header.log_view;
+        let msg_op = header.op;
+        let msg_commit = header.commit;
+
+        // Ignore DVCs for old views
+        if msg_view < self.view.get() {
+            return Vec::new();
+        }
+
+        let mut actions = Vec::new();
+
+        // If DVC is for a higher view, advance to that view
+        if msg_view > self.view.get() {
+            self.view.set(msg_view);
+            self.status.set(Status::ViewChange);
+            self.reset_view_change_state();
+            self.sent_own_start_view_change.set(true);
+            self.start_view_change_from_all_replicas
+                .borrow_mut()
+                .insert(self.replica as usize);
+
+            // Update timeouts
+            {
+                let mut timeouts = self.timeouts.borrow_mut();
+                timeouts.stop(TimeoutKind::NormalHeartbeat);
+                timeouts.start(TimeoutKind::StartViewChangeMessage);
+                timeouts.start(TimeoutKind::ViewChangeStatus);
+            }
+
+            // Send our own SVC
+            actions.push(VsrAction::SendStartViewChange { view: msg_view });
+        }
+
+        // Only the primary candidate processes DVCs for quorum
+        if !self.is_primary_for_view(self.view.get()) {
+            return actions;
+        }
+
+        // Must be in view change to process DVCs
+        if self.status.get() != Status::ViewChange {
+            return actions;
+        }
+
+        let current_op = self.sequencer.current_sequence();
+        let current_commit = self.commit.get();
+
+        // If we haven't sent our own DVC yet, record it
+        if !self.sent_own_do_view_change.get() {
+            self.sent_own_do_view_change.set(true);
+
+            let own_dvc = StoredDvc {
+                replica: self.replica,
+                log_view: self.log_view.get(),
+                op: current_op,
+                commit: current_commit,
+            };
+            dvc_record(
+                &mut self.do_view_change_from_all_replicas.borrow_mut(),
+                own_dvc,
+            );
+        }
+
+        // Record the received DVC
+        let dvc = StoredDvc {
+            replica: from_replica,
+            log_view: msg_log_view,
+            op: msg_op,
+            commit: msg_commit,
+        };
+        dvc_record(&mut self.do_view_change_from_all_replicas.borrow_mut(), 
dvc);
+
+        // Check if quorum achieved
+        if !self.do_view_change_quorum.get()
+            && dvc_count(&self.do_view_change_from_all_replicas.borrow()) >= 
self.quorum()
+        {
+            self.do_view_change_quorum.set(true);
+            actions.extend(self.complete_view_change_as_primary());
+        }
+
+        actions
+    }
+
+    /// Handles a received StartView message (backups only).
+    ///
+    /// "When other replicas receive the STARTVIEW message, they replace their log
+    /// with the one in the message, set their op-number to that of the latest entry
+    /// in the log, set their view-number to the view number in the message, change
+    /// their status to normal, and send PrepareOK for any uncommitted ops."
+    ///
+    /// The message is ignored when:
+    /// - the sender is not the primary of the message's view;
+    /// - the view is older than ours;
+    /// - the sender is this replica;
+    /// - this replica is already normal in that view.
+    ///
+    /// Only the op number is adopted here; this method does not replace the log.
+    pub fn handle_start_view(&self, header: &StartViewHeader) -> Vec<VsrAction> {
+        let from_replica = header.replica;
+        let msg_view = header.view;
+        let msg_op = header.op;
+        let msg_commit = header.commit;
+
+        // Verify sender is the primary for this view
+        if self.primary_index(msg_view) != from_replica {
+            return Vec::new();
+        }
+
+        // Ignore old views
+        if msg_view < self.view.get() {
+            return Vec::new();
+        }
+
+        // We shouldn't process our own StartView
+        if from_replica == self.replica {
+            return Vec::new();
+        }
+
+        // A duplicate or delayed StartView for the view we are already normal in
+        // must not be re-applied. Re-applying would rewind the sequencer to the
+        // StartView's op, dropping ops sequenced since the view started.
+        if msg_view == self.view.get() && self.status.get() == Status::Normal {
+            return Vec::new();
+        }
+
+        // Accept the StartView and transition to normal
+        self.view.set(msg_view);
+        self.log_view.set(msg_view);
+        self.status.set(Status::Normal);
+        self.advance_commit_number(msg_commit);
+        self.reset_view_change_state();
+
+        // Update our op to match the new primary's log
+        self.sequencer.set_sequence(msg_op);
+
+        // Update timeouts for normal backup operation
+        {
+            let mut timeouts = self.timeouts.borrow_mut();
+            timeouts.stop(TimeoutKind::ViewChangeStatus);
+            timeouts.stop(TimeoutKind::DoViewChangeMessage);
+            timeouts.stop(TimeoutKind::RequestStartViewMessage);
+            timeouts.start(TimeoutKind::NormalHeartbeat);
+        }
+
+        // Send PrepareOK for uncommitted ops (commit+1 to op)
+        let mut actions = Vec::new();
+        for op_num in (msg_commit + 1)..=msg_op {
+            actions.push(VsrAction::SendPrepareOk {
+                view: msg_view,
+                op: op_num,
+                target: from_replica, // Send to new primary
+            });
+        }
+
+        actions
+    }
+
+    /// Completes the view change on the primary candidate once a DVC quorum
+    /// has been collected.
+    ///
+    /// Steps:
+    /// - pick the DVC with the highest `log_view` (then highest `op`) and adopt
+    ///   its op number;
+    /// - set the commit number to the highest commit among the DVCs;
+    /// - switch to normal status with `log_view = view`;
+    /// - replace the view-change timeouts with the commit timeout;
+    /// - return a StartView action.
+    ///
+    /// Returns no actions if no DVC is recorded.
+    fn complete_view_change_as_primary(&self) -> Vec<VsrAction> {
+        let dvc_array = self.do_view_change_from_all_replicas.borrow();
+
+        let Some(winner) = dvc_select_winner(&dvc_array) else {
+            return Vec::new();
+        };
+
+        let new_op = winner.op;
+        let max_commit = dvc_max_commit(&dvc_array);
+
+        // Update state
+        self.log_view.set(self.view.get());
+        self.status.set(Status::Normal);
+        self.advance_commit_number(max_commit);
+
+        // Adopt the winning log's op number, as backups do on StartView.
+        // Otherwise a primary whose own log was shorter (or held ops from an
+        // older log_view) would sequence ops that disagree with the `op` it
+        // broadcasts. NOTE(review): the winning log's entries are not
+        // transferred here; only the op number is adopted.
+        self.sequencer.set_sequence(new_op);
+
+        // Update timeouts for normal primary operation
+        {
+            let mut timeouts = self.timeouts.borrow_mut();
+            timeouts.stop(TimeoutKind::ViewChangeStatus);
+            timeouts.stop(TimeoutKind::DoViewChangeMessage);
+            timeouts.stop(TimeoutKind::StartViewChangeMessage);
+            timeouts.start(TimeoutKind::CommitMessage);
+        }
+
+        vec![VsrAction::SendStartView {
+            view: self.view.get(),
+            op: new_op,
+            commit: max_commit,
+        }]
+    }
+
     /// Handle a prepare_ok message from a follower.
     /// Called on the primary when a follower acknowledges a prepare.
     ///
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 8ff864ffd..e77c575ce 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -43,4 +43,6 @@ pub trait Consensus {
 mod impls;
 pub use impls::*;
 
+mod view_change_quorum;
+pub use view_change_quorum::*;
 mod vsr_timeout;
diff --git a/core/consensus/src/view_change_quorum.rs 
b/core/consensus/src/view_change_quorum.rs
new file mode 100644
index 000000000..2a8e01e9b
--- /dev/null
+++ b/core/consensus/src/view_change_quorum.rs
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::REPLICAS_MAX;
+
+/// Metadata kept from one DoViewChange message while the primary candidate
+/// collects its quorum.
+#[derive(Debug, Clone, Copy)]
+pub struct StoredDvc {
+    /// Index of the replica that sent the DoViewChange.
+    pub replica: u8,
+    /// The view when the replica's status was last normal.
+    pub log_view: u32,
+    /// The sender's highest op number.
+    pub op: u64,
+    /// The sender's commit number.
+    pub commit: u64,
+}
+
+impl StoredDvc {
+    /// Log-selection order. Returns `true` when `self` has a strictly higher
+    /// `log_view`, or the same `log_view` and a strictly higher `op`.
+    pub fn is_better_than(&self, other: &StoredDvc) -> bool {
+        (self.log_view, self.op) > (other.log_view, other.op)
+    }
+}
+
+/// Array type for storing DVC messages from all replicas.
+pub type DvcQuorumArray = [Option<StoredDvc>; REPLICAS_MAX];
+
+/// Create an empty DVC quorum array.
+pub const fn dvc_quorum_array_empty() -> DvcQuorumArray {
+    [None; REPLICAS_MAX]
+}
+
+/// Record a DVC in the array. Returns true if this is a new entry (not 
duplicate).
+pub fn dvc_record(array: &mut DvcQuorumArray, dvc: StoredDvc) -> bool {
+    let slot = &mut array[dvc.replica as usize];
+    if slot.is_some() {
+        return false; // Duplicate
+    }
+    *slot = Some(dvc);
+    true
+}
+
+/// Count how many DVCs have been received.
+pub fn dvc_count(array: &DvcQuorumArray) -> usize {
+    array.iter().filter(|m| m.is_some()).count()
+}
+
+/// Check if a specific replica has sent a DVC.
+pub fn dvc_has_from(array: &DvcQuorumArray, replica: u8) -> bool {
+    array
+        .get(replica as usize)
+        .map(|m| m.is_some())
+        .unwrap_or(false)
+}
+
+/// Selects the DVC whose log the new primary must adopt.
+///
+/// The winner has the highest `log_view`, with ties broken by the highest
+/// `op`. If several DVCs tie on both, the one in the highest slot (replica
+/// index) is returned. Returns `None` when no DVC is recorded.
+pub fn dvc_select_winner(array: &DvcQuorumArray) -> Option<&StoredDvc> {
+    dvc_iter(array).max_by_key(|dvc| (dvc.log_view, dvc.op))
+}
+
+/// Highest commit number across all recorded DVCs, or 0 if there are none.
+pub fn dvc_max_commit(array: &DvcQuorumArray) -> u64 {
+    dvc_iter(array).fold(0, |highest, dvc| highest.max(dvc.commit))
+}
+
+/// Empties every slot of the DVC quorum array.
+pub fn dvc_reset(array: &mut DvcQuorumArray) {
+    array.fill(None);
+}
+
+/// Iterates over the recorded DVCs in replica-index order, skipping empty slots.
+pub fn dvc_iter(array: &DvcQuorumArray) -> impl Iterator<Item = &StoredDvc> {
+    array.iter().flatten()
+}
diff --git a/core/consensus/src/vsr_timeout.rs 
b/core/consensus/src/vsr_timeout.rs
index d572bb65e..140d40f02 100644
--- a/core/consensus/src/vsr_timeout.rs
+++ b/core/consensus/src/vsr_timeout.rs
@@ -102,6 +102,7 @@ pub enum TimeoutKind {
     CommitMessage,
     NormalHeartbeat,
     StartViewChangeMessage,
+    ViewChangeStatus,
     DoViewChangeMessage,
     RequestStartViewMessage,
 }
@@ -115,6 +116,7 @@ pub struct TimeoutManager {
     normal_heartbeat: Timeout,
     start_view_change_message: Timeout,
     do_view_change_message: Timeout,
+    view_change_status: Timeout,
     request_start_view_message: Timeout,
     prng: Xoshiro256Plus,
 }
@@ -128,6 +130,7 @@ impl TimeoutManager {
     const COMMIT_MESSAGE_TICKS: u64 = 50;
     const NORMAL_HEARTBEAT_TICKS: u64 = 500;
     const START_VIEW_CHANGE_MESSAGE_TICKS: u64 = 50;
+    const VIEW_CHANGE_STATUS_TICKS: u64 = 500;
     const DO_VIEW_CHANGE_MESSAGE_TICKS: u64 = 50;
     const REQUEST_START_VIEW_MESSAGE_TICKS: u64 = 100;
 
@@ -142,6 +145,7 @@ impl TimeoutManager {
                 Self::START_VIEW_CHANGE_MESSAGE_TICKS,
             ),
             do_view_change_message: Timeout::new(replica_id, 
Self::DO_VIEW_CHANGE_MESSAGE_TICKS),
+            view_change_status: Timeout::new(replica_id, 
Self::VIEW_CHANGE_STATUS_TICKS),
             request_start_view_message: Timeout::new(
                 replica_id,
                 Self::REQUEST_START_VIEW_MESSAGE_TICKS,
@@ -174,6 +178,7 @@ impl TimeoutManager {
             TimeoutKind::CommitMessage => &self.commit_message,
             TimeoutKind::NormalHeartbeat => &self.normal_heartbeat,
             TimeoutKind::StartViewChangeMessage => 
&self.start_view_change_message,
+            TimeoutKind::ViewChangeStatus => &self.view_change_status,
             TimeoutKind::DoViewChangeMessage => &self.do_view_change_message,
             TimeoutKind::RequestStartViewMessage => 
&self.request_start_view_message,
         }
@@ -186,6 +191,7 @@ impl TimeoutManager {
             TimeoutKind::CommitMessage => &mut self.commit_message,
             TimeoutKind::NormalHeartbeat => &mut self.normal_heartbeat,
             TimeoutKind::StartViewChangeMessage => &mut 
self.start_view_change_message,
+            TimeoutKind::ViewChangeStatus => &mut self.view_change_status,
             TimeoutKind::DoViewChangeMessage => &mut 
self.do_view_change_message,
             TimeoutKind::RequestStartViewMessage => &mut 
self.request_start_view_message,
         }
@@ -210,6 +216,7 @@ impl TimeoutManager {
             TimeoutKind::CommitMessage => &mut self.commit_message,
             TimeoutKind::NormalHeartbeat => &mut self.normal_heartbeat,
             TimeoutKind::StartViewChangeMessage => &mut 
self.start_view_change_message,
+            TimeoutKind::ViewChangeStatus => &mut self.view_change_status,
             TimeoutKind::DoViewChangeMessage => &mut 
self.do_view_change_message,
             TimeoutKind::RequestStartViewMessage => &mut 
self.request_start_view_message,
         };


Reply via email to