This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit d4215002b3b895f8b452b576c22dc91c99fbf7b1
Author: numinex <[email protected]>
AuthorDate: Sat May 17 14:00:52 2025 +0200

    begin shard init
---
 core/server/Cargo.toml             |   2 +-
 core/server/src/bootstrap.rs       | 131 +++++++++++++++++++++++++++++++++++--
 core/server/src/main.rs            |  33 +++++-----
 core/server/src/shard/builder.rs   |  54 +++++++++++++--
 core/server/src/shard/mod.rs       |  49 ++++++++------
 core/server/src/shard/namespace.rs |   1 +
 core/server/src/state/file.rs      |  13 +---
 7 files changed, 221 insertions(+), 62 deletions(-)

diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 3c6bccd5..0700dd39 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -64,7 +64,7 @@ jsonwebtoken = "9.3.1"
 lending-iterator = "0.1.7"
 mimalloc = { workspace = true, optional = true }
 moka = { version = "0.12.10", features = ["future"] }
-monoio = "0.2.4"
+monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat"] }
 nix = { version = "0.30", features = ["fs"] }
 once_cell = "1.21.3"
 opentelemetry = { version = "0.30.0", features = ["trace", "logs"] }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index fd6a5aa0..d51096a2 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -1,7 +1,24 @@
-use monoio::{Buildable, Driver, Runtime};
+use iggy_common::{
+    defaults::{
+        DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, 
MAX_USERNAME_LENGTH,
+        MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH,
+    },
+    IggyError,
+};
+use monoio::{
+    fs::create_dir_all,
+    Buildable, Driver, Runtime,
+};
+use tracing::info;
 
-use crate::shard::{connector::ShardConnector, frame::ShardFrame};
-use std::ops::Range;
+use crate::{
+    configs::{config_provider::ConfigProviderKind, server::ServerConfig, 
system::SystemConfig},
+    server_error::ServerError,
+    shard::{connector::ShardConnector, frame::ShardFrame},
+    streaming::users::user::User,
+    IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
+};
+use std::{env, fs::remove_dir_all, ops::Range, path::Path};
 
 pub fn create_shard_connections(shards_set: Range<usize>) -> 
Vec<ShardConnector<ShardFrame>> {
     let shards_count = shards_set.len();
@@ -13,12 +30,112 @@ pub fn create_shard_connections(shards_set: Range<usize>) 
-> Vec<ShardConnector<
     connectors
 }
 
