This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 99aaccf20c39c8ee62966c16db2afdb48c62763a
Author: numinex <[email protected]>
AuthorDate: Thu Jun 19 10:17:42 2025 +0200

    todos
---
 Cargo.lock                         | 31 ++++++++++++++++++++++++-----
 core/sdk/src/tcp/tcp_client.rs     |  1 +
 core/server/src/binary/command.rs  |  1 +
 core/server/src/bootstrap.rs       | 27 ++++++++++++++++++-------
 core/server/src/main.rs            | 23 ++++++++--------------
 core/server/src/shard/builder.rs   | 40 +++++++++++++++++++++++++-------------
 core/server/src/shard/connector.rs |  4 ++--
 core/server/src/shard/mod.rs       | 38 ++++++++++++++++++++++++++++--------
 8 files changed, 115 insertions(+), 50 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4ab38a87..3a7a4958 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -142,7 +142,7 @@ dependencies = [
  "actix-utils",
  "futures-core",
  "futures-util",
- "mio 1.0.3",
+ "mio 1.0.4",
  "socket2",
  "tokio",
  "tracing",
@@ -639,7 +639,7 @@ checksum = "fd73835ad7deb4bd2b389e6f10333b143f025d607c55ca04c66a0bcc6bb2fc6d"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.101",
+ "syn 2.0.103",
 ]
 
 [[package]]
@@ -4624,6 +4624,15 @@ version = "2.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
 
+[[package]]
+name = "memoffset"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
+dependencies = [
+ "autocfg",
+]
+
 [[package]]
 name = "mimalloc"
 version = "0.1.47"
@@ -4673,6 +4682,18 @@ dependencies = [
  "adler2",
 ]
 
+[[package]]
+name = "mio"
+version = "0.8.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
+dependencies = [
+ "libc",
+ "log",
+ "wasi 0.11.1+wasi-snapshot-preview1",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "mio"
 version = "1.0.4"
@@ -4761,7 +4782,7 @@ checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.101",
+ "syn 2.0.103",
 ]
 
 [[package]]
@@ -4856,7 +4877,7 @@ dependencies = [
  "kqueue",
  "libc",
  "log",
- "mio 1.0.3",
+ "mio 1.0.4",
  "notify-types",
  "walkdir",
  "windows-sys 0.59.0",
@@ -7403,7 +7424,7 @@ dependencies = [
  "backtrace",
  "bytes",
  "libc",
- "mio",
+ "mio 1.0.4",
  "parking_lot 0.12.4",
  "pin-project-lite",
  "signal-hook-registry",
diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs
index 8db50940..086c5964 100644
--- a/core/sdk/src/tcp/tcp_client.rs
+++ b/core/sdk/src/tcp/tcp_client.rs
@@ -17,6 +17,7 @@
  */
 
 use crate::prelude::Client;
+use crate::prelude::IggyConsumer;
 use crate::prelude::TcpClientConfig;
 use crate::tcp::tcp_connection_stream::TcpConnectionStream;
 use crate::tcp::tcp_connection_stream_kind::ConnectionStreamKind;
diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs
index c7802be9..0b997941 100644
--- a/core/server/src/binary/command.rs
+++ b/core/server/src/binary/command.rs
@@ -66,6 +66,7 @@ use iggy_common::update_stream::UpdateStream;
 use iggy_common::update_topic::UpdateTopic;
 use iggy_common::update_user::UpdateUser;
 use iggy_common::*;
+use rustls::crypto::hash::Output;
 use strum::EnumString;
 use tracing::error;
 
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index d51096a2..d70d0317 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -1,22 +1,19 @@
 use iggy_common::{
+    IggyError,
     defaults::{
         DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH,
         MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH,
     },
-    IggyError,
-};
-use monoio::{
-    fs::create_dir_all,
-    Buildable, Driver, Runtime,
 };
+use monoio::{fs::create_dir_all, time::TimeDriver, Buildable, Driver, Runtime};
 use tracing::info;
 
 use crate::{
+    IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
     configs::{config_provider::ConfigProviderKind, server::ServerConfig, system::SystemConfig},
     server_error::ServerError,
     shard::{connector::ShardConnector, frame::ShardFrame},
     streaming::users::user::User,
-    IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
 };
 use std::{env, fs::remove_dir_all, ops::Range, path::Path};
 
@@ -109,7 +106,9 @@ pub fn create_root_user() -> User {
     let username = env::var(IGGY_ROOT_USERNAME_ENV);
     let password = env::var(IGGY_ROOT_PASSWORD_ENV);
     if (username.is_ok() && password.is_err()) || (username.is_err() && password.is_ok()) {
-        panic!("When providing the custom root user credentials, both username and password must be set.");
+        panic!(
+            "When providing the custom root user credentials, both username and password must be set."
+        );
     }
     if username.is_ok() && password.is_ok() {
         info!("Using the custom root user credentials.");
@@ -146,3 +145,17 @@ where
     let rt = Buildable::build(builder).expect("Failed to create default runtime");
     rt
 }
+
+pub fn create_shard_executor() -> Runtime<TimeDriver<monoio::IoUringDriver>>
+{
+    // TODO: Figure out what else we could tweak there
+    // We for sure want to disable the userspace interrupts on new cq entry (set_coop_taskrun)
+    // let urb = io_uring::IoUring::builder();
+    // TODO: Shall we make the size of ring be configureable ?
+    let builder = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new()
+        //.uring_builder(urb.setup_coop_taskrun()) // broken shit.
+        .with_entries(1024) // Default size
+        .enable_timer();
+    let rt = Buildable::build(builder).expect("Failed to create default runtime");
+    rt
+}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index cd79cedc..e7409e49 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -23,11 +23,9 @@ use clap::Parser;
 use dotenvy::dotenv;
 use error_set::ErrContext;
 use figlet_rs::FIGfont;
-use monoio::Buildable;
 use server::args::Args;
 use server::bootstrap::{
-    create_default_executor, create_directories, create_root_user, create_shard_connections,
-    load_config,
+    create_default_executor, create_directories, create_root_user, create_shard_connections, create_shard_executor, load_config
 };
 use server::channels::commands::archive_state::ArchiveStateExecutor;
 use server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor;
@@ -112,7 +110,7 @@ fn main() -> Result<(), ServerError> {
                         }
                     }
 
-                    // Create directories. 
+                    // Create directories.
                     create_directories(&config.system).await?;
                     Ok::<(), ServerError>(())
                 })
