This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch fix_loading_segments in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 76d9cee2cb96d43d59a67d3c432e773551e02f95 Author: numminex <[email protected]> AuthorDate: Thu Sep 18 11:40:35 2025 +0200 feat(io_uring): fix loading segments on startup --- .../tests/streaming/get_by_timestamp.rs | 42 ++- core/server/src/bootstrap.rs | 362 ++++++++++++++++++--- .../src/compat/index_rebuilding/index_rebuilder.rs | 4 +- core/server/src/main.rs | 4 +- core/server/src/shard/builder.rs | 3 +- core/server/src/slab/streams.rs | 29 +- core/server/src/state/system.rs | 29 +- core/server/src/streaming/partitions/helpers.rs | 6 - core/server/src/streaming/partitions/log.rs | 6 + core/server/src/streaming/segments/storage.rs | 6 +- 10 files changed, 402 insertions(+), 89 deletions(-) diff --git a/core/integration/tests/streaming/get_by_timestamp.rs b/core/integration/tests/streaming/get_by_timestamp.rs index a40d2904..de30345a 100644 --- a/core/integration/tests/streaming/get_by_timestamp.rs +++ b/core/integration/tests/streaming/get_by_timestamp.rs @@ -16,17 +16,17 @@ * under the License. */ -use crate::streaming::common::test_setup::TestSetup; use super::bootstrap_test_environment; +use crate::streaming::common::test_setup::TestSetup; use bytes::BytesMut; use iggy::prelude::*; use server::configs::cache_indexes::CacheIndexesConfig; use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig}; +use server::shard::namespace::IggyFullNamespace; use server::shard::system::messages::PollingArgs; use server::streaming::polling_consumer::PollingConsumer; use server::streaming::segments::IggyMessagesBatchMut; use server::streaming::traits::MainOps; -use server::shard::namespace::IggyFullNamespace; use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; @@ -117,14 +117,20 @@ async fn test_get_messages_by_timestamp( }; // Use the bootstrap method to create streams with proper slab structure - let bootstrap_result = bootstrap_test_environment(shard_id as u16, &config).await.unwrap(); + let bootstrap_result = bootstrap_test_environment(shard_id as u16, &config) + .await + .unwrap(); let streams = bootstrap_result.streams; let stream_identifier = bootstrap_result.stream_id; let topic_identifier = bootstrap_result.topic_id; let partition_id = bootstrap_result.partition_id; - + // Create namespace for MainOps calls - let namespace = IggyFullNamespace::new(stream_identifier.clone(), topic_identifier.clone(), partition_id); + let namespace = IggyFullNamespace::new( + stream_identifier.clone(), + topic_identifier.clone(), + partition_id, + ); let mut all_messages = Vec::with_capacity(total_messages_count as usize); @@ -193,7 +199,10 @@ async fn test_get_messages_by_timestamp( let batch = IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size); assert_eq!(batch.count(), batch_len); - streams.append_messages(shard_id as u16, &config, &namespace, batch).await.unwrap(); + streams + .append_messages(shard_id as u16, &config, &namespace, batch) + .await + .unwrap(); // Capture the timestamp of this batch batch_timestamps.push(IggyTimestamp::now()); @@ -209,10 +218,15 @@ async fn test_get_messages_by_timestamp( let total_sent_messages = total_messages_count; // Create a single consumer to reuse throughout the test - let consumer = PollingConsumer::consumer(&Identifier::numeric(1).unwrap(), partition_id as usize); + let consumer = + PollingConsumer::consumer(&Identifier::numeric(1).unwrap(), partition_id as usize); // Test 1: All messages from initial timestamp - let args = PollingArgs::new(PollingStrategy::timestamp(initial_timestamp), total_sent_messages, false); + let args = PollingArgs::new( + PollingStrategy::timestamp(initial_timestamp), + total_sent_messages, + false, + ); let (_, all_loaded_messages) = streams .poll_messages(&namespace, consumer, args) .await @@ -235,7 +249,11 @@ async fn test_get_messages_by_timestamp( let prior_batches_sum: u32 = batch_lengths[..3].iter().sum(); let remaining_messages = total_sent_messages - prior_batches_sum; - let args = PollingArgs::new(PollingStrategy::timestamp(middle_timestamp), remaining_messages, false); + let args = PollingArgs::new( + PollingStrategy::timestamp(middle_timestamp), + remaining_messages, + false, + ); let (_, middle_messages) = streams .poll_messages(&namespace, consumer, args) .await @@ -265,7 +283,11 @@ async fn test_get_messages_by_timestamp( // Test 4: Small subset from initial timestamp let subset_size = std::cmp::min(3, total_sent_messages); - let args = PollingArgs::new(PollingStrategy::timestamp(initial_timestamp), subset_size, false); + let args = PollingArgs::new( + PollingStrategy::timestamp(initial_timestamp), + subset_size, + false, + ); let (_, subset_messages) = streams .poll_messages(&namespace, consumer, args) .await diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index 8e3797b2..a82b357e 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -1,5 +1,7 @@ use crate::{ IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, + compat::index_rebuilding::index_rebuilder::IndexRebuilder, + configs::cache_indexes::CacheIndexesConfig, configs::{config_provider::ConfigProviderKind, server::ServerConfig, system::SystemConfig}, io::fs_utils, server_error::ServerError, @@ -17,12 +19,11 @@ use crate::{ }, }, state::system::{StreamState, TopicState, UserState}, + streaming::segments::{INDEX_EXTENSION, LOG_EXTENSION, Segment2, storage::Storage}, streaming::{ - deduplication::message_deduplicator, partitions::{ - consumer_offset::{self, ConsumerOffset}, - helpers::create_message_deduplicator, - partition2::{self, ConsumerGroupOffsets, ConsumerOffsets}, + consumer_offset::ConsumerOffset, helpers::create_message_deduplicator, + journal::MemoryMessageJournal, log::SegmentedLog, partition2, storage2::load_consumer_offsets, }, persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind}, @@ -38,6 +39,7 @@ use crate::{ }; use ahash::HashMap; use compio::{fs::create_dir_all, runtime::Runtime}; +use error_set::ErrContext; use iggy_common::{ ConsumerKind, IggyError, UserId, defaults::{ @@ -46,9 +48,9 @@ use iggy_common::{ }, }; use std::{collections::HashSet, env, path::Path, sync::Arc}; -use tracing::info; +use tracing::{info, warn}; -pub fn load_streams( +pub async fn load_streams( state: impl IntoIterator<Item = StreamState>, config: &SystemConfig, ) -> Result<Streams, IggyError> { @@ -121,56 +123,40 @@ pub fn load_streams( let parent_stats = stats.clone(); let cgs = consumer_groups.into_values(); let partitions = partitions.into_values(); + + // Load each partition asynchronously and insert immediately for partition_state in partitions { info!( "Loading partition with ID: {}, for topic with ID: {} from state...", partition_state.id, topic_id ); + + let partition_id = partition_state.id; + let partition = load_partition_with_segments( + config, + stream_id as usize, + topic_id, + partition_state, + parent_stats.clone(), + ) + .await?; + + // Insert partition into the container streams.with_components_by_id(stream_id as usize, |(root, ..)| { - let parent_stats = parent_stats.clone(); root.topics() - .with_components_by_id_mut(topic_id, |(root, ..)| { - let stats = Arc::new(PartitionStats::new(parent_stats)); - // TODO: This has to be sampled from segments, if there exists more than 0 segements and the segment has more than 0 messages, then we set it to true. - let should_increment_offset = todo!(); - // TODO: This has to be sampled from segments and it's indexes, offset = segment.start_offset + last_index.offset; - let offset = todo!(); - let message_deduplicator = create_message_deduplicator(config); - let id = partition_state.id; - - let consumer_offset_path = config.get_consumer_offsets_path(stream_id as usize, topic_id as usize, id as usize); - let consumer_group_offsets_path = config.get_consumer_group_offsets_path(stream_id as usize, topic_id as usize, id as usize); - let consumer_offset = Arc::new(load_consumer_offsets(&consumer_offset_path, ConsumerKind::Consumer)? - .into_iter().map(|offset| { - (offset.consumer_id as usize, offset) - }).into()); - let consumer_group_offset = Arc::new(load_consumer_offsets(&consumer_group_offsets_path, ConsumerKind::ConsumerGroup)?.into_iter().map(|offset| { - (offset.consumer_id as usize, offset) - }).into()); - - let log = Default::default(); - let partition = partition2::Partition::new( - partition_state.created_at, - should_increment_offset, - stats, - message_deduplicator, - offset, - consumer_offset, - consumer_group_offset, - log - ); + .with_components_by_id_mut(topic_id, |(mut root, ..)| { let new_id = root.partitions_mut().insert(partition); assert_eq!( - new_id, id as usize, + new_id, partition_id as usize, "load_streams: partition id mismatch when inserting partition, mismatch for partition with ID: {}, for topic with ID: {}, for stream with ID: {}", - id, topic_id, stream_id + partition_id, topic_id, stream_id ); - Ok(()) - }) - })?; + }); + }); + info!( "Loaded partition with ID: {}, for topic with ID: {} from state...", - partition_state.id, topic_id + partition_id, topic_id ); } let partition_ids = streams.with_components_by_id(stream_id as usize, |(root, ..)| { @@ -384,3 +370,293 @@ pub async fn update_system_info( storage.info.save(system_info).await?; Ok(()) } + +async fn load_partition_with_segments( + config: &SystemConfig, + stream_id: usize, + topic_id: usize, + partition_state: crate::state::system::PartitionState, + parent_stats: Arc<TopicStats>, +) -> Result<partition2::Partition, IggyError> { + use std::sync::atomic::AtomicU64; + + let stats = Arc::new(PartitionStats::new(parent_stats)); + let partition_id = partition_state.id as u32; + + // Load segments from disk to determine should_increment_offset and current offset + let partition_path = config.get_partition_path(stream_id, topic_id, partition_id as usize); + + info!( + "Loading partition with ID: {} for stream with ID: {} and topic with ID: {}, for path: {} from disk...", + partition_id, stream_id, topic_id, partition_path + ); + + // Read directory entries to find log files using async fs_utils + let dir_entries = fs_utils::walk_dir(&partition_path) + .await + .map_err(|_| IggyError::CannotReadPartitions)?; + + let mut log_files = Vec::new(); + for entry in dir_entries { + if entry.is_dir { + continue; + } + + let extension = entry.path.extension(); + if extension.is_none() || extension.unwrap() != LOG_EXTENSION { + continue; + } + + log_files.push(entry); + } + + log_files.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name())); + + let mut should_increment_offset = false; + let mut current_offset = 0u64; + let mut log = SegmentedLog::<MemoryMessageJournal>::default(); + + for entry in log_files { + let log_file_name = entry + .path + .file_stem() + .unwrap() + .to_string_lossy() + .to_string(); + + let start_offset = log_file_name.parse::<u64>().unwrap(); + + // Build file paths directly + let messages_file_path = format!("{}/{}.{}", partition_path, start_offset, LOG_EXTENSION); + let index_file_path = format!("{}/{}.{}", partition_path, start_offset, INDEX_EXTENSION); + let time_index_path = index_file_path.replace(INDEX_EXTENSION, "timeindex"); + + // Check if index files exist + async fn try_exists(path: &str) -> Result<bool, std::io::Error> { + match compio::fs::metadata(path).await { + Ok(_) => Ok(true), + Err(err) => match err.kind() { + std::io::ErrorKind::NotFound => Ok(false), + _ => Err(err), + }, + } + } + + let index_path_exists = try_exists(&index_file_path).await.unwrap(); + let time_index_path_exists = try_exists(&time_index_path).await.unwrap(); + let index_cache_enabled = matches!( + config.segment.cache_indexes, + CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment + ); + + // Rebuild indexes if index cache is enabled and index at path does not exist + if index_cache_enabled && (!index_path_exists || time_index_path_exists) { + warn!( + "Index at path {} does not exist, rebuilding it based on {}...", + index_file_path, messages_file_path + ); + let now = std::time::Instant::now(); + let index_rebuilder = IndexRebuilder::new( + messages_file_path.clone(), + index_file_path.clone(), + start_offset, + ); + index_rebuilder.rebuild().await.unwrap_or_else(|e| { + panic!( + "Failed to rebuild index for partition with ID: {} for stream with ID: {} and topic with ID: {}. Error: {e}", + partition_id, stream_id, topic_id, + ) + }); + info!( + "Rebuilding index for path {} finished, it took {} ms", + index_file_path, + now.elapsed().as_millis() + ); + } + + if time_index_path_exists { + compio::fs::remove_file(&time_index_path).await.unwrap(); + } + + // Get file metadata to determine segment properties + let messages_metadata = compio::fs::metadata(&messages_file_path) + .await + .map_err(|_| IggyError::CannotReadPartitions)?; + let messages_size = messages_metadata.len() as u32; + + let index_size = match compio::fs::metadata(&index_file_path).await { + Ok(metadata) => metadata.len() as u32, + Err(_) => 0, // Default to 0 if index file doesn't exist + }; + + // If the first segment has messages, we should increment the offset + if !should_increment_offset { + should_increment_offset = messages_size > 0; + } + + // For segment validation, we'd need to implement checksum validation + // directly on the files if needed - skipping for now as it requires + // understanding the message format + if config.partition.validate_checksum { + info!("Checksum validation for segment at offset {}", start_offset); + } + + // Create storage for the segment using existing files + let storage = Storage::new( + &messages_file_path, + &index_file_path, + messages_size as u64, + index_size as u64, + config.partition.enforce_fsync, + config.partition.enforce_fsync, + true, // file_exists = true for existing segments + ) + .await?; + + // Load indexes from disk to calculate the correct end offset and cache them if needed + // This matches the logic in Segment::load_from_disk method + let loaded_indexes = { + storage. + index_reader + .as_ref() + .unwrap() + .load_all_indexes_from_disk() + .await + .with_error_context(|error| format!("Failed to load indexes during startup for stream ID: {}, topic ID: {}, partition_id: {}, {error}", stream_id, topic_id, partition_id)) + .map_err(|_| IggyError::CannotReadFile)? + }; + + // Calculate end offset based on loaded indexes + let end_offset = if loaded_indexes.count() == 0 { + 0 + } else { + let last_index_offset = loaded_indexes.last().unwrap().offset() as u64; + start_offset + last_index_offset + }; + + let (start_timestamp, end_timestamp) = if loaded_indexes.count() == 0 { + (0, 0) + } else { + ( + loaded_indexes.get(0).unwrap().timestamp(), + loaded_indexes.last().unwrap().timestamp(), + ) + }; + + current_offset = current_offset.max(end_offset); + + // Create the new Segment with proper values from file system + let mut segment = Segment2::new( + start_offset, + config.segment.size, + config.segment.message_expiry, + ); + + // Set properties based on file data + segment.start_timestamp = start_timestamp; + segment.end_timestamp = end_timestamp; + segment.end_offset = end_offset; + segment.size = messages_size; + segment.sealed = true; // Persisted segments are assumed to be sealed + + // Add segment to log first + log.add_persisted_segment(segment, storage); + + // Increment stats for partition - this matches the behavior from partition storage load method + stats.increment_segments_count(1); + + // Increment size and message counts based on the loaded segment data + stats.increment_size_bytes(messages_size as u64); + + // Calculate message count from segment data (end_offset - start_offset + 1 if there are messages) + let messages_count = if end_offset > start_offset { + (end_offset - start_offset + 1) as u64 + } else if messages_size > 0 { + // Fallback: estimate based on loaded indexes count if available + loaded_indexes.count() as u64 + } else { + 0 + }; + + if messages_count > 0 { + stats.increment_messages_count(messages_count); + } + + // Now handle index caching based on configuration + let should_cache_indexes = match config.segment.cache_indexes { + CacheIndexesConfig::All => true, + CacheIndexesConfig::OpenSegment => false, // Will be handled after all segments are loaded + CacheIndexesConfig::None => false, + }; + + // Set the loaded indexes if we should cache them + if should_cache_indexes { + let segment_index = log.segments().len() - 1; + log.set_segment_indexes(segment_index, loaded_indexes); + } + } + + // Handle OpenSegment cache configuration: only the last segment should keep its indexes + if matches!( + config.segment.cache_indexes, + CacheIndexesConfig::OpenSegment + ) && log.has_segments() + { + let segments_count = log.segments().len(); + if segments_count > 0 { + // Use the IndexReader from the last segment's storage to load indexes + let last_storage = log.storages().last().unwrap(); + match last_storage.index_reader.as_ref() { + Some(index_reader) => { + if let Ok(loaded_indexes) = index_reader.load_all_indexes_from_disk().await { + log.set_segment_indexes(segments_count - 1, loaded_indexes); + } + } + None => { + warn!("Index reader not available for last segment in OpenSegment mode"); + } + } + } + } + + // Load consumer offsets + let message_deduplicator = create_message_deduplicator(config); + let consumer_offset_path = + config.get_consumer_offsets_path(stream_id, topic_id, partition_id as usize); + let consumer_group_offsets_path = + config.get_consumer_group_offsets_path(stream_id, topic_id, partition_id as usize); + + let consumer_offset = Arc::new( + load_consumer_offsets(&consumer_offset_path, ConsumerKind::Consumer)? + .into_iter() + .map(|offset| (offset.consumer_id as usize, offset)) + .collect::<HashMap<usize, ConsumerOffset>>() + .into(), + ); + + let consumer_group_offset = Arc::new( + load_consumer_offsets(&consumer_group_offsets_path, ConsumerKind::ConsumerGroup)? + .into_iter() + .map(|offset| (offset.consumer_id as usize, offset)) + .collect::<HashMap<usize, ConsumerOffset>>() + .into(), + ); + + let partition = partition2::Partition::new( + partition_state.created_at, + should_increment_offset, + stats, + message_deduplicator, + Arc::new(AtomicU64::new(current_offset)), + consumer_offset, + consumer_group_offset, + log + ); + + info!( + "Loaded partition with ID: {} for stream with ID: {} and topic with ID: {}, current offset: {}.", + partition_id, stream_id, topic_id, current_offset + ); + + Ok(partition) +} diff --git a/core/server/src/compat/index_rebuilding/index_rebuilder.rs b/core/server/src/compat/index_rebuilding/index_rebuilder.rs index dbc93ebe..402739be 100644 --- a/core/server/src/compat/index_rebuilding/index_rebuilder.rs +++ b/core/server/src/compat/index_rebuilding/index_rebuilder.rs @@ -18,13 +18,11 @@ use crate::server_error::CompatError; use crate::streaming::utils::file; -use async_zip::tokio::write; use compio::{ fs::File, - io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, + io::{AsyncBufRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, }; use iggy_common::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeader}; -use std::io::{Seek, SeekFrom}; pub struct IndexRebuilder { pub messages_file_path: String, diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 44917d0b..c31f4cc8 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -189,7 +189,8 @@ async fn main() -> Result<(), ServerError> { )); let state = SystemState::load(state).await?; let (streams_state, users_state) = state.decompose(); - let streams = load_streams(streams_state.into_values(), &config.system)?; + let streams = load_streams(streams_state.into_values(), &config.system).await?; + tracing::warn!("Streams: {:?}", streams); let users = load_users(users_state.into_values()); // ELEVENTH DISCRETE LOADING STEP. @@ -329,7 +330,6 @@ async fn main() -> Result<(), ServerError> { let _command_handler = BackgroundServerCommandHandler::new(system.clone(), &config) .install_handler(SaveMessagesExecutor) .install_handler(MaintainMessagesExecutor) - .install_handler(ArchiveStateExecutor) .install_handler(CleanPersonalAccessTokensExecutor) .install_handler(SysInfoPrintExecutor) .install_handler(VerifyHeartbeatsExecutor); diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 615a5f71..fc203b77 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -149,8 +149,7 @@ impl IggyShardBuilder { id: id, shards: shards, shards_table, - //streams2: streams, // TODO: Fixme - streams2: Default::default(), + streams2: streams, // TODO: Fixme users: RefCell::new(users), storage: storage, encryptor: encryptor, diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index 0343231f..549b8d82 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -4,16 +4,24 @@ use crate::{ shard::{namespace::IggyFullNamespace, system::messages::PollingArgs}, shard_info, slab::{ - consumer_groups::ConsumerGroups, helpers, partitions::{self, Partitions}, topics::Topics, traits_ext::{ + Keyed, + consumer_groups::ConsumerGroups, + helpers, + partitions::{self, Partitions}, + topics::Topics, + traits_ext::{ ComponentsById, DeleteCell, EntityComponentSystem, EntityComponentSystemMutCell, InsertCell, InteriorMutability, IntoComponents, - }, Keyed + }, }, streaming::{ - partitions::{journal::Journal, partition2::{PartitionRef, PartitionRefMut}}, + partitions::{ + journal::Journal, + partition2::{PartitionRef, PartitionRefMut}, + }, polling_consumer::PollingConsumer, segments::{ - storage::create_segment_storage, IggyMessagesBatchMut, IggyMessagesBatchSet, Segment2 + IggyMessagesBatchMut, IggyMessagesBatchSet, Segment2, storage::create_segment_storage, }, stats::stats::StreamStats, streams::{ @@ -166,14 +174,19 @@ impl MainOps for Streams { streaming_partitions::helpers::calculate_current_offset(), ); - let current_position = self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { - log.journal().inner().size + log.active_segment().size - }); + let current_position = + self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { + log.journal().inner().size + log.active_segment().size + }); self.with_partition_by_id_async( stream_id, topic_id, partition_id, - streaming_partitions::helpers::deduplicate_messages(current_offset, current_position, &mut input), + streaming_partitions::helpers::deduplicate_messages( + current_offset, + current_position, + &mut input, + ), ) .await; diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs index 2c3ca43b..b5535d0e 100644 --- a/core/server/src/state/system.rs +++ b/core/server/src/state/system.rs @@ -30,17 +30,18 @@ use iggy_common::MaxTopicSize; use iggy_common::create_user::CreateUser; use iggy_common::defaults::DEFAULT_ROOT_USER_ID; use iggy_common::{IdKind, Identifier, Permissions, UserStatus}; +use std::collections::BTreeMap; use std::fmt::Display; use tracing::{debug, error, info}; #[derive(Debug, Clone)] pub struct SystemState { - pub streams: AHashMap<u32, StreamState>, + pub streams: BTreeMap<u32, StreamState>, pub users: AHashMap<u32, UserState>, } impl SystemState { - pub fn decompose(self) -> (AHashMap<u32, StreamState>, AHashMap<u32, UserState>) { + pub fn decompose(self) -> (BTreeMap<u32, StreamState>, AHashMap<u32, UserState>) { (self.streams, self.users) } } @@ -50,15 +51,15 @@ pub struct StreamState { pub id: u32, pub name: String, pub created_at: IggyTimestamp, - pub topics: AHashMap<u32, TopicState>, + pub topics: BTreeMap<u32, TopicState>, } #[derive(Debug, Clone)] pub struct TopicState { pub id: u32, pub name: String, - pub partitions: AHashMap<u32, PartitionState>, - pub consumer_groups: AHashMap<u32, ConsumerGroupState>, + pub partitions: BTreeMap<u32, PartitionState>, + pub consumer_groups: BTreeMap<u32, ConsumerGroupState>, pub compression_algorithm: CompressionAlgorithm, pub message_expiry: IggyExpiry, pub max_topic_size: MaxTopicSize, @@ -161,7 +162,7 @@ impl SystemState { } pub async fn init(entries: Vec<StateEntry>) -> Result<Self, IggyError> { - let mut streams = AHashMap::new(); + let mut streams = BTreeMap::new(); let mut users = AHashMap::new(); for entry in entries { debug!("Processing state entry: {entry}",); @@ -177,7 +178,7 @@ impl SystemState { let stream = StreamState { id: stream_id, name: command.name.clone(), - topics: AHashMap::new(), + topics: BTreeMap::new(), created_at: entry.timestamp, }; streams.insert(stream.id, stream); @@ -210,15 +211,15 @@ impl SystemState { let topic = TopicState { id: topic_id, name: command.name, - consumer_groups: AHashMap::new(), + consumer_groups: BTreeMap::new(), compression_algorithm: command.compression_algorithm, message_expiry: command.message_expiry, max_topic_size: command.max_topic_size, replication_factor: command.replication_factor, created_at: entry.timestamp, partitions: if command.partitions_count > 0 { - let mut partitions = AHashMap::new(); - for i in 1..=command.partitions_count { + let mut partitions = BTreeMap::new(); + for i in 0..command.partitions_count { partitions.insert( i, PartitionState { @@ -229,7 +230,7 @@ impl SystemState { } partitions } else { - AHashMap::new() + BTreeMap::new() }, }; stream.topics.insert(topic.id, topic); @@ -488,7 +489,7 @@ impl SystemState { } } -fn find_stream_id(streams: &AHashMap<u32, StreamState>, stream_id: &Identifier) -> u32 { +fn find_stream_id(streams: &BTreeMap<u32, StreamState>, stream_id: &Identifier) -> u32 { match stream_id.kind { IdKind::Numeric => stream_id .get_u32_value() @@ -506,7 +507,7 @@ fn find_stream_id(streams: &AHashMap<u32, StreamState>, stream_id: &Identifier) } } -fn find_topic_id(topics: &AHashMap<u32, TopicState>, topic_id: &Identifier) -> u32 { +fn find_topic_id(topics: &BTreeMap<u32, TopicState>, topic_id: &Identifier) -> u32 { match topic_id.kind { IdKind::Numeric => topic_id .get_u32_value() @@ -525,7 +526,7 @@ fn find_topic_id(topics: &AHashMap<u32, TopicState>, topic_id: &Identifier) -> u } fn find_consumer_group_id( - groups: &AHashMap<u32, ConsumerGroupState>, + groups: &BTreeMap<u32, ConsumerGroupState>, group_id: &Identifier, ) -> u32 { match group_id.kind { diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs index 8ce35655..c16020d8 100644 --- a/core/server/src/streaming/partitions/helpers.rs +++ b/core/server/src/streaming/partitions/helpers.rs @@ -472,9 +472,6 @@ pub fn get_segment_range_by_timestamp( ) -> impl FnOnce(ComponentsById<PartitionRef>) -> Result<std::ops::Range<usize>, IggyError> { move |(.., log)| -> Result<std::ops::Range<usize>, IggyError> { let segments = log.segments(); - for seg in segments { - tracing::warn!("timestamp: {}, segment: {:?}", timestamp, seg); - } let start = log .segments() .iter() @@ -705,9 +702,6 @@ pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> IggyM let batches = log.journal_mut().commit(); log.ensure_indexes(); batches.append_indexes_to(log.active_indexes_mut().unwrap()); - let indexes = log.active_indexes_mut().unwrap(); - let first = indexes.get(0).unwrap(); - let last = indexes.last().unwrap(); batches } } diff --git a/core/server/src/streaming/partitions/log.rs b/core/server/src/streaming/partitions/log.rs index 83712975..d2bc75f7 100644 --- a/core/server/src/streaming/partitions/log.rs +++ b/core/server/src/streaming/partitions/log.rs @@ -135,6 +135,12 @@ where self.storage.push(storage); self.indexes.push(None); } + + pub fn set_segment_indexes(&mut self, segment_index: usize, indexes: IggyIndexesMut) { + if let Some(segment_indexes) = self.indexes.get_mut(segment_index) { + *segment_indexes = Some(indexes); + } + } } impl<J> SegmentedLog<J> diff --git a/core/server/src/streaming/segments/storage.rs b/core/server/src/streaming/segments/storage.rs index 737b8947..a046af12 100644 --- a/core/server/src/streaming/segments/storage.rs +++ b/core/server/src/streaming/segments/storage.rs @@ -36,9 +36,13 @@ impl Storage { let index_writer = IndexWriter::new(index_path, indexes_size.clone(), index_fsync, file_exists).await?; + if file_exists { + messages_writer.fsync().await?; + index_writer.fsync().await?; + } + let messages_reader = MessagesReader::new(messages_path, size).await?; let index_reader = IndexReader::new(index_path, indexes_size).await?; - Ok(Self { messages_writer: Some(messages_writer), messages_reader: Some(messages_reader),
