This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_task_registry
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 7a47a615b4dc398ed6d3cec60bcbce0cc09b1809
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Sep 24 16:04:22 2025 +0200

    thread local storage
---
 core/server/src/http/jwt/cleaner.rs                |   1 +
 core/server/src/quic/listener.rs                   |  18 +--
 core/server/src/shard/builder.rs                   |   3 +-
 core/server/src/shard/mod.rs                       |  23 ++--
 core/server/src/shard/tasks/mod.rs                 |   2 +
 .../src/shard/tasks/periodic/verify_heartbeats.rs  |   1 -
 core/server/src/shard/tasks/shutdown.rs            |   3 +-
 core/server/src/shard/tasks/supervisor.rs          |   2 +-
 core/server/src/shard/tasks/tls.rs                 | 146 +++++++++++++++++++++
 core/server/src/slab/streams.rs                    |   4 +-
 core/server/src/tcp/tcp_listener.rs                |   7 +-
 core/server/src/tcp/tcp_tls_listener.rs            |   7 +-
 12 files changed, 184 insertions(+), 33 deletions(-)

diff --git a/core/server/src/http/jwt/cleaner.rs b/core/server/src/http/jwt/cleaner.rs
index 6705d955..81d9e161 100644
--- a/core/server/src/http/jwt/cleaner.rs
+++ b/core/server/src/http/jwt/cleaner.rs
@@ -22,6 +22,7 @@ use std::{sync::Arc, time::Duration};
 use tracing::{error, trace};
 
 pub fn start_expired_tokens_cleaner(app_state: Arc<AppState>) {
+    // TODO(hubcio): make it a part of TaskSupervisor
     compio::runtime::spawn(async move {
         let mut interval_timer = compio::time::interval(Duration::from_secs(300));
         loop {
diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs
index 247b4b73..60833e2e 100644
--- a/core/server/src/quic/listener.rs
+++ b/core/server/src/quic/listener.rs
@@ -22,6 +22,7 @@ use crate::binary::command::{ServerCommand, ServerCommandHandler};
 use crate::binary::sender::SenderKind;
 use crate::server_error::ConnectionError;
 use crate::shard::IggyShard;
+use crate::shard::tasks::task_supervisor;
 use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::session::Session;
 use crate::{shard_debug, shard_info};
@@ -47,7 +48,7 @@ pub async fn start(endpoint: Endpoint, shard: Rc<IggyShard>) -> Result<(), IggyE
         let shard_for_conn = shard_clone.clone();
 
         // Use TaskSupervisor to track connection handlers for graceful shutdown
-        shard_clone.task_supervisor.spawn_tracked(async move {
+        task_supervisor().spawn_tracked(async move {
             trace!("Accepting connection from {}", remote_addr);
             match incoming_conn.await {
                 Ok(connection) => {
@@ -93,27 +94,27 @@ async fn handle_connection(
         address,
         transport: TransportProtocol::Quic,
     };
+
+    // TODO(hubcio): unused?
     let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
 
-    // Add connection tracking for graceful shutdown
-    let conn_stop_receiver = shard.task_supervisor.add_connection(client_id);
+    let conn_stop_receiver = task_supervisor().add_connection(client_id);
 
-    // Handle streams until connection closes or shutdown is triggered
     loop {
-        // Check for shutdown signal
         futures::select! {
+            // Check for shutdown signal
             _ = conn_stop_receiver.recv().fuse() => {
                 info!("QUIC connection {} shutting down gracefully", client_id);
                 break;
             }
+            // Accept new connection
             stream_result = accept_stream(&connection, &shard, client_id).fuse() => {
                 match stream_result? {
                     Some(stream) => {
                         let shard_clone = shard.clone();
                         let session_clone = session.clone();
 
-                        // Use TaskSupervisor to track stream handlers
-                        shard.task_supervisor.spawn_tracked(async move {
+                        task_supervisor().spawn_tracked(async move {
                             if let Err(err) = handle_stream(stream, shard_clone, session_clone).await {
                                 error!("Error when handling QUIC stream: {:?}", err)
                             }
@@ -125,8 +126,7 @@ async fn handle_connection(
         }
     }
 
-    // Remove connection from tracking
-    shard.task_supervisor.remove_connection(&client_id);
+    task_supervisor().remove_connection(&client_id);
     info!("QUIC connection {} closed", client_id);
     Ok(())
 }
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index b7f10b4a..4d98dd96 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -30,7 +30,7 @@ use tracing::info;
 use crate::{
     configs::server::ServerConfig,
     io::storage::Storage,
-    shard::{Shard, ShardInfo, namespace::IggyNamespace, tasks::TaskSupervisor},
+    shard::{Shard, ShardInfo, namespace::IggyNamespace},
     slab::streams::Streams,
     state::{StateKind, system::SystemState},
     streaming::{
@@ -151,7 +151,6 @@ impl IggyShardBuilder {
             stop_sender: stop_sender,
             messages_receiver: Cell::new(Some(frame_receiver)),
             metrics: metrics,
-            task_supervisor: TaskSupervisor::new(self.id.unwrap_or(0)),
             is_shutting_down: AtomicBool::new(false),
             tcp_bound_address: Cell::new(None),
             quic_bound_address: Cell::new(None),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 20fc1ac8..7b8b8b25 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -59,7 +59,7 @@ use crate::{
     io::fs_utils,
     shard::{
         namespace::{IggyFullNamespace, IggyNamespace},
-        tasks::{TaskSupervisor, shard_task_specs},
+        tasks::{init_supervisor, shard_task_specs, task_supervisor},
         transmission::{
             event::ShardEvent,
             frame::{ShardFrame, ShardResponse},
@@ -153,7 +153,6 @@ pub struct IggyShard {
     pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>,
     pub(crate) stop_receiver: StopReceiver,
     pub(crate) stop_sender: StopSender,
-    pub(crate) task_supervisor: TaskSupervisor,
     pub(crate) is_shutting_down: AtomicBool,
     pub(crate) tcp_bound_address: Cell<Option<SocketAddr>>,
     pub(crate) quic_bound_address: Cell<Option<SocketAddr>>,
@@ -197,7 +196,6 @@ impl IggyShard {
             messages_receiver: Cell::new(None),
             stop_receiver,
             stop_sender,
-            task_supervisor: TaskSupervisor::new(0),
             is_shutting_down: AtomicBool::new(false),
             tcp_bound_address: Cell::new(None),
             quic_bound_address: Cell::new(None),
@@ -221,6 +219,8 @@ impl IggyShard {
     }
 
     pub async fn run(self: &Rc<Self>, persister: Arc<PersisterKind>) -> Result<(), IggyError> {
+        // Initialize TLS task supervisor for this thread
+        init_supervisor(self.id);
         // Workaround to ensure that the statistics are initialized before the server
         // loads streams and starts accepting connections. This is necessary to
         // have the correct statistics when the server starts.
@@ -233,7 +233,7 @@ impl IggyShard {
 
         // Create and spawn all tasks via the supervisor
         let task_specs = shard_task_specs(self.clone());
-        self.task_supervisor.spawn(self.clone(), task_specs);
+        task_supervisor().spawn(self.clone(), task_specs);
 
         // Create a oneshot channel for shutdown completion notification
         let (shutdown_complete_tx, shutdown_complete_rx) = async_channel::bounded(1);
@@ -253,7 +253,6 @@ impl IggyShard {
                 shard_info!(shard_for_shutdown.id, "shutdown completed successfully");
             }
 
-            // Notify that shutdown is complete
             let _ = shutdown_complete_tx.send(()).await;
         })
         .detach();
@@ -261,8 +260,6 @@ impl IggyShard {
         let elapsed = now.elapsed();
         shard_info!(self.id, "Initialized in {} ms.", elapsed.as_millis());
 
-        // Wait for shutdown completion signal (not the stop signal directly)
-        // This avoids the double consumer issue
         shutdown_complete_rx.recv().await.ok();
         Ok(())
     }
@@ -367,13 +364,19 @@ impl IggyShard {
         self.stop_receiver.clone()
     }
 
+    /// Get the task supervisor for the current thread
+    ///
+    /// # Panics
+    /// Panics if the task supervisor has not been initialized
+    pub fn task_supervisor() -> Rc<crate::shard::tasks::TaskSupervisor> {
+        task_supervisor()
+    }
+
     #[instrument(skip_all, name = "trace_shutdown")]
     pub async fn trigger_shutdown(&self) -> bool {
         self.is_shutting_down.store(true, Ordering::SeqCst);
         info!("Shard {} shutdown state set", self.id);
-        self.task_supervisor
-            .graceful_shutdown(SHUTDOWN_TIMEOUT)
-            .await
+        task_supervisor().graceful_shutdown(SHUTDOWN_TIMEOUT).await
     }
 
     pub fn get_available_shards_count(&self) -> u32 {
diff --git a/core/server/src/shard/tasks/mod.rs b/core/server/src/shard/tasks/mod.rs
index 1ad68837..f6fc2c8c 100644
--- a/core/server/src/shard/tasks/mod.rs
+++ b/core/server/src/shard/tasks/mod.rs
@@ -27,10 +27,12 @@ pub mod periodic;
 pub mod shutdown;
 pub mod specs;
 pub mod supervisor;
+pub mod tls;
 
 pub use shutdown::{Shutdown, ShutdownToken};
 pub use specs::{TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec};
 pub use supervisor::TaskSupervisor;
+pub use tls::{init_supervisor, task_supervisor};
 
 use crate::shard::IggyShard;
 use specs::IsNoOp;
diff --git a/core/server/src/shard/tasks/periodic/verify_heartbeats.rs b/core/server/src/shard/tasks/periodic/verify_heartbeats.rs
index 8d5c3ef2..a2ce9685 100644
--- a/core/server/src/shard/tasks/periodic/verify_heartbeats.rs
+++ b/core/server/src/shard/tasks/periodic/verify_heartbeats.rs
@@ -74,7 +74,6 @@ impl TaskSpec for VerifyHeartbeats {
             );
 
             loop {
-                // Use sleep_or_shutdown to allow clean interruption
                 if !ctx.shutdown.sleep_or_shutdown(self.period).await {
                     trace!("Heartbeat verifier shutting down");
                     break;
diff --git a/core/server/src/shard/tasks/shutdown.rs b/core/server/src/shard/tasks/shutdown.rs
index 6bb36d63..03be526d 100644
--- a/core/server/src/shard/tasks/shutdown.rs
+++ b/core/server/src/shard/tasks/shutdown.rs
@@ -48,7 +48,7 @@ impl Shutdown {
         (shutdown, token)
     }
 
-    pub async fn trigger(&self) {
+    pub fn trigger(&self) {
         if self.is_triggered.swap(true, Ordering::SeqCst) {
             return; // Already triggered
         }
@@ -159,7 +159,6 @@ mod tests {
     async fn test_sleep_or_shutdown_completes() {
         let (_shutdown, token) = Shutdown::new();
 
-        // Should complete the full sleep
         let completed = token.sleep_or_shutdown(Duration::from_millis(10)).await;
         assert!(completed);
     }
diff --git a/core/server/src/shard/tasks/supervisor.rs b/core/server/src/shard/tasks/supervisor.rs
index 9192f30e..72c5b43a 100644
--- a/core/server/src/shard/tasks/supervisor.rs
+++ b/core/server/src/shard/tasks/supervisor.rs
@@ -233,7 +233,7 @@ impl TaskSupervisor {
         self.shutdown_connections();
 
         // Trigger shutdown signal
-        self.shutdown.trigger().await;
+        self.shutdown.trigger();
 
         // Wait for continuous and periodic tasks with timeout
         let continuous_periodic_tasks = self.tasks.take();
diff --git a/core/server/src/shard/tasks/tls.rs b/core/server/src/shard/tasks/tls.rs
new file mode 100644
index 00000000..729f7027
--- /dev/null
+++ b/core/server/src/shard/tasks/tls.rs
@@ -0,0 +1,146 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::supervisor::TaskSupervisor;
+use std::cell::RefCell;
+use std::rc::Rc;
+
+thread_local! {
+    static SUPERVISOR: RefCell<Option<Rc<TaskSupervisor>>> = RefCell::new(None);
+}
+
+/// Initialize the task supervisor for this thread
+pub fn init_supervisor(shard_id: u16) {
+    SUPERVISOR.with(|s| {
+        *s.borrow_mut() = Some(Rc::new(TaskSupervisor::new(shard_id)));
+    });
+}
+
+/// Get the task supervisor for the current thread
+///
+/// # Panics
+/// Panics if the supervisor has not been initialized for this thread
+pub fn task_supervisor() -> Rc<TaskSupervisor> {
+    SUPERVISOR.with(|s| {
+        s.borrow()
+            .as_ref()
+            .expect(
+                "Task supervisor not initialized for this thread. Call init_supervisor() first.",
+            )
+            .clone()
+    })
+}
+
+/// Check if the task supervisor has been initialized for this thread
+pub fn is_supervisor_initialized() -> bool {
+    SUPERVISOR.with(|s| s.borrow().is_some())
+}
+
+/// Clear the task supervisor for this thread (for cleanup/testing)
+pub fn clear_supervisor() {
+    SUPERVISOR.with(|s| {
+        *s.borrow_mut() = None;
+    });
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::panic;
+
+    #[test]
+    fn test_supervisor_initialization() {
+        // Clean any existing supervisor first
+        clear_supervisor();
+
+        // Initially, supervisor should not be initialized
+        assert!(!is_supervisor_initialized());
+
+        // Trying to get supervisor without initialization should panic
+        let result = panic::catch_unwind(|| {
+            task_supervisor();
+        });
+        assert!(result.is_err());
+
+        // Initialize supervisor
+        init_supervisor(42);
+
+        // Now it should be initialized
+        assert!(is_supervisor_initialized());
+
+        // Should be able to get supervisor without panic
+        let supervisor = task_supervisor();
+        // Note: We can't directly check the shard_id since it's private,
+        // but we can verify the supervisor exists
+
+        // Clean up
+        clear_supervisor();
+        assert!(!is_supervisor_initialized());
+    }
+
+    #[test]
+    fn test_multiple_initializations() {
+        clear_supervisor();
+
+        // First initialization
+        init_supervisor(1);
+        let _supervisor1 = task_supervisor();
+
+        // Second initialization (should replace the first)
+        init_supervisor(2);
+        let _supervisor2 = task_supervisor();
+
+        // They should be different instances (different Rc allocations).
+        // Note: this is not asserted here, although Rc::ptr_eq could compare them;
+        // the test only verifies that initialization works multiple times
+
+        clear_supervisor();
+    }
+
+    #[test]
+    fn test_thread_locality() {
+        use std::thread;
+
+        clear_supervisor();
+
+        // Initialize in main thread
+        init_supervisor(100);
+        assert!(is_supervisor_initialized());
+
+        // Spawn a new thread and verify it doesn't have the supervisor
+        let handle = thread::spawn(|| {
+            // This thread should not have supervisor initialized
+            assert!(!is_supervisor_initialized());
+
+            // Initialize different supervisor in this thread
+            init_supervisor(200);
+            assert!(is_supervisor_initialized());
+
+            // Get supervisor to verify it works
+            let _ = task_supervisor();
+        });
+
+        handle.join().expect("Thread should complete successfully");
+
+        // Main thread should still have its supervisor
+        assert!(is_supervisor_initialized());
+        let _ = task_supervisor();
+
+        clear_supervisor();
+    }
+}
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 82dec49b..690b8847 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -735,9 +735,9 @@ impl Streams {
                 (msg.unwrap(), index.unwrap())
             });
 
-        // TODO: These fsync operations should use TaskSupervisor::spawn_oneshot with FsyncTask
+        // TODO(hubcio): These fsync operations should use TaskSupervisor::spawn_oneshot with FsyncTask
         // for proper tracking and graceful shutdown. However, this requires passing the shard
-        // reference through multiple layers. For now, we spawn directly to ensure durability.
+        // reference through multiple layers. For now, we spawn directly and hope for the best. YOLO.
         compio::runtime::spawn(async move {
             let _ = log_writer.fsync().await;
         })
diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs
index c7b165b9..f39dcc94 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -19,6 +19,7 @@
 use crate::binary::sender::SenderKind;
 use crate::configs::tcp::TcpSocketConfig;
 use crate::shard::IggyShard;
+use crate::shard::tasks::task_supervisor;
 use crate::shard::transmission::event::ShardEvent;
 use crate::tcp::connection_handler::{handle_connection, handle_error};
 use crate::{shard_error, shard_info};
@@ -180,14 +181,14 @@ async fn accept_loop(
                         shard_info!(shard.id, "Created new session: {}", session);
                         let mut sender = SenderKind::get_tcp_sender(stream);
 
-                        let conn_stop_receiver = shard_clone.task_supervisor.add_connection(client_id);
+                        let conn_stop_receiver = task_supervisor().add_connection(client_id);
 
                         let shard_for_conn = shard_clone.clone();
-                        shard_clone.task_supervisor.spawn_tracked(async move {
+                        task_supervisor().spawn_tracked(async move {
                             if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await {
                                 handle_error(error);
                             }
-                            shard_for_conn.task_supervisor.remove_connection(&client_id);
+                            task_supervisor().remove_connection(&client_id);
 
                             if let Err(error) = sender.shutdown().await {
                                 shard_error!(shard.id, "Failed to shutdown TCP stream for client: {}, address: {}. {}", client_id, address, error);
diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs
index 0a5f7f8b..276eb52c 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -19,6 +19,7 @@
 use crate::binary::sender::SenderKind;
 use crate::configs::tcp::TcpSocketConfig;
 use crate::shard::IggyShard;
+use crate::shard::tasks::task_supervisor;
 use crate::shard::transmission::event::ShardEvent;
 use crate::tcp::connection_handler::{handle_connection, handle_error};
 use crate::{shard_error, shard_info, shard_warn};
@@ -219,7 +220,7 @@ async fn accept_loop(
 
                         // Perform TLS handshake in a separate task to avoid blocking the accept loop
                         let task_shard = shard_clone.clone();
-                        task_shard.task_supervisor.spawn_tracked(async move {
+                        task_supervisor().spawn_tracked(async move {
                             match acceptor.accept(stream).await {
                                 Ok(tls_stream) => {
                                     // TLS handshake successful, now create session
@@ -237,13 +238,13 @@ async fn accept_loop(
                                     let client_id = session.client_id;
                                     shard_info!(shard_clone.id, "Created new session: {}", session);
 
-                                    let conn_stop_receiver = shard_clone.task_supervisor.add_connection(client_id);
+                                    let conn_stop_receiver = task_supervisor().add_connection(client_id);
                                     let shard_for_conn = shard_clone.clone();
                                     let mut sender = SenderKind::get_tcp_tls_sender(tls_stream);
                                     if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await {
                                         handle_error(error);
                                     }
-                                    shard_for_conn.task_supervisor.remove_connection(&client_id);
+                                    task_supervisor().remove_connection(&client_id);
 
                                     if let Err(error) = sender.shutdown().await {
                                         shard_error!(shard.id, "Failed to shutdown TCP TLS stream for client: {}, address: {}. {}", client_id, address, error);

Reply via email to