This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_tpc in repository https://gitbox.apache.org/repos/asf/iggy.git
commit fc3f0fdbfba7f083daffc7d280b988f828caf773 Author: numinex <[email protected]> AuthorDate: Sat Jun 21 13:51:27 2025 +0200 fix state initialization with gate --- core/common/src/utils/crypto.rs | 3 +- core/server/src/bootstrap.rs | 48 ++--- core/server/src/lib.rs | 8 +- core/server/src/main.rs | 151 ++++++++++++--- core/server/src/shard/builder.rs | 65 +++---- core/server/src/shard/gate.rs | 42 +++++ core/server/src/shard/mod.rs | 256 ++++++++++++++++++++++++-- core/server/src/shard/transmission/frame.rs | 17 ++ core/server/src/shard/transmission/message.rs | 17 ++ core/server/src/shard/transmission/mod.rs | 18 ++ core/server/src/state/file.rs | 4 +- core/server/src/streaming/systems/info.rs | 55 ------ core/server/src/streaming/systems/system.rs | 21 ++- 13 files changed, 524 insertions(+), 181 deletions(-) diff --git a/core/common/src/utils/crypto.rs b/core/common/src/utils/crypto.rs index 34cd72c7..11d51cb7 100644 --- a/core/common/src/utils/crypto.rs +++ b/core/common/src/utils/crypto.rs @@ -23,7 +23,7 @@ use aes_gcm::aead::{Aead, OsRng}; use aes_gcm::{AeadCore, Aes256Gcm, KeyInit}; use std::fmt::Debug; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum EncryptorKind { Aes256Gcm(Aes256GcmEncryptor), } @@ -46,6 +46,7 @@ pub trait Encryptor { fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError>; } +#[derive(Clone)] pub struct Aes256GcmEncryptor { cipher: Aes256Gcm, } diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index c6108568..4f356a9b 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -12,10 +12,13 @@ use crate::{ IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, configs::{config_provider::ConfigProviderKind, server::ServerConfig, system::SystemConfig}, server_error::ServerError, - shard::{transmission::connector::ShardConnector, transmission::frame::ShardFrame}, - streaming::users::user::User, + shard::transmission::{connector::ShardConnector, frame::ShardFrame}, + streaming::{ + persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind}, + users::user::User, + }, }; -use std::{env, fs::remove_dir_all, ops::Range, path::Path}; +use std::{env, fs::remove_dir_all, ops::Range, path::Path, sync::Arc}; pub fn create_shard_connections(shards_set: Range<usize>) -> Vec<ShardConnector<ShardFrame>> { let shards_count = shards_set.len(); @@ -68,37 +71,7 @@ pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError> // TODO: Move this to individual shard level /* - let state_entries = self.state.init().await.with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to initialize state entries") - })?; - let system_state = SystemState::init(state_entries) - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to initialize system state") - })?; - let now = Instant::now(); - self.load_version().await.with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to load version") - })?; - self.load_users(system_state.users.into_values().collect()) - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to load users") - })?; - self.load_streams(system_state.streams.into_values().collect()) - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to load streams") - })?; - if let Some(archiver) = self.archiver.as_ref() { - archiver - .init() - .await - .expect("Failed to initialize archiver"); - } - info!("Initialized system in {} ms.", now.elapsed().as_millis()); - Ok(()) - */ + */ } pub fn create_root_user() -> User { @@ -158,3 +131,10 @@ pub fn create_shard_executor() -> Runtime<TimeDriver<monoio::IoUringDriver>> { let rt = Buildable::build(builder).expect("Failed to create default runtime"); rt } + +pub fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> { + match enforce_fsync { + true => Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)), + false => Arc::new(PersisterKind::File(FilePersister)), + } +} diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs index e64837da..3dcf64b2 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/lib.rs @@ -43,11 +43,11 @@ pub mod streaming; pub mod tcp; pub mod versioning; -const VERSION: &str = env!("CARGO_PKG_VERSION"); -const IGGY_ROOT_USERNAME_ENV: &str = "IGGY_ROOT_USERNAME"; -const IGGY_ROOT_PASSWORD_ENV: &str = "IGGY_ROOT_PASSWORD"; +pub const VERSION: &str = env!("CARGO_PKG_VERSION"); +pub const IGGY_ROOT_USERNAME_ENV: &str = "IGGY_ROOT_USERNAME"; +pub const IGGY_ROOT_PASSWORD_ENV: &str = "IGGY_ROOT_PASSWORD"; -pub(crate) fn map_toggle_str<'a>(enabled: bool) -> &'a str { +pub fn map_toggle_str<'a>(enabled: bool) -> &'a str { match enabled { true => "enabled", false => "disabled", diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 67728862..fc2b21f4 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -16,6 +16,7 @@ * under the License. */ +use std::sync::Arc; use std::thread::available_parallelism; use anyhow::Result; @@ -23,33 +24,32 @@ use clap::Parser; use dotenvy::dotenv; use error_set::ErrContext; use figlet_rs::FIGfont; +use iggy_common::create_user::CreateUser; +use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError}; use server::args::Args; use server::bootstrap::{ create_default_executor, create_directories, create_root_user, create_shard_connections, - create_shard_executor, load_config, + create_shard_executor, load_config, resolve_persister, }; -use server::channels::commands::archive_state::ArchiveStateExecutor; -use server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor; -use server::channels::commands::maintain_messages::MaintainMessagesExecutor; -use server::channels::commands::print_sysinfo::SysInfoPrintExecutor; -use server::channels::commands::save_messages::SaveMessagesExecutor; -use server::channels::commands::verify_heartbeats::VerifyHeartbeatsExecutor; -use server::channels::handler::BackgroundServerCommandHandler; -use server::configs::config_provider::{self, ConfigProviderKind}; -use server::configs::server::ServerConfig; -use server::http::http_server; +use server::configs::config_provider::{self}; #[cfg(not(feature = "tokio-console"))] use server::log::logger::Logging; #[cfg(feature = "tokio-console")] use server::log::tokio_console::Logging; -use server::quic::quic_server; use server::server_error::ServerError; use server::shard::IggyShard; -use server::streaming::systems::system::{SharedSystem, System}; -use server::streaming::utils::MemoryPool; -use server::tcp::tcp_server; +use server::shard::gate::Gate; +use server::state::StateKind; +use server::state::command::EntryCommand; +use server::state::file::FileState; +use server::state::models::CreateUserWithId; +use server::state::system::SystemState; +use server::versioning::SemanticVersion; +use server::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, map_toggle_str}; use tokio::time::Instant; -use tracing::{info, instrument}; +use tracing::{error, info, instrument}; + +const COMPONENT: &str = "MAIN"; #[instrument(skip_all, name = "trace_start_server")] fn main() -> Result<(), ServerError> { @@ -88,7 +88,11 @@ fn main() -> Result<(), ServerError> { config })?; - // Create directories and root user. + // Initialize logging + let mut logging = Logging::new(config.telemetry.clone()); + logging.early_init(); + + // Create directories. // Remove `local_data` directory if run with `--fresh` flag. std::thread::scope(|scope| { scope @@ -117,13 +121,9 @@ fn main() -> Result<(), ServerError> { }) }) .join() - .expect("Failed to create directories and root user") + .expect("Failed join thread") }) - .with_error_context(|err| format!("Failed to create directories, err: {err}"))?; - - // Initialize logging - let mut logging = Logging::new(config.telemetry.clone()); - logging.early_init(); + .with_error_context(|err| format!("Failed to init server: {err}"))?; // TODO: Make this configurable from config as a range // for example this instance of Iggy will use cores from 0..4 @@ -132,9 +132,11 @@ fn main() -> Result<(), ServerError> { let shards_set = 0..shards_count; let connections = create_shard_connections(shards_set.clone()); for shard_id in shards_set { + let gate: Arc<Gate<()>> = Arc::new(Gate::new()); let id = shard_id as u16; let connections = connections.clone(); - let server_config = config.clone(); + let config = config.clone(); + let state_persister = resolve_persister(config.system.state.enforce_fsync); std::thread::Builder::new() .name(format!("shard-{id}")) .spawn(move || { @@ -143,18 +145,109 @@ fn main() -> Result<(), ServerError> { let mut rt = create_shard_executor(); rt.block_on(async move { + let version = SemanticVersion::current().expect("Invalid version"); + info!( + "Server-side encryption is {}.", + map_toggle_str(config.system.encryption.enabled) + ); + let encryptor: Option<EncryptorKind> = match config.system.encryption.enabled { + true => Some(EncryptorKind::Aes256Gcm( + Aes256GcmEncryptor::from_base64_key(&config.system.encryption.key) + .unwrap(), + )), + false => None, + }; + + let state = StateKind::File(FileState::new( + &config.system.get_state_messages_file_path(), + &version, + state_persister, + encryptor.clone(), + )); + + // We can't use std::sync::Once because it doesn't support async. + // Trait bound on the closure is FnOnce. + let gate = gate.clone(); + // Peak into the state to check if the root user exists. + // If it does not exist, create it. + gate.with_async::<Result<(), IggyError>>(async |gate_state| { + // A thread already initialized state + // Thus, we can skip it. + if let Some(_) = gate_state.inner() { + return Ok(()); + } + + let state_entries = state.load_entries().await.with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to load state entries" + ) + })?; + let root_exists = state_entries + .into_iter() + .find(|entry| { + entry + .command() + .and_then(|command| match command { + EntryCommand::CreateUser(payload) + if payload.command.username + == IGGY_ROOT_USERNAME_ENV && payload.command.password == IGGY_ROOT_PASSWORD_ENV => + { + Ok(true) + } + _ => Ok(false), + }) + .map_or_else( + |err| { + error!("Failed to check if root user exists: {err}"); + false + }, + |v| v, + ) + }) + .is_some(); + + if !root_exists { + info!("No users found, creating the root user..."); + let root = create_root_user(); + let command = CreateUser { + username: root.username.clone(), + password: root.password.clone(), + status: root.status, + permissions: root.permissions.clone(), + }; + state + .apply(0, &EntryCommand::CreateUser(CreateUserWithId { + user_id: root.id, + command + })) + .await + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to apply create user command, username: {}", + root.username + ) + })?; + } + + gate_state.set_result(()); + Ok(()) + }) + .await; + let builder = IggyShard::builder(); let mut shard = builder .id(id) .connections(connections) - .server_config(server_config) + .config(config) + .encryptor(encryptor) + .version(version) + .state(state) .build() .await; - if let Err(e) = shard.init().await { - //TODO: If one of the shards fails to initialize, we should crash the whole program; - panic!("Failed to initialize shard-{id}: {e}"); - } + //TODO: If one of the shards fails to initialize, we should crash the whole program; + shard.init().await.expect("Failed to initialize shard-{id}: {e}"); + info!("Initiated shard with ID: {id}"); //TODO: If one of the shards fails to initialize, we should crash the whole program; shard.assert_init(); }) diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 36fe0e54..0fd5250f 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -22,14 +22,12 @@ use iggy_common::{Aes256GcmEncryptor, EncryptorKind}; use tracing::info; use crate::{ + bootstrap::resolve_persister, configs::server::ServerConfig, map_toggle_str, shard::Shard, state::{StateKind, file::FileState}, - streaming::{ - persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind}, - storage::SystemStorage, - }, + streaming::storage::SystemStorage, versioning::SemanticVersion, }; @@ -40,6 +38,9 @@ pub struct IggyShardBuilder { id: Option<u16>, connections: Option<Vec<ShardConnector<ShardFrame>>>, config: Option<ServerConfig>, + encryptor: Option<EncryptorKind>, + version: Option<SemanticVersion>, + state: Option<StateKind>, } impl IggyShardBuilder { @@ -53,16 +54,33 @@ impl IggyShardBuilder { self } - pub fn server_config(mut self, config: ServerConfig) -> Self { + pub fn config(mut self, config: ServerConfig) -> Self { self.config = Some(config); self } + pub fn encryptor(mut self, encryptor: Option<EncryptorKind>) -> Self { + self.encryptor = encryptor; + self + } + + pub fn version(mut self, version: SemanticVersion) -> Self { + self.version = Some(version); + self + } + + pub fn state(mut self, state: StateKind) -> Self { + self.state = Some(state); + self + } + // TODO: Too much happens in there, some of those bootstrapping logic should be moved outside. pub async fn build(self) -> IggyShard { let id = self.id.unwrap(); let config = self.config.unwrap(); let connections = self.connections.unwrap(); + let state = self.state.unwrap(); + let version = self.version.unwrap(); let (stop_sender, stop_receiver, frame_receiver) = connections .iter() .filter(|c| c.id == id) @@ -76,33 +94,8 @@ impl IggyShardBuilder { .next() .expect("Failed to find connection with the specified ID"); let shards = connections.into_iter().map(Shard::new).collect(); - - //TODO: This can be discrete step in the builder bootstrapped from main function. - let version = SemanticVersion::current().expect("Invalid version"); - - //TODO: This can be discrete step in the builder bootstrapped from main function. - info!( - "Server-side encryption is {}.", - map_toggle_str(config.system.encryption.enabled) - ); - let encryptor: Option<Arc<EncryptorKind>> = match config.system.encryption.enabled { - true => Some(Arc::new(EncryptorKind::Aes256Gcm( - Aes256GcmEncryptor::from_base64_key(&config.system.encryption.key).unwrap(), - ))), - false => None, - }; - - //TODO: This can be discrete step in the builder bootstrapped from main function. - let state_persister = Self::resolve_persister(config.system.state.enforce_fsync); - let state = Rc::new(StateKind::File(FileState::new( - &config.system.get_state_messages_file_path(), - &version, - state_persister, - encryptor.clone(), - ))); - - //TODO: This can be discrete step in the builder bootstrapped from main function. - let partition_persister = Self::resolve_persister(config.system.partition.enforce_fsync); + //TODO: Eghhhh....... + let partition_persister = resolve_persister(config.system.partition.enforce_fsync); let storage = Rc::new(SystemStorage::new( config.system.clone(), partition_persister, @@ -115,16 +108,10 @@ impl IggyShardBuilder { storage: storage, state: state, config: config, + version: version, stop_receiver: stop_receiver, stop_sender: stop_sender, frame_receiver: Cell::new(Some(frame_receiver)), } } - - fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> { - match enforce_fsync { - true => Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)), - false => Arc::new(PersisterKind::File(FilePersister)), - } - } } diff --git a/core/server/src/shard/gate.rs b/core/server/src/shard/gate.rs new file mode 100644 index 00000000..a26950a4 --- /dev/null +++ b/core/server/src/shard/gate.rs @@ -0,0 +1,42 @@ +use std::sync::{Condvar, Mutex}; + +#[derive(Default)] +pub struct Gate<T> { + state: Mutex<GateState<T>>, +} + +#[derive(Default)] +pub struct GateState<T> { + result: Option<T>, +} + +impl<T> GateState<T> { + pub fn set_result(&mut self, result: T) { + self.result = Some(result); + } + + pub fn inner(&self) -> &Option<T> { + &self.result + } +} + +impl<T> Gate<T> +where + T: Default, +{ + pub fn new() -> Self { + Gate { + state: Default::default(), + } + } + + pub async fn with_async<R>(&self, f: impl AsyncFnOnce(&mut GateState<T>) -> R) { + let mut guard = self.state.lock().unwrap(); + f(&mut guard).await; + } + + pub async fn with<R>(&self, f: impl FnOnce(&mut GateState<T>) -> R) { + let mut guard = self.state.lock().unwrap(); + f(&mut guard); + } +} diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 6a6d35b8..23ef1e15 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -17,16 +17,19 @@ */ pub mod builder; +pub mod gate; pub mod namespace; pub mod transmission; use ahash::HashMap; use builder::IggyShardBuilder; +use error_set::ErrContext; use iggy_common::IggyError; use namespace::IggyNamespace; use std::{ cell::{Cell, RefCell}, rc::Rc, + str::FromStr, sync::Arc, time::Instant, }; @@ -37,9 +40,17 @@ use crate::{ bootstrap::create_root_user, configs::server::ServerConfig, shard::transmission::frame::ShardFrame, - state::{StateKind, file::FileState}, - streaming::storage::SystemStorage, + state::{ + StateKind, + file::FileState, + system::{SystemState, UserState}, + }, + streaming::{storage::SystemStorage, systems::info::SystemInfo}, + versioning::SemanticVersion, }; + +pub const COMPONENT: &str = "SHARD"; + pub(crate) struct Shard { id: u16, connection: ShardConnector<ShardFrame>, @@ -62,6 +73,7 @@ pub struct IggyShard { pub id: u16, shards: Vec<Shard>, shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>, + version: SemanticVersion, //pub(crate) permissioner: RefCell<Permissioner>, //pub(crate) streams: RwLock<HashMap<u32, Stream>>, @@ -70,7 +82,7 @@ pub struct IggyShard { // TODO: Refactor. pub(crate) storage: Rc<SystemStorage>, - pub(crate) state: Rc<StateKind>, + pub(crate) state: StateKind, //pub(crate) encryptor: Option<Rc<dyn Encryptor>>, config: ServerConfig, //pub(crate) client_manager: RefCell<ClientManager>, @@ -92,10 +104,12 @@ impl IggyShard { //let state_entries = self.state.init().await?; //let system_state = SystemState::init(state_entries).await?; //let user = create_root_user(); - self.load_state().await; - self.load_users().await; + self.load_version().await?; + let SystemState { users, streams } = self.load_state().await?; + self.load_users(users.into_values().collect()).await; // Add default root user. - self.load_streams().await; + self.load_streams(streams.into_values().collect()).await; + //TODO: Fix the archiver. /* if let Some(archiver) = self.archiver.as_ref() { @@ -109,16 +123,234 @@ impl IggyShard { Ok(()) } - async fn load_state(&self) { - todo!() + async fn load_version(&self) -> Result<(), IggyError> { + async fn update_system_info( + storage: &Rc<SystemStorage>, + system_info: &mut SystemInfo, + version: &SemanticVersion, + ) -> Result<(), IggyError> { + system_info.update_version(version); + storage.info.save(system_info).await?; + Ok(()) + } + + let current_version = &self.version; + let mut system_info; + let load_system_info = self.storage.info.load().await; + if load_system_info.is_err() { + let error = load_system_info.err().unwrap(); + if let IggyError::ResourceNotFound(_) = error { + info!("System info not found, creating..."); + system_info = SystemInfo::default(); + update_system_info(&self.storage, &mut system_info, current_version).await?; + } else { + return Err(error); + } + } else { + system_info = load_system_info.unwrap(); + } + + info!("Loaded {system_info}."); + let loaded_version = SemanticVersion::from_str(&system_info.version.version)?; + if current_version.is_equal_to(&loaded_version) { + info!("System version {current_version} is up to date."); + } else if current_version.is_greater_than(&loaded_version) { + info!( + "System version {current_version} is greater than {loaded_version}, checking the available migrations..." + ); + update_system_info(&self.storage, &mut system_info, current_version).await?; + } else { + info!( + "System version {current_version} is lower than {loaded_version}, possible downgrade." + ); + update_system_info(&self.storage, &mut system_info, current_version).await?; + } + + Ok(()) } - async fn load_users(&self) { - todo!() + async fn load_state(&self) -> Result<SystemState, IggyError> { + let state_entries = self.state.init().await.with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to initialize state entries") + })?; + let system_state = SystemState::init(state_entries) + .await + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to initialize system state") + })?; + Ok(system_state) } - async fn load_streams(&self) { - todo!() + async fn load_users(&mut self, users: Vec<UserState>) -> Result<(), IggyError> { + info!("Loading users..."); + /* + + for user_state in users.into_iter() { + let mut user = User::with_password( + user_state.id, + &user_state.username, + user_state.password_hash, + user_state.status, + user_state.permissions, + ); + + user.created_at = user_state.created_at; + user.personal_access_tokens = user_state + .personal_access_tokens + .into_values() + .map(|token| { + ( + Arc::new(token.token_hash.clone()), + PersonalAccessToken::raw( + user_state.id, + &token.name, + &token.token_hash, + token.expiry_at, + ), + ) + }) + .collect(); + self.users.insert(user_state.id, user); + } + + let users_count = self.users.len(); + let current_user_id = self.users.keys().max().unwrap_or(&1); + USER_ID.store(current_user_id + 1, Ordering::SeqCst); + self.permissioner + .init(&self.users.values().collect::<Vec<&User>>()); + self.metrics.increment_users(users_count as u32); + info!("Initialized {users_count} user(s)."); + */ + Ok(()) + } + + async fn load_streams(&mut self, streams: Vec<StreamState>) -> Result<(), IggyError> { + todo!(); + info!("Loading streams from disk..."); + let mut unloaded_streams = Vec::new(); + let mut dir_entries = read_dir(&self.config.get_streams_path()) + .await + .map_err(|error| { + error!("Cannot read streams directory: {error}"); + IggyError::CannotReadStreams + })?; + + while let Some(dir_entry) = dir_entries.next_entry().await.unwrap_or(None) { + let name = dir_entry.file_name().into_string().unwrap(); + let stream_id = name.parse::<u32>().map_err(|_| { + error!("Invalid stream ID file with name: '{name}'."); + IggyError::InvalidNumberValue + })?; + let stream_state = streams.iter().find(|s| s.id == stream_id); + if stream_state.is_none() { + error!( + "Stream with ID: '{stream_id}' was not found in state, but exists on disk and will be removed." + ); + if let Err(error) = fs::remove_dir_all(&dir_entry.path()).await { + error!("Cannot remove stream directory: {error}"); + } else { + warn!("Stream with ID: '{stream_id}' was removed."); + } + continue; + } + + let stream_state = stream_state.unwrap(); + let mut stream = Stream::empty( + stream_id, + &stream_state.name, + self.config.clone(), + self.storage.clone(), + ); + stream.created_at = stream_state.created_at; + unloaded_streams.push(stream); + } + + let state_stream_ids = streams + .iter() + .map(|stream| stream.id) + .collect::<AHashSet<u32>>(); + let unloaded_stream_ids = unloaded_streams + .iter() + .map(|stream| stream.stream_id) + .collect::<AHashSet<u32>>(); + let mut missing_ids = state_stream_ids + .difference(&unloaded_stream_ids) + .copied() + .collect::<AHashSet<u32>>(); + if missing_ids.is_empty() { + info!("All streams found on disk were found in state."); + } else { + warn!("Streams with IDs: '{missing_ids:?}' were not found on disk."); + if self.config.recovery.recreate_missing_state { + info!( + "Recreating missing state in recovery config is enabled, missing streams will be created." + ); + for stream_id in missing_ids.iter() { + let stream_id = *stream_id; + let stream_state = streams.iter().find(|s| s.id == stream_id).unwrap(); + let stream = Stream::create( + stream_id, + &stream_state.name, + self.config.clone(), + self.storage.clone(), + ); + stream.persist().await?; + unloaded_streams.push(stream); + info!( + "Missing stream with ID: '{stream_id}', name: {} was recreated.", + stream_state.name + ); + } + missing_ids.clear(); + } else { + warn!( + "Recreating missing state in recovery config is disabled, missing streams will not be created." + ); + } + } + + let mut streams_states = streams + .into_iter() + .filter(|s| !missing_ids.contains(&s.id)) + .map(|s| (s.id, s)) + .collect::<AHashMap<_, _>>(); + let loaded_streams = RefCell::new(Vec::new()); + let load_stream_tasks = unloaded_streams.into_iter().map(|mut stream| { + let state = streams_states.remove(&stream.stream_id).unwrap(); + + async { + stream.load(state).await?; + loaded_streams.borrow_mut().push(stream); + Result::<(), IggyError>::Ok(()) + } + }); + try_join_all(load_stream_tasks).await?; + + for stream in loaded_streams.take() { + if self.streams.contains_key(&stream.stream_id) { + error!("Stream with ID: '{}' already exists.", &stream.stream_id); + continue; + } + + if self.streams_ids.contains_key(&stream.name) { + error!("Stream with name: '{}' already exists.", &stream.name); + continue; + } + + self.metrics.increment_streams(1); + self.metrics.increment_topics(stream.get_topics_count()); + self.metrics + .increment_partitions(stream.get_partitions_count()); + self.metrics.increment_segments(stream.get_segments_count()); + self.metrics.increment_messages(stream.get_messages_count()); + + self.streams_ids + .insert(stream.name.clone(), stream.stream_id); + self.streams.insert(stream.stream_id, stream); + } + + info!("Loaded {} stream(s) from disk.", self.streams.len()); + Ok(()) } pub fn assert_init(&self) {} diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs index ac60863d..ec63daf0 100644 --- a/core/server/src/shard/transmission/frame.rs +++ b/core/server/src/shard/transmission/frame.rs @@ -1,3 +1,20 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ use async_channel::Sender; use bytes::Bytes; use iggy_common::IggyError; diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs index e9f1dd4b..3572c0ec 100644 --- a/core/server/src/shard/transmission/message.rs +++ b/core/server/src/shard/transmission/message.rs @@ -1,3 +1,20 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ use crate::binary::command::ServerCommand; #[derive(Debug)] diff --git a/core/server/src/shard/transmission/mod.rs b/core/server/src/shard/transmission/mod.rs index b871152a..1f9a79ed 100644 --- a/core/server/src/shard/transmission/mod.rs +++ b/core/server/src/shard/transmission/mod.rs @@ -1,3 +1,21 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + pub mod connector; pub mod frame; pub mod message; diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs index f162ed75..aa14b501 100644 --- a/core/server/src/state/file.rs +++ b/core/server/src/state/file.rs @@ -47,7 +47,7 @@ pub struct FileState { version: u32, path: String, persister: Arc<PersisterKind>, - encryptor: Option<Arc<EncryptorKind>>, + encryptor: Option<EncryptorKind>, } impl FileState { @@ -55,7 +55,7 @@ impl FileState { path: &str, version: &SemanticVersion, persister: Arc<PersisterKind>, - encryptor: Option<Arc<EncryptorKind>>, + encryptor: Option<EncryptorKind>, ) -> Self { Self { current_index: AtomicU64::new(0), diff --git a/core/server/src/streaming/systems/info.rs b/core/server/src/streaming/systems/info.rs index 61c1bcbb..3c20fd63 100644 --- a/core/server/src/streaming/systems/info.rs +++ b/core/server/src/streaming/systems/info.rs @@ -16,15 +16,11 @@ * under the License. */ -use crate::streaming::systems::system::System; use crate::versioning::SemanticVersion; -use iggy_common::IggyError; use serde::{Deserialize, Serialize}; use std::collections::hash_map::DefaultHasher; use std::fmt::Display; use std::hash::{Hash, Hasher}; -use std::str::FromStr; -use tracing::info; #[derive(Debug, Serialize, Deserialize, Default)] pub struct SystemInfo { @@ -46,57 +42,6 @@ pub struct Migration { pub applied_at: u64, } -impl System { - pub(crate) async fn load_version(&mut self) -> Result<(), IggyError> { - let current_version = SemanticVersion::current()?; - let mut system_info; - let load_system_info = self.storage.info.load().await; - if load_system_info.is_err() { - let error = load_system_info.err().unwrap(); - if let IggyError::ResourceNotFound(_) = error { - info!("System info not found, creating..."); - system_info = SystemInfo::default(); - self.update_system_info(&mut system_info, ¤t_version) - .await?; - } else { - return Err(error); - } - } else { - system_info = load_system_info.unwrap(); - } - - info!("Loaded {system_info}."); - let loaded_version = SemanticVersion::from_str(&system_info.version.version)?; - if current_version.is_equal_to(&loaded_version) { - info!("System version {current_version} is up to date."); - } else if current_version.is_greater_than(&loaded_version) { - info!( - "System version {current_version} is greater than {loaded_version}, checking the available migrations..." - ); - self.update_system_info(&mut system_info, ¤t_version) - .await?; - } else { - info!( - "System version {current_version} is lower than {loaded_version}, possible downgrade." - ); - self.update_system_info(&mut system_info, ¤t_version) - .await?; - } - - Ok(()) - } - - async fn update_system_info( - &self, - system_info: &mut SystemInfo, - version: &SemanticVersion, - ) -> Result<(), IggyError> { - system_info.update_version(version); - self.storage.info.save(system_info).await?; - Ok(()) - } -} - impl SystemInfo { pub fn update_version(&mut self, version: &SemanticVersion) { self.version.version = version.to_string(); diff --git a/core/server/src/streaming/systems/system.rs b/core/server/src/streaming/systems/system.rs index 761f6d1b..5d830d71 100644 --- a/core/server/src/streaming/systems/system.rs +++ b/core/server/src/streaming/systems/system.rs @@ -102,10 +102,10 @@ impl System { map_toggle_str(config.encryption.enabled) ); - let encryptor: Option<Arc<EncryptorKind>> = match config.encryption.enabled { - true => Some(Arc::new(EncryptorKind::Aes256Gcm( + let encryptor: Option<EncryptorKind> = match config.encryption.enabled { + true => Some(EncryptorKind::Aes256Gcm( Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(), - ))), + )), false => None, }; @@ -116,13 +116,21 @@ impl System { &config.get_state_messages_file_path(), &version, state_persister, - encryptor.clone(), + encryptor, ))); + + //TODO: Just shut the fuck up rust-analyzer. + let encryptor: Option<EncryptorKind> = match config.encryption.enabled { + true => Some(EncryptorKind::Aes256Gcm( + Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(), + )), + false => None, + }; Self::create( config.clone(), SystemStorage::new(config, partition_persister), state, - encryptor, + encryptor.map(Arc::new), data_maintenance_config, pat_config, ) @@ -224,9 +232,12 @@ impl System { format!("{COMPONENT} (error: {error}) - failed to initialize system state") })?; let now = Instant::now(); + //DONE + /* self.load_version().await.with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to load version") })?; + */ self.load_users(system_state.users.into_values().collect()) .await .with_error_context(|error| {
