This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch fix_loading_segments
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 76d9cee2cb96d43d59a67d3c432e773551e02f95
Author: numminex <[email protected]>
AuthorDate: Thu Sep 18 11:40:35 2025 +0200

    feat(io_uring): fix loading segments on startup
---
 .../tests/streaming/get_by_timestamp.rs            |  42 ++-
 core/server/src/bootstrap.rs                       | 362 ++++++++++++++++++---
 .../src/compat/index_rebuilding/index_rebuilder.rs |   4 +-
 core/server/src/main.rs                            |   4 +-
 core/server/src/shard/builder.rs                   |   3 +-
 core/server/src/slab/streams.rs                    |  29 +-
 core/server/src/state/system.rs                    |  29 +-
 core/server/src/streaming/partitions/helpers.rs    |   6 -
 core/server/src/streaming/partitions/log.rs        |   6 +
 core/server/src/streaming/segments/storage.rs      |   6 +-
 10 files changed, 402 insertions(+), 89 deletions(-)

diff --git a/core/integration/tests/streaming/get_by_timestamp.rs 
b/core/integration/tests/streaming/get_by_timestamp.rs
index a40d2904..de30345a 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -16,17 +16,17 @@
  * under the License.
  */
 
-use crate::streaming::common::test_setup::TestSetup;
 use super::bootstrap_test_environment;
+use crate::streaming::common::test_setup::TestSetup;
 use bytes::BytesMut;
 use iggy::prelude::*;
 use server::configs::cache_indexes::CacheIndexesConfig;
 use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig};
+use server::shard::namespace::IggyFullNamespace;
 use server::shard::system::messages::PollingArgs;
 use server::streaming::polling_consumer::PollingConsumer;
 use server::streaming::segments::IggyMessagesBatchMut;
 use server::streaming::traits::MainOps;
-use server::shard::namespace::IggyFullNamespace;
 use std::collections::HashMap;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -117,14 +117,20 @@ async fn test_get_messages_by_timestamp(
     };
 
     // Use the bootstrap method to create streams with proper slab structure
-    let bootstrap_result = bootstrap_test_environment(shard_id as u16, 
&config).await.unwrap();
+    let bootstrap_result = bootstrap_test_environment(shard_id as u16, &config)
+        .await
+        .unwrap();
     let streams = bootstrap_result.streams;
     let stream_identifier = bootstrap_result.stream_id;
     let topic_identifier = bootstrap_result.topic_id;
     let partition_id = bootstrap_result.partition_id;
-    
+
     // Create namespace for MainOps calls
-    let namespace = IggyFullNamespace::new(stream_identifier.clone(), 
topic_identifier.clone(), partition_id);
+    let namespace = IggyFullNamespace::new(
+        stream_identifier.clone(),
+        topic_identifier.clone(),
+        partition_id,
+    );
 
     let mut all_messages = Vec::with_capacity(total_messages_count as usize);
 
@@ -193,7 +199,10 @@ async fn test_get_messages_by_timestamp(
 
         let batch = 
IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size);
         assert_eq!(batch.count(), batch_len);
-        streams.append_messages(shard_id as u16, &config, &namespace, 
batch).await.unwrap();
+        streams
+            .append_messages(shard_id as u16, &config, &namespace, batch)
+            .await
+            .unwrap();
 
         // Capture the timestamp of this batch
         batch_timestamps.push(IggyTimestamp::now());
