This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit e1e31ea824212b57e780723209eaf0f80dfbf5f5 Author: Grzegorz Koszyk <[email protected]> AuthorDate: Tue Jul 1 11:14:39 2025 +0200 feat(io_uring): fix tcp tls server (#1952) Closes #1936 --- Cargo.lock | 56 ++---------- core/server/Cargo.toml | 2 +- core/server/src/binary/sender.rs | 5 +- core/server/src/tcp/tcp_listener.rs | 14 +-- core/server/src/tcp/tcp_server.rs | 16 +++- core/server/src/tcp/tcp_tls_listener.rs | 153 +++++++++++++++++--------------- core/server/src/tcp/tcp_tls_sender.rs | 5 +- 7 files changed, 110 insertions(+), 141 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6898bb77..ca282813 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1686,16 +1686,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "core-foundation" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "core-foundation" version = "0.10.1" @@ -4921,15 +4911,15 @@ dependencies = [ ] [[package]] -name = "monoio-native-tls" +name = "monoio-rustls" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9022f5aaa19f9688f97bfcfa0c4a4318d424851995badb356674ca742652cdb" +checksum = "6e31f422825bd7fb19957af6eaf89d7234ba143fcc0e515f5a2f526e332d1875" dependencies = [ "bytes", "monoio", "monoio-io-wrapper", - "native-tls", + "rustls", "thiserror 1.0.69", ] @@ -4942,23 +4932,6 @@ dependencies = [ "getrandom 0.2.16", ] -[[package]] -name = "native-tls" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" -dependencies = [ - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework 2.11.1", - "security-framework-sys", - "tempfile", -] - [[package]] name = "never-say-never" version = "6.6.666" @@ -6631,7 +6604,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework 3.2.0", + "security-framework", ] [[package]] @@ -6659,7 +6632,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19787cda76408ec5404443dc8b31795c87cd8fec49762dc75fa727740d34acc1" dependencies = [ - "core-foundation 0.10.1", + "core-foundation", "core-foundation-sys", "jni", "log", @@ -6668,7 +6641,7 @@ dependencies = [ "rustls-native-certs", "rustls-platform-verifier-android", "rustls-webpki", - "security-framework 3.2.0", + "security-framework", "security-framework-sys", "webpki-root-certs 0.26.11", "windows-sys 0.59.0", @@ -6816,19 +6789,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "security-framework" -version = "2.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" -dependencies = [ - "bitflags 2.9.1", - "core-foundation 0.9.4", - "core-foundation-sys", - "libc", - "security-framework-sys", -] - [[package]] name = "security-framework" version = "3.2.0" @@ -6836,7 +6796,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ "bitflags 2.9.1", - "core-foundation 0.10.1", + "core-foundation", "core-foundation-sys", "libc", "security-framework-sys", @@ -7054,7 +7014,7 @@ dependencies = [ "mockall", "moka", "monoio", - "monoio-native-tls", + "monoio-rustls", "nix 0.30.1", "once_cell", "opentelemetry", diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 3e6c801e..4b330383 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -68,7 +68,7 @@ hash32 = "1.0.0" mimalloc = { workspace = true, optional = true } moka = { version = "0.12.10", features = ["future"] } monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat", "renameat", "sync"] } -monoio-native-tls = "0.4.0" +monoio-rustls = "0.4.0" nix = { version = "0.30", features = ["fs"] } once_cell = "1.21.3" opentelemetry = { version = "0.30.0", features = ["trace", "logs"] } diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs index adbf05ee..45066c75 100644 --- a/core/server/src/binary/sender.rs +++ b/core/server/src/binary/sender.rs @@ -25,10 +25,11 @@ use crate::{quic::quic_sender::QuicSender, server_error::ServerError}; use bytes::BytesMut; use iggy_common::IggyError; use monoio::buf::IoBufMut; +use monoio::io::{AsyncReadRent, AsyncWriteRent}; use monoio::net::TcpStream; -use monoio_native_tls::TlsStream; use nix::libc; use quinn::{RecvStream, SendStream}; +use monoio_rustls::{ServerTlsStream, TlsStream}; macro_rules! forward_async_methods { ( @@ -84,7 +85,7 @@ impl SenderKind { Self::Tcp(TcpSender { stream }) } - pub fn get_tcp_tls_sender(stream: TlsStream<TcpStream>) -> Self { + pub fn get_tcp_tls_sender(stream: ServerTlsStream<TcpStream>) -> Self { Self::TcpTls(TcpTlsSender { stream }) } diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index 90b1eabb..4f528016 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -21,25 +21,15 @@ use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::streaming::clients::client_manager::Transport; use crate::tcp::connection_handler::{handle_connection, handle_error}; -use crate::tcp::tcp_socket; use futures::FutureExt; use iggy_common::IggyError; +use socket2::Socket; use std::net::SocketAddr; use std::rc::Rc; use std::time::Duration; use tracing::{error, info}; -pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) -> Result<(), IggyError> { - let ip_v6 = shard.config.tcp.ipv6; - let socket_config = &shard.config.tcp.socket; - let addr: SocketAddr = shard - .config - .tcp - .address - .parse() - .expect("Failed to parse TCP address"); - - let socket = tcp_socket::build(ip_v6, socket_config); +pub async fn start(server_name: &'static str, addr: SocketAddr, socket: Socket, shard: Rc<IggyShard>) -> Result<(), IggyError> { socket .bind(&addr.into()) .expect("Failed to bind TCP listener"); diff --git a/core/server/src/tcp/tcp_server.rs b/core/server/src/tcp/tcp_server.rs index e558d08f..ac019bb0 100644 --- a/core/server/src/tcp/tcp_server.rs +++ b/core/server/src/tcp/tcp_server.rs @@ -17,8 +17,9 @@ */ use crate::shard::IggyShard; -use crate::tcp::tcp_listener; +use crate::tcp::{tcp_listener, tcp_socket, tcp_tls_listener}; use iggy_common::IggyError; +use std::net::SocketAddr; use std::rc::Rc; use tracing::info; @@ -30,11 +31,20 @@ pub async fn spawn_tcp_server(shard: Rc<IggyShard>) -> Result<(), IggyError> { } else { "Iggy TCP" }; + let ip_v6 = shard.config.tcp.ipv6; + let socket_config = &shard.config.tcp.socket; + let addr: SocketAddr = shard + .config + .tcp + .address + .parse() + .expect("Failed to parse TCP address"); + let socket = tcp_socket::build(ip_v6, socket_config); info!("Initializing {server_name} server..."); // TODO: Fixme -- storing addr of the server inside of the config for integration tests... match shard.config.tcp.tls.enabled { - true => unimplemented!("TLS support is not implemented yet"), - false => tcp_listener::start(server_name, shard.clone()).await?, + true => tcp_tls_listener::start(server_name, addr, socket, shard.clone()).await?, + false => tcp_listener::start(server_name, addr, socket, shard.clone()).await?, }; Ok(()) diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs index 53df4b0b..2c81eb60 100644 --- a/core/server/src/tcp/tcp_tls_listener.rs +++ b/core/server/src/tcp/tcp_tls_listener.rs @@ -19,33 +19,31 @@ use crate::binary::sender::SenderKind; use crate::configs::tcp::TcpTlsConfig; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::streaming::clients::client_manager::Transport; use crate::tcp::connection_handler::{handle_connection, handle_error}; -use monoio_native_tls::TlsAcceptor; +use futures::FutureExt; +use iggy_common::IggyError; +use monoio_rustls::TlsAcceptor; use rustls::ServerConfig; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; use rustls_pemfile::{certs, private_key}; +use socket2::Socket; use std::io::BufReader; use std::net::SocketAddr; use std::rc::Rc; use std::sync::Arc; -use tokio::net::TcpSocket; -use tokio::sync::oneshot; +use std::time::Duration; use tracing::{error, info}; pub(crate) async fn start( - address: &str, - config: TcpTlsConfig, - socket: TcpSocket, - system: Rc<IggyShard>, -) -> SocketAddr { - //TODO: Fixme - todo!(); - /* - let address = address.to_string(); - let (tx, rx) = oneshot::channel(); - tokio::spawn(async move { + server_name: &'static str, + addr: SocketAddr, + socket: Socket, + shard: Rc<IggyShard>, +) -> Result<(), IggyError> { let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let config = &shard.config.tcp.tls; let (certs, key) = if config.self_signed && !std::path::Path::new(&config.cert_file).exists() { @@ -64,78 +62,87 @@ pub(crate) async fn start( let acceptor = TlsAcceptor::from(Arc::new(server_config)); - let addr = address.parse(); - if addr.is_err() { - panic!("Unable to parse address {address:?}"); - } - - let addr = addr.unwrap(); socket - .bind(addr) + .bind(&addr.into()) .unwrap_or_else(|e| panic!("Unable to bind socket to address '{addr}': {e}",)); - let listener = socket - .listen(1024) - .unwrap_or_else(|e| panic!("Unable to start TCP TLS server on '{address}': {e}",)); - - let local_addr = listener - .local_addr() - .unwrap_or_else(|e| panic!("Failed to get local address for TCP TLS listener: {e}",)); - - tx.send(local_addr).unwrap_or_else(|_| { - panic!("Failed to send the local address '{local_addr}' for TCP TLS listener") - }); + let listener: std::net::TcpListener = socket.into(); + let listener = monoio::net::TcpListener::from_std(listener).unwrap(); + info!("{server_name} server has started on: {:?}", addr); loop { - match listener.accept().await { - Ok((stream, address)) => { - info!("Accepted new TCP TLS connection: {}", address); - let session = system - .read() - .await - .add_client(&address, Transport::Tcp) - .await; - - let client_id = session.client_id; - let acceptor = acceptor.clone(); - let system_clone = system.clone(); - match acceptor.accept(stream).await { - Ok(stream) => { - let mut sender = SenderKind::get_tcp_tls_sender(stream); - tokio::spawn(async move { - if let Err(error) = - handle_connection(session, &mut sender, system_clone.clone()) - .await - { - handle_error(error); - system_clone.read().await.delete_client(client_id).await; - if let Err(error) = sender.shutdown().await { - error!( - "Failed to shutdown TCP stream for client: {client_id}, address: {address}. {error}" - ); - } else { - info!( - "Successfully closed TCP stream for client: {client_id}, address: {address}." - ); + let shutdown_check = async { + loop { + if shard.is_shutting_down() { + return; + } + monoio::time::sleep(Duration::from_millis(100)).await; + } + }; + + let accept_future = listener.accept(); + futures::select! { + _ = shutdown_check.fuse() => { + info!("TCP TLS server detected shutdown flag, no longer accepting connections"); + break; + } + result = accept_future.fuse() => { + match result { + Ok((stream, address)) => { + if shard.is_shutting_down() { + info!("Rejecting new TLS connection from {} during shutdown", address); + continue; + } + let shard_clone = shard.clone(); + info!("Accepted new TCP TLS connection: {}", address); + let transport = Transport::Tcp; + let session = shard_clone.add_client(&address, transport); + //TODO: Those can be shared with other shards. + shard_clone.add_active_session(session.clone()); + // Broadcast session to all shards. + let event = ShardEvent::NewSession { address, transport }; + // TODO: Fixme look inside of broadcast_event_to_all_shards method. + let _responses = shard_clone.broadcast_event_to_all_shards(event.into()); + + let client_id = session.client_id; + info!("Created new session: {session}"); + let acceptor = acceptor.clone(); + + let conn_stop_receiver = shard_clone.task_registry.add_connection(client_id); + + let shard_for_conn = shard_clone.clone(); + shard_clone.task_registry.spawn_tracked(async move { + match acceptor.accept(stream).await { + Ok(tls_stream) => { + let mut sender = SenderKind::get_tcp_tls_sender(tls_stream.into()); + if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await { + handle_error(error); + } + shard_for_conn.task_registry.remove_connection(&client_id); + + if let Err(error) = sender.shutdown().await { + error!( + "Failed to shutdown TCP TLS stream for client: {client_id}, address: {address}. {error}" + ); + } else { + info!( + "Successfully closed TCP TLS stream for client: {client_id}, address: {address}." + ); + } + } + Err(e) => { + error!("Failed to accept TLS connection from '{address}': {e}"); + shard_for_conn.task_registry.remove_connection(&client_id); } } }); } - Err(e) => { - error!("Failed to accept TLS connection from '{address}': {e}"); - system_clone.read().await.delete_client(client_id).await; - } + Err(error) => error!("Unable to accept TCP TLS socket. {error}"), } } - Err(error) => error!("Unable to accept TCP TLS socket. {error}"), } } - }); - match rx.await { - Ok(addr) => addr, - Err(_) => panic!("Failed to get the local address for TCP TLS listener."), - } - */ + Ok(()) } fn generate_self_signed_cert() diff --git a/core/server/src/tcp/tcp_tls_sender.rs b/core/server/src/tcp/tcp_tls_sender.rs index 167427c1..b214e5a6 100644 --- a/core/server/src/tcp/tcp_tls_sender.rs +++ b/core/server/src/tcp/tcp_tls_sender.rs @@ -25,12 +25,13 @@ use iggy_common::IggyError; use monoio::buf::IoBufMut; use monoio::io::AsyncWriteRent; use monoio::net::TcpStream; -use monoio_native_tls::TlsStream; +use monoio_rustls::ServerTlsStream; +//use tokio_rustls::server::TlsStream; use nix::libc; #[derive(Debug)] pub struct TcpTlsSender { - pub(crate) stream: TlsStream<TcpStream>, + pub(crate) stream: ServerTlsStream<TcpStream>, } impl Sender for TcpTlsSender {
