This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch pedantic-consensus
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 86c4bc97adcb144b487204ddc6b6e1be813c1e55
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Mar 3 13:00:10 2026 +0100

    chore(consensus): enable clippy pedantic/nursery and fix all warnings
---
 core/consensus/Cargo.toml                 |   5 +
 core/consensus/src/impls.rs               | 169 +++++++++++++++++++++---------
 core/consensus/src/lib.rs                 |   2 +-
 core/consensus/src/namespaced_pipeline.rs |  17 ++-
 core/consensus/src/plane_helpers.rs       |  44 +++++++-
 core/consensus/src/plane_mux.rs           |  14 ++-
 core/consensus/src/view_change_quorum.rs  |  30 +++---
 core/consensus/src/vsr_timeout.rs         |  35 ++++---
 core/shard/src/router.rs                  |  12 ++-
 9 files changed, 239 insertions(+), 89 deletions(-)

diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 70c6ff46b..33948523e 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -38,3 +38,8 @@ rand_xoshiro = { workspace = true }
 
 [dev-dependencies]
 futures = { workspace = true }
+
+[lints.clippy]
+enum_glob_use = "deny"
+pedantic = "deny"
+nursery = "deny"
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index e1e992d8a..016fe839f 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -50,7 +50,8 @@ pub struct LocalSequencer {
 }
 
 impl LocalSequencer {
-    pub fn new(initial_op: u64) -> Self {
+    #[must_use]
+    pub const fn new(initial_op: u64) -> Self {
         Self {
             op: Cell::new(initial_op),
         }
@@ -89,11 +90,12 @@ pub struct PipelineEntry {
     pub header: PrepareHeader,
     /// Bitmap of replicas that have acknowledged this prepare.
     pub ok_from_replicas: BitSet<u32>,
-    /// Whether we've received a quorum of prepare_ok messages.
+    /// Whether we've received a quorum of `prepare_ok` messages.
     pub ok_quorum_received: bool,
 }
 
 impl PipelineEntry {
+    #[must_use]
     pub fn new(header: PrepareHeader) -> Self {
         Self {
             header,
@@ -102,7 +104,7 @@ impl PipelineEntry {
         }
     }
 
-    /// Record a prepare_ok from the given replica.
+    /// Record a `prepare_ok` from the given replica.
     /// Returns the new count of acknowledgments.
     pub fn add_ack(&mut self, replica: u8) -> usize {
         self.ok_from_replicas.insert(replica as usize);
@@ -110,11 +112,13 @@ impl PipelineEntry {
     }
 
     /// Check if we have an ack from the given replica.
+    #[must_use]
     pub fn has_ack(&self, replica: u8) -> bool {
         self.ok_from_replicas.contains(replica as usize)
     }
 
     /// Get the number of acks received.
+    #[must_use]
     pub fn ack_count(&self) -> usize {
         self.ok_from_replicas.len()
     }
@@ -128,7 +132,7 @@ pub struct RequestEntry {
 }
 
 impl RequestEntry {
-    pub fn new(message: Message<RequestHeader>) -> Self {
+    pub const fn new(message: Message<RequestHeader>) -> Self {
         Self {
             message,
             received_at: 0, //TODO figure the correct way to do this
@@ -149,25 +153,30 @@ impl Default for LocalPipeline {
 }
 
 impl LocalPipeline {
+    #[must_use]
     pub fn new() -> Self {
         Self {
             prepare_queue: VecDeque::with_capacity(PIPELINE_PREPARE_QUEUE_MAX),
         }
     }
 
+    #[must_use]
     pub fn prepare_count(&self) -> usize {
         self.prepare_queue.len()
     }
 
+    #[must_use]
     pub fn prepare_queue_full(&self) -> bool {
         self.prepare_queue.len() >= PIPELINE_PREPARE_QUEUE_MAX
     }
 
     /// Returns true if prepare queue is full.
+    #[must_use]
     pub fn is_full(&self) -> bool {
         self.prepare_queue_full()
     }
 
+    #[must_use]
     pub fn is_empty(&self) -> bool {
         self.prepare_queue.is_empty()
     }
@@ -177,6 +186,8 @@ impl LocalPipeline {
     /// # Panics
     /// - If message queue is full.
     /// - If the message doesn't chain correctly to the previous entry.
+    // Signature must match `Pipeline::push_message` trait definition.
+    #[allow(clippy::needless_pass_by_value)]
     pub fn push_message(&mut self, message: Message<PrepareHeader>) {
         assert!(!self.prepare_queue_full(), "prepare queue is full");
 
@@ -208,6 +219,7 @@ impl LocalPipeline {
     }
 
     /// Get the head (oldest) prepare.
+    #[must_use]
     pub fn prepare_head(&self) -> Option<&PipelineEntry> {
         self.prepare_queue.front()
     }
@@ -217,11 +229,15 @@ impl LocalPipeline {
     }
 
     /// Get the tail (newest) prepare.
+    #[must_use]
     pub fn prepare_tail(&self) -> Option<&PipelineEntry> {
         self.prepare_queue.back()
     }
 
     /// Find a message by op number and checksum (immutable).
+    // Pipeline bounded at PIPELINE_PREPARE_QUEUE_MAX (8) entries; index always fits in usize.
+    #[must_use]
+    #[allow(clippy::cast_possible_truncation)]
     pub fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&PipelineEntry> {
         let head_op = self.prepare_queue.front()?.header.op;
         let tail_op = self.prepare_queue.back()?.header.op;
@@ -250,6 +266,9 @@ impl LocalPipeline {
     }
 
     /// Find a message by op number only.
+    // Pipeline bounded at PIPELINE_PREPARE_QUEUE_MAX (8) entries; index 
always fits in usize.
+    #[must_use]
+    #[allow(clippy::cast_possible_truncation)]
     pub fn message_by_op(&self, op: u64) -> Option<&PipelineEntry> {
         let head_op = self.prepare_queue.front()?.header.op;
 
@@ -263,6 +282,8 @@ impl LocalPipeline {
 
     /// Get mutable reference to a message entry by op number.
     /// Returns None if op is not in the pipeline.
+    // Pipeline bounded at PIPELINE_PREPARE_QUEUE_MAX (8) entries; index always fits in usize.
+    #[allow(clippy::cast_possible_truncation)]
     pub fn message_by_op_mut(&mut self, op: u64) -> Option<&mut PipelineEntry> {
         let head_op = self.prepare_queue.front()?.header.op;
         if op < head_op {
@@ -276,14 +297,16 @@ impl LocalPipeline {
     }
 
     /// Get the entry at the head of the prepare queue (oldest uncommitted).
+    #[must_use]
     pub fn head(&self) -> Option<&PipelineEntry> {
         self.prepare_queue.front()
     }
 
-    /// Search prepare queue for a message from the given client.
+    /// Search `prepare_queue` for a message from the given client.
     ///
-    /// If there are multiple messages (possible in prepare_queue after view change),
+    /// If there are multiple messages (possible in `prepare_queue` after view change),
     /// returns the latest one.
+    #[must_use]
     pub fn has_message_from_client(&self, client: u128) -> bool {
         self.prepare_queue.iter().any(|p| p.header.client == client)
     }
@@ -324,43 +347,43 @@ impl Pipeline for LocalPipeline {
     type Entry = PipelineEntry;
 
     fn push_message(&mut self, message: Self::Message) {
-        LocalPipeline::push_message(self, message)
+        Self::push_message(self, message);
     }
 
     fn pop_message(&mut self) -> Option<Self::Entry> {
-        LocalPipeline::pop_message(self)
+        Self::pop_message(self)
     }
 
     fn clear(&mut self) {
-        LocalPipeline::clear(self)
+        Self::clear(self);
     }
 
     fn message_by_op(&self, op: u64) -> Option<&Self::Entry> {
-        LocalPipeline::message_by_op(self, op)
+        Self::message_by_op(self, op)
     }
 
     fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry> {
-        LocalPipeline::message_by_op_mut(self, op)
+        Self::message_by_op_mut(self, op)
     }
 
     fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&Self::Entry> {
-        LocalPipeline::message_by_op_and_checksum(self, op, checksum)
+        Self::message_by_op_and_checksum(self, op, checksum)
     }
 
     fn head(&self) -> Option<&Self::Entry> {
-        LocalPipeline::head(self)
+        Self::head(self)
     }
 
     fn is_full(&self) -> bool {
-        LocalPipeline::is_full(self)
+        Self::is_full(self)
     }
 
     fn is_empty(&self) -> bool {
-        LocalPipeline::is_empty(self)
+        Self::is_empty(self)
     }
 
     fn verify(&self) {
-        LocalPipeline::verify(self)
+        Self::verify(self);
     }
 }
 
@@ -374,9 +397,9 @@ pub enum Status {
 /// Actions to be taken by the caller after processing a VSR event.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum VsrAction {
-    /// Send StartViewChange to all replicas.
+    /// Send `StartViewChange` to all replicas.
     SendStartViewChange { view: u32, namespace: u64 },
-    /// Send DoViewChange to primary.
+    /// Send `DoViewChange` to primary.
     SendDoViewChange {
         view: u32,
         target: u8,
@@ -385,14 +408,14 @@ pub enum VsrAction {
         commit: u64,
         namespace: u64,
     },
-    /// Send StartView to all backups (as new primary).
+    /// Send `StartView` to all backups (as new primary).
     SendStartView {
         view: u32,
         op: u64,
         commit: u64,
         namespace: u64,
     },
-    /// Send PrepareOK to primary.
+    /// Send `PrepareOK` to primary.
     SendPrepareOk {
         view: u32,
         op: u64,
@@ -453,6 +476,9 @@ where
 }
 
 impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
+    /// # Panics
+    /// - If `replica >= replica_count`.
+    /// - If `replica_count < 1`.
     pub fn new(
         cluster: u128,
         replica: u8,
@@ -469,7 +495,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
         // TODO: Verify that XOR-based seeding provides sufficient jitter diversity
         // across groups. Consider using a proper hash (e.g., Murmur3) of
         // (replica_id, namespace) for production.
-        let timeout_seed = replica as u128 ^ namespace as u128;
+        let timeout_seed = u128::from(replica) ^ u128::from(namespace);
         Self {
             cluster,
             replica,
@@ -499,14 +525,21 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         self.status.set(Status::Normal);
     }
 
-    pub fn primary_index(&self, view: u32) -> u8 {
+    #[must_use]
+    // cast_lossless: `u32::from()` unavailable in const fn.
+    // cast_possible_truncation: modulo by replica_count (u8) guarantees result fits in u8.
+    #[allow(clippy::cast_lossless, clippy::cast_possible_truncation)]
+    pub const fn primary_index(&self, view: u32) -> u8 {
         (view % self.replica_count as u32) as u8
     }
 
-    pub fn is_primary(&self) -> bool {
+    #[must_use]
+    pub const fn is_primary(&self) -> bool {
         self.primary_index(self.view.get()) == self.replica
     }
 
+    /// # Panics
+    /// If the stored commit number somehow exceeds the given `commit` after update.
     pub fn advance_commit_number(&self, commit: u64) {
         if commit > self.commit.get() {
             self.commit.set(commit);
@@ -517,33 +550,40 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
 
     /// Maximum number of faulty replicas that can be tolerated.
     /// For a cluster of 2f+1 replicas, this returns f.
-    pub fn max_faulty(&self) -> usize {
+    #[must_use]
+    pub const fn max_faulty(&self) -> usize {
         (self.replica_count as usize - 1) / 2
     }
 
-    /// Quorum size = f + 1 = max_faulty + 1
-    pub fn quorum(&self) -> usize {
+    /// Quorum size = f + 1 = `max_faulty` + 1
+    #[must_use]
+    pub const fn quorum(&self) -> usize {
         self.max_faulty() + 1
     }
 
-    pub fn commit(&self) -> u64 {
+    #[must_use]
+    pub const fn commit(&self) -> u64 {
         self.commit.get()
     }
 
-    pub fn is_syncing(&self) -> bool {
-        // for now return false. we have to add syncing related setup to VsrConsensus to make this work.
+    #[must_use]
+    pub const fn is_syncing(&self) -> bool {
+        // TODO: for now return false. we have to add syncing related setup to VsrConsensus to make this work.
         false
     }
 
-    pub fn replica(&self) -> u8 {
+    #[must_use]
+    pub const fn replica(&self) -> u8 {
         self.replica
     }
 
-    pub fn sequencer(&self) -> &LocalSequencer {
+    #[must_use]
+    pub const fn sequencer(&self) -> &LocalSequencer {
         &self.sequencer
     }
 
-    pub fn view(&self) -> u32 {
+    #[must_use]
+    pub const fn view(&self) -> u32 {
         self.view.get()
     }
 
@@ -551,34 +591,41 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         self.view.set(view);
     }
 
-    pub fn status(&self) -> Status {
+    #[must_use]
+    pub const fn status(&self) -> Status {
         self.status.get()
     }
 
     // TODO(hubcio): returning &RefCell<P> leaks interior mutability - callers
     // could hold a Ref/RefMut across an .await and cause a runtime panic.
     // We had this problem with slab + ECS.
-    pub fn pipeline(&self) -> &RefCell<P> {
+    #[must_use]
+    pub const fn pipeline(&self) -> &RefCell<P> {
         &self.pipeline
     }
 
-    pub fn pipeline_mut(&mut self) -> &mut RefCell<P> {
+    #[must_use]
+    pub const fn pipeline_mut(&mut self) -> &mut RefCell<P> {
         &mut self.pipeline
     }
 
-    pub fn cluster(&self) -> u128 {
+    #[must_use]
+    pub const fn cluster(&self) -> u128 {
         self.cluster
     }
 
-    pub fn replica_count(&self) -> u8 {
+    #[must_use]
+    pub const fn replica_count(&self) -> u8 {
         self.replica_count
     }
 
-    pub fn namespace(&self) -> u64 {
+    #[must_use]
+    pub const fn namespace(&self) -> u64 {
         self.namespace
     }
 
-    pub fn last_prepare_checksum(&self) -> u128 {
+    #[must_use]
+    pub const fn last_prepare_checksum(&self) -> u128 {
         self.last_prepare_checksum.get()
     }
 
@@ -586,7 +633,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         self.last_prepare_checksum.set(checksum);
     }
 
-    pub fn log_view(&self) -> u32 {
+    #[must_use]
+    pub const fn log_view(&self) -> u32 {
         self.log_view.get()
     }
 
@@ -594,7 +642,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         self.log_view.set(log_view);
     }
 
-    pub fn is_primary_for_view(&self, view: u32) -> bool {
+    #[must_use]
+    pub const fn is_primary_for_view(&self, view: u32) -> bool {
         self.primary_index(view) == self.replica
     }
 
@@ -624,9 +673,9 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
 
     /// Reset all view change state when transitioning to a new view.
     ///
-    /// Clears the loopback queue: stale PrepareOks from the old view
+    /// Clears the loopback queue: stale `PrepareOks` from the old view
     /// reference pipeline entries that no longer exist, so processing
-    /// them would be a no-op (handle_prepare_ok ignores unknown ops).
+    /// them would be a no-op (`handle_prepare_ok` ignores unknown ops).
     /// The primary does not require its own self-ack for quorum.
     pub(crate) fn reset_view_change_state(&self) {
         self.reset_svc_quorum();
@@ -675,7 +724,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         actions
     }
 
-    /// Called when normal_heartbeat timeout fires.
+    /// Called when `normal_heartbeat` timeout fires.
     /// Backup hasn't heard from primary - start view change.
     fn handle_normal_heartbeat_timeout(&self) -> Vec<VsrAction> {
         // Only backups trigger view change on heartbeat timeout
@@ -788,11 +837,14 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         }]
     }
 
-    /// Handle a received StartViewChange message.
+    /// Handle a received `StartViewChange` message.
     ///
     /// "When replica i receives STARTVIEWCHANGE messages for its view-number
     /// from f OTHER replicas, it sends a DOVIEWCHANGE message to the node
     /// that will be the primary in the new view."
+    ///
+    /// # Panics
+    /// If `header.namespace` does not match this replica's namespace.
     pub fn handle_start_view_change(&self, header: &StartViewChangeHeader) -> 
Vec<VsrAction> {
         assert_eq!(
             header.namespace, self.namespace,
@@ -887,11 +939,14 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         actions
     }
 
-    /// Handle a received DoViewChange message (only relevant for primary candidate).
+    /// Handle a received `DoViewChange` message (only relevant for primary candidate).
     ///
     /// "When the new primary receives f + 1 DOVIEWCHANGE messages from different
     /// replicas (including itself), it sets its view-number to that in the messages
     /// and selects as the new log the one contained in the message with the largest v'..."
+    ///
+    /// # Panics
+    /// If `header.namespace` does not match this replica's namespace.
     pub fn handle_do_view_change(&self, header: &DoViewChangeHeader) -> Vec<VsrAction> {
         assert_eq!(
             header.namespace, self.namespace,
@@ -984,12 +1039,15 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         actions
     }
 
-    /// Handle a received StartView message (backups only).
+    /// Handle a received `StartView` message (backups only).
     ///
     /// "When other replicas receive the STARTVIEW message, they replace their 
log
     /// with the one in the message, set their op-number to that of the latest 
entry
     /// in the log, set their view-number to the view number in the message, 
change
-    /// their status to normal, and send PrepareOK for any uncommitted ops."
+    /// their status to normal, and send `PrepareOK` for any uncommitted ops."
+    ///
+    /// # Panics
+    /// If `header.namespace` does not match this replica's namespace.
     pub fn handle_start_view(&self, header: &StartViewHeader) -> Vec<VsrAction> {
         assert_eq!(header.namespace, self.namespace, "SV routed to wrong group");
         let from_replica = header.replica;
@@ -1091,10 +1149,14 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         }]
     }
 
-    /// Handle a PrepareOk message from a replica.
+    /// Handle a `PrepareOk` message from a replica.
     ///
     /// Returns `true` if quorum was just reached for this op.
     /// Caller (`on_ack`) should validate `is_primary` and status before 
calling.
+    ///
+    /// # Panics
+    /// - If `header.command` is not `Command2::PrepareOk`.
+    /// - If `header.replica >= self.replica_count`.
     pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool {
         assert_eq!(header.command, Command2::PrepareOk);
         assert!(
@@ -1151,7 +1213,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
 
     /// Enqueue a self-addressed message for processing in the next loopback 
drain.
     ///
-    /// Currently only PrepareOk messages are routed here (via 
`send_or_loopback`).
+    /// Currently only `PrepareOk` messages are routed here (via 
`send_or_loopback`).
     // TODO: Route SVC/DVC self-messages through loopback once VsrAction 
dispatch is implemented.
     pub(crate) fn push_loopback(&self, message: Message<GenericHeader>) {
         assert!(
@@ -1170,6 +1232,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
     }
 
     /// Send a message to `target`, routing self-addressed messages through 
the loopback queue.
+    // VsrConsensus uses Cell/RefCell for single-threaded compio shards; futures are intentionally !Send.
+    #[allow(clippy::future_not_send)]
     pub(crate) async fn send_or_loopback(&self, target: u8, message: Message<GenericHeader>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = 
u128>,
@@ -1185,7 +1249,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         }
     }
 
-    pub fn message_bus(&self) -> &B {
+    #[must_use]
+    pub const fn message_bus(&self) -> &B {
         &self.message_bus
     }
 }
@@ -1226,7 +1291,7 @@ where
 impl<B, P> Project<Message<PrepareOkHeader>, VsrConsensus<B, P>> for Message<PrepareHeader>
 where
     B: MessageBus,
-    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+    P: Pipeline<Message = Self, Entry = PipelineEntry>,
 {
     type Consensus = VsrConsensus<B, P>;
 
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index b1f7460e2..dcc5cbc6d 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -78,7 +78,7 @@ pub trait Consensus: Sized {
 /// This abstracts the VSR message flow:
 /// - request -> prepare
 /// - replicate (prepare)
-/// - ack (prepare_ok)
+/// - ack (`prepare_ok`)
 pub trait Plane<C>
 where
     C: Consensus,
diff --git a/core/consensus/src/namespaced_pipeline.rs b/core/consensus/src/namespaced_pipeline.rs
index 05d47d80f..2eb90125b 100644
--- a/core/consensus/src/namespaced_pipeline.rs
+++ b/core/consensus/src/namespaced_pipeline.rs
@@ -24,7 +24,7 @@ use std::collections::{HashMap, VecDeque};
 /// Pipeline that partitions entries by namespace for independent commit draining.
 ///
 /// A single global op sequence and hash chain spans all namespaces, but entries
-/// are stored in per-namespace VecDeques. Each namespace tracks its own commit
+/// are stored in per-namespace `VecDeques`. Each namespace tracks its own commit
 /// frontier so `drain_committable_all` drains quorum'd entries per-namespace
 /// without waiting for the global commit to advance past unrelated namespaces.
 ///
@@ -58,6 +58,7 @@ impl Default for NamespacedPipeline {
 }
 
 impl NamespacedPipeline {
+    #[must_use]
     pub fn new() -> Self {
         Self {
             queues: HashMap::new(),
@@ -75,6 +76,7 @@ impl NamespacedPipeline {
     }
 
     /// Per-namespace commit frontier for the given namespace.
+    #[must_use]
     pub fn ns_commit(&self, ns: u64) -> Option<u64> {
         self.ns_commits.get(&ns).copied()
     }
@@ -84,6 +86,9 @@ impl NamespacedPipeline {
     /// For each namespace queue, drains from the front while entries have
     /// `ok_quorum_received == true`. Returns entries sorted by global op
     /// for deterministic processing.
+    ///
+    /// # Panics
+    /// If the front entry exists but `pop_front` returns `None` (unreachable).
     pub fn drain_committable_all(&mut self) -> Vec<PipelineEntry> {
         let mut drained = Vec::new();
 
@@ -117,6 +122,7 @@ impl NamespacedPipeline {
     /// Walks forward from `current_commit + 1`, treating any op not found
     /// in the pipeline (already drained) as committed. Stops at the first
     /// op still present in a queue or past `last_push_op`.
+    #[must_use]
     pub fn global_commit_frontier(&self, current_commit: u64) -> u64 {
         let mut commit = current_commit;
         loop {
@@ -143,6 +149,11 @@ impl Pipeline for NamespacedPipeline {
     type Message = Message<PrepareHeader>;
     type Entry = PipelineEntry;
 
+    /// # Panics
+    /// - If the pipeline is full.
+    /// - If ops are not globally sequential.
+    /// - If the hash chain is broken.
+    /// - If the namespace is not registered.
     fn push_message(&mut self, message: Self::Message) {
         assert!(
             self.total_count < PIPELINE_PREPARE_QUEUE_MAX,
@@ -258,7 +269,7 @@ impl Pipeline for NamespacedPipeline {
     fn verify(&self) {
         assert!(self.total_count <= PIPELINE_PREPARE_QUEUE_MAX);
 
-        let actual_count: usize = self.queues.values().map(|q| q.len()).sum();
+        let actual_count: usize = self.queues.values().map(VecDeque::len).sum();
         assert_eq!(actual_count, self.total_count, "total_count mismatch");
 
         // Per-namespace: ops must be monotonically increasing
@@ -492,6 +503,7 @@ mod tests {
     }
 
     #[test]
+    #[allow(clippy::cast_possible_truncation)]
     fn is_full() {
         let mut pipeline = NamespacedPipeline::new();
         pipeline.register_namespace(0);
@@ -506,6 +518,7 @@ mod tests {
 
     #[test]
     #[should_panic(expected = "namespaced pipeline full")]
+    #[allow(clippy::cast_possible_truncation)]
     fn push_when_full_panics() {
         let mut pipeline = NamespacedPipeline::new();
         pipeline.register_namespace(0);
diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs
index f4ca92f3d..55d21534a 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -24,6 +24,12 @@ use std::ops::AsyncFnOnce;
 // TODO: Rework all of those helpers, once the boundaries are more clear and 
we have a better picture of the commonalities between all of the planes.
 
 /// Shared pipeline-first request flow used by metadata and partitions.
+///
+/// # Panics
+/// - If the caller is not the primary.
+/// - If the consensus status is not normal.
+/// - If the consensus is syncing.
+#[allow(clippy::future_not_send)]
 pub async fn pipeline_prepare_common<C, F>(
     consensus: &C,
     prepare: C::Message<C::ReplicateHeader>,
@@ -43,7 +49,8 @@ pub async fn pipeline_prepare_common<C, F>(
 }
 
 /// Shared commit-based old-prepare fence.
-pub fn fence_old_prepare_by_commit<B, P>(
+#[must_use]
+pub const fn fence_old_prepare_by_commit<B, P>(
     consensus: &VsrConsensus<B, P>,
     header: &PrepareHeader,
 ) -> bool
@@ -55,6 +62,13 @@ where
 }
 
 /// Shared chain-replication forwarding to the next replica.
+///
+/// # Panics
+/// - If `header.command` is not `Command2::Prepare`.
+/// - If `header.op <= consensus.commit()`.
+/// - If the computed next replica equals self.
+/// - If the message bus send fails.
+#[allow(clippy::future_not_send)]
 pub async fn replicate_to_next_in_chain<B, P>(
     consensus: &VsrConsensus<B, P>,
     message: Message<PrepareHeader>,
@@ -87,6 +101,13 @@ pub async fn replicate_to_next_in_chain<B, P>(
 /// Shared preflight checks for `on_replicate`.
 ///
 /// Returns current op on success.
+///
+/// # Errors
+/// Returns a static error string if the replica is syncing, not in normal
+/// status, or the message has a newer view.
+///
+/// # Panics
+/// If `header.command` is not `Command2::Prepare`.
 pub fn replicate_preflight<B, P>(
     consensus: &VsrConsensus<B, P>,
     header: &PrepareHeader,
@@ -119,6 +140,10 @@ where
 }
 
 /// Shared preflight checks for `on_ack`.
+///
+/// # Errors
+/// Returns a static error string if the replica is not primary or not in
+/// normal status.
 pub fn ack_preflight<B, P>(consensus: &VsrConsensus<B, P>) -> Result<(), 
&'static str>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -171,6 +196,9 @@ where
 ///
 /// Entries are drained only from the head and only while their op is covered
 /// by the current commit frontier.
+///
+/// # Panics
+/// If `head()` returns `Some` but `pop_message()` returns `None` 
(unreachable).
 pub fn drain_committable_prefix<B, P>(consensus: &VsrConsensus<B, P>) -> 
Vec<PipelineEntry>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -195,6 +223,10 @@ where
 }
 
 /// Shared reply-message construction for committed prepare.
+///
+/// # Panics
+/// If the constructed message buffer is not valid.
+#[allow(clippy::needless_pass_by_value, clippy::cast_possible_truncation)]
 pub fn build_reply_message<B, P>(
     consensus: &VsrConsensus<B, P>,
     prepare_header: &PrepareHeader,
@@ -239,6 +271,9 @@ where
 }
 
 /// Verify hash chain would not break if we add this header.
+///
+/// # Panics
+/// If both headers share the same view and `current.parent != 
previous.checksum`.
 pub fn panic_if_hash_chain_would_break_in_same_view(
     previous: &PrepareHeader,
     current: &PrepareHeader,
@@ -254,6 +289,10 @@ pub fn panic_if_hash_chain_would_break_in_same_view(
 }
 
 // TODO: Figure out how to make this check the journal if it contains the 
prepare.
+/// # Panics
+/// - If `header.command` is not `Command2::Prepare`.
+/// - If `header.view > consensus.view()`.
+#[allow(clippy::cast_possible_truncation, clippy::future_not_send)]
 pub async fn send_prepare_ok<B, P>(
     consensus: &VsrConsensus<B, P>,
     header: &PrepareHeader,
@@ -272,7 +311,7 @@ pub async fn send_prepare_ok<B, P>(
         return;
     }
 
-    if let Some(false) = is_persisted {
+    if is_persisted == Some(false) {
         return;
     }
 
@@ -534,6 +573,7 @@ mod tests {
         }
     }
 
+    #[allow(clippy::future_not_send)]
     impl MessageBus for SpyBus {
         type Client = u128;
         type Replica = u8;
diff --git a/core/consensus/src/plane_mux.rs b/core/consensus/src/plane_mux.rs
index d904bf98d..6ad5078ff 100644
--- a/core/consensus/src/plane_mux.rs
+++ b/core/consensus/src/plane_mux.rs
@@ -26,19 +26,23 @@ pub struct MuxPlane<T> {
 }
 
 impl<T> MuxPlane<T> {
-    pub fn new(inner: T) -> Self {
+    #[must_use]
+    pub const fn new(inner: T) -> Self {
         Self { inner }
     }
 
-    pub fn inner(&self) -> &T {
+    #[must_use]
+    pub const fn inner(&self) -> &T {
         &self.inner
     }
 
-    pub fn inner_mut(&mut self) -> &mut T {
+    pub const fn inner_mut(&mut self) -> &mut T {
         &mut self.inner
     }
 }
 
+// Consensus runs on single-threaded compio shards; futures are intentionally 
!Send.
+#[allow(clippy::future_not_send)]
 impl<C, T> Plane<C> for MuxPlane<T>
 where
     C: Consensus,
@@ -63,6 +67,8 @@ where
     }
 }
 
+// Consensus runs on single-threaded compio shards; futures are intentionally 
!Send.
+#[allow(clippy::future_not_send)]
 impl<C> Plane<C> for ()
 where
     C: Consensus,
@@ -134,6 +140,8 @@ impl<T: PartitionsHandle> PartitionsHandle for MuxPlane<T> {
     }
 }
 
+// Consensus runs on single-threaded compio shards; futures are intentionally 
!Send.
+#[allow(clippy::future_not_send)]
 impl<C, Head, Tail> Plane<C> for variadic!(Head, ...Tail)
 where
     C: Consensus,
diff --git a/core/consensus/src/view_change_quorum.rs b/core/consensus/src/view_change_quorum.rs
index 2a8e01e9b..2eaff15b3 100644
--- a/core/consensus/src/view_change_quorum.rs
+++ b/core/consensus/src/view_change_quorum.rs
@@ -17,7 +17,7 @@
 
 use crate::REPLICAS_MAX;
 
-/// Stored information from a DoViewChange message.
+/// Stored information from a `DoViewChange` message.
 #[derive(Debug, Clone, Copy)]
 pub struct StoredDvc {
     pub replica: u8,
@@ -28,12 +28,13 @@ pub struct StoredDvc {
 }
 
 impl StoredDvc {
-    /// Compare for log selection: highest log_view, then highest op.
-    pub fn is_better_than(&self, other: &StoredDvc) -> bool {
-        if self.log_view != other.log_view {
-            self.log_view > other.log_view
-        } else {
+    /// Compare for log selection: highest `log_view`, then highest op.
+    #[must_use]
+    pub const fn is_better_than(&self, other: &Self) -> bool {
+        if self.log_view == other.log_view {
             self.op > other.op
+        } else {
+            self.log_view > other.log_view
         }
     }
 }
@@ -42,12 +43,13 @@ impl StoredDvc {
 pub type DvcQuorumArray = [Option<StoredDvc>; REPLICAS_MAX];
 
 /// Create an empty DVC quorum array.
+#[must_use]
 pub const fn dvc_quorum_array_empty() -> DvcQuorumArray {
     [None; REPLICAS_MAX]
 }
 
 /// Record a DVC in the array. Returns true if this is a new entry (not duplicate).
-pub fn dvc_record(array: &mut DvcQuorumArray, dvc: StoredDvc) -> bool {
+pub const fn dvc_record(array: &mut DvcQuorumArray, dvc: StoredDvc) -> bool {
     let slot = &mut array[dvc.replica as usize];
     if slot.is_some() {
         return false; // Duplicate
@@ -57,20 +59,20 @@ pub fn dvc_record(array: &mut DvcQuorumArray, dvc: StoredDvc) -> bool {
 }
 
 /// Count how many DVCs have been received.
+#[must_use]
 pub fn dvc_count(array: &DvcQuorumArray) -> usize {
     array.iter().filter(|m| m.is_some()).count()
 }
 
 /// Check if a specific replica has sent a DVC.
+#[must_use]
 pub fn dvc_has_from(array: &DvcQuorumArray, replica: u8) -> bool {
-    array
-        .get(replica as usize)
-        .map(|m| m.is_some())
-        .unwrap_or(false)
+    array.get(replica as usize).is_some_and(Option::is_some)
 }
 
 /// Select the winning DVC (best log) from the quorum.
-/// Returns the DVC with: highest log_view, then highest op.
+/// Returns the DVC with: highest `log_view`, then highest op.
+#[must_use]
 pub fn dvc_select_winner(array: &DvcQuorumArray) -> Option<&StoredDvc> {
     array
         .iter()
@@ -82,6 +84,7 @@ pub fn dvc_select_winner(array: &DvcQuorumArray) -> Option<&StoredDvc> {
 }
 
 /// Get the maximum commit number across all DVCs.
+#[must_use]
 pub fn dvc_max_commit(array: &DvcQuorumArray) -> u64 {
     array
         .iter()
@@ -92,11 +95,12 @@ pub fn dvc_max_commit(array: &DvcQuorumArray) -> u64 {
 }
 
 /// Reset the DVC quorum array.
-pub fn dvc_reset(array: &mut DvcQuorumArray) {
+pub const fn dvc_reset(array: &mut DvcQuorumArray) {
     *array = dvc_quorum_array_empty();
 }
 
 /// Iterator over all stored DVCs.
+// TODO: add #[must_use] -- pure iterator query, callers should not ignore.
 pub fn dvc_iter(array: &DvcQuorumArray) -> impl Iterator<Item = &StoredDvc> {
     array.iter().filter_map(|m| m.as_ref())
 }
diff --git a/core/consensus/src/vsr_timeout.rs b/core/consensus/src/vsr_timeout.rs
index f301ec7d2..eabc477e6 100644
--- a/core/consensus/src/vsr_timeout.rs
+++ b/core/consensus/src/vsr_timeout.rs
@@ -46,7 +46,8 @@ pub struct Timeout {
 }
 
 impl Timeout {
-    pub fn new(id: u128, after: u64) -> Self {
+    #[must_use]
+    pub const fn new(id: u128, after: u64) -> Self {
         Self {
             id,
             after,
@@ -56,30 +57,31 @@ impl Timeout {
         }
     }
 
-    pub fn start(&mut self) {
+    pub const fn start(&mut self) {
         self.ticks_remaining = self.after;
         self.ticking = true;
         self.attempts = 0;
     }
 
-    pub fn stop(&mut self) {
+    pub const fn stop(&mut self) {
         self.ticking = false;
         self.ticks_remaining = 0;
         self.attempts = 0;
     }
 
-    pub fn reset(&mut self) {
+    pub const fn reset(&mut self) {
         self.ticks_remaining = self.after;
         self.attempts = 0;
     }
 
-    pub fn tick(&mut self) {
+    pub const fn tick(&mut self) {
         if self.ticking {
             self.ticks_remaining = self.ticks_remaining.saturating_sub(1);
         }
     }
 
-    pub fn fired(&self) -> bool {
+    #[must_use]
+    pub const fn fired(&self) -> bool {
         self.ticking && self.ticks_remaining == 0
     }
 
@@ -135,6 +137,7 @@ impl TimeoutManager {
     const DO_VIEW_CHANGE_MESSAGE_TICKS: u64 = 50;
     const REQUEST_START_VIEW_MESSAGE_TICKS: u64 = 100;
 
+    // TODO: add #[must_use] -- constructor, discarding is always a bug.
     pub fn new(replica_id: u128) -> Self {
         Self {
             ping: Timeout::new(replica_id, Self::PING_TICKS),
@@ -151,6 +154,8 @@ impl TimeoutManager {
                 replica_id,
                 Self::REQUEST_START_VIEW_MESSAGE_TICKS,
             ),
+            // Upper bits of replica_id are not critical for PRNG seed diversity.
+            #[allow(clippy::cast_possible_truncation)]
             prng: Xoshiro256Plus::seed_from_u64(replica_id as u64),
         }
     }
@@ -158,7 +163,7 @@ impl TimeoutManager {
     /// Tick all timeouts
     /// This is the first phase of the two-phase tick-based timeout mechanism.
 /// 2nd phase is checking which timeouts have fired and calling the appropriate handlers.
-    pub fn tick(&mut self) {
+    pub const fn tick(&mut self) {
         self.ping.tick();
         self.prepare.tick();
         self.commit_message.tick();
@@ -169,11 +174,12 @@ impl TimeoutManager {
         self.view_change_status.tick();
     }
 
-    pub fn fired(&self, kind: TimeoutKind) -> bool {
+    #[must_use]
+    pub const fn fired(&self, kind: TimeoutKind) -> bool {
         self.get(kind).fired()
     }
 
-    pub fn get(&self, kind: TimeoutKind) -> &Timeout {
+    pub const fn get(&self, kind: TimeoutKind) -> &Timeout {
         match kind {
             TimeoutKind::Ping => &self.ping,
             TimeoutKind::Prepare => &self.prepare,
@@ -186,7 +192,7 @@ impl TimeoutManager {
         }
     }
 
-    pub fn get_mut(&mut self, kind: TimeoutKind) -> &mut Timeout {
+    pub const fn get_mut(&mut self, kind: TimeoutKind) -> &mut Timeout {
         match kind {
             TimeoutKind::Ping => &mut self.ping,
             TimeoutKind::Prepare => &mut self.prepare,
@@ -199,15 +205,15 @@ impl TimeoutManager {
         }
     }
 
-    pub fn start(&mut self, kind: TimeoutKind) {
+    pub const fn start(&mut self, kind: TimeoutKind) {
         self.get_mut(kind).start();
     }
 
-    pub fn stop(&mut self, kind: TimeoutKind) {
+    pub const fn stop(&mut self, kind: TimeoutKind) {
         self.get_mut(kind).stop();
     }
 
-    pub fn reset(&mut self, kind: TimeoutKind) {
+    pub const fn reset(&mut self, kind: TimeoutKind) {
         self.get_mut(kind).reset();
     }
 
@@ -225,7 +231,8 @@ impl TimeoutManager {
         timeout.backoff(&mut self.prng);
     }
 
-    pub fn is_ticking(&self, kind: TimeoutKind) -> bool {
+    #[must_use]
+    pub const fn is_ticking(&self, kind: TimeoutKind) -> bool {
         self.get(kind).ticking
     }
 }
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index a8b6d5637..8102fef00 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -136,7 +136,11 @@ where
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
-        M: StateMachine<Input = Message<PrepareHeader>>,
+        M: StateMachine<
+                Input = Message<PrepareHeader>,
+                Output = bytes::Bytes,
+                Error = iggy_common::IggyError,
+            >,
     {
         loop {
             futures::select! {
@@ -165,7 +169,11 @@ where
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
-        M: StateMachine<Input = Message<PrepareHeader>>,
+        M: StateMachine<
+                Input = Message<PrepareHeader>,
+                Output = bytes::Bytes,
+                Error = iggy_common::IggyError,
+            >,
     {
         self.on_message(frame.message).await;
         // TODO: once on_message returns an R (e.g. ShardResponse), send it


Reply via email to