This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new f940fc82 feat(io_uring): fix nasty concurrency bug (#2223)
f940fc82 is described below

commit f940fc821508b9d1aff9db6bce251c47d7a8a1e8
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Thu Oct 2 09:29:29 2025 +0200

    feat(io_uring): fix nasty concurrency bug (#2223)
---
 .../consumer_groups/get_consumer_group_handler.rs  |  19 +-
 .../consumer_groups/get_consumer_groups_handler.rs |  21 +-
 .../binary/handlers/topics/get_topic_handler.rs    |  14 +-
 .../binary/handlers/topics/get_topics_handler.rs   |  17 +-
 core/server/src/http/consumer_groups.rs            |  66 +-
 core/server/src/shard/mod.rs                       |  81 +--
 core/server/src/shard/system/consumer_offsets.rs   |  48 +-
 core/server/src/shard/system/messages.rs           |  48 +-
 core/server/src/slab/consumer_groups.rs            |  18 +-
 core/server/src/slab/helpers.rs                    |  16 +-
 core/server/src/slab/partitions.rs                 |  22 +-
 core/server/src/slab/streams.rs                    | 721 ++++++++++++++++-----
 core/server/src/slab/topics.rs                     |  49 +-
 core/server/src/slab/traits_ext.rs                 |  11 -
 core/server/src/streaming/partitions/helpers.rs    | 325 +---------
 core/server/src/streaming/partitions/partition2.rs |  28 +-
 core/server/src/streaming/segments/storage.rs      |  28 +-
 .../streaming/segments/types/messages_batch_mut.rs |   3 +-
 core/server/src/streaming/topics/storage2.rs       | 188 ------
 19 files changed, 766 insertions(+), 957 deletions(-)

diff --git 
a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs 
b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
index 5330e748..a056659c 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
@@ -59,18 +59,13 @@ impl ServerCommandHandler for GetConsumerGroup {
             numeric_topic_id as u32,
         );
 
-        shard
-            .streams2
-            .with_consumer_group_by_id_async(
-                &self.stream_id,
-                &self.topic_id,
-                &self.group_id,
-                async |(root, members)| {
-                    let consumer_group = mapper::map_consumer_group(root, 
members);
-                    sender.send_ok_response(&consumer_group).await
-                },
-            )
-            .await?;
+        let consumer_group = shard.streams2.with_consumer_group_by_id(
+            &self.stream_id,
+            &self.topic_id,
+            &self.group_id,
+            |(root, members)| mapper::map_consumer_group(root, members),
+        );
+        sender.send_ok_response(&consumer_group).await?;
         Ok(())
     }
 }
diff --git 
a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
index 1c9ecd95..3740d44a 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
@@ -61,17 +61,16 @@ impl ServerCommandHandler for GetConsumerGroups {
             numeric_topic_id as u32,
         );
 
-        shard
-            .streams2
-            .with_consumer_groups_async(&self.stream_id, &self.topic_id, async 
|cgs| {
-                cgs.with_components_async(async |cgs| {
-                    let (roots, members) = cgs.into_components();
-                    let consumer_groups = mapper::map_consumer_groups(roots, 
members);
-                    sender.send_ok_response(&consumer_groups).await
-                })
-                .await
-            })
-            .await?;
+        let consumer_groups =
+            shard
+                .streams2
+                .with_consumer_groups(&self.stream_id, &self.topic_id, |cgs| {
+                    cgs.with_components(|cgs| {
+                        let (roots, members) = cgs.into_components();
+                        mapper::map_consumer_groups(roots, members)
+                    })
+                });
+        sender.send_ok_response(&consumer_groups).await?;
         Ok(())
     }
 }
diff --git a/core/server/src/binary/handlers/topics/get_topic_handler.rs 
b/core/server/src/binary/handlers/topics/get_topic_handler.rs
index 653ce995..e06a07a7 100644
--- a/core/server/src/binary/handlers/topics/get_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topic_handler.rs
@@ -53,13 +53,13 @@ impl ServerCommandHandler for GetTopic {
             self.topic_id.get_u32_value().unwrap_or(0),
         )?;
 
-        shard
-            .streams2
-            .with_topic_by_id_async(&self.stream_id, &self.topic_id, async 
|(root, _, stats)| {
-                let response = mapper::map_topic(&root, &stats);
-                sender.send_ok_response(&response).await
-            })
-            .await?;
+        let response =
+            shard
+                .streams2
+                .with_topic_by_id(&self.stream_id, &self.topic_id, |(root, _, 
stats)| {
+                    mapper::map_topic(&root, &stats)
+                });
+        sender.send_ok_response(&response).await?;
         Ok(())
     }
 }
diff --git a/core/server/src/binary/handlers/topics/get_topics_handler.rs 
b/core/server/src/binary/handlers/topics/get_topics_handler.rs
index 561c0adc..0c78e90e 100644
--- a/core/server/src/binary/handlers/topics/get_topics_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topics_handler.rs
@@ -53,18 +53,13 @@ impl ServerCommandHandler for GetTopics {
             .borrow()
             .get_topics(session.get_user_id(), numeric_stream_id as u32);
 
-        shard
-            .streams2
-            .with_topics_async(&self.stream_id, async |topics| {
-                topics
-                    .with_components_async(async |topics| {
-                        let (roots, _, stats) = topics.into_components();
-                        let response = mapper::map_topics(&roots, &stats);
-                        sender.send_ok_response(&response).await
-                    })
-                    .await
+        let response = shard.streams2.with_topics(&self.stream_id, |topics| {
+            topics.with_components(|topics| {
+                let (roots, _, stats) = topics.into_components();
+                mapper::map_topics(&roots, &stats)
             })
-            .await?;
+        });
+        sender.send_ok_response(&response).await?;
         Ok(())
     }
 }
diff --git a/core/server/src/http/consumer_groups.rs 
b/core/server/src/http/consumer_groups.rs
index 56f4ecf4..d4883470 100644
--- a/core/server/src/http/consumer_groups.rs
+++ b/core/server/src/http/consumer_groups.rs
@@ -93,21 +93,12 @@ async fn get_consumer_group(
             numeric_topic_id as u32,
         )?;
 
-    let consumer_group = {
-        let future = SendWrapper::new(
-            state
-                .shard
-                .shard()
-                .streams2
-                .with_consumer_group_by_id_async(
-                    &identifier_stream_id,
-                    &identifier_topic_id,
-                    &identifier_group_id,
-                    async |(root, members)| mapper::map_consumer_group(root, 
members),
-                ),
-        );
-        future.await
-    };
+    let consumer_group = 
state.shard.shard().streams2.with_consumer_group_by_id(
+        &identifier_stream_id,
+        &identifier_topic_id,
+        &identifier_group_id,
+        |(root, members)| mapper::map_consumer_group(root, members),
+    );
 
     Ok(Json(consumer_group))
 }
@@ -150,20 +141,16 @@ async fn get_consumer_groups(
             numeric_topic_id as u32,
         )?;
 
-    let consumer_groups = {
-        let future = 
SendWrapper::new(state.shard.shard().streams2.with_consumer_groups_async(
-            &identifier_stream_id,
-            &identifier_topic_id,
-            async |cgs| {
-                cgs.with_components_async(async |cgs| {
-                    let (roots, members) = cgs.into_components();
-                    mapper::map_consumer_groups(roots, members)
-                })
-                .await
-            },
-        ));
-        future.await
-    };
+    let consumer_groups = state.shard.shard().streams2.with_consumer_groups(
+        &identifier_stream_id,
+        &identifier_topic_id,
+        |cgs| {
+            cgs.with_components(|cgs| {
+                let (roots, members) = cgs.into_components();
+                mapper::map_consumer_groups(roots, members)
+            })
+        },
+    );
 
     Ok(Json(consumer_groups))
 }
