This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit fc3f0fdbfba7f083daffc7d280b988f828caf773
Author: numinex <[email protected]>
AuthorDate: Sat Jun 21 13:51:27 2025 +0200

    fix state initialization with gate
---
 core/common/src/utils/crypto.rs               |   3 +-
 core/server/src/bootstrap.rs                  |  48 ++---
 core/server/src/lib.rs                        |   8 +-
 core/server/src/main.rs                       | 151 ++++++++++++---
 core/server/src/shard/builder.rs              |  65 +++----
 core/server/src/shard/gate.rs                 |  42 +++++
 core/server/src/shard/mod.rs                  | 256 ++++++++++++++++++++++++--
 core/server/src/shard/transmission/frame.rs   |  17 ++
 core/server/src/shard/transmission/message.rs |  17 ++
 core/server/src/shard/transmission/mod.rs     |  18 ++
 core/server/src/state/file.rs                 |   4 +-
 core/server/src/streaming/systems/info.rs     |  55 ------
 core/server/src/streaming/systems/system.rs   |  21 ++-
 13 files changed, 524 insertions(+), 181 deletions(-)

diff --git a/core/common/src/utils/crypto.rs b/core/common/src/utils/crypto.rs
index 34cd72c7..11d51cb7 100644
--- a/core/common/src/utils/crypto.rs
+++ b/core/common/src/utils/crypto.rs
@@ -23,7 +23,7 @@ use aes_gcm::aead::{Aead, OsRng};
 use aes_gcm::{AeadCore, Aes256Gcm, KeyInit};
 use std::fmt::Debug;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum EncryptorKind {
     Aes256Gcm(Aes256GcmEncryptor),
 }
@@ -46,6 +46,7 @@ pub trait Encryptor {
     fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError>;
 }
 
+#[derive(Clone)]
 pub struct Aes256GcmEncryptor {
     cipher: Aes256Gcm,
 }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index c6108568..4f356a9b 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -12,10 +12,13 @@ use crate::{
     IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
     configs::{config_provider::ConfigProviderKind, server::ServerConfig, 
system::SystemConfig},
     server_error::ServerError,
-    shard::{transmission::connector::ShardConnector, 
transmission::frame::ShardFrame},
-    streaming::users::user::User,
+    shard::transmission::{connector::ShardConnector, frame::ShardFrame},
+    streaming::{
+        persistence::persister::{FilePersister, FileWithSyncPersister, 
PersisterKind},
+        users::user::User,
+    },
 };
-use std::{env, fs::remove_dir_all, ops::Range, path::Path};
+use std::{env, fs::remove_dir_all, ops::Range, path::Path, sync::Arc};
 
 pub fn create_shard_connections(shards_set: Range<usize>) -> 
Vec<ShardConnector<ShardFrame>> {
     let shards_count = shards_set.len();
@@ -68,37 +71,7 @@ pub async fn create_directories(config: &SystemConfig) -> 
Result<(), IggyError>
 
     // TODO: Move this to individual shard level
     /*
-    let state_entries = self.state.init().await.with_error_context(|error| {
-        format!("{COMPONENT} (error: {error}) - failed to initialize state 
entries")
-    })?;
-    let system_state = SystemState::init(state_entries)
-        .await
-        .with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to initialize 
system state")
-        })?;
-    let now = Instant::now();
-    self.load_version().await.with_error_context(|error| {
-        format!("{COMPONENT} (error: {error}) - failed to load version")
-    })?;
-    self.load_users(system_state.users.into_values().collect())
-        .await
-        .with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to load users")
-        })?;
-    self.load_streams(system_state.streams.into_values().collect())
-        .await
-        .with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to load streams")
-        })?;
-    if let Some(archiver) = self.archiver.as_ref() {
-        archiver
-            .init()
-            .await
-            .expect("Failed to initialize archiver");
-    }
-    info!("Initialized system in {} ms.", now.elapsed().as_millis());
-    Ok(())
-    */
+     */
 }
 
 pub fn create_root_user() -> User {
@@ -158,3 +131,10 @@ pub fn create_shard_executor() -> 
Runtime<TimeDriver<monoio::IoUringDriver>> {
     let rt = Buildable::build(builder).expect("Failed to create default 
runtime");
     rt
 }
+
+pub fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> {
+    match enforce_fsync {
+        true => Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)),
+        false => Arc::new(PersisterKind::File(FilePersister)),
+    }
+}
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index e64837da..3dcf64b2 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -43,11 +43,11 @@ pub mod streaming;
 pub mod tcp;
 pub mod versioning;
 
