This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_tpc in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 99aaccf20c39c8ee62966c16db2afdb48c62763a Author: numinex <[email protected]> AuthorDate: Thu Jun 19 10:17:42 2025 +0200 todos --- Cargo.lock | 31 ++++++++++++++++++++++++----- core/sdk/src/tcp/tcp_client.rs | 1 + core/server/src/binary/command.rs | 1 + core/server/src/bootstrap.rs | 27 ++++++++++++++++++------- core/server/src/main.rs | 23 ++++++++-------------- core/server/src/shard/builder.rs | 40 +++++++++++++++++++++++++------------- core/server/src/shard/connector.rs | 4 ++-- core/server/src/shard/mod.rs | 38 ++++++++++++++++++++++++++++-------- 8 files changed, 115 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ab38a87..3a7a4958 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,7 +142,7 @@ dependencies = [ "actix-utils", "futures-core", "futures-util", - "mio 1.0.3", + "mio 1.0.4", "socket2", "tokio", "tracing", @@ -639,7 +639,7 @@ checksum = "fd73835ad7deb4bd2b389e6f10333b143f025d607c55ca04c66a0bcc6bb2fc6d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.103", ] [[package]] @@ -4624,6 +4624,15 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "mimalloc" version = "0.1.47" @@ -4673,6 +4682,18 @@ dependencies = [ "adler2", ] +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi 0.11.1+wasi-snapshot-preview1", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.0.4" @@ -4761,7 +4782,7 @@ checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.103", ] [[package]] @@ -4856,7 +4877,7 @@ dependencies = [ "kqueue", "libc", "log", - "mio 1.0.3", + "mio 1.0.4", "notify-types", "walkdir", "windows-sys 0.59.0", @@ -7403,7 +7424,7 @@ dependencies = [ "backtrace", "bytes", "libc", - "mio", + "mio 1.0.4", "parking_lot 0.12.4", "pin-project-lite", "signal-hook-registry", diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index 8db50940..086c5964 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -17,6 +17,7 @@ */ use crate::prelude::Client; +use crate::prelude::IggyConsumer; use crate::prelude::TcpClientConfig; use crate::tcp::tcp_connection_stream::TcpConnectionStream; use crate::tcp::tcp_connection_stream_kind::ConnectionStreamKind; diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs index c7802be9..0b997941 100644 --- a/core/server/src/binary/command.rs +++ b/core/server/src/binary/command.rs @@ -66,6 +66,7 @@ use iggy_common::update_stream::UpdateStream; use iggy_common::update_topic::UpdateTopic; use iggy_common::update_user::UpdateUser; use iggy_common::*; +use rustls::crypto::hash::Output; use strum::EnumString; use tracing::error; diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index d51096a2..d70d0317 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -1,22 +1,19 @@ use iggy_common::{ + IggyError, defaults::{ DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH, }, - IggyError, -}; -use monoio::{ - fs::create_dir_all, - Buildable, Driver, Runtime, }; +use monoio::{fs::create_dir_all, time::TimeDriver, Buildable, Driver, Runtime}; use tracing::info; use crate::{ + IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, configs::{config_provider::ConfigProviderKind, server::ServerConfig, system::SystemConfig}, server_error::ServerError, shard::{connector::ShardConnector, frame::ShardFrame}, streaming::users::user::User, - IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, }; use std::{env, fs::remove_dir_all, ops::Range, path::Path}; @@ -109,7 +106,9 @@ pub fn create_root_user() -> User { let username = env::var(IGGY_ROOT_USERNAME_ENV); let password = env::var(IGGY_ROOT_PASSWORD_ENV); if (username.is_ok() && password.is_err()) || (username.is_err() && password.is_ok()) { - panic!("When providing the custom root user credentials, both username and password must be set."); + panic!( + "When providing the custom root user credentials, both username and password must be set." + ); } if username.is_ok() && password.is_ok() { info!("Using the custom root user credentials."); @@ -146,3 +145,17 @@ where let rt = Buildable::build(builder).expect("Failed to create default runtime"); rt } + +pub fn create_shard_executor() -> Runtime<TimeDriver<monoio::IoUringDriver>> +{ + // TODO: Figure out what else we could tweak there + // We for sure want to disable the userspace interrupts on new cq entry (set_coop_taskrun) + // let urb = io_uring::IoUring::builder(); + // TODO: Shall we make the size of ring be configureable ? + let builder = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new() + //.uring_builder(urb.setup_coop_taskrun()) // broken shit. + .with_entries(1024) // Default size + .enable_timer(); + let rt = Buildable::build(builder).expect("Failed to create default runtime"); + rt +} diff --git a/core/server/src/main.rs b/core/server/src/main.rs index cd79cedc..e7409e49 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -23,11 +23,9 @@ use clap::Parser; use dotenvy::dotenv; use error_set::ErrContext; use figlet_rs::FIGfont; -use monoio::Buildable; use server::args::Args; use server::bootstrap::{ - create_default_executor, create_directories, create_root_user, create_shard_connections, - load_config, + create_default_executor, create_directories, create_root_user, create_shard_connections, create_shard_executor, load_config }; use server::channels::commands::archive_state::ArchiveStateExecutor; use server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor; @@ -112,7 +110,7 @@ fn main() -> Result<(), ServerError> { } } - // Create directories. + // Create directories. create_directories(&config.system).await?; Ok::<(), ServerError>(()) }) @@ -146,16 +144,7 @@ fn main() -> Result<(), ServerError> { monoio::utils::bind_to_cpu_set(Some(shard_id)) .expect(format!("Failed to set CPU affinity for shard-{id}").as_str()); - // TODO: Figure out what else we could tweak there - // We for sure want to disable the userspace interrupts on new cq entry (set_coop_taskrun) - // let urb = io_uring::IoUring::builder(); - // TODO: Shall we make the size of ring be configureable ? - let mut rt = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new() - //.uring_builder(urb.setup_coop_taskrun()) // broken shit. - .with_entries(1024) // Default size - .enable_timer() - .build() - .expect(format!("Failed to build monoio runtime for shard-{id}").as_str()); + let mut rt = create_shard_executor(); rt.block_on(async move { let builder = IggyShard::builder(); let mut shard = builder @@ -165,7 +154,11 @@ fn main() -> Result<(), ServerError> { .build() .await; - shard.init().await; + if let Err(e) = shard.init().await { + //TODO: If one of the shards fails to initialize, we should crash the whole program; + panic!("Failed to initialize shard-{id}: {e}"); + } + //TODO: If one of the shards fails to initialize, we should crash the whole program; shard.assert_init(); }) }) diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 640ffb57..06366ebf 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -21,9 +21,19 @@ use std::{cell::Cell, rc::Rc, sync::Arc}; use iggy_common::{Aes256GcmEncryptor, EncryptorKind}; use tracing::info; -use crate::{configs::server::ServerConfig, map_toggle_str, shard::Shard, state::{file::FileState, StateKind}, streaming::{persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind}, storage::SystemStorage}, versioning::SemanticVersion}; +use crate::{ + configs::server::ServerConfig, + map_toggle_str, + shard::Shard, + state::{StateKind, file::FileState}, + streaming::{ + persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind}, + storage::SystemStorage, + }, + versioning::SemanticVersion, +}; -use super::{connector::ShardConnector, frame::ShardFrame, IggyShard}; +use super::{IggyShard, connector::ShardConnector, frame::ShardFrame}; #[derive(Default)] pub struct IggyShardBuilder { @@ -66,8 +76,10 @@ impl IggyShardBuilder { .next() .expect("Failed to find connection with the specified ID"); let shards = connections.into_iter().map(Shard::new).collect(); + //TODO: This can be discrete step in the builder bootstrapped from main function. let version = SemanticVersion::current().expect("Invalid version"); + //TODO: This can be discrete step in the builder bootstrapped from main function. info!( "Server-side encryption is {}.", map_toggle_str(config.system.encryption.enabled) @@ -79,6 +91,7 @@ impl IggyShardBuilder { false => None, }; + //TODO: This can be discrete step in the builder bootstrapped from main function. let state_persister = Self::resolve_persister(config.system.state.enforce_fsync); let state = Rc::new(StateKind::File(FileState::new( &config.system.get_state_messages_file_path(), @@ -87,20 +100,21 @@ impl IggyShardBuilder { encryptor.clone(), ))); + //TODO: This can be discrete step in the builder bootstrapped from main function. let partition_persister = Self::resolve_persister(config.system.partition.enforce_fsync); - let storage = SystemStorage::new(config.system, partition_persister); + let storage = Rc::new(SystemStorage::new(config.system.clone(), partition_persister)); IggyShard { - id: id, - shards: shards, - shards_table: Default::default(), - storage: storage, - state: state, - config: config, - stop_receiver: stop_receiver, - stop_sender: stop_sender, - frame_receiver: Cell::new(Some(frame_receiver)), - } + id: id, + shards: shards, + shards_table: Default::default(), + storage: storage, + state: state, + config: config, + stop_receiver: stop_receiver, + stop_sender: stop_sender, + frame_receiver: Cell::new(Some(frame_receiver)), + } } fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> { diff --git a/core/server/src/shard/connector.rs b/core/server/src/shard/connector.rs index 1d762d68..95aedc73 100644 --- a/core/server/src/shard/connector.rs +++ b/core/server/src/shard/connector.rs @@ -15,10 +15,10 @@ * specific language governing permissions and limitations * under the License. */ -use futures::{task::AtomicWaker, Stream}; +use futures::{Stream, task::AtomicWaker}; use sharded_queue::ShardedQueue; use std::{ - sync::{atomic::AtomicUsize, Arc}, + sync::{Arc, atomic::AtomicUsize}, task::Poll, }; diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 54b88fb1..b98f6381 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -25,10 +25,19 @@ use ahash::HashMap; use builder::IggyShardBuilder; use connector::{Receiver, ShardConnector, StopReceiver, StopSender}; use frame::ShardFrame; +use iggy_common::IggyError; use namespace::IggyNamespace; -use std::{cell::{Cell, RefCell}, rc::Rc, sync::Arc}; +use tracing::info; +use std::{ + cell::{Cell, RefCell}, + rc::Rc, + sync::Arc, time::Instant, +}; -use crate::{bootstrap::create_root_user, configs::server::ServerConfig, state::file::FileState, streaming::storage::SystemStorage}; +use crate::{ + bootstrap::create_root_user, configs::server::ServerConfig, state::{file::FileState, StateKind}, + streaming::storage::SystemStorage, +}; pub(crate) struct Shard { id: u16, connection: ShardConnector<ShardFrame>, @@ -57,10 +66,10 @@ pub struct IggyShard { //pub(crate) streams_ids: RefCell<HashMap<String, u32>>, //pub(crate) users: RefCell<HashMap<UserId, User>>, // TODO: Refactor. - pub(crate) storage: Arc<SystemStorage>, + pub(crate) storage: Rc<SystemStorage>, // TODO - get rid of this dynamic dispatch. - pub(crate) state: Rc<FileState>, + pub(crate) state: Rc<StateKind>, //pub(crate) encryptor: Option<Rc<dyn Encryptor>>, config: ServerConfig, //pub(crate) client_manager: RefCell<ClientManager>, @@ -76,14 +85,27 @@ impl IggyShard { Default::default() } - pub async fn init(&mut self) { - let user = create_root_user(); + pub async fn init(&mut self) -> Result<(), IggyError> { + let now = Instant::now(); + //TODO: Fix this either by moving it to main function, or by using `run_once` barrier. + //let state_entries = self.state.init().await?; + //let system_state = SystemState::init(state_entries).await?; + //let user = create_root_user(); self.load_state().await; self.load_users().await; // Add default root user. - todo!(); self.load_streams().await; - + //TODO: Fix the archiver. + /* + if let Some(archiver) = self.archiver.as_ref() { + archiver + .init() + .await + .expect("Failed to initialize archiver"); + } + */ + info!("Initialized system in {} ms.", now.elapsed().as_millis()); + Ok(()) } async fn load_state(&self) {