@@ -213,21 +200,12 @@ async fn create_consumer_group(
 
     // Get the created consumer group details
     let group_id_identifier = Identifier::numeric(group_id as u32).unwrap();
-    let consumer_group_details = {
-        let future = SendWrapper::new(
-            state
-                .shard
-                .shard()
-                .streams2
-                .with_consumer_group_by_id_async(
-                    &command.stream_id,
-                    &command.topic_id,
-                    &group_id_identifier,
-                    async |(root, members)| mapper::map_consumer_group(root, 
members),
-                ),
-        );
-        future.await
-    };
+    let consumer_group_details = 
state.shard.shard().streams2.with_consumer_group_by_id(
+        &command.stream_id,
+        &command.topic_id,
+        &group_id_identifier,
+        |(root, members)| mapper::map_consumer_group(root, members),
+    );
 
     // Apply state change
     let entry_command = 
EntryCommand::CreateConsumerGroup(CreateConsumerGroupWithId {
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 035cdaac..8a5b7d8b 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -572,55 +572,58 @@ impl IggyShard {
                     );
                     match consumer {
                         PollingConsumer::Consumer(consumer_id, _) => {
-                            self.streams2.with_partition_by_id(
+                            let (offset_value, path) = 
self.streams2.with_partition_by_id(
                                 &stream_id,
                                 &topic_id,
                                 partition_id,
-                                partitions::helpers::store_consumer_offset(
-                                    consumer_id,
-                                    numeric_stream_id,
-                                    numeric_topic_id,
-                                    partition_id,
-                                    offset,
-                                    &self.config.system,
-                                ),
-                            );
-                            self.streams2
-                                .with_partition_by_id_async(
-                                    &stream_id,
-                                    &topic_id,
-                                    partition_id,
-                                    
partitions::helpers::persist_consumer_offset_to_disk(
-                                        self.id,
+                                |(.., offsets, _, _)| {
+                                    let hdl = offsets.pin();
+                                    let item = hdl.get_or_insert(
                                         consumer_id,
-                                    ),
-                                )
-                                .await?;
+                                        
crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer(
+                                            consumer_id as u32,
+                                            
&self.config.system.get_consumer_offsets_path(numeric_stream_id, 
numeric_topic_id, partition_id),
+                                        ),
+                                    );
+                                    item.offset.store(offset, 
std::sync::atomic::Ordering::Relaxed);
+                                    let offset_value = 
item.offset.load(std::sync::atomic::Ordering::Relaxed);
+                                    let path = item.path.clone();
+                                    (offset_value, path)
+                                },
+                            );
+                            
crate::streaming::partitions::storage2::persist_offset(
+                                self.id,
+                                &path,
+                                offset_value,
+                            )
+                            .await?;
                         }
                         PollingConsumer::ConsumerGroup(cg_id, _) => {
-                            self.streams2.with_partition_by_id(
+                            let (offset_value, path) = 
self.streams2.with_partition_by_id(
                                 &stream_id,
                                 &topic_id,
                                 partition_id,
-                                
partitions::helpers::store_consumer_group_member_offset(
-                                    cg_id,
-                                    numeric_stream_id,
-                                    numeric_topic_id,
-                                    partition_id,
-                                    offset,
-                                    &self.config.system,
-                                ),
-                            );
-                            self.streams2.with_partition_by_id_async(
-                                    &stream_id,
-                                    &topic_id,
-                                    partition_id,
-                                    
partitions::helpers::persist_consumer_group_member_offset_to_disk(
-                                        self.id,
+                                |(.., offsets, _)| {
+                                    let hdl = offsets.pin();
+                                    let item = hdl.get_or_insert(
                                         cg_id,
-                                    ),
-                                )
-                                .await?;
+                                        
crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer_group(
+                                            cg_id as u32,
+                                            
&self.config.system.get_consumer_group_offsets_path(numeric_stream_id, 
numeric_topic_id, partition_id),
+                                        ),
+                                    );
+                                    item.offset.store(offset, 
std::sync::atomic::Ordering::Relaxed);
+                                    let offset_value = 
item.offset.load(std::sync::atomic::Ordering::Relaxed);
+                                    let path = item.path.clone();
+                                    (offset_value, path)
+                                },
+                            );
+                            
crate::streaming::partitions::storage2::persist_offset(
+                                self.id,
+                                &path,
+                                offset_value,
+                            )
+                            .await?;
                         }
                     }
                 }
diff --git a/core/server/src/shard/system/consumer_offsets.rs 
b/core/server/src/shard/system/consumer_offsets.rs
index 5e4f4ff5..af5f71bb 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -258,26 +258,38 @@ impl IggyShard {
     ) -> Result<(), IggyError> {
         match polling_consumer {
             PollingConsumer::Consumer(id, _) => {
-                self.streams2
-                    .with_partition_by_id_async(
-                        stream_id,
-                        topic_id,
-                        partition_id,
-                        
partitions::helpers::persist_consumer_offset_to_disk(self.id, *id),
-                    )
-                    .await
+                let (offset_value, path) = self.streams2.with_partition_by_id(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    |(.., offsets, _, _)| {
+                        let hdl = offsets.pin();
+                        let item = hdl
+                            .get(id)
+                            .expect("persist_consumer_offset_to_disk: offset 
not found");
+                        let offset = 
item.offset.load(std::sync::atomic::Ordering::Relaxed);
+                        let path = item.path.clone();
+                        (offset, path)
+                    },
+                );
+                partitions::storage2::persist_offset(self.id, &path, 
offset_value).await
             }
             PollingConsumer::ConsumerGroup(_, id) => {
-                self.streams2
-                    .with_partition_by_id_async(
-                        stream_id,
-                        topic_id,
-                        partition_id,
-                        
partitions::helpers::persist_consumer_group_member_offset_to_disk(
-                            self.id, *id,
-                        ),
-                    )
-                    .await
+                let (offset_value, path) = self.streams2.with_partition_by_id(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    |(.., offsets, _)| {
+                        let hdl = offsets.pin();
+                        let item = hdl.get(id).expect(
+                            "persist_consumer_group_member_offset_to_disk: 
offset not found",
+                        );
+                        let offset = 
item.offset.load(std::sync::atomic::Ordering::Relaxed);
+                        let path = item.path.clone();
+                        (offset, path)
+                    },
+                );
+                partitions::storage2::persist_offset(self.id, &path, 
offset_value).await
             }
         }
     }
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 320ce902..211d4a54 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -281,16 +281,23 @@ impl IggyShard {
                                         &self.config.system,
                                     ),
                                 );
-                                self.streams2
-                                    .with_partition_by_id_async(
-                                        &stream_id,
-                                        &topic_id,
-                                        partition_id,
-                                        
partitions::helpers::persist_consumer_offset_to_disk(
-                                            self.id,
-                                            consumer_id,
-                                        ),
-                                    )
+
+                                let (offset_value, path) = 
self.streams2.with_partition_by_id(
+                                    &stream_id,
+                                    &topic_id,
+                                    partition_id,
+                                    |(.., offsets, _, _)| {
+                                        let hdl = offsets.pin();
+                                        let item = 
hdl.get(&consumer_id).expect(
+                                            "persist_consumer_offset_to_disk: 
offset not found",
+                                        );
+                                        let offset =
+                                            
item.offset.load(std::sync::atomic::Ordering::Relaxed);
+                                        let path = item.path.clone();
+                                        (offset, path)
+                                    },
+                                );
+                                partitions::storage2::persist_offset(self.id, 
&path, offset_value)
                                     .await?;
                             }
                             PollingConsumer::ConsumerGroup(cg_id, _) => {
@@ -307,16 +314,23 @@ impl IggyShard {
                                         &self.config.system,
                                     ),
                                 );
-                                self.streams2.with_partition_by_id_async(
+
+                                let (offset_value, path) = 
self.streams2.with_partition_by_id(
                                     &stream_id,
                                     &topic_id,
                                     partition_id,
-                                    
partitions::helpers::persist_consumer_group_member_offset_to_disk(
-                                        self.id,
-                                        cg_id,
-                                    ),
-                                )
-                                .await?;
+                                    |(.., offsets, _)| {
+                                        let hdl = offsets.pin();
+                                        let item = hdl
+                                            .get(&cg_id)
+                                            
.expect("persist_consumer_group_member_offset_to_disk: offset not found");
+                                        let offset = 
item.offset.load(std::sync::atomic::Ordering::Relaxed);
+                                        let path = item.path.clone();
+                                        (offset, path)
+                                    },
+                                );
+                                partitions::storage2::persist_offset(self.id, 
&path, offset_value)
+                                    .await?;
                             }
                         }
                     }
diff --git a/core/server/src/slab/consumer_groups.rs 
b/core/server/src/slab/consumer_groups.rs
index 683c3a0e..5f98ac5d 100644
--- a/core/server/src/slab/consumer_groups.rs
+++ b/core/server/src/slab/consumer_groups.rs
@@ -3,7 +3,7 @@ use crate::{
         Keyed, consumer_groups,
         traits_ext::{
             Borrow, ComponentsById, Delete, EntityComponentSystem, 
EntityComponentSystemMut,
-            EntityMarker, Insert, IntoComponents, IntoComponentsById,
+            Insert, IntoComponents,
         },
     },
     streaming::topics::consumer_group2::{self, ConsumerGroupRef, 
ConsumerGroupRefMut},
@@ -81,13 +81,6 @@ impl EntityComponentSystem<Borrow> for ConsumerGroups {
     {
         f(self.into())
     }
-
-    fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
-    where
-        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O,
-    {
-        f(self.into())
-    }
 }
 
 impl EntityComponentSystemMut for ConsumerGroups {
@@ -146,15 +139,6 @@ impl ConsumerGroups {
         let id = self.get_index(identifier);
         self.with_components_by_id_mut(id, |components| f(components))
     }
-
-    pub fn with_consumer_group_by_id_async<T>(
-        &self,
-        identifier: &Identifier,
-        f: impl AsyncFnOnce(ComponentsById<ConsumerGroupRef>) -> T,
-    ) -> impl Future<Output = T> {
-        let id = self.get_index(identifier);
-        self.with_components_by_id_async(id, async |components| 
f(components).await)
-    }
 }
 
 impl Default for ConsumerGroups {
diff --git a/core/server/src/slab/helpers.rs b/core/server/src/slab/helpers.rs
index 31a5a8b7..d2fe7487 100644
--- a/core/server/src/slab/helpers.rs
+++ b/core/server/src/slab/helpers.rs
@@ -1,13 +1,10 @@
 use crate::{
     slab::{
-        consumer_groups::ConsumerGroups,
-        partitions::{self, ContainerId, Partitions},
-        topics::Topics,
+        consumer_groups::ConsumerGroups, partitions::Partitions, 
topics::Topics,
         traits_ext::ComponentsById,
     },
     streaming::{
-        partitions::log::Log,
-        streams::stream2::{StreamRef, StreamRefMut},
+        streams::stream2::StreamRef,
         topics::topic2::{TopicRef, TopicRefMut},
     },
 };
@@ -26,7 +23,6 @@ where
 {
     async |(root, ..)| f(root.topics()).await
 }
-
 pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRef>) -> O
 where
     F: for<'a> FnOnce(&'a Topics) -> O,
@@ -47,7 +43,6 @@ where
 {
     async |(root, ..)| f(root.partitions()).await
 }
-
 pub fn partitions_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<TopicRefMut>) 
-> O
 where
     F: for<'a> FnOnce(&'a mut Partitions) -> O,
@@ -68,10 +63,3 @@ where
 {
     |(mut root, ..)| f(root.consumer_groups_mut())
 }
