This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 8507c732 feat(io_uring): bootstrap the main function (#1788)
8507c732 is described below
commit 8507c732efc193717c818149d345c68bab473f76
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Wed May 14 20:38:55 2025 +0200
feat(io_uring): bootstrap the main function (#1788)
---
Cargo.lock | 1 +
core/server/Cargo.toml | 1 +
core/server/src/bootstrap.rs | 20 ++++++++++
core/server/src/lib.rs | 4 +-
core/server/src/main.rs | 80 ++++++++++++++++++++++++++++++++++++--
core/server/src/shard/builder.rs | 73 ++++++++++++++++++++++++++++++++++
core/server/src/shard/connector.rs | 2 +
core/server/src/shard/frame.rs | 3 +-
core/server/src/shard/mod.rs | 79 +++++++++++++++++++++++++++----------
9 files changed, 234 insertions(+), 29 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index e3a0fa12..de8beee8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6252,6 +6252,7 @@ dependencies = [
"futures",
"human-repr",
"iggy_common",
+ "io-uring",
"jsonwebtoken",
"lending-iterator",
"mimalloc",
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index bbc02478..3c0e17e7 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -56,6 +56,7 @@ flume = { workspace = true }
futures = { workspace = true }
human-repr = { workspace = true }
iggy_common = { workspace = true }
+io-uring = "0.6"
jsonwebtoken = "9.3.1"
lending-iterator = "0.1.7"
mimalloc = { workspace = true, optional = true }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index e69de29b..8eacf223 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -0,0 +1,20 @@
+use crate::shard::{connector::ShardConnector, frame::ShardFrame};
+use std::ops::Range;
+
+pub fn create_shard_connections(shards_set: Range<usize>) ->
Vec<ShardConnector<ShardFrame>> {
+ let shards_count = shards_set.len();
+ let connectors = shards_set
+ .into_iter()
+ .map(|id| ShardConnector::new(id as u16, shards_count))
+ .collect();
+
+ connectors
+}
+
+pub async fn create_directories() {
+
+}
+
+pub async fn create_root_user() {
+
+}
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 572301e0..e64837da 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -26,18 +26,18 @@ static GLOBAL: MiMalloc = MiMalloc;
#[cfg(windows)]
compile_error!("iggy-server doesn't support windows.");
-mod bootstrap;
pub mod archiver;
pub mod args;
pub mod binary;
+pub mod bootstrap;
pub mod channels;
pub(crate) mod compat;
pub mod configs;
-pub mod shard;
pub mod http;
pub mod log;
pub mod quic;
pub mod server_error;
+pub mod shard;
pub mod state;
pub mod streaming;
pub mod tcp;
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 00180faa..8f0d1053 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -16,11 +16,15 @@
* under the License.
*/
+use std::thread::available_parallelism;
+
use anyhow::Result;
use clap::Parser;
use dotenvy::dotenv;
use figlet_rs::FIGfont;
+use monoio::Buildable;
use server::args::Args;
+use server::bootstrap::{create_directories, create_root_user,
create_shard_connections};
use server::channels::commands::archive_state::ArchiveStateExecutor;
use
server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor;
use server::channels::commands::maintain_messages::MaintainMessagesExecutor;
@@ -37,15 +41,15 @@ use server::log::logger::Logging;
use server::log::tokio_console::Logging;
use server::quic::quic_server;
use server::server_error::ServerError;
+use server::shard::IggyShard;
use server::streaming::systems::system::{SharedSystem, System};
use server::streaming::utils::MemoryPool;
use server::tcp::tcp_server;
use tokio::time::Instant;
use tracing::{info, instrument};
-#[tokio::main]
#[instrument(skip_all, name = "trace_start_server")]
-async fn main() -> Result<(), ServerError> {
+fn main() -> Result<(), ServerError> {
let startup_timestamp = Instant::now();
let standard_font = FIGfont::standard().unwrap();
let figure = standard_font.convert("Iggy Server");
@@ -68,6 +72,24 @@ async fn main() -> Result<(), ServerError> {
let args = Args::parse();
let config_provider = config_provider::resolve(&args.config_provider)?;
+ let config = ServerConfig::default();
+ //TODO: Load config.
+ /*
+ let xd = std::thread::scope(|scope| {
+ let config = scope.spawn(move || {
+ let mut rt =
monoio::RuntimeBuilder::<monoio::IoUringDriver>::new().build().unwrap();
+ let config: Result<ServerConfig, ServerError> = rt.block_on(async {
+ let config = ServerConfig::load(&config_provider).await?;
+ create_directories().await;
+ create_root_user().await;
+ Ok(config)
+ });
+ config
+ }).join().unwrap();
+ config
+ });
+ */
+ /*
let config = ServerConfig::load(&config_provider).await?;
if args.fresh {
let system_path = config.system.get_system_path();
@@ -83,9 +105,56 @@ async fn main() -> Result<(), ServerError> {
}
let mut logging = Logging::new(config.telemetry.clone());
logging.early_init();
+ */
+ // TODO: Make this configurable from config as a range
+ // for example this instance of Iggy will use cores from 0..4
+ let available_cpus = available_parallelism()
+ .expect("Failed to get num of cores")
+ .into();
+ let shards_count = available_cpus;
+ let shards_set = 0..shards_count;
+
+ let connections = create_shard_connections(shards_set.clone());
+
+ for shard_id in shards_set {
+ let id = shard_id as u16;
+ let connections = connections.clone();
+ let server_config = config.clone();
+ std::thread::Builder::new()
+ .name(format!("shard-{id}"))
+ .spawn(move || {
+ monoio::utils::bind_to_cpu_set(Some(shard_id))
+ .expect(format!("Failed to set CPU affinity for
shard-{id}").as_str());
+ // TODO: Figure out what else we could tweak there
+ // We for sure want to disable the userspace interrupts on new
cq entry (set_coop_taskrun)
+ // let urb = io_uring::IoUring::builder();
+ // TODO: Shall we make the size of ring be configureable ?
+ let mut rt =
monoio::RuntimeBuilder::<monoio::IoUringDriver>::new()
+ //.uring_builder(urb.setup_coop_taskrun())
+ .with_entries(1024) // Default size
+ .enable_timer()
+ .build()
+ .expect(format!("Failed to build monoio runtime for
shard-{id}").as_str());
+ rt.block_on(async move {
+ let builder = IggyShard::builder();
+ let shard = builder
+ .id(id)
+ .connections(connections)
+ .server_config(server_config)
+ .build_and_init()
+ .await;
+
+ shard.assert_init();
+ })
+ })
+ .expect(format!("Failed to spawn thread for shard-{id}").as_str())
+ .join()
+ .expect(format!("Failed to join thread for shard-{id}").as_str());
+ }
// From this point on, we can use tracing macros to log messages.
+ /*
logging.late_init(config.system.get_system_path(),
&config.system.logging)?;
#[cfg(feature = "disable-mimalloc")]
@@ -102,13 +171,15 @@ async fn main() -> Result<(), ServerError> {
config.data_maintenance.clone(),
config.personal_access_token.clone(),
));
+ */
// Workaround to ensure that the statistics are initialized before the
server
// loads streams and starts accepting connections. This is necessary to
// have the correct statistics when the server starts.
- system.write().await.get_stats().await?;
- system.write().await.init().await?;
+ //system.write().await.get_stats().await?;
+ //system.write().await.init().await?;
+ /*
let _command_handler = BackgroundServerCommandHandler::new(system.clone(),
&config)
.install_handler(SaveMessagesExecutor)
.install_handler(MaintainMessagesExecutor)
@@ -184,5 +255,6 @@ async fn main() -> Result<(), ServerError> {
"Iggy server has shutdown successfully. Shutdown took {} ms.",
elapsed_time.as_millis()
);
+ */
Ok(())
}
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
new file mode 100644
index 00000000..08a02e54
--- /dev/null
+++ b/core/server/src/shard/builder.rs
@@ -0,0 +1,73 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{configs::server::ServerConfig, shard::Shard};
+
+use super::{connector::ShardConnector, frame::ShardFrame, IggyShard};
+
+#[derive(Default)]
+pub struct IggyShardBuilder {
+ id: Option<u16>,
+ connections: Option<Vec<ShardConnector<ShardFrame>>>,
+ config: Option<ServerConfig>,
+}
+
+impl IggyShardBuilder {
+ pub fn id(mut self, id: u16) -> Self {
+ self.id = Some(id);
+ self
+ }
+
+ pub fn connections(mut self, connections: Vec<ShardConnector<ShardFrame>>)
-> Self {
+ self.connections = Some(connections);
+ self
+ }
+
+ pub fn server_config(mut self, config: ServerConfig) -> Self {
+ self.config = Some(config);
+ self
+ }
+
+ pub async fn build_and_init(self) -> IggyShard {
+ let id = self.id.unwrap();
+ let config = self.config.unwrap();
+ let connections = self.connections.unwrap();
+ let (stop_sender, stop_receiver, receiver) = connections
+ .iter()
+ .filter(|c| c.id == id)
+ .map(|c| {
+ (
+ c.stop_sender.clone(),
+ c.stop_receiver.clone(),
+ c.receiver.clone(),
+ )
+ })
+ .next()
+ .expect("Failed to find connection with the specified ID");
+ let shards = connections.into_iter().map(Shard::new).collect();
+
+ IggyShard::new(
+ id,
+ shards,
+ config,
+ stop_receiver,
+ stop_sender,
+ receiver,
+ )
+ }
+}
diff --git a/core/server/src/shard/connector.rs
b/core/server/src/shard/connector.rs
index 349218ba..1d762d68 100644
--- a/core/server/src/shard/connector.rs
+++ b/core/server/src/shard/connector.rs
@@ -54,6 +54,8 @@ impl<T: Clone> ShardConnector<T> {
}
}
+// TODO: I think those Arcs can be replaced with 'static lifetimes...
+// Those shards will live for the entire duration of the application.
#[derive(Clone)]
pub struct Receiver<T> {
channel: Arc<ShardedChannel<T>>,
diff --git a/core/server/src/shard/frame.rs b/core/server/src/shard/frame.rs
index b1524ad6..a2c1220f 100644
--- a/core/server/src/shard/frame.rs
+++ b/core/server/src/shard/frame.rs
@@ -27,8 +27,7 @@ pub enum ShardMessage {
}
#[derive(Debug, Clone)]
-pub enum ShardEvent {
-}
+pub enum ShardEvent {}
#[derive(Debug)]
pub enum ShardResponse {
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 6ccbcab3..bffaca45 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -15,22 +15,33 @@
* specific language governing permissions and limitations
* under the License.
*/
-mod connector;
-mod namespace;
-mod frame;
+pub mod builder;
+pub mod connector;
+pub mod frame;
+pub mod namespace;
-
-
-use std::cell::RefCell;
use ahash::HashMap;
-use connector::ShardConnector;
+use builder::IggyShardBuilder;
+use connector::{Receiver, ShardConnector, StopReceiver, StopSender};
use frame::ShardFrame;
use namespace::IggyNamespace;
-struct Shard {
+use std::cell::{Cell, RefCell};
+
+use crate::configs::server::ServerConfig;
+pub(crate) struct Shard {
id: u16,
connection: ShardConnector<ShardFrame>,
}
+impl Shard {
+ pub fn new(connection: ShardConnector<ShardFrame>) -> Self {
+ Self {
+ id: connection.id,
+ connection,
+ }
+ }
+}
+
struct ShardInfo {
id: u16,
}
@@ -38,22 +49,48 @@ struct ShardInfo {
pub struct IggyShard {
pub id: u16,
shards: Vec<Shard>,
- shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>,
+ //shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>,
- pub(crate) permissioner: RefCell<Permissioner>,
- pub(crate) storage: Rc<SystemStorage>,
- pub(crate) streams: RwLock<HashMap<u32, Stream>>,
- pub(crate) streams_ids: RefCell<HashMap<String, u32>>,
- pub(crate) users: RefCell<HashMap<UserId, User>>,
+ //pub(crate) permissioner: RefCell<Permissioner>,
+ //pub(crate) streams: RwLock<HashMap<u32, Stream>>,
+ //pub(crate) streams_ids: RefCell<HashMap<String, u32>>,
+ //pub(crate) users: RefCell<HashMap<UserId, User>>,
// TODO - get rid of this dynamic dispatch.
- pub(crate) state: Rc<FileState>,
- pub(crate) encryptor: Option<Rc<dyn Encryptor>>,
- pub(crate) config: ServerConfig,
- pub(crate) client_manager: RefCell<ClientManager>,
- pub(crate) active_sessions: RefCell<Vec<Session>>,
- pub(crate) metrics: Metrics,
+ //pub(crate) state: Rc<FileState>,
+ //pub(crate) encryptor: Option<Rc<dyn Encryptor>>,
+ config: ServerConfig,
+ //pub(crate) client_manager: RefCell<ClientManager>,
+ //pub(crate) active_sessions: RefCell<Vec<Session>>,
+ //pub(crate) metrics: Metrics,
pub message_receiver: Cell<Option<Receiver<ShardFrame>>>,
stop_receiver: StopReceiver,
stop_sender: StopSender,
-}
\ No newline at end of file
+}
+
+impl IggyShard {
+ pub fn builder() -> IggyShardBuilder {
+ Default::default()
+ }
+
+ pub(crate) fn new(
+ id: u16,
+ shards: Vec<Shard>,
+ config: ServerConfig,
+ stop_receiver: StopReceiver,
+ stop_sender: StopSender,
+ shard_messages_receiver: Receiver<ShardFrame>,
+ ) -> Self {
+ Self {
+ id,
+ shards,
+ config,
+ stop_receiver,
+ stop_sender,
+ message_receiver: Cell::new(Some(shard_messages_receiver)),
+ }
+ }
+
+ pub fn assert_init(&self) {
+ }
+}