@@ -209,10 +218,15 @@ async fn test_get_messages_by_timestamp(
     let total_sent_messages = total_messages_count;
 
     // Create a single consumer to reuse throughout the test
-    let consumer = PollingConsumer::consumer(&Identifier::numeric(1).unwrap(), 
partition_id as usize);
+    let consumer =
+        PollingConsumer::consumer(&Identifier::numeric(1).unwrap(), 
partition_id as usize);
 
     // Test 1: All messages from initial timestamp
-    let args = PollingArgs::new(PollingStrategy::timestamp(initial_timestamp), 
total_sent_messages, false);
+    let args = PollingArgs::new(
+        PollingStrategy::timestamp(initial_timestamp),
+        total_sent_messages,
+        false,
+    );
     let (_, all_loaded_messages) = streams
         .poll_messages(&namespace, consumer, args)
         .await
@@ -235,7 +249,11 @@ async fn test_get_messages_by_timestamp(
         let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
         let remaining_messages = total_sent_messages - prior_batches_sum;
 
-        let args = 
PollingArgs::new(PollingStrategy::timestamp(middle_timestamp), 
remaining_messages, false);
+        let args = PollingArgs::new(
+            PollingStrategy::timestamp(middle_timestamp),
+            remaining_messages,
+            false,
+        );
         let (_, middle_messages) = streams
             .poll_messages(&namespace, consumer, args)
             .await
@@ -265,7 +283,11 @@ async fn test_get_messages_by_timestamp(
 
     // Test 4: Small subset from initial timestamp
     let subset_size = std::cmp::min(3, total_sent_messages);
-    let args = PollingArgs::new(PollingStrategy::timestamp(initial_timestamp), 
subset_size, false);
+    let args = PollingArgs::new(
+        PollingStrategy::timestamp(initial_timestamp),
+        subset_size,
+        false,
+    );
     let (_, subset_messages) = streams
         .poll_messages(&namespace, consumer, args)
         .await
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 8e3797b2..a82b357e 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -1,5 +1,7 @@
 use crate::{
     IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
+    compat::index_rebuilding::index_rebuilder::IndexRebuilder,
+    configs::cache_indexes::CacheIndexesConfig,
     configs::{config_provider::ConfigProviderKind, server::ServerConfig, 
system::SystemConfig},
     io::fs_utils,
     server_error::ServerError,
@@ -17,12 +19,11 @@ use crate::{
         },
     },
     state::system::{StreamState, TopicState, UserState},
+    streaming::segments::{INDEX_EXTENSION, LOG_EXTENSION, Segment2, 
storage::Storage},
     streaming::{
-        deduplication::message_deduplicator,
         partitions::{
-            consumer_offset::{self, ConsumerOffset},
-            helpers::create_message_deduplicator,
-            partition2::{self, ConsumerGroupOffsets, ConsumerOffsets},
+            consumer_offset::ConsumerOffset, 
helpers::create_message_deduplicator,
+            journal::MemoryMessageJournal, log::SegmentedLog, partition2,
             storage2::load_consumer_offsets,
         },
         persistence::persister::{FilePersister, FileWithSyncPersister, 
PersisterKind},
@@ -38,6 +39,7 @@ use crate::{
 };
 use ahash::HashMap;
 use compio::{fs::create_dir_all, runtime::Runtime};
+use error_set::ErrContext;
 use iggy_common::{
     ConsumerKind, IggyError, UserId,
     defaults::{
@@ -46,9 +48,9 @@ use iggy_common::{
     },
 };
 use std::{collections::HashSet, env, path::Path, sync::Arc};
-use tracing::info;
+use tracing::{info, warn};
 
-pub fn load_streams(
+pub async fn load_streams(
     state: impl IntoIterator<Item = StreamState>,
     config: &SystemConfig,
 ) -> Result<Streams, IggyError> {
@@ -121,56 +123,40 @@ pub fn load_streams(
             let parent_stats = stats.clone();
             let cgs = consumer_groups.into_values();
             let partitions = partitions.into_values();
+
+            // Load each partition asynchronously and insert immediately
             for partition_state in partitions {
                 info!(
                     "Loading partition with ID: {}, for topic with ID: {} from 
state...",
                     partition_state.id, topic_id
                 );
+
+                let partition_id = partition_state.id;
+                let partition = load_partition_with_segments(
+                    config,
+                    stream_id as usize,
+                    topic_id,
+                    partition_state,
+                    parent_stats.clone(),
+                )
+                .await?;
+
+                // Insert partition into the container
                 streams.with_components_by_id(stream_id as usize, |(root, ..)| 
{
-                    let parent_stats = parent_stats.clone();
                     root.topics()
-                        .with_components_by_id_mut(topic_id, |(root, ..)| {
-                            let stats = 
Arc::new(PartitionStats::new(parent_stats));
-                            // TODO: This has to be sampled from segments, if 
there exists more than 0 segements and the segment has more than 0 messages, 
then we set it to true.
-                            let should_increment_offset = todo!();
-                            // TODO: This has to be sampled from segments and 
it's indexes, offset = segment.start_offset + last_index.offset;
-                            let offset = todo!();
-                            let message_deduplicator = 
create_message_deduplicator(config);
-                            let id = partition_state.id;
-
-                            let consumer_offset_path = 
config.get_consumer_offsets_path(stream_id as usize, topic_id as usize, id as 
usize);
-                            let consumer_group_offsets_path = 
config.get_consumer_group_offsets_path(stream_id as usize, topic_id as usize, 
id as usize);
-                            let consumer_offset = 
Arc::new(load_consumer_offsets(&consumer_offset_path, ConsumerKind::Consumer)?
-                            .into_iter().map(|offset| {
-                                (offset.consumer_id as usize, offset)
-                            }).into());
-                            let consumer_group_offset = 
Arc::new(load_consumer_offsets(&consumer_group_offsets_path, 
ConsumerKind::ConsumerGroup)?.into_iter().map(|offset| {
-                                (offset.consumer_id as usize, offset)
-                            }).into());
-
-                            let log = Default::default();
-                            let partition = partition2::Partition::new(
-                                partition_state.created_at,
-                                should_increment_offset,
-                                stats,
-                                message_deduplicator,
-                                offset,
-                                consumer_offset,
-                                consumer_group_offset,
-                                log
-                            );
+                        .with_components_by_id_mut(topic_id, |(mut root, ..)| {
                             let new_id = 
root.partitions_mut().insert(partition);
                             assert_eq!(
-                                new_id, id as usize,
+                                new_id, partition_id as usize,
                                 "load_streams: partition id mismatch when 
inserting partition, mismatch for partition with ID: {}, for topic with ID: {}, 
for stream with ID: {}",
-                                id, topic_id, stream_id
+                                partition_id, topic_id, stream_id
                             );
-                            Ok(())
-                        })
-                })?;
+                        });
+                });
+
                 info!(
                     "Loaded partition with ID: {}, for topic with ID: {} from 
state...",
-                    partition_state.id, topic_id
+                    partition_id, topic_id
                 );
             }
             let partition_ids = streams.with_components_by_id(stream_id as 
usize, |(root, ..)| {
@@ -384,3 +370,293 @@ pub async fn update_system_info(
     storage.info.save(system_info).await?;
     Ok(())
 }
+
+async fn load_partition_with_segments(
+    config: &SystemConfig,
+    stream_id: usize,
+    topic_id: usize,
+    partition_state: crate::state::system::PartitionState,
+    parent_stats: Arc<TopicStats>,
+) -> Result<partition2::Partition, IggyError> {
+    use std::sync::atomic::AtomicU64;
+
+    let stats = Arc::new(PartitionStats::new(parent_stats));
+    let partition_id = partition_state.id as u32;
+
+    // Load segments from disk to determine should_increment_offset and 
current offset
+    let partition_path = config.get_partition_path(stream_id, topic_id, 
partition_id as usize);
+
+    info!(
+        "Loading partition with ID: {} for stream with ID: {} and topic with 
ID: {}, for path: {} from disk...",
+        partition_id, stream_id, topic_id, partition_path
+    );
+
+    // Read directory entries to find log files using async fs_utils
+    let dir_entries = fs_utils::walk_dir(&partition_path)
+        .await
+        .map_err(|_| IggyError::CannotReadPartitions)?;
+
+    let mut log_files = Vec::new();
+    for entry in dir_entries {
+        if entry.is_dir {
+            continue;
+        }
+
+        let extension = entry.path.extension();
+        if extension.is_none() || extension.unwrap() != LOG_EXTENSION {
+            continue;
+        }
+
+        log_files.push(entry);
+    }
+
+    log_files.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
+
+    let mut should_increment_offset = false;
+    let mut current_offset = 0u64;
+    let mut log = SegmentedLog::<MemoryMessageJournal>::default();
+
+    for entry in log_files {
+        let log_file_name = entry
+            .path
+            .file_stem()
+            .unwrap()
+            .to_string_lossy()
+            .to_string();
+
+        let start_offset = log_file_name.parse::<u64>().unwrap();
+
+        // Build file paths directly
+        let messages_file_path = format!("{}/{}.{}", partition_path, 
start_offset, LOG_EXTENSION);
+        let index_file_path = format!("{}/{}.{}", partition_path, 
start_offset, INDEX_EXTENSION);
+        let time_index_path = index_file_path.replace(INDEX_EXTENSION, 
"timeindex");
+
+        // Check if index files exist
+        async fn try_exists(path: &str) -> Result<bool, std::io::Error> {
+            match compio::fs::metadata(path).await {
+                Ok(_) => Ok(true),
+                Err(err) => match err.kind() {
+                    std::io::ErrorKind::NotFound => Ok(false),
+                    _ => Err(err),
+                },
+            }
+        }
+
+        let index_path_exists = try_exists(&index_file_path).await.unwrap();
+        let time_index_path_exists = 
try_exists(&time_index_path).await.unwrap();
+        let index_cache_enabled = matches!(
+            config.segment.cache_indexes,
+            CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
+        );
+
+        // Rebuild indexes if index cache is enabled and the index file is 
missing or a time index file is present
+        if index_cache_enabled && (!index_path_exists || 
time_index_path_exists) {
+            warn!(
+                "Index at path {} does not exist, rebuilding it based on 
{}...",
+                index_file_path, messages_file_path
+            );
+            let now = std::time::Instant::now();
+            let index_rebuilder = IndexRebuilder::new(
+                messages_file_path.clone(),
+                index_file_path.clone(),
+                start_offset,
+            );
+            index_rebuilder.rebuild().await.unwrap_or_else(|e| {
+                panic!(
+                    "Failed to rebuild index for partition with ID: {} for 
stream with ID: {} and topic with ID: {}. Error: {e}",
+                    partition_id, stream_id, topic_id,
+                )
+            });
+            info!(
+                "Rebuilding index for path {} finished, it took {} ms",
+                index_file_path,
+                now.elapsed().as_millis()
+            );
+        }
+
+        if time_index_path_exists {
+            compio::fs::remove_file(&time_index_path).await.unwrap();
+        }
+
+        // Get file metadata to determine segment properties
+        let messages_metadata = compio::fs::metadata(&messages_file_path)
+            .await
+            .map_err(|_| IggyError::CannotReadPartitions)?;
+        let messages_size = messages_metadata.len() as u32;
+
+        let index_size = match compio::fs::metadata(&index_file_path).await {
+            Ok(metadata) => metadata.len() as u32,
+            Err(_) => 0, // Default to 0 if index file doesn't exist
+        };
+
+        // If any segment has messages, we should increment the offset
+        if !should_increment_offset {
+            should_increment_offset = messages_size > 0;
+        }
+
+        // For segment validation, we'd need to implement checksum validation
+        // directly on the files if needed - skipping for now as it requires
+        // understanding the message format
+        if config.partition.validate_checksum {
+            info!("Checksum validation for segment at offset {}", 
start_offset);
+        }
+
+        // Create storage for the segment using existing files
+        let storage = Storage::new(
+            &messages_file_path,
+            &index_file_path,
+            messages_size as u64,
+            index_size as u64,
+            config.partition.enforce_fsync,
+            config.partition.enforce_fsync,
+            true, // file_exists = true for existing segments
+        )
+        .await?;
+
+        // Load indexes from disk to calculate the correct end offset and 
cache them if needed
+        // This matches the logic in Segment::load_from_disk method
+        let loaded_indexes = {
+            storage.
+            index_reader
+            .as_ref()
+            .unwrap()
+            .load_all_indexes_from_disk()
+            .await
+            .with_error_context(|error| format!("Failed to load indexes during 
startup for stream ID: {}, topic ID: {}, partition_id: {}, {error}", stream_id, 
topic_id, partition_id))
+            .map_err(|_| IggyError::CannotReadFile)?
+        };
+
+        // Calculate end offset based on loaded indexes
+        let end_offset = if loaded_indexes.count() == 0 {
+            0
+        } else {
+            let last_index_offset = loaded_indexes.last().unwrap().offset() as 
u64;
+            start_offset + last_index_offset
+        };
+
+        let (start_timestamp, end_timestamp) = if loaded_indexes.count() == 0 {
+            (0, 0)
+        } else {
+            (
+                loaded_indexes.get(0).unwrap().timestamp(),
+                loaded_indexes.last().unwrap().timestamp(),
+            )
+        };
+
+        current_offset = current_offset.max(end_offset);
+
+        // Create the new Segment with proper values from file system
+        let mut segment = Segment2::new(
+            start_offset,
+            config.segment.size,
+            config.segment.message_expiry,
+        );
+
+        // Set properties based on file data
+        segment.start_timestamp = start_timestamp;
+        segment.end_timestamp = end_timestamp;
+        segment.end_offset = end_offset;
+        segment.size = messages_size;
+        segment.sealed = true; // Persisted segments are assumed to be sealed
+
+        // Add segment to log first
+        log.add_persisted_segment(segment, storage);
+
+        // Increment stats for partition - this matches the behavior from 
partition storage load method
+        stats.increment_segments_count(1);
+        
+        // Increment size and message counts based on the loaded segment data
+        stats.increment_size_bytes(messages_size as u64);
+        
+        // Calculate message count from segment data (end_offset - 
start_offset + 1 if there are messages)
+        let messages_count = if end_offset > start_offset {
+            (end_offset - start_offset + 1) as u64
+        } else if messages_size > 0 {
+            // Fallback: estimate based on loaded indexes count if available
+            loaded_indexes.count() as u64
+        } else {
+            0
+        };
+        
+        if messages_count > 0 {
+            stats.increment_messages_count(messages_count);
+        }
+
+        // Now handle index caching based on configuration
+        let should_cache_indexes = match config.segment.cache_indexes {
+            CacheIndexesConfig::All => true,
+            CacheIndexesConfig::OpenSegment => false, // Will be handled after 
all segments are loaded
+            CacheIndexesConfig::None => false,
+        };
+
+        // Set the loaded indexes if we should cache them
+        if should_cache_indexes {
+            let segment_index = log.segments().len() - 1;
+            log.set_segment_indexes(segment_index, loaded_indexes);
+        }
+    }
+
+    // Handle OpenSegment cache configuration: only the last segment should 
keep its indexes
+    if matches!(
+        config.segment.cache_indexes,
+        CacheIndexesConfig::OpenSegment
+    ) && log.has_segments()
+    {
+        let segments_count = log.segments().len();
+        if segments_count > 0 {
+            // Use the IndexReader from the last segment's storage to load 
indexes
+            let last_storage = log.storages().last().unwrap();
+            match last_storage.index_reader.as_ref() {
+                Some(index_reader) => {
+                    if let Ok(loaded_indexes) = 
index_reader.load_all_indexes_from_disk().await {
+                        log.set_segment_indexes(segments_count - 1, 
loaded_indexes);
+                    }
+                }
+                None => {
+                    warn!("Index reader not available for last segment in 
OpenSegment mode");
+                }
+            }
+        }
+    }
+
+    // Load consumer offsets
+    let message_deduplicator = create_message_deduplicator(config);
+    let consumer_offset_path =
+        config.get_consumer_offsets_path(stream_id, topic_id, partition_id as 
usize);
+    let consumer_group_offsets_path =
+        config.get_consumer_group_offsets_path(stream_id, topic_id, 
partition_id as usize);
+
+    let consumer_offset = Arc::new(
+        load_consumer_offsets(&consumer_offset_path, ConsumerKind::Consumer)?
+            .into_iter()
+            .map(|offset| (offset.consumer_id as usize, offset))
+            .collect::<HashMap<usize, ConsumerOffset>>()
+            .into(),
+    );
+
+    let consumer_group_offset = Arc::new(
+        load_consumer_offsets(&consumer_group_offsets_path, 
ConsumerKind::ConsumerGroup)?
+            .into_iter()
+            .map(|offset| (offset.consumer_id as usize, offset))
+            .collect::<HashMap<usize, ConsumerOffset>>()
+            .into(),
+    );
+
+    let partition = partition2::Partition::new(
+        partition_state.created_at,
+        should_increment_offset,
+        stats,
+        message_deduplicator,
+        Arc::new(AtomicU64::new(current_offset)),
+        consumer_offset,
+        consumer_group_offset,
+        log
+    );
+
+    info!(
+        "Loaded partition with ID: {} for stream with ID: {} and topic with 
ID: {}, current offset: {}.",
+        partition_id, stream_id, topic_id, current_offset
+    );
+
+    Ok(partition)
+}
diff --git a/core/server/src/compat/index_rebuilding/index_rebuilder.rs 
b/core/server/src/compat/index_rebuilding/index_rebuilder.rs
index dbc93ebe..402739be 100644
--- a/core/server/src/compat/index_rebuilding/index_rebuilder.rs
+++ b/core/server/src/compat/index_rebuilding/index_rebuilder.rs
@@ -18,13 +18,11 @@
 
 use crate::server_error::CompatError;
 use crate::streaming::utils::file;
-use async_zip::tokio::write;
 use compio::{
     fs::File,
-    io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, 
BufReader, BufWriter},
+    io::{AsyncBufRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, 
BufWriter},
 };
 use iggy_common::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeader};
-use std::io::{Seek, SeekFrom};
 
 pub struct IndexRebuilder {
     pub messages_file_path: String,
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 44917d0b..c31f4cc8 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -189,7 +189,8 @@ async fn main() -> Result<(), ServerError> {
     ));
     let state = SystemState::load(state).await?;
     let (streams_state, users_state) = state.decompose();
-    let streams = load_streams(streams_state.into_values(), &config.system)?;
+    let streams = load_streams(streams_state.into_values(), 
&config.system).await?;
+    tracing::warn!("Streams: {:?}", streams);
     let users = load_users(users_state.into_values());
 
     // ELEVENTH DISCRETE LOADING STEP.
@@ -329,7 +330,6 @@ async fn main() -> Result<(), ServerError> {
     let _command_handler = BackgroundServerCommandHandler::new(system.clone(), 
&config)
         .install_handler(SaveMessagesExecutor)
         .install_handler(MaintainMessagesExecutor)
-        .install_handler(ArchiveStateExecutor)
         .install_handler(CleanPersonalAccessTokensExecutor)
         .install_handler(SysInfoPrintExecutor)
         .install_handler(VerifyHeartbeatsExecutor);
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 615a5f71..fc203b77 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -149,8 +149,7 @@ impl IggyShardBuilder {
             id: id,
             shards: shards,
             shards_table,
-            //streams2: streams, // TODO: Fixme
-            streams2: Default::default(),
+            streams2: streams, // TODO: Fixme
             users: RefCell::new(users),
             storage: storage,
             encryptor: encryptor,
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 0343231f..549b8d82 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -4,16 +4,24 @@ use crate::{
     shard::{namespace::IggyFullNamespace, system::messages::PollingArgs},
     shard_info,
     slab::{
-        consumer_groups::ConsumerGroups, helpers, partitions::{self, 
Partitions}, topics::Topics, traits_ext::{
+        Keyed,
+        consumer_groups::ConsumerGroups,
+        helpers,
+        partitions::{self, Partitions},
+        topics::Topics,
+        traits_ext::{
             ComponentsById, DeleteCell, EntityComponentSystem, 
EntityComponentSystemMutCell,
             InsertCell, InteriorMutability, IntoComponents,
-        }, Keyed
+        },
     },
     streaming::{
-        partitions::{journal::Journal, partition2::{PartitionRef, 
PartitionRefMut}},
+        partitions::{
+            journal::Journal,
+            partition2::{PartitionRef, PartitionRefMut},
+        },
         polling_consumer::PollingConsumer,
         segments::{
-            storage::create_segment_storage, IggyMessagesBatchMut, 
IggyMessagesBatchSet, Segment2
+            IggyMessagesBatchMut, IggyMessagesBatchSet, Segment2, 
storage::create_segment_storage,
         },
         stats::stats::StreamStats,
         streams::{
@@ -166,14 +174,19 @@ impl MainOps for Streams {
             streaming_partitions::helpers::calculate_current_offset(),
         );
 
-        let current_position = self.with_partition_by_id(stream_id, topic_id, 
partition_id, |(.., log)| {
-            log.journal().inner().size + log.active_segment().size
-        });
+        let current_position =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
+                log.journal().inner().size + log.active_segment().size
+            });
         self.with_partition_by_id_async(
             stream_id,
             topic_id,
             partition_id,
-            
streaming_partitions::helpers::deduplicate_messages(current_offset, 
current_position, &mut input),
+            streaming_partitions::helpers::deduplicate_messages(
+                current_offset,
+                current_position,
+                &mut input,
+            ),
         )
         .await;
 
diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs
index 2c3ca43b..b5535d0e 100644
--- a/core/server/src/state/system.rs
+++ b/core/server/src/state/system.rs
@@ -30,17 +30,18 @@ use iggy_common::MaxTopicSize;
 use iggy_common::create_user::CreateUser;
 use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
 use iggy_common::{IdKind, Identifier, Permissions, UserStatus};
+use std::collections::BTreeMap;
 use std::fmt::Display;
 use tracing::{debug, error, info};
 
 #[derive(Debug, Clone)]
 pub struct SystemState {
-    pub streams: AHashMap<u32, StreamState>,
+    pub streams: BTreeMap<u32, StreamState>,
     pub users: AHashMap<u32, UserState>,
 }
 
 impl SystemState {
-    pub fn decompose(self) -> (AHashMap<u32, StreamState>, AHashMap<u32, 
UserState>) {
+    pub fn decompose(self) -> (BTreeMap<u32, StreamState>, AHashMap<u32, 
UserState>) {
         (self.streams, self.users)
     }
 }
@@ -50,15 +51,15 @@ pub struct StreamState {
     pub id: u32,
     pub name: String,
     pub created_at: IggyTimestamp,
-    pub topics: AHashMap<u32, TopicState>,
+    pub topics: BTreeMap<u32, TopicState>,
 }
 
 #[derive(Debug, Clone)]
 pub struct TopicState {
     pub id: u32,
     pub name: String,
-    pub partitions: AHashMap<u32, PartitionState>,
-    pub consumer_groups: AHashMap<u32, ConsumerGroupState>,
+    pub partitions: BTreeMap<u32, PartitionState>,
+    pub consumer_groups: BTreeMap<u32, ConsumerGroupState>,
     pub compression_algorithm: CompressionAlgorithm,
     pub message_expiry: IggyExpiry,
     pub max_topic_size: MaxTopicSize,
@@ -161,7 +162,7 @@ impl SystemState {
     }
 
     pub async fn init(entries: Vec<StateEntry>) -> Result<Self, IggyError> {
-        let mut streams = AHashMap::new();
+        let mut streams = BTreeMap::new();
         let mut users = AHashMap::new();
         for entry in entries {
             debug!("Processing state entry: {entry}",);
@@ -177,7 +178,7 @@ impl SystemState {
                     let stream = StreamState {
                         id: stream_id,
                         name: command.name.clone(),
-                        topics: AHashMap::new(),
+                        topics: BTreeMap::new(),
                         created_at: entry.timestamp,
                     };
                     streams.insert(stream.id, stream);
@@ -210,15 +211,15 @@ impl SystemState {
                     let topic = TopicState {
                         id: topic_id,
                         name: command.name,
-                        consumer_groups: AHashMap::new(),
+                        consumer_groups: BTreeMap::new(),
                         compression_algorithm: command.compression_algorithm,
                         message_expiry: command.message_expiry,
                         max_topic_size: command.max_topic_size,
                         replication_factor: command.replication_factor,
                         created_at: entry.timestamp,
                         partitions: if command.partitions_count > 0 {
-                            let mut partitions = AHashMap::new();
-                            for i in 1..=command.partitions_count {
+                            let mut partitions = BTreeMap::new();
+                            for i in 0..command.partitions_count {
                                 partitions.insert(
                                     i,
                                     PartitionState {
@@ -229,7 +230,7 @@ impl SystemState {
                             }
                             partitions
                         } else {
-                            AHashMap::new()
+                            BTreeMap::new()
                         },
                     };
                     stream.topics.insert(topic.id, topic);
@@ -488,7 +489,7 @@ impl SystemState {
     }
 }
 
-fn find_stream_id(streams: &AHashMap<u32, StreamState>, stream_id: 
&Identifier) -> u32 {
+fn find_stream_id(streams: &BTreeMap<u32, StreamState>, stream_id: 
&Identifier) -> u32 {
     match stream_id.kind {
         IdKind::Numeric => stream_id
             .get_u32_value()
@@ -506,7 +507,7 @@ fn find_stream_id(streams: &AHashMap<u32, StreamState>, 
stream_id: &Identifier)
     }
 }
 
-fn find_topic_id(topics: &AHashMap<u32, TopicState>, topic_id: &Identifier) -> 
u32 {
+fn find_topic_id(topics: &BTreeMap<u32, TopicState>, topic_id: &Identifier) -> 
u32 {
     match topic_id.kind {
         IdKind::Numeric => topic_id
             .get_u32_value()
@@ -525,7 +526,7 @@ fn find_topic_id(topics: &AHashMap<u32, TopicState>, 
topic_id: &Identifier) -> u
 }
 
 fn find_consumer_group_id(
-    groups: &AHashMap<u32, ConsumerGroupState>,
+    groups: &BTreeMap<u32, ConsumerGroupState>,
     group_id: &Identifier,
 ) -> u32 {
     match group_id.kind {
diff --git a/core/server/src/streaming/partitions/helpers.rs 
b/core/server/src/streaming/partitions/helpers.rs
index 8ce35655..c16020d8 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -472,9 +472,6 @@ pub fn get_segment_range_by_timestamp(
 ) -> impl FnOnce(ComponentsById<PartitionRef>) -> 
Result<std::ops::Range<usize>, IggyError> {
     move |(.., log)| -> Result<std::ops::Range<usize>, IggyError> {
         let segments = log.segments();
-        for seg in segments {
-            tracing::warn!("timestamp: {}, segment: {:?}", timestamp, seg);
-        }
         let start = log
             .segments()
             .iter()
@@ -705,9 +702,6 @@ pub fn commit_journal() -> impl 
FnOnce(ComponentsById<PartitionRefMut>) -> IggyM
         let batches = log.journal_mut().commit();
         log.ensure_indexes();
         batches.append_indexes_to(log.active_indexes_mut().unwrap());
-        let indexes = log.active_indexes_mut().unwrap();
-        let first = indexes.get(0).unwrap();
-        let last = indexes.last().unwrap();
         batches
     }
 }
diff --git a/core/server/src/streaming/partitions/log.rs 
b/core/server/src/streaming/partitions/log.rs
index 83712975..d2bc75f7 100644
--- a/core/server/src/streaming/partitions/log.rs
+++ b/core/server/src/streaming/partitions/log.rs
@@ -135,6 +135,12 @@ where
         self.storage.push(storage);
         self.indexes.push(None);
     }
+
+    pub fn set_segment_indexes(&mut self, segment_index: usize, indexes: 
IggyIndexesMut) {
+        if let Some(segment_indexes) = self.indexes.get_mut(segment_index) {
+            *segment_indexes = Some(indexes);
+        }
+    }
 }
 
 impl<J> SegmentedLog<J>
diff --git a/core/server/src/streaming/segments/storage.rs 
b/core/server/src/streaming/segments/storage.rs
index 737b8947..a046af12 100644
--- a/core/server/src/streaming/segments/storage.rs
+++ b/core/server/src/streaming/segments/storage.rs
@@ -36,9 +36,13 @@ impl Storage {
         let index_writer =
             IndexWriter::new(index_path, indexes_size.clone(), index_fsync, 
file_exists).await?;
 
+        if file_exists {
+            messages_writer.fsync().await?;
+            index_writer.fsync().await?;
+        }
+
         let messages_reader = MessagesReader::new(messages_path, size).await?;
         let index_reader = IndexReader::new(index_path, indexes_size).await?;
-
         Ok(Self {
             messages_writer: Some(messages_writer),
             messages_reader: Some(messages_reader),


Reply via email to