-
-pub fn consumer_groups_async<O, F>(f: F) -> impl 
AsyncFnOnce(ComponentsById<TopicRef>) -> O
-where
-    F: for<'a> AsyncFnOnce(&'a ConsumerGroups) -> O,
-{
-    async |(root, ..)| f(root.consumer_groups()).await
-}
diff --git a/core/server/src/slab/partitions.rs 
b/core/server/src/slab/partitions.rs
index 678846c3..b3d3fda9 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -1,7 +1,7 @@
 use crate::{
     slab::traits_ext::{
         Borrow, ComponentsById, Delete, EntityComponentSystem, 
EntityComponentSystemMut, Insert,
-        IntoComponents, IntoComponentsById,
+        IntoComponents,
     },
     streaming::{
         deduplication::message_deduplicator::MessageDeduplicator,
@@ -17,10 +17,7 @@ use crate::{
     },
 };
 use slab::Slab;
-use std::{
-    future::Future,
-    sync::{Arc, atomic::AtomicU64},
-};
+use std::sync::{Arc, atomic::AtomicU64};
 
 // TODO: This could be upper limit of partitions per topic, use that value to 
validate instead of whathever this thing is in `common` crate.
 pub const PARTITIONS_CAPACITY: usize = 16384;
@@ -30,7 +27,7 @@ pub type ContainerId = usize;
 pub struct Partitions {
     root: Slab<partition2::PartitionRoot>,
     stats: Slab<Arc<PartitionStats>>,
-    message_deduplicator: Slab<Option<MessageDeduplicator>>,
+    message_deduplicator: Slab<Option<Arc<MessageDeduplicator>>>,
     offset: Slab<Arc<AtomicU64>>,
 
     consumer_offset: Slab<Arc<ConsumerOffsets>>,
@@ -168,12 +165,6 @@ impl EntityComponentSystem<Borrow> for Partitions {
         f(self.into())
     }
 
-    fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
-    where
-        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O,
-    {
-        f(self.into())
-    }
 }
 
 impl EntityComponentSystemMut for Partitions {
@@ -230,11 +221,4 @@ impl Partitions {
         self.with_components_by_id_mut(id, |components| f(components))
     }
 
-    pub fn with_partition_by_id_async<T>(
-        &self,
-        id: ContainerId,
-        f: impl AsyncFnOnce(ComponentsById<PartitionRef>) -> T,
-    ) -> impl Future<Output = T> {
-        self.with_components_by_id_async(id, async move |components| 
f(components).await)
-    }
 }
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 7fadce50..decc6b63 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1,5 +1,8 @@
 use crate::shard::task_registry::TaskRegistry;
+use crate::shard_trace;
 use crate::streaming::partitions as streaming_partitions;
+use crate::streaming::segments::IggyIndexesMut;
+use crate::streaming::segments::storage::Storage;
 use crate::{
     binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
     configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
@@ -39,6 +42,7 @@ use crate::{
     },
 };
 use ahash::AHashMap;
+use error_set::ErrContext;
 use iggy_common::{Identifier, IggyError, IggyTimestamp, PollingKind};
 use slab::Slab;
 use std::{
@@ -141,13 +145,6 @@ impl EntityComponentSystem<InteriorMutability> for Streams 
{
     {
         f(self.into())
     }
-
-    fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
-    where
-        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O,
-    {
-        f(self.into())
-    }
 }
 
 impl EntityComponentSystemMutCell for Streams {
@@ -192,17 +189,21 @@ impl MainOps for Streams {
             self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
                 log.journal().inner().size + log.active_segment().size
             });
-        self.with_partition_by_id_async(
+        let (segment_start_offset, message_deduplicator) = 
self.with_partition_by_id(
             stream_id,
             topic_id,
             partition_id,
-            streaming_partitions::helpers::deduplicate_messages(
+            
streaming_partitions::helpers::get_segment_start_offset_and_deduplicator(),
+        );
+
+        input
+            .prepare_for_persistence(
+                segment_start_offset,
                 current_offset,
                 current_position,
-                &mut input,
-            ),
-        )
-        .await;
+                message_deduplicator.as_ref(),
+            )
+            .await;
 
         let (journal_messages_count, journal_size) = 
self.with_partition_by_id_mut(
             stream_id,
@@ -452,15 +453,6 @@ impl Streams {
         self.with_components_by_id(id, |stream| f(stream))
     }
 
-    pub fn with_stream_by_id_async<T>(
-        &self,
-        id: &Identifier,
-        f: impl AsyncFnOnce(ComponentsById<StreamRef>) -> T,
-    ) -> impl Future<Output = T> {
-        let id = self.get_index(id);
-        self.with_components_by_id_async(id, async |stream| f(stream).await)
-    }
-
     pub fn with_stream_by_id_mut<T>(
         &self,
         id: &Identifier,
@@ -474,14 +466,6 @@ impl Streams {
         self.with_stream_by_id(stream_id, helpers::topics(f))
     }
 
-    pub fn with_topics_async<T>(
-        &self,
-        stream_id: &Identifier,
-        f: impl AsyncFnOnce(&Topics) -> T,
-    ) -> impl Future<Output = T> {
-        self.with_stream_by_id_async(stream_id, helpers::topics_async(f))
-    }
-
     pub fn with_topics_mut<T>(&self, stream_id: &Identifier, f: impl 
FnOnce(&Topics) -> T) -> T {
         self.with_stream_by_id(stream_id, helpers::topics_mut(f))
     }
@@ -497,17 +481,6 @@ impl Streams {
         })
     }
 
-    pub fn with_topic_by_id_async<T>(
-        &self,
-        stream_id: &Identifier,
-        topic_id: &Identifier,
-        f: impl AsyncFnOnce(ComponentsById<TopicRef>) -> T,
-    ) -> impl Future<Output = T> {
-        self.with_topics_async(stream_id, async |container| {
-            container.with_topic_by_id_async(topic_id, f).await
-        })
-    }
-
     pub fn with_topic_by_id_mut<T>(
         &self,
         stream_id: &Identifier,
@@ -530,17 +503,6 @@ impl Streams {
         })
     }
 
-    pub fn with_consumer_groups_async<T>(
-        &self,
-        stream_id: &Identifier,
-        topic_id: &Identifier,
-        f: impl AsyncFnOnce(&ConsumerGroups) -> T,
-    ) -> impl Future<Output = T> {
-        self.with_topics_async(stream_id, async |container| {
-            container.with_consumer_groups_async(topic_id, f).await
-        })
-    }
-
     pub fn with_consumer_group_by_id<T>(
         &self,
         stream_id: &Identifier,
@@ -565,18 +527,6 @@ impl Streams {
         })
     }
 
-    pub fn with_consumer_group_by_id_async<T>(
-        &self,
-        stream_id: &Identifier,
-        topic_id: &Identifier,
-        group_id: &Identifier,
-        f: impl AsyncFnOnce(ComponentsById<ConsumerGroupRef>) -> T,
-    ) -> impl Future<Output = T> {
-        self.with_consumer_groups_async(stream_id, topic_id, async move 
|container| {
-            container.with_consumer_group_by_id_async(group_id, f).await
-        })
-    }
-
     pub fn with_consumer_groups_mut<T>(
         &self,
         stream_id: &Identifier,
@@ -599,17 +549,6 @@ impl Streams {
         })
     }
 
-    pub fn with_partitions_async<T>(
-        &self,
-        stream_id: &Identifier,
-        topic_id: &Identifier,
-        f: impl AsyncFnOnce(&Partitions) -> T,
-    ) -> impl Future<Output = T> {
-        self.with_topics_async(stream_id, async |container| {
-            container.with_partitions_async(topic_id, f).await
-        })
-    }
-
     pub fn with_partitions_mut<T>(
         &self,
         stream_id: &Identifier,
@@ -633,18 +572,6 @@ impl Streams {
         })
     }
 
