This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new bf460c3ae chore(partitions): enable pedantic and nursery clippy lints
(#2876)
bf460c3ae is described below
commit bf460c3aeb66279ec781219a2ef9118a693804d2
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 9 10:23:17 2026 +0100
chore(partitions): enable pedantic and nursery clippy lints (#2876)
Co-authored-by: Grzegorz Koszyk
<[email protected]>
---
core/partitions/Cargo.toml | 5 ++++
core/partitions/src/iggy_partition.rs | 13 ++++----
core/partitions/src/iggy_partitions.rs | 55 ++++++++++++++++++++--------------
core/partitions/src/journal.rs | 4 +--
core/partitions/src/lib.rs | 4 +++
core/partitions/src/log.rs | 22 +++++++-------
core/partitions/src/types.rs | 38 +++++++++++++----------
7 files changed, 81 insertions(+), 60 deletions(-)
diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
index d7fadd93b..c56af72b1 100644
--- a/core/partitions/Cargo.toml
+++ b/core/partitions/Cargo.toml
@@ -36,3 +36,8 @@ message_bus = { workspace = true }
ringbuffer = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
+
+[lints.clippy]
+enum_glob_use = "deny"
+pedantic = "deny"
+nursery = "deny"
diff --git a/core/partitions/src/iggy_partition.rs
b/core/partitions/src/iggy_partition.rs
index 47e71dea2..49c820230 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -91,24 +91,21 @@ impl Partition for IggyPartition {
let last_dirty_offset = if batch_messages_count == 0 {
dirty_offset
} else {
- dirty_offset + batch_messages_count as u64 - 1
+ dirty_offset + u64::from(batch_messages_count) - 1
};
- if self.should_increment_offset {
- self.dirty_offset
- .store(last_dirty_offset, Ordering::Relaxed);
- } else {
+ if !self.should_increment_offset {
self.should_increment_offset = true;
- self.dirty_offset
- .store(last_dirty_offset, Ordering::Relaxed);
}
+ self.dirty_offset
+ .store(last_dirty_offset, Ordering::Relaxed);
let segment_index = self.log.segments().len() - 1;
self.log.segments_mut()[segment_index].current_position +=
batch_messages_size;
let journal = self.log.journal_mut();
journal.info.messages_count += batch_messages_count;
- journal.info.size += IggyByteSize::from(batch_messages_size as u64);
+ journal.info.size +=
IggyByteSize::from(u64::from(batch_messages_size));
journal.info.current_offset = last_dirty_offset;
if let Some(ts) = batch.first_timestamp()
&& journal.info.first_timestamp == 0
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index 6f94da3bc..c3b1e0bc6 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -19,6 +19,7 @@
use crate::IggyPartition;
use crate::Partition;
+use crate::log::JournalInfo;
use crate::types::PartitionsConfig;
use consensus::PlaneIdentity;
use consensus::{
@@ -49,14 +50,14 @@ use tracing::{debug, warn};
/// This struct manages ALL partitions assigned to a single shard, regardless
/// of which stream/topic they belong to.
///
-/// Note: The partition_id within IggyNamespace may NOT equal the Vec index.
-/// For example, shard 0 might have partition_ids [0, 2, 4] while shard 1
-/// has partition_ids [1, 3, 5]. The `LocalIdx` provides the actual index
+/// Note: The `partition_id` within `IggyNamespace` may NOT equal the Vec
index.
+/// For example, shard 0 might have `partition_ids` [0, 2, 4] while shard 1
+/// has `partition_ids` [1, 3, 5]. The `LocalIdx` provides the actual index
/// into the `partitions` Vec.
pub struct IggyPartitions<C> {
shard_id: ShardId,
config: PartitionsConfig,
- /// Collection of partitions, the index of each partition isn't it's ID,
but rather an local index (LocalIdx) which is used for lookups.
+ /// Collection of partitions, the index of each partition isn't it's ID,
but rather an local index (`LocalIdx`) which is used for lookups.
///
/// Wrapped in `UnsafeCell` for interior mutability — matches the
single-threaded
/// per-shard execution model. Consensus trait methods take `&self` but
need to
@@ -67,6 +68,7 @@ pub struct IggyPartitions<C> {
}
impl<C> IggyPartitions<C> {
+ #[must_use]
pub fn new(shard_id: ShardId, config: PartitionsConfig) -> Self {
Self {
shard_id,
@@ -77,6 +79,7 @@ impl<C> IggyPartitions<C> {
}
}
+ #[must_use]
pub fn with_capacity(shard_id: ShardId, config: PartitionsConfig,
capacity: usize) -> Self {
Self {
shard_id,
@@ -87,7 +90,7 @@ impl<C> IggyPartitions<C> {
}
}
- pub fn config(&self) -> &PartitionsConfig {
+ pub const fn config(&self) -> &PartitionsConfig {
&self.config
}
@@ -103,7 +106,7 @@ impl<C> IggyPartitions<C> {
unsafe { &mut *self.partitions.get() }
}
- pub fn shard_id(&self) -> ShardId {
+ pub const fn shard_id(&self) -> ShardId {
self.shard_id
}
@@ -172,7 +175,7 @@ impl<C> IggyPartitions<C> {
// If we swapped an element, update its index in the map
if idx < partitions.len() {
// Find the namespace that was at the last position (now at idx)
- for (_ns, lidx) in self.namespace_to_local.iter_mut() {
+ for lidx in self.namespace_to_local.values_mut() {
if **lidx == partitions.len() {
*lidx = LocalIdx::new(idx);
break;
@@ -230,7 +233,7 @@ impl<C> IggyPartitions<C> {
&mut self.partitions_mut()[idx]
}
- pub fn consensus(&self) -> Option<&C> {
+ pub const fn consensus(&self) -> Option<&C> {
self.consensus.as_ref()
}
@@ -258,7 +261,7 @@ impl<C> IggyPartitions<C> {
// Create partition with initialized log
let stats = Arc::new(PartitionStats::default());
- let mut partition = IggyPartition::new(stats.clone());
+ let mut partition = IggyPartition::new(stats);
partition.log.add_persisted_segment(segment, storage);
partition.offset.store(start_offset, Ordering::Relaxed);
partition
@@ -276,12 +279,15 @@ impl<C> IggyPartitions<C> {
/// initial segment, and storage. Skips the control plane metadata
broadcasting.
///
/// Corresponds to the "INITIATE PARTITION" phase in the server's flow:
- /// 1. Control plane: create PartitionMeta (SKIPPED in this method)
+ /// 1. Control plane: create `PartitionMeta` (SKIPPED in this method)
/// 2. Control plane: broadcast to shards (SKIPPED in this method)
/// 3. Data plane: INITIATE PARTITION (THIS METHOD)
///
/// Idempotent: subsequent calls for the same namespace are no-ops.
/// Consensus must be set separately via `set_consensus`.
+ ///
+ /// # Panics
+ /// Panics if segment storage creation fails.
pub async fn init_partition(&mut self, namespace: IggyNamespace) ->
LocalIdx {
if let Some(idx) = self.local_idx(&namespace) {
return idx;
@@ -320,7 +326,7 @@ impl<C> IggyPartitions<C> {
// Create partition with initialized log
let stats = Arc::new(PartitionStats::default());
- let mut partition = IggyPartition::new(stats.clone());
+ let mut partition = IggyPartition::new(stats);
partition.log.add_persisted_segment(segment, storage);
partition.offset.store(start_offset, Ordering::Relaxed);
partition
@@ -367,10 +373,10 @@ where
};
let is_old_prepare = fence_old_prepare_by_commit(consensus, header);
- if !is_old_prepare {
- self.replicate(message.clone()).await;
- } else {
+ if is_old_prepare {
warn!("received old prepare, not replicating");
+ } else {
+ self.replicate(message.clone()).await;
}
// TODO: Make those assertions be toggleable through an feature flag,
so they can be used only by simulator/tests.
@@ -386,7 +392,7 @@ where
self.send_prepare_ok(header).await;
if consensus.is_follower() {
- self.commit_journal(&namespace);
+ self.commit_journal(namespace);
}
}
@@ -498,7 +504,7 @@ where
.message_bus()
.send_to_client(prepare_header.client, generic_reply)
.await
- .unwrap()
+ .unwrap();
}
}
}
@@ -523,6 +529,8 @@ impl<B> IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
{
+ /// # Panics
+ /// Panics if consensus is not initialized.
pub fn register_namespace_in_pipeline(&self, ns: u64) {
self.consensus()
.expect("register_namespace_in_pipeline: consensus not
initialized")
@@ -540,8 +548,7 @@ where
let indexes_end = 4 + indexes_len;
assert!(
body.len() >= indexes_end,
- "prepare body too small for {} indexes",
- count
+ "prepare body too small for {count} indexes",
);
let indexes =
IggyIndexesMut::from_bytes(PooledBuffer::from(&body[4..indexes_end]), 0);
@@ -626,7 +633,8 @@ where
replicate_to_next_in_chain(consensus, message).await;
}
- fn commit_journal(&self, _namespace: &IggyNamespace) {
+ #[allow(clippy::unused_self, clippy::missing_const_for_fn)]
+ fn commit_journal(&self, _namespace: IggyNamespace) {
// TODO: Implement commit logic for followers.
// Walk through journal from last committed to current commit number
// Apply each entry to the partition state
@@ -665,7 +673,7 @@ where
.increment_size_bytes(journal_info.size.as_bytes_u64());
partition
.stats
- .increment_messages_count(journal_info.messages_count as u64);
+ .increment_messages_count(u64::from(journal_info.messages_count));
// 3. Check flush thresholds.
let is_full = segment.is_full();
@@ -703,7 +711,7 @@ where
let partition = self
.get_mut_by_ns(namespace)
.expect("commit_messages: partition not found");
- partition.log.journal_mut().info = Default::default();
+ partition.log.journal_mut().info = JournalInfo::default();
}
// 4. Advance committed offset (last, so consumers only see offset
after data is durable).
@@ -721,7 +729,10 @@ where
namespace: &IggyNamespace,
frozen_batches: Vec<iggy_common::IggyMessagesBatch>,
) {
- let batch_count: u32 = frozen_batches.iter().map(|b| b.count()).sum();
+ let batch_count: u32 = frozen_batches
+ .iter()
+ .map(iggy_common::IggyMessagesBatch::count)
+ .sum();
if batch_count == 0 {
return;
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index 1a4169daf..ab281e940 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -31,9 +31,7 @@ impl Storage for Noop {
0
}
- async fn read(&self, _offset: usize, buffer: ()) -> () {
- buffer
- }
+ async fn read(&self, _offset: usize, _buffer: ()) {}
}
/// Lookup key for querying messages from the journal.
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index d92b078c4..d997d72b7 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#![allow(clippy::future_not_send)]
+
mod iggy_partition;
mod iggy_partitions;
mod journal;
@@ -48,6 +50,8 @@ pub trait Partition {
async { Err(IggyError::FeatureUnavailable) }
}
+ /// # Errors
+ /// Returns `IggyError::FeatureUnavailable` by default.
fn store_consumer_offset(
&self,
consumer: PollingConsumer,
diff --git a/core/partitions/src/log.rs b/core/partitions/src/log.rs
index 9a9fd11bb..7d88f60c2 100644
--- a/core/partitions/src/log.rs
+++ b/core/partitions/src/log.rs
@@ -133,23 +133,23 @@ where
S: Storage,
J: Debug + Journal<S>,
{
- pub fn has_segments(&self) -> bool {
+ pub const fn has_segments(&self) -> bool {
!self.segments.is_empty()
}
- pub fn segments(&self) -> &Vec<Segment> {
+ pub const fn segments(&self) -> &Vec<Segment> {
&self.segments
}
- pub fn segments_mut(&mut self) -> &mut Vec<Segment> {
+ pub const fn segments_mut(&mut self) -> &mut Vec<Segment> {
&mut self.segments
}
- pub fn storages_mut(&mut self) -> &mut Vec<SegmentStorage> {
+ pub const fn storages_mut(&mut self) -> &mut Vec<SegmentStorage> {
&mut self.storage
}
- pub fn storages(&self) -> &Vec<SegmentStorage> {
+ pub const fn storages(&self) -> &Vec<SegmentStorage> {
&self.storage
}
@@ -177,11 +177,11 @@ where
.expect("active storage called on empty log")
}
- pub fn indexes(&self) -> &Vec<Option<IggyIndexesMut>> {
+ pub const fn indexes(&self) -> &Vec<Option<IggyIndexesMut>> {
&self.indexes
}
- pub fn indexes_mut(&mut self) -> &mut Vec<Option<IggyIndexesMut>> {
+ pub const fn indexes_mut(&mut self) -> &mut Vec<Option<IggyIndexesMut>> {
&mut self.indexes
}
@@ -230,11 +230,11 @@ where
}
}
- pub fn in_flight(&self) -> &IggyMessagesBatchSetInFlight {
+ pub const fn in_flight(&self) -> &IggyMessagesBatchSetInFlight {
&self.in_flight
}
- pub fn in_flight_mut(&mut self) -> &mut IggyMessagesBatchSetInFlight {
+ pub const fn in_flight_mut(&mut self) -> &mut IggyMessagesBatchSetInFlight
{
&mut self.in_flight
}
@@ -252,11 +252,11 @@ where
S: Storage,
J: Debug + Journal<S>,
{
- pub fn journal_mut(&mut self) -> &mut JournalState<J> {
+ pub const fn journal_mut(&mut self) -> &mut JournalState<J> {
&mut self.journal
}
- pub fn journal(&self) -> &JournalState<J> {
+ pub const fn journal(&self) -> &JournalState<J> {
&self.journal
}
}
diff --git a/core/partitions/src/types.rs b/core/partitions/src/types.rs
index 886cb715e..1a21609a5 100644
--- a/core/partitions/src/types.rs
+++ b/core/partitions/src/types.rs
@@ -26,7 +26,8 @@ pub struct PollingArgs {
}
impl PollingArgs {
- pub fn new(strategy: PollingStrategy, count: u32, auto_commit: bool) ->
Self {
+ #[must_use]
+ pub const fn new(strategy: PollingStrategy, count: u32, auto_commit: bool)
-> Self {
Self {
strategy,
count,
@@ -45,9 +46,9 @@ pub struct SendMessagesResult {
// TODO(hubcio): unify with server's `PollingConsumer` in
`streaming/polling_consumer.rs`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PollingConsumer {
- /// Regular consumer with (consumer_id, partition_id)
+ /// Regular consumer with (`consumer_id`, `partition_id`)
Consumer(usize, usize),
- /// Consumer group with (group_id, member_id)
+ /// Consumer group with (`group_id`, `member_id`)
ConsumerGroup(usize, usize),
}
@@ -65,7 +66,8 @@ pub struct AppendResult {
}
impl AppendResult {
- pub fn new(start_offset: u64, end_offset: u64, messages_count: u32) ->
Self {
+ #[must_use]
+ pub const fn new(start_offset: u64, end_offset: u64, messages_count: u32)
-> Self {
Self {
start_offset,
end_offset,
@@ -75,7 +77,8 @@ impl AppendResult {
/// Returns the number of offsets in the range.
#[inline]
- pub fn offset_count(&self) -> u64 {
+ #[must_use]
+ pub const fn offset_count(&self) -> u64 {
self.end_offset - self.start_offset + 1
}
}
@@ -114,12 +117,11 @@ pub struct PartitionOffsets {
}
impl PartitionOffsets {
+ #[must_use]
pub fn new(commit_offset: u64, write_offset: u64) -> Self {
debug_assert!(
write_offset >= commit_offset,
- "write_offset ({}) must be >= commit_offset ({})",
- write_offset,
- commit_offset
+ "write_offset ({write_offset}) must be >= commit_offset
({commit_offset})",
);
Self {
commit_offset,
@@ -128,7 +130,8 @@ impl PartitionOffsets {
}
/// Create offsets for an empty partition.
- pub fn empty() -> Self {
+ #[must_use]
+ pub const fn empty() -> Self {
Self {
commit_offset: 0,
write_offset: 0,
@@ -136,17 +139,20 @@ impl PartitionOffsets {
}
/// Returns true if there are uncommitted (prepared) messages.
- pub fn has_uncommitted(&self) -> bool {
+ #[must_use]
+ pub const fn has_uncommitted(&self) -> bool {
self.write_offset > self.commit_offset
}
/// Returns the number of uncommitted messages.
- pub fn uncommitted_count(&self) -> u64 {
+ #[must_use]
+ pub const fn uncommitted_count(&self) -> u64 {
self.write_offset - self.commit_offset
}
/// Returns true if commit and write offsets are equal.
- pub fn is_fully_committed(&self) -> bool {
+ #[must_use]
+ pub const fn is_fully_committed(&self) -> bool {
self.write_offset == self.commit_offset
}
}
@@ -179,6 +185,7 @@ impl PartitionsConfig {
/// TODO: This is a stub waiting for completion of issue to move server
config
/// to shared module. Real implementation should use:
///
`{base_path}/{streams_path}/{stream_id}/{topics_path}/{topic_id}/{partitions_path}/{partition_id}/{start_offset:0>20}.log`
+ #[must_use]
pub fn get_messages_path(
&self,
stream_id: usize,
@@ -187,8 +194,7 @@ impl PartitionsConfig {
start_offset: u64,
) -> String {
format!(
- "/tmp/iggy_stub/streams/{}/topics/{}/partitions/{}/{:0>20}.log",
- stream_id, topic_id, partition_id, start_offset
+
"/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}/{start_offset:0>20}.log",
)
}
@@ -197,6 +203,7 @@ impl PartitionsConfig {
/// TODO: This is a stub waiting for completion of issue to move server
config
/// to shared module. Real implementation should use:
///
`{base_path}/{streams_path}/{stream_id}/{topics_path}/{topic_id}/{partitions_path}/{partition_id}/{start_offset:0>20}.index`
+ #[must_use]
pub fn get_index_path(
&self,
stream_id: usize,
@@ -205,8 +212,7 @@ impl PartitionsConfig {
start_offset: u64,
) -> String {
format!(
- "/tmp/iggy_stub/streams/{}/topics/{}/partitions/{}/{:0>20}.index",
- stream_id, topic_id, partition_id, start_offset
+
"/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}/{start_offset:0>20}.index",
)
}
}