This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 9c6b9ffcd44f58f69433b8227951bab5d3ca8ac5
Author: numinex <[email protected]>
AuthorDate: Tue Jun 24 11:52:06 2025 +0200

    temp
---
 Cargo.lock                                         |  85 +++++++++--
 core/server/Cargo.toml                             |   3 +-
 core/server/src/binary/sender.rs                   |  16 +--
 core/server/src/configs/server.rs                  |   3 +-
 core/server/src/main.rs                            |   8 +-
 core/server/src/quic/listener.rs                   |   3 +
 core/server/src/shard/builder.rs                   |  16 ++-
 core/server/src/shard/gate.rs                      |   2 +-
 core/server/src/shard/mod.rs                       | 158 ++++++++++++++-------
 core/server/src/shard/namespace.rs                 |  17 ++-
 .../{streaming/systems => shard/system}/clients.rs |   2 +-
 .../systems => shard/system}/consumer_groups.rs    |   2 +-
 .../systems => shard/system}/consumer_offsets.rs   |   2 +-
 .../{streaming/systems => shard/system}/info.rs    |   0
 .../systems => shard/system}/messages.rs           |   5 +-
 .../src/{streaming/systems => shard/system}/mod.rs |   2 +-
 .../systems => shard/system}/partitions.rs         |   2 +-
 .../system}/personal_access_tokens.rs              |   2 +-
 .../systems => shard/system}/segments.rs           |   2 +-
 .../systems => shard/system}/snapshot/mod.rs       |   0
 .../systems => shard/system}/snapshot/procdump.rs  |   0
 .../{streaming/systems => shard/system}/stats.rs   |   2 +-
 .../{streaming/systems => shard/system}/storage.rs |   2 -
 .../{streaming/systems => shard/system}/streams.rs | 142 +-----------------
 .../{streaming/systems => shard/system}/system.rs  |   0
 .../{streaming/systems => shard/system}/topics.rs  |   0
 .../{streaming/systems => shard/system}/users.rs   | 106 +-------------
 core/server/src/streaming/mod.rs                   |   1 -
 core/server/src/streaming/storage.rs               |   5 +-
 core/server/src/streaming/streams/stream.rs        |   9 +-
 core/server/src/tcp/sender.rs                      |  16 +--
 core/server/src/tcp/tcp_listener.rs                |  61 +++-----
 core/server/src/tcp/tcp_sender.rs                  |   3 +-
 core/server/src/tcp/tcp_server.rs                  |  21 ++-
 core/server/src/tcp/tcp_socket.rs                  |   1 -
 core/server/src/tcp/tcp_tls_sender.rs              |   5 +-
 36 files changed, 295 insertions(+), 409 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 38cb7c1c..79f73a27 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -651,7 +651,7 @@ checksum = 
"fd73835ad7deb4bd2b389e6f10333b143f025d607c55ca04c66a0bcc6bb2fc6d"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.103",
+ "syn 2.0.104",
 ]
 
 [[package]]
@@ -1677,6 +1677,16 @@ dependencies = [
  "version_check",
 ]
 
+[[package]]
+name = "core-foundation"
+version = "0.9.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f"
+dependencies = [
+ "core-foundation-sys",
+ "libc",
+]
+
 [[package]]
 name = "core-foundation"
 version = "0.10.1"
@@ -3310,6 +3320,12 @@ dependencies = [
  "byteorder",
 ]
 
+[[package]]
+name = "hash32"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8e4e11d13d42fc8d55326b323c63978c75721fbbb695a6e6686765bcb8b33917"
+
 [[package]]
 name = "hashbrown"
 version = "0.12.3"
@@ -3365,7 +3381,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f"
 dependencies = [
  "atomic-polyfill",
- "hash32",
+ "hash32 0.2.1",
  "rustc_version",
  "serde",
  "spin",
@@ -4786,6 +4802,15 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
+[[package]]
+name = "monoio-io-wrapper"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4bcfaa76e5daf87cc4d31b4d1b6bc93c12db59c19df50b9200afdbde42077655"
+dependencies = [
+ "monoio",
+]
+
 [[package]]
 name = "monoio-macros"
 version = "0.1.0"
@@ -4794,7 +4819,20 @@ checksum = 
"176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.103",
+ "syn 2.0.104",
+]
+
+[[package]]
+name = "monoio-native-tls"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b9022f5aaa19f9688f97bfcfa0c4a4318d424851995badb356674ca742652cdb"
+dependencies = [
+ "bytes",
+ "monoio",
+ "monoio-io-wrapper",
+ "native-tls",
+ "thiserror 1.0.69",
 ]
 
 [[package]]
@@ -4806,6 +4844,23 @@ dependencies = [
  "getrandom 0.2.16",
 ]
 
+[[package]]
+name = "native-tls"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e"
+dependencies = [
+ "libc",
+ "log",
+ "openssl",
+ "openssl-probe",
+ "openssl-sys",
+ "schannel",
+ "security-framework 2.11.1",
+ "security-framework-sys",
+ "tempfile",
+]
+
 [[package]]
 name = "never-say-never"
 version = "6.6.666"
@@ -6439,7 +6494,7 @@ dependencies = [
  "openssl-probe",
  "rustls-pki-types",
  "schannel",
- "security-framework",
+ "security-framework 3.2.0",
 ]
 
 [[package]]
