This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_task_registry in repository https://gitbox.apache.org/repos/asf/iggy.git
commit b6fcb77d15481a4d1aadc67615d9d99af55f5b9b Author: Hubert Gruszecki <[email protected]> AuthorDate: Wed Sep 24 16:30:33 2025 +0200 oneshot fsync --- core/server/src/http/http_server.rs | 12 +++- core/server/src/log/logger.rs | 4 +- core/server/src/log/runtime.rs | 18 ++++-- core/server/src/shard/mod.rs | 15 ++++- core/server/src/shard/stats.rs | 1 - core/server/src/shard/system/messages.rs | 4 +- core/server/src/shard/tasks/mod.rs | 3 +- core/server/src/shard/tasks/oneshot/fsync.rs | 87 ---------------------------- core/server/src/shard/tasks/oneshot/mod.rs | 23 -------- core/server/src/shard/tasks/supervisor.rs | 46 ++++++++++++++- core/server/src/shard/tasks/tls.rs | 5 -- core/server/src/slab/streams.rs | 39 +++++++++---- core/server/src/streaming/storage.rs | 2 +- 13 files changed, 110 insertions(+), 149 deletions(-) diff --git a/core/server/src/http/http_server.rs b/core/server/src/http/http_server.rs index 2ae46271..8277a77d 100644 --- a/core/server/src/http/http_server.rs +++ b/core/server/src/http/http_server.rs @@ -66,7 +66,11 @@ impl<'a> Connected<cyper_axum::IncomingStream<'a, TcpListener>> for CompioSocket /// Starts the HTTP API server. /// Returns the address the server is listening on. -pub async fn start(config: HttpConfig, persister: Arc<PersisterKind>, shard: Rc<IggyShard>) -> Result<(), IggyError> { +pub async fn start( + config: HttpConfig, + persister: Arc<PersisterKind>, + shard: Rc<IggyShard>, +) -> Result<(), IggyError> { if shard.id != 0 { info!( "HTTP server disabled for shard {} (only runs on shard 0)", @@ -158,7 +162,11 @@ pub async fn start(config: HttpConfig, persister: Arc<PersisterKind>, shard: Rc< } } -async fn build_app_state(config: &HttpConfig, persister: Arc<PersisterKind>, shard: Rc<IggyShard>) -> Arc<AppState> { +async fn build_app_state( + config: &HttpConfig, + persister: Arc<PersisterKind>, + shard: Rc<IggyShard>, +) -> Arc<AppState> { let tokens_path; { tokens_path = shard.config.system.get_state_tokens_path(); diff --git a/core/server/src/log/logger.rs b/core/server/src/log/logger.rs index 59725685..b400233a 100644 --- a/core/server/src/log/logger.rs +++ b/core/server/src/log/logger.rs @@ -16,10 +16,10 @@ * under the License. */ -use crate::log::runtime::CompioRuntime; use crate::VERSION; use crate::configs::server::{TelemetryConfig, TelemetryTransport}; use crate::configs::system::LoggingConfig; +use crate::log::runtime::CompioRuntime; use crate::server_error::LogError; use opentelemetry::KeyValue; use opentelemetry::global; @@ -250,7 +250,7 @@ impl Logging { .with_span_processor( span_processor_with_async_runtime::BatchSpanProcessor::builder( trace_exporter, - CompioRuntime + CompioRuntime, ) .build(), ) diff --git a/core/server/src/log/runtime.rs b/core/server/src/log/runtime.rs index 1683b34a..95f77585 100644 --- a/core/server/src/log/runtime.rs +++ b/core/server/src/log/runtime.rs @@ -1,6 +1,6 @@ use std::{pin::Pin, task::Poll, time::Duration}; -use futures::{channel::mpsc, future::poll_fn, FutureExt, SinkExt, Stream, StreamExt}; +use futures::{FutureExt, SinkExt, Stream, StreamExt, channel::mpsc, future::poll_fn}; use opentelemetry_sdk::runtime::{Runtime, RuntimeChannel, TrySend}; #[derive(Clone)] @@ -34,17 +34,20 @@ impl<T> CompioSender<T> { pub fn new(sender: mpsc::UnboundedSender<T>) -> Self { Self { sender } } -} +} -// Safety: Since we use compio runtime which is single-threaded, or rather the Future: !Send + !Sync, +// Safety: Since we use compio runtime which is single-threaded, or rather the Future: !Send + !Sync, // we can implement those traits, to satisfy the trait bounds from `Runtime` and `RuntimeChannel` traits. unsafe impl<T> Send for CompioSender<T> {} unsafe impl<T> Sync for CompioSender<T> {} -impl<T: std::fmt::Debug + Send> TrySend for CompioSender<T> { +impl<T: std::fmt::Debug + Send> TrySend for CompioSender<T> { type Message = T; - fn try_send(&self, item: Self::Message) -> Result<(), opentelemetry_sdk::runtime::TrySendError> { + fn try_send( + &self, + item: Self::Message, + ) -> Result<(), opentelemetry_sdk::runtime::TrySendError> { self.sender.unbounded_send(item).map_err(|_err| { // Unbounded channels can only fail if disconnected, never full opentelemetry_sdk::runtime::TrySendError::ChannelClosed @@ -65,7 +68,10 @@ impl<T> CompioReceiver<T> { impl<T: std::fmt::Debug + Send> Stream for CompioReceiver<T> { type Item = T; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Option<Self::Item>> { self.receiver.poll_next_unpin(cx) } } diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 7b8b8b25..7cb6c6f7 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -19,7 +19,6 @@ inner() * or more contributor license agreements. See the NOTICE file pub mod builder; pub mod logging; pub mod namespace; -pub mod stats; pub mod system; pub mod tasks; pub mod transmission; @@ -72,10 +71,20 @@ use crate::{ traits_ext::{EntityComponentSystem, EntityMarker, Insert}, }, state::{ - file::FileState, system::{StreamState, SystemState, UserState}, StateKind + StateKind, + file::FileState, + system::{StreamState, SystemState, UserState}, }, streaming::{ - clients::client_manager::ClientManager, diagnostics::metrics::Metrics, partitions, persistence::persister::PersisterKind, polling_consumer::PollingConsumer, session::Session, traits::MainOps, users::{permissioner::Permissioner, user::User}, utils::ptr::EternalPtr + clients::client_manager::ClientManager, + diagnostics::metrics::Metrics, + partitions, + persistence::persister::PersisterKind, + polling_consumer::PollingConsumer, + session::Session, + traits::MainOps, + users::{permissioner::Permissioner, user::User}, + utils::ptr::EternalPtr, }, versioning::SemanticVersion, }; diff --git a/core/server/src/shard/stats.rs b/core/server/src/shard/stats.rs deleted file mode 100644 index 8b137891..00000000 --- a/core/server/src/shard/stats.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index a64602d0..08237d91 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -36,7 +36,8 @@ use crate::streaming::{partitions, streams, topics}; use error_set::ErrContext; use iggy_common::{ - BytesSerializable, Consumer, EncryptorKind, Identifier, IggyError, IggyTimestamp, Partitioning, PartitioningKind, PollingKind, PollingStrategy, IGGY_MESSAGE_HEADER_SIZE + BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, Identifier, IggyError, + IggyTimestamp, Partitioning, PartitioningKind, PollingKind, PollingStrategy, }; use tracing::{error, trace}; @@ -352,7 +353,6 @@ impl IggyShard { todo!(); } - async fn decrypt_messages( &self, batches: IggyMessagesBatchSet, diff --git a/core/server/src/shard/tasks/mod.rs b/core/server/src/shard/tasks/mod.rs index f6fc2c8c..640d6dc2 100644 --- a/core/server/src/shard/tasks/mod.rs +++ b/core/server/src/shard/tasks/mod.rs @@ -22,7 +22,6 @@ //! within a shard, including servers, periodic maintenance, and one-shot operations. pub mod continuous; -pub mod oneshot; pub mod periodic; pub mod shutdown; pub mod specs; @@ -32,7 +31,7 @@ pub mod tls; pub use shutdown::{Shutdown, ShutdownToken}; pub use specs::{TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec}; pub use supervisor::TaskSupervisor; -pub use tls::{init_supervisor, task_supervisor}; +pub use tls::{init_supervisor, is_supervisor_initialized, task_supervisor}; use crate::shard::IggyShard; use specs::IsNoOp; diff --git a/core/server/src/shard/tasks/oneshot/fsync.rs b/core/server/src/shard/tasks/oneshot/fsync.rs deleted file mode 100644 index d03c1ad6..00000000 --- a/core/server/src/shard/tasks/oneshot/fsync.rs +++ /dev/null @@ -1,87 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use crate::shard::tasks::{TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec, specs::OneShotSpec}; -use compio::fs::File; -use iggy_common::IggyError; -use std::{path::PathBuf, time::Duration}; -use tracing::{debug, error, trace}; - -/// One-shot task for performing fsync on critical files -#[derive(Debug)] -pub struct FsyncTask { - path: PathBuf, - description: String, -} - -impl FsyncTask { - pub fn new(path: PathBuf, description: String) -> Self { - Self { path, description } - } -} - -impl TaskSpec for FsyncTask { - fn name(&self) -> &'static str { - "fsync" - } - - fn kind(&self) -> TaskKind { - TaskKind::OneShot - } - - fn scope(&self) -> TaskScope { - TaskScope::AllShards - } - - fn is_critical(&self) -> bool { - true - } - - fn run(self: Box<Self>, _ctx: TaskCtx) -> TaskFuture { - Box::pin(async move { - trace!( - "Performing fsync on {} - {}", - self.path.display(), - self.description - ); - - match File::open(&self.path).await { - Ok(file) => match file.sync_all().await { - Ok(_) => { - debug!("Successfully synced {} to disk", self.path.display()); - Ok(()) - } - Err(e) => { - error!("Failed to fsync {}: {}", self.path.display(), e); - Err(IggyError::CannotSyncFile) - } - }, - Err(e) => { - error!("Failed to open {} for fsync: {}", self.path.display(), e); - Err(IggyError::CannotReadFile) - } - } - }) - } -} - -impl OneShotSpec for FsyncTask { - fn timeout(&self) -> Option<Duration> { - Some(Duration::from_secs(30)) // 30 second timeout for fsync - } -} diff --git a/core/server/src/shard/tasks/oneshot/mod.rs b/core/server/src/shard/tasks/oneshot/mod.rs deleted file mode 100644 index a07e8582..00000000 --- a/core/server/src/shard/tasks/oneshot/mod.rs +++ /dev/null @@ -1,23 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -//! One-shot task specifications for durability and administrative operations - -pub mod fsync; - -pub use fsync::FsyncTask; diff --git a/core/server/src/shard/tasks/supervisor.rs b/core/server/src/shard/tasks/supervisor.rs index 72c5b43a..5e4bf690 100644 --- a/core/server/src/shard/tasks/supervisor.rs +++ b/core/server/src/shard/tasks/supervisor.rs @@ -123,7 +123,7 @@ impl TaskSupervisor { let handle = match kind { TaskKind::Continuous => self.spawn_continuous(spec, ctx), TaskKind::Periodic { period } => self.spawn_periodic(spec, ctx, period), - TaskKind::OneShot => self.spawn_oneshot(spec, ctx), + TaskKind::OneShot => self.spawn_oneshot_spec(spec, ctx), }; let task_handle = TaskHandle { @@ -196,8 +196,8 @@ impl TaskSupervisor { }) } - /// Spawn a oneshot task - fn spawn_oneshot( + /// Spawn a oneshot task from a TaskSpec + fn spawn_oneshot_spec( &self, spec: Box<dyn TaskSpec>, ctx: TaskCtx, @@ -390,6 +390,46 @@ impl TaskSupervisor { }); } + /// Spawn a oneshot task directly without going through TaskSpec + pub fn spawn_oneshot<F>(&self, name: impl Into<String>, critical: bool, f: F) + where + F: Future<Output = Result<(), IggyError>> + 'static, + { + let name = name.into(); + let shard_id = self.shard_id; + + trace!("Spawning oneshot task '{}' on shard {}", name, shard_id); + + let task_name = name.clone(); + let handle = compio::runtime::spawn(async move { + trace!( + "OneShot task '{}' starting on shard {}", + task_name, shard_id + ); + let result = f.await; + + match &result { + Ok(()) => trace!( + "OneShot task '{}' completed on shard {}", + task_name, shard_id + ), + Err(e) => error!( + "OneShot task '{}' failed on shard {}: {}", + task_name, shard_id, e + ), + } + + result + }); + + self.oneshot_handles.borrow_mut().push(TaskHandle { + name, + kind: TaskKind::OneShot, + handle, + is_critical: critical, + }); + } + /// Shutdown all connections gracefully fn shutdown_connections(&self) { info!( diff --git a/core/server/src/shard/tasks/tls.rs b/core/server/src/shard/tasks/tls.rs index 729f7027..1370c5e6 100644 --- a/core/server/src/shard/tasks/tls.rs +++ b/core/server/src/shard/tasks/tls.rs @@ -83,11 +83,6 @@ mod tests { // Now it should be initialized assert!(is_supervisor_initialized()); - // Should be able to get supervisor without panic - let supervisor = task_supervisor(); - // Note: We can't directly check the shard_id since it's private, - // but we can verify the supervisor exists - // Clean up clear_supervisor(); assert!(!is_supervisor_initialized()); diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index 690b8847..8ac12f6f 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -735,18 +735,33 @@ impl Streams { (msg.unwrap(), index.unwrap()) }); - // TODO(hubcio): These fsync operations should use TaskSupervisor::spawn_oneshot with FsyncTask - // for proper tracking and graceful shutdown. However, this requires passing the shard - // reference through multiple layers. For now, we spawn directly and hope for the best. YOLO. - compio::runtime::spawn(async move { - let _ = log_writer.fsync().await; - }) - .detach(); - compio::runtime::spawn(async move { - let _ = index_writer.fsync().await; - drop(index_writer) - }) - .detach(); + // Use task supervisor for proper tracking and graceful shutdown + use crate::shard::tasks::tls::task_supervisor; + use tracing::error; + + task_supervisor().spawn_oneshot("fsync:segment-close-messages", true, async move { + match log_writer.fsync().await { + Ok(_) => Ok(()), + Err(e) => { + error!("Failed to fsync log writer on segment close: {}", e); + Err(e) + } + } + }); + + task_supervisor().spawn_oneshot("fsync:segment-close-index", true, async move { + match index_writer.fsync().await { + Ok(_) => { + drop(index_writer); + Ok(()) + } + Err(e) => { + error!("Failed to fsync index writer on segment close: {}", e); + drop(index_writer); + Err(e) + } + } + }); let (start_offset, size, end_offset) = self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { diff --git a/core/server/src/streaming/storage.rs b/core/server/src/streaming/storage.rs index 03199d70..d64845a2 100644 --- a/core/server/src/streaming/storage.rs +++ b/core/server/src/streaming/storage.rs @@ -47,7 +47,7 @@ macro_rules! forward_async_methods { } } -// TODO: Tech debt, how to get rid of this ? +// TODO: Tech debt, how to get rid of this ? #[derive(Debug)] pub enum SystemInfoStorageKind { File(FileSystemInfoStorage),