-pub async fn create_directories() {
-    todo!();
+pub async fn load_config(
+    config_provider: &ConfigProviderKind,
+) -> Result<ServerConfig, ServerError> {
+    let config = ServerConfig::load(config_provider).await?;
+    Ok(config)
 }
 
-pub async fn create_root_user() {
-    todo!();
+pub async fn create_directories(config: &SystemConfig) -> Result<(), 
IggyError> {
+    let system_path = config.get_system_path();
+    if !Path::new(&system_path).exists() && 
create_dir_all(&system_path).await.is_err() {
+        return Err(IggyError::CannotCreateBaseDirectory(system_path));
+    }
+
+    let state_path = config.get_state_path();
+    if !Path::new(&state_path).exists() && 
create_dir_all(&state_path).await.is_err() {
+        return Err(IggyError::CannotCreateStateDirectory(state_path));
+    }
+
+    let streams_path = config.get_streams_path();
+    if !Path::new(&streams_path).exists() && 
create_dir_all(&streams_path).await.is_err() {
+        return Err(IggyError::CannotCreateStreamsDirectory(streams_path));
+    }
+
+    let runtime_path = config.get_runtime_path();
+    // TODO: Change remove_dir_all to async version, once we implement the dir 
walk using monoio `remove_dir` method.
+    if Path::new(&runtime_path).exists() && 
remove_dir_all(&runtime_path).is_err() {
+        return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path));
+    }
+
+    if create_dir_all(&runtime_path).await.is_err() {
+        return Err(IggyError::CannotCreateRuntimeDirectory(runtime_path));
+    }
+
+    info!(
+        "Initializing system, data will be stored at: {}",
+        config.get_system_path()
+    );
+    Ok(())
+
+    // TODO: Move this to individual shard level
+    /*
+    let state_entries = self.state.init().await.with_error_context(|error| {
+        format!("{COMPONENT} (error: {error}) - failed to initialize state 
entries")
+    })?;
+    let system_state = SystemState::init(state_entries)
+        .await
+        .with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to initialize 
system state")
+        })?;
+    let now = Instant::now();
+    self.load_version().await.with_error_context(|error| {
+        format!("{COMPONENT} (error: {error}) - failed to load version")
+    })?;
+    self.load_users(system_state.users.into_values().collect())
+        .await
+        .with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to load users")
+        })?;
+    self.load_streams(system_state.streams.into_values().collect())
+        .await
+        .with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to load streams")
+        })?;
+    if let Some(archiver) = self.archiver.as_ref() {
+        archiver
+            .init()
+            .await
+            .expect("Failed to initialize archiver");
+    }
+    info!("Initialized system in {} ms.", now.elapsed().as_millis());
+    Ok(())
+    */
+}
+
+pub fn create_root_user() -> User {
+    info!("Creating root user...");
+    let username = env::var(IGGY_ROOT_USERNAME_ENV);
+    let password = env::var(IGGY_ROOT_PASSWORD_ENV);
+    if (username.is_ok() && password.is_err()) || (username.is_err() && 
password.is_ok()) {
+        panic!("When providing the custom root user credentials, both username 
and password must be set.");
+    }
+    if username.is_ok() && password.is_ok() {
+        info!("Using the custom root user credentials.");
+    } else {
+        info!("Using the default root user credentials.");
+    }
+
+    let username = username.unwrap_or(DEFAULT_ROOT_USERNAME.to_string());
+    let password = password.unwrap_or(DEFAULT_ROOT_PASSWORD.to_string());
+    if username.is_empty() || password.is_empty() {
+        panic!("Root user credentials are not set.");
+    }
+    if username.len() < MIN_USERNAME_LENGTH {
+        panic!("Root username is too short.");
+    }
+    if username.len() > MAX_USERNAME_LENGTH {
+        panic!("Root username is too long.");
+    }
+    if password.len() < MIN_PASSWORD_LENGTH {
+        panic!("Root password is too short.");
+    }
+    if password.len() > MAX_PASSWORD_LENGTH {
+        panic!("Root password is too long.");
+    }
+    let user = User::root(&username, &password);
+    user
 }
 
 pub fn create_default_executor<D>() -> Runtime<D>
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index a5bbbe48..cd79cedc 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -21,11 +21,13 @@ use std::thread::available_parallelism;
 use anyhow::Result;
 use clap::Parser;
 use dotenvy::dotenv;
+use error_set::ErrContext;
 use figlet_rs::FIGfont;
 use monoio::Buildable;
 use server::args::Args;
 use server::bootstrap::{
     create_default_executor, create_directories, create_root_user, 
create_shard_connections,
+    load_config,
 };
 use server::channels::commands::archive_state::ArchiveStateExecutor;
 use 
server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor;
@@ -73,17 +75,12 @@ fn main() -> Result<(), ServerError> {
     }
 
     let args = Args::parse();
+    // TODO: I think we could get rid of config provider, since we support 
only TOML
+    // as config provider.
     let config_provider = config_provider::resolve(&args.config_provider)?;
     let config = std::thread::scope(|scope| {
         let config = scope
             .spawn(move || {
-                async fn load_config(
-                    config_provider: &ConfigProviderKind,
-                ) -> Result<ServerConfig, ServerError> {
-                    let config = ServerConfig::load(config_provider).await?;
-                    Ok(config)
-                }
-
                 let mut rt = 
create_default_executor::<monoio::IoUringDriver>();
                 rt.block_on(load_config(&config_provider))
             })
@@ -93,7 +90,7 @@ fn main() -> Result<(), ServerError> {
     })?;
 
     // Create directories and root user.
-    // Remove `local_data` directory if run with `--fresh` flag. 
+    // Remove `local_data` directory if run with `--fresh` flag.
     std::thread::scope(|scope| {
         scope
             .spawn(|| {
@@ -115,14 +112,15 @@ fn main() -> Result<(), ServerError> {
                         }
                     }
 
-                    // Create directories and root user
-                    create_directories().await;
-                    create_root_user().await;
-                });
+                    // Create directories. 
+                    create_directories(&config.system).await?;
+                    Ok::<(), ServerError>(())
+                })
             })
             .join()
