This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_task_registry
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit b6fcb77d15481a4d1aadc67615d9d99af55f5b9b
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Sep 24 16:30:33 2025 +0200

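    oneshot fsync: run segment-close fsyncs as tracked TaskSupervisor::spawn_oneshot tasks instead of detached spawns, and drop the unused FsyncTask spec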
---
 core/server/src/http/http_server.rs          | 12 +++-
 core/server/src/log/logger.rs                |  4 +-
 core/server/src/log/runtime.rs               | 18 ++++--
 core/server/src/shard/mod.rs                 | 15 ++++-
 core/server/src/shard/stats.rs               |  1 -
 core/server/src/shard/system/messages.rs     |  4 +-
 core/server/src/shard/tasks/mod.rs           |  3 +-
 core/server/src/shard/tasks/oneshot/fsync.rs | 87 ----------------------------
 core/server/src/shard/tasks/oneshot/mod.rs   | 23 --------
 core/server/src/shard/tasks/supervisor.rs    | 46 ++++++++++++++-
 core/server/src/shard/tasks/tls.rs           |  5 --
 core/server/src/slab/streams.rs              | 39 +++++++++----
 core/server/src/streaming/storage.rs         |  2 +-
 13 files changed, 110 insertions(+), 149 deletions(-)

diff --git a/core/server/src/http/http_server.rs 
b/core/server/src/http/http_server.rs
index 2ae46271..8277a77d 100644
--- a/core/server/src/http/http_server.rs
+++ b/core/server/src/http/http_server.rs
@@ -66,7 +66,11 @@ impl<'a> Connected<cyper_axum::IncomingStream<'a, 
TcpListener>> for CompioSocket
 
 /// Starts the HTTP API server.
 /// Returns the address the server is listening on.
-pub async fn start(config: HttpConfig, persister: Arc<PersisterKind>, shard: 
Rc<IggyShard>) -> Result<(), IggyError> {
+pub async fn start(
+    config: HttpConfig,
+    persister: Arc<PersisterKind>,
+    shard: Rc<IggyShard>,
+) -> Result<(), IggyError> {
     if shard.id != 0 {
         info!(
             "HTTP server disabled for shard {} (only runs on shard 0)",
@@ -158,7 +162,11 @@ pub async fn start(config: HttpConfig, persister: 
Arc<PersisterKind>, shard: Rc<
     }
 }
 
-async fn build_app_state(config: &HttpConfig, persister: Arc<PersisterKind>,  
shard: Rc<IggyShard>) -> Arc<AppState> {
+async fn build_app_state(
+    config: &HttpConfig,
+    persister: Arc<PersisterKind>,
+    shard: Rc<IggyShard>,
+) -> Arc<AppState> {
     let tokens_path;
     {
         tokens_path = shard.config.system.get_state_tokens_path();
diff --git a/core/server/src/log/logger.rs b/core/server/src/log/logger.rs
index 59725685..b400233a 100644
--- a/core/server/src/log/logger.rs
+++ b/core/server/src/log/logger.rs
@@ -16,10 +16,10 @@
  * under the License.
  */
 
-use crate::log::runtime::CompioRuntime;
 use crate::VERSION;
 use crate::configs::server::{TelemetryConfig, TelemetryTransport};
 use crate::configs::system::LoggingConfig;
+use crate::log::runtime::CompioRuntime;
 use crate::server_error::LogError;
 use opentelemetry::KeyValue;
 use opentelemetry::global;
@@ -250,7 +250,7 @@ impl Logging {
                     .with_span_processor(
                         
span_processor_with_async_runtime::BatchSpanProcessor::builder(
                             trace_exporter,
-                            CompioRuntime
+                            CompioRuntime,
                         )
                         .build(),
                     )
diff --git a/core/server/src/log/runtime.rs b/core/server/src/log/runtime.rs
index 1683b34a..95f77585 100644
--- a/core/server/src/log/runtime.rs
+++ b/core/server/src/log/runtime.rs
@@ -1,6 +1,6 @@
 use std::{pin::Pin, task::Poll, time::Duration};
 
-use futures::{channel::mpsc, future::poll_fn, FutureExt, SinkExt, Stream, 
StreamExt};
+use futures::{FutureExt, SinkExt, Stream, StreamExt, channel::mpsc, 
future::poll_fn};
 use opentelemetry_sdk::runtime::{Runtime, RuntimeChannel, TrySend};
 
 #[derive(Clone)]
@@ -34,17 +34,20 @@ impl<T> CompioSender<T> {
     pub fn new(sender: mpsc::UnboundedSender<T>) -> Self {
         Self { sender }
     }
-}   
+}
 
-// Safety: Since we use compio runtime which is single-threaded, or rather the 
Future: !Send + !Sync, 
+// Safety: Since we use compio runtime which is single-threaded, or rather the 
Future: !Send + !Sync,
 // we can implement those traits, to satisfy the trait bounds from `Runtime` 
and `RuntimeChannel` traits.
 unsafe impl<T> Send for CompioSender<T> {}
 unsafe impl<T> Sync for CompioSender<T> {}
 
-impl<T: std::fmt::Debug + Send> TrySend for CompioSender<T>  {
+impl<T: std::fmt::Debug + Send> TrySend for CompioSender<T> {
     type Message = T;
 
-    fn try_send(&self, item: Self::Message) -> Result<(), 
opentelemetry_sdk::runtime::TrySendError> {
+    fn try_send(
+        &self,
+        item: Self::Message,
+    ) -> Result<(), opentelemetry_sdk::runtime::TrySendError> {
         self.sender.unbounded_send(item).map_err(|_err| {
             // Unbounded channels can only fail if disconnected, never full
             opentelemetry_sdk::runtime::TrySendError::ChannelClosed
@@ -65,7 +68,10 @@ impl<T> CompioReceiver<T> {
 impl<T: std::fmt::Debug + Send> Stream for CompioReceiver<T> {
     type Item = T;
 
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> 
std::task::Poll<Option<Self::Item>> {
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
         self.receiver.poll_next_unpin(cx)
     }
 }
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 7b8b8b25..7cb6c6f7 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -19,7 +19,6 @@ inner() * or more contributor license agreements.  See the 
NOTICE file
 pub mod builder;
 pub mod logging;
 pub mod namespace;
-pub mod stats;
 pub mod system;
 pub mod tasks;
 pub mod transmission;
@@ -72,10 +71,20 @@ use crate::{
         traits_ext::{EntityComponentSystem, EntityMarker, Insert},
     },
     state::{
-        file::FileState, system::{StreamState, SystemState, UserState}, 
StateKind
+        StateKind,
+        file::FileState,
+        system::{StreamState, SystemState, UserState},
     },
     streaming::{
-        clients::client_manager::ClientManager, diagnostics::metrics::Metrics, 
partitions, persistence::persister::PersisterKind, 
polling_consumer::PollingConsumer, session::Session, traits::MainOps, 
users::{permissioner::Permissioner, user::User}, utils::ptr::EternalPtr
+        clients::client_manager::ClientManager,
+        diagnostics::metrics::Metrics,
+        partitions,
+        persistence::persister::PersisterKind,
+        polling_consumer::PollingConsumer,
+        session::Session,
+        traits::MainOps,
+        users::{permissioner::Permissioner, user::User},
+        utils::ptr::EternalPtr,
     },
     versioning::SemanticVersion,
 };
diff --git a/core/server/src/shard/stats.rs b/core/server/src/shard/stats.rs
deleted file mode 100644
index 8b137891..00000000
--- a/core/server/src/shard/stats.rs
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index a64602d0..08237d91 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -36,7 +36,8 @@ use crate::streaming::{partitions, streams, topics};
 use error_set::ErrContext;
 
 use iggy_common::{
-    BytesSerializable, Consumer, EncryptorKind, Identifier, IggyError, 
IggyTimestamp, Partitioning, PartitioningKind, PollingKind, PollingStrategy, 
IGGY_MESSAGE_HEADER_SIZE
+    BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, 
Identifier, IggyError,
+    IggyTimestamp, Partitioning, PartitioningKind, PollingKind, 
PollingStrategy,
 };
 use tracing::{error, trace};
 
@@ -352,7 +353,6 @@ impl IggyShard {
         todo!();
     }
 
-
     async fn decrypt_messages(
         &self,
         batches: IggyMessagesBatchSet,
diff --git a/core/server/src/shard/tasks/mod.rs 
b/core/server/src/shard/tasks/mod.rs
index f6fc2c8c..640d6dc2 100644
--- a/core/server/src/shard/tasks/mod.rs
+++ b/core/server/src/shard/tasks/mod.rs
@@ -22,7 +22,6 @@
 //! within a shard, including servers, periodic maintenance, and one-shot 
operations.
 
 pub mod continuous;
-pub mod oneshot;
 pub mod periodic;
 pub mod shutdown;
 pub mod specs;
@@ -32,7 +31,7 @@ pub mod tls;
 pub use shutdown::{Shutdown, ShutdownToken};
 pub use specs::{TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec};
 pub use supervisor::TaskSupervisor;
-pub use tls::{init_supervisor, task_supervisor};
+pub use tls::{init_supervisor, is_supervisor_initialized, task_supervisor};
 
 use crate::shard::IggyShard;
 use specs::IsNoOp;
diff --git a/core/server/src/shard/tasks/oneshot/fsync.rs 
b/core/server/src/shard/tasks/oneshot/fsync.rs
deleted file mode 100644
index d03c1ad6..00000000
--- a/core/server/src/shard/tasks/oneshot/fsync.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::shard::tasks::{TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec, 
specs::OneShotSpec};
-use compio::fs::File;
-use iggy_common::IggyError;
-use std::{path::PathBuf, time::Duration};
-use tracing::{debug, error, trace};
-
-/// One-shot task for performing fsync on critical files
-#[derive(Debug)]
-pub struct FsyncTask {
-    path: PathBuf,
-    description: String,
-}
-
-impl FsyncTask {
-    pub fn new(path: PathBuf, description: String) -> Self {
-        Self { path, description }
-    }
-}
-
-impl TaskSpec for FsyncTask {
-    fn name(&self) -> &'static str {
-        "fsync"
-    }
-
-    fn kind(&self) -> TaskKind {
-        TaskKind::OneShot
-    }
-
-    fn scope(&self) -> TaskScope {
-        TaskScope::AllShards
-    }
-
-    fn is_critical(&self) -> bool {
-        true
-    }
-
-    fn run(self: Box<Self>, _ctx: TaskCtx) -> TaskFuture {
-        Box::pin(async move {
-            trace!(
-                "Performing fsync on {} - {}",
-                self.path.display(),
-                self.description
-            );
-
-            match File::open(&self.path).await {
-                Ok(file) => match file.sync_all().await {
-                    Ok(_) => {
-                        debug!("Successfully synced {} to disk", 
self.path.display());
-                        Ok(())
-                    }
-                    Err(e) => {
-                        error!("Failed to fsync {}: {}", self.path.display(), 
e);
-                        Err(IggyError::CannotSyncFile)
-                    }
-                },
-                Err(e) => {
-                    error!("Failed to open {} for fsync: {}", 
self.path.display(), e);
-                    Err(IggyError::CannotReadFile)
-                }
-            }
-        })
-    }
-}
-
-impl OneShotSpec for FsyncTask {
-    fn timeout(&self) -> Option<Duration> {
-        Some(Duration::from_secs(30)) // 30 second timeout for fsync
-    }
-}
diff --git a/core/server/src/shard/tasks/oneshot/mod.rs 
b/core/server/src/shard/tasks/oneshot/mod.rs
deleted file mode 100644
index a07e8582..00000000
--- a/core/server/src/shard/tasks/oneshot/mod.rs
+++ /dev/null
@@ -1,23 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-//! One-shot task specifications for durability and administrative operations
-
-pub mod fsync;
-
-pub use fsync::FsyncTask;
diff --git a/core/server/src/shard/tasks/supervisor.rs 
b/core/server/src/shard/tasks/supervisor.rs
index 72c5b43a..5e4bf690 100644
--- a/core/server/src/shard/tasks/supervisor.rs
+++ b/core/server/src/shard/tasks/supervisor.rs
@@ -123,7 +123,7 @@ impl TaskSupervisor {
         let handle = match kind {
             TaskKind::Continuous => self.spawn_continuous(spec, ctx),
             TaskKind::Periodic { period } => self.spawn_periodic(spec, ctx, 
period),
-            TaskKind::OneShot => self.spawn_oneshot(spec, ctx),
+            TaskKind::OneShot => self.spawn_oneshot_spec(spec, ctx),
         };
 
         let task_handle = TaskHandle {
@@ -196,8 +196,8 @@ impl TaskSupervisor {
         })
     }
 
-    /// Spawn a oneshot task
-    fn spawn_oneshot(
+    /// Spawn a oneshot task from a TaskSpec
+    fn spawn_oneshot_spec(
         &self,
         spec: Box<dyn TaskSpec>,
         ctx: TaskCtx,
@@ -390,6 +390,46 @@ impl TaskSupervisor {
         });
     }
 
+    /// Spawn a oneshot task directly without going through TaskSpec
+    pub fn spawn_oneshot<F>(&self, name: impl Into<String>, critical: bool, f: 
F)
+    where
+        F: Future<Output = Result<(), IggyError>> + 'static,
+    {
+        let name = name.into();
+        let shard_id = self.shard_id;
+
+        trace!("Spawning oneshot task '{}' on shard {}", name, shard_id);
+
+        let task_name = name.clone();
+        let handle = compio::runtime::spawn(async move {
+            trace!(
+                "OneShot task '{}' starting on shard {}",
+                task_name, shard_id
+            );
+            let result = f.await;
+
+            match &result {
+                Ok(()) => trace!(
+                    "OneShot task '{}' completed on shard {}",
+                    task_name, shard_id
+                ),
+                Err(e) => error!(
+                    "OneShot task '{}' failed on shard {}: {}",
+                    task_name, shard_id, e
+                ),
+            }
+
+            result
+        });
+
+        self.oneshot_handles.borrow_mut().push(TaskHandle {
+            name,
+            kind: TaskKind::OneShot,
+            handle,
+            is_critical: critical,
+        });
+    }
+
     /// Shutdown all connections gracefully
     fn shutdown_connections(&self) {
         info!(
diff --git a/core/server/src/shard/tasks/tls.rs 
b/core/server/src/shard/tasks/tls.rs
index 729f7027..1370c5e6 100644
--- a/core/server/src/shard/tasks/tls.rs
+++ b/core/server/src/shard/tasks/tls.rs
@@ -83,11 +83,6 @@ mod tests {
         // Now it should be initialized
         assert!(is_supervisor_initialized());
 
-        // Should be able to get supervisor without panic
-        let supervisor = task_supervisor();
-        // Note: We can't directly check the shard_id since it's private,
-        // but we can verify the supervisor exists
-
         // Clean up
         clear_supervisor();
         assert!(!is_supervisor_initialized());
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 690b8847..8ac12f6f 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -735,18 +735,33 @@ impl Streams {
                 (msg.unwrap(), index.unwrap())
             });
 
-        // TODO(hubcio): These fsync operations should use 
TaskSupervisor::spawn_oneshot with FsyncTask
-        // for proper tracking and graceful shutdown. However, this requires 
passing the shard
-        // reference through multiple layers. For now, we spawn directly and 
hope for the best. YOLO.
-        compio::runtime::spawn(async move {
-            let _ = log_writer.fsync().await;
-        })
-        .detach();
-        compio::runtime::spawn(async move {
-            let _ = index_writer.fsync().await;
-            drop(index_writer)
-        })
-        .detach();
+        // Use task supervisor for proper tracking and graceful shutdown
+        use crate::shard::tasks::tls::task_supervisor;
+        use tracing::error;
+
+        task_supervisor().spawn_oneshot("fsync:segment-close-messages", true, 
async move {
+            match log_writer.fsync().await {
+                Ok(_) => Ok(()),
+                Err(e) => {
+                    error!("Failed to fsync log writer on segment close: {}", 
e);
+                    Err(e)
+                }
+            }
+        });
+
+        task_supervisor().spawn_oneshot("fsync:segment-close-index", true, 
async move {
+            match index_writer.fsync().await {
+                Ok(_) => {
+                    drop(index_writer);
+                    Ok(())
+                }
+                Err(e) => {
+                    error!("Failed to fsync index writer on segment close: 
{}", e);
+                    drop(index_writer);
+                    Err(e)
+                }
+            }
+        });
 
         let (start_offset, size, end_offset) =
             self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
diff --git a/core/server/src/streaming/storage.rs 
b/core/server/src/streaming/storage.rs
index 03199d70..d64845a2 100644
--- a/core/server/src/streaming/storage.rs
+++ b/core/server/src/streaming/storage.rs
@@ -47,7 +47,7 @@ macro_rules! forward_async_methods {
     }
 }
 
-// TODO: Tech debt, how to get rid of this ? 
+// TODO: Tech debt, how to get rid of this ?
 #[derive(Debug)]
 pub enum SystemInfoStorageKind {
     File(FileSystemInfoStorage),

Reply via email to