This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new cc5dedfb7 chore(consensus): enable pedantic and nursery clippy lints
(#2870)
cc5dedfb7 is described below
commit cc5dedfb7e1c8a28b4eab43363c67f7b0d1fefd2
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 9 09:46:09 2026 +0100
chore(consensus): enable pedantic and nursery clippy lints (#2870)
---
core/consensus/Cargo.toml | 5 +
core/consensus/src/impls.rs | 169 +++++++++++++++++++++---------
core/consensus/src/lib.rs | 2 +-
core/consensus/src/namespaced_pipeline.rs | 17 ++-
core/consensus/src/plane_helpers.rs | 44 +++++++-
core/consensus/src/plane_mux.rs | 14 ++-
core/consensus/src/view_change_quorum.rs | 30 +++---
core/consensus/src/vsr_timeout.rs | 35 ++++---
8 files changed, 229 insertions(+), 87 deletions(-)
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 70c6ff46b..2695f624c 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -38,3 +38,8 @@ rand_xoshiro = { workspace = true }
[dev-dependencies]
futures = { workspace = true }
+
+[lints.clippy]
+enum_glob_use = "deny"
+pedantic = "deny"
+nursery = "warn"
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index e1e992d8a..016fe839f 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -50,7 +50,8 @@ pub struct LocalSequencer {
}
impl LocalSequencer {
- pub fn new(initial_op: u64) -> Self {
+ #[must_use]
+ pub const fn new(initial_op: u64) -> Self {
Self {
op: Cell::new(initial_op),
}
@@ -89,11 +90,12 @@ pub struct PipelineEntry {
pub header: PrepareHeader,
/// Bitmap of replicas that have acknowledged this prepare.
pub ok_from_replicas: BitSet<u32>,
- /// Whether we've received a quorum of prepare_ok messages.
+ /// Whether we've received a quorum of `prepare_ok` messages.
pub ok_quorum_received: bool,
}
impl PipelineEntry {
+ #[must_use]
pub fn new(header: PrepareHeader) -> Self {
Self {
header,
@@ -102,7 +104,7 @@ impl PipelineEntry {
}
}
- /// Record a prepare_ok from the given replica.
+ /// Record a `prepare_ok` from the given replica.
/// Returns the new count of acknowledgments.
pub fn add_ack(&mut self, replica: u8) -> usize {
self.ok_from_replicas.insert(replica as usize);
@@ -110,11 +112,13 @@ impl PipelineEntry {
}
/// Check if we have an ack from the given replica.
+ #[must_use]
pub fn has_ack(&self, replica: u8) -> bool {
self.ok_from_replicas.contains(replica as usize)
}
/// Get the number of acks received.
+ #[must_use]
pub fn ack_count(&self) -> usize {
self.ok_from_replicas.len()
}
@@ -128,7 +132,7 @@ pub struct RequestEntry {
}
impl RequestEntry {
- pub fn new(message: Message<RequestHeader>) -> Self {
+ pub const fn new(message: Message<RequestHeader>) -> Self {
Self {
message,
received_at: 0, //TODO figure the correct way to do this
@@ -149,25 +153,30 @@ impl Default for LocalPipeline {
}
impl LocalPipeline {
+ #[must_use]
pub fn new() -> Self {
Self {
prepare_queue: VecDeque::with_capacity(PIPELINE_PREPARE_QUEUE_MAX),
}
}
+ #[must_use]
pub fn prepare_count(&self) -> usize {
self.prepare_queue.len()
}
+ #[must_use]
pub fn prepare_queue_full(&self) -> bool {
self.prepare_queue.len() >= PIPELINE_PREPARE_QUEUE_MAX
}
/// Returns true if prepare queue is full.
+ #[must_use]
pub fn is_full(&self) -> bool {
self.prepare_queue_full()
}
+ #[must_use]
pub fn is_empty(&self) -> bool {
self.prepare_queue.is_empty()
}
@@ -177,6 +186,8 @@ impl LocalPipeline {
/// # Panics
/// - If message queue is full.
/// - If the message doesn't chain correctly to the previous entry.
+ // Signature must match `Pipeline::push_message` trait definition.
+ #[allow(clippy::needless_pass_by_value)]
pub fn push_message(&mut self, message: Message<PrepareHeader>) {
assert!(!self.prepare_queue_full(), "prepare queue is full");
@@ -208,6 +219,7 @@ impl LocalPipeline {
}
/// Get the head (oldest) prepare.
+ #[must_use]
pub fn prepare_head(&self) -> Option<&PipelineEntry> {
self.prepare_queue.front()
}
@@ -217,11 +229,15 @@ impl LocalPipeline {
}
/// Get the tail (newest) prepare.
+ #[must_use]
pub fn prepare_tail(&self) -> Option<&PipelineEntry> {
self.prepare_queue.back()
}
/// Find a message by op number and checksum (immutable).
+ // Pipeline bounded at PIPELINE_PREPARE_QUEUE_MAX (8) entries; index
always fits in usize.
+ #[must_use]
+ #[allow(clippy::cast_possible_truncation)]
pub fn message_by_op_and_checksum(&self, op: u64, checksum: u128) ->
Option<&PipelineEntry> {
let head_op = self.prepare_queue.front()?.header.op;
let tail_op = self.prepare_queue.back()?.header.op;
@@ -250,6 +266,9 @@ impl LocalPipeline {
}
/// Find a message by op number only.
+ // Pipeline bounded at PIPELINE_PREPARE_QUEUE_MAX (8) entries; index
always fits in usize.
+ #[must_use]
+ #[allow(clippy::cast_possible_truncation)]
pub fn message_by_op(&self, op: u64) -> Option<&PipelineEntry> {
let head_op = self.prepare_queue.front()?.header.op;
@@ -263,6 +282,8 @@ impl LocalPipeline {
/// Get mutable reference to a message entry by op number.
/// Returns None if op is not in the pipeline.
+ // Pipeline bounded at PIPELINE_PREPARE_QUEUE_MAX (8) entries; index
always fits in usize.
+ #[allow(clippy::cast_possible_truncation)]
pub fn message_by_op_mut(&mut self, op: u64) -> Option<&mut PipelineEntry>
{
let head_op = self.prepare_queue.front()?.header.op;
if op < head_op {
@@ -276,14 +297,16 @@ impl LocalPipeline {
}
/// Get the entry at the head of the prepare queue (oldest uncommitted).
+ #[must_use]
pub fn head(&self) -> Option<&PipelineEntry> {
self.prepare_queue.front()
}
- /// Search prepare queue for a message from the given client.
+ /// Search `prepare_queue` for a message from the given client.
///
- /// If there are multiple messages (possible in prepare_queue after view
change),
+ /// If there are multiple messages (possible in `prepare_queue` after view
change),
/// returns the latest one.
+ #[must_use]
pub fn has_message_from_client(&self, client: u128) -> bool {
self.prepare_queue.iter().any(|p| p.header.client == client)
}
@@ -324,43 +347,43 @@ impl Pipeline for LocalPipeline {
type Entry = PipelineEntry;
fn push_message(&mut self, message: Self::Message) {
- LocalPipeline::push_message(self, message)
+ Self::push_message(self, message);
}
fn pop_message(&mut self) -> Option<Self::Entry> {
- LocalPipeline::pop_message(self)
+ Self::pop_message(self)
}
fn clear(&mut self) {
- LocalPipeline::clear(self)
+ Self::clear(self);
}
fn message_by_op(&self, op: u64) -> Option<&Self::Entry> {
- LocalPipeline::message_by_op(self, op)
+ Self::message_by_op(self, op)
}
fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry> {
- LocalPipeline::message_by_op_mut(self, op)
+ Self::message_by_op_mut(self, op)
}
fn message_by_op_and_checksum(&self, op: u64, checksum: u128) ->
Option<&Self::Entry> {
- LocalPipeline::message_by_op_and_checksum(self, op, checksum)
+ Self::message_by_op_and_checksum(self, op, checksum)
}
fn head(&self) -> Option<&Self::Entry> {
- LocalPipeline::head(self)
+ Self::head(self)
}
fn is_full(&self) -> bool {
- LocalPipeline::is_full(self)
+ Self::is_full(self)
}
fn is_empty(&self) -> bool {
- LocalPipeline::is_empty(self)
+ Self::is_empty(self)
}
fn verify(&self) {
- LocalPipeline::verify(self)
+ Self::verify(self);
}
}
@@ -374,9 +397,9 @@ pub enum Status {
/// Actions to be taken by the caller after processing a VSR event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VsrAction {
- /// Send StartViewChange to all replicas.
+ /// Send `StartViewChange` to all replicas.
SendStartViewChange { view: u32, namespace: u64 },
- /// Send DoViewChange to primary.
+ /// Send `DoViewChange` to primary.
SendDoViewChange {
view: u32,
target: u8,
@@ -385,14 +408,14 @@ pub enum VsrAction {
commit: u64,
namespace: u64,
},
- /// Send StartView to all backups (as new primary).
+ /// Send `StartView` to all backups (as new primary).
SendStartView {
view: u32,
op: u64,
commit: u64,
namespace: u64,
},
- /// Send PrepareOK to primary.
+ /// Send `PrepareOK` to primary.
SendPrepareOk {
view: u32,
op: u64,
@@ -453,6 +476,9 @@ where
}
impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
+ /// # Panics
+ /// - If `replica >= replica_count`.
+ /// - If `replica_count < 1`.
pub fn new(
cluster: u128,
replica: u8,
@@ -469,7 +495,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
// TODO: Verify that XOR-based seeding provides sufficient jitter
diversity
// across groups. Consider using a proper hash (e.g., Murmur3) of
// (replica_id, namespace) for production.
- let timeout_seed = replica as u128 ^ namespace as u128;
+ let timeout_seed = u128::from(replica) ^ u128::from(namespace);
Self {
cluster,
replica,
@@ -499,14 +525,21 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
self.status.set(Status::Normal);
}
- pub fn primary_index(&self, view: u32) -> u8 {
+ #[must_use]
+ // cast_lossless: `u32::from()` unavailable in const fn.
+ // cast_possible_truncation: modulo by replica_count (u8) guarantees
result fits in u8.
+ #[allow(clippy::cast_lossless, clippy::cast_possible_truncation)]
+ pub const fn primary_index(&self, view: u32) -> u8 {
(view % self.replica_count as u32) as u8
}
- pub fn is_primary(&self) -> bool {
+ #[must_use]
+ pub const fn is_primary(&self) -> bool {
self.primary_index(self.view.get()) == self.replica
}
+ /// # Panics
+ /// If the stored commit number somehow exceeds the given `commit` after
update.
pub fn advance_commit_number(&self, commit: u64) {
if commit > self.commit.get() {
self.commit.set(commit);
@@ -517,33 +550,40 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
/// Maximum number of faulty replicas that can be tolerated.
/// For a cluster of 2f+1 replicas, this returns f.
- pub fn max_faulty(&self) -> usize {
+ #[must_use]
+ pub const fn max_faulty(&self) -> usize {
(self.replica_count as usize - 1) / 2
}
- /// Quorum size = f + 1 = max_faulty + 1
- pub fn quorum(&self) -> usize {
+ /// Quorum size = f + 1 = `max_faulty` + 1
+ #[must_use]
+ pub const fn quorum(&self) -> usize {
self.max_faulty() + 1
}
- pub fn commit(&self) -> u64 {
+ #[must_use]
+ pub const fn commit(&self) -> u64 {
self.commit.get()
}
- pub fn is_syncing(&self) -> bool {
- // for now return false. we have to add syncing related setup to
VsrConsensus to make this work.
+ #[must_use]
+ pub const fn is_syncing(&self) -> bool {
+ // TODO: for now return false. we have to add syncing related setup to
VsrConsensus to make this work.
false
}
- pub fn replica(&self) -> u8 {
+ #[must_use]
+ pub const fn replica(&self) -> u8 {
self.replica
}
- pub fn sequencer(&self) -> &LocalSequencer {
+ #[must_use]
+ pub const fn sequencer(&self) -> &LocalSequencer {
&self.sequencer
}
- pub fn view(&self) -> u32 {
+ #[must_use]
+ pub const fn view(&self) -> u32 {
self.view.get()
}
@@ -551,34 +591,41 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
self.view.set(view);
}
- pub fn status(&self) -> Status {
+ #[must_use]
+ pub const fn status(&self) -> Status {
self.status.get()
}
// TODO(hubcio): returning &RefCell<P> leaks interior mutability - callers
// could hold a Ref/RefMut across an .await and cause a runtime panic.
// We had this problem with slab + ECS.
- pub fn pipeline(&self) -> &RefCell<P> {
+ #[must_use]
+ pub const fn pipeline(&self) -> &RefCell<P> {
&self.pipeline
}
- pub fn pipeline_mut(&mut self) -> &mut RefCell<P> {
+ #[must_use]
+ pub const fn pipeline_mut(&mut self) -> &mut RefCell<P> {
&mut self.pipeline
}
- pub fn cluster(&self) -> u128 {
+ #[must_use]
+ pub const fn cluster(&self) -> u128 {
self.cluster
}
- pub fn replica_count(&self) -> u8 {
+ #[must_use]
+ pub const fn replica_count(&self) -> u8 {
self.replica_count
}
- pub fn namespace(&self) -> u64 {
+ #[must_use]
+ pub const fn namespace(&self) -> u64 {
self.namespace
}
- pub fn last_prepare_checksum(&self) -> u128 {
+ #[must_use]
+ pub const fn last_prepare_checksum(&self) -> u128 {
self.last_prepare_checksum.get()
}
@@ -586,7 +633,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
self.last_prepare_checksum.set(checksum);
}
- pub fn log_view(&self) -> u32 {
+ #[must_use]
+ pub const fn log_view(&self) -> u32 {
self.log_view.get()
}
@@ -594,7 +642,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
self.log_view.set(log_view);
}
- pub fn is_primary_for_view(&self, view: u32) -> bool {
+ #[must_use]
+ pub const fn is_primary_for_view(&self, view: u32) -> bool {
self.primary_index(view) == self.replica
}
@@ -624,9 +673,9 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
/// Reset all view change state when transitioning to a new view.
///
- /// Clears the loopback queue: stale PrepareOks from the old view
+ /// Clears the loopback queue: stale `PrepareOks` from the old view
/// reference pipeline entries that no longer exist, so processing
- /// them would be a no-op (handle_prepare_ok ignores unknown ops).
+ /// them would be a no-op (`handle_prepare_ok` ignores unknown ops).
/// The primary does not require its own self-ack for quorum.
pub(crate) fn reset_view_change_state(&self) {
self.reset_svc_quorum();
@@ -675,7 +724,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
actions
}
- /// Called when normal_heartbeat timeout fires.
+ /// Called when `normal_heartbeat` timeout fires.
/// Backup hasn't heard from primary - start view change.
fn handle_normal_heartbeat_timeout(&self) -> Vec<VsrAction> {
// Only backups trigger view change on heartbeat timeout
@@ -788,11 +837,14 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
}]
}
- /// Handle a received StartViewChange message.
+ /// Handle a received `StartViewChange` message.
///
/// "When replica i receives STARTVIEWCHANGE messages for its view-number
/// from f OTHER replicas, it sends a DOVIEWCHANGE message to the node
/// that will be the primary in the new view."
+ ///
+ /// # Panics
+ /// If `header.namespace` does not match this replica's namespace.
pub fn handle_start_view_change(&self, header: &StartViewChangeHeader) ->
Vec<VsrAction> {
assert_eq!(
header.namespace, self.namespace,
@@ -887,11 +939,14 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
actions
}
- /// Handle a received DoViewChange message (only relevant for primary
candidate).
+ /// Handle a received `DoViewChange` message (only relevant for primary
candidate).
///
/// "When the new primary receives f + 1 DOVIEWCHANGE messages from
different
/// replicas (including itself), it sets its view-number to that in the
messages
/// and selects as the new log the one contained in the message with the
largest v'..."
+ ///
+ /// # Panics
+ /// If `header.namespace` does not match this replica's namespace.
pub fn handle_do_view_change(&self, header: &DoViewChangeHeader) ->
Vec<VsrAction> {
assert_eq!(
header.namespace, self.namespace,
@@ -984,12 +1039,15 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
actions
}
- /// Handle a received StartView message (backups only).
+ /// Handle a received `StartView` message (backups only).
///
/// "When other replicas receive the STARTVIEW message, they replace their
log
/// with the one in the message, set their op-number to that of the latest
entry
/// in the log, set their view-number to the view number in the message,
change
- /// their status to normal, and send PrepareOK for any uncommitted ops."
+ /// their status to normal, and send `PrepareOK` for any uncommitted ops."
+ ///
+ /// # Panics
+ /// If `header.namespace` does not match this replica's namespace.
pub fn handle_start_view(&self, header: &StartViewHeader) ->
Vec<VsrAction> {
assert_eq!(header.namespace, self.namespace, "SV routed to wrong
group");
let from_replica = header.replica;
@@ -1091,10 +1149,14 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
}]
}
- /// Handle a PrepareOk message from a replica.
+ /// Handle a `PrepareOk` message from a replica.
///
/// Returns `true` if quorum was just reached for this op.
/// Caller (`on_ack`) should validate `is_primary` and status before
calling.
+ ///
+ /// # Panics
+ /// - If `header.command` is not `Command2::PrepareOk`.
+ /// - If `header.replica >= self.replica_count`.
pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool {
assert_eq!(header.command, Command2::PrepareOk);
assert!(
@@ -1151,7 +1213,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
/// Enqueue a self-addressed message for processing in the next loopback
drain.
///
- /// Currently only PrepareOk messages are routed here (via
`send_or_loopback`).
+ /// Currently only `PrepareOk` messages are routed here (via
`send_or_loopback`).
// TODO: Route SVC/DVC self-messages through loopback once VsrAction
dispatch is implemented.
pub(crate) fn push_loopback(&self, message: Message<GenericHeader>) {
assert!(
@@ -1170,6 +1232,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
}
/// Send a message to `target`, routing self-addressed messages through
the loopback queue.
+ // VsrConsensus uses Cell/RefCell for single-threaded compio shards;
futures are intentionally !Send.
+ #[allow(clippy::future_not_send)]
pub(crate) async fn send_or_loopback(&self, target: u8, message:
Message<GenericHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
@@ -1185,7 +1249,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>>
VsrConsensus<B, P> {
}
}
- pub fn message_bus(&self) -> &B {
+ #[must_use]
+ pub const fn message_bus(&self) -> &B {
&self.message_bus
}
}
@@ -1226,7 +1291,7 @@ where
impl<B, P> Project<Message<PrepareOkHeader>, VsrConsensus<B, P>> for
Message<PrepareHeader>
where
B: MessageBus,
- P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+ P: Pipeline<Message = Self, Entry = PipelineEntry>,
{
type Consensus = VsrConsensus<B, P>;
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index b1f7460e2..dcc5cbc6d 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -78,7 +78,7 @@ pub trait Consensus: Sized {
/// This abstracts the VSR message flow:
/// - request -> prepare
/// - replicate (prepare)
-/// - ack (prepare_ok)
+/// - ack (`prepare_ok`)
pub trait Plane<C>
where
C: Consensus,
diff --git a/core/consensus/src/namespaced_pipeline.rs
b/core/consensus/src/namespaced_pipeline.rs
index 05d47d80f..2eb90125b 100644
--- a/core/consensus/src/namespaced_pipeline.rs
+++ b/core/consensus/src/namespaced_pipeline.rs
@@ -24,7 +24,7 @@ use std::collections::{HashMap, VecDeque};
/// Pipeline that partitions entries by namespace for independent commit
draining.
///
/// A single global op sequence and hash chain spans all namespaces, but
entries
-/// are stored in per-namespace VecDeques. Each namespace tracks its own commit
+/// are stored in per-namespace `VecDeques`. Each namespace tracks its own
commit
/// frontier so `drain_committable_all` drains quorum'd entries per-namespace
/// without waiting for the global commit to advance past unrelated namespaces.
///
@@ -58,6 +58,7 @@ impl Default for NamespacedPipeline {
}
impl NamespacedPipeline {
+ #[must_use]
pub fn new() -> Self {
Self {
queues: HashMap::new(),
@@ -75,6 +76,7 @@ impl NamespacedPipeline {
}
/// Per-namespace commit frontier for the given namespace.
+ #[must_use]
pub fn ns_commit(&self, ns: u64) -> Option<u64> {
self.ns_commits.get(&ns).copied()
}
@@ -84,6 +86,9 @@ impl NamespacedPipeline {
/// For each namespace queue, drains from the front while entries have
/// `ok_quorum_received == true`. Returns entries sorted by global op
/// for deterministic processing.
+ ///
+ /// # Panics
+ /// If the front entry exists but `pop_front` returns `None` (unreachable).
pub fn drain_committable_all(&mut self) -> Vec<PipelineEntry> {
let mut drained = Vec::new();
@@ -117,6 +122,7 @@ impl NamespacedPipeline {
/// Walks forward from `current_commit + 1`, treating any op not found
/// in the pipeline (already drained) as committed. Stops at the first
/// op still present in a queue or past `last_push_op`.
+ #[must_use]
pub fn global_commit_frontier(&self, current_commit: u64) -> u64 {
let mut commit = current_commit;
loop {
@@ -143,6 +149,11 @@ impl Pipeline for NamespacedPipeline {
type Message = Message<PrepareHeader>;
type Entry = PipelineEntry;
+ /// # Panics
+ /// - If the pipeline is full.
+ /// - If ops are not globally sequential.
+ /// - If the hash chain is broken.
+ /// - If the namespace is not registered.
fn push_message(&mut self, message: Self::Message) {
assert!(
self.total_count < PIPELINE_PREPARE_QUEUE_MAX,
@@ -258,7 +269,7 @@ impl Pipeline for NamespacedPipeline {
fn verify(&self) {
assert!(self.total_count <= PIPELINE_PREPARE_QUEUE_MAX);
- let actual_count: usize = self.queues.values().map(|q| q.len()).sum();
+ let actual_count: usize =
self.queues.values().map(VecDeque::len).sum();
assert_eq!(actual_count, self.total_count, "total_count mismatch");
// Per-namespace: ops must be monotonically increasing
@@ -492,6 +503,7 @@ mod tests {
}
#[test]
+ #[allow(clippy::cast_possible_truncation)]
fn is_full() {
let mut pipeline = NamespacedPipeline::new();
pipeline.register_namespace(0);
@@ -506,6 +518,7 @@ mod tests {
#[test]
#[should_panic(expected = "namespaced pipeline full")]
+ #[allow(clippy::cast_possible_truncation)]
fn push_when_full_panics() {
let mut pipeline = NamespacedPipeline::new();
pipeline.register_namespace(0);
diff --git a/core/consensus/src/plane_helpers.rs
b/core/consensus/src/plane_helpers.rs
index f4ca92f3d..55d21534a 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -24,6 +24,12 @@ use std::ops::AsyncFnOnce;
// TODO: Rework all of those helpers, once the boundaries are more clear and
we have a better picture of the commonalities between all of the planes.
/// Shared pipeline-first request flow used by metadata and partitions.
+///
+/// # Panics
+/// - If the caller is not the primary.
+/// - If the consensus status is not normal.
+/// - If the consensus is syncing.
+#[allow(clippy::future_not_send)]
pub async fn pipeline_prepare_common<C, F>(
consensus: &C,
prepare: C::Message<C::ReplicateHeader>,
@@ -43,7 +49,8 @@ pub async fn pipeline_prepare_common<C, F>(
}
/// Shared commit-based old-prepare fence.
-pub fn fence_old_prepare_by_commit<B, P>(
+#[must_use]
+pub const fn fence_old_prepare_by_commit<B, P>(
consensus: &VsrConsensus<B, P>,
header: &PrepareHeader,
) -> bool
@@ -55,6 +62,13 @@ where
}
/// Shared chain-replication forwarding to the next replica.
+///
+/// # Panics
+/// - If `header.command` is not `Command2::Prepare`.
+/// - If `header.op <= consensus.commit()`.
+/// - If the computed next replica equals self.
+/// - If the message bus send fails.
+#[allow(clippy::future_not_send)]
pub async fn replicate_to_next_in_chain<B, P>(
consensus: &VsrConsensus<B, P>,
message: Message<PrepareHeader>,
@@ -87,6 +101,13 @@ pub async fn replicate_to_next_in_chain<B, P>(
/// Shared preflight checks for `on_replicate`.
///
/// Returns current op on success.
+///
+/// # Errors
+/// Returns a static error string if the replica is syncing, not in normal
+/// status, or the message has a newer view.
+///
+/// # Panics
+/// If `header.command` is not `Command2::Prepare`.
pub fn replicate_preflight<B, P>(
consensus: &VsrConsensus<B, P>,
header: &PrepareHeader,
@@ -119,6 +140,10 @@ where
}
/// Shared preflight checks for `on_ack`.
+///
+/// # Errors
+/// Returns a static error string if the replica is not primary or not in
+/// normal status.
pub fn ack_preflight<B, P>(consensus: &VsrConsensus<B, P>) -> Result<(),
&'static str>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -171,6 +196,9 @@ where
///
/// Entries are drained only from the head and only while their op is covered
/// by the current commit frontier.
+///
+/// # Panics
+/// If `head()` returns `Some` but `pop_message()` returns `None`
(unreachable).
pub fn drain_committable_prefix<B, P>(consensus: &VsrConsensus<B, P>) ->
Vec<PipelineEntry>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -195,6 +223,10 @@ where
}
/// Shared reply-message construction for committed prepare.
+///
+/// # Panics
+/// If the constructed message buffer is not valid.
+#[allow(clippy::needless_pass_by_value, clippy::cast_possible_truncation)]
pub fn build_reply_message<B, P>(
consensus: &VsrConsensus<B, P>,
prepare_header: &PrepareHeader,
@@ -239,6 +271,9 @@ where
}
/// Verify hash chain would not break if we add this header.
+///
+/// # Panics
+/// If both headers share the same view and `current.parent !=
previous.checksum`.
pub fn panic_if_hash_chain_would_break_in_same_view(
previous: &PrepareHeader,
current: &PrepareHeader,
@@ -254,6 +289,10 @@ pub fn panic_if_hash_chain_would_break_in_same_view(
}
// TODO: Figure out how to make this check the journal if it contains the
prepare.
+/// # Panics
+/// - If `header.command` is not `Command2::Prepare`.
+/// - If `header.view > consensus.view()`.
+#[allow(clippy::cast_possible_truncation, clippy::future_not_send)]
pub async fn send_prepare_ok<B, P>(
consensus: &VsrConsensus<B, P>,
header: &PrepareHeader,
@@ -272,7 +311,7 @@ pub async fn send_prepare_ok<B, P>(
return;
}
- if let Some(false) = is_persisted {
+ if is_persisted == Some(false) {
return;
}
@@ -534,6 +573,7 @@ mod tests {
}
}
+ #[allow(clippy::future_not_send)]
impl MessageBus for SpyBus {
type Client = u128;
type Replica = u8;
diff --git a/core/consensus/src/plane_mux.rs b/core/consensus/src/plane_mux.rs
index d904bf98d..6ad5078ff 100644
--- a/core/consensus/src/plane_mux.rs
+++ b/core/consensus/src/plane_mux.rs
@@ -26,19 +26,23 @@ pub struct MuxPlane<T> {
}
impl<T> MuxPlane<T> {
- pub fn new(inner: T) -> Self {
+ #[must_use]
+ pub const fn new(inner: T) -> Self {
Self { inner }
}
- pub fn inner(&self) -> &T {
+ #[must_use]
+ pub const fn inner(&self) -> &T {
&self.inner
}
- pub fn inner_mut(&mut self) -> &mut T {
+ pub const fn inner_mut(&mut self) -> &mut T {
&mut self.inner
}
}
+// Consensus runs on single-threaded compio shards; futures are intentionally
!Send.
+#[allow(clippy::future_not_send)]
impl<C, T> Plane<C> for MuxPlane<T>
where
C: Consensus,
@@ -63,6 +67,8 @@ where
}
}
+// Consensus runs on single-threaded compio shards; futures are intentionally
!Send.
+#[allow(clippy::future_not_send)]
impl<C> Plane<C> for ()
where
C: Consensus,
@@ -134,6 +140,8 @@ impl<T: PartitionsHandle> PartitionsHandle for MuxPlane<T> {
}
}
+// Consensus runs on single-threaded compio shards; futures are intentionally
!Send.
+#[allow(clippy::future_not_send)]
impl<C, Head, Tail> Plane<C> for variadic!(Head, ...Tail)
where
C: Consensus,
diff --git a/core/consensus/src/view_change_quorum.rs
b/core/consensus/src/view_change_quorum.rs
index 2a8e01e9b..2eaff15b3 100644
--- a/core/consensus/src/view_change_quorum.rs
+++ b/core/consensus/src/view_change_quorum.rs
@@ -17,7 +17,7 @@
use crate::REPLICAS_MAX;
-/// Stored information from a DoViewChange message.
+/// Stored information from a `DoViewChange` message.
#[derive(Debug, Clone, Copy)]
pub struct StoredDvc {
pub replica: u8,
@@ -28,12 +28,13 @@ pub struct StoredDvc {
}
impl StoredDvc {
- /// Compare for log selection: highest log_view, then highest op.
- pub fn is_better_than(&self, other: &StoredDvc) -> bool {
- if self.log_view != other.log_view {
- self.log_view > other.log_view
- } else {
+ /// Compare for log selection: highest `log_view`, then highest op.
+ #[must_use]
+ pub const fn is_better_than(&self, other: &Self) -> bool {
+ if self.log_view == other.log_view {
self.op > other.op
+ } else {
+ self.log_view > other.log_view
}
}
}
@@ -42,12 +43,13 @@ impl StoredDvc {
pub type DvcQuorumArray = [Option<StoredDvc>; REPLICAS_MAX];
/// Create an empty DVC quorum array.
+#[must_use]
pub const fn dvc_quorum_array_empty() -> DvcQuorumArray {
[None; REPLICAS_MAX]
}
/// Record a DVC in the array. Returns true if this is a new entry (not
duplicate).
-pub fn dvc_record(array: &mut DvcQuorumArray, dvc: StoredDvc) -> bool {
+pub const fn dvc_record(array: &mut DvcQuorumArray, dvc: StoredDvc) -> bool {
let slot = &mut array[dvc.replica as usize];
if slot.is_some() {
return false; // Duplicate
@@ -57,20 +59,20 @@ pub fn dvc_record(array: &mut DvcQuorumArray, dvc:
StoredDvc) -> bool {
}
/// Count how many DVCs have been received.
+#[must_use]
pub fn dvc_count(array: &DvcQuorumArray) -> usize {
array.iter().filter(|m| m.is_some()).count()
}
/// Check if a specific replica has sent a DVC.
+#[must_use]
pub fn dvc_has_from(array: &DvcQuorumArray, replica: u8) -> bool {
- array
- .get(replica as usize)
- .map(|m| m.is_some())
- .unwrap_or(false)
+ array.get(replica as usize).is_some_and(Option::is_some)
}
/// Select the winning DVC (best log) from the quorum.
-/// Returns the DVC with: highest log_view, then highest op.
+/// Returns the DVC with: highest `log_view`, then highest op.
+#[must_use]
pub fn dvc_select_winner(array: &DvcQuorumArray) -> Option<&StoredDvc> {
array
.iter()
@@ -82,6 +84,7 @@ pub fn dvc_select_winner(array: &DvcQuorumArray) ->
Option<&StoredDvc> {
}
/// Get the maximum commit number across all DVCs.
+#[must_use]
pub fn dvc_max_commit(array: &DvcQuorumArray) -> u64 {
array
.iter()
@@ -92,11 +95,12 @@ pub fn dvc_max_commit(array: &DvcQuorumArray) -> u64 {
}
/// Reset the DVC quorum array.
-pub fn dvc_reset(array: &mut DvcQuorumArray) {
+pub const fn dvc_reset(array: &mut DvcQuorumArray) {
*array = dvc_quorum_array_empty();
}
/// Iterator over all stored DVCs.
+// TODO: add #[must_use] -- pure iterator query, callers should not ignore.
pub fn dvc_iter(array: &DvcQuorumArray) -> impl Iterator<Item = &StoredDvc> {
array.iter().filter_map(|m| m.as_ref())
}
diff --git a/core/consensus/src/vsr_timeout.rs
b/core/consensus/src/vsr_timeout.rs
index f301ec7d2..eabc477e6 100644
--- a/core/consensus/src/vsr_timeout.rs
+++ b/core/consensus/src/vsr_timeout.rs
@@ -46,7 +46,8 @@ pub struct Timeout {
}
impl Timeout {
- pub fn new(id: u128, after: u64) -> Self {
+ #[must_use]
+ pub const fn new(id: u128, after: u64) -> Self {
Self {
id,
after,
@@ -56,30 +57,31 @@ impl Timeout {
}
}
- pub fn start(&mut self) {
+ pub const fn start(&mut self) {
self.ticks_remaining = self.after;
self.ticking = true;
self.attempts = 0;
}
- pub fn stop(&mut self) {
+ pub const fn stop(&mut self) {
self.ticking = false;
self.ticks_remaining = 0;
self.attempts = 0;
}
- pub fn reset(&mut self) {
+ pub const fn reset(&mut self) {
self.ticks_remaining = self.after;
self.attempts = 0;
}
- pub fn tick(&mut self) {
+ pub const fn tick(&mut self) {
if self.ticking {
self.ticks_remaining = self.ticks_remaining.saturating_sub(1);
}
}
- pub fn fired(&self) -> bool {
+ #[must_use]
+ pub const fn fired(&self) -> bool {
self.ticking && self.ticks_remaining == 0
}
@@ -135,6 +137,7 @@ impl TimeoutManager {
const DO_VIEW_CHANGE_MESSAGE_TICKS: u64 = 50;
const REQUEST_START_VIEW_MESSAGE_TICKS: u64 = 100;
+ // TODO: add #[must_use] -- constructor, discarding is always a bug.
pub fn new(replica_id: u128) -> Self {
Self {
ping: Timeout::new(replica_id, Self::PING_TICKS),
@@ -151,6 +154,8 @@ impl TimeoutManager {
replica_id,
Self::REQUEST_START_VIEW_MESSAGE_TICKS,
),
+ // Upper bits of replica_id are not critical for PRNG seed
diversity.
+ #[allow(clippy::cast_possible_truncation)]
prng: Xoshiro256Plus::seed_from_u64(replica_id as u64),
}
}
@@ -158,7 +163,7 @@ impl TimeoutManager {
/// Tick all timeouts
/// This is the first phase of the two-phase tick-based timeout mechanism.
/// 2nd phase is checking which timeouts have fired and calling the
appropriate handlers.
- pub fn tick(&mut self) {
+ pub const fn tick(&mut self) {
self.ping.tick();
self.prepare.tick();
self.commit_message.tick();
@@ -169,11 +174,12 @@ impl TimeoutManager {
self.view_change_status.tick();
}
- pub fn fired(&self, kind: TimeoutKind) -> bool {
+ #[must_use]
+ pub const fn fired(&self, kind: TimeoutKind) -> bool {
self.get(kind).fired()
}
- pub fn get(&self, kind: TimeoutKind) -> &Timeout {
+ pub const fn get(&self, kind: TimeoutKind) -> &Timeout {
match kind {
TimeoutKind::Ping => &self.ping,
TimeoutKind::Prepare => &self.prepare,
@@ -186,7 +192,7 @@ impl TimeoutManager {
}
}
- pub fn get_mut(&mut self, kind: TimeoutKind) -> &mut Timeout {
+ pub const fn get_mut(&mut self, kind: TimeoutKind) -> &mut Timeout {
match kind {
TimeoutKind::Ping => &mut self.ping,
TimeoutKind::Prepare => &mut self.prepare,
@@ -199,15 +205,15 @@ impl TimeoutManager {
}
}
- pub fn start(&mut self, kind: TimeoutKind) {
+ pub const fn start(&mut self, kind: TimeoutKind) {
self.get_mut(kind).start();
}
- pub fn stop(&mut self, kind: TimeoutKind) {
+ pub const fn stop(&mut self, kind: TimeoutKind) {
self.get_mut(kind).stop();
}
- pub fn reset(&mut self, kind: TimeoutKind) {
+ pub const fn reset(&mut self, kind: TimeoutKind) {
self.get_mut(kind).reset();
}
@@ -225,7 +231,8 @@ impl TimeoutManager {
timeout.backoff(&mut self.prng);
}
- pub fn is_ticking(&self, kind: TimeoutKind) -> bool {
+ #[must_use]
+ pub const fn is_ticking(&self, kind: TimeoutKind) -> bool {
self.get(kind).ticking
}
}