This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 870e738a chore(bench): fix clippy pedantic and nursery lints in bench 
(#1843)
870e738a is described below

commit 870e738a2723196b887d6b2107546edc5c22351a
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sat Jun 7 16:13:46 2025 +0200

    chore(bench): fix clippy pedantic and nursery lints in bench (#1843)
---
 core/bench/Cargo.toml                              |   5 +
 core/bench/src/actors/consumer.rs                  | 218 ++++++++++------
 core/bench/src/actors/producer.rs                  |  11 +-
 core/bench/src/actors/producing_consumer.rs        | 276 +++++++++++++--------
 core/bench/src/analytics/metrics/group.rs          | 220 +++++++++-------
 core/bench/src/analytics/metrics/individual.rs     | 258 ++++++++++++-------
 core/bench/src/analytics/record.rs                 |   2 +-
 core/bench/src/analytics/report_builder.rs         |  10 +-
 core/bench/src/analytics/time_series/calculator.rs |  24 +-
 .../analytics/time_series/calculators/latency.rs   |   3 +
 .../time_series/calculators/throughput.rs          |   6 +-
 .../time_series/processors/moving_average.rs       |   4 +-
 core/bench/src/args/common.rs                      |  75 +++---
 core/bench/src/args/defaults.rs                    |   2 +-
 core/bench/src/args/examples.rs                    |   2 +-
 core/bench/src/args/kind.rs                        |  42 ++--
 .../src/args/kinds/balanced/consumer_group.rs      |  10 +-
 core/bench/src/args/kinds/balanced/producer.rs     |   7 +-
 .../kinds/balanced/producer_and_consumer_group.rs  |   8 +-
 .../args/kinds/end_to_end/producing_consumer.rs    |   4 +-
 .../kinds/end_to_end/producing_consumer_group.rs   |  11 +-
 core/bench/src/args/kinds/pinned/consumer.rs       |   2 +-
 core/bench/src/args/kinds/pinned/producer.rs       |   2 +-
 .../src/args/kinds/pinned/producer_and_consumer.rs |   7 +-
 core/bench/src/args/props.rs                       |   2 +-
 core/bench/src/args/transport.rs                   |  31 +--
 .../src/benchmarks/balanced_consumer_group.rs      |  13 +-
 core/bench/src/benchmarks/balanced_producer.rs     |  18 +-
 .../balanced_producer_and_consumer_group.rs        |  23 +-
 core/bench/src/benchmarks/benchmark.rs             |  82 +++---
 core/bench/src/benchmarks/common.rs                |  70 +++---
 .../benchmarks/end_to_end_producing_consumer.rs    |  18 +-
 .../end_to_end_producing_consumer_group.rs         |  10 +-
 core/bench/src/benchmarks/pinned_consumer.rs       |  10 +-
 core/bench/src/benchmarks/pinned_producer.rs       |  18 +-
 .../src/benchmarks/pinned_producer_and_consumer.rs |  20 +-
 core/bench/src/main.rs                             |   4 +-
 core/bench/src/plot.rs                             |  24 +-
 core/bench/src/runner.rs                           |  12 +-
 core/bench/src/utils/cpu_name.rs                   |   6 +-
 core/bench/src/utils/finish_condition.rs           |  60 ++---
 core/bench/src/utils/mod.rs                        | 106 +++++---
 core/bench/src/utils/rate_limiter.rs               |   6 +-
 core/bench/src/utils/server_starter.rs             | 105 ++++----
 44 files changed, 1088 insertions(+), 759 deletions(-)

diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index 3b2c6c64..ab5ec18d 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -50,3 +50,8 @@ uuid = { workspace = true }
 [[bin]]
 name = "iggy-bench"
 path = "src/main.rs"
+
+[lints.clippy]
+enum_glob_use = "deny"
+pedantic = "deny"
+nursery = "deny"
diff --git a/core/bench/src/actors/consumer.rs 
b/core/bench/src/actors/consumer.rs
index a00a7e80..8ea71282 100644
--- a/core/bench/src/actors/consumer.rs
+++ b/core/bench/src/actors/consumer.rs
@@ -85,6 +85,44 @@ impl BenchmarkConsumer {
     }
 
     pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> {
+        let (client, stream_id, topic_id, partition_id, consumer, 
rate_limiter) =
+            self.setup_client().await;
+
+        if self.warmup_time.get_duration() != Duration::from_millis(0) {
+            self.run_warmup(
+                &client,
+                &stream_id,
+                &topic_id,
+                partition_id,
+                &consumer,
+                rate_limiter.as_ref(),
+            )
+            .await?;
+        }
+
+        let metrics = self
+            .run_benchmark(
+                &client,
+                &stream_id,
+                &topic_id,
+                partition_id,
+                &consumer,
+                rate_limiter.as_ref(),
+            )
+            .await?;
+        Ok(metrics)
+    }
+
+    async fn setup_client(
+        &self,
+    ) -> (
+        IggyClient,
+        Identifier,
+        Identifier,
+        Option<u32>,
+        Consumer,
+        Option<BenchmarkRateLimiter>,
+    ) {
         let topic_id: u32 = 1;
         let default_partition_id: u32 = 1;
         let client = self.client_factory.create_client().await;
@@ -99,77 +137,103 @@ impl BenchmarkConsumer {
             Some(default_partition_id)
         };
         let cg_id = self.consumer_group_id;
-        let consumer =
-            create_consumer(&client, &cg_id, &stream_id, &topic_id, 
self.consumer_id).await;
-
+        let consumer = create_consumer(
+            &client,
+            cg_id.as_ref(),
+            &stream_id,
+            &topic_id,
+            self.consumer_id,
+        )
+        .await;
         let rate_limiter = 
self.limit_bytes_per_second.map(BenchmarkRateLimiter::new);
 
-        // -----------------------
-        // WARM-UP
-        // -----------------------
+        (
+            client,
+            stream_id,
+            topic_id,
+            partition_id,
+            consumer,
+            rate_limiter,
+        )
+    }
 
-        if self.warmup_time.get_duration() != Duration::from_millis(0) {
-            if let Some(cg_id) = self.consumer_group_id {
-                info!(
-                    "Consumer #{}, part of consumer group #{}, → warming up 
for {}...",
-                    self.consumer_id, cg_id, self.warmup_time
-                );
-            } else {
-                info!(
-                    "Consumer #{} → warming up for {}...",
-                    self.consumer_id, self.warmup_time
-                );
-            }
+    async fn run_warmup(
+        &self,
+        client: &IggyClient,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: Option<u32>,
+        consumer: &Consumer,
+        rate_limiter: Option<&BenchmarkRateLimiter>,
+    ) -> Result<(), IggyError> {
+        if let Some(cg_id) = self.consumer_group_id {
+            info!(
+                "Consumer #{}, part of consumer group #{}, → warming up for 
{}...",
+                self.consumer_id, cg_id, self.warmup_time
+            );
+        } else {
+            info!(
+                "Consumer #{} → warming up for {}...",
+                self.consumer_id, self.warmup_time
+            );
+        }
 
-            let warmup_end = Instant::now() + self.warmup_time.get_duration();
-            let mut messages_processed = 0;
-            let mut last_batch_user_size_bytes = 0;
-            while Instant::now() < warmup_end {
-                if let Some(rate_limiter) = &rate_limiter {
-                    if last_batch_user_size_bytes > 0 {
-                        rate_limiter
-                            .wait_until_necessary(last_batch_user_size_bytes)
-                            .await;
-                    }
+        let warmup_end = Instant::now() + self.warmup_time.get_duration();
+        let mut messages_processed = 0;
+        let mut last_batch_user_size_bytes = 0;
+        while Instant::now() < warmup_end {
+            if let Some(rate_limiter) = rate_limiter {
+                if last_batch_user_size_bytes > 0 {
+                    rate_limiter
+                        .wait_until_necessary(last_batch_user_size_bytes)
+                        .await;
                 }
-                let messages_to_receive = self.messages_per_batch.get();
-                let offset = messages_processed;
-                let (strategy, auto_commit) = match self.polling_kind {
-                    PollingKind::Offset => (PollingStrategy::offset(offset), 
false),
-                    PollingKind::Next => (PollingStrategy::next(), true),
-                    _ => panic!(
-                        "Unsupported polling kind for benchmark: {:?}",
-                        self.polling_kind
-                    ),
-                };
-                let polled_messages = client
-                    .poll_messages(
-                        &stream_id,
-                        &topic_id,
-                        partition_id,
-                        &consumer,
-                        &strategy,
-                        messages_to_receive,
-                        auto_commit,
-                    )
-                    .await?;
+            }
+            let messages_to_receive = self.messages_per_batch.get();
+            let offset = messages_processed;
+            let (strategy, auto_commit) = match self.polling_kind {
+                PollingKind::Offset => (PollingStrategy::offset(offset), 
false),
+                PollingKind::Next => (PollingStrategy::next(), true),
+                _ => panic!(
+                    "Unsupported polling kind for benchmark: {:?}",
+                    self.polling_kind
+                ),
+            };
+            let polled_messages = client
+                .poll_messages(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    consumer,
+                    &strategy,
+                    messages_to_receive,
+                    auto_commit,
+                )
+                .await?;
 
-                if polled_messages.messages.is_empty() {
-                    warn!(
-                        "Consumer #{} → Messages are empty for offset: {}, 
retrying...",
-                        self.consumer_id, offset
-                    );
-                    continue;
-                }
-                messages_processed = polled_messages.messages.len() as u64;
-                last_batch_user_size_bytes = 
batch_user_size_bytes(&polled_messages);
+            if polled_messages.messages.is_empty() {
+                warn!(
+                    "Consumer #{} → Messages are empty for offset: {}, 
retrying...",
+                    self.consumer_id, offset
+                );
+                continue;
             }
+            messages_processed = polled_messages.messages.len() as u64;
+            last_batch_user_size_bytes = 
batch_user_size_bytes(&polled_messages);
         }
+        Ok(())
+    }
 
-        // -----------------------
-        // MAIN BENCHMARK
-        // -----------------------
-
+    #[allow(clippy::too_many_lines)]
+    async fn run_benchmark(
+        &self,
+        client: &IggyClient,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: Option<u32>,
+        consumer: &Consumer,
+        rate_limiter: Option<&BenchmarkRateLimiter>,
+    ) -> Result<BenchmarkIndividualMetrics, IggyError> {
         if let Some(cg_id) = self.consumer_group_id {
             info!(
                 "Consumer #{}, part of consumer group #{} → polling {} in {} 
messages per batch from stream {}, rate limit: {:?}...",
@@ -193,7 +257,6 @@ impl BenchmarkConsumer {
 
         let max_capacity = self.finish_condition.max_capacity();
         let mut records = Vec::with_capacity(max_capacity);
-        let mut latencies: Vec<Duration> = Vec::with_capacity(max_capacity);
         let mut skipped_warnings_count: u32 = 0;
         let mut topic_not_found_counter = 0;
         let mut initial_poll_timestamp: Option<Instant> = None;
@@ -222,10 +285,10 @@ impl BenchmarkConsumer {
             let before_poll = Instant::now();
             let polled_messages = client
                 .poll_messages(
-                    &stream_id,
-                    &topic_id,
+                    stream_id,
+                    topic_id,
                     partition_id,
-                    &consumer,
+                    consumer,
                     &strategy,
                     messages_to_receive,
                     auto_commit,
@@ -249,9 +312,8 @@ impl BenchmarkConsumer {
                 if initial_poll_timestamp.is_none() {
                     initial_poll_timestamp = Some(before_poll);
                 }
-                let should_warn = last_warning_time
-                    .map(|t| t.elapsed() >= Duration::from_secs(1))
-                    .unwrap_or(true);
+                let should_warn =
+                    last_warning_time.is_none_or(|t| t.elapsed() >= 
Duration::from_secs(1));
 
                 if should_warn {
                     warn!(
@@ -270,9 +332,8 @@ impl BenchmarkConsumer {
             }
 
             if polled_messages.messages.len() != messages_to_receive as usize {
-                let should_warn = last_warning_time
-                    .map(|t| t.elapsed() >= Duration::from_secs(1))
-                    .unwrap_or(true);
+                let should_warn =
+                    last_warning_time.is_none_or(|t| t.elapsed() >= 
Duration::from_secs(1));
 
                 if should_warn {
                     warn!(
@@ -296,8 +357,6 @@ impl BenchmarkConsumer {
                 initial_poll_timestamp.unwrap_or(before_poll).elapsed()
             };
 
-            latencies.push(latency);
-
             let batch_user_size_bytes = 
batch_user_size_bytes(&polled_messages);
             let batch_total_size_bytes = 
batch_total_size_bytes(&polled_messages);
 
@@ -309,15 +368,16 @@ impl BenchmarkConsumer {
             user_data_bytes_processed += batch_user_size_bytes;
 
             records.push(BenchmarkRecord {
-                elapsed_time_us: start_timestamp.elapsed().as_micros() as u64,
-                latency_us: latency.as_micros() as u64,
+                elapsed_time_us: 
u64::try_from(start_timestamp.elapsed().as_micros())
+                    .unwrap_or(u64::MAX),
+                latency_us: 
u64::try_from(latency.as_micros()).unwrap_or(u64::MAX),
                 messages: messages_processed,
                 message_batches: batches_processed,
                 user_data_bytes: user_data_bytes_processed,
                 total_bytes: bytes_processed,
             });
 
-            if let Some(rate_limiter) = &rate_limiter {
+            if let Some(rate_limiter) = rate_limiter {
                 rate_limiter
                     .wait_until_necessary(batch_user_size_bytes)
                     .await;
@@ -332,7 +392,7 @@ impl BenchmarkConsumer {
         }
 
         let metrics = from_records(
-            records,
+            &records,
             self.benchmark_kind,
             ActorKind::Consumer,
             self.consumer_id,
@@ -343,7 +403,7 @@ impl BenchmarkConsumer {
         Self::log_statistics(
             self.consumer_id,
             messages_processed,
-            batches_processed as u32,
+            u32::try_from(batches_processed).unwrap_or(u32::MAX),
             &self.messages_per_batch,
             &metrics,
         );
diff --git a/core/bench/src/actors/producer.rs 
b/core/bench/src/actors/producer.rs
index 972aed22..c32191ab 100644
--- a/core/bench/src/actors/producer.rs
+++ b/core/bench/src/actors/producer.rs
@@ -64,7 +64,7 @@ impl BenchmarkProducer {
         moving_average_window: u32,
         limit_bytes_per_second: Option<IggyByteSize>,
     ) -> Self {
-        BenchmarkProducer {
+        Self {
             client_factory,
             benchmark_kind,
             producer_id,
@@ -134,7 +134,6 @@ impl BenchmarkProducer {
         );
 
         let max_capacity = self.finish_condition.max_capacity();
-        let mut latencies: Vec<Duration> = Vec::with_capacity(max_capacity);
         let mut records: Vec<BenchmarkRecord> = 
Vec::with_capacity(max_capacity);
         let mut messages_processed = 0;
         let mut batches_processed = 0;
@@ -158,10 +157,10 @@ impl BenchmarkProducer {
             user_data_bytes_processed += batch.user_data_bytes;
             total_bytes_processed += batch.total_bytes;
 
-            latencies.push(latency);
             records.push(BenchmarkRecord {
-                elapsed_time_us: start_timestamp.elapsed().as_micros() as u64,
-                latency_us: latency.as_micros() as u64,
+                elapsed_time_us: 
u64::try_from(start_timestamp.elapsed().as_micros())
+                    .unwrap_or(u64::MAX),
+                latency_us: 
u64::try_from(latency.as_micros()).unwrap_or(u64::MAX),
                 messages: messages_processed,
                 message_batches: batches_processed,
                 user_data_bytes: user_data_bytes_processed,
@@ -183,7 +182,7 @@ impl BenchmarkProducer {
         }
 
         let metrics = from_records(
-            records,
+            &records,
             self.benchmark_kind,
             ActorKind::Producer,
             self.producer_id,
diff --git a/core/bench/src/actors/producing_consumer.rs 
b/core/bench/src/actors/producing_consumer.rs
index f99d62e0..bc102ffd 100644
--- a/core/bench/src/actors/producing_consumer.rs
+++ b/core/bench/src/actors/producing_consumer.rs
@@ -93,12 +93,65 @@ impl BenchmarkProducingConsumer {
     }
 
     pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> {
+        let (
+            client,
+            stream_id,
+            topic_id,
+            partitioning,
+            partition_id,
+            consumer,
+            mut batch_generator,
+            rate_limiter,
+        ) = self.setup_client().await?;
+
+        if self.warmup_time.get_duration() != Duration::from_millis(0) {
+            self.run_warmup(
+                &client,
+                &stream_id,
+                &topic_id,
+                &partitioning,
+                partition_id,
+                &consumer,
+                &mut batch_generator,
+            )
+            .await?;
+        }
+
+        let metrics = self
+            .run_benchmark(
+                client,
+                stream_id,
+                topic_id,
+                partitioning,
+                partition_id,
+                consumer,
+                batch_generator,
+                rate_limiter,
+            )
+            .await?;
+        Ok(metrics)
+    }
+
+    async fn setup_client(
+        &self,
+    ) -> Result<
+        (
+            IggyClient,
+            Identifier,
+            Identifier,
+            Partitioning,
+            Option<u32>,
+            Consumer,
+            BenchmarkBatchGenerator,
+            Option<BenchmarkRateLimiter>,
+        ),
+        IggyError,
+    > {
         let topic_id: u32 = 1;
         let default_partition_id: u32 = 1;
 
         let client = self.client_factory.create_client().await;
         let client = IggyClient::create(client, None, None);
-
         login_root(&client).await;
 
         let stream_id = self.stream_id.try_into()?;
@@ -116,91 +169,116 @@ impl BenchmarkProducingConsumer {
 
         let consumer = create_consumer(
             &client,
-            &self.consumer_group_id,
+            self.consumer_group_id.as_ref(),
             &stream_id,
             &topic_id,
             self.actor_id,
         )
         .await;
 
-        let mut batch_generator =
+        let batch_generator =
             BenchmarkBatchGenerator::new(self.message_size, 
self.messages_per_batch);
-
         let rate_limiter = 
self.limit_bytes_per_second.map(BenchmarkRateLimiter::new);
 
-        // -----------------------
-        // WARM-UP
-        // -----------------------
-
-        if self.warmup_time.get_duration() != Duration::from_millis(0) {
-            let warmup_end = Instant::now() + self.warmup_time.get_duration();
-            let mut offset: u64 = 0;
-            let mut last_warning_time: Option<Instant> = None;
-            let mut skipped_warnings_count: u32 = 0;
-
-            if let Some(cg_id) = self.consumer_group_id {
-                info!(
-                    "ProducingConsumer #{}, part of consumer group #{}, → 
warming up for {}...",
-                    self.actor_id, cg_id, self.warmup_time
-                );
-            } else {
-                info!(
-                    "ProducingConsumer #{} → warming up for {}...",
-                    self.actor_id, self.warmup_time
-                );
-            }
-            while Instant::now() < warmup_end {
-                let batch = batch_generator.generate_batch();
-                client
-                    .send_messages(&stream_id, &topic_id, &partitioning, &mut 
batch.messages)
-                    .await?;
-
-                let (strategy, auto_commit) = match self.polling_kind {
-                    PollingKind::Offset => (PollingStrategy::offset(offset), 
false),
-                    PollingKind::Next => (PollingStrategy::next(), true),
-                    other => panic!("Unsupported polling kind for warmup: 
{:?}", other),
-                };
-
-                let polled_messages = client
-                    .poll_messages(
-                        &stream_id,
-                        &topic_id,
-                        partition_id,
-                        &consumer,
-                        &strategy,
-                        batch.messages.len() as u32,
-                        auto_commit,
-                    )
-                    .await?;
+        Ok((
+            client,
+            stream_id,
+            topic_id,
+            partitioning,
+            partition_id,
+            consumer,
+            batch_generator,
+            rate_limiter,
+        ))
+    }
 
-                if polled_messages.messages.is_empty() {
-                    let should_warn = last_warning_time
-                        .map(|t| t.elapsed() >= Duration::from_secs(1))
-                        .unwrap_or(true);
+    #[allow(clippy::too_many_arguments)]
+    async fn run_warmup(
+        &self,
+        client: &IggyClient,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partitioning: &Partitioning,
+        partition_id: Option<u32>,
+        consumer: &Consumer,
+        batch_generator: &mut BenchmarkBatchGenerator,
+    ) -> Result<(), IggyError> {
+        let warmup_end = Instant::now() + self.warmup_time.get_duration();
+        let mut offset: u64 = 0;
+        let mut last_warning_time: Option<Instant> = None;
+        let mut skipped_warnings_count: u32 = 0;
 
-                    if should_warn {
-                        warn!(
-                            "ProducingConsumer #{} (warmup) → expected {} 
messages but got {}, retrying... ({} warnings skipped)",
-                            self.actor_id,
-                            self.messages_per_batch,
-                            polled_messages.messages.len(),
-                            skipped_warnings_count
-                        );
-                        last_warning_time = Some(Instant::now());
-                        skipped_warnings_count = 0;
-                    } else {
-                        skipped_warnings_count += 1;
-                    }
-                    continue;
+        if let Some(cg_id) = self.consumer_group_id {
+            info!(
+                "ProducingConsumer #{}, part of consumer group #{}, → warming 
up for {}...",
+                self.actor_id, cg_id, self.warmup_time
+            );
+        } else {
+            info!(
+                "ProducingConsumer #{} → warming up for {}...",
+                self.actor_id, self.warmup_time
+            );
+        }
+        while Instant::now() < warmup_end {
+            let batch = batch_generator.generate_batch();
+            client
+                .send_messages(stream_id, topic_id, partitioning, &mut 
batch.messages)
+                .await?;
+
+            let (strategy, auto_commit) = match self.polling_kind {
+                PollingKind::Offset => (PollingStrategy::offset(offset), 
false),
+                PollingKind::Next => (PollingStrategy::next(), true),
+                other => panic!("Unsupported polling kind for warmup: 
{other:?}"),
+            };
+
+            let polled_messages = client
+                .poll_messages(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    consumer,
+                    &strategy,
+                    u32::try_from(batch.messages.len()).unwrap_or(u32::MAX),
+                    auto_commit,
+                )
+                .await?;
+
+            if polled_messages.messages.is_empty() {
+                let should_warn =
+                    last_warning_time.is_none_or(|t| t.elapsed() >= 
Duration::from_secs(1));
+
+                if should_warn {
+                    warn!(
+                        "ProducingConsumer #{actor_id} (warmup) → expected 
{messages_per_batch} messages but got {actual_count}, retrying... 
({skipped_warnings_count} warnings skipped)",
+                        actor_id = self.actor_id,
+                        messages_per_batch = self.messages_per_batch,
+                        actual_count = polled_messages.messages.len()
+                    );
+                    last_warning_time = Some(Instant::now());
+                    skipped_warnings_count = 0;
+                } else {
+                    skipped_warnings_count += 1;
                 }
-
-                offset += batch.messages.len() as u64;
+                continue;
             }
+
+            offset += u64::try_from(batch.messages.len()).unwrap_or(u64::MAX);
         }
+        Ok(())
+    }
 
-        // --------------------------------
-        // MAIN BENCHMARK LOOP
-        // --------------------------------
+    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
+    async fn run_benchmark(
+        self,
+        client: IggyClient,
+        stream_id: Identifier,
+        topic_id: Identifier,
+        partitioning: Partitioning,
+        partition_id: Option<u32>,
+        consumer: Consumer,
+        mut batch_generator: BenchmarkBatchGenerator,
+        rate_limiter: Option<BenchmarkRateLimiter>,
+    ) -> Result<BenchmarkIndividualMetrics, IggyError> {
         info!(
             "ProducingConsumer #{} → sending {} and polling {} ({} msgs/batch) 
on stream {}, rate limit: {:?}",
             self.actor_id,
@@ -215,7 +293,6 @@ impl BenchmarkProducingConsumer {
             .send_finish_condition
             .max_capacity()
             .max(self.poll_finish_condition.max_capacity());
-        let mut latencies: Vec<Duration> = Vec::with_capacity(max_capacity);
         let mut records: Vec<BenchmarkRecord> = 
Vec::with_capacity(max_capacity);
         let mut offset = 0;
         let mut last_warning_time: Option<Instant> = None;
@@ -243,7 +320,7 @@ impl BenchmarkProducingConsumer {
         let is_consumer = self.poll_finish_condition.total() > 0;
 
         let require_reply = is_producer && is_consumer && 
self.consumer_group_id.is_none();
-        let mut awaiting_reply = false; // meaningful only if require_reply == 
true
+        let mut awaiting_reply = false;
 
         let start_timestamp = Instant::now();
 
@@ -262,7 +339,7 @@ impl BenchmarkProducingConsumer {
 
                 sent_total_bytes_processed += batch.total_bytes;
                 sent_user_bytes_processed += batch.user_data_bytes;
-                sent_messages += batch.messages.len() as u32;
+                sent_messages += 
u32::try_from(batch.messages.len()).unwrap_or(u32::MAX);
                 sent_batches += 1;
 
                 awaiting_reply = is_consumer;
@@ -272,14 +349,14 @@ impl BenchmarkProducingConsumer {
                     .account_and_check(batch.user_data_bytes)
                 {
                     info!(
-                        "ProducingConsumer #{} → finished sending {} messages 
in {} batches ({} bytes of user data, {} bytes of total data), send finish 
condition: {}, poll finish condition: {}",
-                        self.actor_id,
-                        sent_messages.human_count_bare(),
-                        sent_batches.human_count_bare(),
-                        sent_user_bytes_processed.human_count_bytes(),
-                        sent_total_bytes_processed.human_count_bytes(),
-                        self.send_finish_condition.status(),
-                        self.poll_finish_condition.status()
+                        "ProducingConsumer #{actor_id} → finished sending 
{sent_messages} messages in {sent_batches} batches ({sent_user_bytes_processed} 
bytes of user data, {sent_total_bytes_processed} bytes of total data), send 
finish condition: {send_status}, poll finish condition: {poll_status}",
+                        actor_id = self.actor_id,
+                        sent_messages = sent_messages.human_count_bare(),
+                        sent_batches = sent_batches.human_count_bare(),
+                        sent_user_bytes_processed = 
sent_user_bytes_processed.human_count_bytes(),
+                        sent_total_bytes_processed = 
sent_total_bytes_processed.human_count_bytes(),
+                        send_status = self.send_finish_condition.status(),
+                        poll_status = self.poll_finish_condition.status()
                     );
                 }
             }
@@ -288,7 +365,7 @@ impl BenchmarkProducingConsumer {
                 let (strategy, auto_commit) = match self.polling_kind {
                     PollingKind::Offset => (PollingStrategy::offset(offset), 
false),
                     PollingKind::Next => (PollingStrategy::next(), true),
-                    other => panic!("Unsupported polling kind for benchmark: 
{:?}", other),
+                    other => panic!("Unsupported polling kind for benchmark: 
{other:?}"),
                 };
 
                 let polled_messages = client
@@ -304,18 +381,16 @@ impl BenchmarkProducingConsumer {
                     .await?;
 
                 if polled_messages.messages.is_empty() {
-                    let should_warn = last_warning_time
-                        .map(|t| t.elapsed() >= Duration::from_secs(1))
-                        .unwrap_or(true);
+                    let should_warn =
+                        last_warning_time.is_none_or(|t| t.elapsed() >= 
Duration::from_secs(1));
 
                     if should_warn {
                         warn!(
-                            "ProducingConsumer #{} → received empty batch, 
sent: {}, polled: {}, polling kind: {:?}, retrying... ({} warnings skipped in 
last second)",
-                            self.actor_id,
-                            self.send_finish_condition.status(),
-                            self.poll_finish_condition.status(),
-                            self.polling_kind,
-                            skipped_warnings_count
+                            "ProducingConsumer #{actor_id} → received empty 
batch, sent: {send_status}, polled: {poll_status}, polling kind: 
{polling_kind:?}, retrying... ({skipped_warnings_count} warnings skipped in 
last second)",
+                            actor_id = self.actor_id,
+                            send_status = self.send_finish_condition.status(),
+                            poll_status = self.poll_finish_condition.status(),
+                            polling_kind = self.polling_kind
                         );
                         last_warning_time = Some(Instant::now());
                         skipped_warnings_count = 0;
@@ -330,17 +405,17 @@ impl BenchmarkProducingConsumer {
                 let latency = Duration::from_micros(
                     now - polled_messages.messages[0].header.origin_timestamp,
                 );
-                latencies.push(latency);
 
                 last_received_batch_user_data_bytes = 
batch_user_size_bytes(&polled_messages);
                 rl_value += last_received_batch_user_data_bytes;
 
                 received_user_bytes_processed += 
last_received_batch_user_data_bytes;
                 received_total_bytes_processed += 
batch_total_size_bytes(&polled_messages);
-                received_messages += polled_messages.messages.len() as u32;
+                received_messages +=
+                    
u32::try_from(polled_messages.messages.len()).unwrap_or(u32::MAX);
                 received_batches += 1;
 
-                offset += polled_messages.messages.len() as u64;
+                offset += 
u64::try_from(polled_messages.messages.len()).unwrap_or(u64::MAX);
 
                 total_user_data_bytes_processed =
                     received_user_bytes_processed + sent_user_bytes_processed;
@@ -349,9 +424,10 @@ impl BenchmarkProducingConsumer {
                 total_batches_processed = received_batches + sent_batches;
 
                 records.push(BenchmarkRecord {
-                    elapsed_time_us: start_timestamp.elapsed().as_micros() as 
u64,
-                    latency_us: latency.as_micros() as u64,
-                    messages: total_messages_processed as u64,
+                    elapsed_time_us: 
u64::try_from(start_timestamp.elapsed().as_micros())
+                        .unwrap_or(u64::MAX),
+                    latency_us: 
u64::try_from(latency.as_micros()).unwrap_or(u64::MAX),
+                    messages: u64::from(total_messages_processed),
                     message_batches: total_batches_processed,
                     user_data_bytes: total_user_data_bytes_processed,
                     total_bytes: total_bytes_processed,
@@ -372,7 +448,7 @@ impl BenchmarkProducingConsumer {
         }
 
         let metrics = from_records(
-            records,
+            &records,
             self.benchmark_kind,
             ActorKind::ProducingConsumer,
             self.actor_id,
@@ -382,7 +458,7 @@ impl BenchmarkProducingConsumer {
 
         Self::log_statistics(
             self.actor_id,
-            total_messages_processed as u64,
+            u64::from(total_messages_processed),
             total_batches_processed,
             &self.messages_per_batch,
             &metrics,
diff --git a/core/bench/src/analytics/metrics/group.rs 
b/core/bench/src/analytics/metrics/group.rs
index 47aaf83e..dd585e37 100644
--- a/core/bench/src/analytics/metrics/group.rs
+++ b/core/bench/src/analytics/metrics/group.rs
@@ -16,14 +16,20 @@
  * under the License.
  */
 
+#![allow(clippy::cast_precision_loss)]
+#![allow(clippy::struct_field_names)]
+
 use crate::analytics::time_series::{
     calculator::TimeSeriesCalculator,
     processors::{TimeSeriesProcessor, moving_average::MovingAverageProcessor},
 };
 use bench_report::{
-    actor_kind::ActorKind, group_metrics::BenchmarkGroupMetrics,
-    group_metrics_kind::GroupMetricsKind, 
group_metrics_summary::BenchmarkGroupMetricsSummary,
-    individual_metrics::BenchmarkIndividualMetrics, utils::*,
+    actor_kind::ActorKind,
+    group_metrics::BenchmarkGroupMetrics,
+    group_metrics_kind::GroupMetricsKind,
+    group_metrics_summary::BenchmarkGroupMetricsSummary,
+    individual_metrics::BenchmarkIndividualMetrics,
+    utils::{max, min, std_dev},
 };
 
 pub fn from_producers_and_consumers_statistics(
@@ -46,78 +52,132 @@ pub fn from_individual_metrics(
     if stats.is_empty() {
         return None;
     }
-    let count = stats.len() as f64;
 
-    // Compute aggregate throughput MB/s
-    let total_throughput_megabytes_per_second: f64 = stats
+    let throughput_metrics = calculate_throughput_metrics(stats);
+    let latency_metrics = calculate_latency_metrics(stats);
+    let kind = determine_group_kind(stats);
+    let time_series = calculate_group_time_series(stats, 
moving_average_window);
+    let (min_latency_ms_value, max_latency_ms_value) =
+        calculate_min_max_latencies(stats, &time_series.2);
+
+    let summary = BenchmarkGroupMetricsSummary {
+        kind,
+        total_throughput_megabytes_per_second: 
throughput_metrics.total_megabytes_per_sec,
+        total_throughput_messages_per_second: 
throughput_metrics.total_messages_per_sec,
+        average_throughput_megabytes_per_second: 
throughput_metrics.average_megabytes_per_sec,
+        average_throughput_messages_per_second: 
throughput_metrics.average_messages_per_sec,
+        average_p50_latency_ms: latency_metrics.p50_latency,
+        average_p90_latency_ms: latency_metrics.p90_latency,
+        average_p95_latency_ms: latency_metrics.p95_latency,
+        average_p99_latency_ms: latency_metrics.p99_latency,
+        average_p999_latency_ms: latency_metrics.p999_latency,
+        average_p9999_latency_ms: latency_metrics.p9999_latency,
+        average_latency_ms: latency_metrics.average_latency,
+        average_median_latency_ms: latency_metrics.median_latency,
+        min_latency_ms: min_latency_ms_value,
+        max_latency_ms: max_latency_ms_value,
+        std_dev_latency_ms: std_dev(&time_series.2).unwrap_or(0.0),
+    };
+
+    Some(BenchmarkGroupMetrics {
+        summary,
+        avg_throughput_mb_ts: time_series.0,
+        avg_throughput_msg_ts: time_series.1,
+        avg_latency_ts: time_series.2,
+    })
+}
+
+struct ThroughputMetrics {
+    total_megabytes_per_sec: f64,
+    total_messages_per_sec: f64,
+    average_megabytes_per_sec: f64,
+    average_messages_per_sec: f64,
+}
+
+fn calculate_throughput_metrics(stats: &[BenchmarkIndividualMetrics]) -> 
ThroughputMetrics {
+    let count = stats.len() as f64;
+    let total_mb_per_sec = stats
         .iter()
         .map(|r| r.summary.throughput_megabytes_per_second)
         .sum();
-
-    // Compute aggregate throughput messages/s
-    let total_throughput_messages_per_second: f64 = stats
+    let total_msg_per_sec = stats
         .iter()
         .map(|r| r.summary.throughput_messages_per_second)
         .sum();
 
-    // Compute average throughput MB/s
-    let average_throughput_megabytes_per_second = 
total_throughput_megabytes_per_second / count;
-
-    // Compute average throughput messages/s
-    let average_throughput_messages_per_second = 
total_throughput_messages_per_second / count;
-
-    // Compute average latencies
-    let average_p50_latency_ms =
-        stats.iter().map(|r| r.summary.p50_latency_ms).sum::<f64>() / count;
-    let average_p90_latency_ms =
-        stats.iter().map(|r| r.summary.p90_latency_ms).sum::<f64>() / count;
-    let average_p95_latency_ms =
-        stats.iter().map(|r| r.summary.p95_latency_ms).sum::<f64>() / count;
-    let average_p99_latency_ms =
-        stats.iter().map(|r| r.summary.p99_latency_ms).sum::<f64>() / count;
-    let average_p999_latency_ms: f64 =
-        stats.iter().map(|r| r.summary.p999_latency_ms).sum::<f64>() / count;
-    let average_p9999_latency_ms: f64 = stats
-        .iter()
-        .map(|r| r.summary.p9999_latency_ms)
-        .sum::<f64>()
-        / count;
-    let average_avg_latency_ms =
-        stats.iter().map(|r| r.summary.avg_latency_ms).sum::<f64>() / count;
-    let average_median_latency_ms = stats
-        .iter()
-        .map(|r| r.summary.median_latency_ms)
-        .sum::<f64>()
-        / count;
+    ThroughputMetrics {
+        total_megabytes_per_sec: total_mb_per_sec,
+        total_messages_per_sec: total_msg_per_sec,
+        average_megabytes_per_sec: total_mb_per_sec / count,
+        average_messages_per_sec: total_msg_per_sec / count,
+    }
+}
+
+struct LatencyMetrics {
+    p50_latency: f64,
+    p90_latency: f64,
+    p95_latency: f64,
+    p99_latency: f64,
+    p999_latency: f64,
+    p9999_latency: f64,
+    average_latency: f64,
+    median_latency: f64,
+}
+
+fn calculate_latency_metrics(stats: &[BenchmarkIndividualMetrics]) -> 
LatencyMetrics {
+    let count = stats.len() as f64;
+
+    LatencyMetrics {
+        p50_latency: stats.iter().map(|r| 
r.summary.p50_latency_ms).sum::<f64>() / count,
+        p90_latency: stats.iter().map(|r| 
r.summary.p90_latency_ms).sum::<f64>() / count,
+        p95_latency: stats.iter().map(|r| 
r.summary.p95_latency_ms).sum::<f64>() / count,
+        p99_latency: stats.iter().map(|r| 
r.summary.p99_latency_ms).sum::<f64>() / count,
+        p999_latency: stats.iter().map(|r| 
r.summary.p999_latency_ms).sum::<f64>() / count,
+        p9999_latency: stats
+            .iter()
+            .map(|r| r.summary.p9999_latency_ms)
+            .sum::<f64>()
+            / count,
+        average_latency: stats.iter().map(|r| 
r.summary.avg_latency_ms).sum::<f64>() / count,
+        median_latency: stats
+            .iter()
+            .map(|r| r.summary.median_latency_ms)
+            .sum::<f64>()
+            / count,
+    }
+}
 
-    let kind = match stats.iter().next().unwrap().summary.actor_kind {
+fn determine_group_kind(stats: &[BenchmarkIndividualMetrics]) -> 
GroupMetricsKind {
+    match stats.iter().next().unwrap().summary.actor_kind {
         ActorKind::Producer => GroupMetricsKind::Producers,
         ActorKind::Consumer => GroupMetricsKind::Consumers,
         ActorKind::ProducingConsumer => GroupMetricsKind::ProducingConsumers,
-    };
+    }
+}
 
-    let calculator = TimeSeriesCalculator::new();
+use bench_report::time_series::TimeSeries;
 
-    let mut avg_throughput_mb_ts = calculator.aggregate_sum(
-        stats
+fn calculate_group_time_series(
+    stats: &[BenchmarkIndividualMetrics],
+    moving_average_window: u32,
+) -> (TimeSeries, TimeSeries, TimeSeries) {
+    let mut avg_throughput_mb_ts = TimeSeriesCalculator::aggregate_sum(
+        &stats
             .iter()
             .map(|r| r.throughput_mb_ts.clone())
-            .collect::<Vec<_>>()
-            .as_slice(),
+            .collect::<Vec<_>>(),
     );
-    let mut avg_throughput_msg_ts = calculator.aggregate_sum(
-        stats
+    let mut avg_throughput_msg_ts = TimeSeriesCalculator::aggregate_sum(
+        &stats
             .iter()
             .map(|r| r.throughput_msg_ts.clone())
-            .collect::<Vec<_>>()
-            .as_slice(),
+            .collect::<Vec<_>>(),
     );
-    let mut avg_latency_ts = calculator.aggregate_avg(
-        stats
+    let mut avg_latency_ts = TimeSeriesCalculator::aggregate_avg(
+        &stats
             .iter()
             .map(|r| r.latency_ts.clone())
-            .collect::<Vec<_>>()
-            .as_slice(),
+            .collect::<Vec<_>>(),
     );
 
     let sma = MovingAverageProcessor::new(moving_average_window as usize);
@@ -125,55 +185,33 @@ pub fn from_individual_metrics(
     avg_throughput_msg_ts = sma.process(&avg_throughput_msg_ts);
     avg_latency_ts = sma.process(&avg_latency_ts);
 
-    let min_latency_ms = if !stats.is_empty() {
+    (avg_throughput_mb_ts, avg_throughput_msg_ts, avg_latency_ts)
+}
+
+fn calculate_min_max_latencies(
+    stats: &[BenchmarkIndividualMetrics],
+    avg_latency_ts: &TimeSeries,
+) -> (f64, f64) {
+    let min_latency_ms = if stats.is_empty() {
+        None
+    } else {
         stats
             .iter()
             .map(|s| s.summary.min_latency_ms)
             .min_by(|a, b| 
a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
-    } else {
-        None
     };
 
-    let max_latency_ms = if !stats.is_empty() {
+    let max_latency_ms = if stats.is_empty() {
+        None
+    } else {
         stats
             .iter()
             .map(|s| s.summary.max_latency_ms)
             .max_by(|a, b| 
a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
-    } else {
-        None
     };
 
-    let min_latency_ms_value =
-        min_latency_ms.unwrap_or_else(|| min(&avg_latency_ts).unwrap_or(0.0));
+    let min_latency_ms_value = min_latency_ms.unwrap_or_else(|| 
min(avg_latency_ts).unwrap_or(0.0));
+    let max_latency_ms_value = max_latency_ms.unwrap_or_else(|| 
max(avg_latency_ts).unwrap_or(0.0));
 
-    let max_latency_ms_value =
-        max_latency_ms.unwrap_or_else(|| max(&avg_latency_ts).unwrap_or(0.0));
-
-    let std_dev_latency_ms = std_dev(&avg_latency_ts);
-
-    let summary = BenchmarkGroupMetricsSummary {
-        kind,
-        total_throughput_megabytes_per_second,
-        total_throughput_messages_per_second,
-        average_throughput_megabytes_per_second,
-        average_throughput_messages_per_second,
-        average_p50_latency_ms,
-        average_p90_latency_ms,
-        average_p95_latency_ms,
-        average_p99_latency_ms,
-        average_p999_latency_ms,
-        average_p9999_latency_ms,
-        average_latency_ms: average_avg_latency_ms,
-        average_median_latency_ms,
-        min_latency_ms: min_latency_ms_value,
-        max_latency_ms: max_latency_ms_value,
-        std_dev_latency_ms: std_dev_latency_ms.unwrap_or(0.0),
-    };
-
-    Some(BenchmarkGroupMetrics {
-        summary,
-        avg_throughput_mb_ts,
-        avg_throughput_msg_ts,
-        avg_latency_ts,
-    })
+    (min_latency_ms_value, max_latency_ms_value)
 }
diff --git a/core/bench/src/analytics/metrics/individual.rs 
b/core/bench/src/analytics/metrics/individual.rs
index 30ec44af..f9746a0e 100644
--- a/core/bench/src/analytics/metrics/individual.rs
+++ b/core/bench/src/analytics/metrics/individual.rs
@@ -16,6 +16,10 @@
  * under the License.
  */
 
+#![allow(clippy::cast_precision_loss)]
+#![allow(clippy::cast_possible_truncation)]
+#![allow(clippy::cast_sign_loss)]
+
 use crate::analytics::record::BenchmarkRecord;
 use crate::analytics::time_series::calculator::TimeSeriesCalculator;
 use crate::analytics::time_series::processors::TimeSeriesProcessor;
@@ -25,11 +29,11 @@ use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use 
bench_report::individual_metrics_summary::BenchmarkIndividualMetricsSummary;
 use bench_report::time_series::TimeSeries;
-use bench_report::utils::*;
+use bench_report::utils::{max, min, std_dev};
 use iggy::prelude::IggyDuration;
 
 pub fn from_records(
-    records: Vec<BenchmarkRecord>,
+    records: &[BenchmarkRecord],
     benchmark_kind: BenchmarkKind,
     actor_kind: ActorKind,
     actor_id: u32,
@@ -37,52 +41,131 @@ pub fn from_records(
     moving_average_window: u32,
 ) -> BenchmarkIndividualMetrics {
     if records.is_empty() {
-        return BenchmarkIndividualMetrics {
-            summary: BenchmarkIndividualMetricsSummary {
-                benchmark_kind,
-                actor_kind,
-                actor_id,
-                total_time_secs: 0.0,
-                total_user_data_bytes: 0,
-                total_bytes: 0,
-                total_messages: 0,
-                total_message_batches: 0,
-                throughput_megabytes_per_second: 0.0,
-                throughput_messages_per_second: 0.0,
-                p50_latency_ms: 0.0,
-                p90_latency_ms: 0.0,
-                p95_latency_ms: 0.0,
-                p99_latency_ms: 0.0,
-                p999_latency_ms: 0.0,
-                p9999_latency_ms: 0.0,
-                avg_latency_ms: 0.0,
-                median_latency_ms: 0.0,
-                min_latency_ms: 0.0,
-                max_latency_ms: 0.0,
-                std_dev_latency_ms: 0.0,
-            },
-            throughput_mb_ts: TimeSeries::default(),
-            throughput_msg_ts: TimeSeries::default(),
-            latency_ts: TimeSeries::default(),
-        };
+        return create_empty_metrics(benchmark_kind, actor_kind, actor_id);
     }
 
-    let total_time_secs = records.last().unwrap().elapsed_time_us as f64 / 
1_000_000.0;
+    let (
+        total_time_secs,
+        total_user_data_bytes,
+        total_bytes,
+        total_messages,
+        total_message_batches,
+    ) = extract_totals(records);
+
+    let (throughput_mb_ts, throughput_msg_ts, latency_ts) =
+        calculate_time_series(records, sampling_time, moving_average_window);
+
+    let (throughput_megabytes_per_second, throughput_messages_per_second) = 
calculate_throughput(
+        &throughput_mb_ts,
+        &throughput_msg_ts,
+        total_time_secs,
+        total_user_data_bytes,
+        total_messages,
+    );
+
+    let latency_metrics = calculate_latency_metrics(records, &latency_ts);
 
-    let total_user_data_bytes = records.iter().last().unwrap().user_data_bytes;
-    let total_bytes = records.iter().last().unwrap().total_bytes;
-    let total_messages = records.iter().last().unwrap().messages;
-    let total_message_batches = records.iter().last().unwrap().message_batches;
+    BenchmarkIndividualMetrics {
+        summary: BenchmarkIndividualMetricsSummary {
+            benchmark_kind,
+            actor_kind,
+            actor_id,
+            total_time_secs,
+            total_user_data_bytes,
+            total_bytes,
+            total_messages,
+            total_message_batches,
+            throughput_megabytes_per_second,
+            throughput_messages_per_second,
+            p50_latency_ms: latency_metrics.p50,
+            p90_latency_ms: latency_metrics.p90,
+            p95_latency_ms: latency_metrics.p95,
+            p99_latency_ms: latency_metrics.p99,
+            p999_latency_ms: latency_metrics.p999,
+            p9999_latency_ms: latency_metrics.p9999,
+            avg_latency_ms: latency_metrics.avg,
+            median_latency_ms: latency_metrics.median,
+            min_latency_ms: latency_metrics.min,
+            max_latency_ms: latency_metrics.max,
+            std_dev_latency_ms: latency_metrics.std_dev,
+        },
+        throughput_mb_ts,
+        throughput_msg_ts,
+        latency_ts,
+    }
+}
+
+fn create_empty_metrics(
+    benchmark_kind: BenchmarkKind,
+    actor_kind: ActorKind,
+    actor_id: u32,
+) -> BenchmarkIndividualMetrics {
+    BenchmarkIndividualMetrics {
+        summary: BenchmarkIndividualMetricsSummary {
+            benchmark_kind,
+            actor_kind,
+            actor_id,
+            total_time_secs: 0.0,
+            total_user_data_bytes: 0,
+            total_bytes: 0,
+            total_messages: 0,
+            total_message_batches: 0,
+            throughput_megabytes_per_second: 0.0,
+            throughput_messages_per_second: 0.0,
+            p50_latency_ms: 0.0,
+            p90_latency_ms: 0.0,
+            p95_latency_ms: 0.0,
+            p99_latency_ms: 0.0,
+            p999_latency_ms: 0.0,
+            p9999_latency_ms: 0.0,
+            avg_latency_ms: 0.0,
+            median_latency_ms: 0.0,
+            min_latency_ms: 0.0,
+            max_latency_ms: 0.0,
+            std_dev_latency_ms: 0.0,
+        },
+        throughput_mb_ts: TimeSeries::default(),
+        throughput_msg_ts: TimeSeries::default(),
+        latency_ts: TimeSeries::default(),
+    }
+}
 
-    let calculator = TimeSeriesCalculator::new();
+fn extract_totals(records: &[BenchmarkRecord]) -> (f64, u64, u64, u64, u64) {
+    let last_record = records.last().unwrap();
+    let total_time_secs = last_record.elapsed_time_us as f64 / 1_000_000.0;
+    (
+        total_time_secs,
+        last_record.user_data_bytes,
+        last_record.total_bytes,
+        last_record.messages,
+        last_record.message_batches,
+    )
+}
 
-    let throughput_mb_ts = calculator.throughput_mb(&records, sampling_time);
-    let throughput_msg_ts = calculator.throughput_msg(&records, sampling_time);
+fn calculate_time_series(
+    records: &[BenchmarkRecord],
+    sampling_time: IggyDuration,
+    moving_average_window: u32,
+) -> (TimeSeries, TimeSeries, TimeSeries) {
+    let throughput_mb_ts = TimeSeriesCalculator::throughput_mb(records, 
sampling_time);
+    let throughput_msg_ts = TimeSeriesCalculator::throughput_msg(records, 
sampling_time);
 
     let sma = MovingAverageProcessor::new(moving_average_window as usize);
     let throughput_mb_ts = sma.process(&throughput_mb_ts);
     let throughput_msg_ts = sma.process(&throughput_msg_ts);
 
+    let latency_ts = TimeSeriesCalculator::latency(records, sampling_time);
+
+    (throughput_mb_ts, throughput_msg_ts, latency_ts)
+}
+
+fn calculate_throughput(
+    throughput_mb_ts: &TimeSeries,
+    throughput_msg_ts: &TimeSeries,
+    total_time_secs: f64,
+    total_user_data_bytes: u64,
+    total_messages: u64,
+) -> (f64, f64) {
     let throughput_megabytes_per_second = if 
!throughput_mb_ts.points.is_empty() {
         throughput_mb_ts
             .points
@@ -91,7 +174,7 @@ pub fn from_records(
             .sum::<f64>()
             / throughput_mb_ts.points.len() as f64
     } else if total_time_secs > 0.0 {
-        (total_user_data_bytes as f64) / 1_000_000.0 / total_time_secs
+        total_user_data_bytes as f64 / 1_000_000.0 / total_time_secs
     } else {
         0.0
     };
@@ -104,65 +187,72 @@ pub fn from_records(
             .sum::<f64>()
             / throughput_msg_ts.points.len() as f64
     } else if total_time_secs > 0.0 {
-        (total_messages as f64) / total_time_secs
+        total_messages as f64 / total_time_secs
     } else {
         0.0
     };
 
+    (
+        throughput_megabytes_per_second,
+        throughput_messages_per_second,
+    )
+}
+
+struct LatencyMetrics {
+    p50: f64,
+    p90: f64,
+    p95: f64,
+    p99: f64,
+    p999: f64,
+    p9999: f64,
+    avg: f64,
+    median: f64,
+    min: f64,
+    max: f64,
+    std_dev: f64,
+}
+
+fn calculate_latency_metrics(
+    records: &[BenchmarkRecord],
+    latency_ts: &TimeSeries,
+) -> LatencyMetrics {
     let mut latencies_ms: Vec<f64> = records
         .iter()
         .map(|r| r.latency_us as f64 / 1_000.0)
         .collect();
     latencies_ms.sort_by(|a, b| a.partial_cmp(b).unwrap());
 
-    let p50_latency_ms = calculate_percentile(&latencies_ms, 50.0);
-    let p90_latency_ms = calculate_percentile(&latencies_ms, 90.0);
-    let p95_latency_ms = calculate_percentile(&latencies_ms, 95.0);
-    let p99_latency_ms = calculate_percentile(&latencies_ms, 99.0);
-    let p999_latency_ms = calculate_percentile(&latencies_ms, 99.9);
-    let p9999_latency_ms = calculate_percentile(&latencies_ms, 99.99);
+    let p50 = calculate_percentile(&latencies_ms, 50.0);
+    let p90 = calculate_percentile(&latencies_ms, 90.0);
+    let p95 = calculate_percentile(&latencies_ms, 95.0);
+    let p99 = calculate_percentile(&latencies_ms, 99.0);
+    let p999 = calculate_percentile(&latencies_ms, 99.9);
+    let p9999 = calculate_percentile(&latencies_ms, 99.99);
 
-    let avg_latency_ms = latencies_ms.iter().sum::<f64>() / latencies_ms.len() 
as f64;
+    let avg = latencies_ms.iter().sum::<f64>() / latencies_ms.len() as f64;
     let len = latencies_ms.len() / 2;
-    let median_latency_ms = if latencies_ms.len() % 2 == 0 {
-        (latencies_ms[len - 1] + latencies_ms[len]) / 2.0
+    let median = if latencies_ms.len() % 2 == 0 {
+        f64::midpoint(latencies_ms[len - 1], latencies_ms[len])
     } else {
         latencies_ms[len]
     };
 
-    let latency_ts = calculator.latency(&records, sampling_time);
+    let min = min(latency_ts).unwrap_or(0.0);
+    let max = max(latency_ts).unwrap_or(0.0);
+    let std_dev = std_dev(latency_ts).unwrap_or(0.0);
 
-    let min_latency_ms = min(&latency_ts).unwrap_or(0.0);
-    let max_latency_ms = max(&latency_ts).unwrap_or(0.0);
-    let std_dev_latency_ms = std_dev(&latency_ts).unwrap_or(0.0);
-
-    BenchmarkIndividualMetrics {
-        summary: BenchmarkIndividualMetricsSummary {
-            benchmark_kind,
-            actor_kind,
-            actor_id,
-            total_time_secs,
-            total_user_data_bytes,
-            total_bytes,
-            total_messages,
-            total_message_batches,
-            throughput_megabytes_per_second,
-            throughput_messages_per_second,
-            p50_latency_ms,
-            p90_latency_ms,
-            p95_latency_ms,
-            p99_latency_ms,
-            p999_latency_ms,
-            p9999_latency_ms,
-            avg_latency_ms,
-            median_latency_ms,
-            min_latency_ms,
-            max_latency_ms,
-            std_dev_latency_ms,
-        },
-        throughput_mb_ts,
-        throughput_msg_ts,
-        latency_ts,
+    LatencyMetrics {
+        p50,
+        p90,
+        p95,
+        p99,
+        p999,
+        p9999,
+        avg,
+        median,
+        min,
+        max,
+        std_dev,
     }
 }
 
@@ -172,13 +262,13 @@ fn calculate_percentile(sorted_data: &[f64], percentile: 
f64) -> f64 {
     }
 
     let rank = percentile / 100.0 * (sorted_data.len() - 1) as f64;
-    let lower = rank.floor() as usize;
-    let upper = rank.ceil() as usize;
+    let lower = rank.floor().clamp(0.0, (sorted_data.len() - 1) as f64) as 
usize;
+    let upper = rank.ceil().clamp(0.0, (sorted_data.len() - 1) as f64) as 
usize;
 
     if upper >= sorted_data.len() {
         return sorted_data[sorted_data.len() - 1];
     }
 
     let weight = rank - lower as f64;
-    sorted_data[lower] * (1.0 - weight) + sorted_data[upper] * weight
+    sorted_data[lower].mul_add(1.0 - weight, sorted_data[upper] * weight)
 }
diff --git a/core/bench/src/analytics/record.rs 
b/core/bench/src/analytics/record.rs
index 979bf051..168e8633 100644
--- a/core/bench/src/analytics/record.rs
+++ b/core/bench/src/analytics/record.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct BenchmarkRecord {
     pub elapsed_time_us: u64,
     pub latency_us: u64,
diff --git a/core/bench/src/analytics/report_builder.rs 
b/core/bench/src/analytics/report_builder.rs
index ffd4dc74..248708e0 100644
--- a/core/bench/src/analytics/report_builder.rs
+++ b/core/bench/src/analytics/report_builder.rs
@@ -35,6 +35,7 @@ use iggy::prelude::{CacheMetrics, CacheMetricsKey, 
IggyTimestamp, Stats};
 pub struct BenchmarkReportBuilder;
 
 impl BenchmarkReportBuilder {
+    #[allow(clippy::cast_possible_wrap)]
     pub async fn build(
         hardware: BenchmarkHardware,
         mut params: BenchmarkParams,
@@ -45,8 +46,7 @@ impl BenchmarkReportBuilder {
 
         let timestamp =
             
DateTime::<Utc>::from_timestamp_micros(IggyTimestamp::now().as_micros() as i64)
-                .map(|dt| dt.to_rfc3339())
-                .unwrap_or_else(|| String::from("unknown"));
+                .map_or_else(|| String::from("unknown"), |dt| dt.to_rfc3339());
 
         let transport = params.transport;
         let server_addr = params.server_address.clone();
@@ -57,7 +57,7 @@ impl BenchmarkReportBuilder {
 
         if params.gitref.is_none() {
             params.gitref = Some(server_stats.iggy_server_version.clone());
-        };
+        }
 
         if params.gitref_date.is_none() {
             params.gitref_date = Some(timestamp.clone());
@@ -134,7 +134,7 @@ impl BenchmarkReportBuilder {
 }
 
 /// This function is a workaround.
-/// See server_stats.rs in `bench_report` crate for more details.
+/// See `server_stats.rs` in `bench_report` crate for more details.
 fn stats_to_benchmark_server_stats(stats: Stats) -> BenchmarkServerStats {
     BenchmarkServerStats {
         process_id: stats.process_id,
@@ -166,7 +166,7 @@ fn stats_to_benchmark_server_stats(stats: Stats) -> 
BenchmarkServerStats {
 }
 
 /// This function is a workaround.
-/// See server_stats.rs in `bench_report` crate for more details.
+/// See `server_stats.rs` in `bench_report` crate for more details.
 fn cache_metrics_to_benchmark_cache_metrics(
     cache_metrics: HashMap<CacheMetricsKey, CacheMetrics>,
 ) -> HashMap<BenchmarkCacheMetricsKey, BenchmarkCacheMetrics> {
diff --git a/core/bench/src/analytics/time_series/calculator.rs 
b/core/bench/src/analytics/time_series/calculator.rs
index 8d1a6df1..42cf211d 100644
--- a/core/bench/src/analytics/time_series/calculator.rs
+++ b/core/bench/src/analytics/time_series/calculator.rs
@@ -16,6 +16,8 @@
  * under the License.
  */
 
+#![allow(clippy::cast_precision_loss)]
+
 use super::calculators::{
     LatencyTimeSeriesCalculator, MBThroughputCalculator, 
MessageThroughputCalculator,
     ThroughputTimeSeriesCalculator, TimeSeriesCalculation,
@@ -29,34 +31,22 @@ use tracing::warn;
 pub struct TimeSeriesCalculator;
 
 impl TimeSeriesCalculator {
-    pub fn new() -> Self {
-        Self
-    }
-
-    pub fn throughput_mb(
-        &self,
-        records: &[BenchmarkRecord],
-        bucket_size: IggyDuration,
-    ) -> TimeSeries {
+    pub fn throughput_mb(records: &[BenchmarkRecord], bucket_size: 
IggyDuration) -> TimeSeries {
         let calculator = 
ThroughputTimeSeriesCalculator::new(MBThroughputCalculator);
         calculator.calculate(records, bucket_size)
     }
 
-    pub fn throughput_msg(
-        &self,
-        records: &[BenchmarkRecord],
-        bucket_size: IggyDuration,
-    ) -> TimeSeries {
+    pub fn throughput_msg(records: &[BenchmarkRecord], bucket_size: 
IggyDuration) -> TimeSeries {
         let calculator = 
ThroughputTimeSeriesCalculator::new(MessageThroughputCalculator);
         calculator.calculate(records, bucket_size)
     }
 
-    pub fn latency(&self, records: &[BenchmarkRecord], bucket_size: 
IggyDuration) -> TimeSeries {
+    pub fn latency(records: &[BenchmarkRecord], bucket_size: IggyDuration) -> 
TimeSeries {
         let calculator = LatencyTimeSeriesCalculator;
         calculator.calculate(records, bucket_size)
     }
 
-    pub fn aggregate_sum(&self, series: &[TimeSeries]) -> TimeSeries {
+    pub fn aggregate_sum(series: &[TimeSeries]) -> TimeSeries {
         if series.is_empty() {
             warn!("Attempting to aggregate empty series");
             return TimeSeries {
@@ -92,7 +82,7 @@ impl TimeSeriesCalculator {
         TimeSeries { points, kind }
     }
 
-    pub fn aggregate_avg(&self, series: &[TimeSeries]) -> TimeSeries {
+    pub fn aggregate_avg(series: &[TimeSeries]) -> TimeSeries {
         if series.is_empty() {
             warn!("Attempting to aggregate empty series");
             return TimeSeries {
diff --git a/core/bench/src/analytics/time_series/calculators/latency.rs 
b/core/bench/src/analytics/time_series/calculators/latency.rs
index 2bca3d84..93cb9fdc 100644
--- a/core/bench/src/analytics/time_series/calculators/latency.rs
+++ b/core/bench/src/analytics/time_series/calculators/latency.rs
@@ -16,6 +16,9 @@
  * under the License.
  */
 
+#![allow(clippy::cast_precision_loss)]
+#![allow(clippy::cast_possible_truncation)]
+
 use super::TimeSeriesCalculation;
 use crate::analytics::record::BenchmarkRecord;
 use bench_report::time_series::{TimePoint, TimeSeries, TimeSeriesKind};
diff --git a/core/bench/src/analytics/time_series/calculators/throughput.rs 
b/core/bench/src/analytics/time_series/calculators/throughput.rs
index b6708fa2..f2b76c1a 100644
--- a/core/bench/src/analytics/time_series/calculators/throughput.rs
+++ b/core/bench/src/analytics/time_series/calculators/throughput.rs
@@ -16,6 +16,10 @@
  * under the License.
  */
 
+#![allow(clippy::cast_precision_loss)]
+#![allow(clippy::cast_possible_truncation)]
+#![allow(clippy::cast_sign_loss)]
+
 use super::TimeSeriesCalculation;
 use crate::analytics::record::BenchmarkRecord;
 use bench_report::time_series::{TimePoint, TimeSeries, TimeSeriesKind};
@@ -68,7 +72,7 @@ pub struct ThroughputTimeSeriesCalculator<T: 
ThroughputCalculation> {
 }
 
 impl<T: ThroughputCalculation> ThroughputTimeSeriesCalculator<T> {
-    pub fn new(calculator: T) -> Self {
+    pub const fn new(calculator: T) -> Self {
         Self { calculator }
     }
 }
diff --git a/core/bench/src/analytics/time_series/processors/moving_average.rs 
b/core/bench/src/analytics/time_series/processors/moving_average.rs
index 3a0034d3..292fbba1 100644
--- a/core/bench/src/analytics/time_series/processors/moving_average.rs
+++ b/core/bench/src/analytics/time_series/processors/moving_average.rs
@@ -16,6 +16,8 @@
  * under the License.
  */
 
+#![allow(clippy::cast_precision_loss)]
+
 use super::TimeSeriesProcessor;
 use bench_report::time_series::{TimePoint, TimeSeries};
 use std::collections::VecDeque;
@@ -27,7 +29,7 @@ pub struct MovingAverageProcessor {
 }
 
 impl MovingAverageProcessor {
-    pub fn new(window_size: usize) -> Self {
+    pub const fn new(window_size: usize) -> Self {
         Self { window_size }
     }
 }
diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index dec4ac3b..1ca93f0c 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -19,7 +19,15 @@
 use super::kind::BenchmarkKindCommand;
 use super::output::BenchmarkOutputCommand;
 use super::props::{BenchmarkKindProps, BenchmarkTransportProps};
-use super::{defaults::*, transport::BenchmarkTransportCommand};
+use super::{
+    defaults::{
+        DEFAULT_MESSAGE_BATCHES, DEFAULT_MESSAGE_SIZE, 
DEFAULT_MESSAGES_PER_BATCH,
+        DEFAULT_MOVING_AVERAGE_WINDOW, DEFAULT_PERFORM_CLEANUP, 
DEFAULT_SAMPLING_TIME,
+        DEFAULT_SERVER_STDOUT_VISIBILITY, DEFAULT_SKIP_SERVER_START, 
DEFAULT_START_STREAM_ID,
+        DEFAULT_WARMUP_TIME,
+    },
+    transport::BenchmarkTransportCommand,
+};
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::numeric_parameter::BenchmarkNumericParameter;
 use clap::error::ErrorKind;
@@ -81,7 +89,7 @@ pub struct IggyBenchArgs {
     #[arg(long, short = 'W', default_value_t = DEFAULT_MOVING_AVERAGE_WINDOW)]
     pub moving_average_window: u32,
 
-    /// Shutdown iggy-server and remove server local_data directory after the 
benchmark is finished.
+    /// Shutdown iggy-server and remove server `local_data` directory after 
the benchmark is finished.
     /// Only applicable to local benchmarks.
     #[arg(long, default_value_t = DEFAULT_PERFORM_CLEANUP, 
verbatim_doc_comment)]
     pub cleanup: bool,
@@ -125,14 +133,14 @@ impl IggyBenchArgs {
             .server_address()
     }
 
-    pub fn start_stream_id(&self) -> u32 {
+    pub const fn start_stream_id(&self) -> u32 {
         self.start_stream_id.get()
     }
 
     pub fn validate(&mut self) {
         let server_address = 
self.server_address().parse::<SocketAddr>().unwrap();
         if (self.cleanup || self.verbose) && 
!server_address.ip().is_loopback() {
-            IggyBenchArgs::command()
+            Self::command()
                 .error(
                     ErrorKind::ArgumentConflict,
                     format!(
@@ -150,7 +158,7 @@ impl IggyBenchArgs {
                 || self.extra_info().is_some()
                 || self.gitref_date().is_some())
         {
-            IggyBenchArgs::command()
+            Self::command()
                 .error(
                     ErrorKind::ArgumentConflict,
                     "--git-ref, --git-ref-date, --identifier, --remark, 
--extra-info can only be used with --output-dir",
@@ -158,14 +166,14 @@ impl IggyBenchArgs {
                 .exit();
         }
 
-        if let (None, None) = (self.message_batches, self.total_data) {
+        if (self.message_batches, self.total_data) == (None, None) {
             self.message_batches = Some(DEFAULT_MESSAGE_BATCHES);
         }
 
         if let Some(total_data) = self.total_data {
-            let samples = total_data.as_bytes_u64() / 
self.message_size().min() as u64;
+            let samples = total_data.as_bytes_u64() / 
u64::from(self.message_size().min());
             if samples <= 1 {
-                IggyBenchArgs::command()
+                Self::command()
                     .error(
                         ErrorKind::ArgumentConflict,
                         "--total-messages-size must be at least 2x greater 
than --message-size",
@@ -174,32 +182,31 @@ impl IggyBenchArgs {
             }
         }
 
-        self.benchmark_kind.inner().validate()
+        self.benchmark_kind.inner().validate();
     }
 
-    pub fn messages_per_batch(&self) -> BenchmarkNumericParameter {
+    pub const fn messages_per_batch(&self) -> BenchmarkNumericParameter {
         self.messages_per_batch
     }
 
-    pub fn message_batches(&self) -> Option<NonZeroU32> {
+    pub const fn message_batches(&self) -> Option<NonZeroU32> {
         self.message_batches
     }
 
-    pub fn message_size(&self) -> BenchmarkNumericParameter {
+    pub const fn message_size(&self) -> BenchmarkNumericParameter {
         self.message_size
     }
 
-    pub fn total_data(&self) -> Option<IggyByteSize> {
+    pub const fn total_data(&self) -> Option<IggyByteSize> {
         self.total_data
     }
 
     // Used only for generation of unique directory name
     pub fn data_volume_identifier(&self) -> String {
-        if let Some(total_messages_size) = self.total_data() {
-            format!("{}B", total_messages_size.as_bytes_u64())
-        } else {
-            self.message_batches().unwrap().to_string()
-        }
+        self.total_data().map_or_else(
+            || self.message_batches().unwrap().to_string(),
+            |total_messages_size| format!("{}B", 
total_messages_size.as_bytes_u64()),
+        )
     }
 
     pub fn streams(&self) -> u32 {
@@ -226,19 +233,19 @@ impl IggyBenchArgs {
         self.benchmark_kind.inner().number_of_consumer_groups()
     }
 
-    pub fn warmup_time(&self) -> IggyDuration {
+    pub const fn warmup_time(&self) -> IggyDuration {
         self.warmup_time
     }
 
-    pub fn sampling_time(&self) -> IggyDuration {
+    pub const fn sampling_time(&self) -> IggyDuration {
         self.sampling_time
     }
 
-    pub fn moving_average_window(&self) -> u32 {
+    pub const fn moving_average_window(&self) -> u32 {
         self.moving_average_window
     }
 
-    pub fn rate_limit(&self) -> Option<IggyByteSize> {
+    pub const fn rate_limit(&self) -> Option<IggyByteSize> {
         self.rate_limit
     }
 
@@ -348,24 +355,22 @@ impl IggyBenchArgs {
         };
 
         let actors = match &self.benchmark_kind {
-            BenchmarkKindCommand::PinnedProducer(_) => self.producers(),
-            BenchmarkKindCommand::PinnedConsumer(_) => self.consumers(),
-            BenchmarkKindCommand::PinnedProducerAndConsumer(_) => {
-                self.producers() + self.consumers()
-            }
-            BenchmarkKindCommand::BalancedProducer(_) => self.producers(),
-            BenchmarkKindCommand::BalancedConsumerGroup(_) => self.consumers(),
-            BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => {
+            BenchmarkKindCommand::PinnedProducer(_)
+            | BenchmarkKindCommand::BalancedProducer(_)
+            | BenchmarkKindCommand::EndToEndProducingConsumer(_)
+            | BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => 
self.producers(),
+            BenchmarkKindCommand::PinnedConsumer(_)
+            | BenchmarkKindCommand::BalancedConsumerGroup(_) => 
self.consumers(),
+            BenchmarkKindCommand::PinnedProducerAndConsumer(_)
+            | BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => {
                 self.producers() + self.consumers()
             }
-            BenchmarkKindCommand::EndToEndProducingConsumer(_) => 
self.producers(),
-            BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => 
self.producers(),
             BenchmarkKindCommand::Examples => unreachable!(),
         };
 
         let data_volume_arg = match (self.total_data, self.message_batches) {
-            (Some(total), None) => format!("{}", total),
-            (None, Some(batches)) => format!("{}", batches),
+            (Some(total), None) => format!("{total}"),
+            (None, Some(batches)) => format!("{batches}"),
             _ => unreachable!(),
         };
 
@@ -430,7 +435,7 @@ impl IggyBenchArgs {
         );
 
         if let Some(remark) = &self.remark() {
-            name.push_str(&format!(" ({})", remark));
+            name = format!("{name} ({remark})");
         }
 
         name
diff --git a/core/bench/src/args/defaults.rs b/core/bench/src/args/defaults.rs
index b49c31c9..2e3f2b1e 100644
--- a/core/bench/src/args/defaults.rs
+++ b/core/bench/src/args/defaults.rs
@@ -33,7 +33,7 @@ pub const DEFAULT_MESSAGES_PER_BATCH: NonZeroU32 = u32!(1000);
 pub const DEFAULT_MESSAGE_BATCHES: NonZeroU32 = u32!(1000);
 pub const DEFAULT_MESSAGE_SIZE: NonZeroU32 = u32!(1000);
 pub const DEFAULT_TOTAL_MESSAGES_SIZE: IggyByteSize = 
IggyByteSize::new(8_000_000);
-pub const DEFAULT_START_STREAM_ID: NonZeroU32 = u32!(3000000);
+pub const DEFAULT_START_STREAM_ID: NonZeroU32 = u32!(3_000_000);
 
 pub const DEFAULT_PINNED_NUMBER_OF_STREAMS: NonZeroU32 = u32!(8);
 pub const DEFAULT_BALANCED_NUMBER_OF_STREAMS: NonZeroU32 = u32!(1);
diff --git a/core/bench/src/args/examples.rs b/core/bench/src/args/examples.rs
index b187a12e..84a7839d 100644
--- a/core/bench/src/args/examples.rs
+++ b/core/bench/src/args/examples.rs
@@ -169,5 +169,5 @@ const EXAMPLES: &str = r#"EXAMPLES:
 "#;
 
 pub fn print_examples() {
-    println!("{}", EXAMPLES);
+    println!("{EXAMPLES}");
 }
diff --git a/core/bench/src/args/kind.rs b/core/bench/src/args/kind.rs
index deb78815..9ffa1acf 100644
--- a/core/bench/src/args/kind.rs
+++ b/core/bench/src/args/kind.rs
@@ -100,23 +100,19 @@ pub enum BenchmarkKindCommand {
 impl BenchmarkKindCommand {
     pub fn as_simple_kind(&self) -> BenchmarkKind {
         match self {
-            BenchmarkKindCommand::PinnedProducer(_) => 
BenchmarkKind::PinnedProducer,
-            BenchmarkKindCommand::PinnedConsumer(_) => 
BenchmarkKind::PinnedConsumer,
-            BenchmarkKindCommand::PinnedProducerAndConsumer(_) => {
-                BenchmarkKind::PinnedProducerAndConsumer
-            }
-            BenchmarkKindCommand::BalancedProducer(_) => 
BenchmarkKind::BalancedProducer,
-            BenchmarkKindCommand::BalancedConsumerGroup(_) => 
BenchmarkKind::BalancedConsumerGroup,
-            BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => {
+            Self::PinnedProducer(_) => BenchmarkKind::PinnedProducer,
+            Self::PinnedConsumer(_) => BenchmarkKind::PinnedConsumer,
+            Self::PinnedProducerAndConsumer(_) => 
BenchmarkKind::PinnedProducerAndConsumer,
+            Self::BalancedProducer(_) => BenchmarkKind::BalancedProducer,
+            Self::BalancedConsumerGroup(_) => 
BenchmarkKind::BalancedConsumerGroup,
+            Self::BalancedProducerAndConsumerGroup(_) => {
                 BenchmarkKind::BalancedProducerAndConsumerGroup
             }
-            BenchmarkKindCommand::EndToEndProducingConsumer(_) => {
-                BenchmarkKind::EndToEndProducingConsumer
-            }
-            BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => {
+            Self::EndToEndProducingConsumer(_) => 
BenchmarkKind::EndToEndProducingConsumer,
+            Self::EndToEndProducingConsumerGroup(_) => {
                 BenchmarkKind::EndToEndProducingConsumerGroup
             }
-            BenchmarkKindCommand::Examples => {
+            Self::Examples => {
                 print_examples();
                 std::process::exit(0);
             }
@@ -155,15 +151,15 @@ impl BenchmarkKindProps for BenchmarkKindCommand {
 
     fn inner(&self) -> &dyn BenchmarkKindProps {
         match self {
-            BenchmarkKindCommand::PinnedProducer(args) => args,
-            BenchmarkKindCommand::PinnedConsumer(args) => args,
-            BenchmarkKindCommand::PinnedProducerAndConsumer(args) => args,
-            BenchmarkKindCommand::BalancedProducer(args) => args,
-            BenchmarkKindCommand::BalancedConsumerGroup(args) => args,
-            BenchmarkKindCommand::BalancedProducerAndConsumerGroup(args) => 
args,
-            BenchmarkKindCommand::EndToEndProducingConsumer(args) => args,
-            BenchmarkKindCommand::EndToEndProducingConsumerGroup(args) => args,
-            BenchmarkKindCommand::Examples => {
+            Self::PinnedProducer(args) => args,
+            Self::PinnedConsumer(args) => args,
+            Self::PinnedProducerAndConsumer(args) => args,
+            Self::BalancedProducer(args) => args,
+            Self::BalancedConsumerGroup(args) => args,
+            Self::BalancedProducerAndConsumerGroup(args) => args,
+            Self::EndToEndProducingConsumer(args) => args,
+            Self::EndToEndProducingConsumerGroup(args) => args,
+            Self::Examples => {
                 print_examples();
                 std::process::exit(0);
             }
@@ -171,6 +167,6 @@ impl BenchmarkKindProps for BenchmarkKindCommand {
     }
 
     fn validate(&self) {
-        self.inner().validate()
+        self.inner().validate();
     }
 }
diff --git a/core/bench/src/args/kinds/balanced/consumer_group.rs 
b/core/bench/src/args/kinds/balanced/consumer_group.rs
index d9068c51..64602bdf 100644
--- a/core/bench/src/args/kinds/balanced/consumer_group.rs
+++ b/core/bench/src/args/kinds/balanced/consumer_group.rs
@@ -17,7 +17,12 @@
  */
 
 use crate::args::{
-    common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+    common::IggyBenchArgs,
+    defaults::{
+        DEFAULT_BALANCED_NUMBER_OF_STREAMS, DEFAULT_NUMBER_OF_CONSUMER_GROUPS,
+        DEFAULT_NUMBER_OF_CONSUMERS,
+    },
+    props::BenchmarkKindProps,
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
@@ -77,8 +82,7 @@ impl BenchmarkKindProps for BalancedConsumerGroupArgs {
             cmd.error(
                 ErrorKind::ArgumentConflict,
                 format!(
-                    "In balanced consumer group, consumer groups number ({}) 
must be less than the number of streams ({})",
-                    cg_number, streams
+                    "In balanced consumer group, consumer groups number 
({cg_number}) must be less than the number of streams ({streams})"
                 ),
             )
             .exit();
diff --git a/core/bench/src/args/kinds/balanced/producer.rs 
b/core/bench/src/args/kinds/balanced/producer.rs
index 10ffdd2f..cc58e9eb 100644
--- a/core/bench/src/args/kinds/balanced/producer.rs
+++ b/core/bench/src/args/kinds/balanced/producer.rs
@@ -17,7 +17,12 @@
  */
 
 use crate::args::{
-    common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+    common::IggyBenchArgs,
+    defaults::{
+        DEFAULT_BALANCED_NUMBER_OF_PARTITIONS, 
DEFAULT_BALANCED_NUMBER_OF_STREAMS,
+        DEFAULT_NUMBER_OF_PRODUCERS,
+    },
+    props::BenchmarkKindProps,
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
diff --git a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs 
b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
index 6a288f3b..ab7ced0f 100644
--- a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
+++ b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
@@ -17,7 +17,13 @@
  */
 
 use crate::args::{
-    common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+    common::IggyBenchArgs,
+    defaults::{
+        DEFAULT_BALANCED_NUMBER_OF_PARTITIONS, 
DEFAULT_BALANCED_NUMBER_OF_STREAMS,
+        DEFAULT_NUMBER_OF_CONSUMER_GROUPS, DEFAULT_NUMBER_OF_CONSUMERS,
+        DEFAULT_NUMBER_OF_PRODUCERS,
+    },
+    props::BenchmarkKindProps,
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
diff --git a/core/bench/src/args/kinds/end_to_end/producing_consumer.rs 
b/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
index b33ee05e..dc0826df 100644
--- a/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
+++ b/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
@@ -17,7 +17,9 @@
  */
 
 use crate::args::{
-    common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+    common::IggyBenchArgs,
+    defaults::{DEFAULT_NUMBER_OF_PRODUCERS, DEFAULT_PINNED_NUMBER_OF_STREAMS},
+    props::BenchmarkKindProps,
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
diff --git a/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs 
b/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
index 218b4e3d..af7829b5 100644
--- a/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
+++ b/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
@@ -17,7 +17,13 @@
  */
 
 use crate::args::{
-    common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+    common::IggyBenchArgs,
+    defaults::{
+        DEFAULT_BALANCED_NUMBER_OF_PARTITIONS, 
DEFAULT_BALANCED_NUMBER_OF_STREAMS,
+        DEFAULT_NUMBER_OF_CONSUMER_GROUPS, DEFAULT_NUMBER_OF_CONSUMERS,
+        DEFAULT_NUMBER_OF_PRODUCERS,
+    },
+    props::BenchmarkKindProps,
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
@@ -100,8 +106,7 @@ impl BenchmarkKindProps for 
EndToEndProducingConsumerGroupArgs {
             cmd.error(
                 ErrorKind::ArgumentConflict,
                 format!(
-                    "For producing consumer group benchmark, consumer groups 
number ({}) must be less than the number of streams ({})",
-                    cg_number, streams
+                    "For producing consumer group benchmark, consumer groups 
number ({cg_number}) must be less than the number of streams ({streams})"
                 ),
             )
             .exit();
diff --git a/core/bench/src/args/kinds/pinned/consumer.rs 
b/core/bench/src/args/kinds/pinned/consumer.rs
index 5d29d395..c6a1b347 100644
--- a/core/bench/src/args/kinds/pinned/consumer.rs
+++ b/core/bench/src/args/kinds/pinned/consumer.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::args::{
-    common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+    common::IggyBenchArgs, defaults::DEFAULT_NUMBER_OF_PRODUCERS, 
props::BenchmarkKindProps,
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
diff --git a/core/bench/src/args/kinds/pinned/producer.rs 
b/core/bench/src/args/kinds/pinned/producer.rs
index 5ae60eea..138988fb 100644
--- a/core/bench/src/args/kinds/pinned/producer.rs
+++ b/core/bench/src/args/kinds/pinned/producer.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::args::{
-    common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+    common::IggyBenchArgs, defaults::DEFAULT_NUMBER_OF_PRODUCERS, 
props::BenchmarkKindProps,
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
diff --git a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs 
b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
index bb21073e..6bf6314d 100644
--- a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
+++ b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
@@ -17,7 +17,12 @@
  */
 
 use crate::args::{
-    common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+    common::IggyBenchArgs,
+    defaults::{
+        DEFAULT_NUMBER_OF_CONSUMERS, DEFAULT_NUMBER_OF_PRODUCERS,
+        DEFAULT_PINNED_NUMBER_OF_PARTITIONS,
+    },
+    props::BenchmarkKindProps,
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
diff --git a/core/bench/src/args/props.rs b/core/bench/src/args/props.rs
index 67368ed7..c3386320 100644
--- a/core/bench/src/args/props.rs
+++ b/core/bench/src/args/props.rs
@@ -42,7 +42,7 @@ pub trait BenchmarkTransportProps {
     fn server_address(&self) -> &str;
     fn client_address(&self) -> &str;
     fn validate_certificate(&self) -> bool;
-    fn output_command(&self) -> &Option<BenchmarkOutputCommand>;
+    fn output_command(&self) -> Option<&BenchmarkOutputCommand>;
     fn nodelay(&self) -> bool;
     fn inner(&self) -> &dyn BenchmarkTransportProps
     where
diff --git a/core/bench/src/args/transport.rs b/core/bench/src/args/transport.rs
index f9702579..18ca668d 100644
--- a/core/bench/src/args/transport.rs
+++ b/core/bench/src/args/transport.rs
@@ -16,7 +16,10 @@
  * under the License.
  */
 
-use super::defaults::*;
+use super::defaults::{
+    DEFAULT_HTTP_SERVER_ADDRESS, DEFAULT_QUIC_CLIENT_ADDRESS, 
DEFAULT_QUIC_SERVER_ADDRESS,
+    DEFAULT_QUIC_SERVER_NAME, DEFAULT_QUIC_VALIDATE_CERTIFICATE, 
DEFAULT_TCP_SERVER_ADDRESS,
+};
 use super::{output::BenchmarkOutputCommand, props::BenchmarkTransportProps};
 use clap::{Parser, Subcommand};
 use integration::test_server::Transport;
@@ -35,9 +38,9 @@ impl Serialize for BenchmarkTransportCommand {
         S: Serializer,
     {
         let variant_str = match self {
-            BenchmarkTransportCommand::Http(_) => "http",
-            BenchmarkTransportCommand::Tcp(_) => "tcp",
-            BenchmarkTransportCommand::Quic(_) => "quic",
+            Self::Http(_) => "http",
+            Self::Tcp(_) => "tcp",
+            Self::Quic(_) => "quic",
         };
         serializer.serialize_str(variant_str)
     }
@@ -66,13 +69,13 @@ impl BenchmarkTransportProps for BenchmarkTransportCommand {
 
     fn inner(&self) -> &dyn BenchmarkTransportProps {
         match self {
-            BenchmarkTransportCommand::Http(args) => args,
-            BenchmarkTransportCommand::Tcp(args) => args,
-            BenchmarkTransportCommand::Quic(args) => args,
+            Self::Http(args) => args,
+            Self::Tcp(args) => args,
+            Self::Quic(args) => args,
         }
     }
 
-    fn output_command(&self) -> &Option<BenchmarkOutputCommand> {
+    fn output_command(&self) -> Option<&BenchmarkOutputCommand> {
         self.inner().output_command()
     }
 }
@@ -109,8 +112,8 @@ impl BenchmarkTransportProps for HttpArgs {
         panic!("Setting nodelay for HTTP transport is not supported!")
     }
 
-    fn output_command(&self) -> &Option<BenchmarkOutputCommand> {
-        &self.output
+    fn output_command(&self) -> Option<&BenchmarkOutputCommand> {
+        self.output.as_ref()
     }
 }
 
@@ -150,8 +153,8 @@ impl BenchmarkTransportProps for TcpArgs {
         self.nodelay
     }
 
-    fn output_command(&self) -> &Option<BenchmarkOutputCommand> {
-        &self.output
+    fn output_command(&self) -> Option<&BenchmarkOutputCommand> {
+        self.output.as_ref()
     }
 }
 
@@ -199,7 +202,7 @@ impl BenchmarkTransportProps for QuicArgs {
         panic!("Setting nodelay for QUIC transport is not supported!")
     }
 
-    fn output_command(&self) -> &Option<BenchmarkOutputCommand> {
-        &self.output
+    fn output_command(&self) -> Option<&BenchmarkOutputCommand> {
+        self.output.as_ref()
     }
 }
diff --git a/core/bench/src/benchmarks/balanced_consumer_group.rs 
b/core/bench/src/benchmarks/balanced_consumer_group.rs
index bb044806..ac60fc6f 100644
--- a/core/bench/src/benchmarks/balanced_consumer_group.rs
+++ b/core/bench/src/benchmarks/balanced_consumer_group.rs
@@ -17,7 +17,10 @@
  */
 
 use super::benchmark::Benchmarkable;
-use crate::{args::common::IggyBenchArgs, benchmarks::common::*};
+use crate::{
+    args::common::IggyBenchArgs,
+    benchmarks::common::{build_consumer_futures, init_consumer_groups},
+};
 use async_trait::async_trait;
 use bench_report::{benchmark_kind::BenchmarkKind, 
individual_metrics::BenchmarkIndividualMetrics};
 use iggy::prelude::*;
@@ -32,11 +35,11 @@ pub struct BalancedConsumerGroupBenchmark {
 }
 
 impl BalancedConsumerGroupBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Box<Self> {
-        Box::new(Self {
+    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+        Self {
             args,
             client_factory,
-        })
+        }
     }
 }
 
@@ -52,7 +55,7 @@ impl Benchmarkable for BalancedConsumerGroupBenchmark {
 
         init_consumer_groups(cf, &args).await?;
 
-        let consumer_futures = build_consumer_futures(cf, &args)?;
+        let consumer_futures = build_consumer_futures(cf, &args);
         for fut in consumer_futures {
             tasks.spawn(fut);
         }
diff --git a/core/bench/src/benchmarks/balanced_producer.rs 
b/core/bench/src/benchmarks/balanced_producer.rs
index 1f8261e6..6e81ab5e 100644
--- a/core/bench/src/benchmarks/balanced_producer.rs
+++ b/core/bench/src/benchmarks/balanced_producer.rs
@@ -18,7 +18,7 @@
 
 use super::benchmark::Benchmarkable;
 use crate::args::common::IggyBenchArgs;
-use crate::benchmarks::common::*;
+use crate::benchmarks::common::build_producer_futures;
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -34,11 +34,11 @@ pub struct BalancedProducerBenchmark {
 }
 
 impl BalancedProducerBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Box<Self> {
-        Box::new(Self {
+    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+        Self {
             args,
             client_factory,
-        })
+        }
     }
 }
 
@@ -50,7 +50,7 @@ impl Benchmarkable for BalancedProducerBenchmark {
         self.init_streams().await?;
         let cf = &self.client_factory;
         let args = self.args.clone();
-        let producer_futures = build_producer_futures(cf, &args)?;
+        let producer_futures = build_producer_futures(cf, &args);
         let mut tasks = JoinSet::new();
 
         for fut in producer_futures {
@@ -76,10 +76,10 @@ impl Benchmarkable for BalancedProducerBenchmark {
         let streams = format!("streams: {}", self.args.streams());
         let partitions = format!("partitions: {}", 
self.args.number_of_partitions());
         let producers = format!("producers: {}", self.args.producers());
-        let max_topic_size = match self.args.max_topic_size() {
-            Some(size) => format!(" max topic size: {}", size),
-            None => format!(" max topic size: {}", 
MaxTopicSize::ServerDefault),
-        };
+        let max_topic_size = self.args.max_topic_size().map_or_else(
+            || format!(" max topic size: {}", MaxTopicSize::ServerDefault),
+            |size| format!(" max topic size: {size}"),
+        );
         let common_params = self.common_params_str();
 
         info!(
diff --git a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs 
b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
index 8c86afc3..0eab5db8 100644
--- a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
+++ b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
@@ -17,7 +17,10 @@
  */
 
 use super::benchmark::Benchmarkable;
-use crate::{args::common::IggyBenchArgs, benchmarks::common::*};
+use crate::{
+    args::common::IggyBenchArgs,
+    benchmarks::common::{build_consumer_futures, build_producer_futures, 
init_consumer_groups},
+};
 use async_trait::async_trait;
 use bench_report::{benchmark_kind::BenchmarkKind, 
individual_metrics::BenchmarkIndividualMetrics};
 use iggy::prelude::*;
@@ -32,11 +35,11 @@ pub struct BalancedProducerAndConsumerGroupBenchmark {
 }
 
 impl BalancedProducerAndConsumerGroupBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Box<Self> {
-        Box::new(Self {
+    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+        Self {
             args,
             client_factory,
-        })
+        }
     }
 }
 
@@ -52,8 +55,8 @@ impl Benchmarkable for 
BalancedProducerAndConsumerGroupBenchmark {
 
         init_consumer_groups(cf, &args).await?;
 
-        let producer_futures = build_producer_futures(cf, &args)?;
-        let consumer_futures = build_consumer_futures(cf, &args)?;
+        let producer_futures = build_producer_futures(cf, &args);
+        let consumer_futures = build_consumer_futures(cf, &args);
 
         for fut in producer_futures {
             tasks.spawn(fut);
@@ -84,10 +87,10 @@ impl Benchmarkable for 
BalancedProducerAndConsumerGroupBenchmark {
         let producers = format!("producers: {}", self.args.producers());
         let consumers = format!("consumers: {}", self.args.consumers());
         let cg_count = format!("consumer groups: {}", 
self.args.number_of_consumer_groups());
-        let max_topic_size = match self.args.max_topic_size() {
-            Some(size) => format!(" max topic size: {}", size),
-            None => format!(" max topic size: {}", 
MaxTopicSize::ServerDefault),
-        };
+        let max_topic_size = self.args.max_topic_size().map_or_else(
+            || format!(" max topic size: {}", MaxTopicSize::ServerDefault),
+            |size| format!(" max topic size: {size}"),
+        );
         let common_params = self.common_params_str();
 
         info!(
diff --git a/core/bench/src/benchmarks/benchmark.rs 
b/core/bench/src/benchmarks/benchmark.rs
index e14fd132..9b96a1b8 100644
--- a/core/bench/src/benchmarks/benchmark.rs
+++ b/core/bench/src/benchmarks/benchmark.rs
@@ -43,43 +43,46 @@ impl From<IggyBenchArgs> for Box<dyn Benchmarkable> {
 
         match args.benchmark_kind {
             BenchmarkKindCommand::PinnedProducer(_) => {
-                PinnedProducerBenchmark::new(Arc::new(args), client_factory)
+                Box::new(PinnedProducerBenchmark::new(Arc::new(args), 
client_factory))
             }
 
             BenchmarkKindCommand::PinnedConsumer(_) => {
-                PinnedConsumerBenchmark::new(Arc::new(args), client_factory)
+                Box::new(PinnedConsumerBenchmark::new(Arc::new(args), 
client_factory))
             }
 
-            BenchmarkKindCommand::PinnedProducerAndConsumer(_) => {
-                PinnedProducerAndConsumerBenchmark::new(Arc::new(args), 
client_factory)
-            }
+            BenchmarkKindCommand::PinnedProducerAndConsumer(_) => Box::new(
+                PinnedProducerAndConsumerBenchmark::new(Arc::new(args), 
client_factory),
+            ),
 
-            BenchmarkKindCommand::BalancedProducer(_) => {
-                BalancedProducerBenchmark::new(Arc::new(args), client_factory)
-            }
+            BenchmarkKindCommand::BalancedProducer(_) => 
Box::new(BalancedProducerBenchmark::new(
+                Arc::new(args),
+                client_factory,
+            )),
 
-            BenchmarkKindCommand::BalancedConsumerGroup(_) => {
-                BalancedConsumerGroupBenchmark::new(Arc::new(args), 
client_factory)
-            }
+            BenchmarkKindCommand::BalancedConsumerGroup(_) => Box::new(
+                BalancedConsumerGroupBenchmark::new(Arc::new(args), 
client_factory),
+            ),
 
-            BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => {
-                BalancedProducerAndConsumerGroupBenchmark::new(Arc::new(args), 
client_factory)
-            }
+            BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => 
Box::new(
+                BalancedProducerAndConsumerGroupBenchmark::new(Arc::new(args), 
client_factory),
+            ),
 
-            BenchmarkKindCommand::EndToEndProducingConsumer(_) => {
-                EndToEndProducingConsumerBenchmark::new(Arc::new(args), 
client_factory)
-            }
+            BenchmarkKindCommand::EndToEndProducingConsumer(_) => Box::new(
+                EndToEndProducingConsumerBenchmark::new(Arc::new(args), 
client_factory),
+            ),
 
-            BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => {
-                EndToEndProducingConsumerGroupBenchmark::new(Arc::new(args), 
client_factory)
+            BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => 
Box::new(
+                EndToEndProducingConsumerGroupBenchmark::new(Arc::new(args), 
client_factory),
+            ),
+            BenchmarkKindCommand::Examples => {
+                unreachable!("Examples should be handled before this point")
             }
-            _ => todo!(),
         }
     }
 }
 
 #[async_trait]
-pub trait Benchmarkable {
+pub trait Benchmarkable: Send {
     async fn run(
         &mut self,
     ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, 
IggyError>;
@@ -104,13 +107,13 @@ pub trait Benchmarkable {
             let stream_id = start_stream_id + i;
             if streams.iter().all(|s| s.id != stream_id) {
                 info!("Creating the test stream {}", stream_id);
-                let name = format!("stream {}", stream_id);
+                let name = format!("stream {stream_id}");
                 client.create_stream(&name, Some(stream_id)).await?;
-                let name = format!("topic {}", topic_id);
-                let max_topic_size = match self.args().max_topic_size() {
-                    Some(size) => MaxTopicSize::Custom(size),
-                    None => MaxTopicSize::Unlimited,
-                };
+                let name = format!("topic {topic_id}");
+                let max_topic_size = self
+                    .args()
+                    .max_topic_size()
+                    .map_or(MaxTopicSize::Unlimited, MaxTopicSize::Custom);
 
                 info!(
                     "Creating the test topic {} for stream {} with max topic 
size: {:?}",
@@ -145,8 +148,7 @@ pub trait Benchmarkable {
             let stream_id = start_stream_id + i;
             if streams.iter().all(|s| s.id != stream_id) {
                 return Err(IggyError::ResourceNotFound(format!(
-                    "Streams for testing are not properly initialized. Stream 
with id: {} is missing.",
-                    stream_id
+                    "Streams for testing are not properly initialized. Stream 
with id: {stream_id} is missing."
                 )));
             }
         }
@@ -159,23 +161,21 @@ pub trait Benchmarkable {
             " messages per batch: {} b,",
             self.args().messages_per_batch()
         );
-        let data = if let Some(data) = self.args().total_data() {
-            format!(" total data to send: {},", data)
-        } else {
-            format!(
-                " total batches to send: {},",
-                self.args().message_batches().unwrap()
-            )
-        };
+        let data = self.args().total_data().map_or_else(
+            || {
+                format!(
+                    " total batches to send: {},",
+                    self.args().message_batches().unwrap()
+                )
+            },
+            |data| format!(" total data to send: {data},"),
+        );
         let rate_limit = self
             .args()
             .rate_limit()
             .map(|rl| format!(" global rate limit: {rl}/s"))
             .unwrap_or_default();
 
-        format!(
-            "{}{}{}{}",
-            message_size, messages_per_batch, data, rate_limit,
-        )
+        format!("{message_size}{messages_per_batch}{data}{rate_limit}",)
     }
 }
diff --git a/core/bench/src/benchmarks/common.rs 
b/core/bench/src/benchmarks/common.rs
index de6356d2..22c9ad22 100644
--- a/core/bench/src/benchmarks/common.rs
+++ b/core/bench/src/benchmarks/common.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use super::*;
+use super::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX};
 use crate::{
     actors::{
         consumer::BenchmarkConsumer, producer::BenchmarkProducer,
@@ -33,7 +33,7 @@ use tracing::{error, info};
 
 pub async fn create_consumer(
     client: &IggyClient,
-    consumer_group_id: &Option<u32>,
+    consumer_group_id: Option<&u32>,
     stream_id: &Identifier,
     topic_id: &Identifier,
     consumer_id: u32,
@@ -56,7 +56,14 @@ pub async fn create_consumer(
 }
 
 pub fn rate_limit_per_actor(total_rate: Option<IggyByteSize>, actors: u32) -> 
Option<IggyByteSize> {
-    total_rate.map(|rl| (rl.as_bytes_u64() / (actors as u64)).into())
+    total_rate.and_then(|rl| {
+        let per_actor = rl.as_bytes_u64() / u64::from(actors);
+        if per_actor > 0 {
+            Some(per_actor.into())
+        } else {
+            None
+        }
+    })
 }
 
 pub async fn init_consumer_groups(
@@ -73,7 +80,7 @@ pub async fn init_consumer_groups(
     for i in 1..=cg_count {
         let consumer_group_id = CONSUMER_GROUP_BASE_ID + i;
         let stream_id = start_stream_id + i;
-        let consumer_group_name = format!("{}-{}", CONSUMER_GROUP_NAME_PREFIX, 
consumer_group_id);
+        let consumer_group_name = 
format!("{CONSUMER_GROUP_NAME_PREFIX}-{consumer_group_id}");
         info!(
             "Creating test consumer group: name={}, id={}, stream={}, 
topic={}",
             consumer_group_name, consumer_group_id, stream_id, topic_id
@@ -105,10 +112,7 @@ pub async fn init_consumer_groups(
 pub fn build_producer_futures(
     client_factory: &Arc<dyn ClientFactory>,
     args: &IggyBenchArgs,
-) -> Result<
-    Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send + use<>>,
-    IggyError,
-> {
+) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send + use<>> {
     let streams = args.streams();
     let partitions = args.number_of_partitions();
     let start_stream_id = args.start_stream_id();
@@ -124,7 +128,7 @@ pub fn build_producer_futures(
         BenchmarkFinishCondition::new(args, 
BenchmarkFinishConditionMode::Shared);
     let rate_limit = rate_limit_per_actor(args.rate_limit(), actors);
 
-    let futures = (1..=producers)
+    (1..=producers)
         .map(|producer_id| {
             let client_factory = client_factory.clone();
 
@@ -154,18 +158,13 @@ pub fn build_producer_futures(
                 producer.run().await
             }
         })
-        .collect();
-
-    Ok(futures)
+        .collect()
 }
 
 pub fn build_consumer_futures(
     client_factory: &Arc<dyn ClientFactory>,
     args: &IggyBenchArgs,
-) -> Result<
-    Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send + use<>>,
-    IggyError,
-> {
+) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send + use<>> {
     let start_stream_id = args.start_stream_id();
     let cg_count = args.number_of_consumer_groups();
     let consumers = args.consumers();
@@ -181,9 +180,9 @@ pub fn build_consumer_futures(
         PollingKind::Offset
     };
     let origin_timestamp_latency_calculation = match args.kind() {
-        BenchmarkKind::PinnedConsumer => false,
-        BenchmarkKind::PinnedProducerAndConsumer => false, // TODO(hubcio): in 
future, it can also be true
-        BenchmarkKind::BalancedConsumerGroup => false,
+        BenchmarkKind::PinnedConsumer
+        | BenchmarkKind::PinnedProducerAndConsumer
+        | BenchmarkKind::BalancedConsumerGroup => false, // TODO(hubcio): in 
future, PinnedProducerAndConsumer can also be true
         BenchmarkKind::BalancedProducerAndConsumerGroup => true,
         _ => unreachable!(),
     };
@@ -192,7 +191,7 @@ pub fn build_consumer_futures(
         BenchmarkFinishCondition::new(args, 
BenchmarkFinishConditionMode::Shared);
     let rate_limit = rate_limit_per_actor(args.rate_limit(), actors);
 
-    let futures = (1..=consumers)
+    (1..=consumers)
         .map(|consumer_id| {
             let client_factory = client_factory.clone();
             let finish_condition = if cg_count > 0 {
@@ -230,19 +229,14 @@ pub fn build_consumer_futures(
                 consumer.run().await
             }
         })
-        .collect();
-
-    Ok(futures)
+        .collect()
 }
 
-#[allow(clippy::too_many_arguments)]
+#[allow(clippy::needless_pass_by_value)]
 pub fn build_producing_consumers_futures(
     client_factory: Arc<dyn ClientFactory>,
     args: Arc<IggyBenchArgs>,
-) -> Result<
-    Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send>,
-    IggyError,
-> {
+) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send> {
     let producing_consumers = args.producers();
     let streams = args.streams();
     let partitions = args.number_of_partitions();
@@ -252,7 +246,7 @@ pub fn build_producing_consumers_futures(
     let start_stream_id = args.start_stream_id();
     let polling_kind = PollingKind::Offset;
 
-    let futures = (1..=producing_consumers)
+    (1..=producing_consumers)
         .map(|actor_id| {
             let client_factory_clone = client_factory.clone();
             let args_clone = args.clone();
@@ -294,18 +288,14 @@ pub fn build_producing_consumers_futures(
                 actor.run().await
             }
         })
-        .collect();
-
-    Ok(futures)
+        .collect()
 }
 
+#[allow(clippy::needless_pass_by_value)]
 pub fn build_producing_consumer_groups_futures(
     client_factory: Arc<dyn ClientFactory>,
     args: Arc<IggyBenchArgs>,
-) -> Result<
-    Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send>,
-    IggyError,
-> {
+) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send> {
     let producers = args.producers();
     let consumers = args.consumers();
     let total_actors = producers.max(consumers);
@@ -324,7 +314,7 @@ pub fn build_producing_consumer_groups_futures(
     let shared_poll_finish_condition =
         BenchmarkFinishCondition::new(&args, 
BenchmarkFinishConditionMode::SharedHalf);
 
-    let futures = (1..=total_actors)
+    (1..=total_actors)
         .map(|actor_id| {
             let client_factory_clone = client_factory.clone();
             let args_clone = args.clone();
@@ -371,7 +361,7 @@ pub fn build_producing_consumer_groups_futures(
                     if should_consume {
                         format!(" in group={}", consumer_group_id.unwrap())
                     } else {
-                        "".to_string()
+                        String::new()
                     },
                     stream_id,
                     actor_type
@@ -396,7 +386,5 @@ pub fn build_producing_consumer_groups_futures(
                 actor.run().await
             }
         })
-        .collect();
-
-    Ok(futures)
+        .collect()
 }
diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer.rs 
b/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
index a7be281f..2818fea6 100644
--- a/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
+++ b/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::args::common::IggyBenchArgs;
-use crate::benchmarks::common::*;
+use crate::benchmarks::common::build_producing_consumers_futures;
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -35,11 +35,11 @@ pub struct EndToEndProducingConsumerBenchmark {
 }
 
 impl EndToEndProducingConsumerBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Box<Self> {
-        Box::new(Self {
+    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+        Self {
             args,
             client_factory,
-        })
+        }
     }
 }
 
@@ -53,7 +53,7 @@ impl Benchmarkable for EndToEndProducingConsumerBenchmark {
         let args = self.args.clone();
         let mut tasks = JoinSet::new();
 
-        let futures = build_producing_consumers_futures(cf, args.clone())?;
+        let futures = build_producing_consumers_futures(cf, args);
         for fut in futures {
             tasks.spawn(fut);
         }
@@ -76,10 +76,10 @@ impl Benchmarkable for EndToEndProducingConsumerBenchmark {
     fn print_info(&self) {
         let streams = format!("streams: {}", self.args.streams());
         let producers = format!("producers: {}", self.args.producers());
-        let max_topic_size = match self.args.max_topic_size() {
-            Some(size) => format!(" max topic size: {}", size),
-            None => format!(" max topic size: {}", 
MaxTopicSize::ServerDefault),
-        };
+        let max_topic_size = self.args.max_topic_size().map_or_else(
+            || format!(" max topic size: {}", MaxTopicSize::ServerDefault),
+            |size| format!(" max topic size: {size}"),
+        );
         let common_params = self.common_params_str();
 
         info!(
diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs 
b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
index 3545cee9..2adda4d5 100644
--- a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
+++ b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::args::common::IggyBenchArgs;
-use crate::benchmarks::common::*;
+use crate::benchmarks::common::{build_producing_consumer_groups_futures, 
init_consumer_groups};
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -35,11 +35,11 @@ pub struct EndToEndProducingConsumerGroupBenchmark {
 }
 
 impl EndToEndProducingConsumerGroupBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Box<Self> {
-        Box::new(Self {
+    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+        Self {
             args,
             client_factory,
-        })
+        }
     }
 }
 
@@ -55,7 +55,7 @@ impl Benchmarkable for 
EndToEndProducingConsumerGroupBenchmark {
 
         init_consumer_groups(&cf, &args).await?;
 
-        let futures = build_producing_consumer_groups_futures(cf, args)?;
+        let futures = build_producing_consumer_groups_futures(cf, args);
         for fut in futures {
             tasks.spawn(fut);
         }
diff --git a/core/bench/src/benchmarks/pinned_consumer.rs 
b/core/bench/src/benchmarks/pinned_consumer.rs
index 15da8676..870df77a 100644
--- a/core/bench/src/benchmarks/pinned_consumer.rs
+++ b/core/bench/src/benchmarks/pinned_consumer.rs
@@ -18,7 +18,7 @@
 
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::benchmark::Benchmarkable;
-use crate::benchmarks::common::*;
+use crate::benchmarks::common::build_consumer_futures;
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -34,11 +34,11 @@ pub struct PinnedConsumerBenchmark {
 }
 
 impl PinnedConsumerBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Box<Self> {
-        Box::new(Self {
+    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+        Self {
             args,
             client_factory,
-        })
+        }
     }
 }
 
@@ -52,7 +52,7 @@ impl Benchmarkable for PinnedConsumerBenchmark {
         let args = self.args.clone();
         let mut tasks: JoinSet<_> = JoinSet::new();
 
-        let futures = build_consumer_futures(cf, &args)?;
+        let futures = build_consumer_futures(cf, &args);
         for fut in futures {
             tasks.spawn(fut);
         }
diff --git a/core/bench/src/benchmarks/pinned_producer.rs 
b/core/bench/src/benchmarks/pinned_producer.rs
index 578819c8..3514e24d 100644
--- a/core/bench/src/benchmarks/pinned_producer.rs
+++ b/core/bench/src/benchmarks/pinned_producer.rs
@@ -18,7 +18,7 @@
 
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::benchmark::Benchmarkable;
-use crate::benchmarks::common::*;
+use crate::benchmarks::common::build_producer_futures;
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -34,11 +34,11 @@ pub struct PinnedProducerBenchmark {
 }
 
 impl PinnedProducerBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Box<Self> {
-        Box::new(Self {
+    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+        Self {
             args,
             client_factory,
-        })
+        }
     }
 }
 
@@ -52,7 +52,7 @@ impl Benchmarkable for PinnedProducerBenchmark {
         let args = self.args.clone();
         let mut tasks = JoinSet::new();
 
-        let producer_futures = build_producer_futures(client_factory, &args)?;
+        let producer_futures = build_producer_futures(client_factory, &args);
 
         for fut in producer_futures {
             tasks.spawn(fut);
@@ -76,10 +76,10 @@ impl Benchmarkable for PinnedProducerBenchmark {
     fn print_info(&self) {
         let streams = format!("streams: {}", self.args.streams());
         let producers = format!("producers: {}", self.args.producers());
-        let max_topic_size = match self.args.max_topic_size() {
-            Some(size) => format!(" max topic size: {}", size),
-            None => format!(" max topic size: {}", 
MaxTopicSize::ServerDefault),
-        };
+        let max_topic_size = self.args.max_topic_size().map_or_else(
+            || format!(" max topic size: {}", MaxTopicSize::ServerDefault),
+            |size| format!(" max topic size: {size}"),
+        );
         let common_params = self.common_params_str();
 
         info!(
diff --git a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs 
b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
index 1940dc9b..cde6632c 100644
--- a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
+++ b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
@@ -17,7 +17,7 @@
 
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::benchmark::Benchmarkable;
-use crate::benchmarks::common::*;
+use crate::benchmarks::common::{build_consumer_futures, 
build_producer_futures};
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -33,11 +33,11 @@ pub struct PinnedProducerAndConsumerBenchmark {
 }
 
 impl PinnedProducerAndConsumerBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Box<Self> {
-        Box::new(Self {
+    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+        Self {
             args,
             client_factory,
-        })
+        }
     }
 }
 
@@ -51,8 +51,8 @@ impl Benchmarkable for PinnedProducerAndConsumerBenchmark {
         let args = self.args.clone();
         let mut tasks = JoinSet::new();
 
-        let producer_futures = build_producer_futures(cf, &args)?;
-        let consumer_futures = build_consumer_futures(cf, &args)?;
+        let producer_futures = build_producer_futures(cf, &args);
+        let consumer_futures = build_consumer_futures(cf, &args);
 
         for fut in producer_futures {
             tasks.spawn(fut);
@@ -82,10 +82,10 @@ impl Benchmarkable for PinnedProducerAndConsumerBenchmark {
         let partitions = format!("partitions: {}", 
self.args.number_of_partitions());
         let producers = format!("producers: {}", self.args.producers());
         let consumers = format!("consumers: {}", self.args.consumers());
-        let max_topic_size = match self.args.max_topic_size() {
-            Some(size) => format!(" max topic size: {}", size),
-            None => format!(" max topic size: {}", 
MaxTopicSize::ServerDefault),
-        };
+        let max_topic_size = self.args.max_topic_size().map_or_else(
+            || format!(" max topic size: {}", MaxTopicSize::ServerDefault),
+            |size| format!(" max topic size: {size}"),
+        );
         let common_params = self.common_params_str();
 
         info!(
diff --git a/core/bench/src/main.rs b/core/bench/src/main.rs
index fc23cf5f..77d7d66b 100644
--- a/core/bench/src/main.rs
+++ b/core/bench/src/main.rs
@@ -56,7 +56,7 @@ async fn main() -> Result<(), IggyError> {
     });
 
     // Configure logging
-    let env_filter = 
EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"));
+    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| 
EnvFilter::new("INFO"));
     let stdout_layer = fmt::layer().with_ansi(true);
 
     // If output directory is specified, also log to file
@@ -78,7 +78,7 @@ async fn main() -> Result<(), IggyError> {
             .init();
     }
 
-    let mut benchmark_runner = BenchmarkRunner::new(args);
+    let benchmark_runner = BenchmarkRunner::new(args);
 
     info!("Starting the benchmarks...");
     let ctrl_c = tokio::signal::ctrl_c();
diff --git a/core/bench/src/plot.rs b/core/bench/src/plot.rs
index 3d75be77..fae733aa 100644
--- a/core/bench/src/plot.rs
+++ b/core/bench/src/plot.rs
@@ -31,28 +31,28 @@ pub enum ChartType {
 }
 
 impl ChartType {
-    fn name(&self) -> &'static str {
+    const fn name(&self) -> &'static str {
         match self {
-            ChartType::Throughput => "throughput",
-            ChartType::Latency => "latency",
+            Self::Throughput => "throughput",
+            Self::Latency => "latency",
         }
     }
 
     fn create_chart(&self) -> fn(&BenchmarkReport, bool, bool) -> Chart {
         match self {
-            ChartType::Throughput => bench_report::create_throughput_chart,
-            ChartType::Latency => bench_report::create_latency_chart,
+            Self::Throughput => bench_report::create_throughput_chart,
+            Self::Latency => bench_report::create_latency_chart,
         }
     }
 
     fn get_samples(&self, report: &BenchmarkReport) -> usize {
         match self {
-            ChartType::Throughput => report
+            Self::Throughput => report
                 .individual_metrics
                 .iter()
                 .map(|m| m.throughput_mb_ts.points.len())
                 .sum(),
-            ChartType::Latency => report
+            Self::Latency => report
                 .individual_metrics
                 .iter()
                 .filter(|m| !m.latency_ts.points.is_empty())
@@ -85,7 +85,7 @@ fn open_in_browser(path: &str) -> std::io::Result<()> {
 pub fn plot_chart(
     report: &BenchmarkReport,
     output_directory: &str,
-    chart_type: ChartType,
+    chart_type: &ChartType,
     should_open_in_browser: bool,
 ) -> std::io::Result<()> {
     let data_processing_start = Instant::now();
@@ -97,12 +97,12 @@ pub fn plot_chart(
     save_chart(&chart, file_name, output_directory, 1600, 1200)?;
 
     if should_open_in_browser {
-        let chart_path = format!("{}/{}.html", output_directory, file_name);
+        let chart_path = format!("{output_directory}/{file_name}.html");
         open_in_browser(&chart_path)?;
     }
 
     let total_samples = chart_type.get_samples(report);
-    let report_path = format!("{}/report.json", output_directory);
+    let report_path = format!("{output_directory}/report.json");
     let report_size = 
IggyByteSize::from(std::fs::metadata(&report_path)?.len());
 
     let chart_render_time = chart_render_start.elapsed();
@@ -129,10 +129,10 @@ fn save_chart(
 ) -> std::io::Result<()> {
     let parent = Path::new(output_directory).parent().unwrap();
     std::fs::create_dir_all(parent)?;
-    let full_output_path = Path::new(output_directory).join(format!("{}.html", 
file_name));
+    let full_output_path = 
Path::new(output_directory).join(format!("{file_name}.html"));
 
     let mut renderer = HtmlRenderer::new(file_name, width, 
height).theme(Theme::Dark);
     renderer
         .save(chart, &full_output_path)
-        .map_err(|e| std::io::Error::other(format!("Failed to save HTML plot: 
{}", e)))
+        .map_err(|e| std::io::Error::other(format!("Failed to save HTML plot: 
{e}")))
 }
diff --git a/core/bench/src/runner.rs b/core/bench/src/runner.rs
index db0feaf6..49c5e696 100644
--- a/core/bench/src/runner.rs
+++ b/core/bench/src/runner.rs
@@ -37,17 +37,17 @@ pub struct BenchmarkRunner {
 }
 
 impl BenchmarkRunner {
-    pub fn new(args: IggyBenchArgs) -> Self {
+    pub const fn new(args: IggyBenchArgs) -> Self {
         Self {
             args: Some(args),
             test_server: None,
         }
     }
 
-    pub async fn run(&mut self) -> Result<(), IggyError> {
-        let mut args = self.args.take().unwrap();
+    pub async fn run(mut self) -> Result<(), IggyError> {
+        let args = self.args.take().unwrap();
         let should_open_charts = args.open_charts();
-        self.test_server = start_server_if_needed(&mut args).await;
+        self.test_server = start_server_if_needed(&args).await;
 
         let transport = args.transport();
         let server_addr = args.server_address();
@@ -113,7 +113,7 @@ impl BenchmarkRunner {
             plot_chart(
                 &report,
                 &full_output_path,
-                ChartType::Throughput,
+                &ChartType::Throughput,
                 should_open_charts,
             )
             .map_err(|e| {
@@ -123,7 +123,7 @@ impl BenchmarkRunner {
             plot_chart(
                 &report,
                 &full_output_path,
-                ChartType::Latency,
+                &ChartType::Latency,
                 should_open_charts,
             )
             .map_err(|e| {
diff --git a/core/bench/src/utils/cpu_name.rs b/core/bench/src/utils/cpu_name.rs
index e36ff2c6..adf08d95 100644
--- a/core/bench/src/utils/cpu_name.rs
+++ b/core/bench/src/utils/cpu_name.rs
@@ -25,8 +25,10 @@ pub fn append_cpu_name_lowercase(to: &mut String) {
     let cpu = sys
         .cpus()
         .first()
-        .map(|cpu| (cpu.brand().to_string(), cpu.frequency()))
-        .unwrap_or_else(|| (String::from("unknown"), 0))
+        .map_or_else(
+            || (String::from("unknown"), 0),
+            |cpu| (cpu.brand().to_string(), cpu.frequency()),
+        )
         .0
         .to_lowercase()
         .replace(' ', "_");
diff --git a/core/bench/src/utils/finish_condition.rs 
b/core/bench/src/utils/finish_condition.rs
index 102e65ab..01bd174e 100644
--- a/core/bench/src/utils/finish_condition.rs
+++ b/core/bench/src/utils/finish_condition.rs
@@ -28,7 +28,7 @@ use std::{
 const MINIMUM_MSG_PAYLOAD_SIZE: usize = 20;
 
 /// Determines how to calculate the finish condition's workload division
-#[derive(Debug, Clone, Copy, PartialEq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum BenchmarkFinishConditionMode {
     /// Global condition shares work across all actors
     Shared,
@@ -68,9 +68,9 @@ impl BenchmarkFinishCondition {
     ///
     /// The mode parameter automatically determines the appropriate workload 
division factor:
     /// - Global: factor = 1 (total workload is shared across all actors)
-    /// - PerProducer: factor = number of producers
-    /// - PerConsumer: factor = number of consumers
-    /// - PerProducingConsumer: factor = number of producing consumers * 2
+    /// - `PerProducer`: factor = number of producers
+    /// - `PerConsumer`: factor = number of consumers
+    /// - `PerProducingConsumer`: factor = number of producing consumers * 2
     pub fn new(args: &IggyBenchArgs, mode: BenchmarkFinishConditionMode) -> 
Arc<Self> {
         let total_data = args.total_data();
         let batches_count = args.message_batches();
@@ -84,17 +84,17 @@ impl BenchmarkFinishCondition {
         };
 
         let total_data_multiplier = match args.benchmark_kind {
-            BenchmarkKindCommand::PinnedProducer(_) => args.producers(),
-            BenchmarkKindCommand::PinnedConsumer(_) => args.consumers(),
+            BenchmarkKindCommand::PinnedProducer(_)
+            | BenchmarkKindCommand::BalancedProducer(_)
+            | BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => 
args.producers(),
+            BenchmarkKindCommand::PinnedConsumer(_)
+            | BenchmarkKindCommand::BalancedConsumerGroup(_) => 
args.consumers(),
             BenchmarkKindCommand::PinnedProducerAndConsumer(_) => {
                 args.producers() + args.consumers()
             }
-            BenchmarkKindCommand::BalancedProducer(_) => args.producers(),
-            BenchmarkKindCommand::BalancedConsumerGroup(_) => args.consumers(),
-            BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => 
args.producers(),
-            BenchmarkKindCommand::EndToEndProducingConsumer(_) => 
args.producers() * 2,
-            BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => 
args.producers() * 2,
-            _ => unreachable!(),
+            BenchmarkKindCommand::EndToEndProducingConsumer(_)
+            | BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => 
args.producers() * 2,
+            BenchmarkKindCommand::Examples => unreachable!(),
         };
 
         Arc::new(match (total_data, batches_count) {
@@ -103,18 +103,20 @@ impl BenchmarkFinishCondition {
 
                 Self {
                     kind: BenchmarkFinishConditionType::ByMessageBatchesCount,
-                    total: count_per_actor as u64,
-                    left_total: Arc::new(AtomicI64::new(count_per_actor as 
i64)),
+                    total: u64::from(count_per_actor),
+                    left_total: 
Arc::new(AtomicI64::new(i64::from(count_per_actor))),
                     mode,
                 }
             }
             (Some(size), None) => {
-                let bytes_per_actor = size.as_bytes_u64() / total_data_factor 
as u64;
+                let bytes_per_actor = size.as_bytes_u64() / 
u64::from(total_data_factor);
 
                 Self {
                     kind: BenchmarkFinishConditionType::ByTotalData,
                     total: bytes_per_actor,
-                    left_total: Arc::new(AtomicI64::new(bytes_per_actor as 
i64)),
+                    left_total: Arc::new(AtomicI64::new(
+                        i64::try_from(bytes_per_actor).unwrap_or(i64::MAX),
+                    )),
                     mode,
                 }
             }
@@ -136,8 +138,10 @@ impl BenchmarkFinishCondition {
     pub fn account_and_check(&self, size_to_subtract: u64) -> bool {
         match self.kind {
             BenchmarkFinishConditionType::ByTotalData => {
-                self.left_total
-                    .fetch_sub(size_to_subtract as i64, Ordering::AcqRel);
+                self.left_total.fetch_sub(
+                    i64::try_from(size_to_subtract).unwrap_or(i64::MAX),
+                    Ordering::AcqRel,
+                );
             }
             BenchmarkFinishConditionType::ByMessageBatchesCount => {
                 self.left_total.fetch_sub(1, Ordering::AcqRel);
@@ -150,7 +154,7 @@ impl BenchmarkFinishCondition {
         self.left() <= 0
     }
 
-    pub fn total(&self) -> u64 {
+    pub const fn total(&self) -> u64 {
         self.total
     }
 
@@ -175,8 +179,8 @@ impl BenchmarkFinishCondition {
     }
 
     pub fn status(&self) -> String {
-        let done = self.total() as i64 - self.left();
-        let total = self.total() as i64;
+        let done = i64::try_from(self.total()).unwrap_or(i64::MAX) - 
self.left();
+        let total = i64::try_from(self.total()).unwrap_or(i64::MAX);
         match self.kind {
             BenchmarkFinishConditionType::ByTotalData => {
                 format!(
@@ -200,9 +204,9 @@ impl BenchmarkFinishCondition {
     pub fn max_capacity(&self) -> usize {
         let value = self.left_total.load(Ordering::Relaxed);
         if self.kind == BenchmarkFinishConditionType::ByTotalData {
-            value as usize / MINIMUM_MSG_PAYLOAD_SIZE
+            usize::try_from(value).unwrap_or(0) / MINIMUM_MSG_PAYLOAD_SIZE
         } else {
-            value as usize
+            usize::try_from(value).unwrap_or(0)
         }
     }
 }
@@ -210,11 +214,11 @@ impl BenchmarkFinishCondition {
 impl Display for BenchmarkFinishConditionMode {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            BenchmarkFinishConditionMode::Shared => write!(f, "shared"),
-            BenchmarkFinishConditionMode::SharedHalf => write!(f, 
"shared-half"),
-            BenchmarkFinishConditionMode::PerProducer => write!(f, 
"per-producer"),
-            BenchmarkFinishConditionMode::PerConsumer => write!(f, 
"per-consumer"),
-            BenchmarkFinishConditionMode::PerProducingConsumer => {
+            Self::Shared => write!(f, "shared"),
+            Self::SharedHalf => write!(f, "shared-half"),
+            Self::PerProducer => write!(f, "per-producer"),
+            Self::PerConsumer => write!(f, "per-consumer"),
+            Self::PerProducingConsumer => {
                 write!(f, "per-producing-consumer")
             }
         }
diff --git a/core/bench/src/utils/mod.rs b/core/bench/src/utils/mod.rs
index c27cb3ae..42d316f0 100644
--- a/core/bench/src/utils/mod.rs
+++ b/core/bench/src/utils/mod.rs
@@ -26,7 +26,17 @@ use integration::test_server::Transport;
 use std::{fs, path::Path};
 use tracing::{error, info};
 
-use crate::args::{common::IggyBenchArgs, defaults::*};
+use crate::args::{
+    common::IggyBenchArgs,
+    defaults::{
+        DEFAULT_BALANCED_NUMBER_OF_PARTITIONS, 
DEFAULT_BALANCED_NUMBER_OF_STREAMS,
+        DEFAULT_HTTP_SERVER_ADDRESS, DEFAULT_MESSAGE_BATCHES, 
DEFAULT_MESSAGE_SIZE,
+        DEFAULT_MESSAGES_PER_BATCH, DEFAULT_NUMBER_OF_CONSUMER_GROUPS, 
DEFAULT_NUMBER_OF_CONSUMERS,
+        DEFAULT_NUMBER_OF_PRODUCERS, DEFAULT_PINNED_NUMBER_OF_PARTITIONS,
+        DEFAULT_PINNED_NUMBER_OF_STREAMS, DEFAULT_QUIC_SERVER_ADDRESS, 
DEFAULT_TCP_SERVER_ADDRESS,
+        DEFAULT_TOTAL_MESSAGES_SIZE, DEFAULT_WARMUP_TIME,
+    },
+};
 
 pub mod batch_generator;
 pub mod client_factory;
@@ -64,7 +74,7 @@ pub async fn get_server_stats(
             .build()?,
         BenchmarkTransport::Http => client
             .with_http()
-            .with_api_url(format!("http://{}";, server_address))
+            .with_api_url(format!("http://{server_address}";))
             .build()?,
         BenchmarkTransport::Quic => client
             .with_quic()
@@ -94,7 +104,7 @@ pub async fn collect_server_logs_and_save_to_file(
             .build()?,
         BenchmarkTransport::Http => client
             .with_http()
-            .with_api_url(format!("http://{}";, server_address))
+            .with_api_url(format!("http://{server_address}";))
             .build()?,
         BenchmarkTransport::Quic => client
             .with_quic()
@@ -159,7 +169,7 @@ pub fn params_from_args_and_metrics(
 
     let remark_for_identifier = remark
         .clone()
-        .unwrap_or("no_remark".to_string())
+        .unwrap_or_else(|| "no_remark".to_string())
         .replace(' ', "_");
 
     let data_volume_identifier = args.data_volume_identifier();
@@ -206,60 +216,68 @@ pub fn params_from_args_and_metrics(
 fn recreate_bench_command(args: &IggyBenchArgs) -> String {
     let mut parts = Vec::new();
 
-    // If using localhost, add env vars
-    let server_address = args.server_address();
+    add_environment_variables(&mut parts, args.server_address());
+    parts.push("iggy-bench".to_string());
+
+    add_basic_arguments(&mut parts, args);
+    add_benchmark_kind_arguments(&mut parts, args);
+    add_infrastructure_arguments(&mut parts, args);
+    add_output_arguments(&mut parts, args);
+
+    parts.join(" ")
+}
+
+fn add_environment_variables(parts: &mut Vec<String>, server_address: &str) {
     let is_localhost = server_address
         .split(':')
         .next()
-        .map(|host| host == "localhost" || host == "127.0.0.1")
-        .unwrap_or(false);
+        .is_some_and(|host| host == "localhost" || host == "127.0.0.1");
 
     if is_localhost {
-        // Get all env vars starting with IGGY_
         let iggy_vars: Vec<_> = std::env::vars()
             .filter(|(k, _)| k.starts_with("IGGY_"))
             .collect();
 
         if !iggy_vars.is_empty() {
             info!("Found env vars starting with IGGY_: {:?}", iggy_vars);
-            parts.extend(iggy_vars.into_iter().map(|(k, v)| format!("{}={}", 
k, v)));
+            parts.extend(iggy_vars.into_iter().map(|(k, v)| 
format!("{k}={v}")));
         }
     }
+}
 
-    parts.push("iggy-bench".to_string());
-
+fn add_basic_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
     let messages_per_batch = args.messages_per_batch();
     if messages_per_batch != 
BenchmarkNumericParameter::Value(DEFAULT_MESSAGES_PER_BATCH.get()) {
-        parts.push(format!("--messages-per-batch {}", messages_per_batch));
+        parts.push(format!("--messages-per-batch {messages_per_batch}"));
     }
 
-    let message_batches = args.message_batches();
-    if let Some(message_batches) = message_batches {
+    if let Some(message_batches) = args.message_batches() {
         if message_batches != DEFAULT_MESSAGE_BATCHES {
-            parts.push(format!("--message-batches {}", message_batches));
+            parts.push(format!("--message-batches {message_batches}"));
         }
     }
 
-    let total_messages_size = args.total_data();
-    if let Some(total_messages_size) = total_messages_size {
+    if let Some(total_messages_size) = args.total_data() {
         if total_messages_size != DEFAULT_TOTAL_MESSAGES_SIZE {
-            parts.push(format!("--total-messages-size {}", 
total_messages_size));
+            parts.push(format!("--total-messages-size {total_messages_size}"));
         }
     }
 
     let message_size = args.message_size();
     if message_size != 
BenchmarkNumericParameter::Value(DEFAULT_MESSAGE_SIZE.get()) {
-        parts.push(format!("--message-size {}", message_size));
+        parts.push(format!("--message-size {message_size}"));
     }
 
     if let Some(rate_limit) = args.rate_limit() {
-        parts.push(format!("--rate-limit \'{}\'", rate_limit));
+        parts.push(format!("--rate-limit \'{rate_limit}\'"));
     }
 
     if args.warmup_time().to_string() != DEFAULT_WARMUP_TIME {
         parts.push(format!("--warmup-time \'{}\'", args.warmup_time()));
     }
+}
 
+fn add_benchmark_kind_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) 
{
     let kind_str = match args.benchmark_kind.as_simple_kind() {
         BenchmarkKind::PinnedProducer => "pinned-producer",
         BenchmarkKind::PinnedConsumer => "pinned-consumer",
@@ -272,6 +290,10 @@ fn recreate_bench_command(args: &IggyBenchArgs) -> String {
     };
     parts.push(kind_str.to_string());
 
+    add_actor_arguments(parts, args);
+}
+
+fn add_actor_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
     let producers = args.producers();
     let consumers = args.consumers();
     let number_of_consumer_groups = args.number_of_consumer_groups();
@@ -281,36 +303,38 @@ fn recreate_bench_command(args: &IggyBenchArgs) -> String 
{
         | BenchmarkKind::BalancedProducer
         | BenchmarkKind::EndToEndProducingConsumer => {
             if producers != DEFAULT_NUMBER_OF_PRODUCERS.get() {
-                parts.push(format!("--producers {}", producers));
+                parts.push(format!("--producers {producers}"));
             }
         }
         BenchmarkKind::PinnedConsumer | BenchmarkKind::BalancedConsumerGroup 
=> {
             if consumers != DEFAULT_NUMBER_OF_CONSUMERS.get() {
-                parts.push(format!("--consumers {}", consumers));
+                parts.push(format!("--consumers {consumers}"));
             }
         }
         BenchmarkKind::PinnedProducerAndConsumer
         | BenchmarkKind::BalancedProducerAndConsumerGroup => {
             if producers != DEFAULT_NUMBER_OF_PRODUCERS.get() {
-                parts.push(format!("--producers {}", producers));
+                parts.push(format!("--producers {producers}"));
             }
             if consumers != DEFAULT_NUMBER_OF_CONSUMERS.get() {
-                parts.push(format!("--consumers {}", consumers));
+                parts.push(format!("--consumers {consumers}"));
             }
         }
         BenchmarkKind::EndToEndProducingConsumerGroup => {
             if producers != DEFAULT_NUMBER_OF_PRODUCERS.get() {
-                parts.push(format!("--producers {}", producers));
+                parts.push(format!("--producers {producers}"));
             }
             if consumers != DEFAULT_NUMBER_OF_CONSUMERS.get() {
-                parts.push(format!("--consumers {}", consumers));
+                parts.push(format!("--consumers {consumers}"));
             }
             if number_of_consumer_groups != 
DEFAULT_NUMBER_OF_CONSUMER_GROUPS.get() {
-                parts.push(format!("--consumer-groups {}", 
number_of_consumer_groups));
+                parts.push(format!("--consumer-groups 
{number_of_consumer_groups}"));
             }
         }
     }
+}
 
+fn add_infrastructure_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) 
{
     let streams = args.streams();
     let default_streams = match args.benchmark_kind.as_simple_kind() {
         BenchmarkKind::BalancedProducerAndConsumerGroup
@@ -319,7 +343,7 @@ fn recreate_bench_command(args: &IggyBenchArgs) -> String {
         _ => DEFAULT_PINNED_NUMBER_OF_STREAMS.get(),
     };
     if streams != default_streams {
-        parts.push(format!("--streams {}", streams));
+        parts.push(format!("--streams {streams}"));
     }
 
     let partitions = args.number_of_partitions();
@@ -330,24 +354,25 @@ fn recreate_bench_command(args: &IggyBenchArgs) -> String 
{
         _ => DEFAULT_PINNED_NUMBER_OF_PARTITIONS.get(),
     };
     if partitions != default_partitions {
-        parts.push(format!("--partitions {}", partitions));
+        parts.push(format!("--partitions {partitions}"));
     }
 
     let consumer_groups = args.number_of_consumer_groups();
-    if args.benchmark_kind.as_simple_kind() == 
BenchmarkKind::BalancedConsumerGroup
-        || args.benchmark_kind.as_simple_kind() == 
BenchmarkKind::BalancedProducerAndConsumerGroup
-            && consumer_groups != DEFAULT_NUMBER_OF_CONSUMER_GROUPS.get()
+    if (args.benchmark_kind.as_simple_kind() == 
BenchmarkKind::BalancedConsumerGroup
+        || args.benchmark_kind.as_simple_kind() == 
BenchmarkKind::BalancedProducerAndConsumerGroup)
+        && consumer_groups != DEFAULT_NUMBER_OF_CONSUMER_GROUPS.get()
     {
-        parts.push(format!("--consumer-groups {}", consumer_groups));
+        parts.push(format!("--consumer-groups {consumer_groups}"));
     }
 
     if let Some(max_topic_size) = args.max_topic_size() {
-        parts.push(format!("--max-topic-size \'{}\'", max_topic_size));
+        parts.push(format!("--max-topic-size \'{max_topic_size}\'"));
     }
 
     let transport = args.transport().to_string().to_lowercase();
     parts.push(transport.clone());
 
+    let server_address = args.server_address();
     let default_address = match transport.as_str() {
         "tcp" => DEFAULT_TCP_SERVER_ADDRESS,
         "quic" => DEFAULT_QUIC_SERVER_ADDRESS,
@@ -356,16 +381,15 @@ fn recreate_bench_command(args: &IggyBenchArgs) -> String 
{
     };
 
     if server_address != default_address {
-        parts.push(format!("--server-address {}", server_address));
+        parts.push(format!("--server-address {server_address}"));
     }
+}
 
+fn add_output_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
     parts.push("output".to_string());
-
     parts.push("-o performance_results".to_string());
 
-    let remark = args.remark();
-    if let Some(remark) = remark {
-        parts.push(format!("--remark \'{}\'", remark));
+    if let Some(remark) = args.remark() {
+        parts.push(format!("--remark \'{remark}\'"));
     }
-    parts.join(" ")
 }
diff --git a/core/bench/src/utils/rate_limiter.rs 
b/core/bench/src/utils/rate_limiter.rs
index f365c958..fdc86193 100644
--- a/core/bench/src/utils/rate_limiter.rs
+++ b/core/bench/src/utils/rate_limiter.rs
@@ -29,7 +29,9 @@ pub struct BenchmarkRateLimiter {
 
 impl BenchmarkRateLimiter {
     pub fn new(bytes_per_second: IggyByteSize) -> Self {
-        let bytes_per_second = NonZeroU32::new(bytes_per_second.as_bytes_u64() 
as u32).unwrap();
+        let bytes_per_second =
+            
NonZeroU32::new(u32::try_from(bytes_per_second.as_bytes_u64()).unwrap_or(u32::MAX))
+                .unwrap();
         let rate_limiter = 
GovernorRateLimiter::direct(Quota::per_second(bytes_per_second));
 
         // Fill the bucket to avoid burst
@@ -40,7 +42,7 @@ impl BenchmarkRateLimiter {
 
     pub async fn wait_until_necessary(&self, bytes: u64) {
         self.rate_limiter
-            .until_n_ready(NonZeroU32::new(bytes as u32).unwrap())
+            
.until_n_ready(NonZeroU32::new(u32::try_from(bytes).unwrap_or(u32::MAX)).unwrap())
             .await
             .unwrap();
     }
diff --git a/core/bench/src/utils/server_starter.rs 
b/core/bench/src/utils/server_starter.rs
index c3d0df52..17dc8fc5 100644
--- a/core/bench/src/utils/server_starter.rs
+++ b/core/bench/src/utils/server_starter.rs
@@ -36,22 +36,72 @@ struct ConfigAddress {
     address: String,
 }
 
-pub async fn start_server_if_needed(args: &mut IggyBenchArgs) -> 
Option<TestServer> {
+pub async fn start_server_if_needed(args: &IggyBenchArgs) -> 
Option<TestServer> {
     if args.skip_server_start {
         info!("Skipping iggy-server start");
         return None;
     }
 
+    let (should_start, mut envs) = evaluate_server_start_condition(args).await;
+
+    if should_start {
+        envs.insert(SYSTEM_PATH_ENV_VAR.to_owned(), "local_data".to_owned());
+
+        if args.verbose {
+            envs.insert("IGGY_TEST_VERBOSE".to_owned(), "true".to_owned());
+            info!("Enabling verbose output - iggy-server will logs print to 
stdout");
+        } else {
+            info!("Disabling verbose output - iggy-server will print logs to 
files");
+        }
+
+        info!(
+            "Starting test server, transport: {}, cleanup: {}, verbosity: {}",
+            args.transport(),
+            args.cleanup,
+            args.verbose
+        );
+        let mut test_server = TestServer::new(
+            Some(envs),
+            args.cleanup,
+            args.server_executable_path.clone(),
+            IpAddrKind::V4,
+        );
+        let now = Instant::now();
+        test_server.start();
+        let elapsed = now.elapsed();
+        if elapsed.as_millis() > 1000 {
+            warn!(
+                "Test iggy-server started, pid: {}, startup took {} ms because 
it had to load messages from disk to cache",
+                test_server.pid(),
+                elapsed.as_millis()
+            );
+        } else {
+            info!(
+                "Test iggy-server started, pid: {}, startup time: {} ms",
+                test_server.pid(),
+                elapsed.as_millis()
+            );
+        }
+
+        Some(test_server)
+    } else {
+        info!("Skipping iggy-server start");
+        None
+    }
+}
+
+async fn evaluate_server_start_condition(args: &IggyBenchArgs) -> (bool, 
HashMap<String, String>) {
     let default_config: ServerConfig =
         toml::from_str(include_str!("../../../configs/server.toml")).unwrap();
-    let (should_start, mut envs) = match &args.transport() {
+
+    match &args.transport() {
         Transport::Http => {
             let args_http_address = 
args.server_address().parse::<SocketAddr>().unwrap();
             let config_http_address = 
default_config.http.address.parse::<SocketAddr>().unwrap();
             let envs = HashMap::from([
                 (
                     "IGGY_HTTP_ADDRESS".to_owned(),
-                    default_config.http.address.to_owned(),
+                    default_config.http.address.clone(),
                 ),
                 ("IGGY_TCP_ENABLED".to_owned(), "false".to_owned()),
                 ("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned()),
@@ -69,7 +119,7 @@ pub async fn start_server_if_needed(args: &mut 
IggyBenchArgs) -> Option<TestServ
             let envs = HashMap::from([
                 (
                     "IGGY_TCP_ADDRESS".to_owned(),
-                    default_config.tcp.address.to_owned(),
+                    default_config.tcp.address.clone(),
                 ),
                 ("IGGY_HTTP_ENABLED".to_owned(), "false".to_owned()),
                 ("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned()),
@@ -86,7 +136,7 @@ pub async fn start_server_if_needed(args: &mut 
IggyBenchArgs) -> Option<TestServ
             let envs = HashMap::from([
                 (
                     "IGGY_QUIC_ADDRESS".to_owned(),
-                    default_config.quic.address.to_owned(),
+                    default_config.quic.address.clone(),
                 ),
                 ("IGGY_HTTP_ENABLED".to_owned(), "false".to_owned()),
                 ("IGGY_TCP_ENABLED".to_owned(), "false".to_owned()),
@@ -98,51 +148,6 @@ pub async fn start_server_if_needed(args: &mut 
IggyBenchArgs) -> Option<TestServ
                 envs,
             )
         }
-    };
-
-    if should_start {
-        envs.insert(SYSTEM_PATH_ENV_VAR.to_owned(), "local_data".to_owned());
-
-        if args.verbose {
-            envs.insert("IGGY_TEST_VERBOSE".to_owned(), "true".to_owned());
-            info!("Enabling verbose output - iggy-server will logs print to 
stdout")
-        } else {
-            info!("Disabling verbose output - iggy-server will print logs to 
files")
-        }
-
-        info!(
-            "Starting test server, transport: {}, cleanup: {}, verbosity: {}",
-            args.transport(),
-            args.cleanup,
-            args.verbose
-        );
-        let mut test_server = TestServer::new(
-            Some(envs),
-            args.cleanup,
-            args.server_executable_path.clone(),
-            IpAddrKind::V4,
-        );
-        let now = Instant::now();
-        test_server.start();
-        let elapsed = now.elapsed();
-        if elapsed.as_millis() > 1000 {
-            warn!(
-                "Test iggy-server started, pid: {}, startup took {} ms because 
it had to load messages from disk to cache",
-                test_server.pid(),
-                elapsed.as_millis()
-            );
-        } else {
-            info!(
-                "Test iggy-server started, pid: {}, startup time: {} ms",
-                test_server.pid(),
-                elapsed.as_millis()
-            )
-        }
-
-        Some(test_server)
-    } else {
-        info!("Skipping iggy-server start");
-        None
     }
 }
 


Reply via email to