This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 1f831d97 feat(io_uring): load server_config (#1798)
1f831d97 is described below
commit 1f831d9752f44e93bfdb3ca4000fa6e63989c6b9
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri May 16 21:37:23 2025 +0200
feat(io_uring): load server_config (#1798)
---
core/server/src/bootstrap.rs | 13 +++++-
core/server/src/main.rs | 90 +++++++++++++++++++++++++---------------
core/server/src/shard/builder.rs | 9 +---
core/server/src/shard/mod.rs | 3 +-
4 files changed, 70 insertions(+), 45 deletions(-)
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 8eacf223..fd6a5aa0 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -1,3 +1,5 @@
+use monoio::{Buildable, Driver, Runtime};
+
use crate::shard::{connector::ShardConnector, frame::ShardFrame};
use std::ops::Range;
@@ -12,9 +14,18 @@ pub fn create_shard_connections(shards_set: Range<usize>) ->
Vec<ShardConnector<
}
pub async fn create_directories() {
-
+ todo!();
}
pub async fn create_root_user() {
+ todo!();
+}
+pub fn create_default_executor<D>() -> Runtime<D>
+where
+ D: Driver + Buildable,
+{
+ let builder = monoio::RuntimeBuilder::<D>::new();
+ let rt = Buildable::build(builder).expect("Failed to create default
runtime");
+ rt
}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 8f0d1053..a906eb60 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -24,7 +24,9 @@ use dotenvy::dotenv;
use figlet_rs::FIGfont;
use monoio::Buildable;
use server::args::Args;
-use server::bootstrap::{create_directories, create_root_user,
create_shard_connections};
+use server::bootstrap::{
+ create_default_executor, create_directories, create_root_user,
create_shard_connections,
+};
use server::channels::commands::archive_state::ArchiveStateExecutor;
use
server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor;
use server::channels::commands::maintain_messages::MaintainMessagesExecutor;
@@ -32,7 +34,7 @@ use
server::channels::commands::print_sysinfo::SysInfoPrintExecutor;
use server::channels::commands::save_messages::SaveMessagesExecutor;
use server::channels::commands::verify_heartbeats::VerifyHeartbeatsExecutor;
use server::channels::handler::BackgroundServerCommandHandler;
-use server::configs::config_provider;
+use server::configs::config_provider::{self, ConfigProviderKind};
use server::configs::server::ServerConfig;
use server::http::http_server;
#[cfg(not(feature = "tokio-console"))]
@@ -72,40 +74,60 @@ fn main() -> Result<(), ServerError> {
let args = Args::parse();
let config_provider = config_provider::resolve(&args.config_provider)?;
- let config = ServerConfig::default();
- //TODO: Load config.
- /*
- let xd = std::thread::scope(|scope| {
- let config = scope.spawn(move || {
- let mut rt =
monoio::RuntimeBuilder::<monoio::IoUringDriver>::new().build().unwrap();
- let config: Result<ServerConfig, ServerError> = rt.block_on(async {
- let config = ServerConfig::load(&config_provider).await?;
- create_directories().await;
- create_root_user().await;
- Ok(config)
- });
- config
- }).join().unwrap();
+ let config = std::thread::scope(|scope| {
+ let config = scope
+ .spawn(move || {
+ async fn load_config(
+ config_provider: &ConfigProviderKind,
+ ) -> Result<ServerConfig, ServerError> {
+ let config = ServerConfig::load(config_provider).await?;
+ Ok(config)
+ }
+
+ let mut rt =
create_default_executor::<monoio::IoUringDriver>();
+ rt.block_on(load_config(&config_provider))
+ })
+ .join()
+ .expect("Failed to load config");
config
+ })?;
+
+ // Create directories and root user.
+ // Remove `local_data` directory if run with `--fresh` flag.
+ std::thread::scope(|scope| {
+ scope
+ .spawn(|| {
+ let mut rt =
create_default_executor::<monoio::IoUringDriver>();
+ rt.block_on(async {
+ if args.fresh {
+ let system_path = config.system.get_system_path();
+ if monoio::fs::metadata(&system_path).await.is_ok() {
+ println!(
+ "Removing system path at: {} because `--fresh`
flag was set",
+ system_path
+ );
+ //TODO: Impl dir walk and remove the files
+ /*
+ if let Err(e) =
tokio::fs::remove_dir_all(&system_path).await {
+ eprintln!("Failed to remove system path at {}:
{}", system_path, e);
+ }
+ */
+ }
+ }
+
+ // Create directories and root user
+ create_directories().await;
+ create_root_user().await;
+ });
+ })
+ .join()
+ .expect("Failed to create directories and root user");
});
- */
- /*
- let config = ServerConfig::load(&config_provider).await?;
- if args.fresh {
- let system_path = config.system.get_system_path();
- if tokio::fs::metadata(&system_path).await.is_ok() {
- println!(
- "Removing system path at: {} because `--fresh` flag was set",
- system_path
- );
- if let Err(e) = tokio::fs::remove_dir_all(&system_path).await {
- eprintln!("Failed to remove system path at {}: {}",
system_path, e);
- }
- }
- }
+
+ // Initialize logging
let mut logging = Logging::new(config.telemetry.clone());
logging.early_init();
- */
+
// TODO: Make this configurable from config as a range
// for example this instance of Iggy will use cores from 0..4
let available_cpus = available_parallelism()
@@ -152,11 +174,11 @@ fn main() -> Result<(), ServerError> {
.join()
.expect(format!("Failed to join thread for shard-{id}").as_str());
}
- // From this point on, we can use tracing macros to log messages.
- /*
+ // From this point on, we can use tracing macros to log messages.
logging.late_init(config.system.get_system_path(),
&config.system.logging)?;
+ /*
#[cfg(feature = "disable-mimalloc")]
tracing::warn!(
"Using default system allocator because code was build with
`disable-mimalloc` feature"
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 08a02e54..750e2b71 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -61,13 +61,6 @@ impl IggyShardBuilder {
.expect("Failed to find connection with the specified ID");
let shards = connections.into_iter().map(Shard::new).collect();
- IggyShard::new(
- id,
- shards,
- config,
- stop_receiver,
- stop_sender,
- receiver,
- )
+ IggyShard::new(id, shards, config, stop_receiver, stop_sender,
receiver)
}
}
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index bffaca45..dcb089d9 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -91,6 +91,5 @@ impl IggyShard {
}
}
- pub fn assert_init(&self) {
- }
+ pub fn assert_init(&self) {}
}