-    pub fn with_partition_by_id_async<T>(
-        &self,
-        stream_id: &Identifier,
-        topic_id: &Identifier,
-        id: partitions::ContainerId,
-        f: impl AsyncFnOnce(ComponentsById<PartitionRef>) -> T,
-    ) -> impl Future<Output = T> {
-        self.with_partitions_async(stream_id, topic_id, async move |container| {
-            container.with_partition_by_id_async(id, f).await
-        })
-    }
-
     pub fn with_partition_by_id_mut<T>(
         &self,
         stream_id: &Identifier,
@@ -677,13 +604,270 @@ impl Streams {
             helpers::get_segment_range_by_offset(offset),
         );
 
-        self.with_partition_by_id_async(
+        let mut remaining_count = count;
+        let mut batches = IggyMessagesBatchSet::empty();
+        let mut current_offset = offset;
+
+        for idx in range {
+            let (segment_start_offset, segment_end_offset) = self.with_partition_by_id(
+                stream_id,
+                topic_id,
+                partition_id,
+                |(_, _, _, _, _, _, log)| {
+                    let segment = &log.segments()[idx];
+                    (segment.start_offset, segment.end_offset)
+                },
+            );
+
+            let start_offset = if current_offset < segment_start_offset {
+                segment_start_offset
+            } else {
+                current_offset
+            };
+
+            let mut end_offset = start_offset + (remaining_count - 1) as u64;
+            if end_offset > segment_end_offset {
+                end_offset = segment_end_offset;
+            }
+
+            let count: u32 = ((end_offset - start_offset + 1) as u32).min(remaining_count);
+
+            let messages = self
+                .get_messages_by_offset_base(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    idx,
+                    start_offset,
+                    end_offset,
+                    count,
+                    segment_start_offset,
+                )
+                .await?;
+
+            let messages_count = messages.count();
+            if messages_count == 0 {
+                current_offset = segment_end_offset + 1;
+                continue;
+            }
+
+            remaining_count = remaining_count.saturating_sub(messages_count);
+
+            if let Some(last_offset) = messages.last_offset() {
+                current_offset = last_offset + 1;
+            } else if messages_count > 0 {
+                current_offset += messages_count as u64;
+            }
+
+            batches.add_batch_set(messages);
+
+            if remaining_count == 0 {
+                break;
+            }
+        }
+
+        Ok(batches)
+    }
+
+    async fn get_messages_by_offset_base(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: partitions::ContainerId,
+        idx: usize,
+        offset: u64,
+        end_offset: u64,
+        count: u32,
+        segment_start_offset: u64,
+    ) -> Result<IggyMessagesBatchSet, IggyError> {
+        if count == 0 {
+            return Ok(IggyMessagesBatchSet::default());
+        }
+
+        let (is_journal_empty, journal_first_offset, journal_last_offset) = self
+            .with_partition_by_id(
+                stream_id,
+                topic_id,
+                partition_id,
+                |(_, _, _, _, _, _, log)| {
+                    let journal = log.journal();
+                    (
+                        journal.is_empty(),
+                        journal.inner().base_offset,
+                        journal.inner().current_offset,
+                    )
+                },
+            );
+
+        // Case 0: Accumulator is empty, so all messages have to be on disk
+        if is_journal_empty {
+            return self
+                .load_messages_from_disk_by_offset(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    idx,
+                    offset,
+                    count,
+                    segment_start_offset,
+                )
+                .await;
+        }
+
+        // Case 1: All messages are in accumulator buffer
+        if offset >= journal_first_offset && end_offset <= journal_last_offset {
+            let batches = self.with_partition_by_id(
+                stream_id,
+                topic_id,
+                partition_id,
+                |(_, _, _, _, _, _, log)| {
+                    log.journal()
+                        .get(|batches| batches.get_by_offset(offset, count))
+                },
+            );
+            return Ok(batches);
+        }
+
+        // Case 2: All messages are on disk
+        if end_offset < journal_first_offset {
+            return self
+                .load_messages_from_disk_by_offset(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    idx,
+                    offset,
+                    count,
+                    segment_start_offset,
+                )
+                .await;
+        }
+
+        // Case 3: Messages span disk and accumulator buffer boundary
+        // Calculate how many messages we need from disk
+        let disk_count = if offset < journal_first_offset {
+            ((journal_first_offset - offset) as u32).min(count)
+        } else {
+            0
+        };
+        let mut combined_batch_set = IggyMessagesBatchSet::empty();
+
+        // Load messages from disk if needed
+        if disk_count > 0 {
+            let disk_messages = self
+                .load_messages_from_disk_by_offset(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    idx,
+                    offset,
+                    disk_count,
+                    segment_start_offset,
+                )
+                .await
+                .with_error_context(|error| {
+                    format!("Failed to load messages from disk, start offset: {offset}, count: {disk_count}, error: {error}")
+                })?;
+
+            if !disk_messages.is_empty() {
+                combined_batch_set.add_batch_set(disk_messages);
+            }
+        }
+
+        // Calculate how many more messages we need from the accumulator
+        let remaining_count = count - combined_batch_set.count();
+
+        if remaining_count > 0 {
+            let accumulator_start_offset = std::cmp::max(offset, journal_first_offset);
+            let journal_messages = self.with_partition_by_id(
+                stream_id,
+                topic_id,
+                partition_id,
+                |(_, _, _, _, _, _, log)| {
+                    log.journal().get(|batches| {
+                        batches.get_by_offset(accumulator_start_offset, remaining_count)
+                    })
+                },
+            );
+
+            if !journal_messages.is_empty() {
+                combined_batch_set.add_batch_set(journal_messages);
+            }
+        }
+
+        Ok(combined_batch_set)
+    }
+
+    async fn load_messages_from_disk_by_offset(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: partitions::ContainerId,
+        idx: usize,
+        start_offset: u64,
+        count: u32,
+        segment_start_offset: u64,
+    ) -> Result<IggyMessagesBatchSet, IggyError> {
+        let relative_start_offset = (start_offset - segment_start_offset) as u32;
+
+        let (index_reader, messages_reader, indexes) = self.with_partition_by_id(
             stream_id,
             topic_id,
             partition_id,
-            helpers::get_messages_by_offset_range(offset, count, range),
-        )
-        .await
+            |(_, _, _, _, _, _, log)| {
+                let index_reader = log.storages()[idx]
+                    .index_reader
+                    .as_ref()
+                    .expect("Index reader not initialized")
+                    .clone();
+                let message_reader = log.storages()[idx]
+                    .messages_reader
+                    .as_ref()
+                    .expect("Messages reader not initialized")
+                    .clone();
+                let indexes = log.indexes()[idx].as_ref().map(|indexes| {
+                    indexes
+                        .slice_by_offset(relative_start_offset, count)
+                        .unwrap_or_default()
+                });
+                (index_reader, message_reader, indexes)
+            },
+        );
+
+        let indexes_to_read = if let Some(indexes) = indexes {
+            if !indexes.is_empty() {
+                Some(indexes)
+            } else {
+                index_reader
+                    .as_ref()
+                    .load_from_disk_by_offset(relative_start_offset, count)
+                    .await?
+            }
+        } else {
+            index_reader
+                .as_ref()
+                .load_from_disk_by_offset(relative_start_offset, count)
+                .await?
+        };
+
+        if indexes_to_read.is_none() {
+            return Ok(IggyMessagesBatchSet::empty());
+        }
+
+        let indexes_to_read = indexes_to_read.unwrap();
+        let batch = messages_reader
+            .as_ref()
+            .load_messages_from_disk(indexes_to_read)
+            .await
+            .with_error_context(|error| format!("Failed to load messages from disk: {error}"))?;
+
+        batch
+            .validate_checksums_and_offsets(start_offset)
+            .with_error_context(|error| {
+                format!("Failed to validate messages read from disk! error: {error}")
+            })?;
+
+        Ok(IggyMessagesBatchSet::from(batch))
     }
 
     pub async fn get_messages_by_timestamp(
@@ -704,13 +888,211 @@ impl Streams {
             return Ok(IggyMessagesBatchSet::default());
         };
 
-        self.with_partition_by_id_async(
+        let mut remaining_count = count;
+        let mut batches = IggyMessagesBatchSet::empty();
+
+        for idx in range {
+            let segment_end_timestamp = self.with_partition_by_id(
+                stream_id,
+                topic_id,
+                partition_id,
+                |(_, _, _, _, _, _, log)| {
+                    let segment = &log.segments()[idx];
+                    segment.end_timestamp
+                },
+            );
+
+            if segment_end_timestamp < timestamp {
+                continue;
+            }
+
+            let messages = self
+                .get_messages_by_timestamp_base(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    idx,
+                    timestamp,
+                    remaining_count,
+                )
+                .await?;
+
+            let messages_count = messages.count();
+            if messages_count == 0 {
+                continue;
+            }
+
+            remaining_count = remaining_count.saturating_sub(messages_count);
+            batches.add_batch_set(messages);
+
+            if remaining_count == 0 {
+                break;
+            }
+        }
+
+        Ok(batches)
+    }
+
+    async fn get_messages_by_timestamp_base(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: partitions::ContainerId,
+        idx: usize,
+        timestamp: u64,
+        count: u32,
+    ) -> Result<IggyMessagesBatchSet, IggyError> {
+        if count == 0 {
+            return Ok(IggyMessagesBatchSet::default());
+        }
+
+        let (is_journal_empty, journal_first_timestamp, journal_last_timestamp) = self
+            .with_partition_by_id(
+                stream_id,
+                topic_id,
+                partition_id,
+                |(_, _, _, _, _, _, log)| {
+                    let journal = log.journal();
+                    (
+                        journal.is_empty(),
+                        journal.inner().first_timestamp,
+                        journal.inner().end_timestamp,
+                    )
+                },
+            );
+
+        // Case 0: Accumulator is empty, so all messages have to be on disk
+        if is_journal_empty {
+            return self
+                .load_messages_from_disk_by_timestamp(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    idx,
+                    timestamp,
+                    count,
+                )
+                .await;
+        }
+
+        // Case 1: All messages are in accumulator buffer (timestamp is after journal ends)
+        if timestamp > journal_last_timestamp {
+            return Ok(IggyMessagesBatchSet::empty());
+        }
+
+        // Case 1b: Timestamp is within journal range
+        if timestamp >= journal_first_timestamp {
+            let batches = self.with_partition_by_id(
+                stream_id,
+                topic_id,
+                partition_id,
+                |(_, _, _, _, _, _, log)| {
+                    log.journal()
+                        .get(|batches| batches.get_by_timestamp(timestamp, count))
+                },
+            );
+            return Ok(batches);
+        }
+
+        // Case 2: All messages are on disk (timestamp is before journal's first timestamp)
+        let disk_messages = self
+            .load_messages_from_disk_by_timestamp(
+                stream_id,
+                topic_id,
+                partition_id,
+                idx,
+                timestamp,
+                count,
+            )
+            .await?;
+
+        if disk_messages.count() >= count {
+            return Ok(disk_messages);
+        }
+
+        // Case 3: Messages span disk and accumulator buffer boundary
+        let remaining_count = count - disk_messages.count();
+        let journal_messages = self.with_partition_by_id(
             stream_id,
             topic_id,
             partition_id,
-            helpers::get_messages_by_timestamp_range(timestamp, count, range),
-        )
-        .await
+            |(_, _, _, _, _, _, log)| {
+                log.journal()
+                    .get(|batches| batches.get_by_timestamp(timestamp, remaining_count))
+            },
+        );
+
+        let mut combined_batch_set = disk_messages;
+        if !journal_messages.is_empty() {
+            combined_batch_set.add_batch_set(journal_messages);
+        }
+        Ok(combined_batch_set)
+    }
+
+    async fn load_messages_from_disk_by_timestamp(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: partitions::ContainerId,
+        idx: usize,
+        timestamp: u64,
+        count: u32,
+    ) -> Result<IggyMessagesBatchSet, IggyError> {
+        let (index_reader, messages_reader, indexes) = self.with_partition_by_id(
+            stream_id,
+            topic_id,
+            partition_id,
+            |(_, _, _, _, _, _, log)| {
+                let index_reader = log.storages()[idx]
+                    .index_reader
+                    .as_ref()
+                    .expect("Index reader not initialized")
+                    .clone();
+                let messages_reader = log.storages()[idx]
+                    .messages_reader
+                    .as_ref()
+                    .expect("Messages reader not initialized")
+                    .clone();
+                let indexes = log.indexes()[idx].as_ref().map(|indexes| {
+                    indexes
+                        .slice_by_timestamp(timestamp, count)
+                        .unwrap_or_default()
+                });
+                (index_reader, messages_reader, indexes)
+            },
+        );
+
+        let indexes_to_read = if let Some(indexes) = indexes {
+            if !indexes.is_empty() {
+                Some(indexes)
+            } else {
+                index_reader
+                    .as_ref()
+                    .load_from_disk_by_timestamp(timestamp, count)
+                    .await?
+            }
+        } else {
+            index_reader
+                .as_ref()
+                .load_from_disk_by_timestamp(timestamp, count)
+                .await?
+        };
+
+        if indexes_to_read.is_none() {
+            return Ok(IggyMessagesBatchSet::empty());
+        }
+
+        let indexes_to_read = indexes_to_read.unwrap();
+
+        let batch = messages_reader
+            .as_ref()
+            .load_messages_from_disk(indexes_to_read)
+            .await
+            .with_error_context(|error| {
+                format!("Failed to load messages from disk by timestamp: {error}")
+            })?;
+
+        Ok(IggyMessagesBatchSet::from(batch))
     }
 
     pub async fn handle_full_segment(
@@ -844,21 +1226,70 @@ impl Streams {
             streaming_partitions::helpers::commit_journal(),
         );
 
-        let (saved, batch_count) = self
-            .with_partition_by_id_async(
-                stream_id,
-                topic_id,
-                partition_id,
-                streaming_partitions::helpers::persist_batch(
-                    shard_id,
-                    stream_id,
-                    topic_id,
-                    partition_id,
-                    batches,
-                    reason.to_string(),
-                ),
-            )
-            .await?;
+        shard_trace!(
+            shard_id,
+            "Persisting messages on disk for stream ID: {}, topic ID: {}, partition ID: {} because {}...",
+            stream_id,
+            topic_id,
+            partition_id,
+            reason
+        );
+
+        let batch_count = batches.count();
+        let batch_size = batches.size();
+
+        // Extract storage before async operations
+        let (messages_writer, index_writer) =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                (
+                    log.active_storage()
+                        .messages_writer
+                        .as_ref()
+                        .expect("Messages writer not initialized")
+                        .clone(),
+                    log.active_storage()
+                        .index_writer
+                        .as_ref()
+                        .expect("Index writer not initialized")
+                        .clone(),
+                )
+            });
+
+        let saved = messages_writer
+            .as_ref()
+            .save_batch_set(batches)
+            .await
+            .with_error_context(|error| {
+                format!(
+                    "Failed to save batch of {batch_count} messages \
+                    ({batch_size} bytes) to stream ID: {stream_id}, topic ID: {topic_id}, partition ID: {partition_id}. {error}",
+                )
+            })?;
+
+        // Extract unsaved indexes before async operation
+        let unsaved_indexes_slice =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                log.active_indexes().unwrap().unsaved_slice()
+            });
+
+        let indexes_len = unsaved_indexes_slice.len();
+        index_writer
+            .as_ref()
+            .save_indexes(unsaved_indexes_slice)
+            .await
+            .with_error_context(|error| {
+                format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {error}",)
+            })?;
+
+        shard_trace!(
+            shard_id,
+            "Persisted {} messages on disk for stream ID: {}, topic ID: {}, for partition with ID: {}, total bytes written: {}.",
+            batch_count,
+            stream_id,
+            topic_id,
+            partition_id,
+            saved
+        );
 
         self.with_partition_by_id_mut(
             stream_id,
@@ -880,57 +1311,35 @@ impl Streams {
         topic_id: &Identifier,
         partition_id: usize,
     ) -> Result<(), IggyError> {
-        let has_storage =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                let storage = log.active_storage();
-                storage.messages_writer.is_some() && storage.index_writer.is_some()
-            });
+        let storage = self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+            log.active_storage().clone()
+        });
 