@@ -6467,7 +6522,7 @@ version = "0.5.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "19787cda76408ec5404443dc8b31795c87cd8fec49762dc75fa727740d34acc1"
 dependencies = [
- "core-foundation",
+ "core-foundation 0.10.1",
  "core-foundation-sys",
  "jni",
  "log",
@@ -6476,7 +6531,7 @@ dependencies = [
  "rustls-native-certs",
  "rustls-platform-verifier-android",
  "rustls-webpki",
- "security-framework",
+ "security-framework 3.2.0",
  "security-framework-sys",
  "webpki-root-certs 0.26.11",
  "windows-sys 0.59.0",
@@ -6624,6 +6679,19 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "security-framework"
+version = "2.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
+dependencies = [
+ "bitflags 2.9.1",
+ "core-foundation 0.9.4",
+ "core-foundation-sys",
+ "libc",
+ "security-framework-sys",
+]
+
 [[package]]
 name = "security-framework"
 version = "3.2.0"
@@ -6631,7 +6699,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316"
 dependencies = [
  "bitflags 2.9.1",
- "core-foundation",
+ "core-foundation 0.10.1",
  "core-foundation-sys",
  "libc",
  "security-framework-sys",
@@ -6839,15 +6907,16 @@ dependencies = [
  "figment",
  "flume",
  "futures",
+ "hash32 1.0.0",
  "human-repr",
  "iggy_common",
- "io-uring",
  "jsonwebtoken",
  "lending-iterator",
  "mimalloc",
  "mockall",
  "moka",
  "monoio",
+ "monoio-native-tls",
  "nix 0.30.1",
  "once_cell",
  "opentelemetry",
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 33bd7db7..adbeebe3 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -60,12 +60,13 @@ async-channel = { workspace = true }
 futures = { workspace = true }
 human-repr = { workspace = true }
 iggy_common = { workspace = true }
-io-uring = "0.6"
 jsonwebtoken = "9.3.1"
 lending-iterator = "0.1.7"
+hash32 = "1.0.0"
 mimalloc = { workspace = true, optional = true }
 moka = { version = "0.12.10", features = ["future"] }
 monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat"] }
+monoio-native-tls = "0.4.0"
 nix = { version = "0.30", features = ["fs"] }
 once_cell = "1.21.3"
 opentelemetry = { version = "0.30.0", features = ["trace", "logs"] }
diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs
index 7273543c..2ccf349b 100644
--- a/core/server/src/binary/sender.rs
+++ b/core/server/src/binary/sender.rs
@@ -24,8 +24,8 @@ use crate::tcp::tcp_tls_sender::TcpTlsSender;
 use crate::{quic::quic_sender::QuicSender, server_error::ServerError};
 use iggy_common::IggyError;
 use quinn::{RecvStream, SendStream};
-use tokio::net::TcpStream;
-use tokio_rustls::server::TlsStream;
+use monoio::net::TcpStream;
+use tokio_native_tls::TlsStream;
 
 macro_rules! forward_async_methods {
     (
@@ -48,22 +48,22 @@ macro_rules! forward_async_methods {
 }
 
 pub trait Sender {
-    fn read(&mut self, buffer: &mut [u8]) -> impl Future<Output = 
Result<usize, IggyError>> + Send;
-    fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), 
IggyError>> + Send;
+    fn read(&mut self, buffer: &mut [u8]) -> impl Future<Output = 
Result<usize, IggyError>>;
+    fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), 
IggyError>>; 
     fn send_ok_response(
         &mut self,
         payload: &[u8],
-    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+    ) -> impl Future<Output = Result<(), IggyError>>; 
     fn send_ok_response_vectored(
         &mut self,
         length: &[u8],
         slices: Vec<IoSlice<'_>>,
-    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+    ) -> impl Future<Output = Result<(), IggyError>>;
     fn send_error_response(
         &mut self,
         error: IggyError,
-    ) -> impl Future<Output = Result<(), IggyError>> + Send;
-    fn shutdown(&mut self) -> impl Future<Output = Result<(), ServerError>> + 
Send;
+    ) -> impl Future<Output = Result<(), IggyError>>; 
+    fn shutdown(&mut self) -> impl Future<Output = Result<(), ServerError>>; 
 }
 
 #[allow(clippy::large_enum_variant)]
diff --git a/core/server/src/configs/server.rs 
b/core/server/src/configs/server.rs
index 252df9a7..059fd1c4 100644
--- a/core/server/src/configs/server.rs
+++ b/core/server/src/configs/server.rs
@@ -31,6 +31,7 @@ use iggy_common::Validatable;
 use serde::{Deserialize, Serialize};
 use serde_with::DisplayFromStr;
 use serde_with::serde_as;
+use std::rc::Rc;
 use std::str::FromStr;
 use std::sync::Arc;
 
@@ -40,7 +41,7 @@ pub struct ServerConfig {
     pub message_saver: MessageSaverConfig,
     pub personal_access_token: PersonalAccessTokenConfig,
     pub heartbeat: HeartbeatConfig,
-    pub system: Arc<SystemConfig>,
+    pub system: Rc<SystemConfig>,
     pub quic: QuicConfig,
     pub tcp: TcpConfig,
     pub http: HttpConfig,
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index fc2b21f4..06b5f445 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -235,7 +235,7 @@ fn main() -> Result<(), ServerError> {
                     .await;
 
                     let builder = IggyShard::builder();
-                    let mut shard = builder
+                    let shard: Rc<IggyShard> = builder
                         .id(id)
                         .connections(connections)
                         .config(config)
@@ -243,13 +243,13 @@ fn main() -> Result<(), ServerError> {
                         .version(version)
                         .state(state)
                         .build()
-                        .await;
+                        .into();
 
                     //TODO: If one of the shards fails to initialize, we 
should crash the whole program;
-                    shard.init().await.expect("Failed to initialize 
shard-{id}: {e}");
-                    info!("Initiated shard with ID: {id}");
+                    shard.run().await.expect("Failed to run shard");
                     //TODO: If one of the shards fails to initialize, we 
should crash the whole program;
                     shard.assert_init();
+                    let shard = Rc::new(shard);
                 })
             })
             .expect(format!("Failed to spawn thread for shard-{id}").as_str())
diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs
index b5eb859d..e9b329de 100644
--- a/core/server/src/quic/listener.rs
+++ b/core/server/src/quic/listener.rs
@@ -34,6 +34,8 @@ pub fn start(endpoint: Endpoint, system: SharedSystem) {
     for _ in 0..LISTENERS_COUNT {
         let endpoint = endpoint.clone();
         let system = system.clone();
+        //TODO: Fixme
+        /*
         tokio::spawn(async move {
             while let Some(incoming_connection) = endpoint.accept().await {
                 info!(
@@ -57,6 +59,7 @@ pub fn start(endpoint: Endpoint, system: SharedSystem) {
                 });
             }
         });
+        */
     }
 }
 
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 0fd5250f..0428a483 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -26,8 +26,8 @@ use crate::{
     configs::server::ServerConfig,
     map_toggle_str,
     shard::Shard,
-    state::{StateKind, file::FileState},
-    streaming::storage::SystemStorage,
+    state::{file::FileState, StateKind},
+    streaming::{diagnostics::metrics::Metrics, storage::SystemStorage},
     versioning::SemanticVersion,
 };
 
@@ -75,11 +75,12 @@ impl IggyShardBuilder {
     }
 
     // TODO: Too much happens in there, some of those bootstrapping logic 
should be moved outside.
-    pub async fn build(self) -> IggyShard {
+    pub fn build(self) -> IggyShard {
         let id = self.id.unwrap();
         let config = self.config.unwrap();
         let connections = self.connections.unwrap();
         let state = self.state.unwrap();
+        let encryptor = self.encryptor;
         let version = self.version.unwrap();
         let (stop_sender, stop_receiver, frame_receiver) = connections
             .iter()
@@ -106,12 +107,21 @@ impl IggyShardBuilder {
             shards: shards,
             shards_table: Default::default(),
             storage: storage,
+            encryptor: encryptor,
             state: state,
             config: config,
             version: version,
             stop_receiver: stop_receiver,
             stop_sender: stop_sender,
             frame_receiver: Cell::new(Some(frame_receiver)),
+            metrics: Metrics::init(),
+
+            users: Default::default(),
+            permissioner: Default::default(),
+            streams: Default::default(),
+            streams_ids: Default::default(),
+            client_manager: Default::default(),
+            active_sessions: Default::default(),
         }
     }
 }
diff --git a/core/server/src/shard/gate.rs b/core/server/src/shard/gate.rs
index a26950a4..2ac20445 100644
--- a/core/server/src/shard/gate.rs
+++ b/core/server/src/shard/gate.rs
@@ -1,4 +1,4 @@
-use std::sync::{Condvar, Mutex};
+use std::sync::Mutex;
 
 #[derive(Default)]
 pub struct Gate<T> {
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 23ef1e15..3e7e10f1 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -17,39 +17,37 @@
  */
 
 pub mod builder;
+pub mod system;
 pub mod gate;
 pub mod namespace;
 pub mod transmission;
 
-use ahash::HashMap;
+use ahash::{AHashMap, AHashSet, HashMap};
 use builder::IggyShardBuilder;
 use error_set::ErrContext;
-use iggy_common::IggyError;
+use futures::future::try_join_all;
+use iggy_common::{EncryptorKind, IggyError, UserId};
 use namespace::IggyNamespace;
 use std::{
-    cell::{Cell, RefCell},
-    rc::Rc,
-    str::FromStr,
-    sync::Arc,
-    time::Instant,
+    cell::{Cell, RefCell}, pin::Pin, rc::Rc, str::FromStr, 
sync::{atomic::{AtomicU32, Ordering}, Arc, RwLock}, time::Instant
 };
-use tracing::info;
+use tracing::{error, info, instrument, trace, warn};
 use transmission::connector::{Receiver, ShardConnector, StopReceiver, 
StopSender};
 
 use crate::{
-    bootstrap::create_root_user,
     configs::server::ServerConfig,
-    shard::transmission::frame::ShardFrame,
+    shard::{system::info::SystemInfo, transmission::frame::ShardFrame},
     state::{
-        StateKind,
-        file::FileState,
-        system::{SystemState, UserState},
+        file::FileState, system::{StreamState, SystemState, UserState}, 
StateKind
     },
-    streaming::{storage::SystemStorage, systems::info::SystemInfo},
+    streaming::{clients::client_manager::ClientManager, 
diagnostics::metrics::Metrics, 
personal_access_tokens::personal_access_token::PersonalAccessToken, 
session::Session, storage::SystemStorage, streams::stream::Stream, 
users::{permissioner::Permissioner, user::User}},
     versioning::SemanticVersion,
 };
 
 pub const COMPONENT: &str = "SHARD";
+static USER_ID: AtomicU32 = AtomicU32::new(1);
+
+type Task = Pin<Box<dyn Future<Output = Result<(), IggyError>>>>;
 
 pub(crate) struct Shard {
     id: u16,
@@ -75,19 +73,21 @@ pub struct IggyShard {
     shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>,
     version: SemanticVersion,
 
-    //pub(crate) permissioner: RefCell<Permissioner>,
-    //pub(crate) streams: RwLock<HashMap<u32, Stream>>,
-    //pub(crate) streams_ids: RefCell<HashMap<String, u32>>,
-    //pub(crate) users: RefCell<HashMap<UserId, User>>,
+    pub(crate) streams: RefCell<HashMap<u32, Stream>>,
+    pub(crate) streams_ids: RefCell<HashMap<String, u32>>,
     // TODO: Refactor.
     pub(crate) storage: Rc<SystemStorage>,
 
     pub(crate) state: StateKind,
-    //pub(crate) encryptor: Option<Rc<dyn Encryptor>>,
-    config: ServerConfig,
-    //pub(crate) client_manager: RefCell<ClientManager>,
-    //pub(crate) active_sessions: RefCell<Vec<Session>>,
-    //pub(crate) metrics: Metrics,
+    pub(crate) encryptor: Option<EncryptorKind>,
+    pub(crate) config: ServerConfig,
+    //TODO: This could be shared.
+    pub(crate) client_manager: RefCell<ClientManager>,
+    pub(crate) active_sessions: RefCell<Vec<Session>>,
+    pub(crate) permissioner: RefCell<Permissioner>,
+    pub(crate) users: RefCell<HashMap<UserId, User>>,
+
+    pub(crate) metrics: Metrics,
     pub frame_receiver: Cell<Option<Receiver<ShardFrame>>>,
     stop_receiver: StopReceiver,
     stop_sender: StopSender,
@@ -98,16 +98,12 @@ impl IggyShard {
         Default::default()
     }
 
-    pub async fn init(&mut self) -> Result<(), IggyError> {
+    pub async fn init(&self) -> Result<(), IggyError> {
         let now = Instant::now();
-        //TODO: Fix this either by moving it to main function, or by using 
`run_once` barrier.
-        //let state_entries = self.state.init().await?;
-        //let system_state = SystemState::init(state_entries).await?;
-        //let user = create_root_user();
+
         self.load_version().await?;
         let SystemState { users, streams } = self.load_state().await?;
         self.load_users(users.into_values().collect()).await;
-        // Add default root user.
         self.load_streams(streams.into_values().collect()).await;
 
         //TODO: Fix the archiver.
@@ -123,6 +119,27 @@ impl IggyShard {
         Ok(())
     }
 
+    pub async fn run(self: &Rc<Self>) -> Result<(), IggyError> {
+        // Workaround to ensure that the statistics are initialized before the 
server
+        // loads streams and starts accepting connections. This is necessary to
+        // have the correct statistics when the server starts.
+        //self.get_stats().await?;
+        self.init().await?;
+        self.assert_init();
+        info!("Initiated shard with ID: {}", self.id);
+        // Create all tasks (tcp listener, http listener, command processor, 
in the future also the background jobs).
+        /*
+        let mut tasks: Vec<Task> = 
vec![Box::pin(spawn_shard_message_task(shard.clone()))];
+        if self.config.tcp.enabled {
+            tasks.push(Box::pin(spawn_tcp_server(self.clone())));
+        }
+        let result = try_join_all(tasks).await;
+        result?;
+        */
+
+        Ok(())
+    }
+
     async fn load_version(&self) -> Result<(), IggyError> {
         async fn update_system_info(
             storage: &Rc<SystemStorage>,
@@ -181,10 +198,8 @@ impl IggyShard {
         Ok(system_state)
     }
 
-    async fn load_users(&mut self, users: Vec<UserState>) -> Result<(), 
IggyError> {
+    async fn load_users(&self, users: Vec<UserState>) -> Result<(), IggyError> 
{
         info!("Loading users...");
-        /*
-
         for user_state in users.into_iter() {
             let mut user = User::with_password(
                 user_state.id,
@@ -210,32 +225,34 @@ impl IggyShard {
                     )
                 })
                 .collect();
-            self.users.insert(user_state.id, user);
+            self.users.borrow_mut().insert(user_state.id, user);
         }
 
-        let users_count = self.users.len();
-        let current_user_id = self.users.keys().max().unwrap_or(&1);
+        let users = self.users.borrow();
+        let users_count = users.len();
+        let current_user_id = users.keys().max().unwrap_or(&1);
         USER_ID.store(current_user_id + 1, Ordering::SeqCst);
         self.permissioner
-            .init(&self.users.values().collect::<Vec<&User>>());
+            .borrow_mut()
+            .init(&users.values().collect::<Vec<_>>());
         self.metrics.increment_users(users_count as u32);
         info!("Initialized {users_count} user(s).");
-        */
         Ok(())
     }
 
-    async fn load_streams(&mut self, streams: Vec<StreamState>) -> Result<(), 
IggyError> {
-        todo!();
+    async fn load_streams(&self, streams: Vec<StreamState>) -> Result<(), 
IggyError> {
         info!("Loading streams from disk...");
         let mut unloaded_streams = Vec::new();
-        let mut dir_entries = read_dir(&self.config.get_streams_path())
-            .await
+        // Does mononio has api for that ?
+        let mut dir_entries = 
std::fs::read_dir(&self.config.system.get_streams_path())
             .map_err(|error| {
                 error!("Cannot read streams directory: {error}");
                 IggyError::CannotReadStreams
             })?;
 
-        while let Some(dir_entry) = 
dir_entries.next_entry().await.unwrap_or(None) {
+        //TODO: User the dir walk impl from main function, once implemented.
+        while let Some(dir_entry) = dir_entries.next() {
+            let dir_entry = dir_entry.unwrap();
             let name = dir_entry.file_name().into_string().unwrap();
             let stream_id = name.parse::<u32>().map_err(|_| {
                 error!("Invalid stream ID file with name: '{name}'.");
@@ -246,7 +263,7 @@ impl IggyShard {
                 error!(
                     "Stream with ID: '{stream_id}' was not found in state, but 
exists on disk and will be removed."
                 );
-                if let Err(error) = 
fs::remove_dir_all(&dir_entry.path()).await {
+                if let Err(error) = std::fs::remove_dir_all(&dir_entry.path()) 
{
                     error!("Cannot remove stream directory: {error}");
                 } else {
                     warn!("Stream with ID: '{stream_id}' was removed.");
@@ -258,7 +275,7 @@ impl IggyShard {
             let mut stream = Stream::empty(
                 stream_id,
                 &stream_state.name,
-                self.config.clone(),
+                self.config.system.clone(),
                 self.storage.clone(),
             );
             stream.created_at = stream_state.created_at;
@@ -281,7 +298,7 @@ impl IggyShard {
             info!("All streams found on disk were found in state.");
         } else {
             warn!("Streams with IDs: '{missing_ids:?}' were not found on 
disk.");
-            if self.config.recovery.recreate_missing_state {
+            if self.config.system.recovery.recreate_missing_state {
                 info!(
                     "Recreating missing state in recovery config is enabled, 
missing streams will be created."
                 );
@@ -291,7 +308,7 @@ impl IggyShard {
                     let stream = Stream::create(
                         stream_id,
                         &stream_state.name,
-                        self.config.clone(),
+                        self.config.system.clone(),
                         self.storage.clone(),
                     );
                     stream.persist().await?;
@@ -327,12 +344,12 @@ impl IggyShard {
         try_join_all(load_stream_tasks).await?;
 
         for stream in loaded_streams.take() {
-            if self.streams.contains_key(&stream.stream_id) {
+            if self.streams.borrow().contains_key(&stream.stream_id) {
                 error!("Stream with ID: '{}' already exists.", 
&stream.stream_id);
                 continue;
             }
 
-            if self.streams_ids.contains_key(&stream.name) {
+            if self.streams_ids.borrow().contains_key(&stream.name) {
                 error!("Stream with name: '{}' already exists.", &stream.name);
                 continue;
             }
@@ -345,13 +362,52 @@ impl IggyShard {
             self.metrics.increment_messages(stream.get_messages_count());
 
             self.streams_ids
+            .borrow_mut()
                 .insert(stream.name.clone(), stream.stream_id);
-            self.streams.insert(stream.stream_id, stream);
+            self.streams.borrow_mut().insert(stream.stream_id, stream);
         }
 
-        info!("Loaded {} stream(s) from disk.", self.streams.len());
+        info!("Loaded {} stream(s) from disk.", self.streams.borrow().len());
         Ok(())
     }
 
-    pub fn assert_init(&self) {}
+    pub fn assert_init(&self) -> Result<(), IggyError> { Ok(())}
+
+    #[instrument(skip_all, name = "trace_shutdown")]
+    pub async fn shutdown(&mut self) -> Result<(), IggyError> {
+        self.persist_messages().await?;
+        Ok(())
+    }
+
+    #[instrument(skip_all, name = "trace_persist_messages")]
+    pub async fn persist_messages(&self) -> Result<usize, IggyError> {
+        trace!("Saving buffered messages on disk...");
+        let mut saved_messages_number = 0;
+        //TODO: Fixme
+        /*
+        for stream in self.streams.values() {
+            saved_messages_number += stream.persist_messages().await?;
+        }
+        */
+
+        Ok(saved_messages_number)
+    }
+
+    pub fn get_available_shards_count(&self) -> u32 {
+        self.shards.len() as u32
+    }
+
+    pub fn ensure_authenticated(&self, client_id: u32) -> Result<u32, 
IggyError> {
+        let active_sessions = self.active_sessions.borrow();
+        let session = active_sessions
+            .iter()
+            .find(|s| s.client_id == client_id)
+            .ok_or_else(|| IggyError::Unauthenticated)?;
+        if session.is_authenticated() {
+            Ok(session.get_user_id())
+        } else {
+            error!("{COMPONENT} - unauthenticated access attempt, session: 
{session}");
+            Err(IggyError::Unauthenticated)
+        }
+    }
 }
diff --git a/core/server/src/shard/namespace.rs 
b/core/server/src/shard/namespace.rs
index d9aedfb7..d9d68ec9 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -16,15 +16,20 @@
  * under the License.
  */
 
+use std::hash::Hasher as _;
+use iggy_common::Identifier;
+use hash32::{Hasher, Murmur3Hasher};
+
+//TODO: Will probably want to move it to separate crate so we can share it 
with sdk.
 #[derive(Debug, Clone, Eq, PartialEq, Hash)]
 pub struct IggyNamespace {
-    pub(crate) stream_id: u32,
-    pub(crate) topic_id: u32,
+    pub(crate) stream_id: Identifier,
+    pub(crate) topic_id: Identifier,
     pub(crate) partition_id: u32,
 }
 
 impl IggyNamespace {
-    pub fn new(stream_id: u32, topic_id: u32, partition_id: u32) -> Self {
+    pub fn new(stream_id: Identifier, topic_id: Identifier, partition_id: u32) 
-> Self {
         Self {
             stream_id,
             topic_id,
@@ -33,6 +38,10 @@ impl IggyNamespace {
     }
 
     pub fn generate_hash(&self) -> u32 {
-        todo!();
+        let mut hasher = Murmur3Hasher::default();
+        hasher.write(&self.stream_id.value);
+        hasher.write(&self.topic_id.value);
+        hasher.write_u32(self.partition_id);
+        hasher.finish32()
     }
 }
diff --git a/core/server/src/streaming/systems/clients.rs 
b/core/server/src/shard/system/clients.rs
similarity index 99%
rename from core/server/src/streaming/systems/clients.rs
rename to core/server/src/shard/system/clients.rs
index 050a0b58..2cb0fbe8 100644
--- a/core/server/src/streaming/systems/clients.rs
+++ b/core/server/src/shard/system/clients.rs
@@ -29,7 +29,7 @@ use std::net::SocketAddr;
 use std::sync::Arc;
 use tracing::{error, info};
 
-impl System {
+impl IggyShard {
     pub async fn add_client(&self, address: &SocketAddr, transport: Transport) 
-> Arc<Session> {
         let mut client_manager = self.client_manager.write().await;
         let session = client_manager.add_client(address, transport);
diff --git a/core/server/src/streaming/systems/consumer_groups.rs 
b/core/server/src/shard/system/consumer_groups.rs
similarity index 99%
rename from core/server/src/streaming/systems/consumer_groups.rs
rename to core/server/src/shard/system/consumer_groups.rs
index 4303dc4d..ae883d9f 100644
--- a/core/server/src/streaming/systems/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -26,7 +26,7 @@ use iggy_common::IggyError;
 use iggy_common::locking::IggySharedMutFn;
 use tokio::sync::RwLock;
 
-impl System {
+impl IggyShard {
     pub fn get_consumer_group(
         &self,
         session: &Session,
diff --git a/core/server/src/streaming/systems/consumer_offsets.rs 
b/core/server/src/shard/system/consumer_offsets.rs
similarity index 99%
rename from core/server/src/streaming/systems/consumer_offsets.rs
rename to core/server/src/shard/system/consumer_offsets.rs
index fdac2b0c..e63815ff 100644
--- a/core/server/src/streaming/systems/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -22,7 +22,7 @@ use crate::streaming::systems::system::System;
 use error_set::ErrContext;
 use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError};
 
-impl System {
+impl IggyShard {
     pub async fn store_consumer_offset(
         &self,
         session: &Session,
diff --git a/core/server/src/streaming/systems/info.rs 
b/core/server/src/shard/system/info.rs
similarity index 100%
rename from core/server/src/streaming/systems/info.rs
rename to core/server/src/shard/system/info.rs
diff --git a/core/server/src/streaming/systems/messages.rs 
b/core/server/src/shard/system/messages.rs
similarity index 99%
rename from core/server/src/streaming/systems/messages.rs
rename to core/server/src/shard/system/messages.rs
index 53ed0432..da9b67c2 100644
--- a/core/server/src/streaming/systems/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -17,10 +17,9 @@
  */
 
 use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
+use crate::shard::IggyShard;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, 
IggyMessagesBatchSet};
 use crate::streaming::session::Session;
-use crate::streaming::systems::COMPONENT;
-use crate::streaming::systems::system::System;
 use crate::streaming::utils::PooledBuffer;
 use error_set::ErrContext;
 use iggy_common::{
@@ -29,7 +28,7 @@ use iggy_common::{
 };
 use tracing::{error, trace};
 
-impl System {
+impl IggyShard {
     pub async fn poll_messages(
         &self,
         session: &Session,
diff --git a/core/server/src/streaming/systems/mod.rs 
b/core/server/src/shard/system/mod.rs
similarity index 95%
rename from core/server/src/streaming/systems/mod.rs
rename to core/server/src/shard/system/mod.rs
index 7039e919..1419abb9 100644
--- a/core/server/src/streaming/systems/mod.rs
+++ b/core/server/src/shard/system/mod.rs
@@ -32,4 +32,4 @@ pub mod system;
 pub mod topics;
 pub mod users;
 
-pub const COMPONENT: &str = "STREAMING_SYSTEMS";
+pub const COMPONENT: &str = "SYSTEM";
diff --git a/core/server/src/streaming/systems/partitions.rs 
b/core/server/src/shard/system/partitions.rs
similarity index 99%
rename from core/server/src/streaming/systems/partitions.rs
rename to core/server/src/shard/system/partitions.rs
index 5bb72875..7712f498 100644
--- a/core/server/src/streaming/systems/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -23,7 +23,7 @@ use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
 
-impl System {
+impl IggyShard {
     pub async fn create_partitions(
         &mut self,
         session: &Session,
diff --git a/core/server/src/streaming/systems/personal_access_tokens.rs 
b/core/server/src/shard/system/personal_access_tokens.rs
similarity index 99%
rename from core/server/src/streaming/systems/personal_access_tokens.rs
rename to core/server/src/shard/system/personal_access_tokens.rs
index 945c853b..18125bed 100644
--- a/core/server/src/streaming/systems/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -27,7 +27,7 @@ use iggy_common::IggyExpiry;
 use iggy_common::IggyTimestamp;
 use tracing::{error, info};
 
-impl System {
+impl IggyShard {
     pub async fn get_personal_access_tokens(
         &self,
         session: &Session,
diff --git a/core/server/src/streaming/systems/segments.rs 
b/core/server/src/shard/system/segments.rs
similarity index 99%
rename from core/server/src/streaming/systems/segments.rs
rename to core/server/src/shard/system/segments.rs
index a15952c1..bd4043a2 100644
--- a/core/server/src/streaming/systems/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -23,7 +23,7 @@ use iggy_common::Identifier;
 use iggy_common::IggyError;
 use iggy_common::locking::IggySharedMutFn;
 
-impl System {
+impl IggyShard {
     pub async fn delete_segments(
         &mut self,
         session: &Session,
diff --git a/core/server/src/streaming/systems/snapshot/mod.rs 
b/core/server/src/shard/system/snapshot/mod.rs
similarity index 100%
rename from core/server/src/streaming/systems/snapshot/mod.rs
rename to core/server/src/shard/system/snapshot/mod.rs
diff --git a/core/server/src/streaming/systems/snapshot/procdump.rs 
b/core/server/src/shard/system/snapshot/procdump.rs
similarity index 100%
rename from core/server/src/streaming/systems/snapshot/procdump.rs
rename to core/server/src/shard/system/snapshot/procdump.rs
diff --git a/core/server/src/streaming/systems/stats.rs 
b/core/server/src/shard/system/stats.rs
similarity index 99%
rename from core/server/src/streaming/systems/stats.rs
rename to core/server/src/shard/system/stats.rs
index 80c588e6..2d2d2ea7 100644
--- a/core/server/src/streaming/systems/stats.rs
+++ b/core/server/src/shard/system/stats.rs
@@ -34,7 +34,7 @@ fn sysinfo() -> &'static Mutex<SysinfoSystem> {
     })
 }
 
-impl System {
+impl IggyShard {
     pub async fn get_stats(&self) -> Result<Stats, IggyError> {
         let mut sys = sysinfo().lock().await;
         let process_id = std::process::id();
diff --git a/core/server/src/streaming/systems/storage.rs 
b/core/server/src/shard/system/storage.rs
similarity index 97%
rename from core/server/src/streaming/systems/storage.rs
rename to core/server/src/shard/system/storage.rs
index d4363631..9a493751 100644
--- a/core/server/src/streaming/systems/storage.rs
+++ b/core/server/src/shard/system/storage.rs
@@ -18,8 +18,6 @@
 
 use crate::streaming::persistence::persister::PersisterKind;
 use crate::streaming::storage::SystemInfoStorage;
-use crate::streaming::systems::COMPONENT;
-use crate::streaming::systems::info::SystemInfo;
 use crate::streaming::utils::PooledBuffer;
 use crate::streaming::utils::file;
 use anyhow::Context;
diff --git a/core/server/src/streaming/systems/streams.rs 
b/core/server/src/shard/system/streams.rs
similarity index 70%
rename from core/server/src/streaming/systems/streams.rs
rename to core/server/src/shard/system/streams.rs
index 18c6aa83..efcfff2b 100644
--- a/core/server/src/streaming/systems/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -16,156 +16,18 @@
  * under the License.
  */
 
-use crate::state::system::StreamState;
+use crate::shard::IggyShard;
 use crate::streaming::session::Session;
 use crate::streaming::streams::stream::Stream;
-use crate::streaming::systems::COMPONENT;
-use crate::streaming::systems::system::System;
-use ahash::{AHashMap, AHashSet};
 use error_set::ErrContext;
 use futures::future::try_join_all;
-use iggy_common::locking::IggySharedMutFn;
-use iggy_common::{IdKind, Identifier, IggyError};
 use std::cell::RefCell;
 use std::sync::atomic::{AtomicU32, Ordering};
 use tokio::fs;
-use tokio::fs::read_dir;
 use tracing::{error, info, warn};
 
-static CURRENT_STREAM_ID: AtomicU32 = AtomicU32::new(1);
-
-impl System {
-    pub(crate) async fn load_streams(
-        &mut self,
-        streams: Vec<StreamState>,
-    ) -> Result<(), IggyError> {
-        info!("Loading streams from disk...");
-        let mut unloaded_streams = Vec::new();
-        let mut dir_entries = read_dir(&self.config.get_streams_path())
-            .await
-            .map_err(|error| {
-                error!("Cannot read streams directory: {error}");
-                IggyError::CannotReadStreams
-            })?;
-
-        while let Some(dir_entry) = 
dir_entries.next_entry().await.unwrap_or(None) {
-            let name = dir_entry.file_name().into_string().unwrap();
-            let stream_id = name.parse::<u32>().map_err(|_| {
-                error!("Invalid stream ID file with name: '{name}'.");
-                IggyError::InvalidNumberValue
-            })?;
-            let stream_state = streams.iter().find(|s| s.id == stream_id);
-            if stream_state.is_none() {
-                error!(
-                    "Stream with ID: '{stream_id}' was not found in state, but 
exists on disk and will be removed."
-                );
-                if let Err(error) = 
fs::remove_dir_all(&dir_entry.path()).await {
-                    error!("Cannot remove stream directory: {error}");
-                } else {
-                    warn!("Stream with ID: '{stream_id}' was removed.");
-                }
-                continue;
-            }
-
-            let stream_state = stream_state.unwrap();
-            let mut stream = Stream::empty(
-                stream_id,
-                &stream_state.name,
-                self.config.clone(),
-                self.storage.clone(),
-            );
-            stream.created_at = stream_state.created_at;
-            unloaded_streams.push(stream);
-        }
-
-        let state_stream_ids = streams
-            .iter()
-            .map(|stream| stream.id)
-            .collect::<AHashSet<u32>>();
-        let unloaded_stream_ids = unloaded_streams
-            .iter()
-            .map(|stream| stream.stream_id)
-            .collect::<AHashSet<u32>>();
-        let mut missing_ids = state_stream_ids
-            .difference(&unloaded_stream_ids)
-            .copied()
-            .collect::<AHashSet<u32>>();
-        if missing_ids.is_empty() {
-            info!("All streams found on disk were found in state.");
-        } else {
-            warn!("Streams with IDs: '{missing_ids:?}' were not found on 
disk.");
-            if self.config.recovery.recreate_missing_state {
-                info!(
-                    "Recreating missing state in recovery config is enabled, 
missing streams will be created."
-                );
-                for stream_id in missing_ids.iter() {
-                    let stream_id = *stream_id;
-                    let stream_state = streams.iter().find(|s| s.id == 
stream_id).unwrap();
-                    let stream = Stream::create(
-                        stream_id,
-                        &stream_state.name,
-                        self.config.clone(),
-                        self.storage.clone(),
-                    );
-                    stream.persist().await?;
-                    unloaded_streams.push(stream);
-                    info!(
-                        "Missing stream with ID: '{stream_id}', name: {} was 
recreated.",
-                        stream_state.name
-                    );
-                }
-                missing_ids.clear();
-            } else {
-                warn!(
-                    "Recreating missing state in recovery config is disabled, 
missing streams will not be created."
-                );
-            }
-        }
-
-        let mut streams_states = streams
-            .into_iter()
-            .filter(|s| !missing_ids.contains(&s.id))
-            .map(|s| (s.id, s))
-            .collect::<AHashMap<_, _>>();
-        let loaded_streams = RefCell::new(Vec::new());
-        let load_stream_tasks = unloaded_streams.into_iter().map(|mut stream| {
-            let state = streams_states.remove(&stream.stream_id).unwrap();
-
-            async {
-                stream.load(state).await?;
-                loaded_streams.borrow_mut().push(stream);
-                Result::<(), IggyError>::Ok(())
-            }
-        });
-        try_join_all(load_stream_tasks).await?;
-
-        for stream in loaded_streams.take() {
-            if self.streams.contains_key(&stream.stream_id) {
-                error!("Stream with ID: '{}' already exists.", 
&stream.stream_id);
-                continue;
-            }
-
-            if self.streams_ids.contains_key(&stream.name) {
-                error!("Stream with name: '{}' already exists.", &stream.name);
-                continue;
-            }
-
-            self.metrics.increment_streams(1);
-            self.metrics.increment_topics(stream.get_topics_count());
-            self.metrics
-                .increment_partitions(stream.get_partitions_count());
-            self.metrics.increment_segments(stream.get_segments_count());
-            self.metrics.increment_messages(stream.get_messages_count());
-
-            self.streams_ids
-                .insert(stream.name.clone(), stream.stream_id);
-            self.streams.insert(stream.stream_id, stream);
-        }
-
-        info!("Loaded {} stream(s) from disk.", self.streams.len());
-        Ok(())
-    }
 
+impl IggyShard {
     pub fn get_streams(&self) -> Vec<&Stream> {
         self.streams.values().collect()
     }
diff --git a/core/server/src/streaming/systems/system.rs 
b/core/server/src/shard/system/system.rs
similarity index 100%
rename from core/server/src/streaming/systems/system.rs
rename to core/server/src/shard/system/system.rs
diff --git a/core/server/src/streaming/systems/topics.rs 
b/core/server/src/shard/system/topics.rs
similarity index 100%
rename from core/server/src/streaming/systems/topics.rs
rename to core/server/src/shard/system/topics.rs
diff --git a/core/server/src/streaming/systems/users.rs 
b/core/server/src/shard/system/users.rs
similarity index 79%
rename from core/server/src/streaming/systems/users.rs
rename to core/server/src/shard/system/users.rs
index cc27fcae..f2405f1f 100644
--- a/core/server/src/streaming/systems/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -16,13 +16,12 @@
  * under the License.
  */
 
+use crate::shard::IggyShard;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateUserWithId;
 use crate::state::system::UserState;
 use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use crate::streaming::session::Session;
-use crate::streaming::systems::COMPONENT;
-use crate::streaming::systems::system::System;
 use crate::streaming::users::user::User;
 use crate::streaming::utils::crypto;
 use crate::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV};
@@ -42,108 +41,7 @@ use tracing::{error, info, warn};
 static USER_ID: AtomicU32 = AtomicU32::new(1);
 const MAX_USERS: usize = u32::MAX as usize;
 
-impl System {
-    pub(crate) async fn load_users(&mut self, users: Vec<UserState>) -> 
Result<(), IggyError> {
-        info!("Loading users...");
-        if users.is_empty() {
-            info!("No users found, creating the root user...");
-            let root = Self::create_root_user();
-            let command = CreateUser {
-                username: root.username.clone(),
-                password: root.password.clone(),
-                status: root.status,
-                permissions: root.permissions.clone(),
-            };
-            self.state
-                .apply(0, &EntryCommand::CreateUser(CreateUserWithId {
-                    user_id: root.id,
-                    command
-                }))
-                .await
-                .with_error_context(|error| {
-                    format!(
-                        "{COMPONENT} (error: {error}) - failed to apply create 
user command, username: {}",
-                        root.username
-                    )
-                })?;
-
-            self.users.insert(root.id, root);
-            info!("Created the root user.");
-        }
-
-        for user_state in users.into_iter() {
-            let mut user = User::with_password(
-                user_state.id,
-                &user_state.username,
-                user_state.password_hash,
-                user_state.status,
-                user_state.permissions,
-            );
-
-            user.created_at = user_state.created_at;
-            user.personal_access_tokens = user_state
-                .personal_access_tokens
-                .into_values()
-                .map(|token| {
-                    (
-                        Arc::new(token.token_hash.clone()),
-                        PersonalAccessToken::raw(
-                            user_state.id,
-                            &token.name,
-                            &token.token_hash,
-                            token.expiry_at,
-                        ),
-                    )
-                })
-                .collect();
-            self.users.insert(user_state.id, user);
-        }
-
-        let users_count = self.users.len();
-        let current_user_id = self.users.keys().max().unwrap_or(&1);
-        USER_ID.store(current_user_id + 1, Ordering::SeqCst);
-        self.permissioner
-            .init(&self.users.values().collect::<Vec<&User>>());
-        self.metrics.increment_users(users_count as u32);
-        info!("Initialized {users_count} user(s).");
-        Ok(())
-    }
-
-    fn create_root_user() -> User {
-        let username = env::var(IGGY_ROOT_USERNAME_ENV);
-        let password = env::var(IGGY_ROOT_PASSWORD_ENV);
-        if (username.is_ok() && password.is_err()) || (username.is_err() && 
password.is_ok()) {
-            panic!(
-                "When providing the custom root user credentials, both 
username and password must be set."
-            );
-        }
-        if username.is_ok() && password.is_ok() {
-            info!("Using the custom root user credentials.");
-        } else {
-            info!("Using the default root user credentials.");
-        }
-
-        let username = username.unwrap_or(DEFAULT_ROOT_USERNAME.to_string());
-        let password = password.unwrap_or(DEFAULT_ROOT_PASSWORD.to_string());
-        if username.is_empty() || password.is_empty() {
-            panic!("Root user credentials are not set.");
-        }
-        if username.len() < MIN_USERNAME_LENGTH {
-            panic!("Root username is too short.");
-        }
-        if username.len() > MAX_USERNAME_LENGTH {
-            panic!("Root username is too long.");
-        }
-        if password.len() < MIN_PASSWORD_LENGTH {
-            panic!("Root password is too short.");
-        }
-        if password.len() > MAX_PASSWORD_LENGTH {
-            panic!("Root password is too long.");
-        }
-
-        User::root(&username, &password)
-    }
-
+impl IggyShard {
     pub fn find_user(
         &self,
         session: &Session,
diff --git a/core/server/src/streaming/mod.rs b/core/server/src/streaming/mod.rs
index 5009cb9a..f9d40af8 100644
--- a/core/server/src/streaming/mod.rs
+++ b/core/server/src/streaming/mod.rs
@@ -28,7 +28,6 @@ pub mod segments;
 pub mod session;
 pub mod storage;
 pub mod streams;
-pub mod systems;
 pub mod topics;
 pub mod users;
 pub mod utils;
diff --git a/core/server/src/streaming/storage.rs 
b/core/server/src/streaming/storage.rs
index fc0a752d..f423b842 100644
--- a/core/server/src/streaming/storage.rs
+++ b/core/server/src/streaming/storage.rs
@@ -33,6 +33,7 @@ use iggy_common::IggyError;
 use mockall::automock;
 use std::fmt::Debug;
 use std::future::Future;
+use std::rc::Rc;
 use std::sync::Arc;
 
 macro_rules! forward_async_methods {
@@ -143,7 +144,7 @@ pub trait PartitionStorage: Send {
 
 #[derive(Debug)]
 pub struct SystemStorage {
-    pub info: Arc<SystemInfoStorageKind>,
+    pub info: Rc<SystemInfoStorageKind>,
     pub stream: Arc<StreamStorageKind>,
     pub topic: Arc<TopicStorageKind>,
     pub partition: Arc<PartitionStorageKind>,
@@ -151,7 +152,7 @@ pub struct SystemStorage {
 }
 
 impl SystemStorage {
-    pub fn new(config: Arc<SystemConfig>, persister: Arc<PersisterKind>) -> 
Self {
+    pub fn new(config: Rc<SystemConfig>, persister: Arc<PersisterKind>) -> 
Self {
         Self {
             info: 
Arc::new(SystemInfoStorageKind::File(FileSystemInfoStorage::new(
                 config.get_state_info_path(),
diff --git a/core/server/src/streaming/streams/stream.rs 
b/core/server/src/streaming/streams/stream.rs
index ef528197..339fc08a 100644
--- a/core/server/src/streaming/streams/stream.rs
+++ b/core/server/src/streaming/streams/stream.rs
@@ -23,6 +23,7 @@ use ahash::AHashMap;
 use iggy_common::IggyByteSize;
 use iggy_common::IggyTimestamp;
 use std::fmt::Display;
+use std::rc::Rc;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
 
@@ -47,8 +48,8 @@ impl Stream {
     pub fn empty(
         id: u32,
         name: &str,
-        config: Arc<SystemConfig>,
-        storage: Arc<SystemStorage>,
+        config: Rc<SystemConfig>,
+        storage: Rc<SystemStorage>,
     ) -> Self {
         Stream::create(id, name, config, storage)
     }
@@ -56,8 +57,8 @@ impl Stream {
     pub fn create(
         id: u32,
         name: &str,
-        config: Arc<SystemConfig>,
-        storage: Arc<SystemStorage>,
+        config: Rc<SystemConfig>,
+        storage: Rc<SystemStorage>,
     ) -> Self {
         let path = config.get_stream_path(id);
         let topics_path = config.get_topics_path(id);
diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs
index 2b6b6d17..4dd11725 100644
--- a/core/server/src/tcp/sender.rs
+++ b/core/server/src/tcp/sender.rs
@@ -17,15 +17,15 @@
  */
 
 use iggy_common::IggyError;
+use monoio::io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, 
AsyncWriteRentExt};
 use std::io::IoSlice;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tracing::debug;
 
 const STATUS_OK: &[u8] = &[0; 4];
 
 pub(crate) async fn read<T>(stream: &mut T, buffer: &mut [u8]) -> 
Result<usize, IggyError>
 where
-    T: AsyncRead + AsyncWrite + Unpin,
+    T: AsyncReadRent + AsyncWriteRent + Unpin,
 {
     match stream.read_exact(buffer).await {
         Ok(0) => Err(IggyError::ConnectionClosed),
@@ -42,14 +42,14 @@ where
 
 pub(crate) async fn send_empty_ok_response<T>(stream: &mut T) -> Result<(), 
IggyError>
 where
-    T: AsyncRead + AsyncWrite + Unpin,
+    T: AsyncReadRent + AsyncWriteRent + Unpin,
 {
     send_ok_response(stream, &[]).await
 }
 
 pub(crate) async fn send_ok_response<T>(stream: &mut T, payload: &[u8]) -> 
Result<(), IggyError>
 where
-    T: AsyncRead + AsyncWrite + Unpin,
+    T: AsyncReadRent + AsyncWriteRent + Unpin,
 {
     send_response(stream, STATUS_OK, payload).await
 }
@@ -60,7 +60,7 @@ pub(crate) async fn send_ok_response_vectored<T>(
     slices: Vec<IoSlice<'_>>,
 ) -> Result<(), IggyError>
 where
-    T: AsyncRead + AsyncWrite + Unpin,
+    T: AsyncReadRentExt + AsyncWriteRentExt + Unpin,
 {
     send_response_vectored(stream, STATUS_OK, length, slices).await
 }
@@ -70,7 +70,7 @@ pub(crate) async fn send_error_response<T>(
     error: IggyError,
 ) -> Result<(), IggyError>
 where
-    T: AsyncRead + AsyncWrite + Unpin,
+    T: AsyncReadRent + AsyncWriteRent + Unpin,
 {
     send_response(stream, &error.as_code().to_le_bytes(), &[]).await
 }
@@ -81,7 +81,7 @@ pub(crate) async fn send_response<T>(
     payload: &[u8],
 ) -> Result<(), IggyError>
 where
-    T: AsyncRead + AsyncWrite + Unpin,
+    T: AsyncReadRent + AsyncWriteRent + Unpin,
 {
     debug!(
         "Sending response of len: {} with status: {:?}...",
@@ -104,7 +104,7 @@ pub(crate) async fn send_response_vectored<T>(
     mut slices: Vec<IoSlice<'_>>,
 ) -> Result<(), IggyError>
 where
-    T: AsyncReadExt + AsyncWriteExt + Unpin,
+    T: AsyncReadRentExt + AsyncWriteRentExt + Unpin,
 {
     debug!(
         "Sending vectored response of len: {} with status: {:?}...",
diff --git a/core/server/src/tcp/tcp_listener.rs 
b/core/server/src/tcp/tcp_listener.rs
index 0d84aa10..2e4f25be 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -17,60 +17,44 @@
  */
 
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::streaming::clients::client_manager::Transport;
-use crate::streaming::systems::system::SharedSystem;
 use crate::tcp::connection_handler::{handle_connection, handle_error};
 use std::net::SocketAddr;
+use std::rc::Rc;
+use monoio::net::TcpListener;
+use rustls::pki_types::Ipv4Addr;
 use tokio::net::TcpSocket;
 use tokio::sync::oneshot;
 use tracing::{error, info};
 
-pub async fn start(address: &str, socket: TcpSocket, system: SharedSystem) -> 
SocketAddr {
-    let address = address.to_string();
-    let (tx, rx) = oneshot::channel();
-    tokio::spawn(async move {
-        let addr = address.parse();
-        if addr.is_err() {
-            panic!("Unable to parse address {:?}", address);
-        }
-
-        socket
-            .bind(addr.unwrap())
-            .expect("Unable to bind socket to address");
-
-        let listener = socket.listen(1024).expect("Unable to start TCP 
server.");
-
-        let local_addr = listener
-            .local_addr()
-            .expect("Failed to get local address for TCP listener");
-
-        tx.send(local_addr).unwrap_or_else(|_| {
-            panic!(
-                "Failed to send the local address {:?} for TCP listener",
-                local_addr
-            )
-        });
-
+pub async fn start(server_name: &str, shard: Rc<IggyShard>) {
+    let addr: SocketAddr = if shard.config.tcp.ipv6 {
+        shard.config.tcp.address.parse().expect("Unable to parse IPv6 address")
+    } else {
+        shard.config.tcp.address.parse().expect("Unable to parse IPv4 address")
+    };
+    monoio::spawn(async move {
+        let listener = TcpListener::bind(addr).expect(format!("Unable to start 
{server_name}.").as_ref());
         loop {
             match listener.accept().await {
                 Ok((stream, address)) => {
+                    let shard = shard.clone();
                     info!("Accepted new TCP connection: {address}");
-                    let session = system
-                        .read()
-                        .await
-                        .add_client(&address, Transport::Tcp)
-                        .await;
+                    let session = shard
+                        .add_client(&address, Transport::Tcp);
 
                     let client_id = session.client_id;
                     info!("Created new session: {session}");
-                    let system = system.clone();
                     let mut sender = SenderKind::get_tcp_sender(stream);
-                    tokio::spawn(async move {
+                    monoio::spawn(async move {
                         if let Err(error) =
-                            handle_connection(session, &mut sender, 
system.clone()).await
+                            handle_connection(session, &mut sender, 
shard.clone()).await
                         {
                             handle_error(error);
-                            system.read().await.delete_client(client_id).await;
+                            //TODO: Fixme
+                            /*
+                            
//system.read().await.delete_client(client_id).await;
                             if let Err(error) = sender.shutdown().await {
                                 error!(
                                     "Failed to shutdown TCP stream for client: 
{client_id}, address: {address}. {error}"
@@ -80,6 +64,7 @@ pub async fn start(address: &str, socket: TcpSocket, system: 
SharedSystem) -> So
                                     "Successfully closed TCP stream for 
client: {client_id}, address: {address}."
                                 );
                             }
+                            */
                         }
                     });
                 }
@@ -87,8 +72,4 @@ pub async fn start(address: &str, socket: TcpSocket, system: 
SharedSystem) -> So
             }
         }
     });
-    match rx.await {
-        Ok(addr) => addr,
-        Err(_) => panic!("Failed to get the local address for TCP listener."),
-    }
 }
diff --git a/core/server/src/tcp/tcp_sender.rs 
b/core/server/src/tcp/tcp_sender.rs
index 837c8f06..a157dead 100644
--- a/core/server/src/tcp/tcp_sender.rs
+++ b/core/server/src/tcp/tcp_sender.rs
@@ -21,7 +21,8 @@ use crate::tcp::COMPONENT;
 use crate::{server_error::ServerError, tcp::sender};
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use tokio::{io::AsyncWriteExt, net::TcpStream};
+use tokio::{io::AsyncWriteExt};
+use monoio::net::TcpStream;
 
 #[derive(Debug)]
 pub struct TcpSender {
diff --git a/core/server/src/tcp/tcp_server.rs 
b/core/server/src/tcp/tcp_server.rs
index a57b7c3f..82bbe0c1 100644
--- a/core/server/src/tcp/tcp_server.rs
+++ b/core/server/src/tcp/tcp_server.rs
@@ -16,26 +16,25 @@
  * under the License.
  */
 
-use crate::configs::tcp::TcpConfig;
-use crate::streaming::systems::system::SharedSystem;
-use crate::tcp::{tcp_listener, tcp_socket, tcp_tls_listener};
-use std::net::SocketAddr;
+use crate::shard::IggyShard;
+use crate::tcp::{tcp_listener};
+use std::rc::Rc;
+use iggy_common::IggyError;
 use tracing::info;
 
 /// Starts the TCP server.
 /// Returns the address the server is listening on.
-pub async fn start(config: TcpConfig, system: SharedSystem) -> SocketAddr {
-    let server_name = if config.tls.enabled {
+pub async fn start(shard: Rc<IggyShard>) -> Result<(), IggyError> {
+    let server_name = if shard.config.tcp.tls.enabled {
         "Iggy TCP TLS"
     } else {
         "Iggy TCP"
     };
     info!("Initializing {server_name} server...");
-    let socket = tcp_socket::build(config.ipv6, config.socket);
-    let addr = match config.tls.enabled {
-        true => tcp_tls_listener::start(&config.address, config.tls, socket, 
system).await,
-        false => tcp_listener::start(&config.address, socket, system).await,
+    let addr = match shard.config.tcp.tls.enabled {
+        true => unimplemented!("TLS support is not implemented yet"),
+        false => tcp_listener::start(server_name, shard).await,
     };
     info!("{server_name} server has started on: {:?}", addr);
-    addr
+    Ok(())
 }
diff --git a/core/server/src/tcp/tcp_socket.rs 
b/core/server/src/tcp/tcp_socket.rs
index 91ed105a..69c1d408 100644
--- a/core/server/src/tcp/tcp_socket.rs
+++ b/core/server/src/tcp/tcp_socket.rs
@@ -18,7 +18,6 @@
 
 use std::num::TryFromIntError;
 
-use tokio::net::TcpSocket;
 
 use crate::configs::tcp::TcpSocketConfig;
 
diff --git a/core/server/src/tcp/tcp_tls_sender.rs 
b/core/server/src/tcp/tcp_tls_sender.rs
index 6e3056a5..6ba62315 100644
--- a/core/server/src/tcp/tcp_tls_sender.rs
+++ b/core/server/src/tcp/tcp_tls_sender.rs
@@ -21,9 +21,8 @@ use crate::tcp::COMPONENT;
 use crate::{server_error::ServerError, tcp::sender};
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use tokio::io::AsyncWriteExt;
-use tokio::net::TcpStream;
-use tokio_rustls::server::TlsStream;
+use monoio::net::TcpStream;
+use monoio_native_tls::TlsStream;
 
 #[derive(Debug)]
 pub struct TcpTlsSender {

Reply via email to