-const VERSION: &str = env!("CARGO_PKG_VERSION");
-const IGGY_ROOT_USERNAME_ENV: &str = "IGGY_ROOT_USERNAME";
-const IGGY_ROOT_PASSWORD_ENV: &str = "IGGY_ROOT_PASSWORD";
+pub const VERSION: &str = env!("CARGO_PKG_VERSION");
+pub const IGGY_ROOT_USERNAME_ENV: &str = "IGGY_ROOT_USERNAME";
+pub const IGGY_ROOT_PASSWORD_ENV: &str = "IGGY_ROOT_PASSWORD";
 
-pub(crate) fn map_toggle_str<'a>(enabled: bool) -> &'a str {
+pub fn map_toggle_str<'a>(enabled: bool) -> &'a str {
     match enabled {
         true => "enabled",
         false => "disabled",
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 67728862..fc2b21f4 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use std::sync::Arc;
 use std::thread::available_parallelism;
 
 use anyhow::Result;
@@ -23,33 +24,32 @@ use clap::Parser;
 use dotenvy::dotenv;
 use error_set::ErrContext;
 use figlet_rs::FIGfont;
+use iggy_common::create_user::CreateUser;
+use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError};
 use server::args::Args;
 use server::bootstrap::{
     create_default_executor, create_directories, create_root_user, 
create_shard_connections,
-    create_shard_executor, load_config,
+    create_shard_executor, load_config, resolve_persister,
 };
-use server::channels::commands::archive_state::ArchiveStateExecutor;
-use 
server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor;
-use server::channels::commands::maintain_messages::MaintainMessagesExecutor;
-use server::channels::commands::print_sysinfo::SysInfoPrintExecutor;
-use server::channels::commands::save_messages::SaveMessagesExecutor;
-use server::channels::commands::verify_heartbeats::VerifyHeartbeatsExecutor;
-use server::channels::handler::BackgroundServerCommandHandler;
-use server::configs::config_provider::{self, ConfigProviderKind};
-use server::configs::server::ServerConfig;
-use server::http::http_server;
+use server::configs::config_provider::{self};
 #[cfg(not(feature = "tokio-console"))]
 use server::log::logger::Logging;
 #[cfg(feature = "tokio-console")]
 use server::log::tokio_console::Logging;
-use server::quic::quic_server;
 use server::server_error::ServerError;
 use server::shard::IggyShard;
-use server::streaming::systems::system::{SharedSystem, System};
-use server::streaming::utils::MemoryPool;
-use server::tcp::tcp_server;
+use server::shard::gate::Gate;
+use server::state::StateKind;
+use server::state::command::EntryCommand;
+use server::state::file::FileState;
+use server::state::models::CreateUserWithId;
+use server::state::system::SystemState;
+use server::versioning::SemanticVersion;
+use server::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, map_toggle_str};
 use tokio::time::Instant;
-use tracing::{info, instrument};
+use tracing::{error, info, instrument};
+
+const COMPONENT: &str = "MAIN";
 
 #[instrument(skip_all, name = "trace_start_server")]
 fn main() -> Result<(), ServerError> {
@@ -88,7 +88,11 @@ fn main() -> Result<(), ServerError> {
         config
     })?;
 
-    // Create directories and root user.
+    // Initialize logging
+    let mut logging = Logging::new(config.telemetry.clone());
+    logging.early_init();
+
+    // Create directories.
     // Remove `local_data` directory if run with `--fresh` flag.
     std::thread::scope(|scope| {
         scope
@@ -117,13 +121,9 @@ fn main() -> Result<(), ServerError> {
                 })
             })
             .join()
-            .expect("Failed to create directories and root user")
+            .expect("Failed join thread")
     })
