This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new ad46a979c chore(shard,metadata): enable pedantic and nursery clippy
lints (#2896)
ad46a979c is described below
commit ad46a979c8a5e301ee9d7ba197785bd5d1d06f9b
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 9 11:47:05 2026 +0100
chore(shard,metadata): enable pedantic and nursery clippy lints (#2896)
---
core/bench/Cargo.toml | 2 +-
core/message_bus/Cargo.toml | 2 +-
core/metadata/Cargo.toml | 5 ++++
core/metadata/src/impls/metadata.rs | 20 +++++++++----
core/metadata/src/permissioner/mod.rs | 13 ++++-----
.../permissioner_rules/consumer_groups.rs | 1 +
.../permissioner_rules/consumer_offsets.rs | 1 +
.../permissioner/permissioner_rules/messages.rs | 5 ++--
.../permissioner/permissioner_rules/partitions.rs | 1 +
.../permissioner/permissioner_rules/segments.rs | 1 +
.../src/permissioner/permissioner_rules/streams.rs | 1 +
.../src/permissioner/permissioner_rules/system.rs | 1 +
.../src/permissioner/permissioner_rules/topics.rs | 7 +++--
.../src/permissioner/permissioner_rules/users.rs | 1 +
core/metadata/src/stats/mod.rs | 4 +--
core/metadata/src/stm/consumer_group.rs | 17 +++++------
core/metadata/src/stm/mod.rs | 18 +++++++++---
core/metadata/src/stm/mux.rs | 12 ++++----
core/metadata/src/stm/snapshot.rs | 33 ++++++++++++++--------
core/metadata/src/stm/stream.rs | 5 +++-
core/metadata/src/stm/user.rs | 7 ++++-
core/partitions/Cargo.toml | 2 +-
core/shard/Cargo.toml | 5 ++++
core/shard/src/lib.rs | 18 ++++++++++--
core/shard/src/router.rs | 4 ++-
core/shard/src/shards_table.rs | 7 +++--
26 files changed, 133 insertions(+), 60 deletions(-)
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index a94add9fa..2cd42a80d 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -57,4 +57,4 @@ uuid = { workspace = true }
[lints.clippy]
enum_glob_use = "deny"
pedantic = "deny"
-nursery = "deny"
+nursery = "warn"
diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml
index fabab64ce..aabb56543 100644
--- a/core/message_bus/Cargo.toml
+++ b/core/message_bus/Cargo.toml
@@ -34,4 +34,4 @@ rand = { workspace = true }
[lints.clippy]
enum_glob_use = "deny"
pedantic = "deny"
-nursery = "deny"
+nursery = "warn"
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index fcad045bf..6cdad3ad4 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -40,3 +40,8 @@ rmp-serde = { workspace = true }
serde = { workspace = true, features = ["derive"] }
slab = { workspace = true }
tracing = { workspace = true }
+
+[lints.clippy]
+enum_glob_use = "deny"
+pedantic = "deny"
+nursery = "warn"
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index 41cb21296..9751eb0c7 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -41,13 +41,15 @@ pub struct IggySnapshot {
#[allow(unused)]
impl IggySnapshot {
+ #[must_use]
pub fn new(sequence_number: u64) -> Self {
Self {
snapshot: MetadataSnapshot::new(sequence_number),
}
}
- pub fn snapshot(&self) -> &MetadataSnapshot {
+ #[must_use]
+ pub const fn snapshot(&self) -> &MetadataSnapshot {
&self.snapshot
}
}
@@ -99,6 +101,7 @@ pub struct IggyMetadata<C, J, S, M> {
pub mux_stm: M,
}
+#[allow(clippy::future_not_send)]
impl<B, J, S, M> Plane<VsrConsensus<B>> for IggyMetadata<VsrConsensus<B>, J,
S, M>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -137,12 +140,13 @@ where
};
// TODO: Handle idx calculation, for now using header.op, but since
the journal may get compacted, this may not be correct.
+ #[allow(clippy::cast_possible_truncation)]
let is_old_prepare = fence_old_prepare_by_commit(consensus, header)
|| journal.handle().header(header.op as usize).is_some();
- if !is_old_prepare {
- self.replicate(message.clone()).await;
- } else {
+ if is_old_prepare {
warn!("received old prepare, not replicating");
+ } else {
+ self.replicate(message.clone()).await;
}
// TODO add assertions for valid state here.
@@ -245,7 +249,7 @@ where
.message_bus()
.send_to_client(prepare_header.client, generic_reply)
.await
- .unwrap()
+ .unwrap();
}
}
}
@@ -285,6 +289,7 @@ where
/// - Primary sends to first backup
/// - Each backup forwards to the next
/// - Stops when we would forward back to primary
+ #[allow(clippy::future_not_send)]
async fn replicate(&self, message: Message<PrepareHeader>) {
let consensus = self.consensus.as_ref().unwrap();
let journal = self.journal.as_ref().unwrap();
@@ -292,6 +297,7 @@ where
let header = message.header();
// TODO: calculate the index;
+ #[allow(clippy::cast_possible_truncation)]
let idx = header.op as usize;
assert_eq!(header.command, Command2::Prepare);
assert!(
@@ -304,12 +310,14 @@ where
// TODO: Implement jump_to_newer_op
// fn jump_to_newer_op(&self, header: &PrepareHeader) {}
- fn commit_journal(&self) {
+ #[allow(clippy::unused_self)]
+ const fn commit_journal(&self) {
// TODO: Implement commit logic
// Walk through journal from last committed to current commit number
// Apply each entry to the state machine
}
+ #[allow(clippy::future_not_send, clippy::cast_possible_truncation)]
async fn send_prepare_ok(&self, header: &PrepareHeader) {
let consensus = self.consensus.as_ref().unwrap();
let journal = self.journal.as_ref().unwrap();
diff --git a/core/metadata/src/permissioner/mod.rs
b/core/metadata/src/permissioner/mod.rs
index 52b13443f..74a83ec23 100644
--- a/core/metadata/src/permissioner/mod.rs
+++ b/core/metadata/src/permissioner/mod.rs
@@ -31,16 +31,15 @@ pub struct Permissioner {
}
impl Permissioner {
+ #[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn init_permissions_for_user(&mut self, user_id: UserId, permissions:
Option<Permissions>) {
- if permissions.is_none() {
+ let Some(permissions) = permissions else {
return;
- }
-
- let permissions = permissions.unwrap();
+ };
if permissions.global.poll_messages {
self.users_that_can_poll_messages_from_all_streams
.insert(user_id);
@@ -52,11 +51,9 @@ impl Permissioner {
}
self.users_permissions.insert(user_id, permissions.global);
- if permissions.streams.is_none() {
+ let Some(streams) = permissions.streams else {
return;
- }
-
- let streams = permissions.streams.unwrap();
+ };
for (stream_id, stream) in streams {
if stream.poll_messages {
self.users_that_can_poll_messages_from_specific_streams
diff --git
a/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs
b/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs
index 66022ae2d..2d4bcdd1b 100644
--- a/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs
@@ -19,6 +19,7 @@
use crate::permissioner::Permissioner;
use iggy_common::IggyError;
+#[allow(clippy::missing_errors_doc)]
impl Permissioner {
pub fn create_consumer_group(
&self,
diff --git
a/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
b/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
index f94e3fdc0..ab93c1fe4 100644
--- a/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
@@ -19,6 +19,7 @@
use crate::permissioner::Permissioner;
use iggy_common::IggyError;
+#[allow(clippy::missing_errors_doc)]
impl Permissioner {
pub fn get_consumer_offset(
&self,
diff --git a/core/metadata/src/permissioner/permissioner_rules/messages.rs
b/core/metadata/src/permissioner/permissioner_rules/messages.rs
index ad678d0f2..968d1aed4 100644
--- a/core/metadata/src/permissioner/permissioner_rules/messages.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/messages.rs
@@ -19,8 +19,9 @@
use crate::permissioner::Permissioner;
use iggy_common::IggyError;
+#[allow(clippy::missing_errors_doc)]
impl Permissioner {
- /// Inheritance: manage_streams → read_streams → read_topics →
poll_messages
+ /// Inheritance: `manage_streams` -> `read_streams` -> `read_topics` ->
`poll_messages`
pub fn poll_messages(
&self,
user_id: u32,
@@ -79,7 +80,7 @@ impl Permissioner {
Err(IggyError::Unauthorized)
}
- /// Inheritance: manage_streams → manage_topics → send_messages
+ /// Inheritance: `manage_streams` -> `manage_topics` -> `send_messages`
pub fn append_messages(
&self,
user_id: u32,
diff --git a/core/metadata/src/permissioner/permissioner_rules/partitions.rs
b/core/metadata/src/permissioner/permissioner_rules/partitions.rs
index 7f11007a1..11b780fc7 100644
--- a/core/metadata/src/permissioner/permissioner_rules/partitions.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/partitions.rs
@@ -19,6 +19,7 @@
use crate::permissioner::Permissioner;
use iggy_common::IggyError;
+#[allow(clippy::missing_errors_doc)]
impl Permissioner {
pub fn create_partitions(
&self,
diff --git a/core/metadata/src/permissioner/permissioner_rules/segments.rs
b/core/metadata/src/permissioner/permissioner_rules/segments.rs
index 558e2b0b3..4de5bd2cc 100644
--- a/core/metadata/src/permissioner/permissioner_rules/segments.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/segments.rs
@@ -19,6 +19,7 @@
use crate::permissioner::Permissioner;
use iggy_common::IggyError;
+#[allow(clippy::missing_errors_doc)]
impl Permissioner {
pub fn delete_segments(
&self,
diff --git a/core/metadata/src/permissioner/permissioner_rules/streams.rs
b/core/metadata/src/permissioner/permissioner_rules/streams.rs
index 153d3d4d5..a978d2fdd 100644
--- a/core/metadata/src/permissioner/permissioner_rules/streams.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/streams.rs
@@ -19,6 +19,7 @@
use crate::permissioner::Permissioner;
use iggy_common::IggyError;
+#[allow(clippy::missing_errors_doc)]
impl Permissioner {
pub fn get_stream(&self, user_id: u32, stream_id: usize) -> Result<(),
IggyError> {
if let Some(global_permissions) = self.users_permissions.get(&user_id)
diff --git a/core/metadata/src/permissioner/permissioner_rules/system.rs
b/core/metadata/src/permissioner/permissioner_rules/system.rs
index e2ef0d8c7..e74ced86f 100644
--- a/core/metadata/src/permissioner/permissioner_rules/system.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/system.rs
@@ -19,6 +19,7 @@
use crate::permissioner::Permissioner;
use iggy_common::IggyError;
+#[allow(clippy::missing_errors_doc)]
impl Permissioner {
pub fn get_stats(&self, user_id: u32) -> Result<(), IggyError> {
self.get_server_info(user_id)
diff --git a/core/metadata/src/permissioner/permissioner_rules/topics.rs
b/core/metadata/src/permissioner/permissioner_rules/topics.rs
index 6e93655c3..443478d7a 100644
--- a/core/metadata/src/permissioner/permissioner_rules/topics.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/topics.rs
@@ -19,8 +19,9 @@
use crate::permissioner::Permissioner;
use iggy_common::IggyError;
+#[allow(clippy::missing_errors_doc)]
impl Permissioner {
- /// Inheritance: manage_streams → read_streams → read_topics
+ /// Inheritance: `manage_streams` -> `read_streams` -> `read_topics`
pub fn get_topic(
&self,
user_id: u32,
@@ -79,7 +80,7 @@ impl Permissioner {
Err(IggyError::Unauthorized)
}
- /// Inheritance: manage_streams → manage_topics
+ /// Inheritance: `manage_streams` -> `manage_topics`
pub fn create_topic(&self, user_id: u32, stream_id: usize) -> Result<(),
IggyError> {
if let Some(global) = self.users_permissions.get(&user_id)
&& (global.manage_streams || global.manage_topics)
@@ -123,7 +124,7 @@ impl Permissioner {
self.manage_topic(user_id, stream_id, topic_id)
}
- /// Inheritance: manage_streams → manage_topics
+ /// Inheritance: `manage_streams` -> `manage_topics`
fn manage_topic(
&self,
user_id: u32,
diff --git a/core/metadata/src/permissioner/permissioner_rules/users.rs
b/core/metadata/src/permissioner/permissioner_rules/users.rs
index bba4b84a9..3c831b0a6 100644
--- a/core/metadata/src/permissioner/permissioner_rules/users.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/users.rs
@@ -19,6 +19,7 @@
use crate::permissioner::Permissioner;
use iggy_common::IggyError;
+#[allow(clippy::missing_errors_doc)]
impl Permissioner {
pub fn get_user(&self, user_id: u32) -> Result<(), IggyError> {
self.read_users(user_id)
diff --git a/core/metadata/src/stats/mod.rs b/core/metadata/src/stats/mod.rs
index f4cc3aedc..28dd5c5a4 100644
--- a/core/metadata/src/stats/mod.rs
+++ b/core/metadata/src/stats/mod.rs
@@ -114,7 +114,7 @@ pub struct TopicStats {
}
impl TopicStats {
- pub fn new(parent: Arc<StreamStats>) -> Self {
+ pub const fn new(parent: Arc<StreamStats>) -> Self {
Self {
parent,
size_bytes: AtomicU64::new(0),
@@ -258,7 +258,7 @@ pub struct PartitionStats {
}
impl PartitionStats {
- pub fn new(parent_stats: Arc<TopicStats>) -> Self {
+ pub const fn new(parent_stats: Arc<TopicStats>) -> Self {
Self {
parent: parent_stats,
messages_count: AtomicU64::new(0),
diff --git a/core/metadata/src/stm/consumer_group.rs
b/core/metadata/src/stm/consumer_group.rs
index f9e42178f..2eddcdfaa 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -38,6 +38,7 @@ pub struct ConsumerGroupMember {
}
impl ConsumerGroupMember {
+ #[must_use]
pub fn new(id: usize, client_id: u32) -> Self {
Self {
id,
@@ -57,7 +58,8 @@ pub struct ConsumerGroup {
}
impl ConsumerGroup {
- pub fn new(name: Arc<str>) -> Self {
+ #[must_use]
+ pub const fn new(name: Arc<str>) -> Self {
Self {
id: 0,
name,
@@ -74,16 +76,16 @@ impl ConsumerGroup {
return;
}
- let member_ids: Vec<usize> = self.members.iter().map(|(id, _)|
id).collect();
- for &member_id in &member_ids {
+ let member_keys: Vec<usize> = self.members.iter().map(|(id, _)|
id).collect();
+ for &member_id in &member_keys {
if let Some(member) = self.members.get_mut(member_id) {
member.partitions.clear();
}
}
for (i, &partition_id) in self.partitions.iter().enumerate() {
- let member_idx = i % member_count;
- if let Some(&member_id) = member_ids.get(member_idx)
+ let target = i % member_count;
+ if let Some(&member_id) = member_keys.get(target)
&& let Some(member) = self.members.get_mut(member_id)
{
member.partitions.push(partition_id);
@@ -180,7 +182,7 @@ impl StateHandler for CreateConsumerGroup {
let id = state.items.insert(group);
state.items[id].id = id;
- state.name_index.insert(name.clone(), id);
+ state.name_index.insert(name, id);
if let (Ok(s), Ok(t)) = (
self.stream_id.get_u32_value(),
@@ -328,7 +330,7 @@ impl Snapshotable for ConsumerGroups {
let mut group_entries: Vec<(usize, ConsumerGroup)> = Vec::new();
for (slab_key, group_snap) in snapshot.items {
- let member_entries: Vec<(usize, ConsumerGroupMember)> = group_snap
+ let members: Slab<ConsumerGroupMember> = group_snap
.members
.into_iter()
.map(|(member_key, member_snap)| {
@@ -341,7 +343,6 @@ impl Snapshotable for ConsumerGroups {
(member_key, member)
})
.collect();
- let members: Slab<ConsumerGroupMember> =
member_entries.into_iter().collect();
let group_name: Arc<str> = Arc::from(group_snap.name.as_str());
let group = ConsumerGroup {
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 82aed80b1..5171207ae 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -23,7 +23,7 @@ pub mod user;
use bytes::Bytes;
use iggy_common::Either;
-use left_right::*;
+use left_right::{Absorb, ReadHandle, WriteHandle};
use std::cell::UnsafeCell;
use std::sync::Arc;
@@ -47,18 +47,21 @@ impl<T, O> WriteCell<T, O>
where
T: Absorb<O>,
{
- pub fn new(write: WriteHandle<T, O>) -> Self {
+ pub const fn new(write: WriteHandle<T, O>) -> Self {
Self {
inner: UnsafeCell::new(write),
}
}
+ /// # Panics
+ /// Panics if the inner `UnsafeCell` pointer is null (should never happen
+ /// unless the writer was not properly initialized).
pub fn apply(&self, cmd: O) {
let hdl = unsafe {
self.inner
.get()
.as_mut()
- .expect("[apply]: called on uninit writer, for cmd: {cmd}")
+ .expect("[apply]: called on uninit writer")
};
hdl.append(cmd).publish();
}
@@ -69,6 +72,7 @@ where
/// - `Ok(Either::Left(cmd))` if applicable
/// - `Ok(Either::Right(input))` to pass ownership back
/// - `Err(error)` for malformed payload/parse errors
+#[allow(clippy::missing_errors_doc)]
pub trait Command {
type Cmd;
type Input;
@@ -99,6 +103,8 @@ impl<T, C> LeftRight<T, C>
where
T: Absorb<C>,
{
+ /// # Panics
+ /// Panics if the read handle has been dropped (should never happen in
normal operation).
pub fn read<F, R>(&self, f: F) -> R
where
F: FnOnce(&T) -> R,
@@ -125,6 +131,8 @@ impl<T> LeftRight<T, <T as Command>::Cmd>
where
T: Absorb<<T as Command>::Cmd> + Clone + Command,
{
+ /// # Panics
+ /// Panics if this is not the owner shard (no write handle available).
pub fn do_apply(&self, cmd: <T as Command>::Cmd) {
self.write
.as_ref()
@@ -134,6 +142,7 @@ where
}
/// Public interface for state handlers.
+#[allow(clippy::missing_errors_doc)]
pub trait State {
type Output;
type Input;
@@ -142,6 +151,7 @@ pub trait State {
fn apply(&self, input: Self::Input) -> Result<Either<Self::Output,
Self::Input>, Self::Error>;
}
+#[allow(clippy::missing_errors_doc)]
pub trait StateMachine {
type Input;
type Output;
@@ -153,7 +163,7 @@ pub trait StateMachine {
///
/// # Generated items
/// - `{$state}Inner` struct with the specified fields (the data)
-/// - `$state` wrapper struct (contains LeftRight storage)
+/// - `$state` wrapper struct (contains `LeftRight` storage)
/// - `From<LeftRight<...>>` impl for `$state`
/// - `From<{$state}Inner>` impl for `$state`
///
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 0f6d39dcc..4b6127873 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -35,7 +35,8 @@ impl<T> MuxStateMachine<T>
where
T: StateMachine,
{
- pub fn new(inner: T) -> Self {
+ #[must_use]
+ pub const fn new(inner: T) -> Self {
Self { inner }
}
}
@@ -124,7 +125,7 @@ where
{
fn restore_snapshot(snapshot: &SnapshotData) -> Result<Self,
SnapshotError> {
let inner = T::restore_snapshot(snapshot)?;
- Ok(MuxStateMachine::new(inner))
+ Ok(Self::new(inner))
}
}
@@ -156,6 +157,8 @@ mod tests {
#[test]
fn mux_state_machine_snapshot_roundtrip() {
+ type MuxTuple = (Users, (Streams, (ConsumerGroups, ())));
+
let users: Users = UsersInner::new().into();
let streams: Streams = StreamsInner::new().into();
let consumer_groups: ConsumerGroups =
ConsumerGroupsInner::new().into();
@@ -171,8 +174,6 @@ mod tests {
assert!(snapshot.streams.is_some());
assert!(snapshot.consumer_groups.is_some());
- // Restore and verify
- type MuxTuple = (Users, (Streams, (ConsumerGroups, ())));
let restored: MuxStateMachine<MuxTuple> =
MuxStateMachine::restore_snapshot(&snapshot).unwrap();
@@ -189,11 +190,12 @@ mod tests {
use crate::impls::metadata::IggySnapshot;
use crate::stm::snapshot::Snapshot;
+ type MuxTuple = (Users, (Streams, (ConsumerGroups, ())));
+
let users: Users = UsersInner::new().into();
let streams: Streams = StreamsInner::new().into();
let consumer_groups: ConsumerGroups =
ConsumerGroupsInner::new().into();
- type MuxTuple = (Users, (Streams, (ConsumerGroups, ())));
let mux: MuxStateMachine<MuxTuple> =
MuxStateMachine::new(variadic!(users, streams, consumer_groups));
diff --git a/core/metadata/src/stm/snapshot.rs
b/core/metadata/src/stm/snapshot.rs
index 76210ceb6..6daadc03a 100644
--- a/core/metadata/src/stm/snapshot.rs
+++ b/core/metadata/src/stm/snapshot.rs
@@ -33,8 +33,8 @@ pub enum SnapshotError {
impl fmt::Display for SnapshotError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
- SnapshotError::Serialize(e) => write!(f, "snapshot serialization
failed: {}", e),
- SnapshotError::Deserialize(e) => write!(f, "snapshot
deserialization failed: {}", e),
+ Self::Serialize(e) => write!(f, "snapshot serialization failed:
{e}"),
+ Self::Deserialize(e) => write!(f, "snapshot deserialization
failed: {e}"),
}
}
}
@@ -42,8 +42,8 @@ impl fmt::Display for SnapshotError {
impl std::error::Error for SnapshotError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
- SnapshotError::Serialize(e) => Some(e),
- SnapshotError::Deserialize(e) => Some(e),
+ Self::Serialize(e) => Some(e),
+ Self::Deserialize(e) => Some(e),
}
}
}
@@ -75,6 +75,7 @@ impl Default for MetadataSnapshot {
impl MetadataSnapshot {
/// Create a new snapshot with the given sequence number.
+ #[must_use]
pub fn new(sequence_number: u64) -> Self {
Self {
version: 1,
@@ -87,11 +88,17 @@ impl MetadataSnapshot {
}
/// Encode the snapshot to msgpack bytes.
+ ///
+ /// # Errors
+ /// Returns `SnapshotError::Serialize` if msgpack serialization fails.
pub fn encode(&self) -> Result<Vec<u8>, SnapshotError> {
rmp_serde::to_vec(self).map_err(SnapshotError::Serialize)
}
/// Decode a snapshot from msgpack bytes.
+ ///
+ /// # Errors
+ /// Returns `SnapshotError::Deserialize` if msgpack deserialization fails.
pub fn decode(bytes: &[u8]) -> Result<Self, SnapshotError> {
rmp_serde::from_slice(bytes).map_err(SnapshotError::Deserialize)
}
@@ -101,6 +108,7 @@ impl MetadataSnapshot {
///
/// This is the high-level interface that concrete snapshot types (e.g.
`IggySnapshot`)
/// must satisfy. It provides methods for creating, encoding, and decoding
snapshots.
+#[allow(clippy::missing_errors_doc)]
pub trait Snapshot: Sized {
/// The error type for snapshot operations.
type Error: std::error::Error;
@@ -139,6 +147,7 @@ pub trait Snapshot: Sized {
/// Trait implemented by each `{Name}Inner` state machine to support
snapshotting.
/// Each state machine defines its own snapshot
/// type for serialization and provides conversion methods.
+#[allow(clippy::missing_errors_doc)]
pub trait Snapshotable {
/// The serde-serializable snapshot representation of this state.
/// This should be a plain struct with only serializable types and no
wrappers
@@ -156,7 +165,8 @@ pub trait Snapshotable {
/// Trait for filling a typed snapshot with state machine data.
///
-/// Each state machine implements this to write its serialized state
+/// Each state machine implements this to write its serialized state.
+#[allow(clippy::missing_errors_doc)]
pub trait FillSnapshot<S> {
/// Fill the snapshot with this state machine's data.
fn fill_snapshot(&self, snapshot: &mut S) -> Result<(), SnapshotError>;
@@ -165,6 +175,7 @@ pub trait FillSnapshot<S> {
/// Trait for restoring state machine data from a typed snapshot.
///
/// Each state machine implements this to read its state.
+#[allow(clippy::missing_errors_doc)]
pub trait RestoreSnapshot<S>: Sized {
/// Restore this state machine from the snapshot.
fn restore_snapshot(snapshot: &S) -> Result<Self, SnapshotError>;
@@ -249,7 +260,7 @@ mod tests {
#[test]
fn roundtrip_with_data() {
- let ts = IggyTimestamp::from(1694968446131680u64);
+ let ts = IggyTimestamp::from(1_694_968_446_131_680_u64);
let mut snapshot = MetadataSnapshot::new(100);
snapshot.streams = Some(StreamsSnapshot {
@@ -295,7 +306,7 @@ mod tests {
use crate::stm::user::{PermissionerSnapshot, UserSnapshot,
UsersSnapshot};
use iggy_common::UserStatus;
- let ts = IggyTimestamp::from(1694968446131680u64);
+ let ts = IggyTimestamp::from(1_694_968_446_131_680_u64);
let users_snap = UsersSnapshot {
items: vec![
@@ -373,8 +384,8 @@ mod tests {
let encoded = snapshot.encode().unwrap();
let decoded = MetadataSnapshot::decode(&encoded).unwrap();
- use crate::stm::user::Users;
- let restored_users: Users =
RestoreSnapshot::restore_snapshot(&decoded).unwrap();
+ let restored_users: crate::stm::user::Users =
+ RestoreSnapshot::restore_snapshot(&decoded).unwrap();
let mut verify = MetadataSnapshot::new(0);
restored_users.fill_snapshot(&mut verify).unwrap();
@@ -387,8 +398,8 @@ mod tests {
assert_eq!(users_snap.items[1].1.username, "charlie");
assert_eq!(users_snap.items[1].1.id, 2);
- use crate::stm::stream::Streams;
- let restored_streams: Streams =
RestoreSnapshot::restore_snapshot(&decoded).unwrap();
+ let restored_streams: crate::stm::stream::Streams =
+ RestoreSnapshot::restore_snapshot(&decoded).unwrap();
let mut verify = MetadataSnapshot::new(0);
restored_streams.fill_snapshot(&mut verify).unwrap();
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 9bc6f4668..df610a6a1 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -51,7 +51,8 @@ pub struct Partition {
}
impl Partition {
- pub fn new(id: usize, created_at: IggyTimestamp) -> Self {
+ #[must_use]
+ pub const fn new(id: usize, created_at: IggyTimestamp) -> Self {
Self { id, created_at }
}
}
@@ -184,6 +185,7 @@ impl Clone for Stream {
}
impl Stream {
+ #[must_use]
pub fn new(name: Arc<str>, created_at: IggyTimestamp) -> Self {
Self {
id: 0,
@@ -195,6 +197,7 @@ impl Stream {
}
}
+ #[must_use]
pub fn with_stats(name: Arc<str>, created_at: IggyTimestamp, stats:
Arc<StreamStats>) -> Self {
Self {
id: 0,
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 8e2c7897e..8e0ec5d8e 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -64,7 +64,8 @@ impl Default for User {
}
impl User {
- pub fn new(
+ #[must_use]
+ pub const fn new(
username: Arc<str>,
password_hash: Arc<str>,
status: UserStatus,
@@ -126,6 +127,7 @@ impl UsersInner {
// TODO(hubcio): Serialize proper reply (e.g. assigned user ID) instead of
empty Bytes.
impl StateHandler for CreateUser {
type State = UsersInner;
+ #[allow(clippy::cast_possible_truncation)]
fn apply(&self, state: &mut UsersInner) -> Bytes {
let username_arc: Arc<str> = Arc::from(self.username.as_str());
if state.index.contains_key(&username_arc) {
@@ -156,6 +158,7 @@ impl StateHandler for CreateUser {
impl StateHandler for UpdateUser {
type State = UsersInner;
+ #[allow(clippy::cast_possible_truncation)]
fn apply(&self, state: &mut UsersInner) -> Bytes {
let Some(user_id) = state.resolve_user_id(&self.user_id) else {
return Bytes::new();
@@ -187,6 +190,7 @@ impl StateHandler for UpdateUser {
impl StateHandler for DeleteUser {
type State = UsersInner;
+ #[allow(clippy::cast_possible_truncation)]
fn apply(&self, state: &mut UsersInner) -> Bytes {
let Some(user_id) = state.resolve_user_id(&self.user_id) else {
return Bytes::new();
@@ -406,6 +410,7 @@ impl Snapshotable for Users {
})
}
+ #[allow(clippy::cast_possible_truncation)]
fn from_snapshot(
snapshot: Self::Snapshot,
) -> Result<Self, crate::stm::snapshot::SnapshotError> {
diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
index c56af72b1..c355ea509 100644
--- a/core/partitions/Cargo.toml
+++ b/core/partitions/Cargo.toml
@@ -40,4 +40,4 @@ tracing = { workspace = true }
[lints.clippy]
enum_glob_use = "deny"
pedantic = "deny"
-nursery = "deny"
+nursery = "warn"
diff --git a/core/shard/Cargo.toml b/core/shard/Cargo.toml
index 5180ba829..387f5f7ef 100644
--- a/core/shard/Cargo.toml
+++ b/core/shard/Cargo.toml
@@ -33,3 +33,8 @@ metadata = { path = "../metadata" }
papaya = { workspace = true }
partitions = { path = "../partitions" }
tracing = { workspace = true }
+
+[lints.clippy]
+enum_glob_use = "deny"
+pedantic = "deny"
+nursery = "warn"
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 2b8b3138b..e41e210a6 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -44,6 +44,7 @@ pub type Sender<T> =
crossfire::MTx<crossfire::mpsc::Array<T>>;
pub type Receiver<T> = crossfire::AsyncRx<crossfire::mpsc::Array<T>>;
/// Create a bounded mpsc channel with a blocking sender and async receiver.
+#[must_use]
pub fn channel<T: Send + 'static>(capacity: usize) -> (Sender<T>, Receiver<T>)
{
crossfire::mpsc::bounded_blocking_async(capacity)
}
@@ -64,7 +65,8 @@ pub struct ShardFrame<R: Send + 'static = ()> {
impl<R: Send + 'static> ShardFrame<R> {
/// Create a fire-and-forget frame (no caller waiting for completion).
- pub fn fire_and_forget(message: Message<GenericHeader>) -> Self {
+ #[must_use]
+ pub const fn fire_and_forget(message: Message<GenericHeader>) -> Self {
Self {
message,
response_sender: None,
@@ -116,7 +118,8 @@ where
/// * `senders` - one sender per shard in the cluster (indexed by shard
id).
/// * `inbox` - the receiver that this shard drains in its message pump.
/// * `shards_table` - namespace -> shard routing table.
- pub fn new(
+ #[must_use]
+ pub const fn new(
id: u16,
name: String,
metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
@@ -136,7 +139,8 @@ where
}
}
- pub fn shards_table(&self) -> &T {
+ #[must_use]
+ pub const fn shards_table(&self) -> &T {
&self.shards_table
}
}
@@ -151,6 +155,7 @@ where
///
/// Routes requests, replication messages, and acks to either the metadata
/// plane or the partitions plane based on `PlaneIdentity::is_applicable`.
+ #[allow(clippy::future_not_send)]
pub async fn on_message(&self, message: Message<GenericHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
@@ -173,6 +178,7 @@ where
}
}
+ #[allow(clippy::future_not_send)]
pub async fn on_request(&self, request: Message<RequestHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
@@ -191,6 +197,7 @@ where
self.plane.on_request(request).await;
}
+ #[allow(clippy::future_not_send)]
pub async fn on_replicate(&self, prepare: Message<PrepareHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
@@ -209,6 +216,7 @@ where
self.plane.on_replicate(prepare).await;
}
+ #[allow(clippy::future_not_send)]
pub async fn on_ack(&self, prepare_ok: Message<PrepareOkHeader>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
@@ -235,6 +243,10 @@ where
/// Invariant: planes do not produce loopback messages for each other.
/// `on_ack` commits and applies but never calls `push_loopback`, so
/// draining metadata before partitions is order-independent.
+ ///
+ /// # Panics
+ /// Panics if a loopback message is not a valid `PrepareOk` message.
+ #[allow(clippy::future_not_send)]
pub async fn process_loopback(&self, buf: &mut
Vec<Message<GenericHeader>>) -> usize
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 8102fef00..0a68b98bf 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -41,7 +41,7 @@ where
/// the correct shard's message pump.
///
/// Decomposes the generic message into its typed form (Request, Prepare,
- /// or PrepareOk) to access the operation and namespace, then resolves
+ /// or `PrepareOk`) to access the operation and namespace, then resolves
/// the target shard and enqueues the message via its channel sender.
pub fn dispatch(&self, message: Message<GenericHeader>) {
let (operation, namespace, generic) = match MessageBag::from(message) {
@@ -127,6 +127,7 @@ where
}
/// Drain this shard's inbox and process each frame locally.
+ #[allow(clippy::future_not_send)]
pub async fn run_message_pump(&self, stop: Receiver<()>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
@@ -160,6 +161,7 @@ where
}
}
+ #[allow(clippy::future_not_send)]
async fn process_frame(&self, frame: ShardFrame<R>)
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
diff --git a/core/shard/src/shards_table.rs b/core/shard/src/shards_table.rs
index 719f52e10..47834a8f6 100644
--- a/core/shard/src/shards_table.rs
+++ b/core/shard/src/shards_table.rs
@@ -22,8 +22,8 @@ use std::hash::Hasher as _;
/// Lookup table that maps partition namespaces to their owning shard.
///
/// Implementations can be:
-/// - A shared concurrent map (DashMap, papaya, etc.) referenced by all shards.
-/// - A per-shard local `HashMap` replica, updated via
+/// - A shared concurrent map (`DashMap`, papaya, etc.) referenced by all
shards.
+/// - A per-shard local `HashMap` replica, updated via a
/// broadcast when partitions are created, deleted, or moved.
pub trait ShardsTable {
/// Returns the shard id that owns `namespace`, or `None` if the
@@ -51,12 +51,14 @@ impl Default for PapayaShardsTable {
}
impl PapayaShardsTable {
+ #[must_use]
pub fn new() -> Self {
Self {
inner: papaya::HashMap::new(),
}
}
+ #[must_use]
pub fn with_capacity(capacity: usize) -> Self {
Self {
inner: papaya::HashMap::with_capacity(capacity),
@@ -85,6 +87,7 @@ impl ShardsTable for PapayaShardsTable {
/// Given a packed `IggyNamespace` and the total number of shards, returns the
/// shard id that should own the partition. The upper bits of the Murmur3 hash
/// are used to avoid the weak lower bits for small integer inputs.
+#[must_use]
pub fn calculate_shard_assignment(ns: &IggyNamespace, shard_count: u32) -> u16
{
let mut hasher = Murmur3Hasher::default();
hasher.write_u64(ns.inner());