This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 69188127e chore(simulator): enable pedantic and nursery clippy lints
(#2895)
69188127e is described below
commit 69188127e0f95c314b1a8ab149f1b7373361f2ab
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 9 11:18:03 2026 +0100
chore(simulator): enable pedantic and nursery clippy lints (#2895)
---
core/simulator/Cargo.toml | 5 ++++
core/simulator/src/bus.rs | 55 ++++++++++++++++++---------------------
core/simulator/src/client.rs | 30 +++++++++++----------
core/simulator/src/deps.rs | 5 +++-
core/simulator/src/lib.rs | 14 +++++++---
core/simulator/src/main.rs | 13 +++++----
core/simulator/src/network.rs | 12 ++++++---
core/simulator/src/packet.rs | 40 ++++++++++++++++++++--------
core/simulator/src/ready_queue.rs | 14 +++++++---
core/simulator/src/replica.rs | 18 +++++++++----
10 files changed, 128 insertions(+), 78 deletions(-)
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
index 1051c77b5..559f5fae9 100644
--- a/core/simulator/Cargo.toml
+++ b/core/simulator/Cargo.toml
@@ -35,3 +35,8 @@ rand = { workspace = true }
rand_xoshiro = { workspace = true }
shard = { path = "../shard" }
tracing = { workspace = true }
+
+[lints.clippy]
+enum_glob_use = "deny"
+pedantic = "deny"
+nursery = "warn"
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
index b4dc1e52a..371258aef 100644
--- a/core/simulator/src/bus.rs
+++ b/core/simulator/src/bus.rs
@@ -17,7 +17,7 @@
use iggy_common::{IggyError, header::GenericHeader, message::Message};
use message_bus::MessageBus;
-use std::collections::{HashMap, VecDeque};
+use std::collections::{HashSet, VecDeque};
use std::ops::Deref;
use std::sync::{Arc, Mutex};
@@ -36,21 +36,25 @@ pub struct Envelope {
// I think the way we could handle that is by having an dedicated collection
for client responses (clients_table).
#[derive(Debug, Default)]
pub struct MemBus {
- clients: Mutex<HashMap<u128, ()>>,
- replicas: Mutex<HashMap<u8, ()>>,
+ clients: Mutex<HashSet<u128>>,
+ replicas: Mutex<HashSet<u8>>,
pending_messages: Mutex<VecDeque<Envelope>>,
}
impl MemBus {
+ #[must_use]
pub fn new() -> Self {
Self {
- clients: Mutex::new(HashMap::new()),
- replicas: Mutex::new(HashMap::new()),
+ clients: Mutex::new(HashSet::new()),
+ replicas: Mutex::new(HashSet::new()),
pending_messages: Mutex::new(VecDeque::new()),
}
}
/// Get the next pending message from the bus
+ ///
+ /// # Panics
+ /// Panics if the internal mutex is poisoned.
pub fn receive(&self) -> Option<Envelope> {
self.pending_messages.lock().unwrap().pop_front()
}
@@ -63,27 +67,27 @@ impl MessageBus for MemBus {
type Sender = ();
fn add_client(&mut self, client: Self::Client, _sender: Self::Sender) ->
bool {
- if self.clients.lock().unwrap().contains_key(&client) {
+ if self.clients.lock().unwrap().contains(&client) {
return false;
}
- self.clients.lock().unwrap().insert(client, ());
+ self.clients.lock().unwrap().insert(client);
true
}
fn remove_client(&mut self, client: Self::Client) -> bool {
- self.clients.lock().unwrap().remove(&client).is_some()
+ self.clients.lock().unwrap().remove(&client)
}
fn add_replica(&mut self, replica: Self::Replica) -> bool {
- if self.replicas.lock().unwrap().contains_key(&replica) {
+ if self.replicas.lock().unwrap().contains(&replica) {
return false;
}
- self.replicas.lock().unwrap().insert(replica, ());
+ self.replicas.lock().unwrap().insert(replica);
true
}
fn remove_replica(&mut self, replica: Self::Replica) -> bool {
- self.replicas.lock().unwrap().remove(&replica).is_some()
+ self.replicas.lock().unwrap().remove(&replica)
}
async fn send_to_client(
@@ -91,7 +95,8 @@ impl MessageBus for MemBus {
client_id: Self::Client,
message: Self::Data,
) -> Result<(), IggyError> {
- if !self.clients.lock().unwrap().contains_key(&client_id) {
+ if !self.clients.lock().unwrap().contains(&client_id) {
+ #[allow(clippy::cast_possible_truncation)]
return Err(IggyError::ClientNotFound(client_id as u32));
}
@@ -110,8 +115,8 @@ impl MessageBus for MemBus {
replica: Self::Replica,
message: Self::Data,
) -> Result<(), IggyError> {
- if !self.replicas.lock().unwrap().contains_key(&replica) {
- return Err(IggyError::ResourceNotFound(format!("Replica {}",
replica)));
+ if !self.replicas.lock().unwrap().contains(&replica) {
+ return Err(IggyError::ResourceNotFound(format!("Replica {replica}")));
}
self.pending_messages.lock().unwrap().push_back(Envelope {
@@ -125,7 +130,7 @@ impl MessageBus for MemBus {
}
}
-/// Newtype wrapper for shared MemBus that implements MessageBus
+/// Newtype wrapper for shared [`MemBus`] that implements [`MessageBus`]
#[derive(Debug, Clone)]
pub struct SharedMemBus(pub Arc<MemBus>);
@@ -142,30 +147,20 @@ impl MessageBus for SharedMemBus {
type Data = Message<GenericHeader>;
type Sender = ();
- fn add_client(&mut self, client: Self::Client, sender: Self::Sender) ->
bool {
- self.0
- .clients
- .lock()
- .unwrap()
- .insert(client, sender)
- .is_none()
+ fn add_client(&mut self, client: Self::Client, _sender: Self::Sender) ->
bool {
+ self.0.clients.lock().unwrap().insert(client)
}
fn remove_client(&mut self, client: Self::Client) -> bool {
- self.0.clients.lock().unwrap().remove(&client).is_some()
+ self.0.clients.lock().unwrap().remove(&client)
}
fn add_replica(&mut self, replica: Self::Replica) -> bool {
- self.0
- .replicas
- .lock()
- .unwrap()
- .insert(replica, ())
- .is_none()
+ self.0.replicas.lock().unwrap().insert(replica)
}
fn remove_replica(&mut self, replica: Self::Replica) -> bool {
- self.0.replicas.lock().unwrap().remove(&replica).is_some()
+ self.0.replicas.lock().unwrap().remove(&replica)
}
async fn send_to_client(
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index e254cb022..7b55af548 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -32,7 +32,8 @@ pub struct SimClient {
}
impl SimClient {
- pub fn new(client_id: u128) -> Self {
+ #[must_use]
+ pub const fn new(client_id: u128) -> Self {
Self {
client_id,
request_counter: Cell::new(0),
@@ -51,22 +52,25 @@ impl SimClient {
};
let payload = create_stream.to_bytes();
- self.build_request(Operation::CreateStream, payload)
+ self.build_request(Operation::CreateStream, &payload)
}
+ /// # Panics
+ /// Panics if the stream name cannot be converted to an `Identifier`.
pub fn delete_stream(&self, name: &str) -> Message<RequestHeader> {
let delete_stream = DeleteStream {
stream_id: Identifier::named(name).unwrap(),
};
let payload = delete_stream.to_bytes();
- self.build_request(Operation::DeleteStream, payload)
+ self.build_request(Operation::DeleteStream, &payload)
}
+ #[allow(clippy::cast_possible_truncation)]
pub fn send_messages(
&self,
namespace: IggyNamespace,
- messages: Vec<&[u8]>,
+ messages: &[&[u8]],
) -> Message<RequestHeader> {
// Build batch: count | indexes | messages
let count = messages.len() as u32;
@@ -74,7 +78,7 @@ impl SimClient {
let mut messages_buf = Vec::new();
let mut current_position = 0u32;
- for msg in &messages {
+ for msg in messages {
// Write index: position (u32) + length (u32)
indexes.extend_from_slice(&current_position.to_le_bytes());
indexes.extend_from_slice(&(msg.len() as u32).to_le_bytes());
@@ -90,17 +94,14 @@ impl SimClient {
payload.extend_from_slice(&indexes);
payload.extend_from_slice(&messages_buf);
- self.build_request_with_namespace(
- Operation::SendMessages,
- bytes::Bytes::from(payload),
- namespace,
- )
+ self.build_request_with_namespace(Operation::SendMessages, &payload,
namespace)
}
+ #[allow(clippy::cast_possible_truncation)]
fn build_request_with_namespace(
&self,
operation: Operation,
- payload: bytes::Bytes,
+ payload: &[u8],
namespace: IggyNamespace,
) -> Message<RequestHeader> {
use bytes::Bytes;
@@ -130,13 +131,14 @@ impl SimClient {
let header_bytes = bytemuck::bytes_of(&header);
let mut buffer = Vec::with_capacity(total_size);
buffer.extend_from_slice(header_bytes);
- buffer.extend_from_slice(&payload);
+ buffer.extend_from_slice(payload);
Message::<RequestHeader>::from_bytes(Bytes::from(buffer))
.expect("failed to build request message")
}
- fn build_request(&self, operation: Operation, payload: bytes::Bytes) ->
Message<RequestHeader> {
+ #[allow(clippy::cast_possible_truncation)]
+ fn build_request(&self, operation: Operation, payload: &[u8]) ->
Message<RequestHeader> {
use bytes::Bytes;
let header_size = std::mem::size_of::<RequestHeader>();
@@ -163,7 +165,7 @@ impl SimClient {
let header_bytes = bytemuck::bytes_of(&header);
let mut buffer = Vec::with_capacity(total_size);
buffer.extend_from_slice(header_bytes);
- buffer.extend_from_slice(&payload);
+ buffer.extend_from_slice(payload);
Message::<RequestHeader>::from_bytes(Bytes::from(buffer))
.expect("failed to build request message")
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 622704820..12e78bb05 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -34,6 +34,7 @@ pub struct MemStorage {
offset: Cell<u64>,
}
+#[allow(clippy::future_not_send)]
impl Storage for MemStorage {
type Buffer = Vec<u8>;
@@ -74,6 +75,7 @@ impl<S: Storage + Default> Default for SimJournal<S> {
}
}
+#[allow(clippy::missing_fields_in_debug)]
impl<S: Storage> std::fmt::Debug for SimJournal<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SimJournal")
@@ -85,6 +87,7 @@ impl<S: Storage> std::fmt::Debug for SimJournal<S> {
}
}
+#[allow(clippy::future_not_send)]
impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for SimJournal<S> {
type Header = PrepareHeader;
type Entry = Message<PrepareHeader>;
@@ -137,7 +140,7 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for
SimJournal<S> {
impl JournalHandle for SimJournal<MemStorage> {
type Storage = MemStorage;
- type Target = SimJournal<MemStorage>;
+ type Target = Self;
fn handle(&self) -> &Self::Target {
self
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 260b76488..f3cf63ea3 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -43,6 +43,7 @@ impl Simulator {
}
}
+ #[allow(clippy::cast_possible_truncation)]
pub fn new(replica_count: usize, clients: impl Iterator<Item = u128>) ->
Self {
let mut message_bus = MemBus::new();
for client in clients {
@@ -58,8 +59,8 @@ impl Simulator {
.map(|i| {
new_replica(
i as u8,
- format!("replica-{}", i),
- Arc::clone(&message_bus),
+ format!("replica-{i}"),
+ &message_bus,
replica_count as u8,
)
})
@@ -71,6 +72,7 @@ impl Simulator {
}
}
+ #[allow(clippy::cast_possible_truncation)]
pub fn with_message_bus(replica_count: usize, mut message_bus: MemBus) ->
Self {
for i in 0..replica_count as u8 {
message_bus.add_replica(i);
@@ -81,8 +83,8 @@ impl Simulator {
.map(|i| {
new_replica(
i as u8,
- format!("replica-{}", i),
- Arc::clone(&message_bus),
+ format!("replica-{i}"),
+ &message_bus,
replica_count as u8,
)
})
@@ -96,6 +98,9 @@ impl Simulator {
}
impl Simulator {
+ /// # Panics
+ /// Panics if a client response message has an invalid command type.
+ #[allow(clippy::future_not_send)]
pub async fn step(&self) -> Option<Message<ReplyHeader>> {
if let Some(envelope) = self.message_bus.receive() {
if let Some(_client_id) = envelope.to_client {
@@ -116,6 +121,7 @@ impl Simulator {
None
}
+ #[allow(clippy::future_not_send)]
async fn dispatch_to_replica(
&self,
replica: &Replica,
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
index 8d519cdda..de2288273 100644
--- a/core/simulator/src/main.rs
+++ b/core/simulator/src/main.rs
@@ -49,7 +49,7 @@ fn main() {
let test_namespace = IggyNamespace::new(1, 1, 0);
// Initialize partition on all replicas
- println!("[sim] Initializing test partition: {:?}", test_namespace);
+ println!("[sim] Initializing test partition: {test_namespace:?}");
sim.init_partition(test_namespace);
// Responses queue
@@ -70,13 +70,14 @@ fn main() {
b"Message 3".as_slice(),
];
- let send_msg = client.send_messages(test_namespace, test_messages);
+ let send_msg = client.send_messages(test_namespace,
&test_messages);
bus.send_to_replica(leader, send_msg.into_generic())
.await
.expect("failed to send messages");
loop {
- if let Some(reply) = responses_clone.lock().unwrap().pop() {
+ let reply = responses_clone.lock().unwrap().pop();
+ if let Some(reply) = reply {
println!("[client] Got send_messages reply: {:?}",
reply.header());
break;
}
@@ -90,7 +91,8 @@ fn main() {
.expect("failed to send create_stream");
loop {
- if let Some(reply) = responses_clone.lock().unwrap().pop() {
+ let reply = responses_clone.lock().unwrap().pop();
+ if let Some(reply) = reply {
println!("[client] Got create_stream reply: {:?}",
reply.header());
break;
}
@@ -103,7 +105,8 @@ fn main() {
.expect("failed to send delete_stream");
loop {
- if let Some(reply) = responses_clone.lock().unwrap().pop() {
+ let reply = responses_clone.lock().unwrap().pop();
+ if let Some(reply) = reply {
println!("[client] Got delete_stream reply: {:?}",
reply.header());
break;
}
diff --git a/core/simulator/src/network.rs b/core/simulator/src/network.rs
index 91cbed788..d5b7b7b61 100644
--- a/core/simulator/src/network.rs
+++ b/core/simulator/src/network.rs
@@ -18,7 +18,7 @@
//! Network abstraction layer for the cluster simulator.
//!
//! **Note:** Currently a thin passthrough over `PacketSimulator`. Once the
-//! Cluster and MessageBus layers are built, this will own
+//! Cluster and [`MessageBus`] layers are built, this will own
//! process-to-bus routing, and node enable/disable logic.
use crate::packet::{
@@ -40,6 +40,7 @@ pub struct Network {
impl Network {
/// Create a new network.
+ #[must_use]
pub fn new(options: PacketSimulatorOptions) -> Self {
Self {
simulator: PacketSimulator::new(options),
@@ -50,8 +51,8 @@ impl Network {
///
/// The message will be queued with a simulated delay and may be:
/// - Delivered normally after the delay
- /// - Dropped (based on packet_loss_probability)
- /// - Replayed/duplicated (based on replay_probability)
+ /// - Dropped (based on `packet_loss_probability`)
+ /// - Replayed/duplicated (based on `replay_probability`)
pub fn submit(&mut self, from: ProcessId, to: ProcessId, message:
Message<GenericHeader>) {
self.simulator.submit(from, to, message);
}
@@ -80,7 +81,8 @@ impl Network {
}
/// Get the current network tick.
- pub fn current_tick(&self) -> u64 {
+ #[must_use]
+ pub const fn current_tick(&self) -> u64 {
self.simulator.current_tick()
}
@@ -105,6 +107,7 @@ impl Network {
}
/// Check whether a specific link is enabled (filter is not empty).
+ #[must_use]
pub fn is_link_enabled(&self, from: ProcessId, to: ProcessId) -> bool {
self.simulator.is_link_enabled(from, to)
}
@@ -144,6 +147,7 @@ impl Network {
}
/// Get the number of packets currently in flight.
+ #[must_use]
pub fn packets_in_flight(&self) -> usize {
self.simulator.packets_in_flight()
}
diff --git a/core/simulator/src/packet.rs b/core/simulator/src/packet.rs
index 3981ad7bd..59317adcc 100644
--- a/core/simulator/src/packet.rs
+++ b/core/simulator/src/packet.rs
@@ -142,11 +142,11 @@ impl Default for PacketSimulatorOptions {
}
}
-/// Per-path link: holds packets in a ReadyQueue sorted by ready_at.
+/// Per-path link: holds packets in a [`ReadyQueue`] sorted by `ready_at`.
struct Link {
- /// Packets waiting to be delivered, ordered by ready_at (min-heap).
+ /// Packets waiting to be delivered, ordered by `ready_at` (min-heap).
packets: ReadyQueue<Packet>,
- /// Tick until which this link is clogged. Clogged when clogged_till >
current_tick.
+ /// Tick until which this link is clogged. Clogged when `clogged_till` >
`current_tick`.
clogged_till: u64,
/// Per-command filter controlling which commands pass through this link.
/// [`ALLOW_ALL`] = fully enabled (default), [`BLOCK_ALL`] = fully
disabled.
@@ -156,6 +156,7 @@ struct Link {
drop_packet_fn: Option<fn(&Packet) -> bool>,
}
+#[allow(clippy::missing_fields_in_debug)]
impl std::fmt::Debug for Link {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Link")
@@ -206,11 +207,11 @@ pub enum PartitionSymmetry {
/// The packet simulator manages a matrix of links between processes.
pub struct PacketSimulator {
options: PacketSimulatorOptions,
- /// Flat array of links. Index = from_idx * max_processes + to_idx.
+ /// Flat array of links. Index = `from_idx` * `max_processes` + `to_idx`.
links: Vec<Link>,
/// Maximum number of processes (determines link array size).
max_processes: usize,
- /// Mapping from ProcessId to flat index.
+ /// Mapping from [`ProcessId`] to flat index.
process_indices: HashMap<ProcessId, usize>,
/// Next flat index to assign to a newly registered client.
/// Initialized to `replica_count` and incremented on each new
registration.
@@ -225,7 +226,7 @@ pub struct PacketSimulator {
auto_partition: Vec<bool>,
/// Countdown timer for partition/unpartition stability.
auto_partition_stability: u32,
- /// Scratch buffer for Fisher-Yates shuffle in UniformSize partition mode.
+ /// Scratch buffer for Fisher-Yates shuffle in [`UniformSize`] partition
mode.
auto_partition_nodes: Vec<usize>,
/// Reusable buffer for delivered packets.
delivered: Vec<Packet>,
@@ -233,11 +234,16 @@ pub struct PacketSimulator {
impl PacketSimulator {
/// Create a new packet simulator.
+ ///
+ /// # Panics
+ /// Panics if `node_count` is 0, or if delay/probability parameters are
invalid.
+ #[must_use]
+ #[allow(clippy::cast_possible_truncation)]
pub fn new(options: PacketSimulatorOptions) -> Self {
let node_count = options.node_count as usize;
let client_count = options.client_count as usize;
- assert!(node_count > 0, "node_count must be > 0, got {}", node_count);
+ assert!(node_count > 0, "node_count must be > 0, got {node_count}");
assert!(
options.one_way_delay_min >= 1,
"one_way_delay_min must be >= 1 (got {}), zero causes unbounded replay loops",
@@ -310,6 +316,9 @@ impl PacketSimulator {
/// Register a client process. Returns its flat index.
/// Re-registering a client with the same ID returns the same index.
+ ///
+ /// # Panics
+ /// Panics if the maximum number of processes has been reached.
pub fn register_client(&mut self, client_id: u128) -> usize {
if let Some(&idx) =
self.process_indices.get(&ProcessId::Client(client_id)) {
return idx;
@@ -327,7 +336,7 @@ impl PacketSimulator {
}
/// Resolve a `ProcessId` to its flat index.
- /// Fast path for replicas (direct arithmetic), HashMap fallback for
clients.
+ /// Fast path for replicas (direct arithmetic), `HashMap` fallback for
clients.
fn process_index(&self, id: ProcessId) -> Option<usize> {
match id {
ProcessId::Replica(i) => {
@@ -391,6 +400,11 @@ impl PacketSimulator {
/// Generate an exponentially distributed random value with the given mean.
/// Uses inverse CDF: -mean * ln(U) where U ~ Uniform(0,1).
+ #[allow(
+ clippy::cast_precision_loss,
+ clippy::cast_sign_loss,
+ clippy::cast_possible_truncation
+ )]
fn random_exponential(prng: &mut Xoshiro256Plus, mean: u64) -> u64 {
let u: f64 = prng.random::<f64>();
if u > 0.0 {
@@ -402,7 +416,7 @@ impl PacketSimulator {
}
/// Number of registered processes.
- fn process_count(&self) -> usize {
+ const fn process_count(&self) -> usize {
self.next_index
}
@@ -417,6 +431,7 @@ impl PacketSimulator {
}
/// Check whether a link is currently enabled (filter is not empty).
+ #[must_use]
pub fn is_link_enabled(&self, from: ProcessId, to: ProcessId) -> bool {
let idx = self.link_index(from, to);
!self.links[idx].filter.is_empty()
@@ -656,7 +671,8 @@ impl PacketSimulator {
}
/// Get the current tick.
- pub fn current_tick(&self) -> u64 {
+ #[must_use]
+ pub const fn current_tick(&self) -> u64 {
self.current_tick
}
@@ -695,11 +711,13 @@ impl PacketSimulator {
}
/// Get the number of packets in flight across all links.
+ #[must_use]
pub fn packets_in_flight(&self) -> usize {
self.links.iter().map(|l| l.packets.len()).sum()
}
}
+#[allow(clippy::missing_fields_in_debug)]
impl std::fmt::Debug for PacketSimulator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PacketSimulator")
@@ -830,7 +848,7 @@ mod tests {
sim.clog(ProcessId::Replica(0), ProcessId::Replica(1));
// Submit a packet
- sim.submit(ProcessId::Replica(0), ProcessId::Replica(1), msg.clone());
+ sim.submit(ProcessId::Replica(0), ProcessId::Replica(1), msg);
for _ in 0..20 {
sim.tick();
}
diff --git a/core/simulator/src/ready_queue.rs
b/core/simulator/src/ready_queue.rs
index 0bd91c156..2bb79107b 100644
--- a/core/simulator/src/ready_queue.rs
+++ b/core/simulator/src/ready_queue.rs
@@ -42,10 +42,12 @@ impl<T: Ready> Default for ReadyQueue<T> {
}
impl<T: Ready> ReadyQueue<T> {
- pub fn new() -> Self {
+ #[must_use]
+ pub const fn new() -> Self {
Self { items: Vec::new() }
}
+ #[must_use]
pub fn with_capacity(capacity: usize) -> Self {
Self {
items: Vec::with_capacity(capacity),
@@ -60,12 +62,13 @@ impl<T: Ready> ReadyQueue<T> {
}
/// Peek at the item with the smallest `ready_at`.
+ #[must_use]
pub fn peek(&self) -> Option<&T> {
self.items.first()
}
/// Reset the queue, removing all items but retaining the allocation.
- /// Matches TigerBeetle's `reset()` which sets `items.len = 0`.
+ /// Matches `TigerBeetle`'s `reset()` which sets `items.len = 0`.
pub fn clear(&mut self) {
self.items.clear();
}
@@ -120,16 +123,19 @@ impl<T: Ready> ReadyQueue<T> {
}
/// Number of items in the queue.
- pub fn len(&self) -> usize {
+ #[must_use]
+ pub const fn len(&self) -> usize {
self.items.len()
}
/// Whether the queue is empty.
- pub fn is_empty(&self) -> bool {
+ #[must_use]
+ pub const fn is_empty(&self) -> bool {
self.items.is_empty()
}
/// Access all items in unspecified order.
+ #[must_use]
pub fn as_slice(&self) -> &[T] {
&self.items
}
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index ddd423f11..5711da0ce 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -36,7 +36,7 @@ const CLUSTER_ID: u128 = 1;
pub type Replica =
shard::IggyShard<SharedMemBus, SimJournal<MemStorage>, SimSnapshot,
SimMuxStateMachine>;
-pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>, replica_count: u8)
-> Replica {
+pub fn new_replica(id: u8, name: String, bus: &Arc<MemBus>, replica_count: u8)
-> Replica {
let users: Users = UsersInner::new().into();
let streams: Streams = StreamsInner::new().into();
let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into();
@@ -48,7 +48,7 @@ pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>,
replica_count: u8) ->
id,
replica_count,
0,
- SharedMemBus(Arc::clone(&bus)),
+ SharedMemBus(Arc::clone(bus)),
LocalPipeline::new(),
);
metadata_consensus.init();
@@ -67,7 +67,7 @@ pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>,
replica_count: u8) ->
segment_size: IggyByteSize::from(1024 * 1024 * 1024),
};
- let mut partitions = IggyPartitions::new(ShardId::new(id as u16),
partitions_config);
+ let mut partitions = IggyPartitions::new(ShardId::new(u16::from(id)),
partitions_config);
// TODO: namespace=0 collides with metadata consensus. Safe for now
because the simulator
// routes by Operation type, but a shared view change bus would produce
namespace collisions.
@@ -76,7 +76,7 @@ pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>,
replica_count: u8) ->
id,
replica_count,
0,
- SharedMemBus(Arc::clone(&bus)),
+ SharedMemBus(Arc::clone(bus)),
NamespacedPipeline::new(),
);
partition_consensus.init();
@@ -86,5 +86,13 @@ pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>,
replica_count: u8) ->
// but this is not possible with crossfire without mangling types due to
Flavor trait in crossfire.
// This needs to be revisited in the future.
let (_tx, inbox) = shard::channel(1024);
- shard::IggyShard::new(id as u16, name, metadata, partitions, Vec::new(),
inbox, ())
+ shard::IggyShard::new(
+ u16::from(id),
+ name,
+ metadata,
+ partitions,
+ Vec::new(),
+ inbox,
+ (),
+ )
}