-        if !has_storage {
+        if storage.messages_writer.is_none() || storage.index_writer.is_none() {
             return Ok(());
         }
 
-        self.with_partition_by_id_async(
-            stream_id,
-            topic_id,
-            partition_id,
-            async move |(.., log)| {
-                let storage = log.active_storage();
-
-                if let Some(ref messages_writer) = storage.messages_writer {
-                    if let Err(e) = messages_writer.fsync().await {
-                        tracing::error!(
-                            "Failed to fsync messages writer for partition {}: {}",
-                            partition_id,
-                            e
-                        );
-                        return Err(e);
-                    }
-                }
-                Ok(())
-            },
-        )
-        .await?;
+        if let Some(ref messages_writer) = storage.messages_writer {
+            if let Err(e) = messages_writer.fsync().await {
+                tracing::error!(
+                    "Failed to fsync messages writer for partition {}: {}",
+                    partition_id,
+                    e
+                );
+                return Err(e);
+            }
+        }
 
-        self.with_partition_by_id_async(
-            stream_id,
-            topic_id,
-            partition_id,
-            async move |(.., log)| {
-                if let Some(ref index_writer) = log.active_storage().index_writer {
-                    if let Err(e) = index_writer.fsync().await {
-                        tracing::error!(
-                            "Failed to fsync index writer for partition {}: {}",
-                            partition_id,
-                            e
-                        );
-                        return Err(e);
-                    }
-                }
-                Ok(())
-            },
-        )
-        .await?;
+        if let Some(ref index_writer) = storage.index_writer {
+            if let Err(e) = index_writer.fsync().await {
+                tracing::error!(
+                    "Failed to fsync index writer for partition {}: {}",
+                    partition_id,
+                    e
+                );
+                return Err(e);
+            }
+        }
 
         Ok(())
     }
diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs
index 7508d675..100b4e1f 100644
--- a/core/server/src/slab/topics.rs
+++ b/core/server/src/slab/topics.rs
@@ -11,14 +11,10 @@ use crate::{
         partitions::Partitions,
         traits_ext::{
             ComponentsById, DeleteCell, EntityComponentSystem, EntityComponentSystemMutCell,
-            Insert, InsertCell, InteriorMutability, IntoComponents,
+            InsertCell, InteriorMutability, IntoComponents,
         },
     },
     streaming::{
-        partitions::{
-            journal::MemoryMessageJournal,
-            log::{Log, SegmentedLog},
-        },
         stats::stats::TopicStats,
         topics::{
             consumer_group2::{ConsumerGroupRef, ConsumerGroupRefMut},
@@ -133,13 +129,6 @@ impl EntityComponentSystem<InteriorMutability> for Topics {
     {
         f(self.into())
     }
-
-    fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
-    where
-        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O,
-    {
-        f(self.into())
-    }
 }
 
 impl EntityComponentSystemMutCell for Topics {
@@ -206,15 +195,6 @@ impl Topics {
         self.with_components_by_id(id, |components| f(components))
     }
 
-    pub fn with_topic_by_id_async<T>(
-        &self,
-        topic_id: &Identifier,
-        f: impl AsyncFnOnce(ComponentsById<TopicRef>) -> T,
-    ) -> impl Future<Output = T> {
-        let id = self.get_index(topic_id);
-        self.with_components_by_id_async(id, async |components| f(components).await)
-    }
-
     pub fn with_topic_by_id_mut<T>(
         &self,
         topic_id: &Identifier,
@@ -232,14 +212,6 @@ impl Topics {
         self.with_topic_by_id(topic_id, helpers::consumer_groups(f))
     }
 
-    pub fn with_consumer_groups_async<T>(
-        &self,
-        topic_id: &Identifier,
-        f: impl AsyncFnOnce(&ConsumerGroups) -> T,
-    ) -> impl Future<Output = T> {
-        self.with_topic_by_id_async(topic_id, helpers::consumer_groups_async(f))
-    }
-
     pub fn with_consumer_groups_mut<T>(
         &self,
         topic_id: &Identifier,
@@ -259,17 +231,6 @@ impl Topics {
         })
     }
 
-    pub fn with_consumer_group_by_id_async<T>(
-        &self,
-        topic_id: &Identifier,
-        group_id: &Identifier,
-        f: impl AsyncFnOnce(ComponentsById<ConsumerGroupRef>) -> T,
-    ) -> impl Future<Output = T> {
-        self.with_consumer_groups_async(topic_id, async |container| {
-            container.with_consumer_group_by_id_async(group_id, f).await
-        })
-    }
-
     pub fn with_consumer_group_by_id_mut<T>(
         &self,
         topic_id: &Identifier,
@@ -285,14 +246,6 @@ impl Topics {
         self.with_topic_by_id(topic_id, helpers::partitions(f))
     }
 
-    pub fn with_partitions_async<T>(
-        &self,
-        topic_id: &Identifier,
-        f: impl AsyncFnOnce(&Partitions) -> T,
-    ) -> impl Future<Output = T> {
-        self.with_topic_by_id_async(topic_id, helpers::partitions_async(f))
-    }
-
     pub fn with_partitions_mut<T>(
         &self,
         topic_id: &Identifier,
diff --git a/core/server/src/slab/traits_ext.rs b/core/server/src/slab/traits_ext.rs
index 949971fa..d69dfd85 100644
--- a/core/server/src/slab/traits_ext.rs
+++ b/core/server/src/slab/traits_ext.rs
@@ -188,23 +188,12 @@ where
     where
         F: for<'a> FnOnce(Self::EntityComponents<'a>) -> O;
 
-    fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
-    where
-        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O;
-
     fn with_components_by_id<O, F>(&self, id: Self::Idx, f: F) -> O
     where
         F: for<'a> FnOnce(ComponentsById<'a, Self::EntityComponents<'a>>) -> O,
     {
         self.with_components(|components| f(components.into_components_by_id(id)))
     }
-
-    fn with_components_by_id_async<O, F>(&self, id: Self::Idx, f: F) -> impl Future<Output = O>
-    where
-        F: for<'a> AsyncFnOnce(ComponentsById<'a, Self::EntityComponents<'a>>) -> O,
-    {
-        self.with_components_async(async |components| f(components.into_components_by_id(id)).await)
-    }
 }
 
 pub trait EntityComponentSystemMut: EntityComponentSystem<Borrow> {
diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs
index 1062a6dc..312152c6 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -2,7 +2,7 @@ use error_set::ErrContext;
 use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
 use std::{
     ops::{AsyncFnOnce, Index},
-    sync::atomic::Ordering,
+    sync::{Arc, atomic::Ordering},
 };
 use sysinfo::Component;
 
@@ -273,212 +273,6 @@ pub fn create_message_deduplicator(config: &SystemConfig) -> Option<MessageDedup
     Some(MessageDeduplicator::new(max_entries, expiry))
 }
 
-pub async fn get_messages_by_offset(
-    storage: &Storage,
-    journal: &MemoryMessageJournal,
-    index: &Option<IggyIndexesMut>,
-    offset: u64,
-    end_offset: u64,
-    count: u32,
-    segment_start_offset: u64,
-) -> Result<IggyMessagesBatchSet, IggyError> {
-    if count == 0 {
-        return Ok(IggyMessagesBatchSet::default());
-    }
-
-    // Case 0: Accumulator is empty, so all messages have to be on disk
-    if journal.is_empty() {
-        return load_messages_from_disk_by_offset(
-            storage,
-            index,
-            offset,
-            count,
-            segment_start_offset,
-        )
-        .await;
-    }
-
-    let journal_first_offset = journal.inner().base_offset;
-    let journal_last_offset = journal.inner().current_offset;
-
-    // Case 1: All messages are in accumulator buffer
-    if offset >= journal_first_offset && end_offset <= journal_last_offset {
-        return Ok(journal.get(|batches| batches.get_by_offset(offset, count)));
-    }
-
-    // Case 2: All messages are on disk
-    if end_offset < journal_first_offset {
-        return load_messages_from_disk_by_offset(
-            storage,
-            index,
-            offset,
-            count,
-            segment_start_offset,
-        )
-        .await;
-    }
-
-    // Case 3: Messages span disk and accumulator buffer boundary
-    // Calculate how many messages we need from disk
-    let disk_count = if offset < journal_first_offset {
-        ((journal_first_offset - offset) as u32).min(count)
-    } else {
-        0
-    };
-    let mut combined_batch_set = IggyMessagesBatchSet::empty();
-
-    // Load messages from disk if needed
-    if disk_count > 0 {
-        let disk_messages = load_messages_from_disk_by_offset(storage, index, offset, disk_count, segment_start_offset)
-            .await
-            .with_error_context(|error| {
-                format!("Failed to load messages from disk, start offset: {offset}, count: {disk_count}, error: {error}")
-            })?;
-
-        if !disk_messages.is_empty() {
-            combined_batch_set.add_batch_set(disk_messages);
-        }
-    }
-
-    // Calculate how many more messages we need from the accumulator
-    let remaining_count = count - combined_batch_set.count();
-
-    if remaining_count > 0 {
-        let accumulator_start_offset = std::cmp::max(offset, journal_first_offset);
-        let accumulator_messages =
-            journal.get(|batches| batches.get_by_offset(accumulator_start_offset, remaining_count));
-        if !accumulator_messages.is_empty() {
-            combined_batch_set.add_batch_set(accumulator_messages);
-        }
-    }
-
-    Ok(combined_batch_set)
-}
-
-async fn load_messages_from_disk_by_offset(
-    storage: &Storage,
-    index: &Option<IggyIndexesMut>,
-    start_offset: u64,
-    count: u32,
-    segment_start_offset: u64,
-) -> Result<IggyMessagesBatchSet, IggyError> {
-    // Convert start_offset to relative offset within the segment
-    let relative_start_offset = (start_offset - segment_start_offset) as u32;
-
-    // Load indexes first
-    let indexes_to_read = if let Some(indexes) = index {
-        if !indexes.is_empty() {
-            indexes.slice_by_offset(relative_start_offset, count)
-        } else {
-            storage
-                .index_reader
-                .as_ref()
-                .expect("Index reader not initialized")
-                .load_from_disk_by_offset(relative_start_offset, count)
-                .await?
-        }
-    } else {
-        storage
-            .index_reader
-            .as_ref()
-            .expect("Index reader not initialized")
-            .load_from_disk_by_offset(relative_start_offset, count)
-            .await?
-    };
-
-    if indexes_to_read.is_none() {
-        return Ok(IggyMessagesBatchSet::empty());
-    }
-
-    let indexes_to_read = indexes_to_read.unwrap();
-    let batch = storage
-        .messages_reader
-        .as_ref()
-        .expect("Messages reader not initialized")
-        .load_messages_from_disk(indexes_to_read)
-        .await
-        .with_error_context(|error| format!("Failed to load messages from disk: {error}"))?;
-
-    batch
-        .validate_checksums_and_offsets(start_offset)
-        .with_error_context(|error| {
-            format!("Failed to validate messages read from disk! error: {error}")
-        })?;
-
-    Ok(IggyMessagesBatchSet::from(batch))
-}
-
-pub fn get_messages_by_offset_range(
-    offset: u64,
-    count: u32,
-    range: std::ops::Range<usize>,
-) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<IggyMessagesBatchSet, IggyError> {
-    async move |(.., log)| -> Result<IggyMessagesBatchSet, IggyError> {
-        let segments = log.segments().iter();
-        let storages = log.storages().iter();
-        let journal = log.journal();
-        let indexes = log.indexes().iter();
-
-        let mut remaining_count = count;
-        let mut batches = IggyMessagesBatchSet::empty();
-        let mut current_offset = offset;
-
-        for (segment, storage, index) in segments
-            .zip(storages)
-            .zip(indexes)
-            .map(|((a, b), c)| (a, b, c))
-            .skip(range.start)
-            .take(range.end - range.start)
-        {
-            let start_offset = if current_offset < segment.start_offset {
-                segment.start_offset
-            } else {
-                current_offset
-            };
-
-            let mut end_offset = start_offset + (remaining_count - 1) as u64;
-            if end_offset > segment.end_offset {
-                end_offset = segment.end_offset;
-            }
-
-            // Calculate the actual count to request from this segment
-            let count: u32 = ((end_offset - start_offset + 1) as u32).min(remaining_count);
-
-            let messages = get_messages_by_offset(
-                storage,
-                journal,
-                index,
-                start_offset,
-                end_offset,
-                count,
-                segment.start_offset,
-            )
-            .await?;
-
-            let messages_count = messages.count();
-            if messages_count == 0 {
-                current_offset = segment.end_offset + 1;
-                continue;
-            }
-
-            remaining_count = remaining_count.saturating_sub(messages_count);
-
-            if let Some(last_offset) = messages.last_offset() {
-                current_offset = last_offset + 1;
-            } else if messages_count > 0 {
-                current_offset += messages_count as u64;
-            }
-
-            batches.add_batch_set(messages);
-
-            if remaining_count == 0 {
-                break;
-            }
-        }
-        Ok(batches)
-    }
-}
-
 pub fn get_segment_range_by_offset(
     offset: u64,
 ) -> impl FnOnce(ComponentsById<PartitionRef>) -> std::ops::Range<usize> {
@@ -556,105 +350,6 @@ pub async fn load_messages_from_disk_by_timestamp(
     Ok(IggyMessagesBatchSet::from(batch))
 }
 
-pub async fn get_messages_by_timestamp(
-    storage: &Storage,
-    journal: &MemoryMessageJournal,
-    index: &Option<IggyIndexesMut>,
-    timestamp: u64,
-    count: u32,
-) -> Result<IggyMessagesBatchSet, IggyError> {
-    if count == 0 {
-        return Ok(IggyMessagesBatchSet::default());
-    }
-
-    // Case 0: Accumulator is empty, so all messages have to be on disk
-    if journal.is_empty() {
-        return load_messages_from_disk_by_timestamp(storage, index, timestamp, 
count).await;
-    }
-
-    let journal_first_timestamp = journal.inner().first_timestamp;
-    let journal_last_timestamp = journal.inner().end_timestamp;
-
-    // Case 1: All messages are in accumulator buffer
-    if timestamp > journal_last_timestamp {
-        return Ok(IggyMessagesBatchSet::empty());
-    }
-
-    if timestamp >= journal_first_timestamp {
-        return Ok(journal.get(|batches| batches.get_by_timestamp(timestamp, 
count)));
-    }
-
-    // Case 2: All messages are on disk (timestamp is before journal's first 
timestamp)
-    let disk_messages =
-        load_messages_from_disk_by_timestamp(storage, index, timestamp, 
count).await?;
-
-    if disk_messages.count() >= count {
-        return Ok(disk_messages);
-    }
-
-    // Case 3: Messages span disk and accumulator buffer boundary
-    let remaining_count = count - disk_messages.count();
-    let journal_messages =
-        journal.get(|batches| batches.get_by_timestamp(timestamp, 
remaining_count));
-
-    let mut combined_batch_set = disk_messages;
-    if !journal_messages.is_empty() {
-        combined_batch_set.add_batch_set(journal_messages);
-    }
-    return Ok(combined_batch_set);
-}
-
-pub fn get_messages_by_timestamp_range(
-    timestamp: u64,
-    count: u32,
-    range: std::ops::Range<usize>,
-) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> 
Result<IggyMessagesBatchSet, IggyError> {
-    async move |(.., log)| -> Result<IggyMessagesBatchSet, IggyError> {
-        let segments = log.segments().iter();
-        let storages = log.storages().iter();
-        let journal = log.journal();
-        let indexes = log.indexes().iter();
-
-        let mut remaining_count = count;
-        let mut batches = IggyMessagesBatchSet::empty();
-
-        for (segment, storage, index) in segments
-            .zip(storages)
-            .zip(indexes)
-            .map(|((a, b), c)| (a, b, c))
-            .skip(range.start)
-            .take(range.end - range.start)
-        {
-            if remaining_count == 0 {
-                break;
-            }
-
-            // Skip segments that end before our timestamp
-            if segment.end_timestamp < timestamp {
-                continue;
-            }
-
-            let messages =
-                get_messages_by_timestamp(storage, journal, index, timestamp, 
remaining_count)
-                    .await?;
-
-            let messages_count = messages.count();
-            if messages_count == 0 {
-                continue;
-            }
-
-            remaining_count = remaining_count.saturating_sub(messages_count);
-            batches.add_batch_set(messages);
-
-            if remaining_count == 0 {
-                break;
-            }
-        }
-
-        Ok(batches)
-    }
-}
-
 pub fn calculate_current_offset() -> impl FnOnce(ComponentsById<PartitionRef>) 
-> u64 {
     |(root, _, _, offset, ..)| {
         if !root.should_increment_offset() {
@@ -665,21 +360,11 @@ pub fn calculate_current_offset() -> impl 
FnOnce(ComponentsById<PartitionRef>) -
     }
 }
 
-pub fn deduplicate_messages(
-    current_offset: u64,
-    current_position: u32,
-    batch: &mut IggyMessagesBatchMut,
-) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) {
-    async move |(.., deduplicator, _, _, _, log)| {
+pub fn get_segment_start_offset_and_deduplicator()
+-> impl FnOnce(ComponentsById<PartitionRef>) -> (u64, 
Option<Arc<MessageDeduplicator>>) {
+    move |(.., deduplicator, _, _, _, log)| {
         let segment = log.active_segment();
-        batch
-            .prepare_for_persistence(
-                segment.start_offset,
-                current_offset,
-                current_position,
-                deduplicator.as_ref(),
-            )
-            .await;
+        (segment.start_offset, deduplicator.clone())
     }
 }
 
diff --git a/core/server/src/streaming/partitions/partition2.rs 
b/core/server/src/streaming/partitions/partition2.rs
index 6c85dfb0..e965a270 100644
--- a/core/server/src/streaming/partitions/partition2.rs
+++ b/core/server/src/streaming/partitions/partition2.rs
@@ -17,7 +17,10 @@ use crate::{
 };
 use iggy_common::{Identifier, IggyTimestamp};
 use slab::Slab;
-use std::sync::{Arc, atomic::AtomicU64};
+use std::{
+    rc::Rc,
+    sync::{Arc, atomic::AtomicU64},
+};
 
 #[derive(Debug, Clone)]
 pub struct ConsumerOffsets(papaya::HashMap<usize, 
consumer_offset::ConsumerOffset>);
@@ -87,7 +90,7 @@ impl std::ops::DerefMut for ConsumerGroupOffsets {
 pub struct Partition {
     root: PartitionRoot,
     stats: Arc<PartitionStats>,
-    message_deduplicator: Option<MessageDeduplicator>,
+    message_deduplicator: Option<Arc<MessageDeduplicator>>,
     offset: Arc<AtomicU64>,
     consumer_offset: Arc<ConsumerOffsets>,
     consumer_group_offset: Arc<ConsumerGroupOffsets>,
@@ -106,6 +109,7 @@ impl Partition {
         log: SegmentedLog<MemoryMessageJournal>,
     ) -> Self {
         let root = PartitionRoot::new(created_at, should_increment_offset);
+        let message_deduplicator = message_deduplicator.map(Arc::new);
         Self {
             root,
             stats,
@@ -120,7 +124,7 @@ impl Partition {
     pub fn new_with_components(
         root: PartitionRoot,
         stats: Arc<PartitionStats>,
-        message_deduplicator: Option<MessageDeduplicator>,
+        message_deduplicator: Option<Arc<MessageDeduplicator>>,
         offset: Arc<AtomicU64>,
         consumer_offset: Arc<ConsumerOffsets>,
         consumer_group_offset: Arc<ConsumerGroupOffsets>,
@@ -172,7 +176,7 @@ impl IntoComponents for Partition {
     type Components = (
         PartitionRoot,
         Arc<PartitionStats>,
-        Option<MessageDeduplicator>,
+        Option<Arc<MessageDeduplicator>>,
         Arc<AtomicU64>,
         Arc<ConsumerOffsets>,
         Arc<ConsumerGroupOffsets>,
@@ -233,7 +237,7 @@ impl PartitionRoot {
 pub struct PartitionRef<'a> {
     root: &'a Slab<PartitionRoot>,
     stats: &'a Slab<Arc<PartitionStats>>,
-    message_deduplicator: &'a Slab<Option<MessageDeduplicator>>,
+    message_deduplicator: &'a Slab<Option<Arc<MessageDeduplicator>>>,
     offset: &'a Slab<Arc<AtomicU64>>,
     consumer_offset: &'a Slab<Arc<ConsumerOffsets>>,
     consumer_group_offset: &'a Slab<Arc<ConsumerGroupOffsets>>,
@@ -244,7 +248,7 @@ impl<'a> PartitionRef<'a> {
     pub fn new(
         root: &'a Slab<PartitionRoot>,
         stats: &'a Slab<Arc<PartitionStats>>,
-        message_deduplicator: &'a Slab<Option<MessageDeduplicator>>,
+        message_deduplicator: &'a Slab<Option<Arc<MessageDeduplicator>>>,
         offset: &'a Slab<Arc<AtomicU64>>,
         consumer_offset: &'a Slab<Arc<ConsumerOffsets>>,
         consumer_group_offset: &'a Slab<Arc<ConsumerGroupOffsets>>,
@@ -266,7 +270,7 @@ impl<'a> IntoComponents for PartitionRef<'a> {
     type Components = (
         &'a Slab<PartitionRoot>,
         &'a Slab<Arc<PartitionStats>>,
-        &'a Slab<Option<MessageDeduplicator>>,
+        &'a Slab<Option<Arc<MessageDeduplicator>>>,
         &'a Slab<Arc<AtomicU64>>,
         &'a Slab<Arc<ConsumerOffsets>>,
         &'a Slab<Arc<ConsumerGroupOffsets>>,
@@ -291,7 +295,7 @@ impl<'a> IntoComponentsById for PartitionRef<'a> {
     type Output = (
         &'a PartitionRoot,
         &'a Arc<PartitionStats>,
-        &'a Option<MessageDeduplicator>,
+        &'a Option<Arc<MessageDeduplicator>>,
         &'a Arc<AtomicU64>,
         &'a Arc<ConsumerOffsets>,
         &'a Arc<ConsumerGroupOffsets>,
@@ -314,7 +318,7 @@ impl<'a> IntoComponentsById for PartitionRef<'a> {
 pub struct PartitionRefMut<'a> {
     root: &'a mut Slab<PartitionRoot>,
     stats: &'a mut Slab<Arc<PartitionStats>>,
-    message_deduplicator: &'a mut Slab<Option<MessageDeduplicator>>,
+    message_deduplicator: &'a mut Slab<Option<Arc<MessageDeduplicator>>>,
     offset: &'a mut Slab<Arc<AtomicU64>>,
     consumer_offset: &'a mut Slab<Arc<ConsumerOffsets>>,
     consumer_group_offset: &'a mut Slab<Arc<ConsumerGroupOffsets>>,
@@ -325,7 +329,7 @@ impl<'a> PartitionRefMut<'a> {
     pub fn new(
         root: &'a mut Slab<PartitionRoot>,
         stats: &'a mut Slab<Arc<PartitionStats>>,
-        message_deduplicator: &'a mut Slab<Option<MessageDeduplicator>>,
+        message_deduplicator: &'a mut Slab<Option<Arc<MessageDeduplicator>>>,
         offset: &'a mut Slab<Arc<AtomicU64>>,
         consumer_offset: &'a mut Slab<Arc<ConsumerOffsets>>,
         consumer_group_offset: &'a mut Slab<Arc<ConsumerGroupOffsets>>,
@@ -347,7 +351,7 @@ impl<'a> IntoComponents for PartitionRefMut<'a> {
     type Components = (
         &'a mut Slab<PartitionRoot>,
         &'a mut Slab<Arc<PartitionStats>>,
-        &'a mut Slab<Option<MessageDeduplicator>>,
+        &'a mut Slab<Option<Arc<MessageDeduplicator>>>,
         &'a mut Slab<Arc<AtomicU64>>,
         &'a mut Slab<Arc<ConsumerOffsets>>,
         &'a mut Slab<Arc<ConsumerGroupOffsets>>,
@@ -372,7 +376,7 @@ impl<'a> IntoComponentsById for PartitionRefMut<'a> {
     type Output = (
         &'a mut PartitionRoot,
         &'a mut Arc<PartitionStats>,
-        &'a mut Option<MessageDeduplicator>,
+        &'a mut Option<Arc<MessageDeduplicator>>,
         &'a mut Arc<AtomicU64>,
         &'a mut Arc<ConsumerOffsets>,
         &'a mut Arc<ConsumerGroupOffsets>,
diff --git a/core/server/src/streaming/segments/storage.rs 
b/core/server/src/streaming/segments/storage.rs
index 84fcd783..53313282 100644
--- a/core/server/src/streaming/segments/storage.rs
+++ b/core/server/src/streaming/segments/storage.rs
@@ -8,12 +8,14 @@ use crate::streaming::segments::{
     messages::{MessagesReader, MessagesWriter},
 };
 
-#[derive(Debug)]
+unsafe impl Send for Storage {}
+
+#[derive(Debug, Clone)]
 pub struct Storage {
-    pub messages_writer: Option<MessagesWriter>,
-    pub messages_reader: Option<MessagesReader>,
-    pub index_writer: Option<IndexWriter>,
-    pub index_reader: Option<IndexReader>,
+    pub messages_writer: Option<Rc<MessagesWriter>>,
+    pub messages_reader: Option<Rc<MessagesReader>>,
+    pub index_writer: Option<Rc<IndexWriter>>,
+    pub index_reader: Option<Rc<IndexReader>>,
 }
 
 impl Storage {
@@ -28,19 +30,21 @@ impl Storage {
     ) -> Result<Self, IggyError> {
         let size = Rc::new(AtomicU64::new(messages_size));
         let indexes_size = Rc::new(AtomicU64::new(indexes_size));
-        let messages_writer =
-            MessagesWriter::new(messages_path, size.clone(), log_fsync, 
file_exists).await?;
+        let messages_writer = Rc::new(
+            MessagesWriter::new(messages_path, size.clone(), log_fsync, 
file_exists).await?,
+        );
 
-        let index_writer =
-            IndexWriter::new(index_path, indexes_size.clone(), index_fsync, 
file_exists).await?;
+        let index_writer = Rc::new(
+            IndexWriter::new(index_path, indexes_size.clone(), index_fsync, 
file_exists).await?,
+        );
 
         if file_exists {
             messages_writer.fsync().await?;
             index_writer.fsync().await?;
         }
 
-        let messages_reader = MessagesReader::new(messages_path, size).await?;
-        let index_reader = IndexReader::new(index_path, indexes_size).await?;
+        let messages_reader = Rc::new(MessagesReader::new(messages_path, 
size).await?);
+        let index_reader = Rc::new(IndexReader::new(index_path, 
indexes_size).await?);
         Ok(Self {
             messages_writer: Some(messages_writer),
             messages_reader: Some(messages_reader),
@@ -49,7 +53,7 @@ impl Storage {
         })
     }
 
-    pub fn shutdown(&mut self) -> (Option<MessagesWriter>, 
Option<IndexWriter>) {
+    pub fn shutdown(&mut self) -> (Option<Rc<MessagesWriter>>, 
Option<Rc<IndexWriter>>) {
         let messages_writer = self.messages_writer.take();
         let index_writer = self.index_writer.take();
         (messages_writer, index_writer)
diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs 
b/core/server/src/streaming/segments/types/messages_batch_mut.rs
index 8e4895da..29e5c4d2 100644
--- a/core/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs
@@ -29,6 +29,7 @@ use iggy_common::{
 };
 use lending_iterator::prelude::*;
 use std::ops::{Deref, Index};
+use std::sync::Arc;
 use tracing::{error, warn};
 
 /// A container for mutable messages that are being prepared for persistence.
@@ -144,7 +145,7 @@ impl IggyMessagesBatchMut {
         start_offset: u64,
         base_offset: u64,
         current_position: u32,
-        deduplicator: Option<&MessageDeduplicator>,
+        deduplicator: Option<&Arc<MessageDeduplicator>>,
     ) {
         let messages_count = self.count();
         if messages_count == 0 {
diff --git a/core/server/src/streaming/topics/storage2.rs 
b/core/server/src/streaming/topics/storage2.rs
index 35e854a7..86d783b1 100644
--- a/core/server/src/streaming/topics/storage2.rs
+++ b/core/server/src/streaming/topics/storage2.rs
@@ -94,191 +94,3 @@ pub async fn delete_topic_from_disk(
     );
     Ok((messages_count, size_bytes, segments_count))
 }
-
-// Old implementation kept for reference
-/*
-async fn load(&self, topic: &mut Topic, mut state: TopicState) -> Result<(), 
IggyError> {
-        info!("Loading topic {} from disk...", topic);
-        if !Path::new(&topic.path).exists() {
-            return Err(IggyError::TopicIdNotFound(topic.topic_id, 
topic.stream_id));
-        }
-
-        let message_expiry = Topic::get_message_expiry(state.message_expiry, 
&topic.config);
-        let max_topic_size = Topic::get_max_topic_size(state.max_topic_size, 
&topic.config)?;
-        topic.created_at = state.created_at;
-        topic.message_expiry = message_expiry;
-        topic.max_topic_size = max_topic_size;
-        topic.compression_algorithm = state.compression_algorithm;
-        topic.replication_factor = state.replication_factor.unwrap_or(1);
-
-        let mut dir_entries = fs::read_dir(&topic.partitions_path).await
-            .with_context(|| format!("Failed to read partition with ID: {} for 
stream with ID: {} for topic with ID: {} and path: {}",
-                                     topic.topic_id, topic.stream_id, 
topic.topic_id, &topic.partitions_path))
-            .map_err(|_| IggyError::CannotReadPartitions)?;
-
-        let mut unloaded_partitions = Vec::new();
-        while let Some(dir_entry) = 
dir_entries.next_entry().await.unwrap_or(None) {
-            let metadata = dir_entry.metadata().await;
-            if metadata.is_err() || metadata.unwrap().is_file() {
-                continue;
-            }
-
-            let name = dir_entry.file_name().into_string().unwrap();
-            let partition_id = name.parse::<u32>();
-            if partition_id.is_err() {
-                error!("Invalid partition ID file with name: '{}'.", name);
-                continue;
-            }
-
-            let partition_id = partition_id.unwrap();
-            let partition_state = state.partitions.get(&partition_id);
-            if partition_state.is_none() {
-                let stream_id = topic.stream_id;
-                let topic_id = topic.topic_id;
-                error!(
-                    "Partition with ID: '{partition_id}' for stream with ID: 
'{stream_id}' and topic with ID: '{topic_id}' was not found in state, but 
exists on disk and will be removed."
-                );
-                if let Err(error) = 
fs::remove_dir_all(&dir_entry.path()).await {
-                    error!("Cannot remove partition directory: {error}");
-                } else {
-                    warn!(
-                        "Partition with ID: '{partition_id}' for stream with 
ID: '{stream_id}' and topic with ID: '{topic_id}' was removed."
-                    );
-                }
-                continue;
-            }
-
-            let partition_state = partition_state.unwrap();
-            let partition = Partition::create(
-                topic.stream_id,
-                topic.topic_id,
-                partition_id,
-                false,
-                topic.config.clone(),
-                topic.storage.clone(),
-                message_expiry,
-                topic.messages_count_of_parent_stream.clone(),
-                topic.messages_count.clone(),
-                topic.size_of_parent_stream.clone(),
-                topic.size_bytes.clone(),
-                topic.segments_count_of_parent_stream.clone(),
-                partition_state.created_at,
-            )
-            .await;
-            unloaded_partitions.push(partition);
-        }
-
-        let state_partition_ids = 
state.partitions.keys().copied().collect::<AHashSet<u32>>();
-        let unloaded_partition_ids = unloaded_partitions
-            .iter()
-            .map(|partition| partition.partition_id)
-            .collect::<AHashSet<u32>>();
-        let missing_ids = state_partition_ids
-            .difference(&unloaded_partition_ids)
-            .copied()
-            .collect::<AHashSet<u32>>();
-        if missing_ids.is_empty() {
-            info!(
-                "All partitions for topic with ID: '{}' for stream with ID: 
'{}' found on disk were found in state.",
-                topic.topic_id, topic.stream_id
-            );
-        } else {
-            warn!(
-                "Partitions with IDs: '{missing_ids:?}' for topic with ID: 
'{topic_id}' for stream with ID: '{stream_id}' were not found on disk.",
-                topic_id = topic.topic_id,
-                stream_id = topic.stream_id
-            );
-            if topic.config.recovery.recreate_missing_state {
-                info!(
-                    "Recreating missing state in recovery config is enabled, 
missing partitions will be created for topic with ID: '{}' for stream with ID: 
'{}'.",
-                    topic.topic_id, topic.stream_id
-                );
-
-                for partition_id in missing_ids {
-                    let partition_state = 
state.partitions.get(&partition_id).unwrap();
-                    let mut partition = Partition::create(
-                        topic.stream_id,
-                        topic.topic_id,
-                        partition_id,
-                        true,
-                        topic.config.clone(),
-                        topic.storage.clone(),
-                        message_expiry,
-                        topic.messages_count_of_parent_stream.clone(),
-                        topic.messages_count.clone(),
-                        topic.size_of_parent_stream.clone(),
-                        topic.size_bytes.clone(),
-                        topic.segments_count_of_parent_stream.clone(),
-                        partition_state.created_at,
-                    )
-                    .await;
-                    partition.persist().await.with_error_context(|error| {
-                        format!(
-                            "{COMPONENT} (error: {error}) - failed to persist 
partition: {partition}"
-                        )
-                    })?;
-                    partition.segments.clear();
-                    unloaded_partitions.push(partition);
-                    info!(
-                        "Created missing partition with ID: '{partition_id}', 
for topic with ID: '{}' for stream with ID: '{}'.",
-                        topic.topic_id, topic.stream_id
-                    );
-                }
-            } else {
-                warn!(
-                    "Recreating missing state in recovery config is disabled, 
missing partitions will not be created for topic with ID: '{}' for stream with 
ID: '{}'.",
-                    topic.topic_id, topic.stream_id
-                );
-            }
-        }
-
-        let stream_id = topic.stream_id;
-        let topic_id = topic.topic_id;
-        let loaded_partitions = Arc::new(Mutex::new(Vec::new()));
-        let mut load_partitions = Vec::new();
-        for mut partition in unloaded_partitions {
-            let loaded_partitions = loaded_partitions.clone();
-            let partition_state = 
state.partitions.remove(&partition.partition_id).unwrap();
-            let load_partition = tokio::spawn(async move {
-                match partition.load(partition_state).await {
-                    Ok(_) => {
-                        loaded_partitions.lock().await.push(partition);
-                    }
-                    Err(error) => {
-                        error!(
-                            "Failed to load partition with ID: {} for stream 
with ID: {stream_id} and topic with ID: {topic_id}. Error: {error}",
-                            partition.partition_id
-                        );
-                    }
-                }
-            });
-            load_partitions.push(load_partition);
-        }
-
-        join_all(load_partitions).await;
-        for partition in loaded_partitions.lock().await.drain(..) {
-            topic
-                .partitions
-                .insert(partition.partition_id, IggySharedMut::new(partition));
-        }
-
-        for consumer_group in state.consumer_groups.into_values() {
-            let consumer_group = ConsumerGroup::new(
-                topic.topic_id,
-                consumer_group.id,
-                &consumer_group.name,
-                topic.get_partitions_count(),
-            );
-            topic
-                .consumer_groups_ids
-                .insert(consumer_group.name.to_owned(), 
consumer_group.group_id);
-            topic
-                .consumer_groups
-                .insert(consumer_group.group_id, RwLock::new(consumer_group));
-        }
-
-        info!("Loaded topic {topic}");
-
-        Ok(())
-    }
-    */


Reply via email to