This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new b766cf70 feat(io_uring): fix loading segments on startup (#2176)
b766cf70 is described below

commit b766cf70d9b8d144f70a7a3e454a0dbfce0a151b
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Thu Sep 18 19:00:17 2025 +0200

    feat(io_uring): fix loading segments on startup (#2176)
---
 .../tests/streaming/get_by_timestamp.rs            |  42 +-
 .../binary/handlers/topics/create_topic_handler.rs |   1 -
 core/server/src/bootstrap.rs                       | 434 +++++++++++++++--
 .../src/compat/index_rebuilding/index_rebuilder.rs |   4 +-
 core/server/src/configs/system.rs                  |  14 +-
 core/server/src/main.rs                            |  53 ++-
 core/server/src/shard/builder.rs                   |   3 +-
 core/server/src/shard/mod.rs                       |  96 +++-
 core/server/src/shard/system/partitions.rs         |   4 +-
 core/server/src/slab/partitions.rs                 |   6 +-
 core/server/src/slab/streams.rs                    |  29 +-
 core/server/src/state/system.rs                    |  29 +-
 core/server/src/streaming/partitions/helpers.rs    |   6 -
 core/server/src/streaming/partitions/log.rs        |  15 +-
 core/server/src/streaming/partitions/mod.rs        |   1 -
 core/server/src/streaming/partitions/partition2.rs |   2 +-
 core/server/src/streaming/partitions/storage.rs    | 266 -----------
 core/server/src/streaming/segments/mod.rs          |   5 -
 .../src/streaming/segments/reading_messages.rs     | 401 ----------------
 core/server/src/streaming/segments/segment.rs      | 529 ---------------------
 core/server/src/streaming/segments/storage.rs      |   9 +-
 21 files changed, 631 insertions(+), 1318 deletions(-)

diff --git a/core/integration/tests/streaming/get_by_timestamp.rs b/core/integration/tests/streaming/get_by_timestamp.rs
index a40d2904..de30345a 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -16,17 +16,17 @@
  * under the License.
  */
 
-use crate::streaming::common::test_setup::TestSetup;
 use super::bootstrap_test_environment;
+use crate::streaming::common::test_setup::TestSetup;
 use bytes::BytesMut;
 use iggy::prelude::*;
 use server::configs::cache_indexes::CacheIndexesConfig;
 use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig};
+use server::shard::namespace::IggyFullNamespace;
 use server::shard::system::messages::PollingArgs;
 use server::streaming::polling_consumer::PollingConsumer;
 use server::streaming::segments::IggyMessagesBatchMut;
 use server::streaming::traits::MainOps;
-use server::shard::namespace::IggyFullNamespace;
 use std::collections::HashMap;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -117,14 +117,20 @@ async fn test_get_messages_by_timestamp(
     };
 
     // Use the bootstrap method to create streams with proper slab structure
-    let bootstrap_result = bootstrap_test_environment(shard_id as u16, &config).await.unwrap();
+    let bootstrap_result = bootstrap_test_environment(shard_id as u16, &config)
+        .await
+        .unwrap();
     let streams = bootstrap_result.streams;
     let stream_identifier = bootstrap_result.stream_id;
     let topic_identifier = bootstrap_result.topic_id;
     let partition_id = bootstrap_result.partition_id;
-    
+
     // Create namespace for MainOps calls
-    let namespace = IggyFullNamespace::new(stream_identifier.clone(), topic_identifier.clone(), partition_id);
+    let namespace = IggyFullNamespace::new(
+        stream_identifier.clone(),
+        topic_identifier.clone(),
+        partition_id,
+    );
 
     let mut all_messages = Vec::with_capacity(total_messages_count as usize);
 
@@ -193,7 +199,10 @@ async fn test_get_messages_by_timestamp(
 
         let batch = IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size);
         assert_eq!(batch.count(), batch_len);
-        streams.append_messages(shard_id as u16, &config, &namespace, batch).await.unwrap();
+        streams
+            .append_messages(shard_id as u16, &config, &namespace, batch)
+            .await
+            .unwrap();
 
         // Capture the timestamp of this batch
         batch_timestamps.push(IggyTimestamp::now());
@@ -209,10 +218,15 @@ async fn test_get_messages_by_timestamp(
     let total_sent_messages = total_messages_count;
 
     // Create a single consumer to reuse throughout the test
-    let consumer = PollingConsumer::consumer(&Identifier::numeric(1).unwrap(), partition_id as usize);
+    let consumer =
+        PollingConsumer::consumer(&Identifier::numeric(1).unwrap(), partition_id as usize);
 
     // Test 1: All messages from initial timestamp
-    let args = PollingArgs::new(PollingStrategy::timestamp(initial_timestamp), total_sent_messages, false);
+    let args = PollingArgs::new(
+        PollingStrategy::timestamp(initial_timestamp),
+        total_sent_messages,
+        false,
+    );
     let (_, all_loaded_messages) = streams
         .poll_messages(&namespace, consumer, args)
         .await
@@ -235,7 +249,11 @@ async fn test_get_messages_by_timestamp(
         let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
         let remaining_messages = total_sent_messages - prior_batches_sum;
 
-        let args = PollingArgs::new(PollingStrategy::timestamp(middle_timestamp), remaining_messages, false);
+        let args = PollingArgs::new(
+            PollingStrategy::timestamp(middle_timestamp),
+            remaining_messages,
+            false,
+        );
         let (_, middle_messages) = streams
             .poll_messages(&namespace, consumer, args)
             .await
@@ -265,7 +283,11 @@ async fn test_get_messages_by_timestamp(
 
     // Test 4: Small subset from initial timestamp
     let subset_size = std::cmp::min(3, total_sent_messages);
-    let args = PollingArgs::new(PollingStrategy::timestamp(initial_timestamp), subset_size, false);
+    let args = PollingArgs::new(
+        PollingStrategy::timestamp(initial_timestamp),
+        subset_size,
+        false,
+    );
     let (_, subset_messages) = streams
         .poll_messages(&namespace, consumer, args)
         .await
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 577ab62f..5e456e6a 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -87,7 +87,6 @@ impl ServerCommandHandler for CreateTopic {
             partitions,
         };
         let _responses = shard.broadcast_event_to_all_shards(event).await;
-        // TODO: Create shard_table records for partitions.
         let response = shard.streams2.with_topic_by_id(
             &self.stream_id,
             &Identifier::numeric(topic_id as u32).unwrap(),
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 8e3797b2..fc38b069 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -1,7 +1,13 @@
 use crate::{
     IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
-    configs::{config_provider::ConfigProviderKind, server::ServerConfig, system::SystemConfig},
-    io::fs_utils,
+    compat::index_rebuilding::index_rebuilder::IndexRebuilder,
+    configs::{
+        cache_indexes::CacheIndexesConfig,
+        config_provider::ConfigProviderKind,
+        server::ServerConfig,
+        system::{INDEX_EXTENSION, LOG_EXTENSION, SystemConfig},
+    },
+    io::fs_utils::{self, DirEntry},
     server_error::ServerError,
     shard::{
         system::info::SystemInfo,
@@ -18,15 +24,14 @@ use crate::{
     },
     state::system::{StreamState, TopicState, UserState},
     streaming::{
-        deduplication::message_deduplicator,
         partitions::{
-            consumer_offset::{self, ConsumerOffset},
-            helpers::create_message_deduplicator,
-            partition2::{self, ConsumerGroupOffsets, ConsumerOffsets},
+            consumer_offset::ConsumerOffset, helpers::create_message_deduplicator,
+            journal::MemoryMessageJournal, log::SegmentedLog, partition2,
             storage2::load_consumer_offsets,
         },
         persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind},
         personal_access_tokens::personal_access_token::PersonalAccessToken,
+        segments::{Segment2, storage::Storage},
         stats::stats::{PartitionStats, StreamStats, TopicStats},
         storage::SystemStorage,
         streams::stream2,
@@ -38,6 +43,7 @@ use crate::{
 };
 use ahash::HashMap;
 use compio::{fs::create_dir_all, runtime::Runtime};
+use error_set::ErrContext;
 use iggy_common::{
     ConsumerKind, IggyError, UserId,
     defaults::{
@@ -46,9 +52,9 @@ use iggy_common::{
     },
 };
 use std::{collections::HashSet, env, path::Path, sync::Arc};
-use tracing::info;
+use tracing::{info, warn};
 
-pub fn load_streams(
+pub async fn load_streams(
     state: impl IntoIterator<Item = StreamState>,
     config: &SystemConfig,
 ) -> Result<Streams, IggyError> {
@@ -121,56 +127,39 @@ pub fn load_streams(
             let parent_stats = stats.clone();
             let cgs = consumer_groups.into_values();
             let partitions = partitions.into_values();
+
+            // Load each partition asynchronously and insert immediately
             for partition_state in partitions {
                 info!(
                     "Loading partition with ID: {}, for topic with ID: {} from state...",
                     partition_state.id, topic_id
                 );
+
+                let partition_id = partition_state.id;
+                let partition = load_partition(
+                    config,
+                    stream_id as usize,
+                    topic_id,
+                    partition_state,
+                    parent_stats.clone(),
+                )
+                .await?;
+                // Insert partition into the container
                 streams.with_components_by_id(stream_id as usize, |(root, ..)| {
-                    let parent_stats = parent_stats.clone();
                     root.topics()
-                        .with_components_by_id_mut(topic_id, |(root, ..)| {
-                            let stats = Arc::new(PartitionStats::new(parent_stats));
-                            // TODO: This has to be sampled from segments, if there exists more than 0 segements and the segment has more than 0 messages, then we set it to true.
-                            let should_increment_offset = todo!();
-                            // TODO: This has to be sampled from segments and it's indexes, offset = segment.start_offset + last_index.offset;
-                            let offset = todo!();
-                            let message_deduplicator = create_message_deduplicator(config);
-                            let id = partition_state.id;
-
-                            let consumer_offset_path = config.get_consumer_offsets_path(stream_id as usize, topic_id as usize, id as usize);
-                            let consumer_group_offsets_path = config.get_consumer_group_offsets_path(stream_id as usize, topic_id as usize, id as usize);
-                            let consumer_offset = Arc::new(load_consumer_offsets(&consumer_offset_path, ConsumerKind::Consumer)?
-                            .into_iter().map(|offset| {
-                                (offset.consumer_id as usize, offset)
-                            }).into());
-                            let consumer_group_offset = Arc::new(load_consumer_offsets(&consumer_group_offsets_path, ConsumerKind::ConsumerGroup)?.into_iter().map(|offset| {
-                                (offset.consumer_id as usize, offset)
-                            }).into());
-
-                            let log = Default::default();
-                            let partition = partition2::Partition::new(
-                                partition_state.created_at,
-                                should_increment_offset,
-                                stats,
-                                message_deduplicator,
-                                offset,
-                                consumer_offset,
-                                consumer_group_offset,
-                                log
-                            );
+                        .with_components_by_id_mut(topic_id, |(mut root, ..)| {
                             let new_id = root.partitions_mut().insert(partition);
                             assert_eq!(
-                                new_id, id as usize,
+                                new_id, partition_id as usize,
                                 "load_streams: partition id mismatch when inserting partition, mismatch for partition with ID: {}, for topic with ID: {}, for stream with ID: {}",
-                                id, topic_id, stream_id
+                                partition_id, topic_id, stream_id
                             );
-                            Ok(())
-                        })
-                })?;
+                        });
+                });
+
                 info!(
                     "Loaded partition with ID: {}, for topic with ID: {} from state...",
-                    partition_state.id, topic_id
+                    partition_id, topic_id
                 );
             }
             let partition_ids = streams.with_components_by_id(stream_id as usize, |(root, ..)| {
@@ -384,3 +373,358 @@ pub async fn update_system_info(
     storage.info.save(system_info).await?;
     Ok(())
 }
+
+async fn collect_log_files(partition_path: &str) -> Result<Vec<DirEntry>, IggyError> {
+    let dir_entries = fs_utils::walk_dir(&partition_path)
+        .await
+        .map_err(|_| IggyError::CannotReadPartitions)?;
+    let mut log_files = Vec::new();
+    for entry in dir_entries {
+        if entry.is_dir {
+            continue;
+        }
+
+        let extension = entry.path.extension();
+        if extension.is_none() || extension.unwrap() != LOG_EXTENSION {
+            continue;
+        }
+
+        log_files.push(entry);
+    }
+
+    Ok(log_files)
+}
+
+pub async fn load_segments(
+    config: &SystemConfig,
+    stream_id: usize,
+    topic_id: usize,
+    partition_id: usize,
+    partition_path: String,
+    stats: Arc<PartitionStats>,
+) -> Result<SegmentedLog<MemoryMessageJournal>, IggyError> {
+    // Read directory entries to find log files using async fs_utils
+    let mut log_files = collect_log_files(&partition_path).await?;
+    log_files.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
+    let mut log = SegmentedLog::<MemoryMessageJournal>::default();
+    for entry in log_files {
+        let log_file_name = entry
+            .path
+            .file_stem()
+            .unwrap()
+            .to_string_lossy()
+            .to_string();
+
+        let start_offset = log_file_name.parse::<u64>().unwrap();
+
+        // Build file paths directly
+        let messages_file_path = format!("{}/{}.{}", partition_path, log_file_name, LOG_EXTENSION);
+        let index_file_path = format!("{}/{}.{}", partition_path, log_file_name, INDEX_EXTENSION);
+        let time_index_path = index_file_path.replace(INDEX_EXTENSION, "timeindex");
+
+        // Check if index files exist
+        async fn try_exists(path: &str) -> Result<bool, std::io::Error> {
+            match compio::fs::metadata(path).await {
+                Ok(_) => Ok(true),
+                Err(err) => match err.kind() {
+                    std::io::ErrorKind::NotFound => Ok(false),
+                    _ => Err(err),
+                },
+            }
+        }
+
+        let index_path_exists = try_exists(&index_file_path).await.unwrap();
+        let time_index_path_exists = try_exists(&time_index_path).await.unwrap();
+        let index_cache_enabled = matches!(
+            config.segment.cache_indexes,
+            CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
+        );
+
+        // Rebuild indexes if index cache is enabled and index at path does not exist
+        if index_cache_enabled && (!index_path_exists || time_index_path_exists) {
+            warn!(
+                "Index at path {} does not exist, rebuilding it based on {}...",
+                index_file_path, messages_file_path
+            );
+            let now = std::time::Instant::now();
+            let index_rebuilder = IndexRebuilder::new(
+                messages_file_path.clone(),
+                index_file_path.clone(),
+                start_offset,
+            );
+            index_rebuilder.rebuild().await.unwrap_or_else(|e| {
+                panic!(
+                    "Failed to rebuild index for partition with ID: {} for stream with ID: {} and topic with ID: {}. Error: {e}",
+                    partition_id, stream_id, topic_id,
+                )
+            });
+            info!(
+                "Rebuilding index for path {} finished, it took {} ms",
+                index_file_path,
+                now.elapsed().as_millis()
+            );
+        }
+
+        if time_index_path_exists {
+            compio::fs::remove_file(&time_index_path).await.unwrap();
+        }
+
+        // Get file metadata to determine segment properties
+        let messages_metadata = compio::fs::metadata(&messages_file_path)
+            .await
+            .map_err(|_| IggyError::CannotReadPartitions)?;
+        let messages_size = messages_metadata.len() as u32;
+
+        let index_size = match compio::fs::metadata(&index_file_path).await {
+            Ok(metadata) => metadata.len() as u32,
+            Err(_) => 0, // Default to 0 if index file doesn't exist
+        };
+
+        // Create storage for the segment using existing files
+        let storage = Storage::new(
+            &messages_file_path,
+            &index_file_path,
+            messages_size as u64,
+            index_size as u64,
+            config.partition.enforce_fsync,
+            config.partition.enforce_fsync,
+            true, // file_exists = true for existing segments
+        )
+        .await?;
+
+        // Load indexes from disk to calculate the correct end offset and cache them if needed
+        // This matches the logic in Segment::load_from_disk method
+        let loaded_indexes = {
+            storage.
+            index_reader
+            .as_ref()
+            .unwrap()
+            .load_all_indexes_from_disk()
+            .await
+            .with_error_context(|error| format!("Failed to load indexes during startup for stream ID: {}, topic ID: {}, partition_id: {}, {error}", stream_id, topic_id, partition_id))
+            .map_err(|_| IggyError::CannotReadFile)?
+        };
+
+        // Calculate end offset based on loaded indexes
+        let end_offset = if loaded_indexes.count() == 0 {
+            0
+        } else {
+            let last_index_offset = loaded_indexes.last().unwrap().offset() as u64;
+            start_offset + last_index_offset
+        };
+
+        let (start_timestamp, end_timestamp) = if loaded_indexes.count() == 0 {
+            (0, 0)
+        } else {
+            (
+                loaded_indexes.get(0).unwrap().timestamp(),
+                loaded_indexes.last().unwrap().timestamp(),
+            )
+        };
+
+        // Create the new Segment with proper values from file system
+        let mut segment = Segment2::new(
+            start_offset,
+            config.segment.size,
+            config.segment.message_expiry,
+        );
+
+        // Set properties based on file data
+        segment.start_timestamp = start_timestamp;
+        segment.end_timestamp = end_timestamp;
+        segment.end_offset = end_offset;
+        segment.size = messages_size;
+        segment.sealed = true; // Persisted segments are assumed to be sealed
+
+        if config.partition.validate_checksum {
+            info!(
+                "Validating checksum for segment at offset {} in stream ID: {}, topic ID: {}, partition ID: {}",
+                start_offset, stream_id, topic_id, partition_id
+            );
+            let messages_count = loaded_indexes.count() as u32;
+            if messages_count > 0 {
+                const BATCH_COUNT: u32 = 10000;
+                let mut current_relative_offset = 0u32;
+                let mut processed_count = 0u32;
+
+                while processed_count < messages_count {
+                    let remaining_count = messages_count - processed_count;
+                    let batch_count = std::cmp::min(BATCH_COUNT, remaining_count);
+                    let batch_indexes = loaded_indexes
+                        .slice_by_offset(current_relative_offset, batch_count)
+                        .unwrap();
+
+                    let messages_reader = storage.messages_reader.as_ref().unwrap();
+                    match messages_reader.load_messages_from_disk(batch_indexes).await {
+                        Ok(messages_batch) => {
+                            if let Err(e) = messages_batch.validate_checksums() {
+                                return Err(IggyError::CannotReadPartitions).with_error_context(|_| {
+                                    format!(
+                                        "Failed to validate message checksum for segment at offset {} in stream ID: {}, topic ID: {}, partition ID: {}, error: {}",
+                                        start_offset, stream_id, topic_id, partition_id, e
+                                    )
+                                });
+                            }
+                            processed_count += messages_batch.count();
+                            current_relative_offset += batch_count;
+                        }
+                        Err(e) => {
+                            return Err(e).with_error_context(|_| {
+                                format!(
+                                    "Failed to load messages from disk for checksum validation at offset {} in stream ID: {}, topic ID: {}, partition ID: {}",
+                                    start_offset, stream_id, topic_id, partition_id
+                                )
+                            });
+                        }
+                    }
+                }
+                info!(
+                    "Checksum validation completed for segment at offset {}",
+                    start_offset
+                );
+            }
+        }
+
+        // Add segment to log
+        log.add_persisted_segment(segment, storage);
+
+        // Increment stats for partition - this matches the behavior from partition storage load method
+        stats.increment_segments_count(1);
+
+        // Increment size and message counts based on the loaded segment data
+        stats.increment_size_bytes(messages_size as u64);
+
+        // Calculate message count from segment data (end_offset - start_offset + 1 if there are messages)
+        let messages_count = if end_offset > start_offset {
+            (end_offset - start_offset + 1) as u64
+        } else if messages_size > 0 {
+            // Fallback: estimate based on loaded indexes count if available
+            loaded_indexes.count() as u64
+        } else {
+            0
+        };
+
+        if messages_count > 0 {
+            stats.increment_messages_count(messages_count);
+        }
+
+        // Now handle index caching based on configuration
+        let should_cache_indexes = match config.segment.cache_indexes {
+            CacheIndexesConfig::All => true,
+            CacheIndexesConfig::OpenSegment => false, // Will be handled after all segments are loaded
+            CacheIndexesConfig::None => false,
+        };
+
+        // Set the loaded indexes if we should cache them
+        if should_cache_indexes {
+            let segment_index = log.segments().len() - 1;
+            log.set_segment_indexes(segment_index, loaded_indexes);
+        }
+    }
+
+    // Handle OpenSegment cache configuration: only the last segment should keep its indexes
+    if matches!(
+        config.segment.cache_indexes,
+        CacheIndexesConfig::OpenSegment
+    ) && log.has_segments()
+    {
+        let segments_count = log.segments().len();
+        if segments_count > 0 {
+            // Use the IndexReader from the last segment's storage to load indexes
+            let last_storage = log.storages().last().unwrap();
+            match last_storage.index_reader.as_ref() {
+                Some(index_reader) => {
+                    if let Ok(loaded_indexes) = index_reader.load_all_indexes_from_disk().await {
+                        log.set_segment_indexes(segments_count - 1, loaded_indexes);
+                    }
+                }
+                None => {
+                    warn!("Index reader not available for last segment in OpenSegment mode");
+                }
+            }
+        }
+    }
+
+    Ok(log)
+}
+
+async fn load_partition(
+    config: &SystemConfig,
+    stream_id: usize,
+    topic_id: usize,
+    partition_state: crate::state::system::PartitionState,
+    parent_stats: Arc<TopicStats>,
+) -> Result<partition2::Partition, IggyError> {
+    use std::sync::atomic::AtomicU64;
+    let stats = Arc::new(PartitionStats::new(parent_stats));
+    let partition_id = partition_state.id as u32;
+
+    // Load segments from disk to determine should_increment_offset and current offset
+    let partition_path = config.get_partition_path(stream_id, topic_id, partition_id as usize);
+    let log_files = collect_log_files(&partition_path).await?;
+    let should_increment_offset = !log_files.is_empty()
+        && log_files
+            .first()
+            .map(|entry| {
+                let log_file_name = entry
+                    .path
+                    .file_stem()
+                    .unwrap()
+                    .to_string_lossy()
+                    .to_string();
+
+                let start_offset = log_file_name.parse::<u64>().unwrap();
+
+                // Build file paths directly
+                let messages_file_path =
+                    format!("{}/{}.{}", partition_path, start_offset, LOG_EXTENSION);
+                std::fs::metadata(&messages_file_path).is_ok_and(|metadata| metadata.len() > 0)
+            })
+            .unwrap_or_else(|| false);
+
+    info!(
+        "Loading partition with ID: {} for stream with ID: {} and topic with ID: {}, for path: {} from disk...",
+        partition_id, stream_id, topic_id, partition_path
+    );
+    // Load consumer offsets
+    let message_deduplicator = create_message_deduplicator(config);
+    let consumer_offset_path =
+        config.get_consumer_offsets_path(stream_id, topic_id, partition_id as usize);
+    let consumer_group_offsets_path =
+        config.get_consumer_group_offsets_path(stream_id, topic_id, partition_id as usize);
+
+    let consumer_offset = Arc::new(
+        load_consumer_offsets(&consumer_offset_path, ConsumerKind::Consumer)?
+            .into_iter()
+            .map(|offset| (offset.consumer_id as usize, offset))
+            .collect::<HashMap<usize, ConsumerOffset>>()
+            .into(),
+    );
+
+    let consumer_group_offset = Arc::new(
+        load_consumer_offsets(&consumer_group_offsets_path, ConsumerKind::ConsumerGroup)?
+            .into_iter()
+            .map(|offset| (offset.consumer_id as usize, offset))
+            .collect::<HashMap<usize, ConsumerOffset>>()
+            .into(),
+    );
+
+    let log = Default::default();
+    let partition = partition2::Partition::new(
+        partition_state.created_at,
+        should_increment_offset,
+        stats,
+        message_deduplicator,
+        Arc::new(Default::default()),
+        consumer_offset,
+        consumer_group_offset,
+        log,
+    );
+
+    info!(
+        "Loaded partition with ID: {} for stream with ID: {} and topic with ID: {}",
+        partition_id, stream_id, topic_id
+    );
+
+    Ok(partition)
+}
diff --git a/core/server/src/compat/index_rebuilding/index_rebuilder.rs b/core/server/src/compat/index_rebuilding/index_rebuilder.rs
index dbc93ebe..402739be 100644
--- a/core/server/src/compat/index_rebuilding/index_rebuilder.rs
+++ b/core/server/src/compat/index_rebuilding/index_rebuilder.rs
@@ -18,13 +18,11 @@
 
 use crate::server_error::CompatError;
 use crate::streaming::utils::file;
-use async_zip::tokio::write;
 use compio::{
     fs::File,
-    io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
+    io::{AsyncBufRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
 };
 use iggy_common::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeader};
-use std::io::{Seek, SeekFrom};
 
 pub struct IndexRebuilder {
     pub messages_file_path: String,
diff --git a/core/server/src/configs/system.rs b/core/server/src/configs/system.rs
index add75b31..fae172ca 100644
--- a/core/server/src/configs/system.rs
+++ b/core/server/src/configs/system.rs
@@ -31,7 +31,8 @@ use serde::{Deserialize, Serialize};
 use serde_with::DisplayFromStr;
 use serde_with::serde_as;
 
-const INDEX_EXTENSION: &str = "index";
+pub const INDEX_EXTENSION: &str = "index";
+pub const LOG_EXTENSION: &str = "log";
 
 #[derive(Debug, Deserialize, Serialize)]
 pub struct SystemConfig {
@@ -283,6 +284,17 @@ impl SystemConfig {
         )
     }
 
+    pub fn get_messages_file_path(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        start_offset: u64,
+    ) -> String {
+        let path = self.get_segment_path(stream_id, topic_id, partition_id, start_offset);
+        format!("{path}.{LOG_EXTENSION}")
+    }
+
     pub fn get_index_path(
         &self,
         stream_id: usize,
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 44917d0b..02ed206a 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -30,7 +30,7 @@ use error_set::ErrContext;
 use figlet_rs::FIGfont;
 use iggy_common::create_user::CreateUser;
 use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
-use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError};
+use iggy_common::{Aes256GcmEncryptor, EncryptorKind, Identifier, IggyError};
 use lending_iterator::lending_iterator::constructors::into_lending_iter;
 use server::args::Args;
 use server::binary::handlers::streams;
@@ -49,7 +49,8 @@ use server::log::tokio_console::Logging;
 use server::server_error::{ConfigError, ServerError};
 use server::shard::namespace::IggyNamespace;
 use server::shard::system::info::SystemInfo;
-use server::shard::{IggyShard, ShardInfo};
+use server::shard::{IggyShard, ShardInfo, calculate_shard_assignment};
+use server::slab::traits_ext::{EntityComponentSystem, EntityComponentSystemMutCell, IntoComponents};
 use server::state::StateKind;
 use server::state::command::EntryCommand;
 use server::state::file::FileState;
@@ -189,7 +190,7 @@ async fn main() -> Result<(), ServerError> {
     ));
     let state = SystemState::load(state).await?;
     let (streams_state, users_state) = state.decompose();
-    let streams = load_streams(streams_state.into_values(), &config.system)?;
+    let streams = load_streams(streams_state.into_values(), &config.system).await?;
     let users = load_users(users_state.into_values());
 
     // ELEVENTH DISCRETE LOADING STEP.
@@ -221,11 +222,34 @@ async fn main() -> Result<(), ServerError> {
     let (connections, shutdown_handles) = 
create_shard_connections(&shards_set);
     let mut handles = Vec::with_capacity(shards_set.len());
 
+    // TODO: Persist the shards table and load it from the disk, so it does 
not have to be rebuilt on every startup.
     // THIRTEENTH DISCRETE LOADING STEP.
     // Shared resources bootstrap.
     let shards_table = Box::new(DashMap::with_capacity(SHARDS_TABLE_CAPACITY));
     let shards_table = Box::leak(shards_table);
     let shards_table: EternalPtr<DashMap<IggyNamespace, ShardInfo>> = 
shards_table.into();
+    streams.with_components(|components| {
+        let (root, ..) = components.into_components();
+        for (_, stream) in root.iter() {
+            stream.topics().with_components(|components| {
+                let (root, ..) = components.into_components();
+                for (_, topic) in root.iter() {
+                    topic.partitions().with_components(|components| {
+                        let (root, ..) = components.into_components();
+                        for (_, partition) in root.iter() {
+                            let stream_id = stream.id();
+                            let topic_id = topic.id();
+                            let partition_id = partition.id();
+                            let ns = IggyNamespace::new(stream_id, topic_id, 
partition_id);
+                            let shard_id = calculate_shard_assignment(&ns, 
shards_set.len() as u32);
+                            let shard_info = ShardInfo::new(shard_id);
+                            shards_table.insert(ns, shard_info);
+                        }
+                    });
+                }
+            })
+        }
+    });
 
     for shard_id in shards_set {
         let id = shard_id as u16;
@@ -245,6 +269,28 @@ async fn main() -> Result<(), ServerError> {
             encryptor.clone(),
         ));
 
+        // Ergh... I knew that including `Log` as part of the `Partition` 
entity would backfire.
+        // We have to initialize a default log for every partition once we 
`Clone` the Streams / Topics / Partitions,
+        // because the `Clone` impl for `Partition` does not clone the actual 
log, it just creates an empty one.
+        streams.with_components(|components| {
+            let (root, ..) = components.into_components();
+            for (_, stream) in root.iter() {
+                stream.topics().with_components_mut(|components| {
+                    let (mut root, ..) = components.into_components();
+                    for (_, topic) in root.iter_mut() {
+                        let partitions_count = topic.partitions().len();
+                        for log_id in 0..partitions_count {
+                            let id = 
topic.partitions_mut().insert_default_log();
+                            assert_eq!(
+                                id, log_id,
+                                "main: partition_insert_default_log: id 
mismatch when creating default log"
+                            );
+                        }
+                    }
+                })
+            }
+        });
+
         let handle = std::thread::Builder::new()
             .name(format!("shard-{id}"))
             .spawn(move || {
@@ -329,7 +375,6 @@ async fn main() -> Result<(), ServerError> {
     let _command_handler = BackgroundServerCommandHandler::new(system.clone(), 
&config)
         .install_handler(SaveMessagesExecutor)
         .install_handler(MaintainMessagesExecutor)
-        .install_handler(ArchiveStateExecutor)
         .install_handler(CleanPersonalAccessTokensExecutor)
         .install_handler(SysInfoPrintExecutor)
         .install_handler(VerifyHeartbeatsExecutor);
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 615a5f71..fc203b77 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -149,8 +149,7 @@ impl IggyShardBuilder {
             id: id,
             shards: shards,
             shards_table,
-            //streams2: streams, // TODO: Fixme
-            streams2: Default::default(),
+            streams2: streams, // TODO: Fixme
             users: RefCell::new(users),
             storage: storage,
             encryptor: encryptor,
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index d4f57a89..e64cafb6 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -70,7 +70,10 @@ use crate::{
         },
     },
     shard_error, shard_info, shard_warn,
-    slab::{streams::Streams, traits_ext::EntityMarker},
+    slab::{
+        streams::Streams,
+        traits_ext::{EntityComponentSystem, EntityMarker, Insert},
+    },
     state::{
         StateKind,
         file::FileState,
@@ -235,6 +238,7 @@ impl IggyShard {
     }
 
     pub async fn init(&self) -> Result<(), IggyError> {
+        self.load_segments().await?;
         let _ = self.load_users().await;
         Ok(())
     }
@@ -244,7 +248,7 @@ impl IggyShard {
         // loads streams and starts accepting connections. This is necessary to
         // have the correct statistics when the server starts.
         let now = Instant::now();
-        //self.get_stats().await?;
+        self.get_stats().await?;
         shard_info!(self.id, "Starting...");
         self.init().await?;
         // TODO: Fixme
@@ -294,6 +298,82 @@ impl IggyShard {
         Ok(())
     }
 
+    async fn load_segments(&self) -> Result<(), IggyError> {
+        use crate::bootstrap::load_segments;
+        use crate::shard::namespace::IggyNamespace;
+        for shard_entry in self.shards_table.iter() {
+            let (namespace, shard_info) = shard_entry.pair();
+
+            if shard_info.id == self.id {
+                let stream_id = namespace.stream_id();
+                let topic_id = namespace.topic_id();
+                let partition_id = namespace.partition_id();
+
+                shard_info!(
+                    self.id,
+                    "Loading segments for stream: {}, topic: {}, partition: 
{}",
+                    stream_id,
+                    topic_id,
+                    partition_id
+                );
+
+                let partition_path =
+                    self.config
+                        .system
+                        .get_partition_path(stream_id, topic_id, partition_id);
+                let stats = self.streams2.with_partition_by_id(
+                    &Identifier::numeric(stream_id as u32).unwrap(),
+                    &Identifier::numeric(topic_id as u32).unwrap(),
+                    partition_id,
+                    |(_, stats, ..)| stats.clone(),
+                );
+                match load_segments(
+                    &self.config.system,
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    partition_path,
+                    stats,
+                )
+                .await
+                {
+                    Ok(loaded_log) => {
+                        self.streams2.with_partition_by_id_mut(
+                            &Identifier::numeric(stream_id as u32).unwrap(),
+                            &Identifier::numeric(topic_id as u32).unwrap(),
+                            partition_id,
+                            |(_,_,_ , offset,  .., mut log)| {
+                                *log = loaded_log;
+                                let current_offset = 
log.active_segment().end_offset;
+                                offset.store(current_offset, 
Ordering::Relaxed);
+                            },
+                        );
+                        shard_info!(
+                            self.id,
+                            "Successfully loaded segments for stream: {}, 
topic: {}, partition: {}",
+                            stream_id,
+                            topic_id,
+                            partition_id
+                        );
+                    }
+                    Err(e) => {
+                        shard_error!(
+                            self.id,
+                            "Failed to load segments for stream: {}, topic: 
{}, partition: {}: {}",
+                            stream_id,
+                            topic_id,
+                            partition_id,
+                            e
+                        );
+                        return Err(e);
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     async fn load_users(&self) -> Result<(), IggyError> {
         let users = self.users.borrow();
         let users_count = users.len();
@@ -330,12 +410,6 @@ impl IggyShard {
         self.shards.len() as u32
     }
 
-    pub fn calculate_shard_assignment(&self, ns: &IggyNamespace) -> u16 {
-        let mut hasher = Murmur3Hasher::default();
-        hasher.write_u64(ns.inner());
-        (hasher.finish32() % self.get_available_shards_count()) as u16
-    }
-
     pub async fn handle_shard_message(&self, message: ShardMessage) -> 
Option<ShardResponse> {
         match message {
             ShardMessage::Request(request) => match 
self.handle_request(request).await {
@@ -955,3 +1029,9 @@ impl IggyShard {
         }
     }
 }
+
+pub fn calculate_shard_assignment(ns: &IggyNamespace, upperbound: u32) -> u16 {
+    let mut hasher = Murmur3Hasher::default();
+    hasher.write_u64(ns.inner());
+    (hasher.finish32() % upperbound) as u16
+}
diff --git a/core/server/src/shard/system/partitions.rs 
b/core/server/src/shard/system/partitions.rs
index 432eb0fd..05c0c74c 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -19,6 +19,7 @@
 use super::COMPONENT;
 use crate::shard::IggyShard;
 use crate::shard::ShardInfo;
+use crate::shard::calculate_shard_assignment;
 use crate::shard::namespace::IggyNamespace;
 use crate::shard_info;
 use crate::slab::traits_ext::EntityMarker;
@@ -112,10 +113,11 @@ impl IggyShard {
         self.metrics.increment_partitions(partitions_count);
         self.metrics.increment_segments(partitions_count);
 
+        let shards_count = self.get_available_shards_count();
         for partition_id in partitions.iter().map(|p| p.id()) {
             // TODO: Create shard table recordsj.
             let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, 
partition_id);
-            let shard_id = self.calculate_shard_assignment(&ns);
+            let shard_id = calculate_shard_assignment(&ns, shards_count);
             let shard_info = ShardInfo::new(shard_id);
             let is_current_shard = self.id == shard_info.id;
             self.insert_shard_table_record(ns, shard_info);
diff --git a/core/server/src/slab/partitions.rs 
b/core/server/src/slab/partitions.rs
index 7c7f15b7..34ca117d 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -52,7 +52,7 @@ impl Clone for Partitions {
             offset: self.offset.clone(),
             consumer_offset: self.consumer_offset.clone(),
             consumer_group_offset: self.consumer_group_offset.clone(),
-            log: Slab::with_capacity(PARTITIONS_CAPACITY),
+            log: Slab::with_capacity(PARTITIONS_CAPACITY), // Empty log, we 
don't clone the actual logs.
         }
     }
 }
@@ -188,6 +188,10 @@ impl Partitions {
         self.root.len()
     }
 
+    pub fn insert_default_log(&mut self) -> ContainerId {
+        self.log.insert(Default::default())
+    }
+
     pub fn with_partition_by_id<T>(
         &self,
         id: ContainerId,
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 0343231f..549b8d82 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -4,16 +4,24 @@ use crate::{
     shard::{namespace::IggyFullNamespace, system::messages::PollingArgs},
     shard_info,
     slab::{
-        consumer_groups::ConsumerGroups, helpers, partitions::{self, 
Partitions}, topics::Topics, traits_ext::{
+        Keyed,
+        consumer_groups::ConsumerGroups,
+        helpers,
+        partitions::{self, Partitions},
+        topics::Topics,
+        traits_ext::{
             ComponentsById, DeleteCell, EntityComponentSystem, 
EntityComponentSystemMutCell,
             InsertCell, InteriorMutability, IntoComponents,
-        }, Keyed
+        },
     },
     streaming::{
-        partitions::{journal::Journal, partition2::{PartitionRef, 
PartitionRefMut}},
+        partitions::{
+            journal::Journal,
+            partition2::{PartitionRef, PartitionRefMut},
+        },
         polling_consumer::PollingConsumer,
         segments::{
-            storage::create_segment_storage, IggyMessagesBatchMut, 
IggyMessagesBatchSet, Segment2
+            IggyMessagesBatchMut, IggyMessagesBatchSet, Segment2, 
storage::create_segment_storage,
         },
         stats::stats::StreamStats,
         streams::{
@@ -166,14 +174,19 @@ impl MainOps for Streams {
             streaming_partitions::helpers::calculate_current_offset(),
         );
 
-        let current_position = self.with_partition_by_id(stream_id, topic_id, 
partition_id, |(.., log)| {
-            log.journal().inner().size + log.active_segment().size
-        });
+        let current_position =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
+                log.journal().inner().size + log.active_segment().size
+            });
         self.with_partition_by_id_async(
             stream_id,
             topic_id,
             partition_id,
-            
streaming_partitions::helpers::deduplicate_messages(current_offset, 
current_position, &mut input),
+            streaming_partitions::helpers::deduplicate_messages(
+                current_offset,
+                current_position,
+                &mut input,
+            ),
         )
         .await;
 
diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs
index 2c3ca43b..b5535d0e 100644
--- a/core/server/src/state/system.rs
+++ b/core/server/src/state/system.rs
@@ -30,17 +30,18 @@ use iggy_common::MaxTopicSize;
 use iggy_common::create_user::CreateUser;
 use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
 use iggy_common::{IdKind, Identifier, Permissions, UserStatus};
+use std::collections::BTreeMap;
 use std::fmt::Display;
 use tracing::{debug, error, info};
 
 #[derive(Debug, Clone)]
 pub struct SystemState {
-    pub streams: AHashMap<u32, StreamState>,
+    pub streams: BTreeMap<u32, StreamState>,
     pub users: AHashMap<u32, UserState>,
 }
 
 impl SystemState {
-    pub fn decompose(self) -> (AHashMap<u32, StreamState>, AHashMap<u32, 
UserState>) {
+    pub fn decompose(self) -> (BTreeMap<u32, StreamState>, AHashMap<u32, 
UserState>) {
         (self.streams, self.users)
     }
 }
@@ -50,15 +51,15 @@ pub struct StreamState {
     pub id: u32,
     pub name: String,
     pub created_at: IggyTimestamp,
-    pub topics: AHashMap<u32, TopicState>,
+    pub topics: BTreeMap<u32, TopicState>,
 }
 
 #[derive(Debug, Clone)]
 pub struct TopicState {
     pub id: u32,
     pub name: String,
-    pub partitions: AHashMap<u32, PartitionState>,
-    pub consumer_groups: AHashMap<u32, ConsumerGroupState>,
+    pub partitions: BTreeMap<u32, PartitionState>,
+    pub consumer_groups: BTreeMap<u32, ConsumerGroupState>,
     pub compression_algorithm: CompressionAlgorithm,
     pub message_expiry: IggyExpiry,
     pub max_topic_size: MaxTopicSize,
@@ -161,7 +162,7 @@ impl SystemState {
     }
 
     pub async fn init(entries: Vec<StateEntry>) -> Result<Self, IggyError> {
-        let mut streams = AHashMap::new();
+        let mut streams = BTreeMap::new();
         let mut users = AHashMap::new();
         for entry in entries {
             debug!("Processing state entry: {entry}",);
@@ -177,7 +178,7 @@ impl SystemState {
                     let stream = StreamState {
                         id: stream_id,
                         name: command.name.clone(),
-                        topics: AHashMap::new(),
+                        topics: BTreeMap::new(),
                         created_at: entry.timestamp,
                     };
                     streams.insert(stream.id, stream);
@@ -210,15 +211,15 @@ impl SystemState {
                     let topic = TopicState {
                         id: topic_id,
                         name: command.name,
-                        consumer_groups: AHashMap::new(),
+                        consumer_groups: BTreeMap::new(),
                         compression_algorithm: command.compression_algorithm,
                         message_expiry: command.message_expiry,
                         max_topic_size: command.max_topic_size,
                         replication_factor: command.replication_factor,
                         created_at: entry.timestamp,
                         partitions: if command.partitions_count > 0 {
-                            let mut partitions = AHashMap::new();
-                            for i in 1..=command.partitions_count {
+                            let mut partitions = BTreeMap::new();
+                            for i in 0..command.partitions_count {
                                 partitions.insert(
                                     i,
                                     PartitionState {
@@ -229,7 +230,7 @@ impl SystemState {
                             }
                             partitions
                         } else {
-                            AHashMap::new()
+                            BTreeMap::new()
                         },
                     };
                     stream.topics.insert(topic.id, topic);
@@ -488,7 +489,7 @@ impl SystemState {
     }
 }
 
-fn find_stream_id(streams: &AHashMap<u32, StreamState>, stream_id: 
&Identifier) -> u32 {
+fn find_stream_id(streams: &BTreeMap<u32, StreamState>, stream_id: 
&Identifier) -> u32 {
     match stream_id.kind {
         IdKind::Numeric => stream_id
             .get_u32_value()
@@ -506,7 +507,7 @@ fn find_stream_id(streams: &AHashMap<u32, StreamState>, 
stream_id: &Identifier)
     }
 }
 
-fn find_topic_id(topics: &AHashMap<u32, TopicState>, topic_id: &Identifier) -> 
u32 {
+fn find_topic_id(topics: &BTreeMap<u32, TopicState>, topic_id: &Identifier) -> 
u32 {
     match topic_id.kind {
         IdKind::Numeric => topic_id
             .get_u32_value()
@@ -525,7 +526,7 @@ fn find_topic_id(topics: &AHashMap<u32, TopicState>, 
topic_id: &Identifier) -> u
 }
 
 fn find_consumer_group_id(
-    groups: &AHashMap<u32, ConsumerGroupState>,
+    groups: &BTreeMap<u32, ConsumerGroupState>,
     group_id: &Identifier,
 ) -> u32 {
     match group_id.kind {
diff --git a/core/server/src/streaming/partitions/helpers.rs 
b/core/server/src/streaming/partitions/helpers.rs
index 8ce35655..c16020d8 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -472,9 +472,6 @@ pub fn get_segment_range_by_timestamp(
 ) -> impl FnOnce(ComponentsById<PartitionRef>) -> 
Result<std::ops::Range<usize>, IggyError> {
     move |(.., log)| -> Result<std::ops::Range<usize>, IggyError> {
         let segments = log.segments();
-        for seg in segments {
-            tracing::warn!("timestamp: {}, segment: {:?}", timestamp, seg);
-        }
         let start = log
             .segments()
             .iter()
@@ -705,9 +702,6 @@ pub fn commit_journal() -> impl 
FnOnce(ComponentsById<PartitionRefMut>) -> IggyM
         let batches = log.journal_mut().commit();
         log.ensure_indexes();
         batches.append_indexes_to(log.active_indexes_mut().unwrap());
-        let indexes = log.active_indexes_mut().unwrap();
-        let first = indexes.get(0).unwrap();
-        let last = indexes.last().unwrap();
         batches
     }
 }
diff --git a/core/server/src/streaming/partitions/log.rs 
b/core/server/src/streaming/partitions/log.rs
index 83712975..ebee527e 100644
--- a/core/server/src/streaming/partitions/log.rs
+++ b/core/server/src/streaming/partitions/log.rs
@@ -28,15 +28,6 @@ where
     storage: Vec<Storage>,
 }
 
-impl<J> Clone for SegmentedLog<J>
-where
-    J: Journal + Default + Debug + Clone,
-{
-    fn clone(&self) -> Self {
-        Default::default()
-    }
-}
-
 impl<J> Default for SegmentedLog<J>
 where
     J: Journal + Debug + Default,
@@ -135,6 +126,12 @@ where
         self.storage.push(storage);
         self.indexes.push(None);
     }
+
+    pub fn set_segment_indexes(&mut self, segment_index: usize, indexes: 
IggyIndexesMut) {
+        if let Some(segment_indexes) = self.indexes.get_mut(segment_index) {
+            *segment_indexes = Some(indexes);
+        }
+    }
 }
 
 impl<J> SegmentedLog<J>
diff --git a/core/server/src/streaming/partitions/mod.rs 
b/core/server/src/streaming/partitions/mod.rs
index b3863f4d..28d91e4f 100644
--- a/core/server/src/streaming/partitions/mod.rs
+++ b/core/server/src/streaming/partitions/mod.rs
@@ -23,7 +23,6 @@ pub mod log;
 pub mod messages;
 pub mod partition2;
 pub mod segments;
-pub mod storage;
 pub mod storage2;
 
 pub const COMPONENT: &str = "STREAMING_PARTITIONS";
diff --git a/core/server/src/streaming/partitions/partition2.rs 
b/core/server/src/streaming/partitions/partition2.rs
index a88bac02..92d8f738 100644
--- a/core/server/src/streaming/partitions/partition2.rs
+++ b/core/server/src/streaming/partitions/partition2.rs
@@ -131,7 +131,7 @@ impl Clone for Partition {
             offset: Arc::clone(&self.offset),
             consumer_offset: Arc::clone(&self.consumer_offset),
             consumer_group_offset: Arc::clone(&self.consumer_group_offset),
-            log: self.log.clone(),
+            log: Default::default(),
         }
     }
 }
diff --git a/core/server/src/streaming/partitions/storage.rs 
b/core/server/src/streaming/partitions/storage.rs
deleted file mode 100644
index 683b50a0..00000000
--- a/core/server/src/streaming/partitions/storage.rs
+++ /dev/null
@@ -1,266 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::compat::index_rebuilding::index_rebuilder::IndexRebuilder;
-use crate::configs::cache_indexes::CacheIndexesConfig;
-use crate::io::fs_utils;
-use crate::state::system::PartitionState;
-use crate::streaming::partitions::COMPONENT;
-use crate::streaming::partitions::consumer_offset::ConsumerOffset;
-use crate::streaming::persistence::persister::PersisterKind;
-use crate::streaming::segments::*;
-use crate::streaming::utils::file;
-use compio::fs;
-use compio::fs::create_dir_all;
-use compio::io::AsyncReadExt;
-use error_set::ErrContext;
-use iggy_common::ConsumerKind;
-use iggy_common::IggyError;
-use std::path::Path;
-use std::sync::Arc;
-use std::sync::atomic::Ordering;
-use tracing::{error, info, trace, warn};
-
-#[derive(Debug)]
-pub struct FilePartitionStorage {
-    persister: Arc<PersisterKind>,
-}
-
-impl FilePartitionStorage {
-    pub fn new(persister: Arc<PersisterKind>) -> Self {
-        Self { persister }
-    }
-}
-/*
-
-impl PartitionStorage for FilePartitionStorage {
-    async fn load(
-        &self,
-        partition: &mut Partition,
-        state: PartitionState,
-    ) -> Result<(), IggyError> {
-        info!(
-            "Loading partition with ID: {} for stream with ID: {} and topic 
with ID: {}, for path: {} from disk...",
-            partition.partition_id,
-            partition.stream_id,
-            partition.topic_id,
-            partition.partition_path
-        );
-        partition.created_at = state.created_at;
-        // TODO: Replace this with the dir walk impl, that is mentined
-        // in the main function.
-        let mut dir_entries = std::fs::read_dir(&partition.partition_path)
-                .with_error_context(|error| format!(
-                    "{COMPONENT} (error: {error}) - failed to read partition 
with ID: {} for stream with ID: {} and topic with ID: {} and path: {}.",
-                    partition.partition_id, partition.stream_id, 
partition.topic_id, partition.partition_path,
-                ))
-                .map_err(|_| IggyError::CannotReadPartitions)?;
-
-        let mut log_files = Vec::new();
-        while let Some(dir_entry) = dir_entries.next() {
-            let dir_entry = dir_entry.unwrap();
-            let path = dir_entry.path();
-            let extension = path.extension();
-            if extension.is_none() || extension.unwrap() != LOG_EXTENSION {
-                continue;
-            }
-            let metadata = dir_entry.metadata().unwrap();
-            if metadata.is_dir() {
-                continue;
-            }
-            log_files.push(dir_entry);
-        }
-
-        log_files.sort_by_key(|a| a.file_name());
-
-        for dir_entry in log_files {
-            let log_file_name = dir_entry
-                .file_name()
-                .into_string()
-                .unwrap()
-                .replace(&format!(".{LOG_EXTENSION}"), "");
-
-            let start_offset = log_file_name.parse::<u64>().unwrap();
-            let mut segment = Segment::create(
-                partition.stream_id,
-                partition.topic_id,
-                partition.partition_id,
-                start_offset,
-                partition.config.clone(),
-                partition.message_expiry,
-                partition.size_of_parent_stream.clone(),
-                partition.size_of_parent_topic.clone(),
-                partition.size_bytes.clone(),
-                partition.messages_count_of_parent_stream.clone(),
-                partition.messages_count_of_parent_topic.clone(),
-                partition.messages_count.clone(),
-                false,
-            );
-
-            let index_path = segment.index_file_path().to_owned();
-            let messages_file_path = segment.messages_file_path().to_owned();
-            let time_index_path = index_path.replace(INDEX_EXTENSION, 
"timeindex");
-
-            // TODO: Move to fs_utils
-            async fn try_exists(index_path: &str) -> Result<bool, 
std::io::Error> {
-                match compio::fs::metadata(index_path).await {
-                    Ok(_) => Ok(true),
-                    Err(err) => match err.kind() {
-                        std::io::ErrorKind::NotFound => Ok(false),
-                        _ => Err(err),
-                    },
-                }
-            }
-            let index_path_exists = try_exists(&index_path).await.unwrap();
-            let time_index_path_exists = 
try_exists(&time_index_path).await.unwrap();
-            let index_cache_enabled = matches!(
-                partition.config.segment.cache_indexes,
-                CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
-            );
-
-            // Rebuild indexes if index cache is enabled and index at path 
does not exists.
-            if index_cache_enabled && (!index_path_exists || 
time_index_path_exists) {
-                warn!(
-                    "Index at path {} does not exist, rebuilding it based on 
{}...",
-                    index_path, messages_file_path
-                );
-                let now = tokio::time::Instant::now();
-                let index_rebuilder = IndexRebuilder::new(
-                    messages_file_path.clone(),
-                    index_path.clone(),
-                    start_offset,
-                );
-                index_rebuilder.rebuild().await.unwrap_or_else(|e| {
-                    panic!(
-                        "Failed to rebuild index for partition with ID: {} for
-                    stream with ID: {} and topic with ID: {}. Error: {e}",
-                        partition.partition_id, partition.stream_id, 
partition.topic_id,
-                    )
-                });
-                info!(
-                    "Rebuilding index for path {} finished, it took {} ms",
-                    index_path,
-                    now.elapsed().as_millis()
-                );
-            }
-
-            if time_index_path_exists {
-                tokio::fs::remove_file(&time_index_path).await.unwrap();
-            }
-
-            segment.load_from_disk().await.with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to load 
segment: {segment}",)
-            })?;
-
-            // If the first segment has at least a single message, we should 
increment the offset.
-            if !partition.should_increment_offset {
-                partition.should_increment_offset = 
segment.get_messages_size() > 0;
-            }
-
-            if partition.config.partition.validate_checksum {
-                info!(
-                    "Validating messages checksum for partition with ID: {} 
and segment with start offset: {}...",
-                    partition.partition_id,
-                    segment.start_offset()
-                );
-                segment.validate_messages_checksums().await?;
-                info!(
-                    "Validated messages checksum for partition with ID: {} and 
segment with start offset: {}.",
-                    partition.partition_id,
-                    segment.start_offset()
-                );
-            }
-
-            // Load the unique message IDs for the partition if the 
deduplication feature is enabled.
-            let mut unique_message_ids_count = 0;
-            if let Some(message_deduplicator) = 
&partition.message_deduplicator {
-                let max_entries = 
partition.config.message_deduplication.max_entries as u32;
-                info!(
-                    "Loading {max_entries} unique message IDs for partition 
with ID: {} and segment with start offset: {}...",
-                    partition.partition_id,
-                    segment.start_offset()
-                );
-                let message_ids = 
segment.load_message_ids(max_entries).await.with_error_context(|error| {
-                    format!("{COMPONENT} (error: {error}) - failed to load 
message ids, segment: {segment}",)
-                })?;
-                for message_id in message_ids {
-                    if message_deduplicator.try_insert(message_id).await {
-                        unique_message_ids_count += 1;
-                    } else {
-                        warn!(
-                            "Duplicated message ID: {} for partition with ID: 
{} and segment with start offset: {}.",
-                            message_id,
-                            partition.partition_id,
-                            segment.start_offset()
-                        );
-                    }
-                }
-                info!(
-                    "Loaded: {} unique message IDs for partition with ID: {} 
and segment with start offset: {}...",
-                    unique_message_ids_count,
-                    partition.partition_id,
-                    segment.start_offset()
-                );
-            }
-
-            if CacheIndexesConfig::None == 
partition.config.segment.cache_indexes {
-                segment.drop_indexes();
-            }
-
-            partition
-                .segments_count_of_parent_stream
-                .fetch_add(1, Ordering::SeqCst);
-            partition.segments.push(segment);
-        }
-
-        if !partition.segments.is_empty() {
-            let last_segment = partition.segments.last_mut().unwrap();
-            partition.current_offset = last_segment.end_offset();
-        }
-
-        // If cache_indexes is OpenSegment, clear all segment indexes except 
the last one
-        if matches!(
-            partition.config.segment.cache_indexes,
-            CacheIndexesConfig::OpenSegment
-        ) && !partition.segments.is_empty()
-        {
-            let segments_count = partition.segments.len();
-            for i in 0..segments_count - 1 {
-                partition.segments[i].drop_indexes();
-            }
-        }
-
-        partition
-            .load_consumer_offsets()
-            .await
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to load 
consumer offsets, partition: {partition}",)
-            })?;
-        info!(
-            "Loaded partition with ID: {} for stream with ID: {} and topic 
with ID: {}, current offset: {}.",
-            partition.partition_id,
-            partition.stream_id,
-            partition.topic_id,
-            partition.current_offset
-        );
-
-        Ok(())
-    }
-}
-
-*/
diff --git a/core/server/src/streaming/segments/mod.rs 
b/core/server/src/streaming/segments/mod.rs
index 000fc972..6a2b28fc 100644
--- a/core/server/src/streaming/segments/mod.rs
+++ b/core/server/src/streaming/segments/mod.rs
@@ -18,21 +18,16 @@
 
 mod indexes;
 mod messages;
-mod reading_messages;
-mod segment;
 mod segment2;
 mod types;
 
 pub mod storage;
 
 pub use indexes::IggyIndexesMut;
-pub use segment::Segment;
 pub use segment2::Segment2;
 pub use types::IggyMessageHeaderViewMut;
 pub use types::IggyMessageViewMut;
 pub use types::IggyMessagesBatchMut;
 pub use types::IggyMessagesBatchSet;
 
-pub const LOG_EXTENSION: &str = "log";
-pub const INDEX_EXTENSION: &str = "index";
 pub const SEGMENT_MAX_SIZE_BYTES: u64 = 1024 * 1024 * 1024;
diff --git a/core/server/src/streaming/segments/reading_messages.rs 
b/core/server/src/streaming/segments/reading_messages.rs
deleted file mode 100644
index 514cc353..00000000
--- a/core/server/src/streaming/segments/reading_messages.rs
+++ /dev/null
@@ -1,401 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use super::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet};
-use crate::streaming::segments::segment::Segment;
-use error_set::ErrContext;
-use iggy_common::{IggyByteSize, IggyError};
-use std::sync::atomic::Ordering;
-use tracing::{error, trace};
-
-const COMPONENT: &str = "STREAMING_SEGMENT";
-
-/*
-impl Segment {
-    pub fn get_messages_size(&self) -> IggyByteSize {
-        let on_disk_size = self.messages_size.load(Ordering::Relaxed);
-        let accumulator_size = self.accumulator.size() as u64;
-        IggyByteSize::from(on_disk_size + accumulator_size)
-    }
-
-    pub fn get_messages_count(&self) -> u32 {
-        if self.get_messages_size() == 0 {
-            return 0;
-        }
-
-        (self.end_offset - self.start_offset + 1) as u32
-    }
-
-    pub async fn get_messages_by_timestamp(
-        &self,
-        timestamp: u64,
-        count: u32,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
-        if count == 0 {
-            return Ok(IggyMessagesBatchSet::default());
-        }
-
-        trace!(
-            "Getting {count} messages by timestamp {timestamp}, 
current_offset: {}...",
-            self.end_offset
-        );
-
-        // Case 0: Accumulator is empty, so all messages have to be on disk
-        if self.accumulator.is_empty() {
-            return Ok(IggyMessagesBatchSet::from(
-                self.load_messages_from_disk_by_timestamp(timestamp, count)
-                    .await?,
-            ));
-        }
-
-        let accumulator_first_timestamp = self.accumulator.first_timestamp();
-        let accumulator_last_timestamp = self.accumulator.last_timestamp();
-
-        // Case 1: Requested timestamp is higher than any available timestamp
-        if timestamp > accumulator_last_timestamp {
-            return Ok(IggyMessagesBatchSet::empty());
-        }
-
-        // Case 2: Requested timestamp falls within accumulator range only
-        if timestamp >= accumulator_first_timestamp {
-            // Get all messages from accumulator with timestamp >= the 
requested timestamp
-            return Ok(self.accumulator.get_messages_by_timestamp(timestamp, 
count));
-        }
-
-        // Case 3: Timestamp is lower than accumulator's first timestamp
-        // Need to get messages from disk and potentially combine with 
accumulator
-        let messages_from_disk = IggyMessagesBatchSet::from(self
-            .load_messages_from_disk_by_timestamp(timestamp, count)
-            .await
-            .with_error_context(|error| {
-                format!(
-                    "{COMPONENT} (error: {error}) - failed to load messages 
from disk by timestamp, stream ID: {}, topic ID: {}, partition ID: {}, 
timestamp: {timestamp}",
-                    self.stream_id, self.topic_id, self.partition_id
-                )
-            })?);
-
-        // If we got enough messages from disk or there are no messages from 
disk,
-        // we don't need to consider messages from the accumulator
-        if messages_from_disk.count() >= count {
-            return Ok(messages_from_disk);
-        }
-
-        // If we need more messages, get them from accumulator, respecting the 
original timestamp
-        // This ensures we don't miss messages with the same or very close 
timestamps
-        let remaining_count = count - messages_from_disk.count();
-        let accumulator_messages = self
-            .accumulator
-            .get_messages_by_timestamp(timestamp, remaining_count);
-
-        // Combine the messages
-        let mut out = messages_from_disk;
-        out.add_batch_set(accumulator_messages);
-
-        Ok(out)
-    }
-
-    pub async fn get_messages_by_offset(
-        &self,
-        mut offset: u64,
-        count: u32,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
-        if count == 0 {
-            return Ok(IggyMessagesBatchSet::default());
-        }
-
-        if offset < self.start_offset {
-            offset = self.start_offset;
-        }
-
-        let mut end_offset = offset + (count - 1) as u64;
-        if end_offset > self.end_offset {
-            end_offset = self.end_offset;
-        }
-
-        trace!(
-            "Getting messages by offset: {}, count: {}, segment start_offset: 
{}, segment end_offset: {}",
-            offset, count, self.start_offset, self.end_offset
-        );
-
-        // Case 0: Accumulator is empty, so all messages have to be on disk
-        if self.accumulator.is_empty() {
-            return self.load_messages_from_disk_by_offset(offset, count).await;
-        }
-
-        let accumulator_first_msg_offset = self.accumulator.first_offset();
-        let accumulator_last_msg_offset = self.accumulator.last_offset();
-
-        // Case 1: All messages are in accumulator buffer
-        if offset >= accumulator_first_msg_offset && end_offset <= 
accumulator_last_msg_offset {
-            return Ok(self.accumulator.get_messages_by_offset(offset, count));
-        }
-
-        // Case 2: All messages are on disk
-        if end_offset < accumulator_first_msg_offset {
-            return self.load_messages_from_disk_by_offset(offset, count).await;
-        }
-
-        // Case 3: Messages span disk and accumulator buffer boundary
-        // Calculate how many messages we need from disk
-        let disk_count = if offset < accumulator_first_msg_offset {
-            ((accumulator_first_msg_offset - offset) as u32).min(count)
-        } else {
-            0
-        };
-
-        let mut combined_batch_set = IggyMessagesBatchSet::empty();
-
-        // Load messages from disk if needed
-        if disk_count > 0 {
-            let disk_messages = self
-            .load_messages_from_disk_by_offset(offset, disk_count)
-            .await
-            .with_error_context(|error| {
-                format!(
-                    "STREAMING_SEGMENT (error: {error}) - failed to load 
messages from disk, stream ID: {}, topic ID: {}, partition ID: {}, start 
offset: {offset}, count: {disk_count}",
-                    self.stream_id, self.topic_id, self.partition_id
-                )
-            })?;
-
-            if !disk_messages.is_empty() {
-                combined_batch_set.add_batch_set(disk_messages);
-            }
-        }
-
-        // Calculate how many more messages we need from the accumulator
-        let remaining_count = count - combined_batch_set.count();
-
-        if remaining_count > 0 {
-            let accumulator_start_offset = std::cmp::max(offset, 
accumulator_first_msg_offset);
-
-            let accumulator_messages = self
-                .accumulator
-                .get_messages_by_offset(accumulator_start_offset, 
remaining_count);
-
-            if !accumulator_messages.is_empty() {
-                combined_batch_set.add_batch_set(accumulator_messages);
-            }
-        }
-
-        Ok(combined_batch_set)
-    }
-
-    /// Loads and returns `count` newest message IDs from the log file.
-    pub async fn load_message_ids(&self, count: u32) -> Result<Vec<u128>, 
IggyError> {
-        let messages_count = self.get_messages_count();
-        trace!(
-            "Loading message IDs for {messages_count} messages from log file: 
{}",
-            self.messages_path
-        );
-
-        if count == 0 || messages_count == 0 {
-            return Ok(vec![]);
-        }
-
-        let adjusted_count = std::cmp::min(count, messages_count);
-        let relative_start_offset = messages_count - adjusted_count;
-
-        let indexes = self
-            .load_indexes_by_offset(relative_start_offset, adjusted_count)
-            .await?;
-
-        if indexes.is_none() {
-            return Ok(vec![]);
-        }
-
-        let indexes = indexes.unwrap();
-
-        let ids = self
-            .messages_reader
-            .as_ref()
-            .unwrap()
-            .load_all_message_ids_from_disk(indexes, messages_count)
-            .await
-            .with_error_context(|error| {
-                format!("Failed to load message IDs, error: {error} for 
{self}")
-            })?;
-
-        trace!(
-            "Loaded {} message IDs from log file: {}",
-            ids.len(),
-            self.messages_path
-        );
-        Ok(ids)
-    }
-
-    pub async fn validate_messages_checksums(&self) -> Result<(), IggyError> {
-        let messages_count = self.get_messages_count();
-        if messages_count == 0 {
-            return Ok(());
-        }
-
-        const BATCH_COUNT: u32 = 10000;
-        let end_offset = self.end_offset;
-        let mut current_offset = self.start_offset;
-        let mut processed_count = 0;
-
-        while current_offset <= end_offset {
-            let remaining_count = messages_count - processed_count;
-            let batch_count = std::cmp::min(BATCH_COUNT, remaining_count);
-
-            let messages_batch = self
-                .get_messages_by_offset(current_offset, batch_count)
-                .await?;
-
-            for batch in messages_batch.iter() {
-                batch.validate_checksums().with_error_context(|error| {
-                    format!("Failed to validate message checksum, error: 
{error} for {self}")
-                })?;
-                processed_count += batch.count();
-            }
-            current_offset += batch_count as u64;
-        }
-
-        Ok(())
-    }
-
-    async fn load_indexes_by_offset(
-        &self,
-        relative_start_offset: u32,
-        count: u32,
-    ) -> Result<Option<IggyIndexesMut>, IggyError> {
-        let indexes = if let Some(ref indexes) = self.indexes {
-            if !indexes.is_empty() {
-                indexes.slice_by_offset(relative_start_offset, count)
-            } else {
-                self.index_reader
-                    .as_ref()
-                    .expect("Index reader not initialized")
-                    .load_from_disk_by_offset(relative_start_offset, count)
-                    .await?
-            }
-        } else {
-            self.index_reader
-                .as_ref()
-                .expect("Index reader not initialized")
-                .load_from_disk_by_offset(relative_start_offset, count)
-                .await?
-        };
-        Ok(indexes)
-    }
-
-    async fn load_indexes_by_timestamp(
-        &self,
-        timestamp: u64,
-        count: u32,
-    ) -> Result<Option<IggyIndexesMut>, IggyError> {
-        let indexes = if let Some(ref indexes) = self.indexes {
-            if !indexes.is_empty() {
-                indexes.slice_by_timestamp(timestamp, count)
-            } else {
-                self.index_reader
-                    .as_ref()
-                    .unwrap()
-                    .load_from_disk_by_timestamp(timestamp, count)
-                    .await?
-            }
-        } else {
-            self.index_reader
-                .as_ref()
-                .unwrap()
-                .load_from_disk_by_timestamp(timestamp, count)
-                .await?
-        };
-        Ok(indexes)
-    }
-
-    async fn load_messages_from_disk_by_offset(
-        &self,
-        start_offset: u64,
-        count: u32,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
-        tracing::trace!(
-            "Loading {count} messages from disk, start_offset: {start_offset}, 
segment start_offset: {}, segment end_offset: {}...",
-            self.start_offset,
-            self.end_offset
-        );
-        let relative_start_offset = (start_offset - self.start_offset) as u32;
-
-        let indexes_to_read = self
-            .load_indexes_by_offset(relative_start_offset, count)
-            .await?;
-
-        if indexes_to_read.is_none() {
-            return Ok(IggyMessagesBatchSet::empty());
-        }
-        let indexes_to_read = indexes_to_read.unwrap();
-
-        let batch = self
-            .messages_reader
-            .as_ref()
-            .expect("Messages reader not initialized")
-            .load_messages_from_disk(indexes_to_read)
-            .await
-            .with_error_context(|error| {
-                format!("Failed to load messages from segment file: {self}. 
{error}")
-            })?;
-
-        batch
-            .validate_checksums_and_offsets(start_offset)
-            .with_error_context(|error| {
-                format!(
-                    "Failed to validate messages read from disk! error: 
{error}, file: {}",
-                    self.messages_path
-                )
-            })?;
-
-        tracing::trace!(
-            "Loaded {} messages ({} bytes) from disk (requested {count} 
messages), start_offset: {start_offset}, end_offset: {}",
-            batch.count(),
-            batch.size(),
-            self.end_offset
-        );
-
-        Ok(IggyMessagesBatchSet::from(batch))
-    }
-
-    async fn load_messages_from_disk_by_timestamp(
-        &self,
-        timestamp: u64,
-        count: u32,
-    ) -> Result<IggyMessagesBatchMut, IggyError> {
-        tracing::trace!(
-            "Loading {count} messages from disk, timestamp: {timestamp}, 
current_timestamp: {}...",
-            self.end_timestamp
-        );
-
-        let indexes_to_read = self.load_indexes_by_timestamp(timestamp, 
count).await?;
-
-        if indexes_to_read.is_none() {
-            return Ok(IggyMessagesBatchMut::empty());
-        }
-
-        let indexes_to_read = indexes_to_read.unwrap();
-
-        self.messages_reader
-            .as_ref()
-            .expect("Messages reader not initialized")
-            .load_messages_from_disk(indexes_to_read)
-            .await
-            .with_error_context(|error| {
-                format!("Failed to load messages from segment file by 
timestamp: {self}. {error}")
-            })
-    }
-}
-
-*/
diff --git a/core/server/src/streaming/segments/segment.rs 
b/core/server/src/streaming/segments/segment.rs
deleted file mode 100644
index 1c0311b3..00000000
--- a/core/server/src/streaming/segments/segment.rs
+++ /dev/null
@@ -1,529 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use super::indexes::*;
-use super::messages::*;
-use crate::configs::system::SystemConfig;
-use crate::streaming::segments::*;
-use compio::fs::remove_file;
-use error_set::ErrContext;
-use iggy_common::INDEX_SIZE;
-use iggy_common::IggyByteSize;
-use iggy_common::IggyError;
-use iggy_common::IggyExpiry;
-use iggy_common::IggyTimestamp;
-use std::rc::Rc;
-use std::sync::Arc;
-use std::sync::atomic::{AtomicU64, Ordering};
-use tracing::error;
-use tracing::{info, warn};
-
-const SIZE_16MB: usize = 16 * 1024 * 1024;
-
-#[derive(Debug)]
-pub struct Segment {
-    pub(super) stream_id: u32,
-    pub(super) topic_id: u32,
-    pub(super) partition_id: u32,
-    pub(super) start_offset: u64,
-    pub(super) start_timestamp: u64, // first message timestamp
-    pub(super) end_timestamp: u64,   // last message timestamp
-    pub(super) end_offset: u64,
-    pub(super) index_path: String,
-    pub(super) messages_path: String,
-    pub(super) last_index_position: u32,
-    pub(super) max_size_bytes: IggyByteSize,
-    pub(super) size_of_parent_stream: Arc<AtomicU64>,
-    pub(super) size_of_parent_topic: Arc<AtomicU64>,
-    pub(super) size_of_parent_partition: Arc<AtomicU64>,
-    pub(super) messages_count_of_parent_stream: Arc<AtomicU64>,
-    pub(super) messages_count_of_parent_topic: Arc<AtomicU64>,
-    pub(super) messages_count_of_parent_partition: Arc<AtomicU64>,
-    pub(super) is_closed: bool,
-    pub(super) messages_writer: Option<MessagesWriter>,
-    pub(super) messages_reader: Option<MessagesReader>,
-    pub(super) index_writer: Option<IndexWriter>,
-    pub(super) index_reader: Option<IndexReader>,
-    pub(super) message_expiry: IggyExpiry,
-    pub(super) config: Arc<SystemConfig>,
-    pub(super) indexes: Option<IggyIndexesMut>,
-    pub(super) messages_size: Arc<AtomicU64>,
-    pub(super) indexes_size: Arc<AtomicU64>,
-}
-/*
-
-impl Segment {
-    #[allow(clippy::too_many_arguments)]
-    pub fn create(
-        stream_id: u32,
-        topic_id: u32,
-        partition_id: u32,
-        start_offset: u64,
-        config: Arc<SystemConfig>,
-        message_expiry: IggyExpiry,
-        size_of_parent_stream: Arc<AtomicU64>,
-        size_of_parent_topic: Arc<AtomicU64>,
-        size_of_parent_partition: Arc<AtomicU64>,
-        messages_count_of_parent_stream: Arc<AtomicU64>,
-        messages_count_of_parent_topic: Arc<AtomicU64>,
-        messages_count_of_parent_partition: Arc<AtomicU64>,
-        fresh: bool, // `fresh` means created and persisted in this runtime, 
in other words it's set to false when loading from disk
-    ) -> Segment {
-        let path = config.get_segment_path(
-            stream_id as usize,
-            topic_id as usize,
-            partition_id as usize,
-            start_offset,
-        );
-        let messages_path = Self::get_messages_file_path(&path);
-        let index_path = Self::get_index_path(&path);
-        let message_expiry = match message_expiry {
-            IggyExpiry::ServerDefault => config.segment.message_expiry,
-            _ => message_expiry,
-        };
-        Segment {
-            stream_id,
-            topic_id,
-            partition_id,
-            start_offset,
-            start_timestamp: IggyTimestamp::now().as_micros(),
-            end_timestamp: IggyTimestamp::now().as_micros(),
-            end_offset: start_offset,
-            messages_path,
-            index_path,
-            last_index_position: 0,
-            max_size_bytes: config.segment.size,
-            message_expiry,
-            indexes: None,
-            is_closed: false,
-            messages_writer: None,
-            messages_reader: None,
-            index_writer: None,
-            index_reader: None,
-            size_of_parent_stream,
-            size_of_parent_partition,
-            size_of_parent_topic,
-            messages_count_of_parent_stream,
-            messages_count_of_parent_topic,
-            messages_count_of_parent_partition,
-            config,
-            messages_size: Arc::new(AtomicU64::new(0)),
-            indexes_size: Arc::new(AtomicU64::new(0)),
-        }
-    }
-
-    /// Load the segment state from disk.
-    pub async fn load_from_disk(&mut self) -> Result<(), IggyError> {
-        if self.messages_reader.is_none() || self.index_reader.is_none() {
-            self.initialize_writing(true).await?;
-            self.initialize_reading().await?;
-        }
-
-        let log_size_bytes = self.messages_size.load(Ordering::Acquire);
-        info!(
-            "Loading segment from disk: messages_file_path: {}, index_path: 
{}, log_size: {}",
-            self.messages_path,
-            self.index_path,
-            IggyByteSize::from(log_size_bytes)
-        );
-
-        self.last_index_position = log_size_bytes as _;
-
-        let loaded_indexes = self
-            .index_reader
-            .as_ref()
-            .unwrap()
-            .load_all_indexes_from_disk()
-            .await
-            .with_error_context(|error| format!("Failed to load indexes for 
{self}. {error}"))
-            .map_err(|_| IggyError::CannotReadFile)?;
-
-        info!(
-            "Loaded {} indexes for segment with start offset: {}, end offset: 
{}, and partition with ID: {}, topic with ID: {}, and stream with ID: {}.",
-            self.indexes.as_ref().map_or(0, |idx| idx.count()),
-            self.start_offset,
-            self.end_offset,
-            self.partition_id,
-            self.topic_id,
-            self.stream_id
-        );
-
-        let last_index_offset = if loaded_indexes.is_empty() {
-            0_u64
-        } else {
-            loaded_indexes.last().unwrap().offset() as u64
-        };
-
-        self.end_offset = self.start_offset + last_index_offset;
-
-        info!(
-            "Loaded {} indexes for segment with start offset: {}, end offset: 
{}, and partition with ID: {}, topic with ID: {}, and stream with ID: {}.",
-            self.indexes.as_ref().map_or(0, |idx| idx.count()),
-            self.start_offset,
-            self.end_offset,
-            self.partition_id,
-            self.topic_id,
-            self.stream_id
-        );
-
-        if self.is_full().await {
-            self.is_closed = true;
-        }
-
-        let messages_count = self.get_messages_count() as u64;
-
-        info!(
-            "Loaded segment with log file of size {} ({} messages) for start 
offset {}, end offset: {}, and partition with ID: {} for topic with ID: {} and 
stream with ID: {}.",
-            IggyByteSize::from(log_size_bytes),
-            messages_count,
-            self.start_offset,
-            self.end_offset,
-            self.partition_id,
-            self.topic_id,
-            self.stream_id
-        );
-
-        self.size_of_parent_stream
-            .fetch_add(log_size_bytes, Ordering::SeqCst);
-        self.size_of_parent_topic
-            .fetch_add(log_size_bytes, Ordering::SeqCst);
-        self.size_of_parent_partition
-            .fetch_add(log_size_bytes, Ordering::SeqCst);
-        self.messages_count_of_parent_stream
-            .fetch_add(messages_count, Ordering::SeqCst);
-        self.messages_count_of_parent_topic
-            .fetch_add(messages_count, Ordering::SeqCst);
-        self.messages_count_of_parent_partition
-            .fetch_add(messages_count, Ordering::SeqCst);
-
-        Ok(())
-    }
-
-    /// Open segment.
-    pub async fn open(&mut self) -> Result<(), IggyError> {
-        info!(
-            "Saving segment with start offset: {} for partition with ID: {} 
for topic with ID: {} and stream with ID: {}",
-            self.start_offset, self.partition_id, self.topic_id, self.stream_id
-        );
-        self.initialize_writing(false).await?;
-        self.initialize_reading().await?;
-        info!(
-            "Saved segment log file with start offset: {} for partition with 
ID: {} for topic with ID: {} and stream with ID: {}",
-            self.start_offset, self.partition_id, self.topic_id, self.stream_id
-        );
-        Ok(())
-    }
-
-    pub async fn initialize_writing(&mut self, file_exists: bool) -> 
Result<(), IggyError> {
-        let log_fsync = self.config.partition.enforce_fsync;
-        let index_fsync = self.config.partition.enforce_fsync;
-
-        let messages_writer = MessagesWriter::new(
-            &self.messages_path,
-            self.messages_size.clone(),
-            log_fsync,
-            file_exists,
-        )
-        .await?;
-
-        let index_writer = IndexWriter::new(
-            &self.index_path,
-            self.indexes_size.clone(),
-            index_fsync,
-            file_exists,
-        )
-        .await?;
-
-        self.messages_writer = Some(messages_writer);
-        self.index_writer = Some(index_writer);
-        Ok(())
-    }
-
-    pub async fn initialize_reading(&mut self) -> Result<(), IggyError> {
-        let messages_reader =
-            MessagesReader::new(&self.messages_path, 
self.messages_size.clone()).await?;
-        self.messages_reader = Some(messages_reader);
-
-        let index_reader = IndexReader::new(&self.index_path, 
self.indexes_size.clone()).await?;
-        self.index_reader = Some(index_reader);
-
-        Ok(())
-    }
-
-    pub async fn is_full(&self) -> bool {
-        if self.get_messages_size() >= self.max_size_bytes {
-            return true;
-        }
-
-        self.is_expired(IggyTimestamp::now()).await
-    }
-
-    pub async fn is_expired(&self, now: IggyTimestamp) -> bool {
-        if !self.is_closed {
-            return false;
-        }
-
-        match self.message_expiry {
-            IggyExpiry::NeverExpire => false,
-            IggyExpiry::ServerDefault => false,
-            IggyExpiry::ExpireDuration(expiry) => {
-                let last_messages = 
self.get_messages_by_offset(self.end_offset, 1).await;
-                if last_messages.is_err() {
-                    return false;
-                }
-
-                let last_messages = last_messages.unwrap();
-                if last_messages.is_empty() {
-                    return false;
-                }
-
-                let last_message = 
last_messages.iter().last().unwrap().iter().last().unwrap();
-                let last_message_timestamp = last_message.header().timestamp();
-                last_message_timestamp + expiry.as_micros() <= now.as_micros()
-            }
-        }
-    }
-
-    pub async fn shutdown_reading(&mut self) {
-        if let Some(log_reader) = self.messages_reader.take() {
-            drop(log_reader);
-        }
-        if let Some(index_reader) = self.index_reader.take() {
-            drop(index_reader);
-        }
-    }
-
-    pub async fn delete(&mut self) -> Result<(), IggyError> {
-        let segment_size = self.get_messages_size();
-        let segment_count_of_messages = self.get_messages_count() as u64;
-        info!(
-            "Deleting segment of size {segment_size} 
({segment_count_of_messages} messages) with start offset: {} for partition with 
ID: {} for stream with ID: {} and topic with ID: {}...",
-            self.start_offset, self.partition_id, self.stream_id, 
self.topic_id,
-        );
-
-        self.shutdown_reading().await;
-
-        if !self.is_closed {
-            self.shutdown_writing().await;
-        }
-
-        let _ = remove_file(&self.messages_path)
-            .await
-            .with_error_context(|error| {
-                format!("Failed to delete log file: {}. {error}", 
self.messages_path)
-            });
-        let _ = remove_file(&self.index_path)
-            .await
-            .with_error_context(|error| {
-                format!("Failed to delete index file: {}. {error}", 
self.index_path)
-            });
-
-        let segment_size_bytes = segment_size.as_bytes_u64();
-        self.size_of_parent_stream
-            .fetch_sub(segment_size_bytes, Ordering::SeqCst);
-        self.size_of_parent_topic
-            .fetch_sub(segment_size_bytes, Ordering::SeqCst);
-        self.size_of_parent_partition
-            .fetch_sub(segment_size_bytes, Ordering::SeqCst);
-        self.messages_count_of_parent_stream
-            .fetch_sub(segment_count_of_messages, Ordering::SeqCst);
-        self.messages_count_of_parent_topic
-            .fetch_sub(segment_count_of_messages, Ordering::SeqCst);
-        self.messages_count_of_parent_partition
-            .fetch_sub(segment_count_of_messages, Ordering::SeqCst);
-
-        info!(
-            "Deleted segment of size {segment_size} with start offset: {} for 
partition with ID: {} for stream with ID: {} and topic with ID: {}.",
-            self.start_offset, self.partition_id, self.stream_id, 
self.topic_id,
-        );
-
-        Ok(())
-    }
-
-    fn get_messages_file_path(path: &str) -> String {
-        format!("{path}.{LOG_EXTENSION}")
-    }
-
-    fn get_index_path(path: &str) -> String {
-        format!("{path}.{INDEX_EXTENSION}")
-    }
-
-    pub fn update_message_expiry(&mut self, message_expiry: IggyExpiry) {
-        self.message_expiry = message_expiry;
-    }
-
-    /// Ensure indexes are initialized with proper capacity
-    pub fn ensure_indexes(&mut self) {
-        if self.indexes.is_none() {
-            let capacity = SIZE_16MB / INDEX_SIZE;
-            self.indexes = Some(IggyIndexesMut::with_capacity(capacity, 0));
-        }
-    }
-
-    pub fn is_closed(&self) -> bool {
-        self.is_closed
-    }
-
-    pub fn start_offset(&self) -> u64 {
-        self.start_offset
-    }
-
-    pub fn end_offset(&self) -> u64 {
-        self.end_offset
-    }
-
-    pub fn end_timestamp(&self) -> u64 {
-        self.end_timestamp
-    }
-
-    pub fn index_file_path(&self) -> &str {
-        &self.index_path
-    }
-
-    pub fn partition_id(&self) -> u32 {
-        self.partition_id
-    }
-
-    pub fn messages_file_path(&self) -> &str {
-        &self.messages_path
-    }
-
-    /// Explicitly drop the old indexes to ensure memory is freed
-    pub fn drop_indexes(&mut self) {
-        self.indexes = None;
-    }
-}
-
-impl std::fmt::Display for Segment {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "Segment {{ stream_id: {}, topic_id: {}, partition_id: {}, 
start_offset: {}, end_offset: {}, size_bytes: {}, last_index_position: {}, 
max_size_bytes: {}, closed: {} }}",
-            self.stream_id,
-            self.topic_id,
-            self.partition_id,
-            self.start_offset,
-            self.end_offset,
-            self.get_messages_size(),
-            self.last_index_position,
-            self.max_size_bytes,
-            self.is_closed
-        )
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::configs::cache_indexes::CacheIndexesConfig;
-    use crate::configs::system::SegmentConfig;
-    use crate::streaming::utils::MemoryPool;
-    use iggy_common::IggyDuration;
-
-    #[tokio::test]
-    async fn should_be_created_given_valid_parameters() {
-        let stream_id = 1usize;
-        let topic_id = 2usize;
-        let partition_id = 3usize;
-        let start_offset = 0;
-        let config = Arc::new(SystemConfig::default());
-        let path = config.get_segment_path(stream_id, topic_id, partition_id, 
start_offset);
-        let messages_file_path = Segment::get_messages_file_path(&path);
-        let index_path = Segment::get_index_path(&path);
-        let message_expiry = 
IggyExpiry::ExpireDuration(IggyDuration::from(10));
-        let size_of_parent_stream = Arc::new(AtomicU64::new(0));
-        let size_of_parent_topic = Arc::new(AtomicU64::new(0));
-        let size_of_parent_partition = Arc::new(AtomicU64::new(0));
-        let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
-        let messages_count_of_parent_topic = Arc::new(AtomicU64::new(0));
-        let messages_count_of_parent_partition = Arc::new(AtomicU64::new(0));
-        MemoryPool::init_pool(config.clone());
-
-        let segment = Segment::create(
-            stream_id as u32,
-            topic_id as u32,
-            partition_id as u32,
-            start_offset,
-            config,
-            message_expiry,
-            size_of_parent_stream,
-            size_of_parent_topic,
-            size_of_parent_partition,
-            messages_count_of_parent_stream,
-            messages_count_of_parent_topic,
-            messages_count_of_parent_partition,
-            true,
-        );
-
-        assert_eq!(segment.stream_id, stream_id as u32);
-        assert_eq!(segment.topic_id, topic_id as u32);
-        assert_eq!(segment.partition_id, partition_id as u32);
-        assert_eq!(segment.start_offset(), start_offset);
-        assert_eq!(segment.end_offset(), 0);
-        assert_eq!(segment.get_messages_size(), 0);
-        assert_eq!(segment.messages_file_path(), messages_file_path);
-        assert_eq!(segment.index_file_path(), index_path);
-        assert_eq!(segment.message_expiry, message_expiry);
-        assert!(segment.indexes.is_none());
-        assert!(!segment.is_closed());
-        assert!(!segment.is_full().await);
-    }
-
-    #[tokio::test]
-    async fn should_not_initialize_indexes_cache_when_disabled() {
-        let stream_id = 1;
-        let topic_id = 2;
-        let partition_id = 3;
-        let start_offset = 0;
-        let config = Arc::new(SystemConfig {
-            segment: SegmentConfig {
-                cache_indexes: CacheIndexesConfig::None,
-                ..Default::default()
-            },
-            ..Default::default()
-        });
-        let message_expiry = IggyExpiry::NeverExpire;
-        let size_of_parent_stream = Arc::new(AtomicU64::new(0));
-        let size_of_parent_topic = Arc::new(AtomicU64::new(0));
-        let size_of_parent_partition = Arc::new(AtomicU64::new(0));
-        let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
-        let messages_count_of_parent_topic = Arc::new(AtomicU64::new(0));
-        let messages_count_of_parent_partition = Arc::new(AtomicU64::new(0));
-        MemoryPool::init_pool(config.clone());
-
-        let segment = Segment::create(
-            stream_id,
-            topic_id,
-            partition_id,
-            start_offset,
-            config,
-            message_expiry,
-            size_of_parent_stream,
-            size_of_parent_topic,
-            size_of_parent_partition,
-            messages_count_of_parent_stream,
-            messages_count_of_parent_topic,
-            messages_count_of_parent_partition,
-            true,
-        );
-
-        assert!(segment.indexes.is_none());
-    }
-}
-
-*/
diff --git a/core/server/src/streaming/segments/storage.rs 
b/core/server/src/streaming/segments/storage.rs
index 737b8947..249b67eb 100644
--- a/core/server/src/streaming/segments/storage.rs
+++ b/core/server/src/streaming/segments/storage.rs
@@ -36,9 +36,13 @@ impl Storage {
         let index_writer =
             IndexWriter::new(index_path, indexes_size.clone(), index_fsync, 
file_exists).await?;
 
+        if file_exists {
+            messages_writer.fsync().await?;
+            index_writer.fsync().await?;
+        }
+
         let messages_reader = MessagesReader::new(messages_path, size).await?;
         let index_reader = IndexReader::new(index_path, indexes_size).await?;
-
         Ok(Self {
             messages_writer: Some(messages_writer),
             messages_reader: Some(messages_reader),
@@ -64,7 +68,8 @@ pub async fn create_segment_storage(
     indexes_size: u64,
     start_offset: u64,
 ) -> Result<Storage, IggyError> {
-    let messages_path = config.get_segment_path(stream_id, topic_id, 
partition_id, start_offset);
+    let messages_path =
+        config.get_messages_file_path(stream_id, topic_id, partition_id, 
start_offset);
     let index_path = config.get_index_path(stream_id, topic_id, partition_id, 
start_offset);
     let log_fsync = config.partition.enforce_fsync;
     let index_fsync = config.partition.enforce_fsync;

Reply via email to