-    .with_error_context(|err| format!("Failed to create directories, err: 
{err}"))?;
-
-    // Initialize logging
-    let mut logging = Logging::new(config.telemetry.clone());
-    logging.early_init();
+    .with_error_context(|err| format!("Failed to init server: {err}"))?;
 
     // TODO: Make this configurable from config as a range
     // for example this instance of Iggy will use cores from 0..4
@@ -132,9 +132,11 @@ fn main() -> Result<(), ServerError> {
     let shards_set = 0..shards_count;
     let connections = create_shard_connections(shards_set.clone());
     for shard_id in shards_set {
+        let gate: Arc<Gate<()>> = Arc::new(Gate::new());
         let id = shard_id as u16;
         let connections = connections.clone();
-        let server_config = config.clone();
+        let config = config.clone();
+        let state_persister = 
resolve_persister(config.system.state.enforce_fsync);
         std::thread::Builder::new()
             .name(format!("shard-{id}"))
             .spawn(move || {
@@ -143,18 +145,109 @@ fn main() -> Result<(), ServerError> {
 
                 let mut rt = create_shard_executor();
                 rt.block_on(async move {
+                    let version = SemanticVersion::current().expect("Invalid 
version");
+                    info!(
+                        "Server-side encryption is {}.",
+                        map_toggle_str(config.system.encryption.enabled)
+                    );
+                    let encryptor: Option<EncryptorKind> = match 
config.system.encryption.enabled {
+                        true => Some(EncryptorKind::Aes256Gcm(
+                            
Aes256GcmEncryptor::from_base64_key(&config.system.encryption.key)
+                                .unwrap(),
+                        )),
+                        false => None,
+                    };
+
+                    let state = StateKind::File(FileState::new(
+                        &config.system.get_state_messages_file_path(),
+                        &version,
+                        state_persister,
+                        encryptor.clone(),
+                    ));
+
+                    // We can't use std::sync::Once because it doesn't support 
async.
+                    // Trait bound on the closure is FnOnce.
+                    let gate = gate.clone();
+                    // Peak into the state to check if the root user exists.
+                    // If it does not exist, create it.
+                    gate.with_async::<Result<(), IggyError>>(async 
|gate_state| {
+                        // A thread already initialized state
+                        // Thus, we can skip it.
+                        if let Some(_) = gate_state.inner() {
+                            return Ok(());
+                        }
+
+                        let state_entries = 
state.load_entries().await.with_error_context(|error| {
+                            format!(
+                                "{COMPONENT} (error: {error}) - failed to load 
state entries"
+                            )
+                        })?;
+                        let root_exists = state_entries
+                            .into_iter()
+                            .find(|entry| {
+                                entry
+                                    .command()
+                                    .and_then(|command| match command {
+                                        EntryCommand::CreateUser(payload)
+                                            if payload.command.username
+                                            == IGGY_ROOT_USERNAME_ENV && 
payload.command.password == IGGY_ROOT_PASSWORD_ENV =>
+                                        {
+                                            Ok(true)
+                                        }
+                                        _ => Ok(false),
+                                    })
+                                    .map_or_else(
+                                        |err| {
+                                            error!("Failed to check if root 
user exists: {err}");
+                                            false
+                                        },
+                                        |v| v,
+                                    )
+                            })
+                            .is_some();
+
+                        if !root_exists {
+                            info!("No users found, creating the root user...");
+                            let root = create_root_user();
+                            let command = CreateUser {
+                                username: root.username.clone(),
+                                password: root.password.clone(),
+                                status: root.status,
+                                permissions: root.permissions.clone(),
+                            };
+                            state
+                                .apply(0, 
&EntryCommand::CreateUser(CreateUserWithId {
+                                    user_id: root.id,
+                                    command
+                                }))
+                                .await
+                                .with_error_context(|error| {
+                                    format!(
+                                        "{COMPONENT} (error: {error}) - failed 
to apply create user command, username: {}",
+                                        root.username
+                                    )
+                                })?;
+                        }
+
+                        gate_state.set_result(());
+                        Ok(())
+                    })
+                    .await;
+
                     let builder = IggyShard::builder();
                     let mut shard = builder
                         .id(id)
                         .connections(connections)
-                        .server_config(server_config)
+                        .config(config)
+                        .encryptor(encryptor)
+                        .version(version)
+                        .state(state)
                         .build()
                         .await;
 
-                    if let Err(e) = shard.init().await {
-                        //TODO: If one of the shards fails to initialize, we 
should crash the whole program;
-                        panic!("Failed to initialize shard-{id}: {e}");
-                    }
+                    //TODO: If one of the shards fails to initialize, we 
should crash the whole program;
+                    shard.init().await.expect("Failed to initialize 
shard-{id}: {e}");
+                    info!("Initiated shard with ID: {id}");
                     //TODO: If one of the shards fails to initialize, we 
should crash the whole program;
                     shard.assert_init();
                 })
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 36fe0e54..0fd5250f 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -22,14 +22,12 @@ use iggy_common::{Aes256GcmEncryptor, EncryptorKind};
 use tracing::info;
 
 use crate::{
+    bootstrap::resolve_persister,
     configs::server::ServerConfig,
     map_toggle_str,
     shard::Shard,
     state::{StateKind, file::FileState},
-    streaming::{
-        persistence::persister::{FilePersister, FileWithSyncPersister, 
PersisterKind},
-        storage::SystemStorage,
-    },
+    streaming::storage::SystemStorage,
     versioning::SemanticVersion,
 };
 
@@ -40,6 +38,9 @@ pub struct IggyShardBuilder {
     id: Option<u16>,
     connections: Option<Vec<ShardConnector<ShardFrame>>>,
     config: Option<ServerConfig>,
+    encryptor: Option<EncryptorKind>,
+    version: Option<SemanticVersion>,
+    state: Option<StateKind>,
 }
 
 impl IggyShardBuilder {
@@ -53,16 +54,33 @@ impl IggyShardBuilder {
         self
     }
 
-    pub fn server_config(mut self, config: ServerConfig) -> Self {
+    pub fn config(mut self, config: ServerConfig) -> Self {
         self.config = Some(config);
         self
     }
 
+    pub fn encryptor(mut self, encryptor: Option<EncryptorKind>) -> Self {
+        self.encryptor = encryptor;
+        self
+    }
+
+    pub fn version(mut self, version: SemanticVersion) -> Self {
+        self.version = Some(version);
+        self
+    }
+
+    pub fn state(mut self, state: StateKind) -> Self {
+        self.state = Some(state);
+        self
+    }
+
     // TODO: Too much happens in there, some of those bootstrapping logic 
should be moved outside.
     pub async fn build(self) -> IggyShard {
         let id = self.id.unwrap();
         let config = self.config.unwrap();
         let connections = self.connections.unwrap();
+        let state = self.state.unwrap();
+        let version = self.version.unwrap();
         let (stop_sender, stop_receiver, frame_receiver) = connections
             .iter()
             .filter(|c| c.id == id)
@@ -76,33 +94,8 @@ impl IggyShardBuilder {
             .next()
             .expect("Failed to find connection with the specified ID");
         let shards = connections.into_iter().map(Shard::new).collect();
-
-        //TODO: This can be discrete step in the builder bootstrapped from 
main function.
-        let version = SemanticVersion::current().expect("Invalid version");
-
-        //TODO: This can be discrete step in the builder bootstrapped from 
main function.
-        info!(
-            "Server-side encryption is {}.",
-            map_toggle_str(config.system.encryption.enabled)
-        );
-        let encryptor: Option<Arc<EncryptorKind>> = match 
config.system.encryption.enabled {
-            true => Some(Arc::new(EncryptorKind::Aes256Gcm(
-                
Aes256GcmEncryptor::from_base64_key(&config.system.encryption.key).unwrap(),
-            ))),
-            false => None,
-        };
-
-        //TODO: This can be discrete step in the builder bootstrapped from 
main function.
-        let state_persister = 
Self::resolve_persister(config.system.state.enforce_fsync);
-        let state = Rc::new(StateKind::File(FileState::new(
-            &config.system.get_state_messages_file_path(),
-            &version,
-            state_persister,
-            encryptor.clone(),
-        )));
-
-        //TODO: This can be discrete step in the builder bootstrapped from 
main function.
-        let partition_persister = 
Self::resolve_persister(config.system.partition.enforce_fsync);
+        //TODO: Eghhhh.......
+        let partition_persister = 
resolve_persister(config.system.partition.enforce_fsync);
         let storage = Rc::new(SystemStorage::new(
             config.system.clone(),
             partition_persister,
@@ -115,16 +108,10 @@ impl IggyShardBuilder {
             storage: storage,
             state: state,
             config: config,
+            version: version,
             stop_receiver: stop_receiver,
             stop_sender: stop_sender,
             frame_receiver: Cell::new(Some(frame_receiver)),
         }
     }
-
-    fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> {
-        match enforce_fsync {
-            true => 
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)),
-            false => Arc::new(PersisterKind::File(FilePersister)),
-        }
-    }
 }
diff --git a/core/server/src/shard/gate.rs b/core/server/src/shard/gate.rs
new file mode 100644
index 00000000..a26950a4
--- /dev/null
+++ b/core/server/src/shard/gate.rs
@@ -0,0 +1,42 @@
+use std::sync::{Condvar, Mutex};
+
+#[derive(Default)]
+pub struct Gate<T> {
+    state: Mutex<GateState<T>>,
+}
+
+#[derive(Default)]
+pub struct GateState<T> {
+    result: Option<T>,
+}
+
+impl<T> GateState<T> {
+    pub fn set_result(&mut self, result: T) {
+        self.result = Some(result);
+    }
+
+    pub fn inner(&self) -> &Option<T> {
+        &self.result
+    }
+}
+
+impl<T> Gate<T>
+where
+    T: Default,
+{
+    pub fn new() -> Self {
+        Gate {
+            state: Default::default(),
+        }
+    }
+
+    pub async fn with_async<R>(&self, f: impl AsyncFnOnce(&mut GateState<T>) 
-> R) {
+        let mut guard = self.state.lock().unwrap();
+        f(&mut guard).await;
+    }
+
+    pub async fn with<R>(&self, f: impl FnOnce(&mut GateState<T>) -> R) {
+        let mut guard = self.state.lock().unwrap();
+        f(&mut guard);
+    }
+}
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 6a6d35b8..23ef1e15 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -17,16 +17,19 @@
  */
 
 pub mod builder;
+pub mod gate;
 pub mod namespace;
 pub mod transmission;
 
 use ahash::HashMap;
 use builder::IggyShardBuilder;
+use error_set::ErrContext;
 use iggy_common::IggyError;
 use namespace::IggyNamespace;
 use std::{
     cell::{Cell, RefCell},
     rc::Rc,
+    str::FromStr,
     sync::Arc,
     time::Instant,
 };
@@ -37,9 +40,17 @@ use crate::{
     bootstrap::create_root_user,
     configs::server::ServerConfig,
     shard::transmission::frame::ShardFrame,
-    state::{StateKind, file::FileState},
-    streaming::storage::SystemStorage,
+    state::{
+        StateKind,
+        file::FileState,
+        system::{SystemState, UserState},
+    },
+    streaming::{storage::SystemStorage, systems::info::SystemInfo},
+    versioning::SemanticVersion,
 };
+
+pub const COMPONENT: &str = "SHARD";
+
 pub(crate) struct Shard {
     id: u16,
     connection: ShardConnector<ShardFrame>,
@@ -62,6 +73,7 @@ pub struct IggyShard {
     pub id: u16,
     shards: Vec<Shard>,
     shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>,
+    version: SemanticVersion,
 
     //pub(crate) permissioner: RefCell<Permissioner>,
     //pub(crate) streams: RwLock<HashMap<u32, Stream>>,
@@ -70,7 +82,7 @@ pub struct IggyShard {
     // TODO: Refactor.
     pub(crate) storage: Rc<SystemStorage>,
 
-    pub(crate) state: Rc<StateKind>,
+    pub(crate) state: StateKind,
     //pub(crate) encryptor: Option<Rc<dyn Encryptor>>,
     config: ServerConfig,
     //pub(crate) client_manager: RefCell<ClientManager>,
@@ -92,10 +104,12 @@ impl IggyShard {
         //let state_entries = self.state.init().await?;
         //let system_state = SystemState::init(state_entries).await?;
         //let user = create_root_user();
-        self.load_state().await;
-        self.load_users().await;
+        self.load_version().await?;
+        let SystemState { users, streams } = self.load_state().await?;
+        self.load_users(users.into_values().collect()).await;
         // Add default root user.
-        self.load_streams().await;
+        self.load_streams(streams.into_values().collect()).await;
+
         //TODO: Fix the archiver.
         /*
         if let Some(archiver) = self.archiver.as_ref() {
@@ -109,16 +123,234 @@ impl IggyShard {
         Ok(())
     }
 
-    async fn load_state(&self) {
-        todo!()
+    async fn load_version(&self) -> Result<(), IggyError> {
+        async fn update_system_info(
+            storage: &Rc<SystemStorage>,
+            system_info: &mut SystemInfo,
+            version: &SemanticVersion,
+        ) -> Result<(), IggyError> {
+            system_info.update_version(version);
+            storage.info.save(system_info).await?;
+            Ok(())
+        }
+
+        let current_version = &self.version;
+        let mut system_info;
+        let load_system_info = self.storage.info.load().await;
+        if load_system_info.is_err() {
+            let error = load_system_info.err().unwrap();
+            if let IggyError::ResourceNotFound(_) = error {
+                info!("System info not found, creating...");
+                system_info = SystemInfo::default();
+                update_system_info(&self.storage, &mut system_info, 
current_version).await?;
+            } else {
+                return Err(error);
+            }
+        } else {
+            system_info = load_system_info.unwrap();
+        }
+
+        info!("Loaded {system_info}.");
+        let loaded_version = 
SemanticVersion::from_str(&system_info.version.version)?;
+        if current_version.is_equal_to(&loaded_version) {
+            info!("System version {current_version} is up to date.");
+        } else if current_version.is_greater_than(&loaded_version) {
+            info!(
+                "System version {current_version} is greater than 
{loaded_version}, checking the available migrations..."
+            );
+            update_system_info(&self.storage, &mut system_info, 
current_version).await?;
+        } else {
+            info!(
+                "System version {current_version} is lower than 
{loaded_version}, possible downgrade."
+            );
+            update_system_info(&self.storage, &mut system_info, 
current_version).await?;
+        }
+
+        Ok(())
     }
 
-    async fn load_users(&self) {
-        todo!()
+    async fn load_state(&self) -> Result<SystemState, IggyError> {
+        let state_entries = self.state.init().await.with_error_context(|error| 
{
+            format!("{COMPONENT} (error: {error}) - failed to initialize state 
entries")
+        })?;
+        let system_state = SystemState::init(state_entries)
+            .await
+            .with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to initialize 
system state")
+            })?;
+        Ok(system_state)
     }
 
-    async fn load_streams(&self) {
-        todo!()
+    async fn load_users(&mut self, users: Vec<UserState>) -> Result<(), 
IggyError> {
+        info!("Loading users...");
+        /*
+
+        for user_state in users.into_iter() {
+            let mut user = User::with_password(
+                user_state.id,
+                &user_state.username,
+                user_state.password_hash,
+                user_state.status,
+                user_state.permissions,
+            );
+
+            user.created_at = user_state.created_at;
+            user.personal_access_tokens = user_state
+                .personal_access_tokens
+                .into_values()
+                .map(|token| {
+                    (
+                        Arc::new(token.token_hash.clone()),
+                        PersonalAccessToken::raw(
+                            user_state.id,
+                            &token.name,
+                            &token.token_hash,
+                            token.expiry_at,
+                        ),
+                    )
+                })
+                .collect();
+            self.users.insert(user_state.id, user);
+        }
+
+        let users_count = self.users.len();
+        let current_user_id = self.users.keys().max().unwrap_or(&1);
+        USER_ID.store(current_user_id + 1, Ordering::SeqCst);
+        self.permissioner
+            .init(&self.users.values().collect::<Vec<&User>>());
+        self.metrics.increment_users(users_count as u32);
+        info!("Initialized {users_count} user(s).");
+        */
+        Ok(())
+    }
+
+    async fn load_streams(&mut self, streams: Vec<StreamState>) -> Result<(), 
IggyError> {
+        todo!();
+        info!("Loading streams from disk...");
+        let mut unloaded_streams = Vec::new();
+        let mut dir_entries = read_dir(&self.config.get_streams_path())
+            .await
+            .map_err(|error| {
+                error!("Cannot read streams directory: {error}");
+                IggyError::CannotReadStreams
+            })?;
+
+        while let Some(dir_entry) = 
dir_entries.next_entry().await.unwrap_or(None) {
+            let name = dir_entry.file_name().into_string().unwrap();
+            let stream_id = name.parse::<u32>().map_err(|_| {
+                error!("Invalid stream ID file with name: '{name}'.");
+                IggyError::InvalidNumberValue
+            })?;
+            let stream_state = streams.iter().find(|s| s.id == stream_id);
+            if stream_state.is_none() {
+                error!(
+                    "Stream with ID: '{stream_id}' was not found in state, but 
exists on disk and will be removed."
+                );
+                if let Err(error) = 
fs::remove_dir_all(&dir_entry.path()).await {
+                    error!("Cannot remove stream directory: {error}");
+                } else {
+                    warn!("Stream with ID: '{stream_id}' was removed.");
+                }
+                continue;
+            }
+
+            let stream_state = stream_state.unwrap();
+            let mut stream = Stream::empty(
+                stream_id,
+                &stream_state.name,
+                self.config.clone(),
+                self.storage.clone(),
+            );
+            stream.created_at = stream_state.created_at;
+            unloaded_streams.push(stream);
+        }
+
+        let state_stream_ids = streams
+            .iter()
+            .map(|stream| stream.id)
+            .collect::<AHashSet<u32>>();
+        let unloaded_stream_ids = unloaded_streams
+            .iter()
+            .map(|stream| stream.stream_id)
+            .collect::<AHashSet<u32>>();
+        let mut missing_ids = state_stream_ids
+            .difference(&unloaded_stream_ids)
+            .copied()
+            .collect::<AHashSet<u32>>();
+        if missing_ids.is_empty() {
+            info!("All streams found on disk were found in state.");
+        } else {
+            warn!("Streams with IDs: '{missing_ids:?}' were not found on 
disk.");
+            if self.config.recovery.recreate_missing_state {
+                info!(
+                    "Recreating missing state in recovery config is enabled, 
missing streams will be created."
+                );
+                for stream_id in missing_ids.iter() {
+                    let stream_id = *stream_id;
+                    let stream_state = streams.iter().find(|s| s.id == 
stream_id).unwrap();
+                    let stream = Stream::create(
+                        stream_id,
+                        &stream_state.name,
+                        self.config.clone(),
+                        self.storage.clone(),
+                    );
+                    stream.persist().await?;
+                    unloaded_streams.push(stream);
+                    info!(
+                        "Missing stream with ID: '{stream_id}', name: {} was 
recreated.",
+                        stream_state.name
+                    );
+                }
+                missing_ids.clear();
+            } else {
+                warn!(
+                    "Recreating missing state in recovery config is disabled, 
missing streams will not be created."
+                );
+            }
+        }
+
+        let mut streams_states = streams
+            .into_iter()
+            .filter(|s| !missing_ids.contains(&s.id))
+            .map(|s| (s.id, s))
+            .collect::<AHashMap<_, _>>();
+        let loaded_streams = RefCell::new(Vec::new());
+        let load_stream_tasks = unloaded_streams.into_iter().map(|mut stream| {
+            let state = streams_states.remove(&stream.stream_id).unwrap();
+
+            async {
+                stream.load(state).await?;
+                loaded_streams.borrow_mut().push(stream);
+                Result::<(), IggyError>::Ok(())
+            }
+        });
+        try_join_all(load_stream_tasks).await?;
+
+        for stream in loaded_streams.take() {
+            if self.streams.contains_key(&stream.stream_id) {
+                error!("Stream with ID: '{}' already exists.", 
&stream.stream_id);
+                continue;
+            }
+
+            if self.streams_ids.contains_key(&stream.name) {
+                error!("Stream with name: '{}' already exists.", &stream.name);
+                continue;
+            }
+
+            self.metrics.increment_streams(1);
+            self.metrics.increment_topics(stream.get_topics_count());
+            self.metrics
+                .increment_partitions(stream.get_partitions_count());
+            self.metrics.increment_segments(stream.get_segments_count());
+            self.metrics.increment_messages(stream.get_messages_count());
+
+            self.streams_ids
+                .insert(stream.name.clone(), stream.stream_id);
+            self.streams.insert(stream.stream_id, stream);
+        }
+
+        info!("Loaded {} stream(s) from disk.", self.streams.len());
+        Ok(())
     }
 
     pub fn assert_init(&self) {}
diff --git a/core/server/src/shard/transmission/frame.rs 
b/core/server/src/shard/transmission/frame.rs
index ac60863d..ec63daf0 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -1,3 +1,20 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 use async_channel::Sender;
 use bytes::Bytes;
 use iggy_common::IggyError;
diff --git a/core/server/src/shard/transmission/message.rs 
b/core/server/src/shard/transmission/message.rs
index e9f1dd4b..3572c0ec 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -1,3 +1,20 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 use crate::binary::command::ServerCommand;
 
 #[derive(Debug)]
diff --git a/core/server/src/shard/transmission/mod.rs 
b/core/server/src/shard/transmission/mod.rs
index b871152a..1f9a79ed 100644
--- a/core/server/src/shard/transmission/mod.rs
+++ b/core/server/src/shard/transmission/mod.rs
@@ -1,3 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 pub mod connector;
 pub mod frame;
 pub mod message;
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index f162ed75..aa14b501 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -47,7 +47,7 @@ pub struct FileState {
     version: u32,
     path: String,
     persister: Arc<PersisterKind>,
-    encryptor: Option<Arc<EncryptorKind>>,
+    encryptor: Option<EncryptorKind>,
 }
 
 impl FileState {
@@ -55,7 +55,7 @@ impl FileState {
         path: &str,
         version: &SemanticVersion,
         persister: Arc<PersisterKind>,
-        encryptor: Option<Arc<EncryptorKind>>,
+        encryptor: Option<EncryptorKind>,
     ) -> Self {
         Self {
             current_index: AtomicU64::new(0),
diff --git a/core/server/src/streaming/systems/info.rs 
b/core/server/src/streaming/systems/info.rs
index 61c1bcbb..3c20fd63 100644
--- a/core/server/src/streaming/systems/info.rs
+++ b/core/server/src/streaming/systems/info.rs
@@ -16,15 +16,11 @@
  * under the License.
  */
 
-use crate::streaming::systems::system::System;
 use crate::versioning::SemanticVersion;
-use iggy_common::IggyError;
 use serde::{Deserialize, Serialize};
 use std::collections::hash_map::DefaultHasher;
 use std::fmt::Display;
 use std::hash::{Hash, Hasher};
-use std::str::FromStr;
-use tracing::info;
 
 #[derive(Debug, Serialize, Deserialize, Default)]
 pub struct SystemInfo {
@@ -46,57 +42,6 @@ pub struct Migration {
     pub applied_at: u64,
 }
 
-impl System {
-    pub(crate) async fn load_version(&mut self) -> Result<(), IggyError> {
-        let current_version = SemanticVersion::current()?;
-        let mut system_info;
-        let load_system_info = self.storage.info.load().await;
-        if load_system_info.is_err() {
-            let error = load_system_info.err().unwrap();
-            if let IggyError::ResourceNotFound(_) = error {
-                info!("System info not found, creating...");
-                system_info = SystemInfo::default();
-                self.update_system_info(&mut system_info, &current_version)
-                    .await?;
-            } else {
-                return Err(error);
-            }
-        } else {
-            system_info = load_system_info.unwrap();
-        }
-
-        info!("Loaded {system_info}.");
-        let loaded_version = 
SemanticVersion::from_str(&system_info.version.version)?;
-        if current_version.is_equal_to(&loaded_version) {
-            info!("System version {current_version} is up to date.");
-        } else if current_version.is_greater_than(&loaded_version) {
-            info!(
-                "System version {current_version} is greater than 
{loaded_version}, checking the available migrations..."
-            );
-            self.update_system_info(&mut system_info, &current_version)
-                .await?;
-        } else {
-            info!(
-                "System version {current_version} is lower than 
{loaded_version}, possible downgrade."
-            );
-            self.update_system_info(&mut system_info, &current_version)
-                .await?;
-        }
-
-        Ok(())
-    }
-
-    async fn update_system_info(
-        &self,
-        system_info: &mut SystemInfo,
-        version: &SemanticVersion,
-    ) -> Result<(), IggyError> {
-        system_info.update_version(version);
-        self.storage.info.save(system_info).await?;
-        Ok(())
-    }
-}
-
 impl SystemInfo {
     pub fn update_version(&mut self, version: &SemanticVersion) {
         self.version.version = version.to_string();
diff --git a/core/server/src/streaming/systems/system.rs 
b/core/server/src/streaming/systems/system.rs
index 761f6d1b..5d830d71 100644
--- a/core/server/src/streaming/systems/system.rs
+++ b/core/server/src/streaming/systems/system.rs
@@ -102,10 +102,10 @@ impl System {
             map_toggle_str(config.encryption.enabled)
         );
 
-        let encryptor: Option<Arc<EncryptorKind>> = match 
config.encryption.enabled {
-            true => Some(Arc::new(EncryptorKind::Aes256Gcm(
+        let encryptor: Option<EncryptorKind> = match config.encryption.enabled 
{
+            true => Some(EncryptorKind::Aes256Gcm(
                 
Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(),
-            ))),
+            )),
             false => None,
         };
 
@@ -116,13 +116,21 @@ impl System {
             &config.get_state_messages_file_path(),
             &version,
             state_persister,
-            encryptor.clone(),
+            encryptor,
         )));
+
+        //TODO: Just shut the fuck up rust-analyzer.
+        let encryptor: Option<EncryptorKind> = match config.encryption.enabled 
{
+            true => Some(EncryptorKind::Aes256Gcm(
+                
Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(),
+            )),
+            false => None,
+        };
         Self::create(
             config.clone(),
             SystemStorage::new(config, partition_persister),
             state,
-            encryptor,
+            encryptor.map(Arc::new),
             data_maintenance_config,
             pat_config,
         )
@@ -224,9 +232,12 @@ impl System {
                 format!("{COMPONENT} (error: {error}) - failed to initialize 
system state")
             })?;
         let now = Instant::now();
+        //DONE
+        /*
         self.load_version().await.with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to load version")
         })?;
+        */
         self.load_users(system_state.users.into_values().collect())
             .await
             .with_error_context(|error| {


Reply via email to