This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_tpc in repository https://gitbox.apache.org/repos/asf/iggy.git
commit d4215002b3b895f8b452b576c22dc91c99fbf7b1 Author: numinex <[email protected]> AuthorDate: Sat May 17 14:00:52 2025 +0200 begin shard init --- core/server/Cargo.toml | 2 +- core/server/src/bootstrap.rs | 131 +++++++++++++++++++++++++++++++++++-- core/server/src/main.rs | 33 +++++----- core/server/src/shard/builder.rs | 54 +++++++++++++-- core/server/src/shard/mod.rs | 49 ++++++++------ core/server/src/shard/namespace.rs | 1 + core/server/src/state/file.rs | 13 +--- 7 files changed, 221 insertions(+), 62 deletions(-) diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 3c6bccd5..0700dd39 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -64,7 +64,7 @@ jsonwebtoken = "9.3.1" lending-iterator = "0.1.7" mimalloc = { workspace = true, optional = true } moka = { version = "0.12.10", features = ["future"] } -monoio = "0.2.4" +monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat"] } nix = { version = "0.30", features = ["fs"] } once_cell = "1.21.3" opentelemetry = { version = "0.30.0", features = ["trace", "logs"] } diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index fd6a5aa0..d51096a2 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -1,7 +1,24 @@ -use monoio::{Buildable, Driver, Runtime}; +use iggy_common::{ + defaults::{ + DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, + MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH, + }, + IggyError, +}; +use monoio::{ + fs::create_dir_all, + Buildable, Driver, Runtime, +}; +use tracing::info; -use crate::shard::{connector::ShardConnector, frame::ShardFrame}; -use std::ops::Range; +use crate::{ + configs::{config_provider::ConfigProviderKind, server::ServerConfig, system::SystemConfig}, + server_error::ServerError, + shard::{connector::ShardConnector, frame::ShardFrame}, + streaming::users::user::User, + IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, +}; +use std::{env, fs::remove_dir_all, ops::Range, path::Path}; pub fn create_shard_connections(shards_set: Range<usize>) -> Vec<ShardConnector<ShardFrame>> { let shards_count = shards_set.len(); @@ -13,12 +30,112 @@ pub fn create_shard_connections(shards_set: Range<usize>) -> Vec<ShardConnector< connectors } -pub async fn create_directories() { - todo!(); +pub async fn load_config( + config_provider: &ConfigProviderKind, +) -> Result<ServerConfig, ServerError> { + let config = ServerConfig::load(config_provider).await?; + Ok(config) } -pub async fn create_root_user() { - todo!(); +pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError> { + let system_path = config.get_system_path(); + if !Path::new(&system_path).exists() && create_dir_all(&system_path).await.is_err() { + return Err(IggyError::CannotCreateBaseDirectory(system_path)); + } + + let state_path = config.get_state_path(); + if !Path::new(&state_path).exists() && create_dir_all(&state_path).await.is_err() { + return Err(IggyError::CannotCreateStateDirectory(state_path)); + } + + let streams_path = config.get_streams_path(); + if !Path::new(&streams_path).exists() && create_dir_all(&streams_path).await.is_err() { + return Err(IggyError::CannotCreateStreamsDirectory(streams_path)); + } + + let runtime_path = config.get_runtime_path(); + // TODO: Change remove_dir_all to async version, once we implement the dir walk using monoio `remove_dir` method. + if Path::new(&runtime_path).exists() && remove_dir_all(&runtime_path).is_err() { + return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path)); + } + + if create_dir_all(&runtime_path).await.is_err() { + return Err(IggyError::CannotCreateRuntimeDirectory(runtime_path)); + } + + info!( + "Initializing system, data will be stored at: {}", + config.get_system_path() + ); + Ok(()) + + // TODO: Move this to individual shard level + /* + let state_entries = self.state.init().await.with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to initialize state entries") + })?; + let system_state = SystemState::init(state_entries) + .await + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to initialize system state") + })?; + let now = Instant::now(); + self.load_version().await.with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to load version") + })?; + self.load_users(system_state.users.into_values().collect()) + .await + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to load users") + })?; + self.load_streams(system_state.streams.into_values().collect()) + .await + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to load streams") + })?; + if let Some(archiver) = self.archiver.as_ref() { + archiver + .init() + .await + .expect("Failed to initialize archiver"); + } + info!("Initialized system in {} ms.", now.elapsed().as_millis()); + Ok(()) + */ +} + +pub fn create_root_user() -> User { + info!("Creating root user..."); + let username = env::var(IGGY_ROOT_USERNAME_ENV); + let password = env::var(IGGY_ROOT_PASSWORD_ENV); + if (username.is_ok() && password.is_err()) || (username.is_err() && password.is_ok()) { + panic!("When providing the custom root user credentials, both username and password must be set."); + } + if username.is_ok() && password.is_ok() { + info!("Using the custom root user credentials."); + } else { + info!("Using the default root user credentials."); + } + + let username = username.unwrap_or(DEFAULT_ROOT_USERNAME.to_string()); + let password = password.unwrap_or(DEFAULT_ROOT_PASSWORD.to_string()); + if username.is_empty() || password.is_empty() { + panic!("Root user credentials are not set."); + } + if username.len() < MIN_USERNAME_LENGTH { + panic!("Root username is too short."); + } + if username.len() > MAX_USERNAME_LENGTH { + panic!("Root username is too long."); + } + if password.len() < MIN_PASSWORD_LENGTH { + panic!("Root password is too short."); + } + if password.len() > MAX_PASSWORD_LENGTH { + panic!("Root password is too long."); + } + let user = User::root(&username, &password); + user } pub fn create_default_executor<D>() -> Runtime<D> diff --git a/core/server/src/main.rs b/core/server/src/main.rs index a5bbbe48..cd79cedc 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -21,11 +21,13 @@ use std::thread::available_parallelism; use anyhow::Result; use clap::Parser; use dotenvy::dotenv; +use error_set::ErrContext; use figlet_rs::FIGfont; use monoio::Buildable; use server::args::Args; use server::bootstrap::{ create_default_executor, create_directories, create_root_user, create_shard_connections, + load_config, }; use server::channels::commands::archive_state::ArchiveStateExecutor; use server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor; @@ -73,17 +75,12 @@ fn main() -> Result<(), ServerError> { } let args = Args::parse(); + // TODO: I think we could get rid of config provider, since we support only TOML + // as config provider. let config_provider = config_provider::resolve(&args.config_provider)?; let config = std::thread::scope(|scope| { let config = scope .spawn(move || { - async fn load_config( - config_provider: &ConfigProviderKind, - ) -> Result<ServerConfig, ServerError> { - let config = ServerConfig::load(config_provider).await?; - Ok(config) - } - let mut rt = create_default_executor::<monoio::IoUringDriver>(); rt.block_on(load_config(&config_provider)) }) @@ -93,7 +90,7 @@ fn main() -> Result<(), ServerError> { })?; // Create directories and root user. - // Remove `local_data` directory if run with `--fresh` flag. + // Remove `local_data` directory if run with `--fresh` flag. std::thread::scope(|scope| { scope .spawn(|| { @@ -115,14 +112,15 @@ fn main() -> Result<(), ServerError> { } } - // Create directories and root user - create_directories().await; - create_root_user().await; - }); + // Create directories. + create_directories(&config.system).await?; + Ok::<(), ServerError>(()) + }) }) .join() - .expect("Failed to create directories and root user"); - }); + .expect("Failed to create directories and root user") + }) + .with_error_context(|err| format!("Failed to create directories, err: {err}"))?; // Initialize logging let mut logging = Logging::new(config.telemetry.clone()); @@ -153,20 +151,21 @@ fn main() -> Result<(), ServerError> { // let urb = io_uring::IoUring::builder(); // TODO: Shall we make the size of ring be configureable ? let mut rt = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new() - //.uring_builder(urb.setup_coop_taskrun()) + //.uring_builder(urb.setup_coop_taskrun()) // broken shit. .with_entries(1024) // Default size .enable_timer() .build() .expect(format!("Failed to build monoio runtime for shard-{id}").as_str()); rt.block_on(async move { let builder = IggyShard::builder(); - let shard = builder + let mut shard = builder .id(id) .connections(connections) .server_config(server_config) - .build_and_init() + .build() .await; + shard.init().await; shard.assert_init(); }) }) diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 750e2b71..640ffb57 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -16,7 +16,12 @@ * under the License. */ -use crate::{configs::server::ServerConfig, shard::Shard}; +use std::{cell::Cell, rc::Rc, sync::Arc}; + +use iggy_common::{Aes256GcmEncryptor, EncryptorKind}; +use tracing::info; + +use crate::{configs::server::ServerConfig, map_toggle_str, shard::Shard, state::{file::FileState, StateKind}, streaming::{persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind}, storage::SystemStorage}, versioning::SemanticVersion}; use super::{connector::ShardConnector, frame::ShardFrame, IggyShard}; @@ -43,11 +48,12 @@ impl IggyShardBuilder { self } - pub async fn build_and_init(self) -> IggyShard { + // TODO: Too much happens in there, some of those bootstrapping logic should be moved outside. + pub async fn build(self) -> IggyShard { let id = self.id.unwrap(); let config = self.config.unwrap(); let connections = self.connections.unwrap(); - let (stop_sender, stop_receiver, receiver) = connections + let (stop_sender, stop_receiver, frame_receiver) = connections .iter() .filter(|c| c.id == id) .map(|c| { @@ -60,7 +66,47 @@ impl IggyShardBuilder { .next() .expect("Failed to find connection with the specified ID"); let shards = connections.into_iter().map(Shard::new).collect(); + let version = SemanticVersion::current().expect("Invalid version"); + + info!( + "Server-side encryption is {}.", + map_toggle_str(config.system.encryption.enabled) + ); + let encryptor: Option<Arc<EncryptorKind>> = match config.system.encryption.enabled { + true => Some(Arc::new(EncryptorKind::Aes256Gcm( + Aes256GcmEncryptor::from_base64_key(&config.system.encryption.key).unwrap(), + ))), + false => None, + }; + + let state_persister = Self::resolve_persister(config.system.state.enforce_fsync); + let state = Rc::new(StateKind::File(FileState::new( + &config.system.get_state_messages_file_path(), + &version, + state_persister, + encryptor.clone(), + ))); + + let partition_persister = Self::resolve_persister(config.system.partition.enforce_fsync); + let storage = SystemStorage::new(config.system, partition_persister); + + IggyShard { + id: id, + shards: shards, + shards_table: Default::default(), + storage: storage, + state: state, + config: config, + stop_receiver: stop_receiver, + stop_sender: stop_sender, + frame_receiver: Cell::new(Some(frame_receiver)), + } + } - IggyShard::new(id, shards, config, stop_receiver, stop_sender, receiver) + fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> { + match enforce_fsync { + true => Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)), + false => Arc::new(PersisterKind::File(FilePersister)), + } } } diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index dcb089d9..54b88fb1 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -15,6 +15,7 @@ * specific language governing permissions and limitations * under the License. */ + pub mod builder; pub mod connector; pub mod frame; @@ -25,9 +26,9 @@ use builder::IggyShardBuilder; use connector::{Receiver, ShardConnector, StopReceiver, StopSender}; use frame::ShardFrame; use namespace::IggyNamespace; -use std::cell::{Cell, RefCell}; +use std::{cell::{Cell, RefCell}, rc::Rc, sync::Arc}; -use crate::configs::server::ServerConfig; +use crate::{bootstrap::create_root_user, configs::server::ServerConfig, state::file::FileState, streaming::storage::SystemStorage}; pub(crate) struct Shard { id: u16, connection: ShardConnector<ShardFrame>, @@ -49,21 +50,23 @@ struct ShardInfo { pub struct IggyShard { pub id: u16, shards: Vec<Shard>, - //shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>, + shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>, //pub(crate) permissioner: RefCell<Permissioner>, //pub(crate) streams: RwLock<HashMap<u32, Stream>>, //pub(crate) streams_ids: RefCell<HashMap<String, u32>>, //pub(crate) users: RefCell<HashMap<UserId, User>>, + // TODO: Refactor. + pub(crate) storage: Arc<SystemStorage>, // TODO - get rid of this dynamic dispatch. - //pub(crate) state: Rc<FileState>, + pub(crate) state: Rc<FileState>, //pub(crate) encryptor: Option<Rc<dyn Encryptor>>, config: ServerConfig, //pub(crate) client_manager: RefCell<ClientManager>, //pub(crate) active_sessions: RefCell<Vec<Session>>, //pub(crate) metrics: Metrics, - pub message_receiver: Cell<Option<Receiver<ShardFrame>>>, + pub frame_receiver: Cell<Option<Receiver<ShardFrame>>>, stop_receiver: StopReceiver, stop_sender: StopSender, } @@ -73,22 +76,26 @@ impl IggyShard { Default::default() } - pub(crate) fn new( - id: u16, - shards: Vec<Shard>, - config: ServerConfig, - stop_receiver: StopReceiver, - stop_sender: StopSender, - shard_messages_receiver: Receiver<ShardFrame>, - ) -> Self { - Self { - id, - shards, - config, - stop_receiver, - stop_sender, - message_receiver: Cell::new(Some(shard_messages_receiver)), - } + pub async fn init(&mut self) { + let user = create_root_user(); + self.load_state().await; + self.load_users().await; + // Add default root user. + todo!(); + self.load_streams().await; + + } + + async fn load_state(&self) { + todo!() + } + + async fn load_users(&self) { + todo!() + } + + async fn load_streams(&self) { + todo!() } pub fn assert_init(&self) {} diff --git a/core/server/src/shard/namespace.rs b/core/server/src/shard/namespace.rs index 4b972125..d9aedfb7 100644 --- a/core/server/src/shard/namespace.rs +++ b/core/server/src/shard/namespace.rs @@ -15,6 +15,7 @@ * specific language governing permissions and limitations * under the License. */ + #[derive(Debug, Clone, Eq, PartialEq, Hash)] pub struct IggyNamespace { pub(crate) stream_id: u32, diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs index c9e77977..f162ed75 100644 --- a/core/server/src/state/file.rs +++ b/core/server/src/state/file.rs @@ -84,18 +84,7 @@ impl FileState { impl State for FileState { async fn init(&self) -> Result<Vec<StateEntry>, IggyError> { - if !Path::new(&self.path).exists() { - info!("State file does not exist, creating a new one"); - self.persister - .overwrite(&self.path, &[]) - .await - .with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to overwrite state file, path: {}", - self.path - ) - })?; - } + assert!(Path::new(&self.path).exists()); let entries = self.load_entries().await.with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to load entries")
