This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch fix_flush_messages_method
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit af3a56671c249f9d0e9ea37020680d2c86d40775
Author: numminex <[email protected]>
AuthorDate: Fri Oct 3 09:46:40 2025 +0200

    feat(io_uring): fix flush_unsaved_buffer command
---
 .../messages/flush_unsaved_buffer_handler.rs       |  2 +-
 core/server/src/bootstrap.rs                       | 32 +-------
 core/server/src/main.rs                            | 86 ++--------------------
 core/server/src/shard/system/messages.rs           | 44 ++++++++++-
 core/server/src/slab/partitions.rs                 |  2 -
 core/server/src/slab/streams.rs                    | 19 ++++-
 core/server/src/streaming/topics/topic2.rs         |  4 -
 7 files changed, 70 insertions(+), 119 deletions(-)

diff --git a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
index 45bc9db6..f38fdba8 100644
--- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
+++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
@@ -44,7 +44,7 @@ impl ServerCommandHandler for FlushUnsavedBuffer {
 
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
-        let partition_id = self.partition_id;
+        let partition_id = self.partition_id as usize;
         let fsync = self.fsync;
         shard
             .flush_unsaved_buffer(session, stream_id, topic_id, partition_id, 
fsync)
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index fc38b069..8e6c06b3 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -144,7 +144,6 @@ pub async fn load_streams(
                     parent_stats.clone(),
                 )
                 .await?;
-                // Insert partition into the container
                 streams.with_components_by_id(stream_id as usize, |(root, ..)| 
{
                     root.topics()
                         .with_components_by_id_mut(topic_id, |(mut root, ..)| {
@@ -335,13 +334,12 @@ pub fn create_shard_executor(cpu_set: HashSet<usize>) -> Runtime {
     // TODO: The event interval tick, could be configured based on the fact
     // How many clients we expect to have connected.
     // This roughly estimates the number of tasks we will create.
-
     let mut proactor = compio::driver::ProactorBuilder::new();
 
     proactor
         .capacity(4096)
         .coop_taskrun(true)
-        .taskrun_flag(true); // TODO: Try enabling this.
+        .taskrun_flag(true);
 
     // FIXME(hubcio): Only set thread_pool_limit(0) on non-macOS platforms
     // This causes a freeze on macOS with compio fs operations
@@ -403,7 +401,6 @@ pub async fn load_segments(
     partition_path: String,
     stats: Arc<PartitionStats>,
 ) -> Result<SegmentedLog<MemoryMessageJournal>, IggyError> {
-    // Read directory entries to find log files using async fs_utils
     let mut log_files = collect_log_files(&partition_path).await?;
     log_files.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
     let mut log = SegmentedLog::<MemoryMessageJournal>::default();
@@ -417,12 +414,10 @@ pub async fn load_segments(
 
         let start_offset = log_file_name.parse::<u64>().unwrap();
 
-        // Build file paths directly
         let messages_file_path = format!("{}/{}.{}", partition_path, 
log_file_name, LOG_EXTENSION);
         let index_file_path = format!("{}/{}.{}", partition_path, 
log_file_name, INDEX_EXTENSION);
         let time_index_path = index_file_path.replace(INDEX_EXTENSION, 
"timeindex");
 
-        // Check if index files exist
         async fn try_exists(path: &str) -> Result<bool, std::io::Error> {
             match compio::fs::metadata(path).await {
                 Ok(_) => Ok(true),
@@ -440,7 +435,6 @@ pub async fn load_segments(
             CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
         );
 
-        // Rebuild indexes if index cache is enabled and index at path does 
not exist
         if index_cache_enabled && (!index_path_exists || 
time_index_path_exists) {
             warn!(
                 "Index at path {} does not exist, rebuilding it based on 
{}...",
@@ -469,7 +463,6 @@ pub async fn load_segments(
             compio::fs::remove_file(&time_index_path).await.unwrap();
         }
 
-        // Get file metadata to determine segment properties
         let messages_metadata = compio::fs::metadata(&messages_file_path)
             .await
             .map_err(|_| IggyError::CannotReadPartitions)?;
@@ -480,7 +473,6 @@ pub async fn load_segments(
             Err(_) => 0, // Default to 0 if index file doesn't exist
         };
 
-        // Create storage for the segment using existing files
         let storage = Storage::new(
             &messages_file_path,
             &index_file_path,
@@ -488,12 +480,10 @@ pub async fn load_segments(
             index_size as u64,
             config.partition.enforce_fsync,
             config.partition.enforce_fsync,
-            true, // file_exists = true for existing segments
+            true,
         )
         .await?;
 
-        // Load indexes from disk to calculate the correct end offset and 
cache them if needed
-        // This matches the logic in Segment::load_from_disk method
         let loaded_indexes = {
             storage.
             index_reader
@@ -505,7 +495,6 @@ pub async fn load_segments(
             .map_err(|_| IggyError::CannotReadFile)?
         };
 
-        // Calculate end offset based on loaded indexes
         let end_offset = if loaded_indexes.count() == 0 {
             0
         } else {
@@ -522,14 +511,12 @@ pub async fn load_segments(
             )
         };
 
-        // Create the new Segment with proper values from file system
         let mut segment = Segment2::new(
             start_offset,
             config.segment.size,
             config.segment.message_expiry,
         );
 
-        // Set properties based on file data
         segment.start_timestamp = start_timestamp;
         segment.end_timestamp = end_timestamp;
         segment.end_offset = end_offset;
@@ -585,20 +572,15 @@ pub async fn load_segments(
             }
         }
 
-        // Add segment to log
         log.add_persisted_segment(segment, storage);
 
-        // Increment stats for partition - this matches the behavior from 
partition storage load method
         stats.increment_segments_count(1);
 
-        // Increment size and message counts based on the loaded segment data
         stats.increment_size_bytes(messages_size as u64);
 
-        // Calculate message count from segment data (end_offset - 
start_offset + 1 if there are messages)
         let messages_count = if end_offset > start_offset {
             (end_offset - start_offset + 1) as u64
         } else if messages_size > 0 {
-            // Fallback: estimate based on loaded indexes count if available
             loaded_indexes.count() as u64
         } else {
             0
@@ -608,21 +590,18 @@ pub async fn load_segments(
             stats.increment_messages_count(messages_count);
         }
 
-        // Now handle index caching based on configuration
         let should_cache_indexes = match config.segment.cache_indexes {
             CacheIndexesConfig::All => true,
-            CacheIndexesConfig::OpenSegment => false, // Will be handled after 
all segments are loaded
+            CacheIndexesConfig::OpenSegment => false,
             CacheIndexesConfig::None => false,
         };
 
-        // Set the loaded indexes if we should cache them
         if should_cache_indexes {
             let segment_index = log.segments().len() - 1;
             log.set_segment_indexes(segment_index, loaded_indexes);
         }
     }
 
-    // Handle OpenSegment cache configuration: only the last segment should 
keep its indexes
     if matches!(
         config.segment.cache_indexes,
         CacheIndexesConfig::OpenSegment
@@ -630,7 +609,6 @@ pub async fn load_segments(
     {
         let segments_count = log.segments().len();
         if segments_count > 0 {
-            // Use the IndexReader from the last segment's storage to load 
indexes
             let last_storage = log.storages().last().unwrap();
             match last_storage.index_reader.as_ref() {
                 Some(index_reader) => {
@@ -655,11 +633,9 @@ async fn load_partition(
     partition_state: crate::state::system::PartitionState,
     parent_stats: Arc<TopicStats>,
 ) -> Result<partition2::Partition, IggyError> {
-    use std::sync::atomic::AtomicU64;
     let stats = Arc::new(PartitionStats::new(parent_stats));
     let partition_id = partition_state.id as u32;
 
-    // Load segments from disk to determine should_increment_offset and 
current offset
     let partition_path = config.get_partition_path(stream_id, topic_id, 
partition_id as usize);
     let log_files = collect_log_files(&partition_path).await?;
     let should_increment_offset = !log_files.is_empty()
@@ -675,7 +651,6 @@ async fn load_partition(
 
                 let start_offset = log_file_name.parse::<u64>().unwrap();
 
-                // Build file paths directly
                 let messages_file_path =
                     format!("{}/{}.{}", partition_path, start_offset, 
LOG_EXTENSION);
                 std::fs::metadata(&messages_file_path).is_ok_and(|metadata| 
metadata.len() > 0)
@@ -686,6 +661,7 @@ async fn load_partition(
         "Loading partition with ID: {} for stream with ID: {} and topic with 
ID: {}, for path: {} from disk...",
         partition_id, stream_id, topic_id, partition_path
     );
+
     // Load consumer offsets
     let message_deduplicator = create_message_deduplicator(config);
     let consumer_offset_path =
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 35a37fee..37f225e6 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -230,6 +230,11 @@ async fn main() -> Result<(), ServerError> {
         }
     }
 
+    #[cfg(feature = "disable-mimalloc")]
+    warn!("Using default system allocator because code was build with 
`disable-mimalloc` feature");
+    #[cfg(not(feature = "disable-mimalloc"))]
+    info!("Using mimalloc allocator");
+
     // DISCRETE STEP.
     // Increment the metrics.
     let metrics = Metrics::init();
@@ -343,7 +348,6 @@ async fn main() -> Result<(), ServerError> {
     }
 
     let shutdown_handles_for_signal = shutdown_handles.clone();
-
     ctrlc::set_handler(move || {
         let now = std::time::SystemTime::now()
             .duration_since(std::time::UNIX_EPOCH)
@@ -367,7 +371,9 @@ async fn main() -> Result<(), ServerError> {
     info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown.");
 
     for (idx, handle) in handles.into_iter().enumerate() {
-        handle.join().expect("Failed to join shard thread");
+        handle
+            .join()
+            .expect(format!("Failed to join shard thread-{}", idx).as_str());
     }
 
     let shutdown_duration_msg = {
@@ -389,81 +395,5 @@ async fn main() -> Result<(), ServerError> {
         shutdown_duration_msg
     );
 
-    /*
-    #[cfg(feature = "disable-mimalloc")]
-    warn!("Using default system allocator because code was build with 
`disable-mimalloc` feature");
-    #[cfg(not(feature = "disable-mimalloc"))]
-    info!("Using mimalloc allocator");
-
-    let system = SharedSystem::new(System::new(
-        config.system.clone(),
-        config.cluster.clone(),
-        config.cluster.clone(),
-        config.data_maintenance.clone(),
-        config.personal_access_token.clone(),
-    ));
-    */
-
-    /*
-
-    let mut current_config = config.clone();
-
-    if config.http.enabled {
-        let http_addr = http_server::start(config.http, system.clone()).await;
-        current_config.http.address = http_addr.to_string();
-    }
-
-    if config.quic.enabled {
-        let quic_addr = quic_server::start(config.quic, system.clone());
-        current_config.quic.address = quic_addr.to_string();
-    }
-
-    if config.tcp.enabled {
-        let tcp_addr = tcp_server::start(config.tcp, system.clone()).await;
-        current_config.tcp.address = tcp_addr.to_string();
-    }
-
-    let runtime_path = current_config.system.get_runtime_path();
-    let current_config_path = format!("{runtime_path}/current_config.toml");
-    let current_config_content =
-        toml::to_string(&current_config).expect("Cannot serialize 
current_config");
-    tokio::fs::write(current_config_path, current_config_content).await?;
-
-    let elapsed_time = startup_timestamp.elapsed();
-    info!(
-        "Iggy server has started - overall startup took {} ms.",
-        elapsed_time.as_millis()
-    );
-
-    #[cfg(unix)]
-    tokio::select! {
-        _ = ctrl_c.recv() => {
-            info!("Received SIGINT. Shutting down Iggy server...");
-        },
-        _ = sigterm.recv() => {
-            info!("Received SIGTERM. Shutting down Iggy server...");
-        }
-    }
-
-    #[cfg(windows)]
-    match tokio::signal::ctrl_c().await {
-        Ok(()) => {
-            info!("Received CTRL-C. Shutting down Iggy server...");
-        }
-        Err(err) => {
-            eprintln!("Unable to listen for shutdown signal: {}", err);
-        }
-    }
-
-    let shutdown_timestamp = Instant::now();
-    let mut system = system.write().await;
-    system.shutdown().await?;
-    let elapsed_time = shutdown_timestamp.elapsed();
-
-    info!(
-        "Iggy server has shutdown successfully. Shutdown took {} ms.",
-        elapsed_time.as_millis()
-    );
-    */
     Ok(())
 }
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 211d4a54..3ac56826 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -25,6 +25,7 @@ use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
 };
 use crate::shard_trace;
+use crate::streaming::partitions::journal::Journal;
 use crate::streaming::polling_consumer::PollingConsumer;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, 
IggyMessagesBatchSet};
 use crate::streaming::session::Session;
@@ -364,11 +365,48 @@ impl IggyShard {
         session: &Session,
         stream_id: Identifier,
         topic_id: Identifier,
-        partition_id: u32,
-        fsync: bool,
+        partition_id: usize,
+        _fsync: bool,
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
-        todo!();
+        let numeric_stream_id = self
+            .streams2
+            .with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
+
+        let numeric_topic_id =
+            self.streams2
+                .with_topic_by_id(&stream_id, &topic_id, 
topics::helpers::get_topic_id());
+
+        // Validate permissions for given user on stream and topic.
+        self.permissioner
+            .borrow()
+            .append_messages(
+                session.get_user_id(),
+                numeric_stream_id as u32,
+                numeric_topic_id as u32,
+            )
+            .with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - permission denied to 
append messages for user {} on stream ID: {}, topic ID: {}", 
session.get_user_id(), numeric_stream_id as u32, numeric_topic_id as u32)
+            })?;
+
+        let batches = self.streams2.with_partition_by_id_mut(
+            &stream_id,
+            &topic_id,
+            partition_id,
+            |(.., log)| log.journal_mut().commit(),
+        );
+
+        self.streams2
+            .persist_messages_to_disk(
+                self.id,
+                &stream_id,
+                &topic_id,
+                partition_id,
+                batches,
+                &self.config.system,
+            )
+            .await?;
+        Ok(())
     }
 
     async fn decrypt_messages(
diff --git a/core/server/src/slab/partitions.rs b/core/server/src/slab/partitions.rs
index b3d3fda9..f430d1ab 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -164,7 +164,6 @@ impl EntityComponentSystem<Borrow> for Partitions {
     {
         f(self.into())
     }
-
 }
 
 impl EntityComponentSystemMut for Partitions {
@@ -220,5 +219,4 @@ impl Partitions {
     ) -> T {
         self.with_components_by_id_mut(id, |components| f(components))
     }
-
 }
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index decc6b63..6167987c 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1,8 +1,6 @@
 use crate::shard::task_registry::TaskRegistry;
 use crate::shard_trace;
 use crate::streaming::partitions as streaming_partitions;
-use crate::streaming::segments::IggyIndexesMut;
-use crate::streaming::segments::storage::Storage;
 use crate::{
     binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
     configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
@@ -1235,9 +1233,24 @@ impl Streams {
             reason
         );
 
+        let batch_count = self
+            .persist_messages_to_disk(shard_id, stream_id, topic_id, 
partition_id, batches, config)
+            .await?;
+
+        Ok(batch_count)
+    }
+
+    pub async fn persist_messages_to_disk(
+        &self,
+        shard_id: u16,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: usize,
+        batches: IggyMessagesBatchSet,
+        config: &SystemConfig,
+    ) -> Result<u32, IggyError> {
         let batch_count = batches.count();
         let batch_size = batches.size();
-
         // Extract storage before async operations
         let (messages_writer, index_writer) =
             self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
diff --git a/core/server/src/streaming/topics/topic2.rs b/core/server/src/streaming/topics/topic2.rs
index 64d4fc69..e46a5c3b 100644
--- a/core/server/src/streaming/topics/topic2.rs
+++ b/core/server/src/streaming/topics/topic2.rs
@@ -260,10 +260,6 @@ impl TopicRoot {
         f(self)
     }
 
-    pub async fn invoke_async<T>(&self, f: impl AsyncFnOnce(&Self) -> T) -> T {
-        f(self).await
-    }
-
     pub fn update_id(&mut self, id: usize) {
         self.id = id;
     }

Reply via email to