This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit a435b97d34da47a2658a70857aa3012b695cd7f6
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Wed May 14 20:38:55 2025 +0200

    feat(io_uring): bootstrap the main function (#1788)
---
 Cargo.lock                         |  1 +
 core/server/Cargo.toml             |  1 +
 core/server/src/bootstrap.rs       | 20 ++++++++++
 core/server/src/lib.rs             |  4 +-
 core/server/src/main.rs            | 80 ++++++++++++++++++++++++++++++++++++--
 core/server/src/shard/builder.rs   | 73 ++++++++++++++++++++++++++++++++++
 core/server/src/shard/connector.rs |  2 +
 core/server/src/shard/frame.rs     |  3 +-
 core/server/src/shard/mod.rs       | 79 +++++++++++++++++++++++++++----------
 9 files changed, 234 insertions(+), 29 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d9ba5d6a..4ab38a87 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6807,6 +6807,7 @@ dependencies = [
  "futures",
  "human-repr",
  "iggy_common",
+ "io-uring",
  "jsonwebtoken",
  "lending-iterator",
  "mimalloc",
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 6a526df6..3c6bccd5 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -59,6 +59,7 @@ flume = { workspace = true }
 futures = { workspace = true }
 human-repr = { workspace = true }
 iggy_common = { workspace = true }
+io-uring = "0.6"
 jsonwebtoken = "9.3.1"
 lending-iterator = "0.1.7"
 mimalloc = { workspace = true, optional = true }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index e69de29b..8eacf223 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -0,0 +1,20 @@
+use crate::shard::{connector::ShardConnector, frame::ShardFrame};
+use std::ops::Range;
+
+pub fn create_shard_connections(shards_set: Range<usize>) -> Vec<ShardConnector<ShardFrame>> {
+    let shards_count = shards_set.len();
+    let connectors = shards_set
+        .into_iter()
+        .map(|id| ShardConnector::new(id as u16, shards_count))
+        .collect();
+
+    connectors
+}
+
+pub async fn create_directories() {
+
+}
+
+pub async fn create_root_user() {
+
+}
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 572301e0..e64837da 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -26,18 +26,18 @@ static GLOBAL: MiMalloc = MiMalloc;
 #[cfg(windows)]
 compile_error!("iggy-server doesn't support windows.");
 
-mod bootstrap;
 pub mod archiver;
 pub mod args;
 pub mod binary;
+pub mod bootstrap;
 pub mod channels;
 pub(crate) mod compat;
 pub mod configs;
-pub mod shard;
 pub mod http;
 pub mod log;
 pub mod quic;
 pub mod server_error;
+pub mod shard;
 pub mod state;
 pub mod streaming;
 pub mod tcp;
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index d9d9d753..bb92796d 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -16,11 +16,15 @@
  * under the License.
  */
 
+use std::thread::available_parallelism;
+
 use anyhow::Result;
 use clap::Parser;
 use dotenvy::dotenv;
 use figlet_rs::FIGfont;
+use monoio::Buildable;
 use server::args::Args;
+use server::bootstrap::{create_directories, create_root_user, create_shard_connections};
 use server::channels::commands::archive_state::ArchiveStateExecutor;
 use server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor;
 use server::channels::commands::maintain_messages::MaintainMessagesExecutor;
@@ -37,15 +41,15 @@ use server::log::logger::Logging;
 use server::log::tokio_console::Logging;
 use server::quic::quic_server;
 use server::server_error::ServerError;
+use server::shard::IggyShard;
 use server::streaming::systems::system::{SharedSystem, System};
 use server::streaming::utils::MemoryPool;
 use server::tcp::tcp_server;
 use tokio::time::Instant;
 use tracing::{info, instrument};
 
-#[tokio::main]
 #[instrument(skip_all, name = "trace_start_server")]
-async fn main() -> Result<(), ServerError> {
+fn main() -> Result<(), ServerError> {
     let startup_timestamp = Instant::now();
     let standard_font = FIGfont::standard().unwrap();
     let figure = standard_font.convert("Iggy Server");
@@ -68,6 +72,24 @@ async fn main() -> Result<(), ServerError> {
 
     let args = Args::parse();
     let config_provider = config_provider::resolve(&args.config_provider)?;
+    let config = ServerConfig::default();
+    //TODO: Load config.
+    /*
+    let xd = std::thread::scope(|scope| {
+        let config = scope.spawn(move || {
+            let mut rt = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new().build().unwrap();
+            let config: Result<ServerConfig, ServerError> = rt.block_on(async {
+                let config = ServerConfig::load(&config_provider).await?;
+                create_directories().await;
+                create_root_user().await;
+                Ok(config)
+            });
+            config
+        }).join().unwrap();
+        config
+    });
+    */
+    /*
     let config = ServerConfig::load(&config_provider).await?;
     if args.fresh {
         let system_path = config.system.get_system_path();
@@ -83,9 +105,56 @@ async fn main() -> Result<(), ServerError> {
     }
     let mut logging = Logging::new(config.telemetry.clone());
     logging.early_init();
+    */
+    // TODO: Make this configurable from config as a range
+    // for example this instance of Iggy will use cores from 0..4
+    let available_cpus = available_parallelism()
+        .expect("Failed to get num of cores")
+        .into();
+    let shards_count = available_cpus;
+    let shards_set = 0..shards_count;
+
+    let connections = create_shard_connections(shards_set.clone());
+
+    for shard_id in shards_set {
+        let id = shard_id as u16;
+        let connections = connections.clone();
+        let server_config = config.clone();
+        std::thread::Builder::new()
+            .name(format!("shard-{id}"))
+            .spawn(move || {
+                monoio::utils::bind_to_cpu_set(Some(shard_id))
+                    .expect(format!("Failed to set CPU affinity for shard-{id}").as_str());
 
+                // TODO: Figure out what else we could tweak there
+                // We for sure want to disable the userspace interrupts on new cq entry (set_coop_taskrun)
+                // let urb = io_uring::IoUring::builder();
+                // TODO: Shall we make the size of ring be configureable ?
+                let mut rt = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new()
+                    //.uring_builder(urb.setup_coop_taskrun())
+                    .with_entries(1024) // Default size
+                    .enable_timer()
+                    .build()
+                    .expect(format!("Failed to build monoio runtime for shard-{id}").as_str());
+                rt.block_on(async move {
+                    let builder = IggyShard::builder();
+                    let shard = builder
+                        .id(id)
+                        .connections(connections)
+                        .server_config(server_config)
+                        .build_and_init()
+                        .await;
+
+                    shard.assert_init();
+                })
+            })
+            .expect(format!("Failed to spawn thread for shard-{id}").as_str())
+            .join()
+            .expect(format!("Failed to join thread for shard-{id}").as_str());
+    }
     // From this point on, we can use tracing macros to log messages.
 
+    /*
     logging.late_init(config.system.get_system_path(), &config.system.logging)?;
 
     #[cfg(feature = "disable-mimalloc")]
@@ -102,13 +171,15 @@ async fn main() -> Result<(), ServerError> {
         config.data_maintenance.clone(),
         config.personal_access_token.clone(),
     ));
+    */
 
     // Workaround to ensure that the statistics are initialized before the server
     // loads streams and starts accepting connections. This is necessary to
     // have the correct statistics when the server starts.
-    system.write().await.get_stats().await?;
-    system.write().await.init().await?;
+    //system.write().await.get_stats().await?;
+    //system.write().await.init().await?;
 
+    /*
     let _command_handler = BackgroundServerCommandHandler::new(system.clone(), &config)
         .install_handler(SaveMessagesExecutor)
         .install_handler(MaintainMessagesExecutor)
@@ -184,5 +255,6 @@ async fn main() -> Result<(), ServerError> {
         "Iggy server has shutdown successfully. Shutdown took {} ms.",
         elapsed_time.as_millis()
     );
+    */
     Ok(())
 }
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
new file mode 100644
index 00000000..08a02e54
--- /dev/null
+++ b/core/server/src/shard/builder.rs
@@ -0,0 +1,73 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{configs::server::ServerConfig, shard::Shard};
+
+use super::{connector::ShardConnector, frame::ShardFrame, IggyShard};
+
+#[derive(Default)]
+pub struct IggyShardBuilder {
+    id: Option<u16>,
+    connections: Option<Vec<ShardConnector<ShardFrame>>>,
+    config: Option<ServerConfig>,
+}
+
+impl IggyShardBuilder {
+    pub fn id(mut self, id: u16) -> Self {
+        self.id = Some(id);
+        self
+    }
+
+    pub fn connections(mut self, connections: Vec<ShardConnector<ShardFrame>>) -> Self {
+        self.connections = Some(connections);
+        self
+    }
+
+    pub fn server_config(mut self, config: ServerConfig) -> Self {
+        self.config = Some(config);
+        self
+    }
+
+    pub async fn build_and_init(self) -> IggyShard {
+        let id = self.id.unwrap();
+        let config = self.config.unwrap();
+        let connections = self.connections.unwrap();
+        let (stop_sender, stop_receiver, receiver) = connections
+            .iter()
+            .filter(|c| c.id == id)
+            .map(|c| {
+                (
+                    c.stop_sender.clone(),
+                    c.stop_receiver.clone(),
+                    c.receiver.clone(),
+                )
+            })
+            .next()
+            .expect("Failed to find connection with the specified ID");
+        let shards = connections.into_iter().map(Shard::new).collect();
+
+        IggyShard::new(
+            id,
+            shards,
+            config,
+            stop_receiver,
+            stop_sender,
+            receiver,
+        )
+    }
+}
diff --git a/core/server/src/shard/connector.rs b/core/server/src/shard/connector.rs
index 349218ba..1d762d68 100644
--- a/core/server/src/shard/connector.rs
+++ b/core/server/src/shard/connector.rs
@@ -54,6 +54,8 @@ impl<T: Clone> ShardConnector<T> {
     }
 }
 
+// TODO: I think those Arcs can be replaced with 'static lifetimes...
+// Those shards will live for the entire duration of the application.
 #[derive(Clone)]
 pub struct Receiver<T> {
     channel: Arc<ShardedChannel<T>>,
diff --git a/core/server/src/shard/frame.rs b/core/server/src/shard/frame.rs
index b1524ad6..a2c1220f 100644
--- a/core/server/src/shard/frame.rs
+++ b/core/server/src/shard/frame.rs
@@ -27,8 +27,7 @@ pub enum ShardMessage {
 }
 
 #[derive(Debug, Clone)]
-pub enum ShardEvent {
-}
+pub enum ShardEvent {}
 
 #[derive(Debug)]
 pub enum ShardResponse {
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 6ccbcab3..bffaca45 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -15,22 +15,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-mod connector;
-mod namespace;
-mod frame;
+pub mod builder;
+pub mod connector;
+pub mod frame;
+pub mod namespace;
 
-
-
-use std::cell::RefCell;
 use ahash::HashMap;
-use connector::ShardConnector;
+use builder::IggyShardBuilder;
+use connector::{Receiver, ShardConnector, StopReceiver, StopSender};
 use frame::ShardFrame;
 use namespace::IggyNamespace;
-struct Shard {
+use std::cell::{Cell, RefCell};
+
+use crate::configs::server::ServerConfig;
+pub(crate) struct Shard {
     id: u16,
     connection: ShardConnector<ShardFrame>,
 }
 
+impl Shard {
+    pub fn new(connection: ShardConnector<ShardFrame>) -> Self {
+        Self {
+            id: connection.id,
+            connection,
+        }
+    }
+}
+
 struct ShardInfo {
     id: u16,
 }
@@ -38,22 +49,48 @@ struct ShardInfo {
 pub struct IggyShard {
     pub id: u16,
     shards: Vec<Shard>,
-    shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>,
+    //shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>,
 
-    pub(crate) permissioner: RefCell<Permissioner>,
-    pub(crate) storage: Rc<SystemStorage>,
-    pub(crate) streams: RwLock<HashMap<u32, Stream>>,
-    pub(crate) streams_ids: RefCell<HashMap<String, u32>>,
-    pub(crate) users: RefCell<HashMap<UserId, User>>,
+    //pub(crate) permissioner: RefCell<Permissioner>,
+    //pub(crate) streams: RwLock<HashMap<u32, Stream>>,
+    //pub(crate) streams_ids: RefCell<HashMap<String, u32>>,
+    //pub(crate) users: RefCell<HashMap<UserId, User>>,
 
     // TODO - get rid of this dynamic dispatch.
-    pub(crate) state: Rc<FileState>,
-    pub(crate) encryptor: Option<Rc<dyn Encryptor>>,
-    pub(crate) config: ServerConfig,
-    pub(crate) client_manager: RefCell<ClientManager>,
-    pub(crate) active_sessions: RefCell<Vec<Session>>,
-    pub(crate) metrics: Metrics,
+    //pub(crate) state: Rc<FileState>,
+    //pub(crate) encryptor: Option<Rc<dyn Encryptor>>,
+    config: ServerConfig,
+    //pub(crate) client_manager: RefCell<ClientManager>,
+    //pub(crate) active_sessions: RefCell<Vec<Session>>,
+    //pub(crate) metrics: Metrics,
     pub message_receiver: Cell<Option<Receiver<ShardFrame>>>,
     stop_receiver: StopReceiver,
     stop_sender: StopSender,
-}
\ No newline at end of file
+}
+
+impl IggyShard {
+    pub fn builder() -> IggyShardBuilder {
+        Default::default()
+    }
+
+    pub(crate) fn new(
+        id: u16,
+        shards: Vec<Shard>,
+        config: ServerConfig,
+        stop_receiver: StopReceiver,
+        stop_sender: StopSender,
+        shard_messages_receiver: Receiver<ShardFrame>,
+    ) -> Self {
+        Self {
+            id,
+            shards,
+            config,
+            stop_receiver,
+            stop_sender,
+            message_receiver: Cell::new(Some(shard_messages_receiver)),
+        }
+    }
+
+    pub fn assert_init(&self) {
+    }
+}

Reply via email to