This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_tpc in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 47d86c1c3fdd5100a6deb71fe6ffb2222d7f1c1b Author: numinex <[email protected]> AuthorDate: Sat May 17 14:00:52 2025 +0200 begin shard init --- core/server/Cargo.toml | 2 +- core/server/src/bootstrap.rs | 131 +++++++++++++++++++++++++++++++++++-- core/server/src/main.rs | 33 +++++----- core/server/src/shard/builder.rs | 2 +- core/server/src/shard/mod.rs | 28 +++++++- core/server/src/shard/namespace.rs | 1 + core/server/src/state/file.rs | 13 +--- 7 files changed, 170 insertions(+), 40 deletions(-) diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 63731abb..fba3fcd9 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -61,7 +61,7 @@ jsonwebtoken = "9.3.1" lending-iterator = "0.1.7" mimalloc = { workspace = true, optional = true } moka = { version = "0.12.10", features = ["future"] } -monoio = "0.2.4" +monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat"] } nix = { version = "0.30", features = ["fs"] } once_cell = "1.21.3" openssl = { workspace = true } diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index fd6a5aa0..d51096a2 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -1,7 +1,24 @@ -use monoio::{Buildable, Driver, Runtime}; +use iggy_common::{ + defaults::{ + DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, + MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH, + }, + IggyError, +}; +use monoio::{ + fs::create_dir_all, + Buildable, Driver, Runtime, +}; +use tracing::info; -use crate::shard::{connector::ShardConnector, frame::ShardFrame}; -use std::ops::Range; +use crate::{ + configs::{config_provider::ConfigProviderKind, server::ServerConfig, system::SystemConfig}, + server_error::ServerError, + shard::{connector::ShardConnector, frame::ShardFrame}, + streaming::users::user::User, + IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, +}; +use std::{env, fs::remove_dir_all, ops::Range, path::Path}; pub fn create_shard_connections(shards_set: Range<usize>) -> Vec<ShardConnector<ShardFrame>> { let shards_count = shards_set.len(); @@ -13,12 +30,112 @@ pub fn create_shard_connections(shards_set: Range<usize>) -> Vec<ShardConnector< connectors } -pub async fn create_directories() { - todo!(); +pub async fn load_config( + config_provider: &ConfigProviderKind, +) -> Result<ServerConfig, ServerError> { + let config = ServerConfig::load(config_provider).await?; + Ok(config) } -pub async fn create_root_user() { - todo!(); +pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError> { + let system_path = config.get_system_path(); + if !Path::new(&system_path).exists() && create_dir_all(&system_path).await.is_err() { + return Err(IggyError::CannotCreateBaseDirectory(system_path)); + } + + let state_path = config.get_state_path(); + if !Path::new(&state_path).exists() && create_dir_all(&state_path).await.is_err() { + return Err(IggyError::CannotCreateStateDirectory(state_path)); + } + + let streams_path = config.get_streams_path(); + if !Path::new(&streams_path).exists() && create_dir_all(&streams_path).await.is_err() { + return Err(IggyError::CannotCreateStreamsDirectory(streams_path)); + } + + let runtime_path = config.get_runtime_path(); + // TODO: Change remove_dir_all to async version, once we implement the dir walk using monoio `remove_dir` method. + if Path::new(&runtime_path).exists() && remove_dir_all(&runtime_path).is_err() { + return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path)); + } + + if create_dir_all(&runtime_path).await.is_err() { + return Err(IggyError::CannotCreateRuntimeDirectory(runtime_path)); + } + + info!( + "Initializing system, data will be stored at: {}", + config.get_system_path() + ); + Ok(()) + + // TODO: Move this to individual shard level + /* + let state_entries = self.state.init().await.with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to initialize state entries") + })?; + let system_state = SystemState::init(state_entries) + .await + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to initialize system state") + })?; + let now = Instant::now(); + self.load_version().await.with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to load version") + })?; + self.load_users(system_state.users.into_values().collect()) + .await + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to load users") + })?; + self.load_streams(system_state.streams.into_values().collect()) + .await + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to load streams") + })?; + if let Some(archiver) = self.archiver.as_ref() { + archiver + .init() + .await + .expect("Failed to initialize archiver"); + } + info!("Initialized system in {} ms.", now.elapsed().as_millis()); + Ok(()) + */ +} + +pub fn create_root_user() -> User { + info!("Creating root user..."); + let username = env::var(IGGY_ROOT_USERNAME_ENV); + let password = env::var(IGGY_ROOT_PASSWORD_ENV); + if (username.is_ok() && password.is_err()) || (username.is_err() && password.is_ok()) { + panic!("When providing the custom root user credentials, both username and password must be set."); + } + if username.is_ok() && password.is_ok() { + info!("Using the custom root user credentials."); + } else { + info!("Using the default root user credentials."); + } + + let username = username.unwrap_or(DEFAULT_ROOT_USERNAME.to_string()); + let password = password.unwrap_or(DEFAULT_ROOT_PASSWORD.to_string()); + if username.is_empty() || password.is_empty() { + panic!("Root user credentials are not set."); + } + if username.len() < MIN_USERNAME_LENGTH { + panic!("Root username is too short."); + } + if username.len() > MAX_USERNAME_LENGTH { + panic!("Root username is too long."); + } + if password.len() < MIN_PASSWORD_LENGTH { + panic!("Root password is too short."); + } + if password.len() > MAX_PASSWORD_LENGTH { + panic!("Root password is too long."); + } + let user = User::root(&username, &password); + user } pub fn create_default_executor<D>() -> Runtime<D> diff --git a/core/server/src/main.rs b/core/server/src/main.rs index a5bbbe48..cd79cedc 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -21,11 +21,13 @@ use std::thread::available_parallelism; use anyhow::Result; use clap::Parser; use dotenvy::dotenv; +use error_set::ErrContext; use figlet_rs::FIGfont; use monoio::Buildable; use server::args::Args; use server::bootstrap::{ create_default_executor, create_directories, create_root_user, create_shard_connections, + load_config, }; use server::channels::commands::archive_state::ArchiveStateExecutor; use server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor; @@ -73,17 +75,12 @@ fn main() -> Result<(), ServerError> { } let args = Args::parse(); + // TODO: I think we could get rid of config provider, since we support only TOML + // as config provider. let config_provider = config_provider::resolve(&args.config_provider)?; let config = std::thread::scope(|scope| { let config = scope .spawn(move || { - async fn load_config( - config_provider: &ConfigProviderKind, - ) -> Result<ServerConfig, ServerError> { - let config = ServerConfig::load(config_provider).await?; - Ok(config) - } - let mut rt = create_default_executor::<monoio::IoUringDriver>(); rt.block_on(load_config(&config_provider)) }) @@ -93,7 +90,7 @@ fn main() -> Result<(), ServerError> { })?; // Create directories and root user. - // Remove `local_data` directory if run with `--fresh` flag. + // Remove `local_data` directory if run with `--fresh` flag. std::thread::scope(|scope| { scope .spawn(|| { @@ -115,14 +112,15 @@ fn main() -> Result<(), ServerError> { } } - // Create directories and root user - create_directories().await; - create_root_user().await; - }); + // Create directories. + create_directories(&config.system).await?; + Ok::<(), ServerError>(()) + }) }) .join() - .expect("Failed to create directories and root user"); - }); + .expect("Failed to create directories and root user") + }) + .with_error_context(|err| format!("Failed to create directories, err: {err}"))?; // Initialize logging let mut logging = Logging::new(config.telemetry.clone()); @@ -153,20 +151,21 @@ fn main() -> Result<(), ServerError> { // let urb = io_uring::IoUring::builder(); // TODO: Shall we make the size of ring be configureable ? let mut rt = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new() - //.uring_builder(urb.setup_coop_taskrun()) + //.uring_builder(urb.setup_coop_taskrun()) // broken shit. .with_entries(1024) // Default size .enable_timer() .build() .expect(format!("Failed to build monoio runtime for shard-{id}").as_str()); rt.block_on(async move { let builder = IggyShard::builder(); - let shard = builder + let mut shard = builder .id(id) .connections(connections) .server_config(server_config) - .build_and_init() + .build() .await; + shard.init().await; shard.assert_init(); }) }) diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 750e2b71..1a36a8fe 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -43,7 +43,7 @@ impl IggyShardBuilder { self } - pub async fn build_and_init(self) -> IggyShard { + pub async fn build(self) -> IggyShard { let id = self.id.unwrap(); let config = self.config.unwrap(); let connections = self.connections.unwrap(); diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index dcb089d9..5373b766 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -15,6 +15,7 @@ * specific language governing permissions and limitations * under the License. */ + pub mod builder; pub mod connector; pub mod frame; @@ -27,7 +28,7 @@ use frame::ShardFrame; use namespace::IggyNamespace; use std::cell::{Cell, RefCell}; -use crate::configs::server::ServerConfig; +use crate::{bootstrap::create_root_user, configs::server::ServerConfig}; pub(crate) struct Shard { id: u16, connection: ShardConnector<ShardFrame>, @@ -49,7 +50,7 @@ struct ShardInfo { pub struct IggyShard { pub id: u16, shards: Vec<Shard>, - //shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>, + shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>, //pub(crate) permissioner: RefCell<Permissioner>, //pub(crate) streams: RwLock<HashMap<u32, Stream>>, @@ -84,6 +85,7 @@ impl IggyShard { Self { id, shards, + shards_table: Default::default(), config, stop_receiver, stop_sender, @@ -91,5 +93,27 @@ impl IggyShard { } } + pub async fn init(&mut self) { + let user = create_root_user(); + self.load_state().await; + self.load_users().await; + // Add default root user. + todo!(); + self.load_streams().await; + + } + + async fn load_state(&self) { + todo!() + } + + async fn load_users(&self) { + todo!() + } + + async fn load_streams(&self) { + todo!() + } + pub fn assert_init(&self) {} } diff --git a/core/server/src/shard/namespace.rs b/core/server/src/shard/namespace.rs index 4b972125..d9aedfb7 100644 --- a/core/server/src/shard/namespace.rs +++ b/core/server/src/shard/namespace.rs @@ -15,6 +15,7 @@ * specific language governing permissions and limitations * under the License. */ + #[derive(Debug, Clone, Eq, PartialEq, Hash)] pub struct IggyNamespace { pub(crate) stream_id: u32, diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs index c9e77977..f162ed75 100644 --- a/core/server/src/state/file.rs +++ b/core/server/src/state/file.rs @@ -84,18 +84,7 @@ impl FileState { impl State for FileState { async fn init(&self) -> Result<Vec<StateEntry>, IggyError> { - if !Path::new(&self.path).exists() { - info!("State file does not exist, creating a new one"); - self.persister - .overwrite(&self.path, &[]) - .await - .with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to overwrite state file, path: {}", - self.path - ) - })?; - } + assert!(Path::new(&self.path).exists()); let entries = self.load_entries().await.with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to load entries")