@@ -146,16 +144,7 @@ fn main() -> Result<(), ServerError> {
                 monoio::utils::bind_to_cpu_set(Some(shard_id))
                     .expect(format!("Failed to set CPU affinity for shard-{id}").as_str());
 
-                // TODO: Figure out what else we could tweak there
-                // We for sure want to disable the userspace interrupts on new cq entry (set_coop_taskrun)
-                // let urb = io_uring::IoUring::builder();
-                // TODO: Shall we make the size of ring be configureable ?
-                let mut rt = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new()
-                    //.uring_builder(urb.setup_coop_taskrun()) // broken shit.
-                    .with_entries(1024) // Default size
-                    .enable_timer()
-                    .build()
-                    .expect(format!("Failed to build monoio runtime for shard-{id}").as_str());
+                let mut rt = create_shard_executor();
                 rt.block_on(async move {
                     let builder = IggyShard::builder();
                     let mut shard = builder
@@ -165,7 +154,11 @@ fn main() -> Result<(), ServerError> {
                         .build()
                         .await;
 
-                    shard.init().await;
+                    if let Err(e) = shard.init().await {
+                        //TODO: If one of the shards fails to initialize, we should crash the whole program;
+                        panic!("Failed to initialize shard-{id}: {e}");
+                    }
+                    //TODO: If one of the shards fails to initialize, we should crash the whole program;
                     shard.assert_init();
                 })
             })
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 640ffb57..06366ebf 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -21,9 +21,19 @@ use std::{cell::Cell, rc::Rc, sync::Arc};
 use iggy_common::{Aes256GcmEncryptor, EncryptorKind};
 use tracing::info;
 
-use crate::{configs::server::ServerConfig, map_toggle_str, shard::Shard, state::{file::FileState, StateKind}, streaming::{persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind}, storage::SystemStorage}, versioning::SemanticVersion};
+use crate::{
+    configs::server::ServerConfig,
+    map_toggle_str,
+    shard::Shard,
+    state::{StateKind, file::FileState},
+    streaming::{
+        persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind},
+        storage::SystemStorage,
+    },
+    versioning::SemanticVersion,
+};
 
