This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 7bb27bde begin shard init
7bb27bde is described below
commit 7bb27bde089c717232e65c260af27a39bb7bd624
Author: numinex <[email protected]>
AuthorDate: Sat May 17 14:00:52 2025 +0200
begin shard init
---
core/server/Cargo.toml | 2 +-
core/server/src/bootstrap.rs | 132 ++++++++++++++++++++++++++++++++++++---
core/server/src/main.rs | 33 +++++-----
core/server/src/shard/builder.rs | 2 +-
core/server/src/shard/mod.rs | 7 ++-
core/server/src/state/file.rs | 13 +---
6 files changed, 150 insertions(+), 39 deletions(-)
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 3c0e17e7..d29d935f 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -61,7 +61,7 @@ jsonwebtoken = "9.3.1"
lending-iterator = "0.1.7"
mimalloc = { workspace = true, optional = true }
moka = { version = "0.12.10", features = ["future"] }
-monoio = "0.2.4"
+monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat"] }
nix = { version = "0.30", features = ["fs"] }
once_cell = "1.21.3"
openssl = { workspace = true }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index fd6a5aa0..67b0a37a 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -1,7 +1,25 @@
-use monoio::{Buildable, Driver, Runtime};
+use iggy_common::{
+ defaults::{
+ DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH,
MAX_USERNAME_LENGTH,
+ MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH,
+ },
+ IggyError,
+};
+use monoio::{
+ fs::{create_dir_all, remove_dir},
+ Buildable, Driver, Runtime,
+};
+use tracing::info;
-use crate::shard::{connector::ShardConnector, frame::ShardFrame};
-use std::ops::Range;
+use crate::{
+ configs::{config_provider::ConfigProviderKind, server::ServerConfig,
system::SystemConfig},
+ server_error::ServerError,
+ shard::{connector::ShardConnector, frame::ShardFrame},
+ state::command::EntryCommand,
+ streaming::users::user::User,
+ IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
+};
+use std::{env, fs::remove_dir_all, ops::Range, path::Path};
pub fn create_shard_connections(shards_set: Range<usize>) ->
Vec<ShardConnector<ShardFrame>> {
let shards_count = shards_set.len();
@@ -13,12 +31,112 @@ pub fn create_shard_connections(shards_set: Range<usize>)
-> Vec<ShardConnector<
connectors
}
-pub async fn create_directories() {
- todo!();
+pub async fn load_config(
+ config_provider: &ConfigProviderKind,
+) -> Result<ServerConfig, ServerError> {
+ let config = ServerConfig::load(config_provider).await?;
+ Ok(config)
}
-pub async fn create_root_user() {
- todo!();
+pub async fn create_directories(config: &SystemConfig) -> Result<(),
IggyError> {
+ let system_path = config.get_system_path();
+ if !Path::new(&system_path).exists() &&
create_dir_all(&system_path).await.is_err() {
+ return Err(IggyError::CannotCreateBaseDirectory(system_path));
+ }
+
+ let state_path = config.get_state_path();
+ if !Path::new(&state_path).exists() &&
create_dir_all(&state_path).await.is_err() {
+ return Err(IggyError::CannotCreateStateDirectory(state_path));
+ }
+
+ let streams_path = config.get_streams_path();
+ if !Path::new(&streams_path).exists() &&
create_dir_all(&streams_path).await.is_err() {
+ return Err(IggyError::CannotCreateStreamsDirectory(streams_path));
+ }
+
+ let runtime_path = config.get_runtime_path();
+ // TODO: Change remove_dir_all to async version, once we implement the dir
walk using monoio `remove_dir` method.
+ if Path::new(&runtime_path).exists() &&
remove_dir_all(&runtime_path).is_err() {
+ return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path));
+ }
+
+ if create_dir_all(&runtime_path).await.is_err() {
+ return Err(IggyError::CannotCreateRuntimeDirectory(runtime_path));
+ }
+
+ info!(
+ "Initializing system, data will be stored at: {}",
+ config.get_system_path()
+ );
+ Ok(())
+
+ // TODO: Move this to individual shard level
+ /*
+ let state_entries = self.state.init().await.with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to initialize state
entries")
+ })?;
+ let system_state = SystemState::init(state_entries)
+ .await
+ .with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to initialize
system state")
+ })?;
+ let now = Instant::now();
+ self.load_version().await.with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to load version")
+ })?;
+ self.load_users(system_state.users.into_values().collect())
+ .await
+ .with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to load users")
+ })?;
+ self.load_streams(system_state.streams.into_values().collect())
+ .await
+ .with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to load streams")
+ })?;
+ if let Some(archiver) = self.archiver.as_ref() {
+ archiver
+ .init()
+ .await
+ .expect("Failed to initialize archiver");
+ }
+ info!("Initialized system in {} ms.", now.elapsed().as_millis());
+ Ok(())
+ */
+}
+
+pub fn create_root_user() -> User {
+ info!("Creating root user...");
+ let username = env::var(IGGY_ROOT_USERNAME_ENV);
+ let password = env::var(IGGY_ROOT_PASSWORD_ENV);
+ if (username.is_ok() && password.is_err()) || (username.is_err() &&
password.is_ok()) {
+ panic!("When providing the custom root user credentials, both username
and password must be set.");
+ }
+ if username.is_ok() && password.is_ok() {
+ info!("Using the custom root user credentials.");
+ } else {
+ info!("Using the default root user credentials.");
+ }
+
+ let username = username.unwrap_or(DEFAULT_ROOT_USERNAME.to_string());
+ let password = password.unwrap_or(DEFAULT_ROOT_PASSWORD.to_string());
+ if username.is_empty() || password.is_empty() {
+ panic!("Root user credentials are not set.");
+ }
+ if username.len() < MIN_USERNAME_LENGTH {
+ panic!("Root username is too short.");
+ }
+ if username.len() > MAX_USERNAME_LENGTH {
+ panic!("Root username is too long.");
+ }
+ if password.len() < MIN_PASSWORD_LENGTH {
+ panic!("Root password is too short.");
+ }
+ if password.len() > MAX_PASSWORD_LENGTH {
+ panic!("Root password is too long.");
+ }
+ let user = User::root(&username, &password);
+ user
}
pub fn create_default_executor<D>() -> Runtime<D>
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index a906eb60..c5aed121 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -21,11 +21,13 @@ use std::thread::available_parallelism;
use anyhow::Result;
use clap::Parser;
use dotenvy::dotenv;
+use error_set::ErrContext;
use figlet_rs::FIGfont;
use monoio::Buildable;
use server::args::Args;
use server::bootstrap::{
create_default_executor, create_directories, create_root_user,
create_shard_connections,
+ load_config,
};
use server::channels::commands::archive_state::ArchiveStateExecutor;
use
server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor;
@@ -73,17 +75,12 @@ fn main() -> Result<(), ServerError> {
}
let args = Args::parse();
+ // TODO: I think we could get rid of config provider, since we support
only TOML
+ // as config provider.
let config_provider = config_provider::resolve(&args.config_provider)?;
let config = std::thread::scope(|scope| {
let config = scope
.spawn(move || {
- async fn load_config(
- config_provider: &ConfigProviderKind,
- ) -> Result<ServerConfig, ServerError> {
- let config = ServerConfig::load(config_provider).await?;
- Ok(config)
- }
-
let mut rt =
create_default_executor::<monoio::IoUringDriver>();
rt.block_on(load_config(&config_provider))
})
@@ -93,7 +90,7 @@ fn main() -> Result<(), ServerError> {
})?;
// Create directories and root user.
- // Remove `local_data` directory if run with `--fresh` flag.
+ // Remove `local_data` directory if run with `--fresh` flag.
std::thread::scope(|scope| {
scope
.spawn(|| {
@@ -115,14 +112,15 @@ fn main() -> Result<(), ServerError> {
}
}
- // Create directories and root user
- create_directories().await;
- create_root_user().await;
- });
+ // Create directories.
+ create_directories(&config.system).await?;
+ Ok::<(), ServerError>(())
+ })
})
.join()
- .expect("Failed to create directories and root user");
- });
+ .expect("Failed to create directories and root user")
+ })
+ .with_error_context(|err| format!("Failed to create directories, err:
{err}"))?;
// Initialize logging
let mut logging = Logging::new(config.telemetry.clone());
@@ -153,20 +151,21 @@ fn main() -> Result<(), ServerError> {
// let urb = io_uring::IoUring::builder();
// TODO: Shall we make the size of ring be configureable ?
let mut rt =
monoio::RuntimeBuilder::<monoio::IoUringDriver>::new()
- //.uring_builder(urb.setup_coop_taskrun())
+ //.uring_builder(urb.setup_coop_taskrun()) // broken shit.
.with_entries(1024) // Default size
.enable_timer()
.build()
.expect(format!("Failed to build monoio runtime for
shard-{id}").as_str());
rt.block_on(async move {
let builder = IggyShard::builder();
- let shard = builder
+ let mut shard = builder
.id(id)
.connections(connections)
.server_config(server_config)
- .build_and_init()
+ .build()
.await;
+ shard.init().await;
shard.assert_init();
})
})
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 750e2b71..1a36a8fe 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -43,7 +43,7 @@ impl IggyShardBuilder {
self
}
- pub async fn build_and_init(self) -> IggyShard {
+ pub async fn build(self) -> IggyShard {
let id = self.id.unwrap();
let config = self.config.unwrap();
let connections = self.connections.unwrap();
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index dcb089d9..69ec9441 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -27,7 +27,7 @@ use frame::ShardFrame;
use namespace::IggyNamespace;
use std::cell::{Cell, RefCell};
-use crate::configs::server::ServerConfig;
+use crate::{bootstrap::create_root_user, configs::server::ServerConfig};
pub(crate) struct Shard {
id: u16,
connection: ShardConnector<ShardFrame>,
@@ -91,5 +91,10 @@ impl IggyShard {
}
}
+ pub async fn init(&mut self) {
+ let user = create_root_user();
+
+ }
+
pub fn assert_init(&self) {}
}
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index 458ffbab..ffe8943e 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -84,18 +84,7 @@ impl FileState {
impl State for FileState {
async fn init(&self) -> Result<Vec<StateEntry>, IggyError> {
- if !Path::new(&self.path).exists() {
- info!("State file does not exist, creating a new one");
- self.persister
- .overwrite(&self.path, &[])
- .await
- .with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to overwrite
state file, path: {}",
- self.path
- )
- })?;
- }
+ assert!(Path::new(&self.path).exists());
let entries = self.load_entries().await.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to load entries")