This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_tpc in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 9c6b9ffcd44f58f69433b8227951bab5d3ca8ac5 Author: numinex <[email protected]> AuthorDate: Tue Jun 24 11:52:06 2025 +0200 temp --- Cargo.lock | 85 +++++++++-- core/server/Cargo.toml | 3 +- core/server/src/binary/sender.rs | 16 +-- core/server/src/configs/server.rs | 3 +- core/server/src/main.rs | 8 +- core/server/src/quic/listener.rs | 3 + core/server/src/shard/builder.rs | 16 ++- core/server/src/shard/gate.rs | 2 +- core/server/src/shard/mod.rs | 158 ++++++++++++++------- core/server/src/shard/namespace.rs | 17 ++- .../{streaming/systems => shard/system}/clients.rs | 2 +- .../systems => shard/system}/consumer_groups.rs | 2 +- .../systems => shard/system}/consumer_offsets.rs | 2 +- .../{streaming/systems => shard/system}/info.rs | 0 .../systems => shard/system}/messages.rs | 5 +- .../src/{streaming/systems => shard/system}/mod.rs | 2 +- .../systems => shard/system}/partitions.rs | 2 +- .../system}/personal_access_tokens.rs | 2 +- .../systems => shard/system}/segments.rs | 2 +- .../systems => shard/system}/snapshot/mod.rs | 0 .../systems => shard/system}/snapshot/procdump.rs | 0 .../{streaming/systems => shard/system}/stats.rs | 2 +- .../{streaming/systems => shard/system}/storage.rs | 2 - .../{streaming/systems => shard/system}/streams.rs | 142 +----------------- .../{streaming/systems => shard/system}/system.rs | 0 .../{streaming/systems => shard/system}/topics.rs | 0 .../{streaming/systems => shard/system}/users.rs | 106 +------------- core/server/src/streaming/mod.rs | 1 - core/server/src/streaming/storage.rs | 5 +- core/server/src/streaming/streams/stream.rs | 9 +- core/server/src/tcp/sender.rs | 16 +-- core/server/src/tcp/tcp_listener.rs | 61 +++----- core/server/src/tcp/tcp_sender.rs | 3 +- core/server/src/tcp/tcp_server.rs | 21 ++- core/server/src/tcp/tcp_socket.rs | 1 - core/server/src/tcp/tcp_tls_sender.rs | 5 +- 36 files changed, 295 insertions(+), 409 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 38cb7c1c..79f73a27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -651,7 +651,7 @@ checksum = "fd73835ad7deb4bd2b389e6f10333b143f025d607c55ca04c66a0bcc6bb2fc6d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -1677,6 +1677,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -3310,6 +3320,12 @@ dependencies = [ "byteorder", ] +[[package]] +name = "hash32" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e4e11d13d42fc8d55326b323c63978c75721fbbb695a6e6686765bcb8b33917" + [[package]] name = "hashbrown" version = "0.12.3" @@ -3365,7 +3381,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f" dependencies = [ "atomic-polyfill", - "hash32", + "hash32 0.2.1", "rustc_version", "serde", "spin", @@ -4786,6 +4802,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "monoio-io-wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bcfaa76e5daf87cc4d31b4d1b6bc93c12db59c19df50b9200afdbde42077655" +dependencies = [ + "monoio", +] + [[package]] name = "monoio-macros" version = "0.1.0" @@ -4794,7 +4819,20 @@ checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", +] + +[[package]] +name = "monoio-native-tls" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9022f5aaa19f9688f97bfcfa0c4a4318d424851995badb356674ca742652cdb" +dependencies = [ + "bytes", + "monoio", + "monoio-io-wrapper", + "native-tls", + "thiserror 1.0.69", ] [[package]] @@ -4806,6 +4844,23 @@ dependencies = [ "getrandom 0.2.16", ] +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "never-say-never" version = "6.6.666" @@ -6439,7 +6494,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework", + "security-framework 3.2.0", ] [[package]] @@ -6467,7 +6522,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19787cda76408ec5404443dc8b31795c87cd8fec49762dc75fa727740d34acc1" dependencies = [ - "core-foundation", + "core-foundation 0.10.1", "core-foundation-sys", "jni", "log", @@ -6476,7 +6531,7 @@ dependencies = [ "rustls-native-certs", "rustls-platform-verifier-android", "rustls-webpki", - "security-framework", + "security-framework 3.2.0", "security-framework-sys", "webpki-root-certs 0.26.11", "windows-sys 0.59.0", @@ -6624,6 +6679,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.9.1", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + [[package]] name = "security-framework" version = "3.2.0" @@ -6631,7 +6699,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ "bitflags 2.9.1", - "core-foundation", + "core-foundation 0.10.1", "core-foundation-sys", "libc", "security-framework-sys", @@ -6839,15 +6907,16 @@ dependencies = [ "figment", "flume", "futures", + "hash32 1.0.0", "human-repr", "iggy_common", - "io-uring", "jsonwebtoken", "lending-iterator", "mimalloc", "mockall", "moka", "monoio", + "monoio-native-tls", "nix 0.30.1", "once_cell", "opentelemetry", diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 33bd7db7..adbeebe3 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -60,12 +60,13 @@ async-channel = { workspace = true } futures = { workspace = true } human-repr = { workspace = true } iggy_common = { workspace = true } -io-uring = "0.6" jsonwebtoken = "9.3.1" lending-iterator = "0.1.7" +hash32 = "1.0.0" mimalloc = { workspace = true, optional = true } moka = { version = "0.12.10", features = ["future"] } monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat"] } +monoio-native-tls = "0.4.0" nix = { version = "0.30", features = ["fs"] } once_cell = "1.21.3" opentelemetry = { version = "0.30.0", features = ["trace", "logs"] } diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs index 7273543c..2ccf349b 100644 --- a/core/server/src/binary/sender.rs +++ b/core/server/src/binary/sender.rs @@ -24,8 +24,8 @@ use crate::tcp::tcp_tls_sender::TcpTlsSender; use crate::{quic::quic_sender::QuicSender, server_error::ServerError}; use iggy_common::IggyError; use quinn::{RecvStream, SendStream}; -use tokio::net::TcpStream; -use tokio_rustls::server::TlsStream; +use monoio::net::TcpStream; +use tokio_native_tls::TlsStream; macro_rules! forward_async_methods { ( @@ -48,22 +48,22 @@ macro_rules! forward_async_methods { } pub trait Sender { - fn read(&mut self, buffer: &mut [u8]) -> impl Future<Output = Result<usize, IggyError>> + Send; - fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), IggyError>> + Send; + fn read(&mut self, buffer: &mut [u8]) -> impl Future<Output = Result<usize, IggyError>>; + fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), IggyError>>; fn send_ok_response( &mut self, payload: &[u8], - ) -> impl Future<Output = Result<(), IggyError>> + Send; + ) -> impl Future<Output = Result<(), IggyError>>; fn send_ok_response_vectored( &mut self, length: &[u8], slices: Vec<IoSlice<'_>>, - ) -> impl Future<Output = Result<(), IggyError>> + Send; + ) -> impl Future<Output = Result<(), IggyError>>; fn send_error_response( &mut self, error: IggyError, - ) -> impl Future<Output = Result<(), IggyError>> + Send; - fn shutdown(&mut self) -> impl Future<Output = Result<(), ServerError>> + Send; + ) -> impl Future<Output = Result<(), IggyError>>; + fn shutdown(&mut self) -> impl Future<Output = Result<(), ServerError>>; } #[allow(clippy::large_enum_variant)] diff --git a/core/server/src/configs/server.rs b/core/server/src/configs/server.rs index 252df9a7..059fd1c4 100644 --- a/core/server/src/configs/server.rs +++ b/core/server/src/configs/server.rs @@ -31,6 +31,7 @@ use iggy_common::Validatable; use serde::{Deserialize, Serialize}; use serde_with::DisplayFromStr; use serde_with::serde_as; +use std::rc::Rc; use std::str::FromStr; use std::sync::Arc; @@ -40,7 +41,7 @@ pub struct ServerConfig { pub message_saver: MessageSaverConfig, pub personal_access_token: PersonalAccessTokenConfig, pub heartbeat: HeartbeatConfig, - pub system: Arc<SystemConfig>, + pub system: Rc<SystemConfig>, pub quic: QuicConfig, pub tcp: TcpConfig, pub http: HttpConfig, diff --git a/core/server/src/main.rs b/core/server/src/main.rs index fc2b21f4..06b5f445 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -235,7 +235,7 @@ fn main() -> Result<(), ServerError> { .await; let builder = IggyShard::builder(); - let mut shard = builder + let shard: Rc<IggyShard> = builder .id(id) .connections(connections) .config(config) @@ -243,13 +243,13 @@ fn main() -> Result<(), ServerError> { .version(version) .state(state) .build() - .await; + .into(); //TODO: If one of the shards fails to initialize, we should crash the whole program; - shard.init().await.expect("Failed to initialize shard-{id}: {e}"); - info!("Initiated shard with ID: {id}"); + shard.run().await.expect("Failed to run shard"); //TODO: If one of the shards fails to initialize, we should crash the whole program; shard.assert_init(); + let shard = Rc::new(shard); }) }) .expect(format!("Failed to spawn thread for shard-{id}").as_str()) diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index b5eb859d..e9b329de 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -34,6 +34,8 @@ pub fn start(endpoint: Endpoint, system: SharedSystem) { for _ in 0..LISTENERS_COUNT { let endpoint = endpoint.clone(); let system = system.clone(); + //TODO: Fixme + /* tokio::spawn(async move { while let Some(incoming_connection) = endpoint.accept().await { info!( @@ -57,6 +59,7 @@ pub fn start(endpoint: Endpoint, system: SharedSystem) { }); } }); + */ } } diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 0fd5250f..0428a483 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -26,8 +26,8 @@ use crate::{ configs::server::ServerConfig, map_toggle_str, shard::Shard, - state::{StateKind, file::FileState}, - streaming::storage::SystemStorage, + state::{file::FileState, StateKind}, + streaming::{diagnostics::metrics::Metrics, storage::SystemStorage}, versioning::SemanticVersion, }; @@ -75,11 +75,12 @@ impl IggyShardBuilder { } // TODO: Too much happens in there, some of those bootstrapping logic should be moved outside. - pub async fn build(self) -> IggyShard { + pub fn build(self) -> IggyShard { let id = self.id.unwrap(); let config = self.config.unwrap(); let connections = self.connections.unwrap(); let state = self.state.unwrap(); + let encryptor = self.encryptor; let version = self.version.unwrap(); let (stop_sender, stop_receiver, frame_receiver) = connections .iter() @@ -106,12 +107,21 @@ impl IggyShardBuilder { shards: shards, shards_table: Default::default(), storage: storage, + encryptor: encryptor, state: state, config: config, version: version, stop_receiver: stop_receiver, stop_sender: stop_sender, frame_receiver: Cell::new(Some(frame_receiver)), + metrics: Metrics::init(), + + users: Default::default(), + permissioner: Default::default(), + streams: Default::default(), + streams_ids: Default::default(), + client_manager: Default::default(), + active_sessions: Default::default(), } } } diff --git a/core/server/src/shard/gate.rs b/core/server/src/shard/gate.rs index a26950a4..2ac20445 100644 --- a/core/server/src/shard/gate.rs +++ b/core/server/src/shard/gate.rs @@ -1,4 +1,4 @@ -use std::sync::{Condvar, Mutex}; +use std::sync::Mutex; #[derive(Default)] pub struct Gate<T> { diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 23ef1e15..3e7e10f1 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -17,39 +17,37 @@ */ pub mod builder; +pub mod system; pub mod gate; pub mod namespace; pub mod transmission; -use ahash::HashMap; +use ahash::{AHashMap, AHashSet, HashMap}; use builder::IggyShardBuilder; use error_set::ErrContext; -use iggy_common::IggyError; +use futures::future::try_join_all; +use iggy_common::{EncryptorKind, IggyError, UserId}; use namespace::IggyNamespace; use std::{ - cell::{Cell, RefCell}, - rc::Rc, - str::FromStr, - sync::Arc, - time::Instant, + cell::{Cell, RefCell}, pin::Pin, rc::Rc, str::FromStr, sync::{atomic::{AtomicU32, Ordering}, Arc, RwLock}, time::Instant }; -use tracing::info; +use tracing::{error, info, instrument, trace, warn}; use transmission::connector::{Receiver, ShardConnector, StopReceiver, StopSender}; use crate::{ - bootstrap::create_root_user, configs::server::ServerConfig, - shard::transmission::frame::ShardFrame, + shard::{system::info::SystemInfo, transmission::frame::ShardFrame}, state::{ - StateKind, - file::FileState, - system::{SystemState, UserState}, + file::FileState, system::{StreamState, SystemState, UserState}, StateKind }, - streaming::{storage::SystemStorage, systems::info::SystemInfo}, + streaming::{clients::client_manager::ClientManager, diagnostics::metrics::Metrics, personal_access_tokens::personal_access_token::PersonalAccessToken, session::Session, storage::SystemStorage, streams::stream::Stream, users::{permissioner::Permissioner, user::User}}, versioning::SemanticVersion, }; pub const COMPONENT: &str = "SHARD"; +static USER_ID: AtomicU32 = AtomicU32::new(1); + +type Task = Pin<Box<dyn Future<Output = Result<(), IggyError>>>>; pub(crate) struct Shard { id: u16, @@ -75,19 +73,21 @@ pub struct IggyShard { shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>, version: SemanticVersion, - //pub(crate) permissioner: RefCell<Permissioner>, - //pub(crate) streams: RwLock<HashMap<u32, Stream>>, - //pub(crate) streams_ids: RefCell<HashMap<String, u32>>, - //pub(crate) users: RefCell<HashMap<UserId, User>>, + pub(crate) streams: RefCell<HashMap<u32, Stream>>, + pub(crate) streams_ids: RefCell<HashMap<String, u32>>, // TODO: Refactor. pub(crate) storage: Rc<SystemStorage>, pub(crate) state: StateKind, - //pub(crate) encryptor: Option<Rc<dyn Encryptor>>, - config: ServerConfig, - //pub(crate) client_manager: RefCell<ClientManager>, - //pub(crate) active_sessions: RefCell<Vec<Session>>, - //pub(crate) metrics: Metrics, + pub(crate) encryptor: Option<EncryptorKind>, + pub(crate) config: ServerConfig, + //TODO: This could be shared. + pub(crate) client_manager: RefCell<ClientManager>, + pub(crate) active_sessions: RefCell<Vec<Session>>, + pub(crate) permissioner: RefCell<Permissioner>, + pub(crate) users: RefCell<HashMap<UserId, User>>, + + pub(crate) metrics: Metrics, pub frame_receiver: Cell<Option<Receiver<ShardFrame>>>, stop_receiver: StopReceiver, stop_sender: StopSender, @@ -98,16 +98,12 @@ impl IggyShard { Default::default() } - pub async fn init(&mut self) -> Result<(), IggyError> { + pub async fn init(&self) -> Result<(), IggyError> { let now = Instant::now(); - //TODO: Fix this either by moving it to main function, or by using `run_once` barrier. - //let state_entries = self.state.init().await?; - //let system_state = SystemState::init(state_entries).await?; - //let user = create_root_user(); + self.load_version().await?; let SystemState { users, streams } = self.load_state().await?; self.load_users(users.into_values().collect()).await; - // Add default root user. self.load_streams(streams.into_values().collect()).await; //TODO: Fix the archiver. @@ -123,6 +119,27 @@ impl IggyShard { Ok(()) } + pub async fn run(self: &Rc<Self>) -> Result<(), IggyError> { + // Workaround to ensure that the statistics are initialized before the server + // loads streams and starts accepting connections. This is necessary to + // have the correct statistics when the server starts. + //self.get_stats().await?; + self.init().await?; + self.assert_init(); + info!("Initiated shard with ID: {}", self.id); + // Create all tasks (tcp listener, http listener, command processor, in the future also the background jobs). + /* + let mut tasks: Vec<Task> = vec![Box::pin(spawn_shard_message_task(shard.clone()))]; + if self.config.tcp.enabled { + tasks.push(Box::pin(spawn_tcp_server(self.clone()))); + } + let result = try_join_all(tasks).await; + result?; + */ + + Ok(()) + } + async fn load_version(&self) -> Result<(), IggyError> { async fn update_system_info( storage: &Rc<SystemStorage>, @@ -181,10 +198,8 @@ impl IggyShard { Ok(system_state) } - async fn load_users(&mut self, users: Vec<UserState>) -> Result<(), IggyError> { + async fn load_users(&self, users: Vec<UserState>) -> Result<(), IggyError> { info!("Loading users..."); - /* - for user_state in users.into_iter() { let mut user = User::with_password( user_state.id, @@ -210,32 +225,34 @@ impl IggyShard { ) }) .collect(); - self.users.insert(user_state.id, user); + self.users.borrow_mut().insert(user_state.id, user); } - let users_count = self.users.len(); - let current_user_id = self.users.keys().max().unwrap_or(&1); + let users = self.users.borrow(); + let users_count = users.len(); + let current_user_id = users.keys().max().unwrap_or(&1); USER_ID.store(current_user_id + 1, Ordering::SeqCst); self.permissioner - .init(&self.users.values().collect::<Vec<&User>>()); + .borrow_mut() + .init(&users.values().collect::<Vec<_>>()); self.metrics.increment_users(users_count as u32); info!("Initialized {users_count} user(s)."); - */ Ok(()) } - async fn load_streams(&mut self, streams: Vec<StreamState>) -> Result<(), IggyError> { - todo!(); + async fn load_streams(&self, streams: Vec<StreamState>) -> Result<(), IggyError> { info!("Loading streams from disk..."); let mut unloaded_streams = Vec::new(); - let mut dir_entries = read_dir(&self.config.get_streams_path()) - .await + // Does mononio has api for that ? + let mut dir_entries = std::fs::read_dir(&self.config.system.get_streams_path()) .map_err(|error| { error!("Cannot read streams directory: {error}"); IggyError::CannotReadStreams })?; - while let Some(dir_entry) = dir_entries.next_entry().await.unwrap_or(None) { + //TODO: User the dir walk impl from main function, once implemented. + while let Some(dir_entry) = dir_entries.next() { + let dir_entry = dir_entry.unwrap(); let name = dir_entry.file_name().into_string().unwrap(); let stream_id = name.parse::<u32>().map_err(|_| { error!("Invalid stream ID file with name: '{name}'."); @@ -246,7 +263,7 @@ impl IggyShard { error!( "Stream with ID: '{stream_id}' was not found in state, but exists on disk and will be removed." ); - if let Err(error) = fs::remove_dir_all(&dir_entry.path()).await { + if let Err(error) = std::fs::remove_dir_all(&dir_entry.path()) { error!("Cannot remove stream directory: {error}"); } else { warn!("Stream with ID: '{stream_id}' was removed."); @@ -258,7 +275,7 @@ impl IggyShard { let mut stream = Stream::empty( stream_id, &stream_state.name, - self.config.clone(), + self.config.system.clone(), self.storage.clone(), ); stream.created_at = stream_state.created_at; @@ -281,7 +298,7 @@ impl IggyShard { info!("All streams found on disk were found in state."); } else { warn!("Streams with IDs: '{missing_ids:?}' were not found on disk."); - if self.config.recovery.recreate_missing_state { + if self.config.system.recovery.recreate_missing_state { info!( "Recreating missing state in recovery config is enabled, missing streams will be created." ); @@ -291,7 +308,7 @@ impl IggyShard { let stream = Stream::create( stream_id, &stream_state.name, - self.config.clone(), + self.config.system.clone(), self.storage.clone(), ); stream.persist().await?; @@ -327,12 +344,12 @@ impl IggyShard { try_join_all(load_stream_tasks).await?; for stream in loaded_streams.take() { - if self.streams.contains_key(&stream.stream_id) { + if self.streams.borrow().contains_key(&stream.stream_id) { error!("Stream with ID: '{}' already exists.", &stream.stream_id); continue; } - if self.streams_ids.contains_key(&stream.name) { + if self.streams_ids.borrow().contains_key(&stream.name) { error!("Stream with name: '{}' already exists.", &stream.name); continue; } @@ -345,13 +362,52 @@ impl IggyShard { self.metrics.increment_messages(stream.get_messages_count()); self.streams_ids + .borrow_mut() .insert(stream.name.clone(), stream.stream_id); - self.streams.insert(stream.stream_id, stream); + self.streams.borrow_mut().insert(stream.stream_id, stream); } - info!("Loaded {} stream(s) from disk.", self.streams.len()); + info!("Loaded {} stream(s) from disk.", self.streams.borrow().len()); Ok(()) } - pub fn assert_init(&self) {} + pub fn assert_init(&self) -> Result<(), IggyError> { Ok(())} + + #[instrument(skip_all, name = "trace_shutdown")] + pub async fn shutdown(&mut self) -> Result<(), IggyError> { + self.persist_messages().await?; + Ok(()) + } + + #[instrument(skip_all, name = "trace_persist_messages")] + pub async fn persist_messages(&self) -> Result<usize, IggyError> { + trace!("Saving buffered messages on disk..."); + let mut saved_messages_number = 0; + //TODO: Fixme + /* + for stream in self.streams.values() { + saved_messages_number += stream.persist_messages().await?; + } + */ + + Ok(saved_messages_number) + } + + pub fn get_available_shards_count(&self) -> u32 { + self.shards.len() as u32 + } + + pub fn ensure_authenticated(&self, client_id: u32) -> Result<u32, IggyError> { + let active_sessions = self.active_sessions.borrow(); + let session = active_sessions + .iter() + .find(|s| s.client_id == client_id) + .ok_or_else(|| IggyError::Unauthenticated)?; + if session.is_authenticated() { + Ok(session.get_user_id()) + } else { + error!("{COMPONENT} - unauthenticated access attempt, session: {session}"); + Err(IggyError::Unauthenticated) + } + } } diff --git a/core/server/src/shard/namespace.rs b/core/server/src/shard/namespace.rs index d9aedfb7..d9d68ec9 100644 --- a/core/server/src/shard/namespace.rs +++ b/core/server/src/shard/namespace.rs @@ -16,15 +16,20 @@ * under the License. */ +use std::hash::Hasher as _; +use iggy_common::Identifier; +use hash32::{Hasher, Murmur3Hasher}; + +//TODO: Will probably want to move it to separate crate so we can share it with sdk. #[derive(Debug, Clone, Eq, PartialEq, Hash)] pub struct IggyNamespace { - pub(crate) stream_id: u32, - pub(crate) topic_id: u32, + pub(crate) stream_id: Identifier, + pub(crate) topic_id: Identifier, pub(crate) partition_id: u32, } impl IggyNamespace { - pub fn new(stream_id: u32, topic_id: u32, partition_id: u32) -> Self { + pub fn new(stream_id: Identifier, topic_id: Identifier, partition_id: u32) -> Self { Self { stream_id, topic_id, @@ -33,6 +38,10 @@ impl IggyNamespace { } pub fn generate_hash(&self) -> u32 { - todo!(); + let mut hasher = Murmur3Hasher::default(); + hasher.write(&self.stream_id.value); + hasher.write(&self.topic_id.value); + hasher.write_u32(self.partition_id); + hasher.finish32() } } diff --git a/core/server/src/streaming/systems/clients.rs b/core/server/src/shard/system/clients.rs similarity index 99% rename from core/server/src/streaming/systems/clients.rs rename to core/server/src/shard/system/clients.rs index 050a0b58..2cb0fbe8 100644 --- a/core/server/src/streaming/systems/clients.rs +++ b/core/server/src/shard/system/clients.rs @@ -29,7 +29,7 @@ use std::net::SocketAddr; use std::sync::Arc; use tracing::{error, info}; -impl System { +impl IggyShard { pub async fn add_client(&self, address: &SocketAddr, transport: Transport) -> Arc<Session> { let mut client_manager = self.client_manager.write().await; let session = client_manager.add_client(address, transport); diff --git a/core/server/src/streaming/systems/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs similarity index 99% rename from core/server/src/streaming/systems/consumer_groups.rs rename to core/server/src/shard/system/consumer_groups.rs index 4303dc4d..ae883d9f 100644 --- a/core/server/src/streaming/systems/consumer_groups.rs +++ b/core/server/src/shard/system/consumer_groups.rs @@ -26,7 +26,7 @@ use iggy_common::IggyError; use iggy_common::locking::IggySharedMutFn; use tokio::sync::RwLock; -impl System { +impl IggyShard { pub fn get_consumer_group( &self, session: &Session, diff --git a/core/server/src/streaming/systems/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs similarity index 99% rename from core/server/src/streaming/systems/consumer_offsets.rs rename to core/server/src/shard/system/consumer_offsets.rs index fdac2b0c..e63815ff 100644 --- a/core/server/src/streaming/systems/consumer_offsets.rs +++ b/core/server/src/shard/system/consumer_offsets.rs @@ -22,7 +22,7 @@ use crate::streaming::systems::system::System; use error_set::ErrContext; use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; -impl System { +impl IggyShard { pub async fn store_consumer_offset( &self, session: &Session, diff --git a/core/server/src/streaming/systems/info.rs b/core/server/src/shard/system/info.rs similarity index 100% rename from core/server/src/streaming/systems/info.rs rename to core/server/src/shard/system/info.rs diff --git a/core/server/src/streaming/systems/messages.rs b/core/server/src/shard/system/messages.rs similarity index 99% rename from core/server/src/streaming/systems/messages.rs rename to core/server/src/shard/system/messages.rs index 53ed0432..da9b67c2 100644 --- a/core/server/src/streaming/systems/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -17,10 +17,9 @@ */ use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata; +use crate::shard::IggyShard; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet}; use crate::streaming::session::Session; -use crate::streaming::systems::COMPONENT; -use crate::streaming::systems::system::System; use crate::streaming::utils::PooledBuffer; use error_set::ErrContext; use iggy_common::{ @@ -29,7 +28,7 @@ use iggy_common::{ }; use tracing::{error, trace}; -impl System { +impl IggyShard { pub async fn poll_messages( &self, session: &Session, diff --git a/core/server/src/streaming/systems/mod.rs b/core/server/src/shard/system/mod.rs similarity index 95% rename from core/server/src/streaming/systems/mod.rs rename to core/server/src/shard/system/mod.rs index 7039e919..1419abb9 100644 --- a/core/server/src/streaming/systems/mod.rs +++ b/core/server/src/shard/system/mod.rs @@ -32,4 +32,4 @@ pub mod system; pub mod topics; pub mod users; -pub const COMPONENT: &str = "STREAMING_SYSTEMS"; +pub const COMPONENT: &str = "SYSTEM"; diff --git a/core/server/src/streaming/systems/partitions.rs b/core/server/src/shard/system/partitions.rs similarity index 99% rename from core/server/src/streaming/systems/partitions.rs rename to core/server/src/shard/system/partitions.rs index 5bb72875..7712f498 100644 --- a/core/server/src/streaming/systems/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -23,7 +23,7 @@ use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; -impl System { +impl IggyShard { pub async fn create_partitions( &mut self, session: &Session, diff --git a/core/server/src/streaming/systems/personal_access_tokens.rs b/core/server/src/shard/system/personal_access_tokens.rs similarity index 99% rename from core/server/src/streaming/systems/personal_access_tokens.rs rename to core/server/src/shard/system/personal_access_tokens.rs index 945c853b..18125bed 100644 --- a/core/server/src/streaming/systems/personal_access_tokens.rs +++ b/core/server/src/shard/system/personal_access_tokens.rs @@ -27,7 +27,7 @@ use iggy_common::IggyExpiry; use iggy_common::IggyTimestamp; use tracing::{error, info}; -impl System { +impl IggyShard { pub async fn get_personal_access_tokens( &self, session: &Session, diff --git a/core/server/src/streaming/systems/segments.rs b/core/server/src/shard/system/segments.rs similarity index 99% rename from core/server/src/streaming/systems/segments.rs rename to core/server/src/shard/system/segments.rs index a15952c1..bd4043a2 100644 --- a/core/server/src/streaming/systems/segments.rs +++ b/core/server/src/shard/system/segments.rs @@ -23,7 +23,7 @@ use iggy_common::Identifier; use iggy_common::IggyError; use iggy_common::locking::IggySharedMutFn; -impl System { +impl IggyShard { pub async fn delete_segments( &mut self, session: &Session, diff --git a/core/server/src/streaming/systems/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs similarity index 100% rename from core/server/src/streaming/systems/snapshot/mod.rs rename to core/server/src/shard/system/snapshot/mod.rs diff --git a/core/server/src/streaming/systems/snapshot/procdump.rs b/core/server/src/shard/system/snapshot/procdump.rs similarity index 100% rename from core/server/src/streaming/systems/snapshot/procdump.rs rename to core/server/src/shard/system/snapshot/procdump.rs diff --git a/core/server/src/streaming/systems/stats.rs b/core/server/src/shard/system/stats.rs similarity index 99% rename from core/server/src/streaming/systems/stats.rs rename to core/server/src/shard/system/stats.rs index 80c588e6..2d2d2ea7 100644 --- a/core/server/src/streaming/systems/stats.rs +++ b/core/server/src/shard/system/stats.rs @@ -34,7 +34,7 @@ fn sysinfo() -> &'static Mutex<SysinfoSystem> { }) } -impl System { +impl IggyShard { pub async fn get_stats(&self) -> Result<Stats, IggyError> { let mut sys = sysinfo().lock().await; let process_id = std::process::id(); diff --git a/core/server/src/streaming/systems/storage.rs b/core/server/src/shard/system/storage.rs similarity index 97% rename from core/server/src/streaming/systems/storage.rs rename to core/server/src/shard/system/storage.rs index d4363631..9a493751 100644 --- a/core/server/src/streaming/systems/storage.rs +++ b/core/server/src/shard/system/storage.rs @@ -18,8 +18,6 @@ use crate::streaming::persistence::persister::PersisterKind; use crate::streaming::storage::SystemInfoStorage; -use crate::streaming::systems::COMPONENT; -use crate::streaming::systems::info::SystemInfo; use crate::streaming::utils::PooledBuffer; use crate::streaming::utils::file; use anyhow::Context; diff --git a/core/server/src/streaming/systems/streams.rs b/core/server/src/shard/system/streams.rs similarity index 70% rename from core/server/src/streaming/systems/streams.rs rename to core/server/src/shard/system/streams.rs index 18c6aa83..efcfff2b 100644 --- a/core/server/src/streaming/systems/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -16,156 +16,18 @@ * under the License. */ -use crate::state::system::StreamState; +use crate::shard::IggyShard; use crate::streaming::session::Session; use crate::streaming::streams::stream::Stream; -use crate::streaming::systems::COMPONENT; -use crate::streaming::systems::system::System; -use ahash::{AHashMap, AHashSet}; use error_set::ErrContext; use futures::future::try_join_all; -use iggy_common::locking::IggySharedMutFn; -use iggy_common::{IdKind, Identifier, IggyError}; use std::cell::RefCell; use std::sync::atomic::{AtomicU32, Ordering}; use tokio::fs; -use tokio::fs::read_dir; use tracing::{error, info, warn}; -static CURRENT_STREAM_ID: AtomicU32 = AtomicU32::new(1); - -impl System { - pub(crate) async fn load_streams( - &mut self, - streams: Vec<StreamState>, - ) -> Result<(), IggyError> { - info!("Loading streams from disk..."); - let mut unloaded_streams = Vec::new(); - let mut dir_entries = read_dir(&self.config.get_streams_path()) - .await - .map_err(|error| { - error!("Cannot read streams directory: {error}"); - IggyError::CannotReadStreams - })?; - - while let Some(dir_entry) = dir_entries.next_entry().await.unwrap_or(None) { - let name = dir_entry.file_name().into_string().unwrap(); - let stream_id = name.parse::<u32>().map_err(|_| { - error!("Invalid stream ID file with name: '{name}'."); - IggyError::InvalidNumberValue - })?; - let stream_state = streams.iter().find(|s| s.id == stream_id); - if stream_state.is_none() { - error!( - "Stream with ID: '{stream_id}' was not found in state, but exists on disk and will be removed." - ); - if let Err(error) = fs::remove_dir_all(&dir_entry.path()).await { - error!("Cannot remove stream directory: {error}"); - } else { - warn!("Stream with ID: '{stream_id}' was removed."); - } - continue; - } - - let stream_state = stream_state.unwrap(); - let mut stream = Stream::empty( - stream_id, - &stream_state.name, - self.config.clone(), - self.storage.clone(), - ); - stream.created_at = stream_state.created_at; - unloaded_streams.push(stream); - } - - let state_stream_ids = streams - .iter() - .map(|stream| stream.id) - .collect::<AHashSet<u32>>(); - let unloaded_stream_ids = unloaded_streams - .iter() - .map(|stream| stream.stream_id) - .collect::<AHashSet<u32>>(); - let mut missing_ids = state_stream_ids - .difference(&unloaded_stream_ids) - .copied() - .collect::<AHashSet<u32>>(); - if missing_ids.is_empty() { - info!("All streams found on disk were found in state."); - } else { - warn!("Streams with IDs: '{missing_ids:?}' were not found on disk."); - if self.config.recovery.recreate_missing_state { - info!( - "Recreating missing state in recovery config is enabled, missing streams will be created." - ); - for stream_id in missing_ids.iter() { - let stream_id = *stream_id; - let stream_state = streams.iter().find(|s| s.id == stream_id).unwrap(); - let stream = Stream::create( - stream_id, - &stream_state.name, - self.config.clone(), - self.storage.clone(), - ); - stream.persist().await?; - unloaded_streams.push(stream); - info!( - "Missing stream with ID: '{stream_id}', name: {} was recreated.", - stream_state.name - ); - } - missing_ids.clear(); - } else { - warn!( - "Recreating missing state in recovery config is disabled, missing streams will not be created." - ); - } - } - - let mut streams_states = streams - .into_iter() - .filter(|s| !missing_ids.contains(&s.id)) - .map(|s| (s.id, s)) - .collect::<AHashMap<_, _>>(); - let loaded_streams = RefCell::new(Vec::new()); - let load_stream_tasks = unloaded_streams.into_iter().map(|mut stream| { - let state = streams_states.remove(&stream.stream_id).unwrap(); - - async { - stream.load(state).await?; - loaded_streams.borrow_mut().push(stream); - Result::<(), IggyError>::Ok(()) - } - }); - try_join_all(load_stream_tasks).await?; - - for stream in loaded_streams.take() { - if self.streams.contains_key(&stream.stream_id) { - error!("Stream with ID: '{}' already exists.", &stream.stream_id); - continue; - } - - if self.streams_ids.contains_key(&stream.name) { - error!("Stream with name: '{}' already exists.", &stream.name); - continue; - } - - self.metrics.increment_streams(1); - self.metrics.increment_topics(stream.get_topics_count()); - self.metrics - .increment_partitions(stream.get_partitions_count()); - self.metrics.increment_segments(stream.get_segments_count()); - self.metrics.increment_messages(stream.get_messages_count()); - - self.streams_ids - .insert(stream.name.clone(), stream.stream_id); - self.streams.insert(stream.stream_id, stream); - } - - info!("Loaded {} stream(s) from disk.", self.streams.len()); - Ok(()) - } +impl IggyShard { pub fn get_streams(&self) -> Vec<&Stream> { self.streams.values().collect() } diff --git a/core/server/src/streaming/systems/system.rs b/core/server/src/shard/system/system.rs similarity index 100% rename from core/server/src/streaming/systems/system.rs rename to core/server/src/shard/system/system.rs diff --git a/core/server/src/streaming/systems/topics.rs b/core/server/src/shard/system/topics.rs similarity index 100% rename from core/server/src/streaming/systems/topics.rs rename to core/server/src/shard/system/topics.rs diff --git a/core/server/src/streaming/systems/users.rs b/core/server/src/shard/system/users.rs similarity index 79% rename from core/server/src/streaming/systems/users.rs rename to core/server/src/shard/system/users.rs index cc27fcae..f2405f1f 100644 --- a/core/server/src/streaming/systems/users.rs +++ b/core/server/src/shard/system/users.rs @@ -16,13 +16,12 @@ * under the License. */ +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::state::models::CreateUserWithId; use crate::state::system::UserState; use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::session::Session; -use crate::streaming::systems::COMPONENT; -use crate::streaming::systems::system::System; use crate::streaming::users::user::User; use crate::streaming::utils::crypto; use crate::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV}; @@ -42,108 +41,7 @@ use tracing::{error, info, warn}; static USER_ID: AtomicU32 = AtomicU32::new(1); const MAX_USERS: usize = u32::MAX as usize; -impl System { - pub(crate) async fn load_users(&mut self, users: Vec<UserState>) -> Result<(), IggyError> { - info!("Loading users..."); - if users.is_empty() { - info!("No users found, creating the root user..."); - let root = Self::create_root_user(); - let command = CreateUser { - username: root.username.clone(), - password: root.password.clone(), - status: root.status, - permissions: root.permissions.clone(), - }; - self.state - .apply(0, &EntryCommand::CreateUser(CreateUserWithId { - user_id: root.id, - command - })) - .await - .with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to apply create user command, username: {}", - root.username - ) - })?; - - self.users.insert(root.id, root); - info!("Created the root user."); - } - - for user_state in users.into_iter() { - let mut user = User::with_password( - user_state.id, - &user_state.username, - user_state.password_hash, - user_state.status, - user_state.permissions, - ); - - user.created_at = user_state.created_at; - user.personal_access_tokens = user_state - .personal_access_tokens - .into_values() - .map(|token| { - ( - Arc::new(token.token_hash.clone()), - PersonalAccessToken::raw( - user_state.id, - &token.name, - &token.token_hash, - token.expiry_at, - ), - ) - }) - .collect(); - self.users.insert(user_state.id, user); - } - - let users_count = self.users.len(); - let current_user_id = self.users.keys().max().unwrap_or(&1); - USER_ID.store(current_user_id + 1, Ordering::SeqCst); - self.permissioner - .init(&self.users.values().collect::<Vec<&User>>()); - self.metrics.increment_users(users_count as u32); - info!("Initialized {users_count} user(s)."); - Ok(()) - } - - fn create_root_user() -> User { - let username = env::var(IGGY_ROOT_USERNAME_ENV); - let password = env::var(IGGY_ROOT_PASSWORD_ENV); - if (username.is_ok() && password.is_err()) || (username.is_err() && password.is_ok()) { - panic!( - "When providing the custom root user credentials, both username and password must be set." - ); - } - if username.is_ok() && password.is_ok() { - info!("Using the custom root user credentials."); - } else { - info!("Using the default root user credentials."); - } - - let username = username.unwrap_or(DEFAULT_ROOT_USERNAME.to_string()); - let password = password.unwrap_or(DEFAULT_ROOT_PASSWORD.to_string()); - if username.is_empty() || password.is_empty() { - panic!("Root user credentials are not set."); - } - if username.len() < MIN_USERNAME_LENGTH { - panic!("Root username is too short."); - } - if username.len() > MAX_USERNAME_LENGTH { - panic!("Root username is too long."); - } - if password.len() < MIN_PASSWORD_LENGTH { - panic!("Root password is too short."); - } - if password.len() > MAX_PASSWORD_LENGTH { - panic!("Root password is too long."); - } - - User::root(&username, &password) - } - +impl IggyShard { pub fn find_user( &self, session: &Session, diff --git a/core/server/src/streaming/mod.rs b/core/server/src/streaming/mod.rs index 5009cb9a..f9d40af8 100644 --- a/core/server/src/streaming/mod.rs +++ b/core/server/src/streaming/mod.rs @@ -28,7 +28,6 @@ pub mod segments; pub mod session; pub mod storage; pub mod streams; -pub mod systems; pub mod topics; pub mod users; pub mod utils; diff --git a/core/server/src/streaming/storage.rs b/core/server/src/streaming/storage.rs index fc0a752d..f423b842 100644 --- a/core/server/src/streaming/storage.rs +++ b/core/server/src/streaming/storage.rs @@ -33,6 +33,7 @@ use iggy_common::IggyError; use mockall::automock; use std::fmt::Debug; use std::future::Future; +use std::rc::Rc; use std::sync::Arc; macro_rules! forward_async_methods { @@ -143,7 +144,7 @@ pub trait PartitionStorage: Send { #[derive(Debug)] pub struct SystemStorage { - pub info: Arc<SystemInfoStorageKind>, + pub info: Rc<SystemInfoStorageKind>, pub stream: Arc<StreamStorageKind>, pub topic: Arc<TopicStorageKind>, pub partition: Arc<PartitionStorageKind>, @@ -151,7 +152,7 @@ pub struct SystemStorage { } impl SystemStorage { - pub fn new(config: Arc<SystemConfig>, persister: Arc<PersisterKind>) -> Self { + pub fn new(config: Rc<SystemConfig>, persister: Arc<PersisterKind>) -> Self { Self { info: Arc::new(SystemInfoStorageKind::File(FileSystemInfoStorage::new( config.get_state_info_path(), diff --git a/core/server/src/streaming/streams/stream.rs b/core/server/src/streaming/streams/stream.rs index ef528197..339fc08a 100644 --- a/core/server/src/streaming/streams/stream.rs +++ b/core/server/src/streaming/streams/stream.rs @@ -23,6 +23,7 @@ use ahash::AHashMap; use iggy_common::IggyByteSize; use iggy_common::IggyTimestamp; use std::fmt::Display; +use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; @@ -47,8 +48,8 @@ impl Stream { pub fn empty( id: u32, name: &str, - config: Arc<SystemConfig>, - storage: Arc<SystemStorage>, + config: Rc<SystemConfig>, + storage: Rc<SystemStorage>, ) -> Self { Stream::create(id, name, config, storage) } @@ -56,8 +57,8 @@ impl Stream { pub fn create( id: u32, name: &str, - config: Arc<SystemConfig>, - storage: Arc<SystemStorage>, + config: Rc<SystemConfig>, + storage: Rc<SystemStorage>, ) -> Self { let path = config.get_stream_path(id); let topics_path = config.get_topics_path(id); diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs index 2b6b6d17..4dd11725 100644 --- a/core/server/src/tcp/sender.rs +++ b/core/server/src/tcp/sender.rs @@ -17,15 +17,15 @@ */ use iggy_common::IggyError; +use monoio::io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt}; use std::io::IoSlice; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tracing::debug; const STATUS_OK: &[u8] = &[0; 4]; pub(crate) async fn read<T>(stream: &mut T, buffer: &mut [u8]) -> Result<usize, IggyError> where - T: AsyncRead + AsyncWrite + Unpin, + T: AsyncReadRent + AsyncWriteRent + Unpin, { match stream.read_exact(buffer).await { Ok(0) => Err(IggyError::ConnectionClosed), @@ -42,14 +42,14 @@ where pub(crate) async fn send_empty_ok_response<T>(stream: &mut T) -> Result<(), IggyError> where - T: AsyncRead + AsyncWrite + Unpin, + T: AsyncReadRent + AsyncWriteRent + Unpin, { send_ok_response(stream, &[]).await } pub(crate) async fn send_ok_response<T>(stream: &mut T, payload: &[u8]) -> Result<(), IggyError> where - T: AsyncRead + AsyncWrite + Unpin, + T: AsyncReadRent + AsyncWriteRent + Unpin, { send_response(stream, STATUS_OK, payload).await } @@ -60,7 +60,7 @@ pub(crate) async fn send_ok_response_vectored<T>( slices: Vec<IoSlice<'_>>, ) -> Result<(), IggyError> where - T: AsyncRead + AsyncWrite + Unpin, + T: AsyncReadRentExt + AsyncWriteRentExt + Unpin, { send_response_vectored(stream, STATUS_OK, length, slices).await } @@ -70,7 +70,7 @@ pub(crate) async fn send_error_response<T>( error: IggyError, ) -> Result<(), IggyError> where - T: AsyncRead + AsyncWrite + Unpin, + T: AsyncReadRent + AsyncWriteRent + Unpin, { send_response(stream, &error.as_code().to_le_bytes(), &[]).await } @@ -81,7 +81,7 @@ pub(crate) async fn send_response<T>( payload: &[u8], ) -> Result<(), IggyError> where - T: AsyncRead + AsyncWrite + Unpin, + T: AsyncReadRent + AsyncWriteRent + Unpin, { debug!( "Sending response of len: {} with status: {:?}...", @@ -104,7 +104,7 @@ pub(crate) async fn send_response_vectored<T>( mut slices: Vec<IoSlice<'_>>, ) -> Result<(), IggyError> where - T: AsyncReadExt + AsyncWriteExt + Unpin, + T: AsyncReadRentExt + AsyncWriteRentExt + Unpin, { debug!( "Sending vectored response of len: {} with status: {:?}...", diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index 0d84aa10..2e4f25be 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -17,60 +17,44 @@ */ use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::clients::client_manager::Transport; -use crate::streaming::systems::system::SharedSystem; use crate::tcp::connection_handler::{handle_connection, handle_error}; use std::net::SocketAddr; +use std::rc::Rc; +use monoio::net::TcpListener; +use rustls::pki_types::Ipv4Addr; use tokio::net::TcpSocket; use tokio::sync::oneshot; use tracing::{error, info}; -pub async fn start(address: &str, socket: TcpSocket, system: SharedSystem) -> SocketAddr { - let address = address.to_string(); - let (tx, rx) = oneshot::channel(); - tokio::spawn(async move { - let addr = address.parse(); - if addr.is_err() { - panic!("Unable to parse address {:?}", address); - } - - socket - .bind(addr.unwrap()) - .expect("Unable to bind socket to address"); - - let listener = socket.listen(1024).expect("Unable to start TCP server."); - - let local_addr = listener - .local_addr() - .expect("Failed to get local address for TCP listener"); - - tx.send(local_addr).unwrap_or_else(|_| { - panic!( - "Failed to send the local address {:?} for TCP listener", - local_addr - ) - }); - +pub async fn start(server_name: &str, shard: Rc<IggyShard>) { + let addr: SocketAddr = if shard.config.tcp.ipv6 { + shard.config.tcp.address.parse().expect("Unable to parse IPv6 address") + } else { + shard.config.tcp.address.parse().expect("Unable to parse IPv4 address") + }; + monoio::spawn(async move { + let listener = TcpListener::bind(addr).expect(format!("Unable to start {server_name}.").as_ref()); loop { match listener.accept().await { Ok((stream, address)) => { + let shard = shard.clone(); info!("Accepted new TCP connection: {address}"); - let session = system - .read() - .await - .add_client(&address, Transport::Tcp) - .await; + let session = shard + .add_client(&address, Transport::Tcp); let client_id = session.client_id; info!("Created new session: {session}"); - let system = system.clone(); let mut sender = SenderKind::get_tcp_sender(stream); - tokio::spawn(async move { + monoio::spawn(async move { if let Err(error) = - handle_connection(session, &mut sender, system.clone()).await + handle_connection(session, &mut sender, shard.clone()).await { handle_error(error); - system.read().await.delete_client(client_id).await; + //TODO: Fixme + /* + //system.read().await.delete_client(client_id).await; if let Err(error) = sender.shutdown().await { error!( "Failed to shutdown TCP stream for client: {client_id}, address: {address}. {error}" @@ -80,6 +64,7 @@ pub async fn start(address: &str, socket: TcpSocket, system: SharedSystem) -> So "Successfully closed TCP stream for client: {client_id}, address: {address}." ); } + */ } }); } @@ -87,8 +72,4 @@ pub async fn start(address: &str, socket: TcpSocket, system: SharedSystem) -> So } } }); - match rx.await { - Ok(addr) => addr, - Err(_) => panic!("Failed to get the local address for TCP listener."), - } } diff --git a/core/server/src/tcp/tcp_sender.rs b/core/server/src/tcp/tcp_sender.rs index 837c8f06..a157dead 100644 --- a/core/server/src/tcp/tcp_sender.rs +++ b/core/server/src/tcp/tcp_sender.rs @@ -21,7 +21,8 @@ use crate::tcp::COMPONENT; use crate::{server_error::ServerError, tcp::sender}; use error_set::ErrContext; use iggy_common::IggyError; -use tokio::{io::AsyncWriteExt, net::TcpStream}; +use tokio::{io::AsyncWriteExt}; +use monoio::net::TcpStream; #[derive(Debug)] pub struct TcpSender { diff --git a/core/server/src/tcp/tcp_server.rs b/core/server/src/tcp/tcp_server.rs index a57b7c3f..82bbe0c1 100644 --- a/core/server/src/tcp/tcp_server.rs +++ b/core/server/src/tcp/tcp_server.rs @@ -16,26 +16,25 @@ * under the License. */ -use crate::configs::tcp::TcpConfig; -use crate::streaming::systems::system::SharedSystem; -use crate::tcp::{tcp_listener, tcp_socket, tcp_tls_listener}; -use std::net::SocketAddr; +use crate::shard::IggyShard; +use crate::tcp::{tcp_listener}; +use std::rc::Rc; +use iggy_common::IggyError; use tracing::info; /// Starts the TCP server. /// Returns the address the server is listening on. -pub async fn start(config: TcpConfig, system: SharedSystem) -> SocketAddr { - let server_name = if config.tls.enabled { +pub async fn start(shard: Rc<IggyShard>) -> Result<(), IggyError> { + let server_name = if shard.config.tcp.tls.enabled { "Iggy TCP TLS" } else { "Iggy TCP" }; info!("Initializing {server_name} server..."); - let socket = tcp_socket::build(config.ipv6, config.socket); - let addr = match config.tls.enabled { - true => tcp_tls_listener::start(&config.address, config.tls, socket, system).await, - false => tcp_listener::start(&config.address, socket, system).await, + let addr = match shard.config.tcp.tls.enabled { + true => unimplemented!("TLS support is not implemented yet"), + false => tcp_listener::start(server_name, shard).await, }; info!("{server_name} server has started on: {:?}", addr); - addr + Ok(()) } diff --git a/core/server/src/tcp/tcp_socket.rs b/core/server/src/tcp/tcp_socket.rs index 91ed105a..69c1d408 100644 --- a/core/server/src/tcp/tcp_socket.rs +++ b/core/server/src/tcp/tcp_socket.rs @@ -18,7 +18,6 @@ use std::num::TryFromIntError; -use tokio::net::TcpSocket; use crate::configs::tcp::TcpSocketConfig; diff --git a/core/server/src/tcp/tcp_tls_sender.rs b/core/server/src/tcp/tcp_tls_sender.rs index 6e3056a5..6ba62315 100644 --- a/core/server/src/tcp/tcp_tls_sender.rs +++ b/core/server/src/tcp/tcp_tls_sender.rs @@ -21,9 +21,8 @@ use crate::tcp::COMPONENT; use crate::{server_error::ServerError, tcp::sender}; use error_set::ErrContext; use iggy_common::IggyError; -use tokio::io::AsyncWriteExt; -use tokio::net::TcpStream; -use tokio_rustls::server::TlsStream; +use monoio::net::TcpStream; +use monoio_native_tls::TlsStream; #[derive(Debug)] pub struct TcpTlsSender {
