This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_task_registry in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 7a47a615b4dc398ed6d3cec60bcbce0cc09b1809 Author: Hubert Gruszecki <[email protected]> AuthorDate: Wed Sep 24 16:04:22 2025 +0200 thread local storage --- core/server/src/http/jwt/cleaner.rs | 1 + core/server/src/quic/listener.rs | 18 +-- core/server/src/shard/builder.rs | 3 +- core/server/src/shard/mod.rs | 23 ++-- core/server/src/shard/tasks/mod.rs | 2 + .../src/shard/tasks/periodic/verify_heartbeats.rs | 1 - core/server/src/shard/tasks/shutdown.rs | 3 +- core/server/src/shard/tasks/supervisor.rs | 2 +- core/server/src/shard/tasks/tls.rs | 146 +++++++++++++++++++++ core/server/src/slab/streams.rs | 4 +- core/server/src/tcp/tcp_listener.rs | 7 +- core/server/src/tcp/tcp_tls_listener.rs | 7 +- 12 files changed, 184 insertions(+), 33 deletions(-) diff --git a/core/server/src/http/jwt/cleaner.rs b/core/server/src/http/jwt/cleaner.rs index 6705d955..81d9e161 100644 --- a/core/server/src/http/jwt/cleaner.rs +++ b/core/server/src/http/jwt/cleaner.rs @@ -22,6 +22,7 @@ use std::{sync::Arc, time::Duration}; use tracing::{error, trace}; pub fn start_expired_tokens_cleaner(app_state: Arc<AppState>) { + // TODO(hubcio): make it a part of TaskSupervisor compio::runtime::spawn(async move { let mut interval_timer = compio::time::interval(Duration::from_secs(300)); loop { diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index 247b4b73..60833e2e 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -22,6 +22,7 @@ use crate::binary::command::{ServerCommand, ServerCommandHandler}; use crate::binary::sender::SenderKind; use crate::server_error::ConnectionError; use crate::shard::IggyShard; +use crate::shard::tasks::task_supervisor; use crate::shard::transmission::event::ShardEvent; use crate::streaming::session::Session; use crate::{shard_debug, shard_info}; @@ -47,7 +48,7 @@ pub async fn start(endpoint: Endpoint, shard: Rc<IggyShard>) -> Result<(), IggyE let shard_for_conn = shard_clone.clone(); // Use TaskSupervisor to track connection handlers for graceful shutdown - shard_clone.task_supervisor.spawn_tracked(async move { + task_supervisor().spawn_tracked(async move { trace!("Accepting connection from {}", remote_addr); match incoming_conn.await { Ok(connection) => { @@ -93,27 +94,27 @@ async fn handle_connection( address, transport: TransportProtocol::Quic, }; + + // TODO(hubcio): unused? let _responses = shard.broadcast_event_to_all_shards(event.into()).await; - // Add connection tracking for graceful shutdown - let conn_stop_receiver = shard.task_supervisor.add_connection(client_id); + let conn_stop_receiver = task_supervisor().add_connection(client_id); - // Handle streams until connection closes or shutdown is triggered loop { - // Check for shutdown signal futures::select! { + // Check for shutdown signal _ = conn_stop_receiver.recv().fuse() => { info!("QUIC connection {} shutting down gracefully", client_id); break; } + // Accept new connection stream_result = accept_stream(&connection, &shard, client_id).fuse() => { match stream_result? { Some(stream) => { let shard_clone = shard.clone(); let session_clone = session.clone(); - // Use TaskSupervisor to track stream handlers - shard.task_supervisor.spawn_tracked(async move { + task_supervisor().spawn_tracked(async move { if let Err(err) = handle_stream(stream, shard_clone, session_clone).await { error!("Error when handling QUIC stream: {:?}", err) } @@ -125,8 +126,7 @@ async fn handle_connection( } } - // Remove connection from tracking - shard.task_supervisor.remove_connection(&client_id); + task_supervisor().remove_connection(&client_id); info!("QUIC connection {} closed", client_id); Ok(()) } diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index b7f10b4a..4d98dd96 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -30,7 +30,7 @@ use tracing::info; use crate::{ configs::server::ServerConfig, io::storage::Storage, - shard::{Shard, ShardInfo, namespace::IggyNamespace, tasks::TaskSupervisor}, + shard::{Shard, ShardInfo, namespace::IggyNamespace}, slab::streams::Streams, state::{StateKind, system::SystemState}, streaming::{ @@ -151,7 +151,6 @@ impl IggyShardBuilder { stop_sender: stop_sender, messages_receiver: Cell::new(Some(frame_receiver)), metrics: metrics, - task_supervisor: TaskSupervisor::new(self.id.unwrap_or(0)), is_shutting_down: AtomicBool::new(false), tcp_bound_address: Cell::new(None), quic_bound_address: Cell::new(None), diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 20fc1ac8..7b8b8b25 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -59,7 +59,7 @@ use crate::{ io::fs_utils, shard::{ namespace::{IggyFullNamespace, IggyNamespace}, - tasks::{TaskSupervisor, shard_task_specs}, + tasks::{init_supervisor, shard_task_specs, task_supervisor}, transmission::{ event::ShardEvent, frame::{ShardFrame, ShardResponse}, @@ -153,7 +153,6 @@ pub struct IggyShard { pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>, pub(crate) stop_receiver: StopReceiver, pub(crate) stop_sender: StopSender, - pub(crate) task_supervisor: TaskSupervisor, pub(crate) is_shutting_down: AtomicBool, pub(crate) tcp_bound_address: Cell<Option<SocketAddr>>, pub(crate) quic_bound_address: Cell<Option<SocketAddr>>, @@ -197,7 +196,6 @@ impl IggyShard { messages_receiver: Cell::new(None), stop_receiver, stop_sender, - task_supervisor: TaskSupervisor::new(0), is_shutting_down: AtomicBool::new(false), tcp_bound_address: Cell::new(None), quic_bound_address: Cell::new(None), @@ -221,6 +219,8 @@ impl IggyShard { } pub async fn run(self: &Rc<Self>, persister: Arc<PersisterKind>) -> Result<(), IggyError> { + // Initialize TLS task supervisor for this thread + init_supervisor(self.id); // Workaround to ensure that the statistics are initialized before the server // loads streams and starts accepting connections. This is necessary to // have the correct statistics when the server starts. @@ -233,7 +233,7 @@ impl IggyShard { // Create and spawn all tasks via the supervisor let task_specs = shard_task_specs(self.clone()); - self.task_supervisor.spawn(self.clone(), task_specs); + task_supervisor().spawn(self.clone(), task_specs); // Create a oneshot channel for shutdown completion notification let (shutdown_complete_tx, shutdown_complete_rx) = async_channel::bounded(1); @@ -253,7 +253,6 @@ impl IggyShard { shard_info!(shard_for_shutdown.id, "shutdown completed successfully"); } - // Notify that shutdown is complete let _ = shutdown_complete_tx.send(()).await; }) .detach(); @@ -261,8 +260,6 @@ impl IggyShard { let elapsed = now.elapsed(); shard_info!(self.id, "Initialized in {} ms.", elapsed.as_millis()); - // Wait for shutdown completion signal (not the stop signal directly) - // This avoids the double consumer issue shutdown_complete_rx.recv().await.ok(); Ok(()) } @@ -367,13 +364,19 @@ impl IggyShard { self.stop_receiver.clone() } + /// Get the task supervisor for the current thread + /// + /// # Panics + /// Panics if the task supervisor has not been initialized + pub fn task_supervisor() -> Rc<crate::shard::tasks::TaskSupervisor> { + task_supervisor() + } + #[instrument(skip_all, name = "trace_shutdown")] pub async fn trigger_shutdown(&self) -> bool { self.is_shutting_down.store(true, Ordering::SeqCst); info!("Shard {} shutdown state set", self.id); - self.task_supervisor - .graceful_shutdown(SHUTDOWN_TIMEOUT) - .await + task_supervisor().graceful_shutdown(SHUTDOWN_TIMEOUT).await } pub fn get_available_shards_count(&self) -> u32 { diff --git a/core/server/src/shard/tasks/mod.rs b/core/server/src/shard/tasks/mod.rs index 1ad68837..f6fc2c8c 100644 --- a/core/server/src/shard/tasks/mod.rs +++ b/core/server/src/shard/tasks/mod.rs @@ -27,10 +27,12 @@ pub mod periodic; pub mod shutdown; pub mod specs; pub mod supervisor; +pub mod tls; pub use shutdown::{Shutdown, ShutdownToken}; pub use specs::{TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec}; pub use supervisor::TaskSupervisor; +pub use tls::{init_supervisor, task_supervisor}; use crate::shard::IggyShard; use specs::IsNoOp; diff --git a/core/server/src/shard/tasks/periodic/verify_heartbeats.rs b/core/server/src/shard/tasks/periodic/verify_heartbeats.rs index 8d5c3ef2..a2ce9685 100644 --- a/core/server/src/shard/tasks/periodic/verify_heartbeats.rs +++ b/core/server/src/shard/tasks/periodic/verify_heartbeats.rs @@ -74,7 +74,6 @@ impl TaskSpec for VerifyHeartbeats { ); loop { - // Use sleep_or_shutdown to allow clean interruption if !ctx.shutdown.sleep_or_shutdown(self.period).await { trace!("Heartbeat verifier shutting down"); break; diff --git a/core/server/src/shard/tasks/shutdown.rs b/core/server/src/shard/tasks/shutdown.rs index 6bb36d63..03be526d 100644 --- a/core/server/src/shard/tasks/shutdown.rs +++ b/core/server/src/shard/tasks/shutdown.rs @@ -48,7 +48,7 @@ impl Shutdown { (shutdown, token) } - pub async fn trigger(&self) { + pub fn trigger(&self) { if self.is_triggered.swap(true, Ordering::SeqCst) { return; // Already triggered } @@ -159,7 +159,6 @@ mod tests { async fn test_sleep_or_shutdown_completes() { let (_shutdown, token) = Shutdown::new(); - // Should complete the full sleep let completed = token.sleep_or_shutdown(Duration::from_millis(10)).await; assert!(completed); } diff --git a/core/server/src/shard/tasks/supervisor.rs b/core/server/src/shard/tasks/supervisor.rs index 9192f30e..72c5b43a 100644 --- a/core/server/src/shard/tasks/supervisor.rs +++ b/core/server/src/shard/tasks/supervisor.rs @@ -233,7 +233,7 @@ impl TaskSupervisor { self.shutdown_connections(); // Trigger shutdown signal - self.shutdown.trigger().await; + self.shutdown.trigger(); // Wait for continuous and periodic tasks with timeout let continuous_periodic_tasks = self.tasks.take(); diff --git a/core/server/src/shard/tasks/tls.rs b/core/server/src/shard/tasks/tls.rs new file mode 100644 index 00000000..729f7027 --- /dev/null +++ b/core/server/src/shard/tasks/tls.rs @@ -0,0 +1,146 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::supervisor::TaskSupervisor; +use std::cell::RefCell; +use std::rc::Rc; + +thread_local! { + static SUPERVISOR: RefCell<Option<Rc<TaskSupervisor>>> = RefCell::new(None); +} + +/// Initialize the task supervisor for this thread +pub fn init_supervisor(shard_id: u16) { + SUPERVISOR.with(|s| { + *s.borrow_mut() = Some(Rc::new(TaskSupervisor::new(shard_id))); + }); +} + +/// Get the task supervisor for the current thread +/// +/// # Panics +/// Panics if the supervisor has not been initialized for this thread +pub fn task_supervisor() -> Rc<TaskSupervisor> { + SUPERVISOR.with(|s| { + s.borrow() + .as_ref() + .expect( + "Task supervisor not initialized for this thread. Call init_supervisor() first.", + ) + .clone() + }) +} + +/// Check if the task supervisor has been initialized for this thread +pub fn is_supervisor_initialized() -> bool { + SUPERVISOR.with(|s| s.borrow().is_some()) +} + +/// Clear the task supervisor for this thread (for cleanup/testing) +pub fn clear_supervisor() { + SUPERVISOR.with(|s| { + *s.borrow_mut() = None; + }); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::panic; + + #[test] + fn test_supervisor_initialization() { + // Clean any existing supervisor first + clear_supervisor(); + + // Initially, supervisor should not be initialized + assert!(!is_supervisor_initialized()); + + // Trying to get supervisor without initialization should panic + let result = panic::catch_unwind(|| { + task_supervisor(); + }); + assert!(result.is_err()); + + // Initialize supervisor + init_supervisor(42); + + // Now it should be initialized + assert!(is_supervisor_initialized()); + + // Should be able to get supervisor without panic + let supervisor = task_supervisor(); + // Note: We can't directly check the shard_id since it's private, + // but we can verify the supervisor exists + + // Clean up + clear_supervisor(); + assert!(!is_supervisor_initialized()); + } + + #[test] + fn test_multiple_initializations() { + clear_supervisor(); + + // First initialization + init_supervisor(1); + let _supervisor1 = task_supervisor(); + + // Second initialization (should replace the first) + init_supervisor(2); + let _supervisor2 = task_supervisor(); + + // They should be different instances (different Rc references) + // Note: We can't use ptr::eq since Rc doesn't expose the inner pointer easily + // But we can verify that initialization works multiple times + + clear_supervisor(); + } + + #[test] + fn test_thread_locality() { + use std::thread; + + clear_supervisor(); + + // Initialize in main thread + init_supervisor(100); + assert!(is_supervisor_initialized()); + + // Spawn a new thread and verify it doesn't have the supervisor + let handle = thread::spawn(|| { + // This thread should not have supervisor initialized + assert!(!is_supervisor_initialized()); + + // Initialize different supervisor in this thread + init_supervisor(200); + assert!(is_supervisor_initialized()); + + // Get supervisor to verify it works + let _ = task_supervisor(); + }); + + handle.join().expect("Thread should complete successfully"); + + // Main thread should still have its supervisor + assert!(is_supervisor_initialized()); + let _ = task_supervisor(); + + clear_supervisor(); + } +} diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index 82dec49b..690b8847 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -735,9 +735,9 @@ impl Streams { (msg.unwrap(), index.unwrap()) }); - // TODO: These fsync operations should use TaskSupervisor::spawn_oneshot with FsyncTask + // TODO(hubcio): These fsync operations should use TaskSupervisor::spawn_oneshot with FsyncTask // for proper tracking and graceful shutdown. However, this requires passing the shard - // reference through multiple layers. For now, we spawn directly to ensure durability. + // reference through multiple layers. For now, we spawn directly and hope for the best. YOLO. compio::runtime::spawn(async move { let _ = log_writer.fsync().await; }) diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index c7b165b9..f39dcc94 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -19,6 +19,7 @@ use crate::binary::sender::SenderKind; use crate::configs::tcp::TcpSocketConfig; use crate::shard::IggyShard; +use crate::shard::tasks::task_supervisor; use crate::shard::transmission::event::ShardEvent; use crate::tcp::connection_handler::{handle_connection, handle_error}; use crate::{shard_error, shard_info}; @@ -180,14 +181,14 @@ async fn accept_loop( shard_info!(shard.id, "Created new session: {}", session); let mut sender = SenderKind::get_tcp_sender(stream); - let conn_stop_receiver = shard_clone.task_supervisor.add_connection(client_id); + let conn_stop_receiver = task_supervisor().add_connection(client_id); let shard_for_conn = shard_clone.clone(); - shard_clone.task_supervisor.spawn_tracked(async move { + task_supervisor().spawn_tracked(async move { if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await { handle_error(error); } - shard_for_conn.task_supervisor.remove_connection(&client_id); + task_supervisor().remove_connection(&client_id); if let Err(error) = sender.shutdown().await { shard_error!(shard.id, "Failed to shutdown TCP stream for client: {}, address: {}. {}", client_id, address, error); diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs index 0a5f7f8b..276eb52c 100644 --- a/core/server/src/tcp/tcp_tls_listener.rs +++ b/core/server/src/tcp/tcp_tls_listener.rs @@ -19,6 +19,7 @@ use crate::binary::sender::SenderKind; use crate::configs::tcp::TcpSocketConfig; use crate::shard::IggyShard; +use crate::shard::tasks::task_supervisor; use crate::shard::transmission::event::ShardEvent; use crate::tcp::connection_handler::{handle_connection, handle_error}; use crate::{shard_error, shard_info, shard_warn}; @@ -219,7 +220,7 @@ async fn accept_loop( // Perform TLS handshake in a separate task to avoid blocking the accept loop let task_shard = shard_clone.clone(); - task_shard.task_supervisor.spawn_tracked(async move { + task_supervisor().spawn_tracked(async move { match acceptor.accept(stream).await { Ok(tls_stream) => { // TLS handshake successful, now create session @@ -237,13 +238,13 @@ async fn accept_loop( let client_id = session.client_id; shard_info!(shard_clone.id, "Created new session: {}", session); - let conn_stop_receiver = shard_clone.task_supervisor.add_connection(client_id); + let conn_stop_receiver = task_supervisor().add_connection(client_id); let shard_for_conn = shard_clone.clone(); let mut sender = SenderKind::get_tcp_tls_sender(tls_stream); if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await { handle_error(error); } - shard_for_conn.task_supervisor.remove_connection(&client_id); + task_supervisor().remove_connection(&client_id); if let Err(error) = sender.shutdown().await { shard_error!(shard.id, "Failed to shutdown TCP TLS stream for client: {}, address: {}. {}", client_id, address, error);
