This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 1fca8609 fix(io_uring): fix lints (#2240)
1fca8609 is described below

commit 1fca8609f762e7e3cfcca5cdfdc60fdaa4318f96
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Oct 6 16:53:00 2025 +0200

    fix(io_uring): fix lints (#2240)
    
    Co-authored-by: numminex <[email protected]>
---
 core/bench/src/actors/consumer/client/low_level.rs |   7 +-
 .../benchmark_producing_consumer.rs                |  96 +++---
 .../tests/cli/stream/test_stream_list_command.rs   |  12 +-
 core/integration/tests/server/scenarios/mod.rs     |   1 -
 .../tests/server/scenarios/system_scenario.rs      |   6 +-
 .../tests/streaming/common/test_setup.rs           | 105 +------
 core/integration/tests/streaming/get_by_offset.rs  |  12 +-
 core/integration/tests/streaming/mod.rs            |  32 +-
 .../delete_consumer_offset_handler.rs              |  10 +-
 .../store_consumer_offset_handler.rs               |  11 +-
 .../binary/handlers/streams/get_stream_handler.rs  |   2 +-
 .../binary/handlers/topics/create_topic_handler.rs |   4 +-
 .../binary/handlers/topics/get_topics_handler.rs   |   2 +-
 core/server/src/binary/mapper.rs                   |   2 +-
 core/server/src/bootstrap.rs                       |   4 +-
 core/server/src/http/http_server.rs                |   3 +-
 core/server/src/http/mapper.rs                     |  10 +-
 core/server/src/http/streams.rs                    |   2 +-
 core/server/src/lib.rs                             |   9 -
 core/server/src/main.rs                            |   4 +-
 core/server/src/shard/builder.rs                   |   5 +-
 core/server/src/shard/mod.rs                       |  43 +--
 core/server/src/shard/system/messages.rs           |   4 +-
 core/server/src/shard/system/partitions.rs         |   4 +-
 .../src/shard/system/personal_access_tokens.rs     |   2 +-
 core/server/src/shard/system/streams.rs            |   3 +-
 core/server/src/shard/system/topics.rs             |  36 +--
 core/server/src/shard/system/users.rs              |  10 +-
 core/server/src/shard/system/utils.rs              |   4 +-
 core/server/src/shard/transmission/event.rs        |  14 -
 core/server/src/slab/consumer_groups.rs            |   4 +
 core/server/src/slab/helpers.rs                    |  12 -
 core/server/src/slab/partitions.rs                 |   6 +-
 core/server/src/slab/streams.rs                    |  70 ++---
 core/server/src/slab/topics.rs                     |   6 +-
 core/server/src/state/file.rs                      |   2 +-
 core/server/src/state/system.rs                    |  23 +-
 .../deduplication/message_deduplicator.rs          |   5 -
 .../src/streaming/partitions/consumer_offset.rs    |   4 +-
 core/server/src/streaming/partitions/helpers.rs    |   2 -
 core/server/src/streaming/partitions/journal.rs    |   2 +-
 core/server/src/streaming/partitions/log.rs        |   8 +-
 core/server/src/streaming/partitions/partition2.rs |   3 +-
 core/server/src/streaming/partitions/storage2.rs   |  11 +-
 core/server/src/streaming/persistence/mod.rs       |   1 -
 core/server/src/streaming/persistence/task.rs      | 135 ---------
 .../src/streaming/segments/indexes/index_reader.rs |   2 +-
 core/server/src/streaming/stats/mod.rs             | 332 ++++++++++++++++++++-
 core/server/src/streaming/stats/stats.rs           | 331 --------------------
 core/server/src/streaming/streams/stream2.rs       |   2 +-
 .../server/src/streaming/topics/consumer_group2.rs |   4 +-
 core/server/src/streaming/topics/helpers.rs        |  38 +--
 core/server/src/streaming/topics/storage2.rs       |   5 +-
 core/server/src/streaming/topics/topic2.rs         |   3 +-
 core/server/src/streaming/utils/memory_pool.rs     |   6 +-
 core/server/src/tcp/connection_handler.rs          |   2 +-
 core/server/src/tcp/tcp_listener.rs                |   3 +-
 core/server/src/tcp/tcp_server.rs                  |   1 -
 core/server/src/tcp/tcp_tls_listener.rs            |   1 -
 59 files changed, 519 insertions(+), 964 deletions(-)

diff --git a/core/bench/src/actors/consumer/client/low_level.rs 
b/core/bench/src/actors/consumer/client/low_level.rs
index 4c80c4ce..27feb3a4 100644
--- a/core/bench/src/actors/consumer/client/low_level.rs
+++ b/core/bench/src/actors/consumer/client/low_level.rs
@@ -101,11 +101,8 @@ impl ConsumerClient for LowLevelConsumerClient {
         let total_bytes = batch_total_size_bytes(&polled);
 
         self.offset += messages_count;
-        match self.polling_strategy.kind {
-            PollingKind::Offset => {
-                self.polling_strategy.value += messages_count;
-            }
-            _ => {}
+        if self.polling_strategy.kind == PollingKind::Offset {
+            self.polling_strategy.value += messages_count;
         }
 
         Ok(Some(BatchMetrics {
diff --git 
a/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs 
b/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
index 0f5d323a..a676d449 100644
--- a/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
+++ b/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
@@ -146,63 +146,63 @@ where
             if is_producer
                 && !self.send_finish_condition.is_done()
                 && (!require_reply || !awaiting_reply)
+                && let Some(batch) = self.producer.produce_batch(&mut 
batch_generator).await?
             {
-                if let Some(batch) = self.producer.produce_batch(&mut 
batch_generator).await? {
-                    rl_value += batch.user_data_bytes;
-                    sent_user_bytes += batch.user_data_bytes;
-                    sent_total_bytes += batch.total_bytes;
-                    sent_messages += u64::from(batch.messages);
-                    sent_batches += 1;
-                    awaiting_reply = is_consumer;
+                rl_value += batch.user_data_bytes;
+                sent_user_bytes += batch.user_data_bytes;
+                sent_total_bytes += batch.total_bytes;
+                sent_messages += u64::from(batch.messages);
+                sent_batches += 1;
+                awaiting_reply = is_consumer;
 
-                    if self
-                        .send_finish_condition
-                        .account_and_check(batch.user_data_bytes)
-                    {
-                        info!(
-                            "ProducingConsumer #{actor_id} → finished sending 
{sent_messages} messages in {sent_batches} batches ({sent_user_bytes} bytes of 
user data, {sent_total_bytes} bytes of total data), send finish condition: 
{send_status}, poll finish condition: {poll_status}",
-                            actor_id = self.producer_config.producer_id,
-                            sent_messages = sent_messages.human_count_bare(),
-                            sent_batches = sent_batches.human_count_bare(),
-                            sent_user_bytes = 
sent_user_bytes.human_count_bytes(),
-                            sent_total_bytes = 
sent_total_bytes.human_count_bytes(),
-                            send_status = self.send_finish_condition.status(),
-                            poll_status = self.poll_finish_condition.status()
-                        );
-                    }
+                if self
+                    .send_finish_condition
+                    .account_and_check(batch.user_data_bytes)
+                {
+                    info!(
+                        "ProducingConsumer #{actor_id} → finished sending 
{sent_messages} messages in {sent_batches} batches ({sent_user_bytes} bytes of 
user data, {sent_total_bytes} bytes of total data), send finish condition: 
{send_status}, poll finish condition: {poll_status}",
+                        actor_id = self.producer_config.producer_id,
+                        sent_messages = sent_messages.human_count_bare(),
+                        sent_batches = sent_batches.human_count_bare(),
+                        sent_user_bytes = sent_user_bytes.human_count_bytes(),
+                        sent_total_bytes = 
sent_total_bytes.human_count_bytes(),
+                        send_status = self.send_finish_condition.status(),
+                        poll_status = self.poll_finish_condition.status()
+                    );
                 }
             }
 
-            if is_consumer && !self.poll_finish_condition.is_done() {
-                if let Some(batch) = self.consumer.consume_batch().await? {
-                    rl_value += batch.user_data_bytes;
-                    recv_user_bytes += batch.user_data_bytes;
-                    recv_total_bytes += batch.total_bytes;
-                    recv_messages += u64::from(batch.messages);
-                    recv_batches += 1;
+            if is_consumer
+                && !self.poll_finish_condition.is_done()
+                && let Some(batch) = self.consumer.consume_batch().await?
+            {
+                rl_value += batch.user_data_bytes;
+                recv_user_bytes += batch.user_data_bytes;
+                recv_total_bytes += batch.total_bytes;
+                recv_messages += u64::from(batch.messages);
+                recv_batches += 1;
 
-                    let elapsed = 
u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
-                    let latency = 
u64::try_from(batch.latency.as_micros()).unwrap_or(u64::MAX);
+                let elapsed = 
u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
+                let latency = 
u64::try_from(batch.latency.as_micros()).unwrap_or(u64::MAX);
 
-                    records.push(BenchmarkRecord {
-                        elapsed_time_us: elapsed,
-                        latency_us: latency,
-                        messages: sent_messages + recv_messages,
-                        message_batches: sent_batches + recv_batches,
-                        user_data_bytes: sent_user_bytes + recv_user_bytes,
-                        total_bytes: sent_total_bytes + recv_total_bytes,
-                    });
+                records.push(BenchmarkRecord {
+                    elapsed_time_us: elapsed,
+                    latency_us: latency,
+                    messages: sent_messages + recv_messages,
+                    message_batches: sent_batches + recv_batches,
+                    user_data_bytes: sent_user_bytes + recv_user_bytes,
+                    total_bytes: sent_total_bytes + recv_total_bytes,
+                });
 
-                    if let Some(limiter) = &rate_limiter {
-                        limiter.wait_until_necessary(rl_value).await;
-                        rl_value = 0;
-                    }
+                if let Some(limiter) = &rate_limiter {
+                    limiter.wait_until_necessary(rl_value).await;
+                    rl_value = 0;
+                }
 
-                    self.poll_finish_condition
-                        .account_and_check(batch.user_data_bytes);
-                    if require_reply {
-                        awaiting_reply = false;
-                    }
+                self.poll_finish_condition
+                    .account_and_check(batch.user_data_bytes);
+                if require_reply {
+                    awaiting_reply = false;
                 }
             }
         }
diff --git a/core/integration/tests/cli/stream/test_stream_list_command.rs 
b/core/integration/tests/cli/stream/test_stream_list_command.rs
index 5f879fc0..610e0384 100644
--- a/core/integration/tests/cli/stream/test_stream_list_command.rs
+++ b/core/integration/tests/cli/stream/test_stream_list_command.rs
@@ -27,18 +27,13 @@ use predicates::str::{contains, starts_with};
 use serial_test::parallel;
 
 struct TestStreamListCmd {
-    stream_id: u32,
     name: String,
     output: OutputFormat,
 }
 
 impl TestStreamListCmd {
-    fn new(stream_id: u32, name: String, output: OutputFormat) -> Self {
-        Self {
-            stream_id,
-            name,
-            output,
-        }
+    fn new(name: String, output: OutputFormat) -> Self {
+        Self { name, output }
     }
 
     fn to_args(&self) -> Vec<&str> {
@@ -82,21 +77,18 @@ pub async fn should_be_successful() {
     iggy_cmd_test.setup().await;
     iggy_cmd_test
         .execute_test(TestStreamListCmd::new(
-            1,
             String::from("prod"),
             OutputFormat::Default,
         ))
         .await;
     iggy_cmd_test
         .execute_test(TestStreamListCmd::new(
-            2,
             String::from("testing"),
             OutputFormat::List,
         ))
         .await;
     iggy_cmd_test
         .execute_test(TestStreamListCmd::new(
-            3,
             String::from("misc"),
             OutputFormat::Table,
         ))
diff --git a/core/integration/tests/server/scenarios/mod.rs 
b/core/integration/tests/server/scenarios/mod.rs
index c63cc212..0e2aa594 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -41,7 +41,6 @@ const CONSUMER_GROUP_NAME: &str = "test-consumer-group";
 const USERNAME_1: &str = "user1";
 const USERNAME_2: &str = "user2";
 const USERNAME_3: &str = "user3";
-const CONSUMER_ID: u32 = 1;
 const CONSUMER_KIND: ConsumerKind = ConsumerKind::Consumer;
 const MESSAGES_COUNT: u32 = 1337;
 
diff --git a/core/integration/tests/server/scenarios/system_scenario.rs 
b/core/integration/tests/server/scenarios/system_scenario.rs
index 02072cf5..61163cde 100644
--- a/core/integration/tests/server/scenarios/system_scenario.rs
+++ b/core/integration/tests/server/scenarios/system_scenario.rs
@@ -140,14 +140,12 @@ pub async fn run(client_factory: &dyn ClientFactory) {
     assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize);
     assert_eq!(topic.size, 0);
     assert_eq!(topic.messages_count, 0);
-    let mut id = 0;
-    for topic_partition in topic.partitions {
-        assert_eq!(topic_partition.id, id);
+    for (id, topic_partition) in topic.partitions.iter().enumerate() {
+        assert_eq!(topic_partition.id, id as u32);
         assert_eq!(topic_partition.segments_count, 1);
         assert_eq!(topic_partition.size, 0);
         assert_eq!(topic_partition.current_offset, 0);
         assert_eq!(topic_partition.messages_count, 0);
-        id += 1;
     }
 
     // 12. Get topic details by name
diff --git a/core/integration/tests/streaming/common/test_setup.rs 
b/core/integration/tests/streaming/common/test_setup.rs
index d22b16ea..cc461f89 100644
--- a/core/integration/tests/streaming/common/test_setup.rs
+++ b/core/integration/tests/streaming/common/test_setup.rs
@@ -19,16 +19,12 @@
 use compio::fs;
 use server::bootstrap::create_directories;
 use server::configs::system::SystemConfig;
-use server::streaming::persistence::persister::{FileWithSyncPersister, 
PersisterKind};
-use server::streaming::storage::SystemStorage;
 use server::streaming::utils::MemoryPool;
-use std::rc::Rc;
 use std::sync::Arc;
 use uuid::Uuid;
 
 pub struct TestSetup {
     pub config: Arc<SystemConfig>,
-    pub storage: Rc<SystemStorage>,
 }
 
 impl TestSetup {
@@ -44,107 +40,8 @@ impl TestSetup {
         let config = Arc::new(config);
         fs::create_dir(config.get_system_path()).await.unwrap();
         create_directories(&config).await.unwrap();
-        let persister = PersisterKind::FileWithSync(FileWithSyncPersister {});
-        let storage = Rc::new(SystemStorage::new(config.clone(), 
Arc::new(persister)));
         MemoryPool::init_pool(config.clone());
-        TestSetup { config, storage }
-    }
-
-    pub async fn create_streams_directory(&self) {
-        if fs::metadata(&self.config.get_streams_path()).await.is_err() {
-            fs::create_dir(&self.config.get_streams_path())
-                .await
-                .unwrap();
-        }
-    }
-
-    pub async fn create_stream_directory(&self, stream_id: u32) {
-        self.create_streams_directory().await;
-        if fs::metadata(&self.config.get_stream_path(stream_id as usize))
-            .await
-            .is_err()
-        {
-            fs::create_dir(&self.config.get_stream_path(stream_id as usize))
-                .await
-                .unwrap();
-        }
-    }
-
-    pub async fn create_topics_directory(&self, stream_id: u32) {
-        self.create_stream_directory(stream_id).await;
-        if fs::metadata(&self.config.get_topics_path(stream_id as usize))
-            .await
-            .is_err()
-        {
-            fs::create_dir(&self.config.get_topics_path(stream_id as usize))
-                .await
-                .unwrap();
-        }
-    }
-
-    pub async fn create_topic_directory(&self, stream_id: u32, topic_id: u32) {
-        self.create_topics_directory(stream_id).await;
-        if fs::metadata(
-            &self
-                .config
-                .get_topic_path(stream_id as usize, topic_id as usize),
-        )
-        .await
-        .is_err()
-        {
-            fs::create_dir(
-                &self
-                    .config
-                    .get_topic_path(stream_id as usize, topic_id as usize),
-            )
-            .await
-            .unwrap();
-        }
-    }
-
-    pub async fn create_partitions_directory(&self, stream_id: u32, topic_id: 
u32) {
-        self.create_topic_directory(stream_id, topic_id).await;
-        if fs::metadata(
-            &self
-                .config
-                .get_partitions_path(stream_id as usize, topic_id as usize),
-        )
-        .await
-        .is_err()
-        {
-            fs::create_dir(
-                &self
-                    .config
-                    .get_partitions_path(stream_id as usize, topic_id as 
usize),
-            )
-            .await
-            .unwrap();
-        }
-    }
-
-    pub async fn create_partition_directory(
-        &self,
-        stream_id: u32,
-        topic_id: u32,
-        partition_id: u32,
-    ) {
-        self.create_partitions_directory(stream_id, topic_id).await;
-        if fs::metadata(&self.config.get_partition_path(
-            stream_id as usize,
-            topic_id as usize,
-            partition_id as usize,
-        ))
-        .await
-        .is_err()
-        {
-            fs::create_dir(&self.config.get_partition_path(
-                stream_id as usize,
-                topic_id as usize,
-                partition_id as usize,
-            ))
-            .await
-            .unwrap();
-        }
+        TestSetup { config }
     }
 }
 
diff --git a/core/integration/tests/streaming/get_by_offset.rs 
b/core/integration/tests/streaming/get_by_offset.rs
index 7fd903bb..35810ee3 100644
--- a/core/integration/tests/streaming/get_by_offset.rs
+++ b/core/integration/tests/streaming/get_by_offset.rs
@@ -221,7 +221,7 @@ async fn test_get_messages_by_offset(
     // Test 1: All messages from start
     let args = PollingArgs::new(PollingStrategy::offset(0), 
total_sent_messages, false);
     let (_, all_loaded_messages) = streams
-        .poll_messages(&namespace, consumer.clone(), args)
+        .poll_messages(&namespace, consumer, args)
         .await
         .unwrap();
     assert_eq!(
@@ -244,7 +244,7 @@ async fn test_get_messages_by_offset(
             false,
         );
         let (_, middle_messages) = streams
-            .poll_messages(&namespace, consumer.clone(), args)
+            .poll_messages(&namespace, consumer, args)
             .await
             .unwrap();
 
@@ -262,7 +262,7 @@ async fn test_get_messages_by_offset(
         let final_offset = *batch_offsets.last().unwrap();
         let args = PollingArgs::new(PollingStrategy::offset(final_offset + 1), 
1, false);
         let (_, no_messages) = streams
-            .poll_messages(&namespace, consumer.clone(), args)
+            .poll_messages(&namespace, consumer, args)
             .await
             .unwrap();
         assert_eq!(
@@ -277,7 +277,7 @@ async fn test_get_messages_by_offset(
     let subset_size = std::cmp::min(3, total_sent_messages);
     let args = PollingArgs::new(PollingStrategy::offset(0), subset_size, 
false);
     let (_, subset_messages) = streams
-        .poll_messages(&namespace, consumer.clone(), args)
+        .poll_messages(&namespace, consumer, args)
         .await
         .unwrap();
     assert_eq!(
@@ -294,7 +294,7 @@ async fn test_get_messages_by_offset(
         let span_size = 8; // Should span across 2nd, 3rd, and into 4th batch
         let args = PollingArgs::new(PollingStrategy::offset(span_offset), 
span_size, false);
         let (_, batches) = streams
-            .poll_messages(&namespace, consumer.clone(), args)
+            .poll_messages(&namespace, consumer, args)
             .await
             .unwrap();
         assert_eq!(
@@ -372,7 +372,7 @@ async fn test_get_messages_by_offset(
             false,
         );
         let (_, chunk) = streams
-            .poll_messages(&namespace, consumer.clone(), args)
+            .poll_messages(&namespace, consumer, args)
             .await
             .unwrap();
 
diff --git a/core/integration/tests/streaming/mod.rs 
b/core/integration/tests/streaming/mod.rs
index c759c165..d6f55a84 100644
--- a/core/integration/tests/streaming/mod.rs
+++ b/core/integration/tests/streaming/mod.rs
@@ -16,8 +16,6 @@
  * under the License.
  */
 
-use bytes::Bytes;
-use iggy::prelude::IggyMessage;
 use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, 
MaxTopicSize};
 use server::{
     configs::system::SystemConfig,
@@ -59,7 +57,7 @@ async fn bootstrap_test_environment(
     let streams = Streams::default();
     // Create stream together with its dirs
     let stream = stream2::create_and_insert_stream_mem(&streams, stream_name);
-    create_stream_file_hierarchy(shard_id, stream.id(), &config).await?;
+    create_stream_file_hierarchy(shard_id, stream.id(), config).await?;
     // Create topic together with its dirs
     let stream_id = Identifier::numeric(stream.id() as u32).unwrap();
     let parent_stats = streams.with_stream_by_id(&stream_id, |(_, stats)| 
stats.clone());
@@ -76,7 +74,7 @@ async fn bootstrap_test_environment(
         max_topic_size,
         parent_stats,
     );
-    create_topic_file_hierarchy(shard_id, stream.id(), topic.id(), 
&config).await?;
+    create_topic_file_hierarchy(shard_id, stream.id(), topic.id(), 
config).await?;
     // Create partition together with its dirs
     let topic_id = Identifier::numeric(topic.id() as u32).unwrap();
     let parent_stats = streams.with_topic_by_id(
@@ -93,7 +91,7 @@ async fn bootstrap_test_environment(
         config,
     );
     for partition in partitions {
-        create_partition_file_hierarchy(shard_id, stream.id(), topic.id(), 
partition.id(), &config)
+        create_partition_file_hierarchy(shard_id, stream.id(), topic.id(), 
partition.id(), config)
             .await?;
 
         // Open the log
@@ -106,7 +104,7 @@ async fn bootstrap_test_environment(
         let messages_size = 0;
         let indexes_size = 0;
         let storage = create_segment_storage(
-            &config,
+            config,
             stream.id(),
             topic.id(),
             partition.id(),
@@ -131,24 +129,4 @@ async fn bootstrap_test_environment(
         partition_id: 0,
         task_registry,
     })
-}
-
-fn create_messages() -> Vec<IggyMessage> {
-    vec![
-        create_message(1, "message 1"),
-        create_message(2, "message 2"),
-        create_message(3, "message 3"),
-        create_message(4, "message 3.2"),
-        create_message(5, "message 1.2"),
-        create_message(6, "message 3.3"),
-    ]
-}
-
-fn create_message(id: u128, payload: &str) -> IggyMessage {
-    let payload = Bytes::from(payload.to_string());
-    IggyMessage::builder()
-        .id(id)
-        .payload(payload)
-        .build()
-        .expect("Failed to create message with valid payload and headers")
-}
+}
\ No newline at end of file
diff --git 
a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
 
b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
index 9458861d..96c1e43c 100644
--- 
a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
@@ -21,7 +21,6 @@ use crate::binary::handlers::consumer_offsets::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::session::Session;
 use anyhow::Result;
 use error_set::ErrContext;
@@ -43,7 +42,7 @@ impl ServerCommandHandler for DeleteConsumerOffset {
         shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let (polling_consumer, partition_id) = shard
+        shard
             .delete_consumer_offset(
                 session,
                 self.consumer,
@@ -55,13 +54,6 @@ impl ServerCommandHandler for DeleteConsumerOffset {
             .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to delete consumer offset for topic with ID: {} in stream with ID: {} 
partition ID: {:#?}, session: {}",
                 self.topic_id, self.stream_id, self.partition_id, session
             ))?;
-        // TODO: Get rid of this event.
-        let event = ShardEvent::DeletedOffset {
-            stream_id: self.stream_id,
-            topic_id: self.topic_id,
-            partition_id,
-            polling_consumer,
-        };
         sender.send_empty_ok_response().await?;
         Ok(())
     }
diff --git 
a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
 
b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
index 85eb95cd..db770e4a 100644
--- 
a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
@@ -23,7 +23,6 @@ use crate::binary::handlers::consumer_offsets::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
 use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::session::Session;
 use anyhow::Result;
 use error_set::ErrContext;
@@ -44,7 +43,7 @@ impl ServerCommandHandler for StoreConsumerOffset {
         shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-        let (polling_consumer, partition_id) = shard
+        shard
             .store_consumer_offset(
                 session,
                 self.consumer,
@@ -57,14 +56,6 @@ impl ServerCommandHandler for StoreConsumerOffset {
             .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to store consumer offset for stream_id: {}, topic_id: {}, 
partition_id: {:?}, offset: {}, session: {}",
                 self.stream_id, self.topic_id, self.partition_id, self.offset, 
session
             ))?;
-        // TODO: Get rid of this event.
-        let event = ShardEvent::StoredOffset {
-            stream_id: self.stream_id,
-            topic_id: self.topic_id,
-            partition_id,
-            polling_consumer,
-            offset: self.offset,
-        };
         sender.send_empty_ok_response().await?;
         Ok(())
     }
diff --git a/core/server/src/binary/handlers/streams/get_stream_handler.rs 
b/core/server/src/binary/handlers/streams/get_stream_handler.rs
index dfd4cc4a..195cd0b2 100644
--- a/core/server/src/binary/handlers/streams/get_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/get_stream_handler.rs
@@ -59,7 +59,7 @@ impl ServerCommandHandler for GetStream {
                     self.stream_id,
                     session.get_user_id(),
                 )
-            });
+            })?;
         let response = shard
             .streams2
             .with_components_by_id(stream_id, |(root, stats)| 
mapper::map_stream(&root, &stats));
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs 
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 2f1ee6cd..35eba8d9 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -71,7 +71,7 @@ impl ServerCommandHandler for CreateTopic {
             stream_id: self.stream_id.clone(),
             topic,
         };
-        let responses = shard.broadcast_event_to_all_shards(event).await;
+        let _responses = shard.broadcast_event_to_all_shards(event).await;
         let partitions = shard
             .create_partitions2(
                 session,
@@ -85,7 +85,7 @@ impl ServerCommandHandler for CreateTopic {
             topic_id: Identifier::numeric(topic_id as u32).unwrap(),
             partitions,
         };
-        let responses = shard.broadcast_event_to_all_shards(event).await;
+        let _responses = shard.broadcast_event_to_all_shards(event).await;
         let response = shard.streams2.with_topic_by_id(
             &self.stream_id,
             &Identifier::numeric(topic_id as u32).unwrap(),
diff --git a/core/server/src/binary/handlers/topics/get_topics_handler.rs 
b/core/server/src/binary/handlers/topics/get_topics_handler.rs
index 0c78e90e..52f23a7a 100644
--- a/core/server/src/binary/handlers/topics/get_topics_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topics_handler.rs
@@ -51,7 +51,7 @@ impl ServerCommandHandler for GetTopics {
         shard
             .permissioner
             .borrow()
-            .get_topics(session.get_user_id(), numeric_stream_id as u32);
+            .get_topics(session.get_user_id(), numeric_stream_id as u32)?;
 
         let response = shard.streams2.with_topics(&self.stream_id, |topics| {
             topics.with_components(|topics| {
diff --git a/core/server/src/binary/mapper.rs b/core/server/src/binary/mapper.rs
index ca43991d..6856dea2 100644
--- a/core/server/src/binary/mapper.rs
+++ b/core/server/src/binary/mapper.rs
@@ -23,7 +23,7 @@ use crate::slab::traits_ext::{EntityComponentSystem, 
IntoComponents};
 use crate::streaming::clients::client_manager::Client;
 use crate::streaming::partitions::partition2::PartitionRoot;
 use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
-use crate::streaming::stats::stats::{PartitionStats, StreamStats, TopicStats};
+use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
 use crate::streaming::streams::stream2;
 use crate::streaming::topics::consumer_group2::{ConsumerGroupMembers, 
ConsumerGroupRoot, Member};
 use crate::streaming::topics::topic2::{self, TopicRoot};
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 09dca813..5a150ab1 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -32,7 +32,7 @@ use crate::{
         persistence::persister::{FilePersister, FileWithSyncPersister, 
PersisterKind},
         personal_access_tokens::personal_access_token::PersonalAccessToken,
         segments::{Segment2, storage::Storage},
-        stats::stats::{PartitionStats, StreamStats, TopicStats},
+        stats::{PartitionStats, StreamStats, TopicStats},
         storage::SystemStorage,
         streams::stream2,
         topics::{consumer_group2, topic2},
@@ -631,7 +631,7 @@ async fn load_partition(
     parent_stats: Arc<TopicStats>,
 ) -> Result<partition2::Partition, IggyError> {
     let stats = Arc::new(PartitionStats::new(parent_stats));
-    let partition_id = partition_state.id as u32;
+    let partition_id = partition_state.id;
 
     let partition_path = config.get_partition_path(stream_id, topic_id, 
partition_id as usize);
     let log_files = collect_log_files(&partition_path).await?;
diff --git a/core/server/src/http/http_server.rs 
b/core/server/src/http/http_server.rs
index 317e9c61..617bd763 100644
--- a/core/server/src/http/http_server.rs
+++ b/core/server/src/http/http_server.rs
@@ -135,7 +135,8 @@ pub async fn start_http_server(
         let service = 
app.into_make_service_with_connect_info::<CompioSocketAddr>();
 
         // Spawn the server in a task so we can handle shutdown
-        let server_task =
+        // TODO(hubcio): investigate if we can use TaskRegistry here
+        let _server_task =
             compio::runtime::spawn(async move { cyper_axum::serve(listener, 
service).await });
 
         // Wait for shutdown signal
diff --git a/core/server/src/http/mapper.rs b/core/server/src/http/mapper.rs
index df93990a..1fec7f99 100644
--- a/core/server/src/http/mapper.rs
+++ b/core/server/src/http/mapper.rs
@@ -21,7 +21,7 @@ use crate::slab::Keyed;
 use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents};
 use crate::streaming::clients::client_manager::Client;
 use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
-use crate::streaming::stats::stats::TopicStats;
+use crate::streaming::stats::TopicStats;
 use crate::streaming::topics::consumer_group2::{ConsumerGroupMembers, 
ConsumerGroupRoot};
 use crate::streaming::topics::topic2::TopicRoot;
 use crate::streaming::users::user::User;
@@ -259,7 +259,7 @@ pub fn map_generated_access_token_to_identity_info(token: 
GeneratedToken) -> Ide
 /// Map StreamRoot and StreamStats to StreamDetails for HTTP responses
 pub fn map_stream_details(
     root: &crate::streaming::streams::stream2::StreamRoot,
-    stats: &crate::streaming::stats::stats::StreamStats,
+    stats: &crate::streaming::stats::StreamStats,
 ) -> iggy_common::StreamDetails {
     // Get topics using the new slab-based API
     let topics = root.topics().with_components(|topic_ref| {
@@ -291,10 +291,10 @@ pub fn map_stream_details(
     }
 }
 
-/// Map StreamRoot and StreamStats to Stream for HTTP responses  
+/// Map StreamRoot and StreamStats to Stream for HTTP responses
 pub fn map_stream(
     root: &crate::streaming::streams::stream2::StreamRoot,
-    stats: &crate::streaming::stats::stats::StreamStats,
+    stats: &crate::streaming::stats::StreamStats,
 ) -> iggy_common::Stream {
     iggy_common::Stream {
         id: root.id() as u32,
@@ -309,7 +309,7 @@ pub fn map_stream(
 /// Map multiple streams from slabs
 pub fn map_streams_from_slabs(
     roots: &slab::Slab<crate::streaming::streams::stream2::StreamRoot>,
-    stats: &slab::Slab<Arc<crate::streaming::stats::stats::StreamStats>>,
+    stats: &slab::Slab<Arc<crate::streaming::stats::StreamStats>>,
 ) -> Vec<iggy_common::Stream> {
     let mut streams = Vec::new();
     for (root, stat) in roots
diff --git a/core/server/src/http/streams.rs b/core/server/src/http/streams.rs
index 650262f7..ab71c551 100644
--- a/core/server/src/http/streams.rs
+++ b/core/server/src/http/streams.rs
@@ -150,7 +150,7 @@ async fn create_stream(
                 .shard()
                 .streams2
                 .with_components_by_id(created_stream_id, |(root, stats)| {
-                    crate::http::mapper::map_stream_details(&*root, &**stats)
+                    crate::http::mapper::map_stream_details(&root, &stats)
                 })
         })();
 
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 735a2e51..e24396ac 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -18,8 +18,6 @@
 
 #[cfg(not(feature = "disable-mimalloc"))]
 use mimalloc::MiMalloc;
-use nix::libc::c_void;
-use nix::libc::iovec;
 
 #[cfg(not(feature = "disable-mimalloc"))]
 #[global_allocator]
@@ -55,10 +53,3 @@ pub fn map_toggle_str<'a>(enabled: bool) -> &'a str {
         false => "disabled",
     }
 }
-
-pub fn to_iovec<T>(data: &[T]) -> iovec {
-    iovec {
-        iov_base: data.as_ptr() as *mut c_void,
-        iov_len: data.len() * std::mem::size_of::<T>(),
-    }
-}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 36091a5a..a4ecdcc7 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -52,7 +52,6 @@ use std::collections::HashSet;
 use std::rc::Rc;
 use std::str::FromStr;
 use std::sync::atomic::{AtomicU64, Ordering};
-use tokio::time::Instant;
 use tracing::{error, info, instrument, warn};
 
 const COMPONENT: &str = "MAIN";
@@ -63,7 +62,6 @@ static SHUTDOWN_START_TIME: AtomicU64 = AtomicU64::new(0);
 #[instrument(skip_all, name = "trace_start_server")]
 #[compio::main]
 async fn main() -> Result<(), ServerError> {
-    let startup_timestamp = Instant::now();
     let standard_font = FIGfont::standard().unwrap();
     let figure = standard_font.convert("Iggy Server");
     println!("{}", figure.unwrap());
@@ -373,7 +371,7 @@ async fn main() -> Result<(), ServerError> {
     for (idx, handle) in handles.into_iter().enumerate() {
         handle
             .join()
-            .expect(format!("Failed to join shard thread-{}", idx).as_str());
+            .unwrap_or_else(|_| panic!("Failed to join shard thread-{}", idx));
     }
 
     let shutdown_duration_msg = {
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index a439f7e3..5f25ce63 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -116,7 +116,7 @@ impl IggyShardBuilder {
         let connections = self.connections.unwrap();
         let encryptor = self.encryptor;
         let version = self.version.unwrap();
-        let (stop_sender, stop_receiver, frame_receiver) = connections
+        let (_, stop_receiver, frame_receiver) = connections
             .iter()
             .filter(|c| c.id == id)
             .map(|c| {
@@ -150,10 +150,9 @@ impl IggyShardBuilder {
             users: RefCell::new(users),
             encryptor,
             config,
-            version,
+            _version: version,
             state,
             stop_receiver,
-            stop_sender,
             messages_receiver: Cell::new(Some(frame_receiver)),
             metrics,
             is_shutting_down: AtomicBool::new(false),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 3c6e9348..896c202b 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -37,7 +37,7 @@ use crate::{
             message::{ShardMessage, ShardRequest, ShardRequestPayload, 
ShardSendRequestResult},
         },
     },
-    shard_error, shard_info, shard_warn,
+    shard_error, shard_info,
     slab::{streams::Streams, traits_ext::EntityMarker},
     state::StateKind,
     streaming::{
@@ -70,7 +70,7 @@ use std::{
     time::{Duration, Instant},
 };
 use tracing::{debug, error, instrument, trace};
-use transmission::connector::{Receiver, ShardConnector, StopReceiver, 
StopSender};
+use transmission::connector::{Receiver, ShardConnector, StopReceiver};
 
 pub const COMPONENT: &str = "SHARD";
 pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
@@ -123,7 +123,7 @@ impl ShardInfo {
 pub struct IggyShard {
     pub id: u16,
     shards: Vec<Shard>,
-    version: SemanticVersion,
+    _version: SemanticVersion,
 
     // Heart transplant of the old streams structure.
     pub(crate) streams2: Streams,
@@ -142,7 +142,6 @@ pub struct IggyShard {
     pub(crate) metrics: Metrics,
     pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>,
     pub(crate) stop_receiver: StopReceiver,
-    pub(crate) stop_sender: StopSender,
     pub(crate) is_shutting_down: AtomicBool,
     pub(crate) tcp_bound_address: Cell<Option<SocketAddr>>,
     pub(crate) quic_bound_address: Cell<Option<SocketAddr>>,
@@ -622,7 +621,7 @@ impl IggyShard {
                             ),
                         };
 
-                        let batches = if consumer_offset.is_none() {
+                        if consumer_offset.is_none() {
                             let batches = self
                                 .streams2
                                 .get_messages_by_offset(
@@ -652,8 +651,7 @@ impl IggyShard {
                                 )
                                 .await?;
                             Ok(batches)
-                        };
-                        batches
+                        }
                     }
                 }?;
 
@@ -864,13 +862,6 @@ impl IggyShard {
                         // Notify config writer that a server has bound
                         let _ = self.config_writer_notify.try_send(());
                     }
-                    _ => {
-                        shard_warn!(
-                            self.id,
-                            "Received AddressBound event for unsupported 
protocol: {:?}",
-                            protocol
-                        );
-                    }
                 }
                 Ok(())
             }
@@ -991,29 +982,7 @@ impl IggyShard {
                 }
 
                 Ok(())
-            }
-            ShardEvent::StoredOffset {
-                stream_id,
-                topic_id,
-                partition_id,
-                polling_consumer,
-                offset,
-            } => {
-                self.store_consumer_offset_bypass_auth(
-                    &stream_id,
-                    &topic_id,
-                    &polling_consumer,
-                    partition_id,
-                    offset,
-                );
-                Ok(())
-            }
-            ShardEvent::DeletedOffset {
-                stream_id,
-                topic_id,
-                partition_id,
-                polling_consumer,
-            } => Ok(()),
+            },
             ShardEvent::JoinedConsumerGroup {
                 client_id,
                 stream_id,
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index ea834468..437c163e 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -368,11 +368,11 @@ impl IggyShard {
         self.ensure_authenticated(session)?;
         let numeric_stream_id = self
             .streams2
-            .with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
+            .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
 
         let numeric_topic_id =
             self.streams2
-                .with_topic_by_id(&stream_id, &topic_id, 
topics::helpers::get_topic_id());
+                .with_topic_by_id(stream_id, topic_id, 
topics::helpers::get_topic_id());
 
         // Validate permissions for given user on stream and topic.
         self.permissioner
diff --git a/core/server/src/shard/system/partitions.rs 
b/core/server/src/shard/system/partitions.rs
index 695006e2..0f742b2c 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -271,7 +271,7 @@ impl IggyShard {
             let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, 
partition_id);
             self.remove_shard_table_record(&ns);
 
-            self.delete_partition_dir(numeric_stream_id, numeric_topic_id, 
partition_id, &mut log)
+            self.delete_partition_dir(numeric_stream_id, numeric_topic_id, 
partition_id)
                 .await?;
             let segments_count = stats.segments_count_inconsistent();
             let messages_count = stats.messages_count_inconsistent();
@@ -297,14 +297,12 @@ impl IggyShard {
         stream_id: usize,
         topic_id: usize,
         partition_id: usize,
-        log: &mut SegmentedLog<MemoryMessageJournal>,
     ) -> Result<(), IggyError> {
         delete_partitions_from_disk(
             self.id,
             stream_id,
             topic_id,
             partition_id,
-            log,
             &self.config.system,
         )
         .await
diff --git a/core/server/src/shard/system/personal_access_tokens.rs 
b/core/server/src/shard/system/personal_access_tokens.rs
index 62322e5e..6eb6a0cd 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -171,7 +171,7 @@ impl IggyShard {
         let session = active_sessions
             .iter()
             .find(|s| s.client_id == client_id)
-            .expect(format!("At this point session for {}, should exist.", 
client_id).as_str());
+            .unwrap_or_else(|| panic!("At this point session for {}, should 
exist.", client_id));
         self.login_with_personal_access_token(token, Some(session))?;
         Ok(())
     }
diff --git a/core/server/src/shard/system/streams.rs 
b/core/server/src/shard/system/streams.rs
index 1dbae2c6..b2679f07 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -108,8 +108,7 @@ impl IggyShard {
     }
 
     pub fn delete_stream2_bypass_auth(&self, id: &Identifier) -> 
stream2::Stream {
-        let stream = self.delete_stream2_base(id);
-        stream
+        self.delete_stream2_base(id)
     }
 
     fn delete_stream2_base(&self, id: &Identifier) -> stream2::Stream {
diff --git a/core/server/src/shard/system/topics.rs 
b/core/server/src/shard/system/topics.rs
index 385fb7c5..fbf583cd 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -21,7 +21,7 @@ use crate::shard::IggyShard;
 use crate::shard_info;
 use crate::slab::traits_ext::{EntityComponentSystem, EntityMarker, InsertCell, 
IntoComponents};
 use crate::streaming::session::Session;
-use crate::streaming::stats::stats::{StreamStats, TopicStats};
+use crate::streaming::stats::{StreamStats, TopicStats};
 use crate::streaming::topics::storage2::{create_topic_file_hierarchy, 
delete_topic_from_disk};
 use crate::streaming::topics::topic2::{self};
 use crate::streaming::{partitions, streams, topics};
@@ -31,9 +31,9 @@ use iggy_common::{
 };
 use std::str::FromStr;
 use std::sync::Arc;
-use std::u32;
 
 impl IggyShard {
+    #[allow(clippy::too_many_arguments)]
     pub async fn create_topic2(
         &self,
         session: &Session,
@@ -94,40 +94,12 @@ impl IggyShard {
         Ok(topic)
     }
 
-    fn create_and_insert_topics_mem(
-        &self,
-        stream_id: &Identifier,
-        name: String,
-        replication_factor: u8,
-        message_expiry: IggyExpiry,
-        compression: CompressionAlgorithm,
-        max_topic_size: MaxTopicSize,
-        parent_stats: Arc<StreamStats>,
-    ) -> topic2::Topic {
-        let stats = Arc::new(TopicStats::new(parent_stats));
-        let now = IggyTimestamp::now();
-        let mut topic = topic2::Topic::new(
-            name,
-            stats,
-            now,
-            replication_factor,
-            message_expiry,
-            compression,
-            max_topic_size,
-        );
-
-        let id = self
-            .streams2
-            .with_topics(stream_id, |topics| topics.insert(topic.clone()));
-        topic.update_id(id);
-        topic
-    }
-
     pub fn create_topic2_bypass_auth(&self, stream_id: &Identifier, topic: 
topic2::Topic) -> usize {
         self.streams2
             .with_topics(stream_id, |topics| topics.insert(topic))
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub fn update_topic2(
         &self,
         session: &Session,
@@ -176,6 +148,7 @@ impl IggyShard {
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub fn update_topic_bypass_auth2(
         &self,
         stream_id: &Identifier,
@@ -198,6 +171,7 @@ impl IggyShard {
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub fn update_topic_base2(
         &self,
         stream_id: &Identifier,
diff --git a/core/server/src/shard/system/users.rs 
b/core/server/src/shard/system/users.rs
index 573e1f2e..c13d0cbf 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -64,11 +64,7 @@ impl IggyShard {
     pub fn try_get_user(&self, user_id: &Identifier) -> Result<Option<User>, 
IggyError> {
         match user_id.kind {
             IdKind::Numeric => {
-                let user = self
-                    .users
-                    .borrow()
-                    .get(&user_id.get_u32_value()?)
-                    .map(|user| user.clone());
+                let user = 
self.users.borrow().get(&user_id.get_u32_value()?).cloned();
                 Ok(user)
             }
             IdKind::String => {
@@ -226,7 +222,6 @@ impl IggyShard {
 
     fn delete_user_base(&self, user_id: &Identifier) -> Result<User, 
IggyError> {
         let existing_user_id;
-        let existing_username;
         {
             let user = self.get_user(user_id).with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get user 
with id: {user_id}")
@@ -237,7 +232,6 @@ impl IggyShard {
             }
 
             existing_user_id = user.id;
-            existing_username = user.username.clone();
         }
 
         let user = self
@@ -448,7 +442,7 @@ impl IggyShard {
         let session = active_sessions
             .iter()
             .find(|s| s.client_id == client_id)
-            .expect(format!("At this point session for {}, should exist.", 
client_id).as_str());
+            .unwrap_or_else(|| panic!("At this point session for {}, should 
exist.", client_id));
         self.login_user_with_credentials(username, Some(password), 
Some(session))?;
         Ok(())
     }
diff --git a/core/server/src/shard/system/utils.rs 
b/core/server/src/shard/system/utils.rs
index d2518da7..a0a75313 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -120,9 +120,7 @@ impl IggyShard {
                         
topics::helpers::get_current_partition_id_unchecked(member_id),
                     )
                 };
-                let Some(partition_id) = partition_id else {
-                    return None;
-                };
+                let partition_id = partition_id?;
 
                 Some((
                     PollingConsumer::consumer_group(cg_id, member_id),
diff --git a/core/server/src/shard/transmission/event.rs 
b/core/server/src/shard/transmission/event.rs
index 61bd5647..a57417e5 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -1,7 +1,6 @@
 use crate::streaming::{
     partitions::partition2,
     personal_access_tokens::personal_access_token::PersonalAccessToken,
-    polling_consumer::PollingConsumer,
     streams::stream2,
     topics::{
         consumer_group2::{self},
@@ -81,19 +80,6 @@ pub enum ShardEvent {
         stream_id: Identifier,
         topic_id: Identifier,
     },
-    StoredOffset {
-        stream_id: Identifier,
-        topic_id: Identifier,
-        partition_id: usize,
-        polling_consumer: PollingConsumer,
-        offset: u64,
-    },
-    DeletedOffset {
-        stream_id: Identifier,
-        topic_id: Identifier,
-        partition_id: usize,
-        polling_consumer: PollingConsumer,
-    },
     CreatedUser {
         user_id: u32,
         username: String,
diff --git a/core/server/src/slab/consumer_groups.rs 
b/core/server/src/slab/consumer_groups.rs
index 5f98ac5d..d66a41be 100644
--- a/core/server/src/slab/consumer_groups.rs
+++ b/core/server/src/slab/consumer_groups.rs
@@ -112,6 +112,10 @@ impl ConsumerGroups {
         self.root.len()
     }
 
+    pub fn is_empty(&self) -> bool {
+        self.root.is_empty()
+    }
+
     pub fn get_index(&self, id: &Identifier) -> usize {
         match id.kind {
             iggy_common::IdKind::Numeric => id.get_u32_value().unwrap() as 
usize,
diff --git a/core/server/src/slab/helpers.rs b/core/server/src/slab/helpers.rs
index d2fe7487..0a1c5145 100644
--- a/core/server/src/slab/helpers.rs
+++ b/core/server/src/slab/helpers.rs
@@ -17,12 +17,6 @@ where
     |(root, ..)| f(root.topics())
 }
 
-pub fn topics_async<O, F>(f: F) -> impl AsyncFnOnce(ComponentsById<StreamRef>) 
-> O
-where
-    F: for<'a> AsyncFnOnce(&'a Topics) -> O,
-{
-    async |(root, ..)| f(root.topics()).await
-}
 pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRef>) -> O
 where
     F: for<'a> FnOnce(&'a Topics) -> O,
@@ -37,12 +31,6 @@ where
     |(root, ..)| f(root.partitions())
 }
 
-pub fn partitions_async<O, F>(f: F) -> impl 
AsyncFnOnce(ComponentsById<TopicRef>) -> O
-where
-    F: for<'a> AsyncFnOnce(&'a Partitions) -> O,
-{
-    async |(root, ..)| f(root.partitions()).await
-}
 pub fn partitions_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<TopicRefMut>) 
-> O
 where
     F: for<'a> FnOnce(&'a mut Partitions) -> O,
diff --git a/core/server/src/slab/partitions.rs 
b/core/server/src/slab/partitions.rs
index f430d1ab..87de3186 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -13,7 +13,7 @@ use crate::{
                 PartitionRefMut,
             },
         },
-        stats::stats::PartitionStats,
+        stats::PartitionStats,
     },
 };
 use slab::Slab;
@@ -196,6 +196,10 @@ impl Partitions {
         self.root.len()
     }
 
+    pub fn is_empty(&self) -> bool {
+        self.root.is_empty()
+    }
+
     pub fn insert_default_log(&mut self) -> ContainerId {
         self.log.insert(Default::default())
     }
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index f318327e..386c7e56 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1,6 +1,7 @@
 use crate::shard::task_registry::TaskRegistry;
 use crate::shard_trace;
 use crate::streaming::partitions as streaming_partitions;
+use crate::streaming::stats::StreamStats;
 use crate::{
     binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
     configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
@@ -26,7 +27,6 @@ use crate::{
         segments::{
             IggyMessagesBatchMut, IggyMessagesBatchSet, Segment2, 
storage::create_segment_storage,
         },
-        stats::stats::StreamStats,
         streams::{
             self,
             stream2::{self, StreamRef, StreamRefMut},
@@ -381,10 +381,7 @@ impl MainOps for Streams {
                 };
 
                 let Some(consumer_offset) = consumer_offset else {
-                    let batches = self
-                        .get_messages_by_offset(stream_id, topic_id, 
partition_id, 0, count)
-                        .await?;
-                    return Ok((metadata, batches));
+                    return Err(IggyError::ConsumerOffsetNotFound(consumer_id));
                 };
                 let offset = consumer_offset + 1;
                 trace!(
@@ -451,7 +448,7 @@ impl Streams {
         f: impl FnOnce(ComponentsById<StreamRef>) -> T,
     ) -> T {
         let id = self.get_index(id);
-        self.with_components_by_id(id, |stream| f(stream))
+        self.with_components_by_id(id, f)
     }
 
     pub fn with_stream_by_id_mut<T>(
@@ -460,7 +457,7 @@ impl Streams {
         f: impl FnOnce(ComponentsById<StreamRefMut>) -> T,
     ) -> T {
         let id = self.get_index(id);
-        self.with_components_by_id_mut(id, |stream| f(stream))
+        self.with_components_by_id_mut(id, f)
     }
 
     pub fn with_topics<T>(&self, stream_id: &Identifier, f: impl 
FnOnce(&Topics) -> T) -> T {
@@ -585,10 +582,6 @@ impl Streams {
         })
     }
 
-    pub fn len(&self) -> usize {
-        self.root.borrow().len()
-    }
-
     pub async fn get_messages_by_offset(
         &self,
         stream_id: &Identifier,
@@ -610,10 +603,6 @@ impl Streams {
         let mut current_offset = offset;
 
         for idx in range {
-            if remaining_count == 0 {
-                break;
-            }
-
             let (segment_start_offset, segment_end_offset) = 
self.with_partition_by_id(
                 stream_id,
                 topic_id,
@@ -665,11 +654,16 @@ impl Streams {
             }
 
             batches.add_batch_set(messages);
+
+            if remaining_count == 0 {
+                break;
+            }
         }
 
         Ok(batches)
     }
 
+    #[allow(clippy::too_many_arguments)]
     async fn get_messages_by_offset_base(
         &self,
         stream_id: &Identifier,
@@ -799,6 +793,7 @@ impl Streams {
         Ok(combined_batch_set)
     }
 
+    #[allow(clippy::too_many_arguments)]
     async fn load_messages_from_disk_by_offset(
         &self,
         stream_id: &Identifier,
@@ -893,10 +888,6 @@ impl Streams {
         let mut batches = IggyMessagesBatchSet::empty();
 
         for idx in range {
-            if remaining_count == 0 {
-                break;
-            }
-
             let segment_end_timestamp = self.with_partition_by_id(
                 stream_id,
                 topic_id,
@@ -929,6 +920,10 @@ impl Streams {
 
             remaining_count = remaining_count.saturating_sub(messages_count);
             batches.add_batch_set(messages);
+
+            if remaining_count == 0 {
+                break;
+            }
         }
 
         Ok(batches)
@@ -1313,7 +1308,6 @@ impl Streams {
             partition_id,
             streaming_partitions::helpers::update_index_and_increment_stats(
                 saved,
-                batch_count,
                 config,
             ),
         );
@@ -1335,26 +1329,26 @@ impl Streams {
             return Ok(());
         }
 
-        if let Some(ref messages_writer) = storage.messages_writer {
-            if let Err(e) = messages_writer.fsync().await {
-                tracing::error!(
-                    "Failed to fsync messages writer for partition {}: {}",
-                    partition_id,
-                    e
-                );
-                return Err(e);
-            }
+        if let Some(ref messages_writer) = storage.messages_writer
+            && let Err(e) = messages_writer.fsync().await
+        {
+            tracing::error!(
+                "Failed to fsync messages writer for partition {}: {}",
+                partition_id,
+                e
+            );
+            return Err(e);
         }
 
-        if let Some(ref index_writer) = storage.index_writer {
-            if let Err(e) = index_writer.fsync().await {
-                tracing::error!(
-                    "Failed to fsync index writer for partition {}: {}",
-                    partition_id,
-                    e
-                );
-                return Err(e);
-            }
+        if let Some(ref index_writer) = storage.index_writer
+            && let Err(e) = index_writer.fsync().await
+        {
+            tracing::error!(
+                "Failed to fsync index writer for partition {}: {}",
+                partition_id,
+                e
+            );
+            return Err(e);
         }
 
         Ok(())
diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs
index d93df224..d66880a9 100644
--- a/core/server/src/slab/topics.rs
+++ b/core/server/src/slab/topics.rs
@@ -15,7 +15,7 @@ use crate::{
         },
     },
     streaming::{
-        stats::stats::TopicStats,
+        stats::TopicStats,
         topics::{
             consumer_group2::{ConsumerGroupRef, ConsumerGroupRefMut},
             topic2::{self, TopicRef, TopicRefMut},
@@ -196,7 +196,7 @@ impl Topics {
         f: impl FnOnce(ComponentsById<TopicRef>) -> T,
     ) -> T {
         let id = self.get_index(topic_id);
-        self.with_components_by_id(id, |components| f(components))
+        self.with_components_by_id(id, f)
     }
 
     pub fn with_topic_by_id_mut<T>(
@@ -205,7 +205,7 @@ impl Topics {
         f: impl FnOnce(ComponentsById<TopicRefMut>) -> T,
     ) -> T {
         let id = self.get_index(topic_id);
-        self.with_components_by_id_mut(id, |components| f(components))
+        self.with_components_by_id_mut(id, f)
     }
 
     pub fn with_consumer_groups<T>(
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index 3c4be921..54a59469 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -35,7 +35,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
 use tracing::{debug, error, info};
 
-pub const BUF_cursor_CAPACITY_BYTES: usize = 512 * 1000;
+pub const BUF_CURSOR_CAPACITY_BYTES: usize = 512 * 1000;
 const FILE_STATE_PARSE_ERROR: &str = "STATE - failed to parse file state";
 
 #[derive(Debug)]
diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs
index b5535d0e..31275f56 100644
--- a/core/server/src/state/system.rs
+++ b/core/server/src/state/system.rs
@@ -106,26 +106,15 @@ impl SystemState {
         // Create root user if does not exist.
         let root_exists = state_entries
             .iter()
-            .find(|entry| {
+            .any(|entry| {
                 entry
                     .command()
-                    .and_then(|command| match command {
-                        EntryCommand::CreateUser(payload)
-                            if payload.user_id == DEFAULT_ROOT_USER_ID =>
-                        {
-                            Ok(true)
-                        }
-                        _ => Ok(false),
+                    .map(|command| matches!(command, 
EntryCommand::CreateUser(payload) if payload.user_id == DEFAULT_ROOT_USER_ID))
+                    .unwrap_or_else(|err| {
+                        error!("Failed to check if root user exists: {err}");
+                        false
                     })
-                    .map_or_else(
-                        |err| {
-                            error!("Failed to check if root user exists: 
{err}");
-                            false
-                        },
-                        |v| v,
-                    )
-            })
-            .is_some();
+            });
 
         if !root_exists {
             info!("No users found, creating the root user...");
diff --git a/core/server/src/streaming/deduplication/message_deduplicator.rs 
b/core/server/src/streaming/deduplication/message_deduplicator.rs
index b5a01f32..96b55a74 100644
--- a/core/server/src/streaming/deduplication/message_deduplicator.rs
+++ b/core/server/src/streaming/deduplication/message_deduplicator.rs
@@ -35,11 +35,6 @@ impl Clone for MessageDeduplicator {
         let builder = Self::setup_cache_builder(builder, self.max_entries, 
self.ttl);
         let cache = builder.build();
 
-        self.cache.clone();
-        // Transfer data from the original cache to the new one
-        for (key, value) in self.cache.iter() {
-            cache.insert(*key, value);
-        }
         Self {
             ttl: self.ttl,
             max_entries: self.max_entries,
diff --git a/core/server/src/streaming/partitions/consumer_offset.rs 
b/core/server/src/streaming/partitions/consumer_offset.rs
index ac51b9da..49d30a94 100644
--- a/core/server/src/streaming/partitions/consumer_offset.rs
+++ b/core/server/src/streaming/partitions/consumer_offset.rs
@@ -13,8 +13,8 @@ pub struct ConsumerOffset {
 impl Clone for ConsumerOffset {
     fn clone(&self) -> Self {
         Self {
-            kind: self.kind.clone(),
-            consumer_id: self.consumer_id.clone(),
+            kind: self.kind,
+            consumer_id: self.consumer_id,
             offset: AtomicU64::new(0),
             path: self.path.clone(),
         }
diff --git a/core/server/src/streaming/partitions/helpers.rs 
b/core/server/src/streaming/partitions/helpers.rs
index d0cff51b..9cdd6b2f 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -290,7 +290,6 @@ pub fn get_segment_range_by_timestamp(
     timestamp: u64,
 ) -> impl FnOnce(ComponentsById<PartitionRef>) -> 
Result<std::ops::Range<usize>, IggyError> {
     move |(.., log)| -> Result<std::ops::Range<usize>, IggyError> {
-        let segments = log.segments();
         let start = log
             .segments()
             .iter()
@@ -513,7 +512,6 @@ pub fn persist_batch(
 
 pub fn update_index_and_increment_stats(
     saved: IggyByteSize,
-    batch_count: u32,
     config: &SystemConfig,
 ) -> impl FnOnce(ComponentsById<PartitionRefMut>) {
     move |(.., log)| {
diff --git a/core/server/src/streaming/partitions/journal.rs 
b/core/server/src/streaming/partitions/journal.rs
index 54dfa225..bcfbf111 100644
--- a/core/server/src/streaming/partitions/journal.rs
+++ b/core/server/src/streaming/partitions/journal.rs
@@ -119,5 +119,5 @@ pub trait Journal {
     // This could be merged together with `append`, but not doing this for two 
reasons.
     // 1. In case of the `Journal` being used as part of structure that 
utilizes interior mutability, async with borrow_mut is not possible.
     // 2. Having it as separate function allows for more optimal usage 
patterns, e.g. batching multiple appends before flushing.
-    async fn flush(&self) -> Result<(), IggyError>;
+    fn flush(&self) -> impl Future<Output = Result<(), IggyError>>;
 }
diff --git a/core/server/src/streaming/partitions/log.rs 
b/core/server/src/streaming/partitions/log.rs
index 922f929b..e8fb56d5 100644
--- a/core/server/src/streaming/partitions/log.rs
+++ b/core/server/src/streaming/partitions/log.rs
@@ -17,8 +17,8 @@ where
     journal: J,
     // Ring buffer tracking recently accessed segment indices for cleanup 
optimization.
     // A background task uses this to identify and close file descriptors for 
unused segments.
-    access_map: AllocRingBuffer<usize>,
-    cache: (),
+    _access_map: AllocRingBuffer<usize>,
+    _cache: (),
     segments: Vec<Segment2>,
     indexes: Vec<Option<IggyIndexesMut>>,
     storage: Vec<Storage>,
@@ -31,8 +31,8 @@ where
     fn default() -> Self {
         Self {
             journal: J::default(),
-            access_map: 
AllocRingBuffer::with_capacity_power_of_2(ACCESS_MAP_CAPACITY),
-            cache: (),
+            _access_map: 
AllocRingBuffer::with_capacity_power_of_2(ACCESS_MAP_CAPACITY),
+            _cache: (),
             segments: Vec::with_capacity(SEGMENTS_CAPACITY),
             storage: Vec::with_capacity(SEGMENTS_CAPACITY),
             indexes: Vec::with_capacity(SEGMENTS_CAPACITY),
diff --git a/core/server/src/streaming/partitions/partition2.rs 
b/core/server/src/streaming/partitions/partition2.rs
index d635e863..087d23b0 100644
--- a/core/server/src/streaming/partitions/partition2.rs
+++ b/core/server/src/streaming/partitions/partition2.rs
@@ -12,7 +12,7 @@ use crate::{
             consumer_offset, helpers::create_message_deduplicator, 
journal::MemoryMessageJournal,
             log::SegmentedLog,
         },
-        stats::stats::{PartitionStats, TopicStats},
+        stats::{PartitionStats, TopicStats},
     },
 };
 use iggy_common::{Identifier, IggyTimestamp};
@@ -95,6 +95,7 @@ pub struct Partition {
 }
 
 impl Partition {
+    #[allow(clippy::too_many_arguments)] 
     pub fn new(
         created_at: IggyTimestamp,
         should_increment_offset: bool,
diff --git a/core/server/src/streaming/partitions/storage2.rs 
b/core/server/src/streaming/partitions/storage2.rs
index 4977121d..63e5cb65 100644
--- a/core/server/src/streaming/partitions/storage2.rs
+++ b/core/server/src/streaming/partitions/storage2.rs
@@ -4,7 +4,7 @@ use crate::{
     io::fs_utils::remove_dir_all,
     shard_error, shard_info, shard_trace,
     streaming::partitions::{
-        consumer_offset::ConsumerOffset, journal::MemoryMessageJournal, 
log::SegmentedLog,
+        consumer_offset::ConsumerOffset,
     },
 };
 use compio::{
@@ -109,11 +109,8 @@ pub async fn delete_partitions_from_disk(
     stream_id: usize,
     topic_id: usize,
     partition_id: usize,
-    log: &mut SegmentedLog<MemoryMessageJournal>,
     config: &SystemConfig,
 ) -> Result<(), IggyError> {
-    //TODO:
-    //log.close().await;
 
     let partition_path = config.get_partition_path(stream_id, topic_id, 
partition_id);
     remove_dir_all(&partition_path).await.map_err(|_| {
@@ -172,14 +169,14 @@ pub fn load_consumer_offsets(
     kind: ConsumerKind,
 ) -> Result<Vec<ConsumerOffset>, IggyError> {
     trace!("Loading consumer offsets from path: {path}...");
-    let dir_entries = std::fs::read_dir(&path);
+    let dir_entries = std::fs::read_dir(path);
     if dir_entries.is_err() {
         return Err(IggyError::CannotReadConsumerOffsets(path.to_owned()));
     }
 
     let mut consumer_offsets = Vec::new();
-    let mut dir_entries = dir_entries.unwrap();
-    while let Some(dir_entry) = dir_entries.next() {
+    let dir_entries = dir_entries.unwrap();
+    for dir_entry in dir_entries {
         let dir_entry = dir_entry.unwrap();
         let metadata = dir_entry.metadata();
         if metadata.is_err() {
diff --git a/core/server/src/streaming/persistence/mod.rs 
b/core/server/src/streaming/persistence/mod.rs
index 97d3cba3..df7333ba 100644
--- a/core/server/src/streaming/persistence/mod.rs
+++ b/core/server/src/streaming/persistence/mod.rs
@@ -17,6 +17,5 @@
  */
 
 pub mod persister;
-pub mod task;
 
 pub const COMPONENT: &str = "STREAMING_PERSISTENCE";
diff --git a/core/server/src/streaming/persistence/task.rs 
b/core/server/src/streaming/persistence/task.rs
deleted file mode 100644
index 1b359ef7..00000000
--- a/core/server/src/streaming/persistence/task.rs
+++ /dev/null
@@ -1,135 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::streaming::persistence::COMPONENT;
-use bytes::Bytes;
-use compio::runtime::Task;
-use error_set::ErrContext;
-use flume::{Receiver, Sender, unbounded};
-use iggy_common::IggyError;
-use std::{any::Any, sync::Arc, time::Duration};
-use tracing::error;
-
-use super::persister::PersisterKind;
-
-pub struct LogPersisterTask {
-    _sender: Option<Sender<Bytes>>,
-    _task_handle: Option<Task<Result<(), Box<dyn Any + Send>>>>,
-}
-
-impl std::fmt::Debug for LogPersisterTask {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("LogPersisterTask")
-            .field("_sender", &self._sender.is_some())
-            .field("_task_handle", &self._task_handle.is_some())
-            .finish()
-    }
-}
-
-impl LogPersisterTask {
-    pub fn new(
-        path: String,
-        persister: Arc<PersisterKind>,
-        max_retries: u32,
-        retry_sleep: Duration,
-    ) -> Self {
-        let (sender, receiver): (Sender<Bytes>, Receiver<Bytes>) = unbounded();
-
-        let task_handle = compio::runtime::spawn(async move {
-            loop {
-                match receiver.recv_async().await {
-                    Ok(data) => {
-                        if let Err(error) = Self::persist_with_retries(
-                            &path,
-                            &persister,
-                            data,
-                            max_retries,
-                            retry_sleep,
-                        )
-                        .await
-                        {
-                            error!("{COMPONENT} (error: {error}) - Final 
failure to persist data.");
-                        }
-                    }
-                    Err(error) => {
-                        error!("{COMPONENT} (error: {error}) - Error receiving 
data from channel.");
-                        return;
-                    }
-                }
-            }
-        });
-
-        LogPersisterTask {
-            _sender: Some(sender),
-            _task_handle: Some(task_handle),
-        }
-    }
-
-    async fn persist_with_retries(
-        path: &str,
-        persister: &Arc<PersisterKind>,
-        data: Bytes,
-        max_retries: u32,
-        retry_sleep: Duration,
-    ) -> Result<(), String> {
-        let mut retries = 0;
-
-        while retries < max_retries {
-            match persister.append(path, data.clone()).await {
-                Ok(_) => return Ok(()),
-                Err(e) => {
-                    error!(
-                        "Could not append to persister (attempt {}): {}",
-                        retries + 1,
-                        e
-                    );
-                    retries += 1;
-                    tokio::time::sleep(retry_sleep).await;
-                }
-            }
-        }
-
-        Err(format!(
-            "{COMPONENT} - failed to persist data after {max_retries} retries",
-        ))
-    }
-
-    pub async fn send(&self, data: Bytes) -> Result<(), IggyError> {
-        if let Some(sender) = &self._sender {
-            sender
-                .send_async(data)
-                .await
-                .with_error_context(|error| {
-                    format!("{COMPONENT} (error: {error}) - failed to send 
data to async channel")
-                })
-                .map_err(|_| IggyError::CannotSaveMessagesToSegment)
-        } else {
-            Err(IggyError::CannotSaveMessagesToSegment)
-        }
-    }
-}
-
-impl Drop for LogPersisterTask {
-    fn drop(&mut self) {
-        self._sender.take();
-
-        if let Some(handle) = self._task_handle.take() {
-            compio::runtime::spawn(async move { handle.await });
-        }
-    }
-}
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs 
b/core/server/src/streaming/segments/indexes/index_reader.rs
index ff9800d7..33528c62 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -339,7 +339,7 @@ impl IndexReader {
     ) -> Result<PooledBuffer, std::io::Error> {
         if use_pool {
             let len = len as usize;
-            let buf = PooledBuffer::with_capacity(len as usize);
+            let buf = PooledBuffer::with_capacity(len);
             let (result, buf) = self
                 .file
                 .read_exact_at(buf.slice(..len), offset as u64)
diff --git a/core/server/src/streaming/stats/mod.rs 
b/core/server/src/streaming/stats/mod.rs
index 9d34677f..67c464f3 100644
--- a/core/server/src/streaming/stats/mod.rs
+++ b/core/server/src/streaming/stats/mod.rs
@@ -1 +1,331 @@
-pub mod stats;
+use std::sync::{
+    Arc,
+    atomic::{AtomicU32, AtomicU64, Ordering},
+};
+
+#[derive(Default, Debug)]
+pub struct StreamStats {
+    size_bytes: AtomicU64,
+    messages_count: AtomicU64,
+    segments_count: AtomicU32,
+}
+
+impl StreamStats {
+    pub fn increment_size_bytes(&self, size_bytes: u64) {
+        self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
+    }
+
+    pub fn increment_messages_count(&self, messages_count: u64) {
+        self.messages_count
+            .fetch_add(messages_count, Ordering::AcqRel);
+    }
+
+    pub fn increment_segments_count(&self, segments_count: u32) {
+        self.segments_count
+            .fetch_add(segments_count, Ordering::AcqRel);
+    }
+
+    pub fn decrement_size_bytes(&self, size_bytes: u64) {
+        self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
+    }
+
+    pub fn decrement_messages_count(&self, messages_count: u64) {
+        self.messages_count
+            .fetch_sub(messages_count, Ordering::AcqRel);
+    }
+
+    pub fn decrement_segments_count(&self, segments_count: u32) {
+        self.segments_count
+            .fetch_sub(segments_count, Ordering::AcqRel);
+    }
+
+    pub fn size_bytes_inconsistent(&self) -> u64 {
+        self.size_bytes.load(Ordering::Relaxed)
+    }
+
+    pub fn messages_count_inconsistent(&self) -> u64 {
+        self.messages_count.load(Ordering::Relaxed)
+    }
+
+    pub fn segments_count_inconsistent(&self) -> u32 {
+        self.segments_count.load(Ordering::Relaxed)
+    }
+
+    pub fn zero_out_size_bytes(&self) {
+        self.size_bytes.store(0, Ordering::Relaxed);
+    }
+
+    pub fn zero_out_messages_count(&self) {
+        self.messages_count.store(0, Ordering::Relaxed);
+    }
+
+    pub fn zero_out_segments_count(&self) {
+        self.segments_count.store(0, Ordering::Relaxed);
+    }
+
+    pub fn zero_out_all(&self) {
+        self.zero_out_size_bytes();
+        self.zero_out_messages_count();
+        self.zero_out_segments_count();
+    }
+}
+
+#[derive(Default, Debug)]
+pub struct TopicStats {
+    parent: Arc<StreamStats>,
+    size_bytes: AtomicU64,
+    messages_count: AtomicU64,
+    segments_count: AtomicU32,
+}
+
+impl TopicStats {
+    pub fn new(parent: Arc<StreamStats>) -> Self {
+        Self {
+            parent,
+            size_bytes: AtomicU64::new(0),
+            messages_count: AtomicU64::new(0),
+            segments_count: AtomicU32::new(0),
+        }
+    }
+
+    pub fn parent(&self) -> Arc<StreamStats> {
+        self.parent.clone()
+    }
+
+    pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
+        self.parent.increment_size_bytes(size_bytes);
+    }
+
+    pub fn increment_parent_messages_count(&self, messages_count: u64) {
+        self.parent.increment_messages_count(messages_count);
+    }
+
+    pub fn increment_parent_segments_count(&self, segments_count: u32) {
+        self.parent.increment_segments_count(segments_count);
+    }
+
+    pub fn increment_size_bytes(&self, size_bytes: u64) {
+        self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
+        self.increment_parent_size_bytes(size_bytes);
+    }
+
+    pub fn increment_messages_count(&self, messages_count: u64) {
+        self.messages_count
+            .fetch_add(messages_count, Ordering::AcqRel);
+        self.increment_parent_messages_count(messages_count);
+    }
+
+    pub fn increment_segments_count(&self, segments_count: u32) {
+        self.segments_count
+            .fetch_add(segments_count, Ordering::AcqRel);
+        self.increment_parent_segments_count(segments_count);
+    }
+
+    pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
+        self.parent.decrement_size_bytes(size_bytes);
+    }
+
+    pub fn decrement_parent_messages_count(&self, messages_count: u64) {
+        self.parent.decrement_messages_count(messages_count);
+    }
+
+    pub fn decrement_parent_segments_count(&self, segments_count: u32) {
+        self.parent.decrement_segments_count(segments_count);
+    }
+
+    pub fn decrement_size_bytes(&self, size_bytes: u64) {
+        self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
+        self.decrement_parent_size_bytes(size_bytes);
+    }
+
+    pub fn decrement_messages_count(&self, messages_count: u64) {
+        self.messages_count
+            .fetch_sub(messages_count, Ordering::AcqRel);
+        self.decrement_parent_messages_count(messages_count);
+    }
+
+    pub fn decrement_segments_count(&self, segments_count: u32) {
+        self.segments_count
+            .fetch_sub(segments_count, Ordering::AcqRel);
+        self.decrement_parent_segments_count(segments_count);
+    }
+
+    pub fn size_bytes_inconsistent(&self) -> u64 {
+        self.size_bytes.load(Ordering::Relaxed)
+    }
+
+    pub fn messages_count_inconsistent(&self) -> u64 {
+        self.messages_count.load(Ordering::Relaxed)
+    }
+
+    pub fn segments_count_inconsistent(&self) -> u32 {
+        self.segments_count.load(Ordering::Relaxed)
+    }
+
+    pub fn zero_out_parent_size_bytes(&self) {
+        self.parent.zero_out_size_bytes();
+    }
+
+    pub fn zero_out_parent_messages_count(&self) {
+        self.parent.zero_out_messages_count();
+    }
+
+    pub fn zero_out_parent_segments_count(&self) {
+        self.parent.zero_out_segments_count();
+    }
+
+    pub fn zero_out_parent_all(&self) {
+        self.parent.zero_out_all();
+    }
+
+    pub fn zero_out_size_bytes(&self) {
+        self.size_bytes.store(0, Ordering::Relaxed);
+        self.zero_out_parent_size_bytes();
+    }
+
+    pub fn zero_out_messages_count(&self) {
+        self.messages_count.store(0, Ordering::Relaxed);
+        self.zero_out_parent_messages_count();
+    }
+
+    pub fn zero_out_segments_count(&self) {
+        self.segments_count.store(0, Ordering::Relaxed);
+        self.zero_out_parent_segments_count();
+    }
+
+    pub fn zero_out_all(&self) {
+        self.zero_out_size_bytes();
+        self.zero_out_messages_count();
+        self.zero_out_segments_count();
+    }
+}
+
+#[derive(Default, Debug)]
+pub struct PartitionStats {
+    parent: Arc<TopicStats>,
+    messages_count: AtomicU64,
+    size_bytes: AtomicU64,
+    segments_count: AtomicU32,
+}
+
+impl PartitionStats {
+    pub fn new(parent_stats: Arc<TopicStats>) -> Self {
+        Self {
+            parent: parent_stats,
+            messages_count: AtomicU64::new(0),
+            size_bytes: AtomicU64::new(0),
+            segments_count: AtomicU32::new(0),
+        }
+    }
+
+    pub fn parent(&self) -> Arc<TopicStats> {
+        self.parent.clone()
+    }
+
+    pub fn increment_size_bytes(&self, size_bytes: u64) {
+        self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
+        self.increment_parent_size_bytes(size_bytes);
+    }
+
+    pub fn increment_messages_count(&self, messages_count: u64) {
+        self.messages_count
+            .fetch_add(messages_count, Ordering::AcqRel);
+        self.increment_parent_messages_count(messages_count);
+    }
+
+    pub fn increment_segments_count(&self, segments_count: u32) {
+        self.segments_count
+            .fetch_add(segments_count, Ordering::AcqRel);
+        self.increment_parent_segments_count(segments_count);
+    }
+
+    pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
+        self.parent.increment_size_bytes(size_bytes);
+    }
+
+    pub fn increment_parent_messages_count(&self, messages_count: u64) {
+        self.parent.increment_messages_count(messages_count);
+    }
+
+    pub fn increment_parent_segments_count(&self, segments_count: u32) {
+        self.parent.increment_segments_count(segments_count);
+    }
+
+    pub fn decrement_size_bytes(&self, size_bytes: u64) {
+        self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
+        self.decrement_parent_size_bytes(size_bytes);
+    }
+
+    pub fn decrement_messages_count(&self, messages_count: u64) {
+        self.messages_count
+            .fetch_sub(messages_count, Ordering::AcqRel);
+        self.decrement_parent_messages_count(messages_count);
+    }
+
+    pub fn decrement_segments_count(&self, segments_count: u32) {
+        self.segments_count
+            .fetch_sub(segments_count, Ordering::AcqRel);
+        self.decrement_parent_segments_count(segments_count);
+    }
+
+    pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
+        self.parent.decrement_size_bytes(size_bytes);
+    }
+
+    pub fn decrement_parent_messages_count(&self, messages_count: u64) {
+        self.parent.decrement_messages_count(messages_count);
+    }
+
+    pub fn decrement_parent_segments_count(&self, segments_count: u32) {
+        self.parent.decrement_segments_count(segments_count);
+    }
+
+    pub fn size_bytes_inconsistent(&self) -> u64 {
+        self.size_bytes.load(Ordering::Relaxed)
+    }
+
+    pub fn messages_count_inconsistent(&self) -> u64 {
+        self.messages_count.load(Ordering::Relaxed)
+    }
+
+    pub fn segments_count_inconsistent(&self) -> u32 {
+        self.segments_count.load(Ordering::Relaxed)
+    }
+
+    pub fn zero_out_parent_size_bytes(&self) {
+        self.parent.zero_out_size_bytes();
+    }
+
+    pub fn zero_out_parent_messages_count(&self) {
+        self.parent.zero_out_messages_count();
+    }
+
+    pub fn zero_out_parent_segments_count(&self) {
+        self.parent.zero_out_segments_count();
+    }
+
+    pub fn zero_out_parent_all(&self) {
+        self.parent.zero_out_all();
+    }
+
+    pub fn zero_out_size_bytes(&self) {
+        self.size_bytes.store(0, Ordering::Relaxed);
+        self.zero_out_parent_size_bytes();
+    }
+
+    pub fn zero_out_messages_count(&self) {
+        self.messages_count.store(0, Ordering::Relaxed);
+        self.zero_out_parent_messages_count();
+    }
+
+    pub fn zero_out_segments_count(&self) {
+        self.segments_count.store(0, Ordering::Relaxed);
+        self.zero_out_parent_segments_count();
+    }
+
+    pub fn zero_out_all(&self) {
+        self.zero_out_size_bytes();
+        self.zero_out_messages_count();
+        self.zero_out_segments_count();
+    }
+}
diff --git a/core/server/src/streaming/stats/stats.rs 
b/core/server/src/streaming/stats/stats.rs
deleted file mode 100644
index 67c464f3..00000000
--- a/core/server/src/streaming/stats/stats.rs
+++ /dev/null
@@ -1,331 +0,0 @@
-use std::sync::{
-    Arc,
-    atomic::{AtomicU32, AtomicU64, Ordering},
-};
-
-#[derive(Default, Debug)]
-pub struct StreamStats {
-    size_bytes: AtomicU64,
-    messages_count: AtomicU64,
-    segments_count: AtomicU32,
-}
-
-impl StreamStats {
-    pub fn increment_size_bytes(&self, size_bytes: u64) {
-        self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
-    }
-
-    pub fn increment_messages_count(&self, messages_count: u64) {
-        self.messages_count
-            .fetch_add(messages_count, Ordering::AcqRel);
-    }
-
-    pub fn increment_segments_count(&self, segments_count: u32) {
-        self.segments_count
-            .fetch_add(segments_count, Ordering::AcqRel);
-    }
-
-    pub fn decrement_size_bytes(&self, size_bytes: u64) {
-        self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
-    }
-
-    pub fn decrement_messages_count(&self, messages_count: u64) {
-        self.messages_count
-            .fetch_sub(messages_count, Ordering::AcqRel);
-    }
-
-    pub fn decrement_segments_count(&self, segments_count: u32) {
-        self.segments_count
-            .fetch_sub(segments_count, Ordering::AcqRel);
-    }
-
-    pub fn size_bytes_inconsistent(&self) -> u64 {
-        self.size_bytes.load(Ordering::Relaxed)
-    }
-
-    pub fn messages_count_inconsistent(&self) -> u64 {
-        self.messages_count.load(Ordering::Relaxed)
-    }
-
-    pub fn segments_count_inconsistent(&self) -> u32 {
-        self.segments_count.load(Ordering::Relaxed)
-    }
-
-    pub fn zero_out_size_bytes(&self) {
-        self.size_bytes.store(0, Ordering::Relaxed);
-    }
-
-    pub fn zero_out_messages_count(&self) {
-        self.messages_count.store(0, Ordering::Relaxed);
-    }
-
-    pub fn zero_out_segments_count(&self) {
-        self.segments_count.store(0, Ordering::Relaxed);
-    }
-
-    pub fn zero_out_all(&self) {
-        self.zero_out_size_bytes();
-        self.zero_out_messages_count();
-        self.zero_out_segments_count();
-    }
-}
-
-#[derive(Default, Debug)]
-pub struct TopicStats {
-    parent: Arc<StreamStats>,
-    size_bytes: AtomicU64,
-    messages_count: AtomicU64,
-    segments_count: AtomicU32,
-}
-
-impl TopicStats {
-    pub fn new(parent: Arc<StreamStats>) -> Self {
-        Self {
-            parent,
-            size_bytes: AtomicU64::new(0),
-            messages_count: AtomicU64::new(0),
-            segments_count: AtomicU32::new(0),
-        }
-    }
-
-    pub fn parent(&self) -> Arc<StreamStats> {
-        self.parent.clone()
-    }
-
-    pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
-        self.parent.increment_size_bytes(size_bytes);
-    }
-
-    pub fn increment_parent_messages_count(&self, messages_count: u64) {
-        self.parent.increment_messages_count(messages_count);
-    }
-
-    pub fn increment_parent_segments_count(&self, segments_count: u32) {
-        self.parent.increment_segments_count(segments_count);
-    }
-
-    pub fn increment_size_bytes(&self, size_bytes: u64) {
-        self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
-        self.increment_parent_size_bytes(size_bytes);
-    }
-
-    pub fn increment_messages_count(&self, messages_count: u64) {
-        self.messages_count
-            .fetch_add(messages_count, Ordering::AcqRel);
-        self.increment_parent_messages_count(messages_count);
-    }
-
-    pub fn increment_segments_count(&self, segments_count: u32) {
-        self.segments_count
-            .fetch_add(segments_count, Ordering::AcqRel);
-        self.increment_parent_segments_count(segments_count);
-    }
-
-    pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
-        self.parent.decrement_size_bytes(size_bytes);
-    }
-
-    pub fn decrement_parent_messages_count(&self, messages_count: u64) {
-        self.parent.decrement_messages_count(messages_count);
-    }
-
-    pub fn decrement_parent_segments_count(&self, segments_count: u32) {
-        self.parent.decrement_segments_count(segments_count);
-    }
-
-    pub fn decrement_size_bytes(&self, size_bytes: u64) {
-        self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
-        self.decrement_parent_size_bytes(size_bytes);
-    }
-
-    pub fn decrement_messages_count(&self, messages_count: u64) {
-        self.messages_count
-            .fetch_sub(messages_count, Ordering::AcqRel);
-        self.decrement_parent_messages_count(messages_count);
-    }
-
-    pub fn decrement_segments_count(&self, segments_count: u32) {
-        self.segments_count
-            .fetch_sub(segments_count, Ordering::AcqRel);
-        self.decrement_parent_segments_count(segments_count);
-    }
-
-    pub fn size_bytes_inconsistent(&self) -> u64 {
-        self.size_bytes.load(Ordering::Relaxed)
-    }
-
-    pub fn messages_count_inconsistent(&self) -> u64 {
-        self.messages_count.load(Ordering::Relaxed)
-    }
-
-    pub fn segments_count_inconsistent(&self) -> u32 {
-        self.segments_count.load(Ordering::Relaxed)
-    }
-
-    pub fn zero_out_parent_size_bytes(&self) {
-        self.parent.zero_out_size_bytes();
-    }
-
-    pub fn zero_out_parent_messages_count(&self) {
-        self.parent.zero_out_messages_count();
-    }
-
-    pub fn zero_out_parent_segments_count(&self) {
-        self.parent.zero_out_segments_count();
-    }
-
-    pub fn zero_out_parent_all(&self) {
-        self.parent.zero_out_all();
-    }
-
-    pub fn zero_out_size_bytes(&self) {
-        self.size_bytes.store(0, Ordering::Relaxed);
-        self.zero_out_parent_size_bytes();
-    }
-
-    pub fn zero_out_messages_count(&self) {
-        self.messages_count.store(0, Ordering::Relaxed);
-        self.zero_out_parent_messages_count();
-    }
-
-    pub fn zero_out_segments_count(&self) {
-        self.segments_count.store(0, Ordering::Relaxed);
-        self.zero_out_parent_segments_count();
-    }
-
-    pub fn zero_out_all(&self) {
-        self.zero_out_size_bytes();
-        self.zero_out_messages_count();
-        self.zero_out_segments_count();
-    }
-}
-
-#[derive(Default, Debug)]
-pub struct PartitionStats {
-    parent: Arc<TopicStats>,
-    messages_count: AtomicU64,
-    size_bytes: AtomicU64,
-    segments_count: AtomicU32,
-}
-
-impl PartitionStats {
-    pub fn new(parent_stats: Arc<TopicStats>) -> Self {
-        Self {
-            parent: parent_stats,
-            messages_count: AtomicU64::new(0),
-            size_bytes: AtomicU64::new(0),
-            segments_count: AtomicU32::new(0),
-        }
-    }
-
-    pub fn parent(&self) -> Arc<TopicStats> {
-        self.parent.clone()
-    }
-
-    pub fn increment_size_bytes(&self, size_bytes: u64) {
-        self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
-        self.increment_parent_size_bytes(size_bytes);
-    }
-
-    pub fn increment_messages_count(&self, messages_count: u64) {
-        self.messages_count
-            .fetch_add(messages_count, Ordering::AcqRel);
-        self.increment_parent_messages_count(messages_count);
-    }
-
-    pub fn increment_segments_count(&self, segments_count: u32) {
-        self.segments_count
-            .fetch_add(segments_count, Ordering::AcqRel);
-        self.increment_parent_segments_count(segments_count);
-    }
-
-    pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
-        self.parent.increment_size_bytes(size_bytes);
-    }
-
-    pub fn increment_parent_messages_count(&self, messages_count: u64) {
-        self.parent.increment_messages_count(messages_count);
-    }
-
-    pub fn increment_parent_segments_count(&self, segments_count: u32) {
-        self.parent.increment_segments_count(segments_count);
-    }
-
-    pub fn decrement_size_bytes(&self, size_bytes: u64) {
-        self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
-        self.decrement_parent_size_bytes(size_bytes);
-    }
-
-    pub fn decrement_messages_count(&self, messages_count: u64) {
-        self.messages_count
-            .fetch_sub(messages_count, Ordering::AcqRel);
-        self.decrement_parent_messages_count(messages_count);
-    }
-
-    pub fn decrement_segments_count(&self, segments_count: u32) {
-        self.segments_count
-            .fetch_sub(segments_count, Ordering::AcqRel);
-        self.decrement_parent_segments_count(segments_count);
-    }
-
-    pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
-        self.parent.decrement_size_bytes(size_bytes);
-    }
-
-    pub fn decrement_parent_messages_count(&self, messages_count: u64) {
-        self.parent.decrement_messages_count(messages_count);
-    }
-
-    pub fn decrement_parent_segments_count(&self, segments_count: u32) {
-        self.parent.decrement_segments_count(segments_count);
-    }
-
-    pub fn size_bytes_inconsistent(&self) -> u64 {
-        self.size_bytes.load(Ordering::Relaxed)
-    }
-
-    pub fn messages_count_inconsistent(&self) -> u64 {
-        self.messages_count.load(Ordering::Relaxed)
-    }
-
-    pub fn segments_count_inconsistent(&self) -> u32 {
-        self.segments_count.load(Ordering::Relaxed)
-    }
-
-    pub fn zero_out_parent_size_bytes(&self) {
-        self.parent.zero_out_size_bytes();
-    }
-
-    pub fn zero_out_parent_messages_count(&self) {
-        self.parent.zero_out_messages_count();
-    }
-
-    pub fn zero_out_parent_segments_count(&self) {
-        self.parent.zero_out_segments_count();
-    }
-
-    pub fn zero_out_parent_all(&self) {
-        self.parent.zero_out_all();
-    }
-
-    pub fn zero_out_size_bytes(&self) {
-        self.size_bytes.store(0, Ordering::Relaxed);
-        self.zero_out_parent_size_bytes();
-    }
-
-    pub fn zero_out_messages_count(&self) {
-        self.messages_count.store(0, Ordering::Relaxed);
-        self.zero_out_parent_messages_count();
-    }
-
-    pub fn zero_out_segments_count(&self) {
-        self.segments_count.store(0, Ordering::Relaxed);
-        self.zero_out_parent_segments_count();
-    }
-
-    pub fn zero_out_all(&self) {
-        self.zero_out_size_bytes();
-        self.zero_out_messages_count();
-        self.zero_out_segments_count();
-    }
-}
diff --git a/core/server/src/streaming/streams/stream2.rs 
b/core/server/src/streaming/streams/stream2.rs
index d637e29d..f890d96d 100644
--- a/core/server/src/streaming/streams/stream2.rs
+++ b/core/server/src/streaming/streams/stream2.rs
@@ -12,7 +12,7 @@ use crate::{
         topics::Topics,
         traits_ext::{EntityMarker, InsertCell, IntoComponents, 
IntoComponentsById},
     },
-    streaming::stats::stats::StreamStats,
+    streaming::stats::StreamStats,
 };
 
 #[derive(Debug, Clone)]
diff --git a/core/server/src/streaming/topics/consumer_group2.rs 
b/core/server/src/streaming/topics/consumer_group2.rs
index 0236d3da..ca58d55f 100644
--- a/core/server/src/streaming/topics/consumer_group2.rs
+++ b/core/server/src/streaming/topics/consumer_group2.rs
@@ -196,8 +196,8 @@ pub struct Member {
 impl Clone for Member {
     fn clone(&self) -> Self {
         Self {
-            id: self.id.clone(),
-            client_id: self.client_id.clone(),
+            id: self.id,
+            client_id: self.client_id,
             partitions: self.partitions.clone(),
             current_partition_idx: AtomicUsize::new(0),
         }
diff --git a/core/server/src/streaming/topics/helpers.rs 
b/core/server/src/streaming/topics/helpers.rs
index 8b925a0d..976ffad9 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -5,12 +5,11 @@ use crate::{
     slab::{
         Keyed,
         consumer_groups::{self, ConsumerGroups},
-        partitions::{self},
         topics::{self, Topics},
         traits_ext::{ComponentsById, Delete, DeleteCell, EntityMarker},
     },
     streaming::{
-        stats::stats::TopicStats,
+        stats::TopicStats,
         topics::{
             consumer_group2::{
                 self, ConsumerGroupMembers, ConsumerGroupRef, 
ConsumerGroupRefMut, Member,
@@ -183,13 +182,13 @@ fn add_member(
     shard_id: u16,
     id: usize,
     members: &mut ConsumerGroupMembers,
-    partitions: &Vec<usize>,
+    partitions: &[usize],
     client_id: u32,
 ) {
     members.inner_mut().rcu(move |members| {
         let mut members = mimic_members(members);
         Member::new(client_id).insert_into(&mut members);
-        assign_partitions_to_members(shard_id, id, &mut members, 
partitions.clone());
+        assign_partitions_to_members(shard_id, id, &mut members, 
partitions.to_vec());
         members
     });
 }
@@ -199,7 +198,7 @@ fn delete_member(
     id: usize,
     client_id: u32,
     members: &mut ConsumerGroupMembers,
-    partitions: &Vec<usize>,
+    partitions: &[usize],
 ) {
     let member_id = members
         .inner()
@@ -214,7 +213,7 @@ fn delete_member(
             entry.id = idx;
             true
         });
-        assign_partitions_to_members(shard_id, id, &mut members, 
partitions.clone());
+        assign_partitions_to_members(shard_id, id, &mut members, 
partitions.to_vec());
         members
     });
 }
@@ -253,29 +252,4 @@ fn mimic_members(members: &Slab<Member>) -> Slab<Member> {
         Member::new(member.client_id).insert_into(&mut container);
     }
     container
-}
-
-fn reassign_partitions(
-    shard_id: u16,
-    partitions: Vec<partitions::ContainerId>,
-) -> impl FnOnce(ComponentsById<ConsumerGroupRefMut>) {
-    move |(root, members)| {
-        root.assign_partitions(partitions);
-        let partitions = root.partitions();
-        let id = root.id();
-        reassign_partitions_to_members(shard_id, id, members, partitions);
-    }
-}
-
-fn reassign_partitions_to_members(
-    shard_id: u16,
-    id: usize,
-    members: &mut ConsumerGroupMembers,
-    partitions: &Vec<usize>,
-) {
-    members.inner_mut().rcu(move |members| {
-        let mut members = mimic_members(members);
-        assign_partitions_to_members(shard_id, id, &mut members, 
partitions.clone());
-        members
-    });
-}
+}
\ No newline at end of file
diff --git a/core/server/src/streaming/topics/storage2.rs 
b/core/server/src/streaming/topics/storage2.rs
index f355b74f..b166bc96 100644
--- a/core/server/src/streaming/topics/storage2.rs
+++ b/core/server/src/streaming/topics/storage2.rs
@@ -66,15 +66,14 @@ pub async fn delete_topic_from_disk(
     let partitions = topic.root_mut().partitions_mut();
     for id in ids {
         let partition = partitions.delete(id);
-        let (root, stats, _, _, _, _, mut log) = partition.into_components();
+        let (root, stats, _, _, _, _, _log) = partition.into_components();
         let partition_id = root.id();
         delete_partitions_from_disk(
             shard_id,
             stream_id,
             topic_id,
             partition_id,
-            &mut log,
-            &config,
+            config,
         )
         .await?;
         messages_count += stats.messages_count_inconsistent();
diff --git a/core/server/src/streaming/topics/topic2.rs 
b/core/server/src/streaming/topics/topic2.rs
index 930d96af..cfa9e593 100644
--- a/core/server/src/streaming/topics/topic2.rs
+++ b/core/server/src/streaming/topics/topic2.rs
@@ -3,7 +3,7 @@ use crate::slab::streams::Streams;
 use crate::slab::topics;
 use crate::slab::traits_ext::{EntityMarker, InsertCell, IntoComponents, 
IntoComponentsById};
 use crate::slab::{Keyed, consumer_groups::ConsumerGroups, 
partitions::Partitions};
-use crate::streaming::stats::stats::{StreamStats, TopicStats};
+use crate::streaming::stats::{StreamStats, TopicStats};
 use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, IggyTimestamp, 
MaxTopicSize};
 use slab::Slab;
 use std::cell::{Ref, RefMut};
@@ -326,6 +326,7 @@ impl TopicRoot {
 }
 
 // TODO: Move to separate module.
+#[allow(clippy::too_many_arguments)]
 pub fn create_and_insert_topics_mem(
     streams: &Streams,
     stream_id: &Identifier,
diff --git a/core/server/src/streaming/utils/memory_pool.rs 
b/core/server/src/streaming/utils/memory_pool.rs
index ed9cb4e9..d3fb62fe 100644
--- a/core/server/src/streaming/utils/memory_pool.rs
+++ b/core/server/src/streaming/utils/memory_pool.rs
@@ -240,10 +240,8 @@ impl MemoryPool {
             );
         }
 
-        if was_pool_allocated {
-            if let Some(orig_idx) = self.best_fit(original_capacity) {
-                self.dec_bucket_in_use(orig_idx);
-            }
+        if was_pool_allocated && let Some(orig_idx) = 
self.best_fit(original_capacity) {
+            self.dec_bucket_in_use(orig_idx);
         }
 
         match self.best_fit(current_capacity) {
diff --git a/core/server/src/tcp/connection_handler.rs 
b/core/server/src/tcp/connection_handler.rs
index 1427640a..7e1c112d 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -71,7 +71,7 @@ pub(crate) async fn handle_connection(
         let length =
             
u32::from_le_bytes(initial_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap());
         let (res, mut code_buffer_out) = sender.read(code_buffer).await;
-        let _ = res?;
+        res?;
         let code: u32 =
             
u32::from_le_bytes(code_buffer_out[0..INITIAL_BYTES_LENGTH].try_into().unwrap());
         initial_buffer.clear();
diff --git a/core/server/src/tcp/tcp_listener.rs 
b/core/server/src/tcp/tcp_listener.rs
index 8fc8cbd3..5332ec19 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -89,7 +89,8 @@ pub async fn start(
             format!("Failed to bind {server_name} server to address: {addr}, 
{err}")
         })?;
     let actual_addr = listener.local_addr().map_err(|e| {
-        shard_error!(shard.id, "Failed to get local address: {e}");
+        // TODO(hubcio): macro doesn't work properly with syntax like {e}
+        shard_error!(shard.id, "Failed to get local address: {}", e);
         IggyError::CannotBindToSocket(addr.to_string())
     })?;
     shard_info!(
diff --git a/core/server/src/tcp/tcp_server.rs 
b/core/server/src/tcp/tcp_server.rs
index 180388f7..6051d518 100644
--- a/core/server/src/tcp/tcp_server.rs
+++ b/core/server/src/tcp/tcp_server.rs
@@ -34,7 +34,6 @@ pub async fn spawn_tcp_server(
     } else {
         "Iggy TCP"
     };
-    let ip_v6 = shard.config.tcp.ipv6;
     let socket_config = &shard.config.tcp.socket;
     let addr: SocketAddr = shard
         .config
diff --git a/core/server/src/tcp/tcp_tls_listener.rs 
b/core/server/src/tcp/tcp_tls_listener.rs
index c69326d2..376e6930 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -194,7 +194,6 @@ async fn accept_loop(
                         let acceptor = acceptor.clone();
 
                         // Perform TLS handshake in a separate task to avoid 
blocking the accept loop
-                        let task_shard = shard_clone.clone();
                         let registry = shard.task_registry.clone();
                         let registry_clone = registry.clone();
                         registry.spawn_connection(async move {

Reply via email to