This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_tpc in repository https://gitbox.apache.org/repos/asf/iggy.git
commit a435b97d34da47a2658a70857aa3012b695cd7f6 Author: Grzegorz Koszyk <[email protected]> AuthorDate: Wed May 14 20:38:55 2025 +0200 feat(io_uring): bootstrap the main function (#1788) --- Cargo.lock | 1 + core/server/Cargo.toml | 1 + core/server/src/bootstrap.rs | 20 ++++++++++ core/server/src/lib.rs | 4 +- core/server/src/main.rs | 80 ++++++++++++++++++++++++++++++++++++-- core/server/src/shard/builder.rs | 73 ++++++++++++++++++++++++++++++++++ core/server/src/shard/connector.rs | 2 + core/server/src/shard/frame.rs | 3 +- core/server/src/shard/mod.rs | 79 +++++++++++++++++++++++++++---------- 9 files changed, 234 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d9ba5d6a..4ab38a87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6807,6 +6807,7 @@ dependencies = [ "futures", "human-repr", "iggy_common", + "io-uring", "jsonwebtoken", "lending-iterator", "mimalloc", diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 6a526df6..3c6bccd5 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -59,6 +59,7 @@ flume = { workspace = true } futures = { workspace = true } human-repr = { workspace = true } iggy_common = { workspace = true } +io-uring = "0.6" jsonwebtoken = "9.3.1" lending-iterator = "0.1.7" mimalloc = { workspace = true, optional = true } diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index e69de29b..8eacf223 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -0,0 +1,20 @@ +use crate::shard::{connector::ShardConnector, frame::ShardFrame}; +use std::ops::Range; + +pub fn create_shard_connections(shards_set: Range<usize>) -> Vec<ShardConnector<ShardFrame>> { + let shards_count = shards_set.len(); + let connectors = shards_set + .into_iter() + .map(|id| ShardConnector::new(id as u16, shards_count)) + .collect(); + + connectors +} + +pub async fn create_directories() { + +} + +pub async fn create_root_user() { + +} diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs index 572301e0..e64837da 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/lib.rs @@ -26,18 +26,18 @@ static GLOBAL: MiMalloc = MiMalloc; #[cfg(windows)] compile_error!("iggy-server doesn't support windows."); -mod bootstrap; pub mod archiver; pub mod args; pub mod binary; +pub mod bootstrap; pub mod channels; pub(crate) mod compat; pub mod configs; -pub mod shard; pub mod http; pub mod log; pub mod quic; pub mod server_error; +pub mod shard; pub mod state; pub mod streaming; pub mod tcp; diff --git a/core/server/src/main.rs b/core/server/src/main.rs index d9d9d753..bb92796d 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -16,11 +16,15 @@ * under the License. */ +use std::thread::available_parallelism; + use anyhow::Result; use clap::Parser; use dotenvy::dotenv; use figlet_rs::FIGfont; +use monoio::Buildable; use server::args::Args; +use server::bootstrap::{create_directories, create_root_user, create_shard_connections}; use server::channels::commands::archive_state::ArchiveStateExecutor; use server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor; use server::channels::commands::maintain_messages::MaintainMessagesExecutor; @@ -37,15 +41,15 @@ use server::log::logger::Logging; use server::log::tokio_console::Logging; use server::quic::quic_server; use server::server_error::ServerError; +use server::shard::IggyShard; use server::streaming::systems::system::{SharedSystem, System}; use server::streaming::utils::MemoryPool; use server::tcp::tcp_server; use tokio::time::Instant; use tracing::{info, instrument}; -#[tokio::main] #[instrument(skip_all, name = "trace_start_server")] -async fn main() -> Result<(), ServerError> { +fn main() -> Result<(), ServerError> { let startup_timestamp = Instant::now(); let standard_font = FIGfont::standard().unwrap(); let figure = standard_font.convert("Iggy Server"); @@ -68,6 +72,24 @@ async fn main() -> Result<(), ServerError> { let args = Args::parse(); let config_provider = config_provider::resolve(&args.config_provider)?; + let config = ServerConfig::default(); + //TODO: Load config. + /* + let xd = std::thread::scope(|scope| { + let config = scope.spawn(move || { + let mut rt = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new().build().unwrap(); + let config: Result<ServerConfig, ServerError> = rt.block_on(async { + let config = ServerConfig::load(&config_provider).await?; + create_directories().await; + create_root_user().await; + Ok(config) + }); + config + }).join().unwrap(); + config + }); + */ + /* let config = ServerConfig::load(&config_provider).await?; if args.fresh { let system_path = config.system.get_system_path(); @@ -83,9 +105,56 @@ async fn main() -> Result<(), ServerError> { } let mut logging = Logging::new(config.telemetry.clone()); logging.early_init(); + */ + // TODO: Make this configurable from config as a range + // for example this instance of Iggy will use cores from 0..4 + let available_cpus = available_parallelism() + .expect("Failed to get num of cores") + .into(); + let shards_count = available_cpus; + let shards_set = 0..shards_count; + + let connections = create_shard_connections(shards_set.clone()); + + for shard_id in shards_set { + let id = shard_id as u16; + let connections = connections.clone(); + let server_config = config.clone(); + std::thread::Builder::new() + .name(format!("shard-{id}")) + .spawn(move || { + monoio::utils::bind_to_cpu_set(Some(shard_id)) + .expect(format!("Failed to set CPU affinity for shard-{id}").as_str()); + // TODO: Figure out what else we could tweak there + // We for sure want to disable the userspace interrupts on new cq entry (set_coop_taskrun) + // let urb = io_uring::IoUring::builder(); + // TODO: Shall we make the size of ring be configureable ? + let mut rt = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new() + //.uring_builder(urb.setup_coop_taskrun()) + .with_entries(1024) // Default size + .enable_timer() + .build() + .expect(format!("Failed to build monoio runtime for shard-{id}").as_str()); + rt.block_on(async move { + let builder = IggyShard::builder(); + let shard = builder + .id(id) + .connections(connections) + .server_config(server_config) + .build_and_init() + .await; + + shard.assert_init(); + }) + }) + .expect(format!("Failed to spawn thread for shard-{id}").as_str()) + .join() + .expect(format!("Failed to join thread for shard-{id}").as_str()); + } // From this point on, we can use tracing macros to log messages. + /* logging.late_init(config.system.get_system_path(), &config.system.logging)?; #[cfg(feature = "disable-mimalloc")] @@ -102,13 +171,15 @@ async fn main() -> Result<(), ServerError> { config.data_maintenance.clone(), config.personal_access_token.clone(), )); + */ // Workaround to ensure that the statistics are initialized before the server // loads streams and starts accepting connections. This is necessary to // have the correct statistics when the server starts. - system.write().await.get_stats().await?; - system.write().await.init().await?; + //system.write().await.get_stats().await?; + //system.write().await.init().await?; + /* let _command_handler = BackgroundServerCommandHandler::new(system.clone(), &config) .install_handler(SaveMessagesExecutor) .install_handler(MaintainMessagesExecutor) @@ -184,5 +255,6 @@ async fn main() -> Result<(), ServerError> { "Iggy server has shutdown successfully. Shutdown took {} ms.", elapsed_time.as_millis() ); + */ Ok(()) } diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs new file mode 100644 index 00000000..08a02e54 --- /dev/null +++ b/core/server/src/shard/builder.rs @@ -0,0 +1,73 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::{configs::server::ServerConfig, shard::Shard}; + +use super::{connector::ShardConnector, frame::ShardFrame, IggyShard}; + +#[derive(Default)] +pub struct IggyShardBuilder { + id: Option<u16>, + connections: Option<Vec<ShardConnector<ShardFrame>>>, + config: Option<ServerConfig>, +} + +impl IggyShardBuilder { + pub fn id(mut self, id: u16) -> Self { + self.id = Some(id); + self + } + + pub fn connections(mut self, connections: Vec<ShardConnector<ShardFrame>>) -> Self { + self.connections = Some(connections); + self + } + + pub fn server_config(mut self, config: ServerConfig) -> Self { + self.config = Some(config); + self + } + + pub async fn build_and_init(self) -> IggyShard { + let id = self.id.unwrap(); + let config = self.config.unwrap(); + let connections = self.connections.unwrap(); + let (stop_sender, stop_receiver, receiver) = connections + .iter() + .filter(|c| c.id == id) + .map(|c| { + ( + c.stop_sender.clone(), + c.stop_receiver.clone(), + c.receiver.clone(), + ) + }) + .next() + .expect("Failed to find connection with the specified ID"); + let shards = connections.into_iter().map(Shard::new).collect(); + + IggyShard::new( + id, + shards, + config, + stop_receiver, + stop_sender, + receiver, + ) + } +} diff --git a/core/server/src/shard/connector.rs b/core/server/src/shard/connector.rs index 349218ba..1d762d68 100644 --- a/core/server/src/shard/connector.rs +++ b/core/server/src/shard/connector.rs @@ -54,6 +54,8 @@ impl<T: Clone> ShardConnector<T> { } } +// TODO: I think those Arcs can be replaced with 'static lifetimes... +// Those shards will live for the entire duration of the application. #[derive(Clone)] pub struct Receiver<T> { channel: Arc<ShardedChannel<T>>, diff --git a/core/server/src/shard/frame.rs b/core/server/src/shard/frame.rs index b1524ad6..a2c1220f 100644 --- a/core/server/src/shard/frame.rs +++ b/core/server/src/shard/frame.rs @@ -27,8 +27,7 @@ pub enum ShardMessage { } #[derive(Debug, Clone)] -pub enum ShardEvent { -} +pub enum ShardEvent {} #[derive(Debug)] pub enum ShardResponse { diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 6ccbcab3..bffaca45 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -15,22 +15,33 @@ * specific language governing permissions and limitations * under the License. */ -mod connector; -mod namespace; -mod frame; +pub mod builder; +pub mod connector; +pub mod frame; +pub mod namespace; - - -use std::cell::RefCell; use ahash::HashMap; -use connector::ShardConnector; +use builder::IggyShardBuilder; +use connector::{Receiver, ShardConnector, StopReceiver, StopSender}; use frame::ShardFrame; use namespace::IggyNamespace; -struct Shard { +use std::cell::{Cell, RefCell}; + +use crate::configs::server::ServerConfig; +pub(crate) struct Shard { id: u16, connection: ShardConnector<ShardFrame>, } +impl Shard { + pub fn new(connection: ShardConnector<ShardFrame>) -> Self { + Self { + id: connection.id, + connection, + } + } +} + struct ShardInfo { id: u16, } @@ -38,22 +49,48 @@ struct ShardInfo { pub struct IggyShard { pub id: u16, shards: Vec<Shard>, - shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>, + //shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>, - pub(crate) permissioner: RefCell<Permissioner>, - pub(crate) storage: Rc<SystemStorage>, - pub(crate) streams: RwLock<HashMap<u32, Stream>>, - pub(crate) streams_ids: RefCell<HashMap<String, u32>>, - pub(crate) users: RefCell<HashMap<UserId, User>>, + //pub(crate) permissioner: RefCell<Permissioner>, + //pub(crate) streams: RwLock<HashMap<u32, Stream>>, + //pub(crate) streams_ids: RefCell<HashMap<String, u32>>, + //pub(crate) users: RefCell<HashMap<UserId, User>>, // TODO - get rid of this dynamic dispatch. - pub(crate) state: Rc<FileState>, - pub(crate) encryptor: Option<Rc<dyn Encryptor>>, - pub(crate) config: ServerConfig, - pub(crate) client_manager: RefCell<ClientManager>, - pub(crate) active_sessions: RefCell<Vec<Session>>, - pub(crate) metrics: Metrics, + //pub(crate) state: Rc<FileState>, + //pub(crate) encryptor: Option<Rc<dyn Encryptor>>, + config: ServerConfig, + //pub(crate) client_manager: RefCell<ClientManager>, + //pub(crate) active_sessions: RefCell<Vec<Session>>, + //pub(crate) metrics: Metrics, pub message_receiver: Cell<Option<Receiver<ShardFrame>>>, stop_receiver: StopReceiver, stop_sender: StopSender, -} \ No newline at end of file +} + +impl IggyShard { + pub fn builder() -> IggyShardBuilder { + Default::default() + } + + pub(crate) fn new( + id: u16, + shards: Vec<Shard>, + config: ServerConfig, + stop_receiver: StopReceiver, + stop_sender: StopSender, + shard_messages_receiver: Receiver<ShardFrame>, + ) -> Self { + Self { + id, + shards, + config, + stop_receiver, + stop_sender, + message_receiver: Cell::new(Some(shard_messages_receiver)), + } + } + + pub fn assert_init(&self) { + } +}