-            .expect("Failed to create directories and root user");
-    });
+            .expect("Failed to create directories and root user")
+    })
+    .with_error_context(|err| format!("Failed to create directories, err: 
{err}"))?;
 
     // Initialize logging
     let mut logging = Logging::new(config.telemetry.clone());
@@ -153,20 +151,21 @@ fn main() -> Result<(), ServerError> {
                 // let urb = io_uring::IoUring::builder();
                 // TODO: Shall we make the size of ring be configureable ?
                 let mut rt = 
monoio::RuntimeBuilder::<monoio::IoUringDriver>::new()
-                    //.uring_builder(urb.setup_coop_taskrun())
+                    //.uring_builder(urb.setup_coop_taskrun()) // broken shit.
                     .with_entries(1024) // Default size
                     .enable_timer()
                     .build()
                     .expect(format!("Failed to build monoio runtime for 
shard-{id}").as_str());
                 rt.block_on(async move {
                     let builder = IggyShard::builder();
-                    let shard = builder
+                    let mut shard = builder
                         .id(id)
                         .connections(connections)
                         .server_config(server_config)
-                        .build_and_init()
+                        .build()
                         .await;
 
+                    shard.init().await;
                     shard.assert_init();
                 })
             })
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 750e2b71..640ffb57 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -16,7 +16,12 @@
  * under the License.
  */
 
-use crate::{configs::server::ServerConfig, shard::Shard};
+use std::{cell::Cell, rc::Rc, sync::Arc};
+
+use iggy_common::{Aes256GcmEncryptor, EncryptorKind};
+use tracing::info;
+
+use crate::{configs::server::ServerConfig, map_toggle_str, shard::Shard, 
state::{file::FileState, StateKind}, 
streaming::{persistence::persister::{FilePersister, FileWithSyncPersister, 
PersisterKind}, storage::SystemStorage}, versioning::SemanticVersion};
 
 use super::{connector::ShardConnector, frame::ShardFrame, IggyShard};
 
@@ -43,11 +48,12 @@ impl IggyShardBuilder {
         self
     }
 
-    pub async fn build_and_init(self) -> IggyShard {
+    // TODO: Too much happens in there, some of those bootstrapping logic 
should be moved outside.
+    pub async fn build(self) -> IggyShard {
         let id = self.id.unwrap();
         let config = self.config.unwrap();
         let connections = self.connections.unwrap();
-        let (stop_sender, stop_receiver, receiver) = connections
+        let (stop_sender, stop_receiver, frame_receiver) = connections
             .iter()
             .filter(|c| c.id == id)
             .map(|c| {
@@ -60,7 +66,47 @@ impl IggyShardBuilder {
             .next()
             .expect("Failed to find connection with the specified ID");
         let shards = connections.into_iter().map(Shard::new).collect();
+        let version = SemanticVersion::current().expect("Invalid version");
+
+        info!(
+            "Server-side encryption is {}.",
+            map_toggle_str(config.system.encryption.enabled)
+        );
+        let encryptor: Option<Arc<EncryptorKind>> = match 
config.system.encryption.enabled {
+            true => Some(Arc::new(EncryptorKind::Aes256Gcm(
+                
Aes256GcmEncryptor::from_base64_key(&config.system.encryption.key).unwrap(),
+            ))),
+            false => None,
+        };
+
+        let state_persister = 
Self::resolve_persister(config.system.state.enforce_fsync);
+        let state = Rc::new(StateKind::File(FileState::new(
+            &config.system.get_state_messages_file_path(),
+            &version,
+            state_persister,
+            encryptor.clone(),
+        )));
+
+        let partition_persister = 
Self::resolve_persister(config.system.partition.enforce_fsync);
+        let storage = SystemStorage::new(config.system, partition_persister);
+
+        IggyShard {
+                id: id,
+                shards: shards,
+                shards_table: Default::default(),
+                storage: storage,
+                state: state,
+                config: config,
+                stop_receiver: stop_receiver,
+                stop_sender: stop_sender,
+                frame_receiver: Cell::new(Some(frame_receiver)),
+            }
+    }
 
-        IggyShard::new(id, shards, config, stop_receiver, stop_sender, 
receiver)
+    fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> {
+        match enforce_fsync {
+            true => 
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)),
+            false => Arc::new(PersisterKind::File(FilePersister)),
+        }
     }
 }
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index dcb089d9..54b88fb1 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 pub mod builder;
 pub mod connector;
 pub mod frame;
@@ -25,9 +26,9 @@ use builder::IggyShardBuilder;
 use connector::{Receiver, ShardConnector, StopReceiver, StopSender};
 use frame::ShardFrame;
 use namespace::IggyNamespace;
-use std::cell::{Cell, RefCell};
+use std::{cell::{Cell, RefCell}, rc::Rc, sync::Arc};
 
-use crate::configs::server::ServerConfig;
+use crate::{bootstrap::create_root_user, configs::server::ServerConfig, 
state::file::FileState, streaming::storage::SystemStorage};
 pub(crate) struct Shard {
     id: u16,
     connection: ShardConnector<ShardFrame>,
@@ -49,21 +50,23 @@ struct ShardInfo {
 pub struct IggyShard {
     pub id: u16,
     shards: Vec<Shard>,
-    //shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>,
+    shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>,
 
     //pub(crate) permissioner: RefCell<Permissioner>,
     //pub(crate) streams: RwLock<HashMap<u32, Stream>>,
     //pub(crate) streams_ids: RefCell<HashMap<String, u32>>,
     //pub(crate) users: RefCell<HashMap<UserId, User>>,
+    // TODO: Refactor.
+    pub(crate) storage: Arc<SystemStorage>,
 
     // TODO - get rid of this dynamic dispatch.
-    //pub(crate) state: Rc<FileState>,
+    pub(crate) state: Rc<FileState>,
     //pub(crate) encryptor: Option<Rc<dyn Encryptor>>,
     config: ServerConfig,
     //pub(crate) client_manager: RefCell<ClientManager>,
     //pub(crate) active_sessions: RefCell<Vec<Session>>,
     //pub(crate) metrics: Metrics,
-    pub message_receiver: Cell<Option<Receiver<ShardFrame>>>,
+    pub frame_receiver: Cell<Option<Receiver<ShardFrame>>>,
     stop_receiver: StopReceiver,
     stop_sender: StopSender,
 }
@@ -73,22 +76,26 @@ impl IggyShard {
         Default::default()
     }
 
-    pub(crate) fn new(
-        id: u16,
-        shards: Vec<Shard>,
-        config: ServerConfig,
-        stop_receiver: StopReceiver,
-        stop_sender: StopSender,
-        shard_messages_receiver: Receiver<ShardFrame>,
-    ) -> Self {
-        Self {
-            id,
-            shards,
-            config,
-            stop_receiver,
-            stop_sender,
-            message_receiver: Cell::new(Some(shard_messages_receiver)),
-        }
+    pub async fn init(&mut self) {
+        let user = create_root_user();
+        self.load_state().await;
+        self.load_users().await;
+        // Add default root user.
+        todo!();
+        self.load_streams().await;
+
+    }
+
+    async fn load_state(&self) {
+        todo!()
+    }
+
+    async fn load_users(&self) {
+        todo!()
+    }
+
+    async fn load_streams(&self) {
+        todo!()
     }
 
     pub fn assert_init(&self) {}
diff --git a/core/server/src/shard/namespace.rs 
b/core/server/src/shard/namespace.rs
index 4b972125..d9aedfb7 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 #[derive(Debug, Clone, Eq, PartialEq, Hash)]
 pub struct IggyNamespace {
     pub(crate) stream_id: u32,
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index c9e77977..f162ed75 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -84,18 +84,7 @@ impl FileState {
 
 impl State for FileState {
     async fn init(&self) -> Result<Vec<StateEntry>, IggyError> {
-        if !Path::new(&self.path).exists() {
-            info!("State file does not exist, creating a new one");
-            self.persister
-                .overwrite(&self.path, &[])
-                .await
-                .with_error_context(|error| {
-                    format!(
-                        "{COMPONENT} (error: {error}) - failed to overwrite 
state file, path: {}",
-                        self.path
-                    )
-                })?;
-        }
+        assert!(Path::new(&self.path).exists());
 
         let entries = self.load_entries().await.with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to load entries")

Reply via email to