This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch fix_flush_messages_method
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit af3a56671c249f9d0e9ea37020680d2c86d40775
Author: numminex <[email protected]>
AuthorDate: Fri Oct 3 09:46:40 2025 +0200

    feat(io_uring): fix flush_unsaved_buffer command
---
 .../messages/flush_unsaved_buffer_handler.rs |  2 +-
 core/server/src/bootstrap.rs                 | 32 +-------
 core/server/src/main.rs                      | 86 ++--------------------
 core/server/src/shard/system/messages.rs     | 44 ++++++++++-
 core/server/src/slab/partitions.rs           |  2 -
 core/server/src/slab/streams.rs              | 19 ++++-
 core/server/src/streaming/topics/topic2.rs   |  4 -
 7 files changed, 70 insertions(+), 119 deletions(-)

diff --git a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
index 45bc9db6..f38fdba8 100644
--- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
+++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
@@ -44,7 +44,7 @@ impl ServerCommandHandler for FlushUnsavedBuffer {
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
-        let partition_id = self.partition_id;
+        let partition_id = self.partition_id as usize;
         let fsync = self.fsync;
         shard
             .flush_unsaved_buffer(session, stream_id, topic_id, partition_id, fsync)
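[Editor's note] The cast above marks the boundary between the wire format, which encodes the partition id as a u32, and the slab-backed in-memory containers, which are indexed by usize. A minimal compiling sketch of that boundary; the type and field names are illustrative, not the crate's actual API:

    // Hypothetical command type standing in for FlushUnsavedBuffer.
    struct Command {
        partition_id: u32, // as decoded from the binary protocol
    }

    fn handle(cmd: &Command, partitions: &[&str]) {
        // Slab containers are addressed by usize, so the id is widened once
        // at the handler boundary and stays usize from here down.
        let partition_id = cmd.partition_id as usize;
        println!("flushing partition {}", partitions[partition_id]);
    }

    fn main() {
        let partitions = ["p0", "p1"];
        handle(&Command { partition_id: 1 }, &partitions);
    }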
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index fc38b069..8e6c06b3 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -144,7 +144,6 @@ pub async fn load_streams(
         parent_stats.clone(),
     )
     .await?;
-    // Insert partition into the container
     streams.with_components_by_id(stream_id as usize, |(root, ..)| {
         root.topics()
             .with_components_by_id_mut(topic_id, |(mut root, ..)| {
@@ -335,13 +334,12 @@ pub fn create_shard_executor(cpu_set: HashSet<usize>) -> Runtime {
     // TODO: The event interval tick, could be configured based on the fact
     // How many clients we expect to have connected.
     // This roughly estimates the number of tasks we will create.
-
     let mut proactor = compio::driver::ProactorBuilder::new();
     proactor
         .capacity(4096)
         .coop_taskrun(true)
-        .taskrun_flag(true); // TODO: Try enabling this.
+        .taskrun_flag(true);

     // FIXME(hubcio): Only set thread_pool_limit(0) on non-macOS platforms
     // This causes a freeze on macOS with compio fs operations
@@ -403,7 +401,6 @@ pub async fn load_segments(
     partition_path: String,
     stats: Arc<PartitionStats>,
 ) -> Result<SegmentedLog<MemoryMessageJournal>, IggyError> {
-    // Read directory entries to find log files using async fs_utils
     let mut log_files = collect_log_files(&partition_path).await?;
     log_files.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
     let mut log = SegmentedLog::<MemoryMessageJournal>::default();
@@ -417,12 +414,10 @@ pub async fn load_segments(
         let start_offset = log_file_name.parse::<u64>().unwrap();

-        // Build file paths directly
         let messages_file_path = format!("{}/{}.{}", partition_path, log_file_name, LOG_EXTENSION);
         let index_file_path = format!("{}/{}.{}", partition_path, log_file_name, INDEX_EXTENSION);
         let time_index_path = index_file_path.replace(INDEX_EXTENSION, "timeindex");

-        // Check if index files exist
         async fn try_exists(path: &str) -> Result<bool, std::io::Error> {
             match compio::fs::metadata(path).await {
                 Ok(_) => Ok(true),
@@ -440,7 +435,6 @@ pub async fn load_segments(
             CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
         );

-        // Rebuild indexes if index cache is enabled and index at path does not exist
         if index_cache_enabled && (!index_path_exists || time_index_path_exists) {
             warn!(
                 "Index at path {} does not exist, rebuilding it based on {}...",
@@ -469,7 +463,6 @@ pub async fn load_segments(
             compio::fs::remove_file(&time_index_path).await.unwrap();
         }

-        // Get file metadata to determine segment properties
         let messages_metadata = compio::fs::metadata(&messages_file_path)
             .await
             .map_err(|_| IggyError::CannotReadPartitions)?;
@@ -480,7 +473,6 @@ pub async fn load_segments(
             Err(_) => 0, // Default to 0 if index file doesn't exist
         };

-        // Create storage for the segment using existing files
         let storage = Storage::new(
             &messages_file_path,
             &index_file_path,
@@ -488,12 +480,10 @@ pub async fn load_segments(
             index_size as u64,
             config.partition.enforce_fsync,
             config.partition.enforce_fsync,
-            true, // file_exists = true for existing segments
+            true,
         )
         .await?;

-        // Load indexes from disk to calculate the correct end offset and cache them if needed
-        // This matches the logic in Segment::load_from_disk method
         let loaded_indexes = {
             storage
                 .index_reader
@@ -505,7 +495,6 @@ pub async fn load_segments(
                 .map_err(|_| IggyError::CannotReadFile)?
         };

-        // Calculate end offset based on loaded indexes
         let end_offset = if loaded_indexes.count() == 0 {
             0
         } else {
@@ -522,14 +511,12 @@ pub async fn load_segments(
             )
         };

-        // Create the new Segment with proper values from file system
         let mut segment = Segment2::new(
             start_offset,
             config.segment.size,
             config.segment.message_expiry,
         );

-        // Set properties based on file data
         segment.start_timestamp = start_timestamp;
         segment.end_timestamp = end_timestamp;
         segment.end_offset = end_offset;
@@ -585,20 +572,15 @@ pub async fn load_segments(
             }
         }

-        // Add segment to log
         log.add_persisted_segment(segment, storage);

-        // Increment stats for partition - this matches the behavior from partition storage load method
         stats.increment_segments_count(1);
-        // Increment size and message counts based on the loaded segment data
         stats.increment_size_bytes(messages_size as u64);

-        // Calculate message count from segment data (end_offset - start_offset + 1 if there are messages)
         let messages_count = if end_offset > start_offset {
             (end_offset - start_offset + 1) as u64
         } else if messages_size > 0 {
-            // Fallback: estimate based on loaded indexes count if available
             loaded_indexes.count() as u64
         } else {
             0
@@ -608,21 +590,18 @@ pub async fn load_segments(
             stats.increment_messages_count(messages_count);
         }

-        // Now handle index caching based on configuration
         let should_cache_indexes = match config.segment.cache_indexes {
             CacheIndexesConfig::All => true,
-            CacheIndexesConfig::OpenSegment => false, // Will be handled after all segments are loaded
+            CacheIndexesConfig::OpenSegment => false,
             CacheIndexesConfig::None => false,
         };

-        // Set the loaded indexes if we should cache them
         if should_cache_indexes {
             let segment_index = log.segments().len() - 1;
             log.set_segment_indexes(segment_index, loaded_indexes);
         }
     }

-    // Handle OpenSegment cache configuration: only the last segment should keep its indexes
     if matches!(
         config.segment.cache_indexes,
         CacheIndexesConfig::OpenSegment
@@ -630,7 +609,6 @@ pub async fn load_segments(
     ) {
         let segments_count = log.segments().len();
         if segments_count > 0 {
-            // Use the IndexReader from the last segment's storage to load indexes
            let last_storage = log.storages().last().unwrap();
            match last_storage.index_reader.as_ref() {
                Some(index_reader) => {
@@ -655,11 +633,9 @@ async fn load_partition(
     partition_state: crate::state::system::PartitionState,
     parent_stats: Arc<TopicStats>,
 ) -> Result<partition2::Partition, IggyError> {
-    use std::sync::atomic::AtomicU64;
     let stats = Arc::new(PartitionStats::new(parent_stats));
     let partition_id = partition_state.id as u32;

-    // Load segments from disk to determine should_increment_offset and current offset
     let partition_path = config.get_partition_path(stream_id, topic_id, partition_id as usize);
     let log_files = collect_log_files(&partition_path).await?;
     let should_increment_offset = !log_files.is_empty()
@@ -675,7 +651,6 @@ async fn load_partition(
             let start_offset = log_file_name.parse::<u64>().unwrap();

-            // Build file paths directly
             let messages_file_path = format!("{}/{}.{}", partition_path, start_offset, LOG_EXTENSION);
             std::fs::metadata(&messages_file_path).is_ok_and(|metadata| metadata.len() > 0)
@@ -686,6 +661,7 @@ async fn load_partition(
         "Loading partition with ID: {} for stream with ID: {} and topic with ID: {}, for path: {} from disk...",
         partition_id, stream_id, topic_id, partition_path
     );
+    // Load consumer offsets
     let message_deduplicator = create_message_deduplicator(config);
     let consumer_offset_path =
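[Editor's note] The recovery path in load_segments derives a segment's message count purely from its offsets, falling back to the loaded index count. A self-contained sketch of that arithmetic (assuming offsets are inclusive at both ends, as the +1 implies); note that a segment whose start and end offsets are equal falls through to the fallback branch, which may be worth double-checking for single-message segments:

    // Mirrors the message-count recovery in load_segments: a segment
    // spanning offsets 100..=149 holds 50 messages.
    fn recovered_message_count(
        start_offset: u64,
        end_offset: u64,
        messages_size: u64,
        index_count: u64,
    ) -> u64 {
        if end_offset > start_offset {
            end_offset - start_offset + 1
        } else if messages_size > 0 {
            // Fallback: estimate from the number of loaded index entries.
            index_count
        } else {
            0
        }
    }

    fn main() {
        assert_eq!(recovered_message_count(100, 149, 4096, 50), 50);
        assert_eq!(recovered_message_count(0, 0, 512, 1), 1); // fallback path
    }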
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 35a37fee..37f225e6 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -230,6 +230,11 @@ async fn main() -> Result<(), ServerError> {
         }
     }

+    #[cfg(feature = "disable-mimalloc")]
+    warn!("Using default system allocator because code was build with `disable-mimalloc` feature");
+    #[cfg(not(feature = "disable-mimalloc"))]
+    info!("Using mimalloc allocator");
+
     // DISCRETE STEP.
     // Increment the metrics.
     let metrics = Metrics::init();
@@ -343,7 +348,6 @@ async fn main() -> Result<(), ServerError> {
     }

     let shutdown_handles_for_signal = shutdown_handles.clone();
-
     ctrlc::set_handler(move || {
         let now = std::time::SystemTime::now()
             .duration_since(std::time::UNIX_EPOCH)
@@ -367,7 +371,9 @@ async fn main() -> Result<(), ServerError> {
     info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown.");

     for (idx, handle) in handles.into_iter().enumerate() {
-        handle.join().expect("Failed to join shard thread");
+        handle
+            .join()
+            .expect(format!("Failed to join shard thread-{}", idx).as_str());
     }

     let shutdown_duration_msg = {
@@ -389,81 +395,5 @@ async fn main() -> Result<(), ServerError> {
         shutdown_duration_msg
     );

-    /*
-    #[cfg(feature = "disable-mimalloc")]
-    warn!("Using default system allocator because code was build with `disable-mimalloc` feature");
-    #[cfg(not(feature = "disable-mimalloc"))]
-    info!("Using mimalloc allocator");
-
-    let system = SharedSystem::new(System::new(
-        config.system.clone(),
-        config.cluster.clone(),
-        config.cluster.clone(),
-        config.data_maintenance.clone(),
-        config.personal_access_token.clone(),
-    ));
-    */
-
-    /*
-
-    let mut current_config = config.clone();
-
-    if config.http.enabled {
-        let http_addr = http_server::start(config.http, system.clone()).await;
-        current_config.http.address = http_addr.to_string();
-    }
-
-    if config.quic.enabled {
-        let quic_addr = quic_server::start(config.quic, system.clone());
-        current_config.quic.address = quic_addr.to_string();
-    }
-
-    if config.tcp.enabled {
-        let tcp_addr = tcp_server::start(config.tcp, system.clone()).await;
-        current_config.tcp.address = tcp_addr.to_string();
-    }
-
-    let runtime_path = current_config.system.get_runtime_path();
-    let current_config_path = format!("{runtime_path}/current_config.toml");
-    let current_config_content =
-        toml::to_string(&current_config).expect("Cannot serialize current_config");
-    tokio::fs::write(current_config_path, current_config_content).await?;
-
-    let elapsed_time = startup_timestamp.elapsed();
-    info!(
-        "Iggy server has started - overall startup took {} ms.",
-        elapsed_time.as_millis()
-    );
-
-    #[cfg(unix)]
-    tokio::select! {
-        _ = ctrl_c.recv() => {
-            info!("Received SIGINT. Shutting down Iggy server...");
-        },
-        _ = sigterm.recv() => {
-            info!("Received SIGTERM. Shutting down Iggy server...");
-        }
-    }
-
-    #[cfg(windows)]
-    match tokio::signal::ctrl_c().await {
-        Ok(()) => {
-            info!("Received CTRL-C. Shutting down Iggy server...");
-        }
-        Err(err) => {
-            eprintln!("Unable to listen for shutdown signal: {}", err);
-        }
-    }
-
-    let shutdown_timestamp = Instant::now();
-    let mut system = system.write().await;
-    system.shutdown().await?;
-    let elapsed_time = shutdown_timestamp.elapsed();
-
-    info!(
-        "Iggy server has shutdown successfully. Shutdown took {} ms.",
-        elapsed_time.as_millis()
-    );
-    */
     Ok(())
 }
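[Editor's note] `.expect(format!(..).as_str())` builds the panic message on every iteration, even when the join succeeds; clippy flags this pattern as `expect_fun_call`. A lazily formatted alternative, shown on plain std threads for illustration:

    use std::thread;

    fn main() {
        let handles: Vec<thread::JoinHandle<()>> = (0..4)
            .map(|_| thread::spawn(|| { /* shard work would run here */ }))
            .collect();

        for (idx, handle) in handles.into_iter().enumerate() {
            // unwrap_or_else formats the message only on the failure path.
            handle
                .join()
                .unwrap_or_else(|_| panic!("Failed to join shard thread-{idx}"));
        }
    }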
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 211d4a54..3ac56826 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -25,6 +25,7 @@ use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
 };
 use crate::shard_trace;
+use crate::streaming::partitions::journal::Journal;
 use crate::streaming::polling_consumer::PollingConsumer;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet};
 use crate::streaming::session::Session;
@@ -364,11 +365,48 @@ impl IggyShard {
         session: &Session,
         stream_id: Identifier,
         topic_id: Identifier,
-        partition_id: u32,
-        fsync: bool,
+        partition_id: usize,
+        _fsync: bool,
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
-        todo!();
+        let numeric_stream_id = self
+            .streams2
+            .with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
+
+        let numeric_topic_id =
+            self.streams2
+                .with_topic_by_id(&stream_id, &topic_id, topics::helpers::get_topic_id());
+
+        // Validate permissions for given user on stream and topic.
+        self.permissioner
+            .borrow()
+            .append_messages(
+                session.get_user_id(),
+                numeric_stream_id as u32,
+                numeric_topic_id as u32,
+            )
+            .with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - permission denied to append messages for user {} on stream ID: {}, topic ID: {}", session.get_user_id(), numeric_stream_id as u32, numeric_topic_id as u32)
+            })?;
+
+        let batches = self.streams2.with_partition_by_id_mut(
+            &stream_id,
+            &topic_id,
+            partition_id,
+            |(.., log)| log.journal_mut().commit(),
+        );
+
+        self.streams2
+            .persist_messages_to_disk(
+                self.id,
+                &stream_id,
+                &topic_id,
+                partition_id,
+                batches,
+                &self.config.system,
+            )
+            .await?;
+        Ok(())
     }

     async fn decrypt_messages(
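[Editor's note] The rewritten handler follows authenticate, resolve numeric ids, check permissions, commit the in-memory journal, then persist the committed batches; the renamed `_fsync` parameter means the client's fsync hint is currently accepted but not acted on. A minimal sketch of the commit-then-persist shape with stand-in types (the crate's real Journal trait and batch types differ):

    // Stand-in journal: commit() seals the unsaved buffer and hands
    // back the batches that must now be written out.
    struct MemoryJournal {
        unsaved: Vec<Vec<u8>>,
    }

    impl MemoryJournal {
        fn commit(&mut self) -> Vec<Vec<u8>> {
            std::mem::take(&mut self.unsaved)
        }
    }

    // Stand-in for Streams::persist_messages_to_disk.
    fn persist_to_disk(batches: &[Vec<u8>]) -> u32 {
        batches.len() as u32 // a real implementation writes and optionally fsyncs
    }

    fn main() {
        let mut journal = MemoryJournal {
            unsaved: vec![b"m1".to_vec(), b"m2".to_vec()],
        };
        let batches = journal.commit(); // 1. drain the unsaved buffer
        let count = persist_to_disk(&batches); // 2. flush committed batches
        assert_eq!(count, 2);
        assert!(journal.unsaved.is_empty());
    }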
diff --git a/core/server/src/slab/partitions.rs b/core/server/src/slab/partitions.rs
index b3d3fda9..f430d1ab 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -164,7 +164,6 @@ impl EntityComponentSystem<Borrow> for Partitions {
     {
         f(self.into())
     }
-
 }

 impl EntityComponentSystemMut for Partitions {
@@ -220,5 +219,4 @@ impl Partitions {
     ) -> T {
         self.with_components_by_id_mut(id, |components| f(components))
     }
-
 }
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index decc6b63..6167987c 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1,8 +1,6 @@
 use crate::shard::task_registry::TaskRegistry;
 use crate::shard_trace;
 use crate::streaming::partitions as streaming_partitions;
-use crate::streaming::segments::IggyIndexesMut;
-use crate::streaming::segments::storage::Storage;
 use crate::{
     binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
     configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
@@ -1235,9 +1233,24 @@ impl Streams {
             reason
         );

+        let batch_count = self
+            .persist_messages_to_disk(shard_id, stream_id, topic_id, partition_id, batches, config)
+            .await?;
+
+        Ok(batch_count)
+    }
+
+    pub async fn persist_messages_to_disk(
+        &self,
+        shard_id: u16,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: usize,
+        batches: IggyMessagesBatchSet,
+        config: &SystemConfig,
+    ) -> Result<u32, IggyError> {
         let batch_count = batches.count();
         let batch_size = batches.size();
-        // Extract storage before async operations
         let (messages_writer, index_writer) =
             self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
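[Editor's note] This hunk is an extract-method refactor: the tail of the existing flush path moves into the new public persist_messages_to_disk, and the original function now delegates to it, which is what lets flush_unsaved_buffer in messages.rs reuse the same write path. Schematically, with illustrative names and the async plumbing elided:

    struct Streams;

    impl Streams {
        // The original flush path keeps its logging and bookkeeping,
        // then delegates to the extracted helper.
        fn flush_partition(&self, partition_id: usize, batches: Vec<Vec<u8>>) -> u32 {
            // ...tracing of shard id, reason, etc. stays here...
            self.persist_messages_to_disk(partition_id, batches)
        }

        // Now public, so command handlers can hit the same write path.
        fn persist_messages_to_disk(&self, _partition_id: usize, batches: Vec<Vec<u8>>) -> u32 {
            batches.len() as u32 // stand-in for the real writer logic
        }
    }

    fn main() {
        let streams = Streams;
        assert_eq!(streams.flush_partition(0, vec![vec![1], vec![2]]), 2);
    }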
diff --git a/core/server/src/streaming/topics/topic2.rs b/core/server/src/streaming/topics/topic2.rs
index 64d4fc69..e46a5c3b 100644
--- a/core/server/src/streaming/topics/topic2.rs
+++ b/core/server/src/streaming/topics/topic2.rs
@@ -260,10 +260,6 @@ impl TopicRoot {
         f(self)
     }

-    pub async fn invoke_async<T>(&self, f: impl AsyncFnOnce(&Self) -> T) -> T {
-        f(self).await
-    }
-
     pub fn update_id(&mut self, id: usize) {
         self.id = id;
     }