-use super::{connector::ShardConnector, frame::ShardFrame, IggyShard};
+use super::{IggyShard, connector::ShardConnector, frame::ShardFrame};
 
 #[derive(Default)]
 pub struct IggyShardBuilder {
@@ -66,8 +76,10 @@ impl IggyShardBuilder {
             .next()
             .expect("Failed to find connection with the specified ID");
         let shards = connections.into_iter().map(Shard::new).collect();
+        //TODO: This can be discrete step in the builder bootstrapped from main function.
         let version = SemanticVersion::current().expect("Invalid version");
 
+        //TODO: This can be discrete step in the builder bootstrapped from main function.
         info!(
             "Server-side encryption is {}.",
             map_toggle_str(config.system.encryption.enabled)
@@ -79,6 +91,7 @@ impl IggyShardBuilder {
             false => None,
         };
 
+        //TODO: This can be discrete step in the builder bootstrapped from main function.
         let state_persister = Self::resolve_persister(config.system.state.enforce_fsync);
         let state = Rc::new(StateKind::File(FileState::new(
             &config.system.get_state_messages_file_path(),
@@ -87,20 +100,21 @@ impl IggyShardBuilder {
             encryptor.clone(),
         )));
 
+        //TODO: This can be discrete step in the builder bootstrapped from main function.
         let partition_persister = Self::resolve_persister(config.system.partition.enforce_fsync);
-        let storage = SystemStorage::new(config.system, partition_persister);
+        let storage = Rc::new(SystemStorage::new(config.system.clone(), partition_persister));
 
         IggyShard {
-                id: id,
-                shards: shards,
-                shards_table: Default::default(),
-                storage: storage,
-                state: state,
-                config: config,
-                stop_receiver: stop_receiver,
-                stop_sender: stop_sender,
-                frame_receiver: Cell::new(Some(frame_receiver)),
-            }
+            id: id,
+            shards: shards,
+            shards_table: Default::default(),
+            storage: storage,
+            state: state,
+            config: config,
+            stop_receiver: stop_receiver,
+            stop_sender: stop_sender,
+            frame_receiver: Cell::new(Some(frame_receiver)),
+        }
     }
 
     fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> {
diff --git a/core/server/src/shard/connector.rs b/core/server/src/shard/connector.rs
index 1d762d68..95aedc73 100644
--- a/core/server/src/shard/connector.rs
+++ b/core/server/src/shard/connector.rs
@@ -15,10 +15,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use futures::{task::AtomicWaker, Stream};
+use futures::{Stream, task::AtomicWaker};
 use sharded_queue::ShardedQueue;
 use std::{
-    sync::{atomic::AtomicUsize, Arc},
+    sync::{Arc, atomic::AtomicUsize},
     task::Poll,
 };
 
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 54b88fb1..b98f6381 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -25,10 +25,19 @@ use ahash::HashMap;
 use builder::IggyShardBuilder;
 use connector::{Receiver, ShardConnector, StopReceiver, StopSender};
 use frame::ShardFrame;
+use iggy_common::IggyError;
 use namespace::IggyNamespace;
-use std::{cell::{Cell, RefCell}, rc::Rc, sync::Arc};
+use tracing::info;
+use std::{
+    cell::{Cell, RefCell},
+    rc::Rc,
+    sync::Arc, time::Instant,
+};
 
-use crate::{bootstrap::create_root_user, configs::server::ServerConfig, state::file::FileState, streaming::storage::SystemStorage};
+use crate::{
+    bootstrap::create_root_user, configs::server::ServerConfig, state::{file::FileState, StateKind},
+    streaming::storage::SystemStorage,
+};
 pub(crate) struct Shard {
     id: u16,
     connection: ShardConnector<ShardFrame>,
@@ -57,10 +66,10 @@ pub struct IggyShard {
     //pub(crate) streams_ids: RefCell<HashMap<String, u32>>,
     //pub(crate) users: RefCell<HashMap<UserId, User>>,
     // TODO: Refactor.
-    pub(crate) storage: Arc<SystemStorage>,
+    pub(crate) storage: Rc<SystemStorage>,
 
     // TODO - get rid of this dynamic dispatch.
-    pub(crate) state: Rc<FileState>,
+    pub(crate) state: Rc<StateKind>,
     //pub(crate) encryptor: Option<Rc<dyn Encryptor>>,
     config: ServerConfig,
     //pub(crate) client_manager: RefCell<ClientManager>,
@@ -76,14 +85,27 @@ impl IggyShard {
         Default::default()
     }
 
-    pub async fn init(&mut self) {
-        let user = create_root_user();
+    pub async fn init(&mut self) -> Result<(), IggyError> {
+        let now = Instant::now();
+        //TODO: Fix this either by moving it to main function, or by using `run_once` barrier.
+        //let state_entries = self.state.init().await?;
+        //let system_state = SystemState::init(state_entries).await?;
+        //let user = create_root_user();
         self.load_state().await;
         self.load_users().await;
         // Add default root user.
-        todo!();
         self.load_streams().await;
-
+        //TODO: Fix the archiver.
+        /*
+        if let Some(archiver) = self.archiver.as_ref() {
+            archiver
+                .init()
+                .await
+                .expect("Failed to initialize archiver");
+        }
+        */
+        info!("Initialized system in {} ms.", now.elapsed().as_millis());
+        Ok(())
     }
 
     async fn load_state(&self) {

Reply via email to