This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit d0ea2c19eb9e0e2e2314e3510675a2ca808dd11e Author: numinex <[email protected]> AuthorDate: Tue Jun 24 18:24:30 2025 +0200 begin fix cg --- Cargo.lock | 1 + core/server/Cargo.toml | 1 + core/server/src/binary/command.rs | 7 +- core/server/src/binary/sender.rs | 28 ++++--- core/server/src/channels/commands/archive_state.rs | 4 + .../commands/clean_personal_access_tokens.rs | 3 + .../src/channels/commands/maintain_messages.rs | 5 ++ core/server/src/channels/commands/print_sysinfo.rs | 4 + core/server/src/channels/commands/save_messages.rs | 4 + .../src/channels/commands/verify_heartbeats.rs | 3 + core/server/src/channels/handler.rs | 4 + core/server/src/channels/server_command.rs | 4 + core/server/src/configs/defaults.rs | 3 +- core/server/src/http/http_server.rs | 3 + core/server/src/quic/listener.rs | 9 +- core/server/src/quic/quic_sender.rs | 15 +++- core/server/src/quic/quic_server.rs | 4 +- core/server/src/shard/builder.rs | 2 +- core/server/src/shard/mod.rs | 95 +++++++++++++++++----- core/server/src/shard/namespace.rs | 4 +- core/server/src/shard/system/clients.rs | 10 +-- core/server/src/shard/system/snapshot/mod.rs | 1 - core/server/src/shard/system/storage.rs | 1 + core/server/src/shard/system/streams.rs | 1 - core/server/src/shard/transmission/message.rs | 14 ++-- .../server/src/streaming/clients/client_manager.rs | 65 +++++++-------- core/server/src/streaming/partitions/messages.rs | 5 +- core/server/src/streaming/partitions/partition.rs | 20 +++-- core/server/src/streaming/segments/segment.rs | 9 +- core/server/src/streaming/session.rs | 21 +++-- core/server/src/streaming/storage.rs | 42 +++++----- core/server/src/streaming/streams/storage.rs | 2 +- core/server/src/streaming/streams/stream.rs | 10 +-- core/server/src/streaming/streams/topics.rs | 9 +- core/server/src/streaming/topics/consumer_group.rs | 12 +-- .../server/src/streaming/topics/consumer_groups.rs | 16 ++-- core/server/src/streaming/topics/topic.rs | 18 ++-- core/server/src/streaming/utils/memory_pool.rs | 5 +- core/server/src/tcp/connection_handler.rs | 32 +++++--- core/server/src/tcp/sender.rs | 53 +++++++----- core/server/src/tcp/tcp_listener.rs | 45 ++++++---- core/server/src/tcp/tcp_sender.rs | 9 +- core/server/src/tcp/tcp_server.rs | 4 +- core/server/src/tcp/tcp_socket.rs | 24 ++++-- core/server/src/tcp/tcp_tls_listener.rs | 11 ++- core/server/src/tcp/tcp_tls_sender.rs | 7 +- 46 files changed, 408 insertions(+), 241 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 17894833..3e51b287 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7071,6 +7071,7 @@ dependencies = [ "serde_with", "serial_test", "sharded_queue", + "socket2", "static-toml", "strum", "sysinfo 0.35.2", diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index a493e837..70f6a7fd 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -61,6 +61,7 @@ futures = { workspace = true } human-repr = { workspace = true } iggy_common = { workspace = true } jsonwebtoken = "9.3.1" +socket2 = "0.5.10" lending-iterator = "0.1.7" hash32 = "1.0.0" mimalloc = { workspace = true, optional = true } diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs index 0b997941..aa935e09 100644 --- a/core/server/src/binary/command.rs +++ b/core/server/src/binary/command.rs @@ -16,8 +16,11 @@ * under the License. */ +use std::rc::Rc; + use crate::binary::sender::SenderKind; use crate::define_server_command_enum; +use crate::shard::IggyShard; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use bytes::{BufMut, Bytes, BytesMut}; @@ -130,8 +133,8 @@ pub trait ServerCommandHandler { self, sender: &mut SenderKind, length: u32, - session: &Session, - system: &SharedSystem, + session: &Rc<Session>, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError>; } diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs index 2ccf349b..06e9c51f 100644 --- a/core/server/src/binary/sender.rs +++ b/core/server/src/binary/sender.rs @@ -22,10 +22,13 @@ use std::io::IoSlice; use crate::tcp::tcp_sender::TcpSender; use crate::tcp::tcp_tls_sender::TcpTlsSender; use crate::{quic::quic_sender::QuicSender, server_error::ServerError}; +use bytes::BytesMut; use iggy_common::IggyError; -use quinn::{RecvStream, SendStream}; +use monoio::buf::IoBufMut; use monoio::net::TcpStream; -use tokio_native_tls::TlsStream; +use monoio_native_tls::TlsStream; +use nix::libc; +use quinn::{RecvStream, SendStream}; macro_rules! forward_async_methods { ( @@ -48,25 +51,24 @@ macro_rules! forward_async_methods { } pub trait Sender { - fn read(&mut self, buffer: &mut [u8]) -> impl Future<Output = Result<usize, IggyError>>; - fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), IggyError>>; - fn send_ok_response( + fn read( &mut self, - payload: &[u8], - ) -> impl Future<Output = Result<(), IggyError>>; + buffer: BytesMut, + ) -> impl Future<Output = (Result<usize, IggyError>, BytesMut)>; + fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), IggyError>>; + fn send_ok_response(&mut self, payload: &[u8]) -> impl Future<Output = Result<(), IggyError>>; fn send_ok_response_vectored( &mut self, length: &[u8], - slices: Vec<IoSlice<'_>>, + slices: Vec<libc::iovec>, ) -> impl Future<Output = Result<(), IggyError>>; fn send_error_response( &mut self, error: IggyError, - ) -> impl Future<Output = Result<(), IggyError>>; - fn shutdown(&mut self) -> impl Future<Output = Result<(), ServerError>>; + ) -> impl Future<Output = Result<(), IggyError>>; + fn shutdown(&mut self) -> impl Future<Output = Result<(), ServerError>>; } -#[allow(clippy::large_enum_variant)] pub enum SenderKind { Tcp(TcpSender), TcpTls(TcpTlsSender), @@ -90,10 +92,10 @@ impl SenderKind { } forward_async_methods! { - async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, IggyError>; + async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, BytesMut); async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>; async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError>; - async fn send_ok_response_vectored(&mut self, length: &[u8], slices: Vec<IoSlice<'_>>) -> Result<(), IggyError>; + async fn send_ok_response_vectored(&mut self, length: &[u8], slices: Vec<libc::iovec>) -> Result<(), IggyError>; async fn send_error_response(&mut self, error: IggyError) -> Result<(), IggyError>; async fn shutdown(&mut self) -> Result<(), ServerError>; } diff --git a/core/server/src/channels/commands/archive_state.rs b/core/server/src/channels/commands/archive_state.rs index e80d17d3..f42393ae 100644 --- a/core/server/src/channels/commands/archive_state.rs +++ b/core/server/src/channels/commands/archive_state.rs @@ -16,6 +16,8 @@ * under the License. */ +//TODO: Fixme +/* use crate::channels::server_command::BackgroundServerCommand; use crate::configs::server::StateMaintenanceConfig; use crate::streaming::systems::system::SharedSystem; @@ -137,3 +139,5 @@ impl BackgroundServerCommand<ArchiveStateCommand> for ArchiveStateExecutor { }); } } + +*/ diff --git a/core/server/src/channels/commands/clean_personal_access_tokens.rs b/core/server/src/channels/commands/clean_personal_access_tokens.rs index c815a18f..fd2eaf11 100644 --- a/core/server/src/channels/commands/clean_personal_access_tokens.rs +++ b/core/server/src/channels/commands/clean_personal_access_tokens.rs @@ -16,6 +16,7 @@ * under the License. */ +/* use crate::channels::server_command::BackgroundServerCommand; use crate::configs::server::PersonalAccessTokenCleanerConfig; use crate::streaming::systems::system::SharedSystem; @@ -135,3 +136,5 @@ impl BackgroundServerCommand<CleanPersonalAccessTokensCommand> }); } } + +*/ diff --git a/core/server/src/channels/commands/maintain_messages.rs b/core/server/src/channels/commands/maintain_messages.rs index 16ef3647..2dc526e2 100644 --- a/core/server/src/channels/commands/maintain_messages.rs +++ b/core/server/src/channels/commands/maintain_messages.rs @@ -16,6 +16,9 @@ * under the License. */ +//TODO: Fixme +/* + use crate::archiver::ArchiverKind; use crate::channels::server_command::BackgroundServerCommand; use crate::configs::server::MessagesMaintenanceConfig; @@ -553,3 +556,5 @@ async fn delete_segments( messages_count, }) } + +*/ diff --git a/core/server/src/channels/commands/print_sysinfo.rs b/core/server/src/channels/commands/print_sysinfo.rs index c130b58c..c0f2b1a2 100644 --- a/core/server/src/channels/commands/print_sysinfo.rs +++ b/core/server/src/channels/commands/print_sysinfo.rs @@ -16,6 +16,8 @@ * under the License. */ +// TODO: Fixme +/* use crate::{ channels::server_command::BackgroundServerCommand, configs::server::ServerConfig, @@ -121,3 +123,5 @@ impl BackgroundServerCommand<SysInfoPrintCommand> for SysInfoPrintExecutor { }); } } + +*/ diff --git a/core/server/src/channels/commands/save_messages.rs b/core/server/src/channels/commands/save_messages.rs index 586fb8b0..bf832762 100644 --- a/core/server/src/channels/commands/save_messages.rs +++ b/core/server/src/channels/commands/save_messages.rs @@ -16,6 +16,8 @@ * under the License. */ +//Todo: Fixme +/* use crate::channels::server_command::BackgroundServerCommand; use crate::configs::server::MessageSaverConfig; use crate::configs::server::ServerConfig; @@ -116,3 +118,5 @@ impl BackgroundServerCommand<SaveMessagesCommand> for SaveMessagesExecutor { }); } } + +*/ diff --git a/core/server/src/channels/commands/verify_heartbeats.rs b/core/server/src/channels/commands/verify_heartbeats.rs index ba48c69e..b2b01c49 100644 --- a/core/server/src/channels/commands/verify_heartbeats.rs +++ b/core/server/src/channels/commands/verify_heartbeats.rs @@ -16,6 +16,8 @@ * under the License. */ +//TODO: Fixme +/* use crate::channels::server_command::BackgroundServerCommand; use crate::configs::server::HeartbeatConfig; use crate::streaming::systems::system::SharedSystem; @@ -148,3 +150,4 @@ impl BackgroundServerCommand<VerifyHeartbeatsCommand> for VerifyHeartbeatsExecut }); } } +*/ diff --git a/core/server/src/channels/handler.rs b/core/server/src/channels/handler.rs index 8bd82e0d..7612a259 100644 --- a/core/server/src/channels/handler.rs +++ b/core/server/src/channels/handler.rs @@ -16,6 +16,8 @@ * under the License. */ +//TODO: Fixme +/* use super::server_command::BackgroundServerCommand; use crate::configs::server::ServerConfig; use crate::streaming::systems::system::SharedSystem; @@ -44,3 +46,5 @@ impl<'a> BackgroundServerCommandHandler<'a> { } } } + +*/ diff --git a/core/server/src/channels/server_command.rs b/core/server/src/channels/server_command.rs index 326b3d77..16b0dc2f 100644 --- a/core/server/src/channels/server_command.rs +++ b/core/server/src/channels/server_command.rs @@ -16,6 +16,8 @@ * under the License. */ +//TODO: Fixme +/* use crate::configs::server::ServerConfig; use crate::streaming::systems::system::SharedSystem; use flume::{Receiver, Sender}; @@ -38,3 +40,5 @@ pub trait BackgroundServerCommand<C> { receiver: Receiver<C>, ); } + +*/ diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs index 82b560cf..fc333ec7 100644 --- a/core/server/src/configs/defaults.rs +++ b/core/server/src/configs/defaults.rs @@ -36,6 +36,7 @@ use crate::configs::system::{ use crate::configs::tcp::{TcpConfig, TcpTlsConfig}; use iggy_common::IggyByteSize; use iggy_common::IggyDuration; +use std::rc::Rc; use std::sync::Arc; use std::time::Duration; @@ -51,7 +52,7 @@ impl Default for ServerConfig { heartbeat: HeartbeatConfig::default(), message_saver: MessageSaverConfig::default(), personal_access_token: PersonalAccessTokenConfig::default(), - system: Arc::new(SystemConfig::default()), + system: Rc::new(SystemConfig::default()), quic: QuicConfig::default(), tcp: TcpConfig::default(), http: HttpConfig::default(), diff --git a/core/server/src/http/http_server.rs b/core/server/src/http/http_server.rs index b91857f5..7dec1a7f 100644 --- a/core/server/src/http/http_server.rs +++ b/core/server/src/http/http_server.rs @@ -16,6 +16,8 @@ * under the License. */ +//TODO: Fixme +/* use crate::configs::http::{HttpConfig, HttpCorsConfig}; use crate::http::diagnostics::request_diagnostics; use crate::http::jwt::cleaner::start_expired_tokens_cleaner; @@ -191,3 +193,4 @@ fn configure_cors(config: HttpCorsConfig) -> CorsLayer { .allow_credentials(config.allow_credentials) .allow_private_network(config.allow_private_network) } +*/ diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index e9b329de..f5cfc429 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -16,12 +16,14 @@ * under the License. */ +use std::rc::Rc; + use crate::binary::command::{ServerCommand, ServerCommandHandler}; use crate::binary::sender::SenderKind; use crate::server_error::ConnectionError; +use crate::shard::IggyShard; use crate::streaming::clients::client_manager::Transport; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::anyhow; use iggy_common::IggyError; use quinn::{Connection, Endpoint, RecvStream, SendStream}; @@ -29,6 +31,8 @@ use tracing::{error, info, trace}; const LISTENERS_COUNT: u32 = 10; const INITIAL_BYTES_LENGTH: usize = 4; +//TODO: Fixme +/* pub fn start(endpoint: Endpoint, system: SharedSystem) { for _ in 0..LISTENERS_COUNT { @@ -65,7 +69,7 @@ pub fn start(endpoint: Endpoint, system: SharedSystem) { async fn handle_connection( incoming_connection: quinn::Connecting, - system: SharedSystem, + shard: Rc<IggyShard>, ) -> Result<(), ConnectionError> { let connection = incoming_connection.await?; let address = connection.remote_address(); @@ -179,3 +183,4 @@ async fn handle_stream( } } } +*/ diff --git a/core/server/src/quic/quic_sender.rs b/core/server/src/quic/quic_sender.rs index f1631cb2..17b60818 100644 --- a/core/server/src/quic/quic_sender.rs +++ b/core/server/src/quic/quic_sender.rs @@ -18,8 +18,11 @@ use crate::quic::COMPONENT; use crate::{binary::sender::Sender, server_error::ServerError}; +use bytes::BytesMut; use error_set::ErrContext; use iggy_common::IggyError; +use monoio::buf::IoBufMut; +use nix::libc; use quinn::{RecvStream, SendStream}; use std::io::IoSlice; use tracing::{debug, error}; @@ -33,15 +36,18 @@ pub struct QuicSender { } impl Sender for QuicSender { - async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, IggyError> { + async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, BytesMut) { // Not-so-nice code because quinn recv stream has different API for read_exact + /* let read_bytes = buffer.len(); self.recv.read_exact(buffer).await.map_err(|error| { error!("Failed to read from the stream: {:?}", error); IggyError::QuicError })?; + */ - Ok(read_bytes) + //Ok(read_bytes) + todo!(); } async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> { @@ -64,7 +70,7 @@ impl Sender for QuicSender { async fn send_ok_response_vectored( &mut self, length: &[u8], - slices: Vec<IoSlice<'_>>, + slices: Vec<libc::iovec>, ) -> Result<(), IggyError> { debug!("Sending vectored response with status: {:?}...", STATUS_OK); @@ -79,6 +85,8 @@ impl Sender for QuicSender { let mut total_bytes_written = 0; + //TODO: Fixme + /* for slice in slices { let slice_data = &*slice; if !slice_data.is_empty() { @@ -93,6 +101,7 @@ impl Sender for QuicSender { total_bytes_written += slice_data.len(); } } + */ debug!( "Sent vectored response: {} bytes of payload", diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index 7b8edf21..0a8020b8 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -31,8 +31,9 @@ use crate::configs::quic::QuicConfig; use crate::quic::COMPONENT; use crate::quic::listener; use crate::server_error::QuicError; -use crate::streaming::systems::system::SharedSystem; +//TODO: Fixme +/* /// Starts the QUIC server. /// Returns the address the server is listening on. pub fn start(config: QuicConfig, system: SharedSystem) -> SocketAddr { @@ -136,3 +137,4 @@ fn load_certificates( let key = keys.remove(0); Ok((certs, key)) } + */ diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 0428a483..a2a3fdfb 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -26,7 +26,7 @@ use crate::{ configs::server::ServerConfig, map_toggle_str, shard::Shard, - state::{file::FileState, StateKind}, + state::{StateKind, file::FileState}, streaming::{diagnostics::metrics::Metrics, storage::SystemStorage}, versioning::SemanticVersion, }; diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 3e7e10f1..12ba64bb 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -17,9 +17,9 @@ */ pub mod builder; -pub mod system; pub mod gate; pub mod namespace; +pub mod system; pub mod transmission; use ahash::{AHashMap, AHashSet, HashMap}; @@ -29,18 +29,42 @@ use futures::future::try_join_all; use iggy_common::{EncryptorKind, IggyError, UserId}; use namespace::IggyNamespace; use std::{ - cell::{Cell, RefCell}, pin::Pin, rc::Rc, str::FromStr, sync::{atomic::{AtomicU32, Ordering}, Arc, RwLock}, time::Instant + cell::{Cell, RefCell}, + pin::Pin, + rc::Rc, + str::FromStr, + sync::{ + Arc, RwLock, + atomic::{AtomicU32, Ordering}, + }, + time::Instant, }; use tracing::{error, info, instrument, trace, warn}; use transmission::connector::{Receiver, ShardConnector, StopReceiver, StopSender}; use crate::{ configs::server::ServerConfig, - shard::{system::info::SystemInfo, transmission::frame::ShardFrame}, + shard::{ + system::info::SystemInfo, + transmission::{ + frame::ShardFrame, + message::{ShardEvent, ShardMessage}, + }, + }, state::{ - file::FileState, system::{StreamState, SystemState, UserState}, StateKind + StateKind, + file::FileState, + system::{StreamState, SystemState, UserState}, + }, + streaming::{ + clients::client_manager::ClientManager, + diagnostics::metrics::Metrics, + personal_access_tokens::personal_access_token::PersonalAccessToken, + session::Session, + storage::SystemStorage, + streams::stream::Stream, + users::{permissioner::Permissioner, user::User}, }, - streaming::{clients::client_manager::ClientManager, diagnostics::metrics::Metrics, personal_access_tokens::personal_access_token::PersonalAccessToken, session::Session, storage::SystemStorage, streams::stream::Stream, users::{permissioner::Permissioner, user::User}}, versioning::SemanticVersion, }; @@ -83,7 +107,7 @@ pub struct IggyShard { pub(crate) config: ServerConfig, //TODO: This could be shared. pub(crate) client_manager: RefCell<ClientManager>, - pub(crate) active_sessions: RefCell<Vec<Session>>, + pub(crate) active_sessions: RefCell<Vec<Rc<Session>>>, pub(crate) permissioner: RefCell<Permissioner>, pub(crate) users: RefCell<HashMap<UserId, User>>, @@ -244,8 +268,8 @@ impl IggyShard { info!("Loading streams from disk..."); let mut unloaded_streams = Vec::new(); // Does mononio has api for that ? - let mut dir_entries = std::fs::read_dir(&self.config.system.get_streams_path()) - .map_err(|error| { + let mut dir_entries = + std::fs::read_dir(&self.config.system.get_streams_path()).map_err(|error| { error!("Cannot read streams directory: {error}"); IggyError::CannotReadStreams })?; @@ -362,16 +386,21 @@ impl IggyShard { self.metrics.increment_messages(stream.get_messages_count()); self.streams_ids - .borrow_mut() + .borrow_mut() .insert(stream.name.clone(), stream.stream_id); self.streams.borrow_mut().insert(stream.stream_id, stream); } - info!("Loaded {} stream(s) from disk.", self.streams.borrow().len()); + info!( + "Loaded {} stream(s) from disk.", + self.streams.borrow().len() + ); Ok(()) } - pub fn assert_init(&self) -> Result<(), IggyError> { Ok(())} + pub fn assert_init(&self) -> Result<(), IggyError> { + Ok(()) + } #[instrument(skip_all, name = "trace_shutdown")] pub async fn shutdown(&mut self) -> Result<(), IggyError> { @@ -397,17 +426,41 @@ impl IggyShard { self.shards.len() as u32 } - pub fn ensure_authenticated(&self, client_id: u32) -> Result<u32, IggyError> { + pub fn broadcast_event_to_all_shards(&self, client_id: u32, event: Rc<ShardEvent>) { + self.shards + .iter() + .filter_map(|shard| { + if shard.id != self.id { + Some(shard.connection.clone()) + } else { + None + } + }) + .map(|conn| { + let message = ShardMessage::Event(event.clone()); + conn.send(ShardFrame::new(client_id, message, None)); + }) + .collect::<Vec<_>>(); + } + + pub fn add_active_session(&self, session: Rc<Session>) { + self.active_sessions.borrow_mut().push(session); + } + + pub fn ensure_authenticated(&self, session: &Session) -> Result<u32, IggyError> { let active_sessions = self.active_sessions.borrow(); - let session = active_sessions + let user_id = active_sessions .iter() - .find(|s| s.client_id == client_id) - .ok_or_else(|| IggyError::Unauthenticated)?; - if session.is_authenticated() { - Ok(session.get_user_id()) - } else { - error!("{COMPONENT} - unauthenticated access attempt, session: {session}"); - Err(IggyError::Unauthenticated) - } + .find(|s| s.get_user_id() == session.get_user_id()) + .ok_or_else(|| IggyError::Unauthenticated) + .and_then(|session| { + if session.is_authenticated() { + Ok(session.get_user_id()) + } else { + error!("{COMPONENT} - unauthenticated access attempt, session: {session}"); + Err(IggyError::Unauthenticated) + } + })?; + Ok(user_id) } } diff --git a/core/server/src/shard/namespace.rs b/core/server/src/shard/namespace.rs index d9d68ec9..a7c899aa 100644 --- a/core/server/src/shard/namespace.rs +++ b/core/server/src/shard/namespace.rs @@ -16,9 +16,9 @@ * under the License. */ -use std::hash::Hasher as _; -use iggy_common::Identifier; use hash32::{Hasher, Murmur3Hasher}; +use iggy_common::Identifier; +use std::hash::Hasher as _; //TODO: Will probably want to move it to separate crate so we can share it with sdk. #[derive(Debug, Clone, Eq, PartialEq, Hash)] diff --git a/core/server/src/shard/system/clients.rs b/core/server/src/shard/system/clients.rs index 2cb0fbe8..fde182ce 100644 --- a/core/server/src/shard/system/clients.rs +++ b/core/server/src/shard/system/clients.rs @@ -16,22 +16,22 @@ * under the License. */ +use crate::shard::IggyShard; use crate::streaming::clients::client_manager::{Client, Transport}; use crate::streaming::session::Session; -use crate::streaming::systems::COMPONENT; -use crate::streaming::systems::system::System; use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; use iggy_common::locking::IggySharedMut; use iggy_common::locking::IggySharedMutFn; use std::net::SocketAddr; +use std::rc::Rc; use std::sync::Arc; use tracing::{error, info}; impl IggyShard { - pub async fn add_client(&self, address: &SocketAddr, transport: Transport) -> Arc<Session> { - let mut client_manager = self.client_manager.write().await; + pub fn add_client(&self, address: &SocketAddr, transport: Transport) -> Rc<Session> { + let mut client_manager = self.client_manager.borrow_mut(); let session = client_manager.add_client(address, transport); info!("Added {transport} client with session: {session} for IP address: {address}"); self.metrics.increment_clients(1); @@ -42,7 +42,7 @@ impl IggyShard { let consumer_groups: Vec<(u32, u32, u32)>; { - let mut client_manager = self.client_manager.write().await; + let mut client_manager = self.client_manager.borrow_mut(); let client = client_manager.delete_client(client_id).await; if client.is_none() { error!("Client with ID: {client_id} was not found in the client manager.",); diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs index 92b0c400..f6e07785 100644 --- a/core/server/src/shard/system/snapshot/mod.rs +++ b/core/server/src/shard/system/snapshot/mod.rs @@ -20,7 +20,6 @@ mod procdump; use crate::configs::system::SystemConfig; use crate::streaming::session::Session; -use crate::streaming::systems::system::System; use async_zip::tokio::write::ZipFileWriter; use async_zip::{Compression, ZipEntryBuilder}; use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, SystemSnapshotType}; diff --git a/core/server/src/shard/system/storage.rs b/core/server/src/shard/system/storage.rs index 9a493751..4cb41120 100644 --- a/core/server/src/shard/system/storage.rs +++ b/core/server/src/shard/system/storage.rs @@ -16,6 +16,7 @@ * under the License. */ +use crate::shard::system::info::SystemInfo; use crate::streaming::persistence::persister::PersisterKind; use crate::streaming::storage::SystemInfoStorage; use crate::streaming::utils::PooledBuffer; diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index efcfff2b..8a0772a9 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -26,7 +26,6 @@ use std::sync::atomic::{AtomicU32, Ordering}; use tokio::fs; use tracing::{error, info, warn}; - impl IggyShard { pub fn get_streams(&self) -> Vec<&Stream> { self.streams.values().collect() diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs index 3572c0ec..8d1e66eb 100644 --- a/core/server/src/shard/transmission/message.rs +++ b/core/server/src/shard/transmission/message.rs @@ -1,3 +1,5 @@ +use std::rc::Rc; + /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,16 +17,18 @@ * specific language governing permissions and limitations * under the License. */ -use crate::binary::command::ServerCommand; +use crate::{binary::command::ServerCommand, streaming::session::Session}; #[derive(Debug)] pub enum ShardMessage { Command(ServerCommand), - Event(ShardEvent), + Event(Rc<ShardEvent>), } #[derive(Debug)] -pub enum ShardEvent {} +pub enum ShardEvent { + NewSession(Rc<Session>), +} impl From<ServerCommand> for ShardMessage { fn from(command: ServerCommand) -> Self { @@ -32,8 +36,8 @@ impl From<ServerCommand> for ShardMessage { } } -impl From<ShardEvent> for ShardMessage { - fn from(event: ShardEvent) -> Self { +impl From<Rc<ShardEvent>> for ShardMessage { + fn from(event: Rc<ShardEvent>) -> Self { ShardMessage::Event(event) } } diff --git a/core/server/src/streaming/clients/client_manager.rs b/core/server/src/streaming/clients/client_manager.rs index 4a84a58c..fc5c00cc 100644 --- a/core/server/src/streaming/clients/client_manager.rs +++ b/core/server/src/streaming/clients/client_manager.rs @@ -22,27 +22,26 @@ use ahash::AHashMap; use iggy_common::IggyError; use iggy_common::IggyTimestamp; use iggy_common::UserId; -use iggy_common::locking::IggySharedMut; use iggy_common::locking::IggySharedMutFn; use std::fmt::{Display, Formatter}; use std::net::SocketAddr; -use std::sync::Arc; +use std::rc::Rc; #[derive(Debug, Default)] pub struct ClientManager { - clients: AHashMap<u32, IggySharedMut<Client>>, + clients: AHashMap<u32, Client>, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Client { pub user_id: Option<u32>, - pub session: Arc<Session>, + pub session: Rc<Session>, pub transport: Transport, pub consumer_groups: Vec<ConsumerGroup>, pub last_heartbeat: IggyTimestamp, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ConsumerGroup { pub stream_id: u32, pub topic_id: u32, @@ -65,9 +64,9 @@ impl Display for Transport { } impl ClientManager { - pub fn add_client(&mut self, address: &SocketAddr, transport: Transport) -> Arc<Session> { + pub fn add_client(&mut self, address: &SocketAddr, transport: Transport) -> Rc<Session> { let client_id = hash::calculate_32(address.to_string().as_bytes()); - let session = Arc::new(Session::from_client_id(client_id, *address)); + let session = Rc::new(Session::from_client_id(client_id, *address)); let client = Client { user_id: None, session: session.clone(), @@ -75,44 +74,43 @@ impl ClientManager { consumer_groups: Vec::new(), last_heartbeat: IggyTimestamp::now(), }; - self.clients.insert(client_id, IggySharedMut::new(client)); + self.clients.insert(client_id, client); session } - pub async fn set_user_id(&mut self, client_id: u32, user_id: UserId) -> Result<(), IggyError> { - let client = self.clients.get(&client_id); + pub fn set_user_id(&mut self, client_id: u32, user_id: UserId) -> Result<(), IggyError> { + let client = self.clients.get_mut(&client_id); if client.is_none() { return Err(IggyError::ClientNotFound(client_id)); } - let mut client = client.unwrap().write().await; + let mut client = client.unwrap(); client.user_id = Some(user_id); Ok(()) } - pub async fn clear_user_id(&mut self, client_id: u32) -> Result<(), IggyError> { - let client = self.clients.get(&client_id); + pub fn clear_user_id(&mut self, client_id: u32) -> Result<(), IggyError> { + let client = self.clients.get_mut(&client_id); if client.is_none() { return Err(IggyError::ClientNotFound(client_id)); } - let mut client = client.unwrap().write().await; + let mut client = client.unwrap(); client.user_id = None; Ok(()) } - pub fn try_get_client(&self, client_id: u32) -> Option<IggySharedMut<Client>> { + pub fn try_get_client(&self, client_id: u32) -> Option<Client> { self.clients.get(&client_id).cloned() } - pub fn get_clients(&self) -> Vec<IggySharedMut<Client>> { + pub fn get_clients(&self) -> Vec<Client> { self.clients.values().cloned().collect() } - pub async fn delete_clients_for_user(&mut self, user_id: UserId) -> Result<(), IggyError> { + pub fn delete_clients_for_user(&mut self, user_id: UserId) -> Result<(), IggyError> { let mut clients_to_remove = Vec::new(); for client in self.clients.values() { - let client = client.read().await; if let Some(client_user_id) = client.user_id { if client_user_id == user_id { clients_to_remove.push(client.session.client_id); @@ -127,28 +125,27 @@ impl ClientManager { Ok(()) } - pub async fn delete_client(&mut self, client_id: u32) -> Option<IggySharedMut<Client>> { + pub fn delete_client(&mut self, client_id: u32) -> Option<Client> { let client = self.clients.remove(&client_id); if let Some(client) = client.as_ref() { - let client = client.read().await; client.session.clear_user_id(); } client } - pub async fn join_consumer_group( - &self, + pub fn join_consumer_group( + &mut self, client_id: u32, stream_id: u32, topic_id: u32, group_id: u32, ) -> Result<(), IggyError> { - let client = self.clients.get(&client_id); + let client = self.clients.get_mut(&client_id); if client.is_none() { return Err(IggyError::ClientNotFound(client_id)); } - let mut client = client.unwrap().write().await; + let client = client.unwrap(); if client.consumer_groups.iter().any(|consumer_group| { consumer_group.group_id == group_id && consumer_group.topic_id == topic_id @@ -165,18 +162,18 @@ impl ClientManager { Ok(()) } - pub async fn leave_consumer_group( - &self, + pub fn leave_consumer_group( + &mut self, client_id: u32, stream_id: u32, topic_id: u32, consumer_group_id: u32, ) -> Result<(), IggyError> { - let client = self.clients.get(&client_id); + let client = self.clients.get_mut(&client_id); if client.is_none() { return Err(IggyError::ClientNotFound(client_id)); } - let mut client = client.unwrap().write().await; + let mut client = client.unwrap(); for (index, consumer_group) in client.consumer_groups.iter().enumerate() { if consumer_group.stream_id == stream_id && consumer_group.topic_id == topic_id @@ -189,9 +186,8 @@ impl ClientManager { Ok(()) } - pub async fn delete_consumer_groups_for_stream(&self, stream_id: u32) { - for client in self.clients.values() { - let mut client = client.write().await; + pub fn delete_consumer_groups_for_stream(&mut self, stream_id: u32) { + for client in self.clients.values_mut() { let indexes_to_remove = client .consumer_groups .iter() @@ -210,9 +206,8 @@ impl ClientManager { } } - pub async fn delete_consumer_groups_for_topic(&self, stream_id: u32, topic_id: u32) { - for client in self.clients.values() { - let mut client = client.write().await; + pub fn delete_consumer_groups_for_topic(&mut self, stream_id: u32, topic_id: u32) { + for client in self.clients.values_mut() { let indexes_to_remove = client .consumer_groups .iter() diff --git a/core/server/src/streaming/partitions/messages.rs b/core/server/src/streaming/partitions/messages.rs index a00b371f..a2d71c8e 100644 --- a/core/server/src/streaming/partitions/messages.rs +++ b/core/server/src/streaming/partitions/messages.rs @@ -368,6 +368,7 @@ mod tests { use crate::streaming::utils::MemoryPool; use bytes::Bytes; use iggy_common::{IggyExpiry, IggyMessage}; + use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicU32, AtomicU64}; use tempfile::TempDir; @@ -710,7 +711,7 @@ mod tests { let partition_id = 3; let with_segment = true; let temp_dir = TempDir::new().unwrap(); - let config = Arc::new(SystemConfig { + let config = Rc::new(SystemConfig { path: temp_dir.path().to_path_buf().to_str().unwrap().to_string(), message_deduplication: MessageDeduplicationConfig { enabled: deduplication_enabled, @@ -718,7 +719,7 @@ mod tests { }, ..Default::default() }); - let storage = Arc::new(SystemStorage::new( + let storage = Rc::new(SystemStorage::new( config.clone(), Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})), )); diff --git a/core/server/src/streaming/partitions/partition.rs b/core/server/src/streaming/partitions/partition.rs index 73c190cc..4a00ca85 100644 --- a/core/server/src/streaming/partitions/partition.rs +++ b/core/server/src/streaming/partitions/partition.rs @@ -28,6 +28,7 @@ use iggy_common::IggyExpiry; use iggy_common::IggyTimestamp; use iggy_common::Sizeable; use std::fmt; +use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; @@ -58,8 +59,8 @@ pub struct Partition { pub(crate) consumer_offsets: DashMap<u32, ConsumerOffset>, pub(crate) consumer_group_offsets: DashMap<u32, ConsumerOffset>, pub(crate) segments: Vec<Segment>, - pub(crate) config: Arc<SystemConfig>, - pub(crate) storage: Arc<SystemStorage>, + pub(crate) config: Rc<SystemConfig>, + pub(crate) storage: Rc<SystemStorage>, } #[derive(Debug, PartialEq, Clone)] @@ -88,8 +89,8 @@ impl Partition { topic_id: u32, partition_id: u32, with_segment: bool, - config: Arc<SystemConfig>, - storage: Arc<SystemStorage>, + config: Rc<SystemConfig>, + storage: Rc<SystemStorage>, message_expiry: IggyExpiry, messages_count_of_parent_stream: Arc<AtomicU64>, messages_count_of_parent_topic: Arc<AtomicU64>, @@ -209,17 +210,18 @@ mod tests { use iggy_common::IggyDuration; use iggy_common::IggyExpiry; use iggy_common::IggyTimestamp; + use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicU32, AtomicU64}; #[tokio::test] async fn should_be_created_with_a_single_segment_given_valid_parameters() { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Arc::new(SystemConfig { + let config = Rc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); - let storage = Arc::new(SystemStorage::new( + let storage = Rc::new(SystemStorage::new( config.clone(), Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})), )); @@ -264,11 +266,11 @@ mod tests { #[tokio::test] async fn should_not_initialize_segments_given_false_with_segment_parameter() { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Arc::new(SystemConfig { + let config = Rc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); - let storage = Arc::new(SystemStorage::new( + let storage = Rc::new(SystemStorage::new( config.clone(), Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})), )); @@ -279,7 +281,7 @@ mod tests { topic_id, 1, false, - Arc::new(SystemConfig::default()), + config, storage, IggyExpiry::NeverExpire, Arc::new(AtomicU64::new(0)), diff --git a/core/server/src/streaming/segments/segment.rs b/core/server/src/streaming/segments/segment.rs index acfc98f4..252fe7f1 100644 --- a/core/server/src/streaming/segments/segment.rs +++ b/core/server/src/streaming/segments/segment.rs @@ -27,6 +27,7 @@ use iggy_common::IggyByteSize; use iggy_common::IggyError; use iggy_common::IggyExpiry; use iggy_common::IggyTimestamp; +use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use tokio::fs::remove_file; @@ -60,7 +61,7 @@ pub struct Segment { pub(super) index_reader: Option<IndexReader>, pub(super) message_expiry: IggyExpiry, pub(super) accumulator: MessagesAccumulator, - pub(super) config: Arc<SystemConfig>, + pub(super) config: Rc<SystemConfig>, pub(super) indexes: IggyIndexesMut, pub(super) messages_size: Arc<AtomicU64>, pub(super) indexes_size: Arc<AtomicU64>, @@ -73,7 +74,7 @@ impl Segment { topic_id: u32, partition_id: u32, start_offset: u64, - config: Arc<SystemConfig>, + config: Rc<SystemConfig>, message_expiry: IggyExpiry, size_of_parent_stream: Arc<AtomicU64>, size_of_parent_topic: Arc<AtomicU64>, @@ -450,7 +451,7 @@ mod tests { let topic_id = 2; let partition_id = 3; let start_offset = 0; - let config = Arc::new(SystemConfig::default()); + let config = Rc::new(SystemConfig::default()); let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset); let messages_file_path = Segment::get_messages_file_path(&path); let index_path = Segment::get_index_path(&path); @@ -499,7 +500,7 @@ mod tests { let topic_id = 2; let partition_id = 3; let start_offset = 0; - let config = Arc::new(SystemConfig { + let config = Rc::new(SystemConfig { segment: SegmentConfig { cache_indexes: CacheIndexesConfig::None, ..Default::default() diff --git a/core/server/src/streaming/session.rs b/core/server/src/streaming/session.rs index 1eae4eb0..3f412bce 100644 --- a/core/server/src/streaming/session.rs +++ b/core/server/src/streaming/session.rs @@ -19,13 +19,16 @@ use iggy_common::{AtomicUserId, UserId}; use std::fmt::Display; use std::net::SocketAddr; +use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; // This might be extended with more fields in the future e.g. custom name, permissions etc. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Session { - user_id: AtomicUserId, - active: AtomicBool, + // TODO: Fixme, those don't have to be atomics anymore, simple integers will suffice + // Once the atomics are removed, we can impl Copy trait aswell. + user_id: Rc<AtomicUserId>, + active: Rc<AtomicBool>, pub client_id: u32, pub ip_address: SocketAddr, } @@ -34,8 +37,8 @@ impl Session { pub fn new(client_id: u32, user_id: UserId, ip_address: SocketAddr) -> Self { Self { client_id, - active: AtomicBool::new(true), - user_id: AtomicUserId::new(user_id), + active: Rc::new(AtomicBool::new(true)), + user_id: Rc::new(AtomicUserId::new(user_id)), ip_address, } } @@ -49,15 +52,15 @@ impl Session { } pub fn get_user_id(&self) -> UserId { - self.user_id.load(Ordering::Acquire) + self.user_id.load(Ordering::Relaxed) } pub fn set_user_id(&self, user_id: UserId) { - self.user_id.store(user_id, Ordering::Release) + self.user_id.store(user_id, Ordering::Relaxed) } pub fn set_stale(&self) { - self.active.store(false, Ordering::Release) + self.active.store(false, Ordering::Relaxed) } pub fn clear_user_id(&self) { @@ -65,7 +68,7 @@ impl Session { } pub fn is_active(&self) -> bool { - self.active.load(Ordering::Acquire) + self.active.load(Ordering::Relaxed) } pub fn is_authenticated(&self) -> bool { diff --git a/core/server/src/streaming/storage.rs b/core/server/src/streaming/storage.rs index f423b842..51e2565e 100644 --- a/core/server/src/streaming/storage.rs +++ b/core/server/src/streaming/storage.rs @@ -18,13 +18,13 @@ use super::persistence::persister::PersisterKind; use crate::configs::system::SystemConfig; +use crate::shard::system::info::SystemInfo; +use crate::shard::system::storage::FileSystemInfoStorage; use crate::state::system::{PartitionState, StreamState, TopicState}; use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; use crate::streaming::partitions::storage::FilePartitionStorage; use crate::streaming::streams::storage::FileStreamStorage; use crate::streaming::streams::stream::Stream; -use crate::streaming::systems::info::SystemInfo; -use crate::streaming::systems::storage::FileSystemInfoStorage; use crate::streaming::topics::storage::FileTopicStorage; use crate::streaming::topics::topic::Topic; use iggy_common::ConsumerKind; @@ -85,61 +85,61 @@ pub enum PartitionStorageKind { } #[cfg_attr(test, automock)] -pub trait SystemInfoStorage: Send { - fn load(&self) -> impl Future<Output = Result<SystemInfo, IggyError>> + Send; - fn save(&self, system_info: &SystemInfo) -> impl Future<Output = Result<(), IggyError>> + Send; +pub trait SystemInfoStorage { + fn load(&self) -> impl Future<Output = Result<SystemInfo, IggyError>>; + fn save(&self, system_info: &SystemInfo) -> impl Future<Output = Result<(), IggyError>>; } #[cfg_attr(test, automock)] -pub trait StreamStorage: Send { +pub trait StreamStorage { fn load( &self, stream: &mut Stream, state: StreamState, - ) -> impl Future<Output = Result<(), IggyError>> + Send; - fn save(&self, stream: &Stream) -> impl Future<Output = Result<(), IggyError>> + Send; - fn delete(&self, stream: &Stream) -> impl Future<Output = Result<(), IggyError>> + Send; + ) -> impl Future<Output = Result<(), IggyError>>; + fn save(&self, stream: &Stream) -> impl Future<Output = Result<(), IggyError>>; + fn delete(&self, stream: &Stream) -> impl Future<Output = Result<(), IggyError>>; } #[cfg_attr(test, automock)] -pub trait TopicStorage: Send { +pub trait TopicStorage { fn load( &self, topic: &mut Topic, state: TopicState, - ) -> impl Future<Output = Result<(), IggyError>> + Send; - fn save(&self, topic: &Topic) -> impl Future<Output = Result<(), IggyError>> + Send; - fn delete(&self, topic: &Topic) -> impl Future<Output = Result<(), IggyError>> + Send; + ) -> impl Future<Output = Result<(), IggyError>>; + fn save(&self, topic: &Topic) -> impl Future<Output = Result<(), IggyError>>; + fn delete(&self, topic: &Topic) -> impl Future<Output = Result<(), IggyError>>; } #[cfg_attr(test, automock)] -pub trait PartitionStorage: Send { +pub trait PartitionStorage { fn load( &self, partition: &mut Partition, state: PartitionState, - ) -> impl Future<Output = Result<(), IggyError>> + Send; + ) -> impl Future<Output = Result<(), IggyError>>; fn save(&self, partition: &mut Partition) - -> impl Future<Output = Result<(), IggyError>> + Send; - fn delete(&self, partition: &Partition) -> impl Future<Output = Result<(), IggyError>> + Send; + -> impl Future<Output = Result<(), IggyError>>; + fn delete(&self, partition: &Partition) -> impl Future<Output = Result<(), IggyError>>; fn save_consumer_offset( &self, offset: u64, path: &str, - ) -> impl Future<Output = Result<(), IggyError>> + Send; + ) -> impl Future<Output = Result<(), IggyError>>; fn load_consumer_offsets( &self, kind: ConsumerKind, path: &str, - ) -> impl Future<Output = Result<Vec<ConsumerOffset>, IggyError>> + Send; + ) -> impl Future<Output = Result<Vec<ConsumerOffset>, IggyError>>; fn delete_consumer_offsets( &self, path: &str, - ) -> impl Future<Output = Result<(), IggyError>> + Send; + ) -> impl Future<Output = Result<(), IggyError>>; fn delete_consumer_offset( &self, path: &str, - ) -> impl Future<Output = Result<(), IggyError>> + Send; + ) -> impl Future<Output = Result<(), IggyError>>; } #[derive(Debug)] diff --git a/core/server/src/streaming/streams/storage.rs b/core/server/src/streaming/streams/storage.rs index 3e3e5e76..53639de9 100644 --- a/core/server/src/streaming/streams/storage.rs +++ b/core/server/src/streaming/streams/storage.rs @@ -156,7 +156,7 @@ impl StreamStorage for FileStreamStorage { for mut topic in unloaded_topics { let loaded_topics = loaded_topics.clone(); let topic_state = state.topics.remove(&topic.topic_id).unwrap(); - let load_topic = tokio::spawn(async move { + let load_topic = monoio::spawn(async move { match topic.load(topic_state).await { Ok(_) => loaded_topics.lock().await.push(topic), Err(error) => error!( diff --git a/core/server/src/streaming/streams/stream.rs b/core/server/src/streaming/streams/stream.rs index 339fc08a..03681b75 100644 --- a/core/server/src/streaming/streams/stream.rs +++ b/core/server/src/streaming/streams/stream.rs @@ -40,8 +40,8 @@ pub struct Stream { pub segments_count: Arc<AtomicU32>, pub(crate) topics: AHashMap<u32, Topic>, pub(crate) topics_ids: AHashMap<String, u32>, - pub(crate) config: Arc<SystemConfig>, - pub(crate) storage: Arc<SystemStorage>, + pub(crate) config: Rc<SystemConfig>, + pub(crate) storage: Rc<SystemStorage>, } impl Stream { @@ -106,18 +106,18 @@ mod tests { #[test] fn should_be_created_given_valid_parameters() { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Arc::new(SystemConfig { + let config = Rc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); - let storage = Arc::new(SystemStorage::new( + let storage = Rc::new(SystemStorage::new( config.clone(), Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})), )); MemoryPool::init_pool(config.clone()); let id = 1; let name = "test"; - let config = Arc::new(SystemConfig::default()); + let config = Rc::new(SystemConfig::default()); let path = config.get_stream_path(id); let topics_path = config.get_topics_path(id); diff --git a/core/server/src/streaming/streams/topics.rs b/core/server/src/streaming/streams/topics.rs index f8c742be..b975ee83 100644 --- a/core/server/src/streaming/streams/topics.rs +++ b/core/server/src/streaming/streams/topics.rs @@ -146,8 +146,7 @@ impl Stream { topic.name = name.to_owned(); topic.message_expiry = message_expiry; topic.compression_algorithm = compression_algorithm; - for partition in topic.partitions.values_mut() { - let mut partition = partition.write().await; + for partition in topic.partitions.borrow_mut().values_mut() { partition.message_expiry = message_expiry; for segment in partition.segments.iter_mut() { segment.update_message_expiry(message_expiry); @@ -282,16 +281,16 @@ mod tests { }, }; use iggy_common::IggyByteSize; - use std::sync::Arc; + use std::{rc::Rc, sync::Arc}; #[tokio::test] async fn should_get_topic_by_id_and_name() { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Arc::new(SystemConfig { + let config = Rc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); - let storage = Arc::new(SystemStorage::new( + let storage = Rc::new(SystemStorage::new( config.clone(), Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})), )); diff --git a/core/server/src/streaming/topics/consumer_group.rs b/core/server/src/streaming/topics/consumer_group.rs index dedb29c7..43d9611b 100644 --- a/core/server/src/streaming/topics/consumer_group.rs +++ b/core/server/src/streaming/topics/consumer_group.rs @@ -21,16 +21,16 @@ use iggy_common::IggyError; use tokio::sync::RwLock; use tracing::trace; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ConsumerGroup { pub topic_id: u32, pub group_id: u32, pub name: String, pub partitions_count: u32, - members: AHashMap<u32, RwLock<ConsumerGroupMember>>, + members: AHashMap<u32, ConsumerGroupMember>, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ConsumerGroupMember { pub id: u32, partitions: AHashMap<u32, u32>, @@ -49,8 +49,8 @@ impl ConsumerGroup { } } - pub fn get_members(&self) -> Vec<&RwLock<ConsumerGroupMember>> { - self.members.values().collect() + pub fn get_members(&self) -> Vec<ConsumerGroupMember> { + self.members.values().cloned().collect() } pub async fn reassign_partitions(&mut self, partitions_count: u32) { @@ -61,7 +61,7 @@ impl ConsumerGroup { pub async fn calculate_partition_id(&self, member_id: u32) -> Result<Option<u32>, IggyError> { let member = self.members.get(&member_id); if let Some(member) = member { - return Ok(member.write().await.calculate_partition_id()); + return Ok(member.await.calculate_partition_id()); } Err(IggyError::ConsumerGroupMemberNotFound( member_id, diff --git a/core/server/src/streaming/topics/consumer_groups.rs b/core/server/src/streaming/topics/consumer_groups.rs index dec2f163..26505355 100644 --- a/core/server/src/streaming/topics/consumer_groups.rs +++ b/core/server/src/streaming/topics/consumer_groups.rs @@ -91,7 +91,7 @@ impl Topic { self.get_consumer_group_by_id(*group_id.unwrap()) } - pub fn get_consumer_group_by_id(&self, id: u32) -> Result<&RwLock<ConsumerGroup>, IggyError> { + pub fn get_consumer_group_by_id(&self, id: u32) -> Result<ConsumerGroup, IggyError> { let consumer_group = self.consumer_groups.get(&id); if consumer_group.is_none() { return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id)); @@ -104,7 +104,7 @@ impl Topic { &mut self, group_id: Option<u32>, name: &str, - ) -> Result<&RwLock<ConsumerGroup>, IggyError> { + ) -> Result<ConsumerGroup, IggyError> { if self.consumer_groups_ids.contains_key(name) { return Err(IggyError::ConsumerGroupNameAlreadyExists( name.to_owned(), @@ -138,20 +138,20 @@ impl Topic { } let consumer_group = - ConsumerGroup::new(self.topic_id, id, name, self.partitions.len() as u32); - self.consumer_groups.insert(id, RwLock::new(consumer_group)); + ConsumerGroup::new(self.topic_id, id, name, self.partitions.borrow().len() as u32); + self.consumer_groups.insert(id, consumer_group.clone()); self.consumer_groups_ids.insert(name.to_owned(), id); info!( "Created consumer group with ID: {} for topic with ID: {} and stream with ID: {}.", id, self.topic_id, self.stream_id ); - self.get_consumer_group_by_id(id) + consumer_group } pub async fn delete_consumer_group( &mut self, id: &Identifier, - ) -> Result<RwLock<ConsumerGroup>, IggyError> { + ) -> Result<ConsumerGroup, IggyError> { let group_id; { let consumer_group = self.get_consumer_group(id).with_error_context(|error| { @@ -167,7 +167,6 @@ impl Topic { } let consumer_group = consumer_group.unwrap(); { - let consumer_group = consumer_group.read().await; let group_id = consumer_group.group_id; self.consumer_groups_ids.remove(&consumer_group.name); let current_group_id = self.current_consumer_group_id.load(Ordering::SeqCst); @@ -176,8 +175,7 @@ impl Topic { .store(group_id, Ordering::SeqCst); } - for (_, partition) in self.partitions.iter() { - let partition = partition.read().await; + for (_, partition) in self.partitions.borrow().iter() { if let Some((_, offset)) = partition.consumer_group_offsets.remove(&group_id) { self.storage .partition diff --git a/core/server/src/streaming/topics/topic.rs b/core/server/src/streaming/topics/topic.rs index b0913856..ad3e25ef 100644 --- a/core/server/src/streaming/topics/topic.rs +++ b/core/server/src/streaming/topics/topic.rs @@ -23,6 +23,8 @@ use crate::streaming::storage::SystemStorage; use crate::streaming::topics::consumer_group::ConsumerGroup; use ahash::AHashMap; use core::fmt; +use std::cell::RefCell; +use std::rc::Rc; use iggy_common::locking::IggySharedMut; use iggy_common::{ CompressionAlgorithm, Consumer, ConsumerKind, IggyByteSize, IggyError, IggyExpiry, @@ -48,10 +50,10 @@ pub struct Topic { pub(crate) messages_count_of_parent_stream: Arc<AtomicU64>, pub(crate) messages_count: Arc<AtomicU64>, pub(crate) segments_count_of_parent_stream: Arc<AtomicU32>, - pub(crate) config: Arc<SystemConfig>, - pub(crate) partitions: AHashMap<u32, IggySharedMut<Partition>>, - pub(crate) storage: Arc<SystemStorage>, - pub(crate) consumer_groups: AHashMap<u32, RwLock<ConsumerGroup>>, + pub(crate) config: Rc<SystemConfig>, + pub(crate) partitions: RefCell<AHashMap<u32, Partition>>, + pub(crate) storage: Rc<SystemStorage>, + pub(crate) consumer_groups: AHashMap<u32, ConsumerGroup>, pub(crate) consumer_groups_ids: AHashMap<String, u32>, pub(crate) current_consumer_group_id: AtomicU32, pub(crate) current_partition_id: AtomicU32, @@ -71,8 +73,8 @@ impl Topic { size_of_parent_stream: Arc<AtomicU64>, messages_count_of_parent_stream: Arc<AtomicU64>, segments_count_of_parent_stream: Arc<AtomicU32>, - config: Arc<SystemConfig>, - storage: Arc<SystemStorage>, + config: Rc<SystemConfig>, + storage: Rc<SystemStorage>, ) -> Topic { Topic::create( stream_id, @@ -99,8 +101,8 @@ impl Topic { topic_id: u32, name: &str, partitions_count: u32, - config: Arc<SystemConfig>, - storage: Arc<SystemStorage>, + config: Rc<SystemConfig>, + storage: Rc<SystemStorage>, size_of_parent_stream: Arc<AtomicU64>, messages_count_of_parent_stream: Arc<AtomicU64>, segments_count_of_parent_stream: Arc<AtomicU32>, diff --git a/core/server/src/streaming/utils/memory_pool.rs b/core/server/src/streaming/utils/memory_pool.rs index 61962f0a..e05b7359 100644 --- a/core/server/src/streaming/utils/memory_pool.rs +++ b/core/server/src/streaming/utils/memory_pool.rs @@ -21,6 +21,7 @@ use bytes::BytesMut; use crossbeam::queue::ArrayQueue; use human_repr::HumanCount; use once_cell::sync::OnceCell; +use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tracing::{info, trace, warn}; @@ -153,7 +154,7 @@ impl MemoryPool { } /// Initialize the global pool from the given config. - pub fn init_pool(config: Arc<SystemConfig>) { + pub fn init_pool(config: Rc<SystemConfig>) { let is_enabled = config.memory_pool.enabled; let memory_limit = config.memory_pool.size.as_bytes_usize(); let bucket_capacity = config.memory_pool.bucket_capacity as usize; @@ -467,7 +468,7 @@ mod tests { fn initialize_pool_for_tests() { TEST_INIT.call_once(|| { - let config = Arc::new(SystemConfig { + let config = Rc::new(SystemConfig { memory_pool: MemoryPoolConfig { enabled: true, size: IggyByteSize::from_str("4GiB").unwrap(), diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs index ec61b23b..9c92d27d 100644 --- a/core/server/src/tcp/connection_handler.rs +++ b/core/server/src/tcp/connection_handler.rs @@ -19,27 +19,28 @@ use crate::binary::command::ServerCommandHandler; use crate::binary::{command, sender::SenderKind}; use crate::server_error::ConnectionError; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use crate::tcp::connection_handler::command::ServerCommand; +use bytes::BytesMut; use iggy_common::IggyError; use std::io::ErrorKind; -use std::sync::Arc; +use std::rc::Rc; use tracing::{debug, error, info}; const INITIAL_BYTES_LENGTH: usize = 4; pub(crate) async fn handle_connection( - session: Arc<Session>, + session: &Rc<Session>, sender: &mut SenderKind, - system: SharedSystem, + shard: &Rc<IggyShard>, ) -> Result<(), ConnectionError> { - let mut length_buffer = [0u8; INITIAL_BYTES_LENGTH]; - let mut code_buffer = [0u8; INITIAL_BYTES_LENGTH]; + let length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH); + let code_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH); loop { - let read_length = match sender.read(&mut length_buffer).await { - Ok(read_length) => read_length, - Err(error) => { + let (read_length, initial_buffer) = match sender.read(length_buffer.clone()).await { + (Ok(read_length), initial_buffer) => (read_length, initial_buffer), + (Err(error), _) => { if error.as_code() == IggyError::ConnectionClosed.as_code() { return Err(ConnectionError::from(error)); } else { @@ -56,13 +57,18 @@ pub(crate) async fn handle_connection( continue; } - let length = u32::from_le_bytes(length_buffer); - sender.read(&mut code_buffer).await?; - let code = u32::from_le_bytes(code_buffer); + let initial_buffer = initial_buffer.freeze(); + let length = + u32::from_le_bytes(initial_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap()); + let (res, code_buffer) = sender.read(code_buffer.clone()).await; + let _ = res?; + let code_buffer = code_buffer.freeze(); + let code: u32 = + u32::from_le_bytes(code_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap()); debug!("Received a TCP request, length: {length}, code: {code}"); let command = ServerCommand::from_code_and_reader(code, sender, length - 4).await?; debug!("Received a TCP command: {command}, payload size: {length}"); - match command.handle(sender, length, &session, &system).await { + match command.handle(sender, length, session, shard).await { Ok(_) => { debug!( "Command was handled successfully, session: {session}. TCP response was sent." diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs index 4dd11725..6a10adb2 100644 --- a/core/server/src/tcp/sender.rs +++ b/core/server/src/tcp/sender.rs @@ -16,25 +16,33 @@ * under the License. */ +use bytes::BytesMut; use iggy_common::IggyError; -use monoio::io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt}; +use monoio::{ + buf::IoBufMut, + io::{AsyncReadRent, AsyncReadRentExt, AsyncWriteRent, AsyncWriteRentExt}, +}; +use nix::libc; use std::io::IoSlice; use tracing::debug; const STATUS_OK: &[u8] = &[0; 4]; -pub(crate) async fn read<T>(stream: &mut T, buffer: &mut [u8]) -> Result<usize, IggyError> +pub(crate) async fn read<T>( + stream: &mut T, + buffer: BytesMut, +) -> (Result<usize, IggyError>, BytesMut) where T: AsyncReadRent + AsyncWriteRent + Unpin, { match stream.read_exact(buffer).await { - Ok(0) => Err(IggyError::ConnectionClosed), - Ok(read_bytes) => Ok(read_bytes), - Err(error) => { + (Ok(0), buffer) => (Err(IggyError::ConnectionClosed), buffer), + (Ok(read_bytes), buffer) => (Ok(read_bytes), buffer), + (Err(error), buffer) => { if error.kind() == std::io::ErrorKind::UnexpectedEof { - Err(IggyError::ConnectionClosed) + (Err(IggyError::ConnectionClosed), buffer) } else { - Err(IggyError::TcpError) + (Err(IggyError::TcpError), buffer) } } } @@ -57,7 +65,7 @@ where pub(crate) async fn send_ok_response_vectored<T>( stream: &mut T, length: &[u8], - slices: Vec<IoSlice<'_>>, + slices: Vec<libc::iovec>, ) -> Result<(), IggyError> where T: AsyncReadRentExt + AsyncWriteRentExt + Unpin, @@ -90,8 +98,9 @@ where ); let length = (payload.len() as u32).to_le_bytes(); stream - .write_all(&[status, &length, payload].as_slice().concat()) + .write_all([status, &length, payload].concat()) .await + .0 .map_err(|_| IggyError::TcpError)?; debug!("Sent response with status: {:?}", status); Ok(()) @@ -101,7 +110,7 @@ pub(crate) async fn send_response_vectored<T>( stream: &mut T, status: &[u8], length: &[u8], - mut slices: Vec<IoSlice<'_>>, + mut slices: Vec<libc::iovec>, ) -> Result<(), IggyError> where T: AsyncReadRentExt + AsyncWriteRentExt + Unpin, @@ -111,16 +120,22 @@ where slices.len(), status ); - let prefix = [IoSlice::new(status), IoSlice::new(length)]; + let prefix = [ + libc::iovec { + iov_base: length.as_ptr() as _, + iov_len: length.len(), + }, + libc::iovec { + iov_base: status.as_ptr() as _, + iov_len: status.len(), + }, + ]; slices.splice(0..0, prefix); - let mut slice_refs = slices.as_mut_slice(); - while !slice_refs.is_empty() { - let bytes_written = stream - .write_vectored(slice_refs) - .await - .map_err(|_| IggyError::TcpError)?; - IoSlice::advance_slices(&mut slice_refs, bytes_written); - } + stream + .write_vectored_all(slices) + .await + .0 + .map_err(|_| IggyError::TcpError)?; debug!("Sent response with status: {:?}", status); Ok(()) } diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index 2e4f25be..ffa01eb1 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -18,39 +18,50 @@ use crate::binary::sender::SenderKind; use crate::shard::IggyShard; +use crate::shard::transmission::message::ShardEvent; use crate::streaming::clients::client_manager::Transport; use crate::tcp::connection_handler::{handle_connection, handle_error}; +use crate::tcp::tcp_socket; use std::net::SocketAddr; use std::rc::Rc; -use monoio::net::TcpListener; -use rustls::pki_types::Ipv4Addr; -use tokio::net::TcpSocket; -use tokio::sync::oneshot; use tracing::{error, info}; -pub async fn start(server_name: &str, shard: Rc<IggyShard>) { - let addr: SocketAddr = if shard.config.tcp.ipv6 { - shard.config.tcp.address.parse().expect("Unable to parse IPv6 address") - } else { - shard.config.tcp.address.parse().expect("Unable to parse IPv4 address") - }; +pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) { + let ip_v6 = shard.config.tcp.ipv6; + let socket_config = &shard.config.tcp.socket; + let addr: SocketAddr = shard + .config + .tcp + .address + .parse() + .expect("Failed to parse TCP address"); + + let socket = tcp_socket::build(ip_v6, socket_config); monoio::spawn(async move { - let listener = TcpListener::bind(addr).expect(format!("Unable to start {server_name}.").as_ref()); + socket + .bind(&addr.into()) + .expect("Failed to bind eTCP listener"); + socket.listen(1024); + let listener: std::net::TcpListener = socket.into(); + let listener = monoio::net::TcpListener::from_std(listener).unwrap(); + info!("{server_name} server has started on: {:?}", addr); loop { match listener.accept().await { Ok((stream, address)) => { let shard = shard.clone(); info!("Accepted new TCP connection: {address}"); - let session = shard - .add_client(&address, Transport::Tcp); + let session = shard.add_client(&address, Transport::Tcp); + //TODO: Those can be shared with other shards. + shard.add_active_session(session.clone()); + // Broadcast session to all shards. + let event = Rc::new(ShardEvent::NewSession(session.clone())); + shard.broadcast_event_to_all_shards(session.client_id, event); - let client_id = session.client_id; + let _client_id = session.client_id; info!("Created new session: {session}"); let mut sender = SenderKind::get_tcp_sender(stream); monoio::spawn(async move { - if let Err(error) = - handle_connection(session, &mut sender, shard.clone()).await - { + if let Err(error) = handle_connection(&session, &mut sender, &shard).await { handle_error(error); //TODO: Fixme /* diff --git a/core/server/src/tcp/tcp_sender.rs b/core/server/src/tcp/tcp_sender.rs index a157dead..81089c85 100644 --- a/core/server/src/tcp/tcp_sender.rs +++ b/core/server/src/tcp/tcp_sender.rs @@ -19,10 +19,13 @@ use crate::binary::sender::Sender; use crate::tcp::COMPONENT; use crate::{server_error::ServerError, tcp::sender}; +use bytes::BytesMut; use error_set::ErrContext; use iggy_common::IggyError; -use tokio::{io::AsyncWriteExt}; +use monoio::buf::IoBufMut; +use monoio::io::AsyncWriteRent; use monoio::net::TcpStream; +use nix::libc; #[derive(Debug)] pub struct TcpSender { @@ -30,7 +33,7 @@ pub struct TcpSender { } impl Sender for TcpSender { - async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, IggyError> { + async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, BytesMut) { sender::read(&mut self.stream, buffer).await } @@ -59,7 +62,7 @@ impl Sender for TcpSender { async fn send_ok_response_vectored( &mut self, length: &[u8], - slices: Vec<std::io::IoSlice<'_>>, + slices: Vec<libc::iovec>, ) -> Result<(), IggyError> { sender::send_ok_response_vectored(&mut self.stream, length, slices).await } diff --git a/core/server/src/tcp/tcp_server.rs b/core/server/src/tcp/tcp_server.rs index 82bbe0c1..9b8e1c04 100644 --- a/core/server/src/tcp/tcp_server.rs +++ b/core/server/src/tcp/tcp_server.rs @@ -17,9 +17,9 @@ */ use crate::shard::IggyShard; -use crate::tcp::{tcp_listener}; -use std::rc::Rc; +use crate::tcp::tcp_listener; use iggy_common::IggyError; +use std::rc::Rc; use tracing::info; /// Starts the TCP server. diff --git a/core/server/src/tcp/tcp_socket.rs b/core/server/src/tcp/tcp_socket.rs index 69c1d408..747540fa 100644 --- a/core/server/src/tcp/tcp_socket.rs +++ b/core/server/src/tcp/tcp_socket.rs @@ -16,16 +16,18 @@ * under the License. */ +use socket2::{Domain, Protocol, Socket, Type}; use std::num::TryFromIntError; - use crate::configs::tcp::TcpSocketConfig; -pub fn build(ipv6: bool, config: TcpSocketConfig) -> TcpSocket { +pub fn build(ipv6: bool, config: &TcpSocketConfig) -> Socket { let socket = if ipv6 { - TcpSocket::new_v6().expect("Unable to create an ipv6 socket") + Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP)) + .expect("Unable to create an ipv6 socket") } else { - TcpSocket::new_v4().expect("Unable to create an ipv4 socket") + Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)) + .expect("Unable to create an ipv4 socket") }; if config.override_defaults { @@ -34,7 +36,7 @@ pub fn build(ipv6: bool, config: TcpSocketConfig) -> TcpSocket { .as_bytes_u64() .try_into() .map_err(|e: TryFromIntError| std::io::Error::other(e.to_string())) - .and_then(|size: u32| socket.set_recv_buffer_size(size)) + .and_then(|size| socket.set_recv_buffer_size(size)) .expect("Unable to set SO_RCVBUF on socket"); config .send_buffer_size @@ -52,6 +54,12 @@ pub fn build(ipv6: bool, config: TcpSocketConfig) -> TcpSocket { socket .set_linger(Some(config.linger.get_duration())) .expect("Unable to set SO_LINGER on socket"); + socket + .set_reuse_address(true) + .expect("Unable to set SO_REUSEADDR on socket"); + socket + .set_reuse_port(true) + .expect("Unable to set SO_REUSEPORT on socket"); } socket @@ -77,9 +85,9 @@ mod tests { nodelay: true, linger: IggyDuration::new(linger_dur), }; - let socket = build(false, config); - assert!(socket.recv_buffer_size().unwrap() >= buffer_size as u32); - assert!(socket.send_buffer_size().unwrap() >= buffer_size as u32); + let socket = build(false, &config); + assert!(socket.recv_buffer_size().unwrap() >= buffer_size as usize); + assert!(socket.send_buffer_size().unwrap() >= buffer_size as usize); assert!(socket.keepalive().unwrap()); assert!(socket.nodelay().unwrap()); assert_eq!(socket.linger().unwrap(), Some(linger_dur)); diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs index d2bbf8da..53df4b0b 100644 --- a/core/server/src/tcp/tcp_tls_listener.rs +++ b/core/server/src/tcp/tcp_tls_listener.rs @@ -18,26 +18,30 @@ use crate::binary::sender::SenderKind; use crate::configs::tcp::TcpTlsConfig; +use crate::shard::IggyShard; use crate::streaming::clients::client_manager::Transport; -use crate::streaming::systems::system::SharedSystem; use crate::tcp::connection_handler::{handle_connection, handle_error}; +use monoio_native_tls::TlsAcceptor; use rustls::ServerConfig; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; use rustls_pemfile::{certs, private_key}; use std::io::BufReader; use std::net::SocketAddr; +use std::rc::Rc; use std::sync::Arc; use tokio::net::TcpSocket; use tokio::sync::oneshot; -use tokio_rustls::{TlsAcceptor, rustls}; use tracing::{error, info}; pub(crate) async fn start( address: &str, config: TcpTlsConfig, socket: TcpSocket, - system: SharedSystem, + system: Rc<IggyShard>, ) -> SocketAddr { + //TODO: Fixme + todo!(); + /* let address = address.to_string(); let (tx, rx) = oneshot::channel(); tokio::spawn(async move { @@ -131,6 +135,7 @@ pub(crate) async fn start( Ok(addr) => addr, Err(_) => panic!("Failed to get the local address for TCP TLS listener."), } + */ } fn generate_self_signed_cert() diff --git a/core/server/src/tcp/tcp_tls_sender.rs b/core/server/src/tcp/tcp_tls_sender.rs index 6ba62315..7213ad64 100644 --- a/core/server/src/tcp/tcp_tls_sender.rs +++ b/core/server/src/tcp/tcp_tls_sender.rs @@ -19,10 +19,13 @@ use crate::binary::sender::Sender; use crate::tcp::COMPONENT; use crate::{server_error::ServerError, tcp::sender}; +use bytes::BytesMut; use error_set::ErrContext; use iggy_common::IggyError; +use monoio::io::AsyncWriteRent; use monoio::net::TcpStream; use monoio_native_tls::TlsStream; +use nix::libc; #[derive(Debug)] pub struct TcpTlsSender { @@ -30,7 +33,7 @@ pub struct TcpTlsSender { } impl Sender for TcpTlsSender { - async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, IggyError> { + async fn read(&mut self, buffer: BytesMut) -> (Result<usize, IggyError>, BytesMut) { sender::read(&mut self.stream, buffer).await } @@ -59,7 +62,7 @@ impl Sender for TcpTlsSender { async fn send_ok_response_vectored( &mut self, length: &[u8], - slices: Vec<std::io::IoSlice<'_>>, + slices: Vec<libc::iovec>, ) -> Result<(), IggyError> { sender::send_ok_response_vectored(&mut self.stream, length, slices).await }
