This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 9bca8d43 feat(io_uring): fix flush_unsaved_buffer command (#2233)
9bca8d43 is described below
commit 9bca8d436bcbaeb96083ef5f4d21882d5e07ca62
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Oct 3 10:24:47 2025 +0200
feat(io_uring): fix flush_unsaved_buffer command (#2233)
---
.../messages/flush_unsaved_buffer_handler.rs | 12 ++-
core/server/src/bootstrap.rs | 32 +-------
core/server/src/http/messages.rs | 5 +-
core/server/src/main.rs | 86 ++--------------------
core/server/src/shard/mod.rs | 10 +++
core/server/src/shard/system/messages.rs | 69 +++++++++++++++--
core/server/src/shard/transmission/event.rs | 6 ++
core/server/src/slab/partitions.rs | 2 -
core/server/src/slab/streams.rs | 19 ++++-
core/server/src/streaming/topics/topic2.rs | 4 -
10 files changed, 121 insertions(+), 124 deletions(-)
diff --git
a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
index 45bc9db6..a799cfc4 100644
--- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
+++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::messages::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
use anyhow::Result;
use error_set::ErrContext;
@@ -44,10 +45,10 @@ impl ServerCommandHandler for FlushUnsavedBuffer {
let stream_id = self.stream_id.clone();
let topic_id = self.topic_id.clone();
- let partition_id = self.partition_id;
+ let partition_id = self.partition_id as usize;
let fsync = self.fsync;
shard
- .flush_unsaved_buffer(session, stream_id, topic_id, partition_id,
fsync)
+ .flush_unsaved_buffer(session, &stream_id, &topic_id,
partition_id, fsync)
.await
.with_error_context(|error| {
format!(
@@ -55,6 +56,13 @@ impl ServerCommandHandler for FlushUnsavedBuffer {
self.stream_id, self.topic_id, self.partition_id, session
)
})?;
+ let event = ShardEvent::FlushUnsavedBuffer {
+ stream_id,
+ topic_id,
+ partition_id,
+ fsync,
+ };
+ let _responses = shard.broadcast_event_to_all_shards(event).await;
sender.send_empty_ok_response().await?;
Ok(())
}
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index fc38b069..8e6c06b3 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -144,7 +144,6 @@ pub async fn load_streams(
parent_stats.clone(),
)
.await?;
- // Insert partition into the container
streams.with_components_by_id(stream_id as usize, |(root, ..)|
{
root.topics()
.with_components_by_id_mut(topic_id, |(mut root, ..)| {
@@ -335,13 +334,12 @@ pub fn create_shard_executor(cpu_set: HashSet<usize>) ->
Runtime {
// TODO: The event interval tick, could be configured based on the fact
// How many clients we expect to have connected.
// This roughly estimates the number of tasks we will create.
-
let mut proactor = compio::driver::ProactorBuilder::new();
proactor
.capacity(4096)
.coop_taskrun(true)
- .taskrun_flag(true); // TODO: Try enabling this.
+ .taskrun_flag(true);
// FIXME(hubcio): Only set thread_pool_limit(0) on non-macOS platforms
// This causes a freeze on macOS with compio fs operations
@@ -403,7 +401,6 @@ pub async fn load_segments(
partition_path: String,
stats: Arc<PartitionStats>,
) -> Result<SegmentedLog<MemoryMessageJournal>, IggyError> {
- // Read directory entries to find log files using async fs_utils
let mut log_files = collect_log_files(&partition_path).await?;
log_files.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
let mut log = SegmentedLog::<MemoryMessageJournal>::default();
@@ -417,12 +414,10 @@ pub async fn load_segments(
let start_offset = log_file_name.parse::<u64>().unwrap();
- // Build file paths directly
let messages_file_path = format!("{}/{}.{}", partition_path,
log_file_name, LOG_EXTENSION);
let index_file_path = format!("{}/{}.{}", partition_path,
log_file_name, INDEX_EXTENSION);
let time_index_path = index_file_path.replace(INDEX_EXTENSION,
"timeindex");
- // Check if index files exist
async fn try_exists(path: &str) -> Result<bool, std::io::Error> {
match compio::fs::metadata(path).await {
Ok(_) => Ok(true),
@@ -440,7 +435,6 @@ pub async fn load_segments(
CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
);
- // Rebuild indexes if index cache is enabled and index at path does
not exist
if index_cache_enabled && (!index_path_exists ||
time_index_path_exists) {
warn!(
"Index at path {} does not exist, rebuilding it based on
{}...",
@@ -469,7 +463,6 @@ pub async fn load_segments(
compio::fs::remove_file(&time_index_path).await.unwrap();
}
- // Get file metadata to determine segment properties
let messages_metadata = compio::fs::metadata(&messages_file_path)
.await
.map_err(|_| IggyError::CannotReadPartitions)?;
@@ -480,7 +473,6 @@ pub async fn load_segments(
Err(_) => 0, // Default to 0 if index file doesn't exist
};
- // Create storage for the segment using existing files
let storage = Storage::new(
&messages_file_path,
&index_file_path,
@@ -488,12 +480,10 @@ pub async fn load_segments(
index_size as u64,
config.partition.enforce_fsync,
config.partition.enforce_fsync,
- true, // file_exists = true for existing segments
+ true,
)
.await?;
- // Load indexes from disk to calculate the correct end offset and
cache them if needed
- // This matches the logic in Segment::load_from_disk method
let loaded_indexes = {
storage.
index_reader
@@ -505,7 +495,6 @@ pub async fn load_segments(
.map_err(|_| IggyError::CannotReadFile)?
};
- // Calculate end offset based on loaded indexes
let end_offset = if loaded_indexes.count() == 0 {
0
} else {
@@ -522,14 +511,12 @@ pub async fn load_segments(
)
};
- // Create the new Segment with proper values from file system
let mut segment = Segment2::new(
start_offset,
config.segment.size,
config.segment.message_expiry,
);
- // Set properties based on file data
segment.start_timestamp = start_timestamp;
segment.end_timestamp = end_timestamp;
segment.end_offset = end_offset;
@@ -585,20 +572,15 @@ pub async fn load_segments(
}
}
- // Add segment to log
log.add_persisted_segment(segment, storage);
- // Increment stats for partition - this matches the behavior from
partition storage load method
stats.increment_segments_count(1);
- // Increment size and message counts based on the loaded segment data
stats.increment_size_bytes(messages_size as u64);
- // Calculate message count from segment data (end_offset -
start_offset + 1 if there are messages)
let messages_count = if end_offset > start_offset {
(end_offset - start_offset + 1) as u64
} else if messages_size > 0 {
- // Fallback: estimate based on loaded indexes count if available
loaded_indexes.count() as u64
} else {
0
@@ -608,21 +590,18 @@ pub async fn load_segments(
stats.increment_messages_count(messages_count);
}
- // Now handle index caching based on configuration
let should_cache_indexes = match config.segment.cache_indexes {
CacheIndexesConfig::All => true,
- CacheIndexesConfig::OpenSegment => false, // Will be handled after
all segments are loaded
+ CacheIndexesConfig::OpenSegment => false,
CacheIndexesConfig::None => false,
};
- // Set the loaded indexes if we should cache them
if should_cache_indexes {
let segment_index = log.segments().len() - 1;
log.set_segment_indexes(segment_index, loaded_indexes);
}
}
- // Handle OpenSegment cache configuration: only the last segment should
keep its indexes
if matches!(
config.segment.cache_indexes,
CacheIndexesConfig::OpenSegment
@@ -630,7 +609,6 @@ pub async fn load_segments(
{
let segments_count = log.segments().len();
if segments_count > 0 {
- // Use the IndexReader from the last segment's storage to load
indexes
let last_storage = log.storages().last().unwrap();
match last_storage.index_reader.as_ref() {
Some(index_reader) => {
@@ -655,11 +633,9 @@ async fn load_partition(
partition_state: crate::state::system::PartitionState,
parent_stats: Arc<TopicStats>,
) -> Result<partition2::Partition, IggyError> {
- use std::sync::atomic::AtomicU64;
let stats = Arc::new(PartitionStats::new(parent_stats));
let partition_id = partition_state.id as u32;
- // Load segments from disk to determine should_increment_offset and
current offset
let partition_path = config.get_partition_path(stream_id, topic_id,
partition_id as usize);
let log_files = collect_log_files(&partition_path).await?;
let should_increment_offset = !log_files.is_empty()
@@ -675,7 +651,6 @@ async fn load_partition(
let start_offset = log_file_name.parse::<u64>().unwrap();
- // Build file paths directly
let messages_file_path =
format!("{}/{}.{}", partition_path, start_offset,
LOG_EXTENSION);
std::fs::metadata(&messages_file_path).is_ok_and(|metadata|
metadata.len() > 0)
@@ -686,6 +661,7 @@ async fn load_partition(
"Loading partition with ID: {} for stream with ID: {} and topic with
ID: {}, for path: {} from disk...",
partition_id, stream_id, topic_id, partition_path
);
+
// Load consumer offsets
let message_deduplicator = create_message_deduplicator(config);
let consumer_offset_path =
diff --git a/core/server/src/http/messages.rs b/core/server/src/http/messages.rs
index 1acc4d02..61a5b95d 100644
--- a/core/server/src/http/messages.rs
+++ b/core/server/src/http/messages.rs
@@ -131,12 +131,13 @@ async fn flush_unsaved_buffer(
) -> Result<StatusCode, CustomError> {
let stream_id = Identifier::from_str_value(&stream_id)?;
let topic_id = Identifier::from_str_value(&topic_id)?;
+ let partition_id = partition_id as usize;
let session = Session::stateless(identity.user_id, identity.ip_address);
let flush_future =
SendWrapper::new(state.shard.shard().flush_unsaved_buffer(
&session,
- stream_id,
- topic_id,
+ &stream_id,
+ &topic_id,
partition_id,
fsync,
));
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 35a37fee..37f225e6 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -230,6 +230,11 @@ async fn main() -> Result<(), ServerError> {
}
}
+ #[cfg(feature = "disable-mimalloc")]
+ warn!("Using default system allocator because code was build with
`disable-mimalloc` feature");
+ #[cfg(not(feature = "disable-mimalloc"))]
+ info!("Using mimalloc allocator");
+
// DISCRETE STEP.
// Increment the metrics.
let metrics = Metrics::init();
@@ -343,7 +348,6 @@ async fn main() -> Result<(), ServerError> {
}
let shutdown_handles_for_signal = shutdown_handles.clone();
-
ctrlc::set_handler(move || {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
@@ -367,7 +371,9 @@ async fn main() -> Result<(), ServerError> {
info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown.");
for (idx, handle) in handles.into_iter().enumerate() {
- handle.join().expect("Failed to join shard thread");
+ handle
+ .join()
+ .expect(format!("Failed to join shard thread-{}", idx).as_str());
}
let shutdown_duration_msg = {
@@ -389,81 +395,5 @@ async fn main() -> Result<(), ServerError> {
shutdown_duration_msg
);
- /*
- #[cfg(feature = "disable-mimalloc")]
- warn!("Using default system allocator because code was build with
`disable-mimalloc` feature");
- #[cfg(not(feature = "disable-mimalloc"))]
- info!("Using mimalloc allocator");
-
- let system = SharedSystem::new(System::new(
- config.system.clone(),
- config.cluster.clone(),
- config.cluster.clone(),
- config.data_maintenance.clone(),
- config.personal_access_token.clone(),
- ));
- */
-
- /*
-
- let mut current_config = config.clone();
-
- if config.http.enabled {
- let http_addr = http_server::start(config.http, system.clone()).await;
- current_config.http.address = http_addr.to_string();
- }
-
- if config.quic.enabled {
- let quic_addr = quic_server::start(config.quic, system.clone());
- current_config.quic.address = quic_addr.to_string();
- }
-
- if config.tcp.enabled {
- let tcp_addr = tcp_server::start(config.tcp, system.clone()).await;
- current_config.tcp.address = tcp_addr.to_string();
- }
-
- let runtime_path = current_config.system.get_runtime_path();
- let current_config_path = format!("{runtime_path}/current_config.toml");
- let current_config_content =
- toml::to_string(&current_config).expect("Cannot serialize
current_config");
- tokio::fs::write(current_config_path, current_config_content).await?;
-
- let elapsed_time = startup_timestamp.elapsed();
- info!(
- "Iggy server has started - overall startup took {} ms.",
- elapsed_time.as_millis()
- );
-
- #[cfg(unix)]
- tokio::select! {
- _ = ctrl_c.recv() => {
- info!("Received SIGINT. Shutting down Iggy server...");
- },
- _ = sigterm.recv() => {
- info!("Received SIGTERM. Shutting down Iggy server...");
- }
- }
-
- #[cfg(windows)]
- match tokio::signal::ctrl_c().await {
- Ok(()) => {
- info!("Received CTRL-C. Shutting down Iggy server...");
- }
- Err(err) => {
- eprintln!("Unable to listen for shutdown signal: {}", err);
- }
- }
-
- let shutdown_timestamp = Instant::now();
- let mut system = system.write().await;
- system.shutdown().await?;
- let elapsed_time = shutdown_timestamp.elapsed();
-
- info!(
- "Iggy server has shutdown successfully. Shutdown took {} ms.",
- elapsed_time.as_millis()
- );
- */
Ok(())
}
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 8a5b7d8b..24c80a2b 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -958,6 +958,16 @@ impl IggyShard {
.await?;
Ok(())
}
+ ShardEvent::FlushUnsavedBuffer {
+ stream_id,
+ topic_id,
+ partition_id,
+ ..
+ } => {
+ self.flush_unsaved_buffer_bypass_auth(&stream_id, &topic_id,
partition_id)
+ .await?;
+ Ok(())
+ }
}
}
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index 211d4a54..3f8fd41d 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -25,6 +25,7 @@ use crate::shard::transmission::message::{
ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
};
use crate::shard_trace;
+use crate::streaming::partitions::journal::Journal;
use crate::streaming::polling_consumer::PollingConsumer;
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut,
IggyMessagesBatchSet};
use crate::streaming::session::Session;
@@ -362,13 +363,71 @@ impl IggyShard {
pub async fn flush_unsaved_buffer(
&self,
session: &Session,
- stream_id: Identifier,
- topic_id: Identifier,
- partition_id: u32,
- fsync: bool,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: usize,
+ _fsync: bool,
) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
- todo!();
+ let numeric_stream_id = self
+ .streams2
+ .with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
+
+ let numeric_topic_id =
+ self.streams2
+ .with_topic_by_id(&stream_id, &topic_id,
topics::helpers::get_topic_id());
+
+ // Validate permissions for given user on stream and topic.
+ self.permissioner
+ .borrow()
+ .append_messages(
+ session.get_user_id(),
+ numeric_stream_id as u32,
+ numeric_topic_id as u32,
+ )
+ .with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - permission denied to
append messages for user {} on stream ID: {}, topic ID: {}",
session.get_user_id(), numeric_stream_id as u32, numeric_topic_id as u32)
+ })?;
+
+ self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id)
+ .await?;
+ Ok(())
+ }
+
+ pub async fn flush_unsaved_buffer_bypass_auth(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: usize,
+ ) -> Result<(), IggyError> {
+ self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id)
+ .await
+ }
+
+ async fn flush_unsaved_buffer_base(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: usize,
+ ) -> Result<(), IggyError> {
+ let batches = self.streams2.with_partition_by_id_mut(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ |(.., log)| log.journal_mut().commit(),
+ );
+
+ self.streams2
+ .persist_messages_to_disk(
+ self.id,
+ &stream_id,
+ &topic_id,
+ partition_id,
+ batches,
+ &self.config.system,
+ )
+ .await?;
+ Ok(())
}
async fn decrypt_messages(
diff --git a/core/server/src/shard/transmission/event.rs
b/core/server/src/shard/transmission/event.rs
index afaa6da4..544c6871 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -16,6 +16,12 @@ use std::net::SocketAddr;
#[derive(Debug, Clone)]
pub enum ShardEvent {
+ FlushUnsavedBuffer {
+ stream_id: Identifier,
+ topic_id: Identifier,
+ partition_id: usize,
+ fsync: bool,
+ },
CreatedStream2 {
id: usize,
stream: stream2::Stream,
diff --git a/core/server/src/slab/partitions.rs
b/core/server/src/slab/partitions.rs
index b3d3fda9..f430d1ab 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -164,7 +164,6 @@ impl EntityComponentSystem<Borrow> for Partitions {
{
f(self.into())
}
-
}
impl EntityComponentSystemMut for Partitions {
@@ -220,5 +219,4 @@ impl Partitions {
) -> T {
self.with_components_by_id_mut(id, |components| f(components))
}
-
}
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index decc6b63..6167987c 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1,8 +1,6 @@
use crate::shard::task_registry::TaskRegistry;
use crate::shard_trace;
use crate::streaming::partitions as streaming_partitions;
-use crate::streaming::segments::IggyIndexesMut;
-use crate::streaming::segments::storage::Storage;
use crate::{
binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
@@ -1235,9 +1233,24 @@ impl Streams {
reason
);
+ let batch_count = self
+ .persist_messages_to_disk(shard_id, stream_id, topic_id,
partition_id, batches, config)
+ .await?;
+
+ Ok(batch_count)
+ }
+
+ pub async fn persist_messages_to_disk(
+ &self,
+ shard_id: u16,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: usize,
+ batches: IggyMessagesBatchSet,
+ config: &SystemConfig,
+ ) -> Result<u32, IggyError> {
let batch_count = batches.count();
let batch_size = batches.size();
-
// Extract storage before async operations
let (messages_writer, index_writer) =
self.with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
diff --git a/core/server/src/streaming/topics/topic2.rs
b/core/server/src/streaming/topics/topic2.rs
index 64d4fc69..e46a5c3b 100644
--- a/core/server/src/streaming/topics/topic2.rs
+++ b/core/server/src/streaming/topics/topic2.rs
@@ -260,10 +260,6 @@ impl TopicRoot {
f(self)
}
- pub async fn invoke_async<T>(&self, f: impl AsyncFnOnce(&Self) -> T) -> T {
- f(self).await
- }
-
pub fn update_id(&mut self, id: usize) {
self.id = id;
}