This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch pedantic-consensus in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0312c6a4f7fd2fa7b047d4f3648f23d3b477cf2f Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Mar 3 13:00:10 2026 +0100 chore(consensus): enable clippy pedantic/nursery and fix all warnings --- core/consensus/Cargo.toml | 5 + core/consensus/src/impls.rs | 169 +++++++++++++++++++++--------- core/consensus/src/lib.rs | 2 +- core/consensus/src/namespaced_pipeline.rs | 17 ++- core/consensus/src/plane_helpers.rs | 44 +++++++- core/consensus/src/plane_mux.rs | 14 ++- core/consensus/src/view_change_quorum.rs | 30 +++--- core/consensus/src/vsr_timeout.rs | 35 ++++--- 8 files changed, 229 insertions(+), 87 deletions(-) diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml index 70c6ff46b..33948523e 100644 --- a/core/consensus/Cargo.toml +++ b/core/consensus/Cargo.toml @@ -38,3 +38,8 @@ rand_xoshiro = { workspace = true } [dev-dependencies] futures = { workspace = true } + +[lints.clippy] +enum_glob_use = "deny" +pedantic = "deny" +nursery = "deny" diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs index e1e992d8a..016fe839f 100644 --- a/core/consensus/src/impls.rs +++ b/core/consensus/src/impls.rs @@ -50,7 +50,8 @@ pub struct LocalSequencer { } impl LocalSequencer { - pub fn new(initial_op: u64) -> Self { + #[must_use] + pub const fn new(initial_op: u64) -> Self { Self { op: Cell::new(initial_op), } @@ -89,11 +90,12 @@ pub struct PipelineEntry { pub header: PrepareHeader, /// Bitmap of replicas that have acknowledged this prepare. pub ok_from_replicas: BitSet<u32>, - /// Whether we've received a quorum of prepare_ok messages. + /// Whether we've received a quorum of `prepare_ok` messages. pub ok_quorum_received: bool, } impl PipelineEntry { + #[must_use] pub fn new(header: PrepareHeader) -> Self { Self { header, @@ -102,7 +104,7 @@ impl PipelineEntry { } } - /// Record a prepare_ok from the given replica. + /// Record a `prepare_ok` from the given replica. /// Returns the new count of acknowledgments. pub fn add_ack(&mut self, replica: u8) -> usize { self.ok_from_replicas.insert(replica as usize); @@ -110,11 +112,13 @@ impl PipelineEntry { } /// Check if we have an ack from the given replica. + #[must_use] pub fn has_ack(&self, replica: u8) -> bool { self.ok_from_replicas.contains(replica as usize) } /// Get the number of acks received. + #[must_use] pub fn ack_count(&self) -> usize { self.ok_from_replicas.len() } @@ -128,7 +132,7 @@ pub struct RequestEntry { } impl RequestEntry { - pub fn new(message: Message<RequestHeader>) -> Self { + pub const fn new(message: Message<RequestHeader>) -> Self { Self { message, received_at: 0, //TODO figure the correct way to do this @@ -149,25 +153,30 @@ impl Default for LocalPipeline { } impl LocalPipeline { + #[must_use] pub fn new() -> Self { Self { prepare_queue: VecDeque::with_capacity(PIPELINE_PREPARE_QUEUE_MAX), } } + #[must_use] pub fn prepare_count(&self) -> usize { self.prepare_queue.len() } + #[must_use] pub fn prepare_queue_full(&self) -> bool { self.prepare_queue.len() >= PIPELINE_PREPARE_QUEUE_MAX } /// Returns true if prepare queue is full. + #[must_use] pub fn is_full(&self) -> bool { self.prepare_queue_full() } + #[must_use] pub fn is_empty(&self) -> bool { self.prepare_queue.is_empty() } @@ -177,6 +186,8 @@ impl LocalPipeline { /// # Panics /// - If message queue is full. /// - If the message doesn't chain correctly to the previous entry. + // Signature must match `Pipeline::push_message` trait definition. + #[allow(clippy::needless_pass_by_value)] pub fn push_message(&mut self, message: Message<PrepareHeader>) { assert!(!self.prepare_queue_full(), "prepare queue is full"); @@ -208,6 +219,7 @@ impl LocalPipeline { } /// Get the head (oldest) prepare. + #[must_use] pub fn prepare_head(&self) -> Option<&PipelineEntry> { self.prepare_queue.front() } @@ -217,11 +229,15 @@ impl LocalPipeline { } /// Get the tail (newest) prepare. + #[must_use] pub fn prepare_tail(&self) -> Option<&PipelineEntry> { self.prepare_queue.back() } /// Find a message by op number and checksum (immutable). + // Pipeline bounded at PIPELINE_PREPARE_QUEUE_MAX (8) entries; index always fits in usize. + #[must_use] + #[allow(clippy::cast_possible_truncation)] pub fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&PipelineEntry> { let head_op = self.prepare_queue.front()?.header.op; let tail_op = self.prepare_queue.back()?.header.op; @@ -250,6 +266,9 @@ impl LocalPipeline { } /// Find a message by op number only. + // Pipeline bounded at PIPELINE_PREPARE_QUEUE_MAX (8) entries; index always fits in usize. + #[must_use] + #[allow(clippy::cast_possible_truncation)] pub fn message_by_op(&self, op: u64) -> Option<&PipelineEntry> { let head_op = self.prepare_queue.front()?.header.op; @@ -263,6 +282,8 @@ impl LocalPipeline { /// Get mutable reference to a message entry by op number. /// Returns None if op is not in the pipeline. + // Pipeline bounded at PIPELINE_PREPARE_QUEUE_MAX (8) entries; index always fits in usize. + #[allow(clippy::cast_possible_truncation)] pub fn message_by_op_mut(&mut self, op: u64) -> Option<&mut PipelineEntry> { let head_op = self.prepare_queue.front()?.header.op; if op < head_op { @@ -276,14 +297,16 @@ impl LocalPipeline { } /// Get the entry at the head of the prepare queue (oldest uncommitted). + #[must_use] pub fn head(&self) -> Option<&PipelineEntry> { self.prepare_queue.front() } - /// Search prepare queue for a message from the given client. + /// Search `prepare_queue` for a message from the given client. /// - /// If there are multiple messages (possible in prepare_queue after view change), + /// If there are multiple messages (possible in `prepare_queue` after view change), /// returns the latest one. + #[must_use] pub fn has_message_from_client(&self, client: u128) -> bool { self.prepare_queue.iter().any(|p| p.header.client == client) } @@ -324,43 +347,43 @@ impl Pipeline for LocalPipeline { type Entry = PipelineEntry; fn push_message(&mut self, message: Self::Message) { - LocalPipeline::push_message(self, message) + Self::push_message(self, message); } fn pop_message(&mut self) -> Option<Self::Entry> { - LocalPipeline::pop_message(self) + Self::pop_message(self) } fn clear(&mut self) { - LocalPipeline::clear(self) + Self::clear(self); } fn message_by_op(&self, op: u64) -> Option<&Self::Entry> { - LocalPipeline::message_by_op(self, op) + Self::message_by_op(self, op) } fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry> { - LocalPipeline::message_by_op_mut(self, op) + Self::message_by_op_mut(self, op) } fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&Self::Entry> { - LocalPipeline::message_by_op_and_checksum(self, op, checksum) + Self::message_by_op_and_checksum(self, op, checksum) } fn head(&self) -> Option<&Self::Entry> { - LocalPipeline::head(self) + Self::head(self) } fn is_full(&self) -> bool { - LocalPipeline::is_full(self) + Self::is_full(self) } fn is_empty(&self) -> bool { - LocalPipeline::is_empty(self) + Self::is_empty(self) } fn verify(&self) { - LocalPipeline::verify(self) + Self::verify(self); } } @@ -374,9 +397,9 @@ pub enum Status { /// Actions to be taken by the caller after processing a VSR event. #[derive(Debug, Clone, PartialEq, Eq)] pub enum VsrAction { - /// Send StartViewChange to all replicas. + /// Send `StartViewChange` to all replicas. SendStartViewChange { view: u32, namespace: u64 }, - /// Send DoViewChange to primary. + /// Send `DoViewChange` to primary. SendDoViewChange { view: u32, target: u8, @@ -385,14 +408,14 @@ pub enum VsrAction { commit: u64, namespace: u64, }, - /// Send StartView to all backups (as new primary). + /// Send `StartView` to all backups (as new primary). SendStartView { view: u32, op: u64, commit: u64, namespace: u64, }, - /// Send PrepareOK to primary. + /// Send `PrepareOK` to primary. SendPrepareOk { view: u32, op: u64, @@ -453,6 +476,9 @@ where } impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { + /// # Panics + /// - If `replica >= replica_count`. + /// - If `replica_count < 1`. pub fn new( cluster: u128, replica: u8, @@ -469,7 +495,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { // TODO: Verify that XOR-based seeding provides sufficient jitter diversity // across groups. Consider using a proper hash (e.g., Murmur3) of // (replica_id, namespace) for production. - let timeout_seed = replica as u128 ^ namespace as u128; + let timeout_seed = u128::from(replica) ^ u128::from(namespace); Self { cluster, replica, @@ -499,14 +525,21 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { self.status.set(Status::Normal); } - pub fn primary_index(&self, view: u32) -> u8 { + #[must_use] + // cast_lossless: `u32::from()` unavailable in const fn. + // cast_possible_truncation: modulo by replica_count (u8) guarantees result fits in u8. + #[allow(clippy::cast_lossless, clippy::cast_possible_truncation)] + pub const fn primary_index(&self, view: u32) -> u8 { (view % self.replica_count as u32) as u8 } - pub fn is_primary(&self) -> bool { + #[must_use] + pub const fn is_primary(&self) -> bool { self.primary_index(self.view.get()) == self.replica } + /// # Panics + /// If the stored commit number somehow exceeds the given `commit` after update. pub fn advance_commit_number(&self, commit: u64) { if commit > self.commit.get() { self.commit.set(commit); @@ -517,33 +550,40 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { /// Maximum number of faulty replicas that can be tolerated. /// For a cluster of 2f+1 replicas, this returns f. - pub fn max_faulty(&self) -> usize { + #[must_use] + pub const fn max_faulty(&self) -> usize { (self.replica_count as usize - 1) / 2 } - /// Quorum size = f + 1 = max_faulty + 1 - pub fn quorum(&self) -> usize { + /// Quorum size = f + 1 = `max_faulty` + 1 + #[must_use] + pub const fn quorum(&self) -> usize { self.max_faulty() + 1 } - pub fn commit(&self) -> u64 { + #[must_use] + pub const fn commit(&self) -> u64 { self.commit.get() } - pub fn is_syncing(&self) -> bool { - // for now return false. we have to add syncing related setup to VsrConsensus to make this work. + #[must_use] + pub const fn is_syncing(&self) -> bool { + // TODO: for now return false. we have to add syncing related setup to VsrConsensus to make this work. false } - pub fn replica(&self) -> u8 { + #[must_use] + pub const fn replica(&self) -> u8 { self.replica } - pub fn sequencer(&self) -> &LocalSequencer { + #[must_use] + pub const fn sequencer(&self) -> &LocalSequencer { &self.sequencer } - pub fn view(&self) -> u32 { + #[must_use] + pub const fn view(&self) -> u32 { self.view.get() } @@ -551,34 +591,41 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { self.view.set(view); } - pub fn status(&self) -> Status { + #[must_use] + pub const fn status(&self) -> Status { self.status.get() } // TODO(hubcio): returning &RefCell<P> leaks interior mutability - callers // could hold a Ref/RefMut across an .await and cause a runtime panic. // We had this problem with slab + ECS. - pub fn pipeline(&self) -> &RefCell<P> { + #[must_use] + pub const fn pipeline(&self) -> &RefCell<P> { &self.pipeline } - pub fn pipeline_mut(&mut self) -> &mut RefCell<P> { + #[must_use] + pub const fn pipeline_mut(&mut self) -> &mut RefCell<P> { &mut self.pipeline } - pub fn cluster(&self) -> u128 { + #[must_use] + pub const fn cluster(&self) -> u128 { self.cluster } - pub fn replica_count(&self) -> u8 { + #[must_use] + pub const fn replica_count(&self) -> u8 { self.replica_count } - pub fn namespace(&self) -> u64 { + #[must_use] + pub const fn namespace(&self) -> u64 { self.namespace } - pub fn last_prepare_checksum(&self) -> u128 { + #[must_use] + pub const fn last_prepare_checksum(&self) -> u128 { self.last_prepare_checksum.get() } @@ -586,7 +633,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { self.last_prepare_checksum.set(checksum); } - pub fn log_view(&self) -> u32 { + #[must_use] + pub const fn log_view(&self) -> u32 { self.log_view.get() } @@ -594,7 +642,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { self.log_view.set(log_view); } - pub fn is_primary_for_view(&self, view: u32) -> bool { + #[must_use] + pub const fn is_primary_for_view(&self, view: u32) -> bool { self.primary_index(view) == self.replica } @@ -624,9 +673,9 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { /// Reset all view change state when transitioning to a new view. /// - /// Clears the loopback queue: stale PrepareOks from the old view + /// Clears the loopback queue: stale `PrepareOks` from the old view /// reference pipeline entries that no longer exist, so processing - /// them would be a no-op (handle_prepare_ok ignores unknown ops). + /// them would be a no-op (`handle_prepare_ok` ignores unknown ops). /// The primary does not require its own self-ack for quorum. pub(crate) fn reset_view_change_state(&self) { self.reset_svc_quorum(); @@ -675,7 +724,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { actions } - /// Called when normal_heartbeat timeout fires. + /// Called when `normal_heartbeat` timeout fires. /// Backup hasn't heard from primary - start view change. fn handle_normal_heartbeat_timeout(&self) -> Vec<VsrAction> { // Only backups trigger view change on heartbeat timeout @@ -788,11 +837,14 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { }] } - /// Handle a received StartViewChange message. + /// Handle a received `StartViewChange` message. /// /// "When replica i receives STARTVIEWCHANGE messages for its view-number /// from f OTHER replicas, it sends a DOVIEWCHANGE message to the node /// that will be the primary in the new view." + /// + /// # Panics + /// If `header.namespace` does not match this replica's namespace. pub fn handle_start_view_change(&self, header: &StartViewChangeHeader) -> Vec<VsrAction> { assert_eq!( header.namespace, self.namespace, @@ -887,11 +939,14 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { actions } - /// Handle a received DoViewChange message (only relevant for primary candidate). + /// Handle a received `DoViewChange` message (only relevant for primary candidate). /// /// "When the new primary receives f + 1 DOVIEWCHANGE messages from different /// replicas (including itself), it sets its view-number to that in the messages /// and selects as the new log the one contained in the message with the largest v'..." + /// + /// # Panics + /// If `header.namespace` does not match this replica's namespace. pub fn handle_do_view_change(&self, header: &DoViewChangeHeader) -> Vec<VsrAction> { assert_eq!( header.namespace, self.namespace, @@ -984,12 +1039,15 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { actions } - /// Handle a received StartView message (backups only). + /// Handle a received `StartView` message (backups only). /// /// "When other replicas receive the STARTVIEW message, they replace their log /// with the one in the message, set their op-number to that of the latest entry /// in the log, set their view-number to the view number in the message, change - /// their status to normal, and send PrepareOK for any uncommitted ops." + /// their status to normal, and send `PrepareOK` for any uncommitted ops." + /// + /// # Panics + /// If `header.namespace` does not match this replica's namespace. pub fn handle_start_view(&self, header: &StartViewHeader) -> Vec<VsrAction> { assert_eq!(header.namespace, self.namespace, "SV routed to wrong group"); let from_replica = header.replica; @@ -1091,10 +1149,14 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { }] } - /// Handle a PrepareOk message from a replica. + /// Handle a `PrepareOk` message from a replica. /// /// Returns `true` if quorum was just reached for this op. /// Caller (`on_ack`) should validate `is_primary` and status before calling. + /// + /// # Panics + /// - If `header.command` is not `Command2::PrepareOk`. + /// - If `header.replica >= self.replica_count`. pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool { assert_eq!(header.command, Command2::PrepareOk); assert!( @@ -1151,7 +1213,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { /// Enqueue a self-addressed message for processing in the next loopback drain. /// - /// Currently only PrepareOk messages are routed here (via `send_or_loopback`). + /// Currently only `PrepareOk` messages are routed here (via `send_or_loopback`). // TODO: Route SVC/DVC self-messages through loopback once VsrAction dispatch is implemented. pub(crate) fn push_loopback(&self, message: Message<GenericHeader>) { assert!( @@ -1170,6 +1232,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { } /// Send a message to `target`, routing self-addressed messages through the loopback queue. + // VsrConsensus uses Cell/RefCell for single-threaded compio shards; futures are intentionally !Send. + #[allow(clippy::future_not_send)] pub(crate) async fn send_or_loopback(&self, target: u8, message: Message<GenericHeader>) where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, @@ -1185,7 +1249,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { } } - pub fn message_bus(&self) -> &B { + #[must_use] + pub const fn message_bus(&self) -> &B { &self.message_bus } } @@ -1226,7 +1291,7 @@ where impl<B, P> Project<Message<PrepareOkHeader>, VsrConsensus<B, P>> for Message<PrepareHeader> where B: MessageBus, - P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, + P: Pipeline<Message = Self, Entry = PipelineEntry>, { type Consensus = VsrConsensus<B, P>; diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index b1f7460e2..dcc5cbc6d 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -78,7 +78,7 @@ pub trait Consensus: Sized { /// This abstracts the VSR message flow: /// - request -> prepare /// - replicate (prepare) -/// - ack (prepare_ok) +/// - ack (`prepare_ok`) pub trait Plane<C> where C: Consensus, diff --git a/core/consensus/src/namespaced_pipeline.rs b/core/consensus/src/namespaced_pipeline.rs index 05d47d80f..2eb90125b 100644 --- a/core/consensus/src/namespaced_pipeline.rs +++ b/core/consensus/src/namespaced_pipeline.rs @@ -24,7 +24,7 @@ use std::collections::{HashMap, VecDeque}; /// Pipeline that partitions entries by namespace for independent commit draining. /// /// A single global op sequence and hash chain spans all namespaces, but entries -/// are stored in per-namespace VecDeques. Each namespace tracks its own commit +/// are stored in per-namespace `VecDeques`. Each namespace tracks its own commit /// frontier so `drain_committable_all` drains quorum'd entries per-namespace /// without waiting for the global commit to advance past unrelated namespaces. /// @@ -58,6 +58,7 @@ impl Default for NamespacedPipeline { } impl NamespacedPipeline { + #[must_use] pub fn new() -> Self { Self { queues: HashMap::new(), @@ -75,6 +76,7 @@ impl NamespacedPipeline { } /// Per-namespace commit frontier for the given namespace. + #[must_use] pub fn ns_commit(&self, ns: u64) -> Option<u64> { self.ns_commits.get(&ns).copied() } @@ -84,6 +86,9 @@ impl NamespacedPipeline { /// For each namespace queue, drains from the front while entries have /// `ok_quorum_received == true`. Returns entries sorted by global op /// for deterministic processing. + /// + /// # Panics + /// If the front entry exists but `pop_front` returns `None` (unreachable). pub fn drain_committable_all(&mut self) -> Vec<PipelineEntry> { let mut drained = Vec::new(); @@ -117,6 +122,7 @@ impl NamespacedPipeline { /// Walks forward from `current_commit + 1`, treating any op not found /// in the pipeline (already drained) as committed. Stops at the first /// op still present in a queue or past `last_push_op`. + #[must_use] pub fn global_commit_frontier(&self, current_commit: u64) -> u64 { let mut commit = current_commit; loop { @@ -143,6 +149,11 @@ impl Pipeline for NamespacedPipeline { type Message = Message<PrepareHeader>; type Entry = PipelineEntry; + /// # Panics + /// - If the pipeline is full. + /// - If ops are not globally sequential. + /// - If the hash chain is broken. + /// - If the namespace is not registered. fn push_message(&mut self, message: Self::Message) { assert!( self.total_count < PIPELINE_PREPARE_QUEUE_MAX, @@ -258,7 +269,7 @@ impl Pipeline for NamespacedPipeline { fn verify(&self) { assert!(self.total_count <= PIPELINE_PREPARE_QUEUE_MAX); - let actual_count: usize = self.queues.values().map(|q| q.len()).sum(); + let actual_count: usize = self.queues.values().map(VecDeque::len).sum(); assert_eq!(actual_count, self.total_count, "total_count mismatch"); // Per-namespace: ops must be monotonically increasing @@ -492,6 +503,7 @@ mod tests { } #[test] + #[allow(clippy::cast_possible_truncation)] fn is_full() { let mut pipeline = NamespacedPipeline::new(); pipeline.register_namespace(0); @@ -506,6 +518,7 @@ mod tests { #[test] #[should_panic(expected = "namespaced pipeline full")] + #[allow(clippy::cast_possible_truncation)] fn push_when_full_panics() { let mut pipeline = NamespacedPipeline::new(); pipeline.register_namespace(0); diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs index f4ca92f3d..55d21534a 100644 --- a/core/consensus/src/plane_helpers.rs +++ b/core/consensus/src/plane_helpers.rs @@ -24,6 +24,12 @@ use std::ops::AsyncFnOnce; // TODO: Rework all of those helpers, once the boundaries are more clear and we have a better picture of the commonalities between all of the planes. /// Shared pipeline-first request flow used by metadata and partitions. +/// +/// # Panics +/// - If the caller is not the primary. +/// - If the consensus status is not normal. +/// - If the consensus is syncing. +#[allow(clippy::future_not_send)] pub async fn pipeline_prepare_common<C, F>( consensus: &C, prepare: C::Message<C::ReplicateHeader>, @@ -43,7 +49,8 @@ pub async fn pipeline_prepare_common<C, F>( } /// Shared commit-based old-prepare fence. -pub fn fence_old_prepare_by_commit<B, P>( +#[must_use] +pub const fn fence_old_prepare_by_commit<B, P>( consensus: &VsrConsensus<B, P>, header: &PrepareHeader, ) -> bool @@ -55,6 +62,13 @@ where } /// Shared chain-replication forwarding to the next replica. +/// +/// # Panics +/// - If `header.command` is not `Command2::Prepare`. +/// - If `header.op <= consensus.commit()`. +/// - If the computed next replica equals self. +/// - If the message bus send fails. +#[allow(clippy::future_not_send)] pub async fn replicate_to_next_in_chain<B, P>( consensus: &VsrConsensus<B, P>, message: Message<PrepareHeader>, @@ -87,6 +101,13 @@ pub async fn replicate_to_next_in_chain<B, P>( /// Shared preflight checks for `on_replicate`. /// /// Returns current op on success. +/// +/// # Errors +/// Returns a static error string if the replica is syncing, not in normal +/// status, or the message has a newer view. +/// +/// # Panics +/// If `header.command` is not `Command2::Prepare`. pub fn replicate_preflight<B, P>( consensus: &VsrConsensus<B, P>, header: &PrepareHeader, @@ -119,6 +140,10 @@ where } /// Shared preflight checks for `on_ack`. +/// +/// # Errors +/// Returns a static error string if the replica is not primary or not in +/// normal status. pub fn ack_preflight<B, P>(consensus: &VsrConsensus<B, P>) -> Result<(), &'static str> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, @@ -171,6 +196,9 @@ where /// /// Entries are drained only from the head and only while their op is covered /// by the current commit frontier. +/// +/// # Panics +/// If `head()` returns `Some` but `pop_message()` returns `None` (unreachable). pub fn drain_committable_prefix<B, P>(consensus: &VsrConsensus<B, P>) -> Vec<PipelineEntry> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, @@ -195,6 +223,10 @@ where } /// Shared reply-message construction for committed prepare. +/// +/// # Panics +/// If the constructed message buffer is not valid. +#[allow(clippy::needless_pass_by_value, clippy::cast_possible_truncation)] pub fn build_reply_message<B, P>( consensus: &VsrConsensus<B, P>, prepare_header: &PrepareHeader, @@ -239,6 +271,9 @@ where } /// Verify hash chain would not break if we add this header. +/// +/// # Panics +/// If both headers share the same view and `current.parent != previous.checksum`. pub fn panic_if_hash_chain_would_break_in_same_view( previous: &PrepareHeader, current: &PrepareHeader, @@ -254,6 +289,10 @@ pub fn panic_if_hash_chain_would_break_in_same_view( } // TODO: Figure out how to make this check the journal if it contains the prepare. +/// # Panics +/// - If `header.command` is not `Command2::Prepare`. +/// - If `header.view > consensus.view()`. +#[allow(clippy::cast_possible_truncation, clippy::future_not_send)] pub async fn send_prepare_ok<B, P>( consensus: &VsrConsensus<B, P>, header: &PrepareHeader, @@ -272,7 +311,7 @@ pub async fn send_prepare_ok<B, P>( return; } - if let Some(false) = is_persisted { + if is_persisted == Some(false) { return; } @@ -534,6 +573,7 @@ mod tests { } } + #[allow(clippy::future_not_send)] impl MessageBus for SpyBus { type Client = u128; type Replica = u8; diff --git a/core/consensus/src/plane_mux.rs b/core/consensus/src/plane_mux.rs index 1f32cb810..a26076937 100644 --- a/core/consensus/src/plane_mux.rs +++ b/core/consensus/src/plane_mux.rs @@ -26,19 +26,23 @@ pub struct MuxPlane<T> { } impl<T> MuxPlane<T> { - pub fn new(inner: T) -> Self { + #[must_use] + pub const fn new(inner: T) -> Self { Self { inner } } - pub fn inner(&self) -> &T { + #[must_use] + pub const fn inner(&self) -> &T { &self.inner } - pub fn inner_mut(&mut self) -> &mut T { + pub const fn inner_mut(&mut self) -> &mut T { &mut self.inner } } +// Consensus runs on single-threaded compio shards; futures are intentionally !Send. +#[allow(clippy::future_not_send)] impl<C, T> Plane<C> for MuxPlane<T> where C: Consensus, @@ -63,6 +67,8 @@ where } } +// Consensus runs on single-threaded compio shards; futures are intentionally !Send. +#[allow(clippy::future_not_send)] impl<C> Plane<C> for () where C: Consensus, @@ -82,6 +88,8 @@ where async fn on_ack(&self, _message: AckMessage<C>) {} } +// Consensus runs on single-threaded compio shards; futures are intentionally !Send. +#[allow(clippy::future_not_send)] impl<C, Head, Tail> Plane<C> for variadic!(Head, ...Tail) where C: Consensus, diff --git a/core/consensus/src/view_change_quorum.rs b/core/consensus/src/view_change_quorum.rs index 2a8e01e9b..2eaff15b3 100644 --- a/core/consensus/src/view_change_quorum.rs +++ b/core/consensus/src/view_change_quorum.rs @@ -17,7 +17,7 @@ use crate::REPLICAS_MAX; -/// Stored information from a DoViewChange message. +/// Stored information from a `DoViewChange` message. #[derive(Debug, Clone, Copy)] pub struct StoredDvc { pub replica: u8, @@ -28,12 +28,13 @@ pub struct StoredDvc { } impl StoredDvc { - /// Compare for log selection: highest log_view, then highest op. - pub fn is_better_than(&self, other: &StoredDvc) -> bool { - if self.log_view != other.log_view { - self.log_view > other.log_view - } else { + /// Compare for log selection: highest `log_view`, then highest op. + #[must_use] + pub const fn is_better_than(&self, other: &Self) -> bool { + if self.log_view == other.log_view { self.op > other.op + } else { + self.log_view > other.log_view } } } @@ -42,12 +43,13 @@ impl StoredDvc { pub type DvcQuorumArray = [Option<StoredDvc>; REPLICAS_MAX]; /// Create an empty DVC quorum array. +#[must_use] pub const fn dvc_quorum_array_empty() -> DvcQuorumArray { [None; REPLICAS_MAX] } /// Record a DVC in the array. Returns true if this is a new entry (not duplicate). -pub fn dvc_record(array: &mut DvcQuorumArray, dvc: StoredDvc) -> bool { +pub const fn dvc_record(array: &mut DvcQuorumArray, dvc: StoredDvc) -> bool { let slot = &mut array[dvc.replica as usize]; if slot.is_some() { return false; // Duplicate @@ -57,20 +59,20 @@ pub fn dvc_record(array: &mut DvcQuorumArray, dvc: StoredDvc) -> bool { } /// Count how many DVCs have been received. +#[must_use] pub fn dvc_count(array: &DvcQuorumArray) -> usize { array.iter().filter(|m| m.is_some()).count() } /// Check if a specific replica has sent a DVC. +#[must_use] pub fn dvc_has_from(array: &DvcQuorumArray, replica: u8) -> bool { - array - .get(replica as usize) - .map(|m| m.is_some()) - .unwrap_or(false) + array.get(replica as usize).is_some_and(Option::is_some) } /// Select the winning DVC (best log) from the quorum. -/// Returns the DVC with: highest log_view, then highest op. +/// Returns the DVC with: highest `log_view`, then highest op. +#[must_use] pub fn dvc_select_winner(array: &DvcQuorumArray) -> Option<&StoredDvc> { array .iter() @@ -82,6 +84,7 @@ pub fn dvc_select_winner(array: &DvcQuorumArray) -> Option<&StoredDvc> { } /// Get the maximum commit number across all DVCs. +#[must_use] pub fn dvc_max_commit(array: &DvcQuorumArray) -> u64 { array .iter() @@ -92,11 +95,12 @@ pub fn dvc_max_commit(array: &DvcQuorumArray) -> u64 { } /// Reset the DVC quorum array. -pub fn dvc_reset(array: &mut DvcQuorumArray) { +pub const fn dvc_reset(array: &mut DvcQuorumArray) { *array = dvc_quorum_array_empty(); } /// Iterator over all stored DVCs. +// TODO: add #[must_use] -- pure iterator query, callers should not ignore. pub fn dvc_iter(array: &DvcQuorumArray) -> impl Iterator<Item = &StoredDvc> { array.iter().filter_map(|m| m.as_ref()) } diff --git a/core/consensus/src/vsr_timeout.rs b/core/consensus/src/vsr_timeout.rs index f301ec7d2..eabc477e6 100644 --- a/core/consensus/src/vsr_timeout.rs +++ b/core/consensus/src/vsr_timeout.rs @@ -46,7 +46,8 @@ pub struct Timeout { } impl Timeout { - pub fn new(id: u128, after: u64) -> Self { + #[must_use] + pub const fn new(id: u128, after: u64) -> Self { Self { id, after, @@ -56,30 +57,31 @@ impl Timeout { } } - pub fn start(&mut self) { + pub const fn start(&mut self) { self.ticks_remaining = self.after; self.ticking = true; self.attempts = 0; } - pub fn stop(&mut self) { + pub const fn stop(&mut self) { self.ticking = false; self.ticks_remaining = 0; self.attempts = 0; } - pub fn reset(&mut self) { + pub const fn reset(&mut self) { self.ticks_remaining = self.after; self.attempts = 0; } - pub fn tick(&mut self) { + pub const fn tick(&mut self) { if self.ticking { self.ticks_remaining = self.ticks_remaining.saturating_sub(1); } } - pub fn fired(&self) -> bool { + #[must_use] + pub const fn fired(&self) -> bool { self.ticking && self.ticks_remaining == 0 } @@ -135,6 +137,7 @@ impl TimeoutManager { const DO_VIEW_CHANGE_MESSAGE_TICKS: u64 = 50; const REQUEST_START_VIEW_MESSAGE_TICKS: u64 = 100; + // TODO: add #[must_use] -- constructor, discarding is always a bug. pub fn new(replica_id: u128) -> Self { Self { ping: Timeout::new(replica_id, Self::PING_TICKS), @@ -151,6 +154,8 @@ impl TimeoutManager { replica_id, Self::REQUEST_START_VIEW_MESSAGE_TICKS, ), + // Upper bits of replica_id are not critical for PRNG seed diversity. + #[allow(clippy::cast_possible_truncation)] prng: Xoshiro256Plus::seed_from_u64(replica_id as u64), } } @@ -158,7 +163,7 @@ impl TimeoutManager { /// Tick all timeouts /// This is the first phase of the two-phase tick-based timeout mechanism. /// 2nd phase is checking which timeouts have fired and calling the appropriate handlers. - pub fn tick(&mut self) { + pub const fn tick(&mut self) { self.ping.tick(); self.prepare.tick(); self.commit_message.tick(); @@ -169,11 +174,12 @@ impl TimeoutManager { self.view_change_status.tick(); } - pub fn fired(&self, kind: TimeoutKind) -> bool { + #[must_use] + pub const fn fired(&self, kind: TimeoutKind) -> bool { self.get(kind).fired() } - pub fn get(&self, kind: TimeoutKind) -> &Timeout { + pub const fn get(&self, kind: TimeoutKind) -> &Timeout { match kind { TimeoutKind::Ping => &self.ping, TimeoutKind::Prepare => &self.prepare, @@ -186,7 +192,7 @@ impl TimeoutManager { } } - pub fn get_mut(&mut self, kind: TimeoutKind) -> &mut Timeout { + pub const fn get_mut(&mut self, kind: TimeoutKind) -> &mut Timeout { match kind { TimeoutKind::Ping => &mut self.ping, TimeoutKind::Prepare => &mut self.prepare, @@ -199,15 +205,15 @@ impl TimeoutManager { } } - pub fn start(&mut self, kind: TimeoutKind) { + pub const fn start(&mut self, kind: TimeoutKind) { self.get_mut(kind).start(); } - pub fn stop(&mut self, kind: TimeoutKind) { + pub const fn stop(&mut self, kind: TimeoutKind) { self.get_mut(kind).stop(); } - pub fn reset(&mut self, kind: TimeoutKind) { + pub const fn reset(&mut self, kind: TimeoutKind) { self.get_mut(kind).reset(); } @@ -225,7 +231,8 @@ impl TimeoutManager { timeout.backoff(&mut self.prng); } - pub fn is_ticking(&self, kind: TimeoutKind) -> bool { + #[must_use] + pub const fn is_ticking(&self, kind: TimeoutKind) -> bool { self.get(kind).ticking } }
