This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch bench-tls in repository https://gitbox.apache.org/repos/asf/iggy.git
commit a484f8d27e097a385cb31bd0b1eeb2b7bf61469a Author: Hubert Gruszecki <[email protected]> AuthorDate: Sun Jun 22 12:46:44 2025 +0200 feat(bench): add TCP TLS support to `iggy-bench` This commit introduces TLS support for the benchmark binary, enhancing security by enabling encrypted communication. The changes include: - Integration of `rcgen` and `rustls` for TLS certificate handling. - Modifications to the `TcpClientFactory` to support TLS configuration. - Addition of TLS-related arguments in the `TcpArgs` struct. - Implementation of self-signed certificate generation for testing. - Updates to the server configuration to support TLS and self-signed certificates. - New tests to validate TLS and self-signed certificate scenarios. --- Cargo.lock | 11 +- Cargo.toml | 4 +- core/bench/src/analytics/report_builder.rs | 8 +- core/bench/src/args/mod.rs | 2 +- core/bench/src/args/transport.rs | 18 ++- core/bench/src/runner.rs | 6 +- core/bench/src/utils/client_factory.rs | 20 ++- core/bench/src/utils/mod.rs | 48 ++----- core/common/Cargo.toml | 2 + .../src/args/mod.rs => common/src/certificates.rs} | 20 +-- core/common/src/lib.rs | 2 + .../configuration/tcp_config/tcp_client_config.rs | 5 + .../tcp_config/tcp_client_config_builder.rs | 6 + core/configs/server.toml | 17 ++- core/connectors/runtime/README.md | 4 +- core/connectors/runtime/config.toml | 4 +- core/integration/src/tcp_client.rs | 26 +++- core/integration/tests/server/specific.rs | 35 +++++ core/integration/tests/streaming/messages.rs | 153 --------------------- core/sdk/src/client_provider.rs | 1 + core/sdk/src/clients/client_builder.rs | 8 ++ core/sdk/src/tcp/mod.rs | 1 + core/sdk/src/tcp/tcp_client.rs | 58 ++++---- core/sdk/src/tcp/tcp_tls_verifier.rs | 59 ++++++++ core/server/Cargo.toml | 1 - core/server/src/configs/defaults.rs | 1 + core/server/src/configs/tcp.rs | 1 + core/server/src/quic/quic_server.rs | 11 +- core/server/src/tcp/tcp_tls_listener.rs | 109 ++++++++------- foreign/cpp/tests/e2e/server.toml | 8 +- 30 files changed, 329 insertions(+), 320 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0406b044..ea6eb0e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3827,6 +3827,8 @@ dependencies = [ "derive_more 2.0.1", "fast-async-mutex", "humantime", + "rcgen", + "rustls", "serde", "serde_json", "serde_with", @@ -4366,9 +4368,9 @@ dependencies = [ [[package]] name = "libgit2-sys" -version = "0.18.1+1.9.0" +version = "0.18.2+1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1dcb20f84ffcdd825c7a311ae347cce604a6f084a767dec4a4929829645290e" +checksum = "1c42fe03df2bd3c53a3a9c7317ad91d80c81cd1fb0caec8d7cc4cd2bfa10c222" dependencies = [ "cc", "libc", @@ -6747,7 +6749,6 @@ dependencies = [ "opentelemetry_sdk", "prometheus-client", "quinn", - "rcgen", "reqwest", "ring", "rust-s3", @@ -8909,9 +8910,9 @@ dependencies = [ [[package]] name = "zip" -version = "4.1.0" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af7dcdb4229c0e79c2531a24de7726a0e980417a74fb4d030a35f535665439a0" +checksum = "95ab361742de920c5535880f89bbd611ee62002bf11341d16a5f057bb8ba6899" dependencies = [ "aes", "arbitrary", diff --git a/Cargo.toml b/Cargo.toml index 71715d67..479e12c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,8 +157,8 @@ tower-http = { version = "0.6.6", features = [ "trace", ] } trait-variant = "0.1.2" -webpki-roots = "1.0.0" -zip = "4.1.0" +webpki-roots = "1.0.1" +zip = "4.2.0" # Optional dependencies mimalloc = "0.1" diff --git a/core/bench/src/analytics/report_builder.rs b/core/bench/src/analytics/report_builder.rs index 248708e0..8e778919 100644 --- a/core/bench/src/analytics/report_builder.rs +++ b/core/bench/src/analytics/report_builder.rs @@ -31,6 +31,8 @@ use bench_report::{ }; use chrono::{DateTime, Utc}; use iggy::prelude::{CacheMetrics, CacheMetricsKey, IggyTimestamp, Stats}; +use integration::test_server::ClientFactory; +use std::sync::Arc; pub struct BenchmarkReportBuilder; @@ -41,6 +43,7 @@ impl BenchmarkReportBuilder { mut params: BenchmarkParams, mut individual_metrics: Vec<BenchmarkIndividualMetrics>, moving_average_window: u32, + client_factory: &Arc<dyn ClientFactory>, ) -> BenchmarkReport { let uuid = uuid::Uuid::new_v4(); @@ -48,10 +51,7 @@ impl BenchmarkReportBuilder { DateTime::<Utc>::from_timestamp_micros(IggyTimestamp::now().as_micros() as i64) .map_or_else(|| String::from("unknown"), |dt| dt.to_rfc3339()); - let transport = params.transport; - let server_addr = params.server_address.clone(); - - let server_stats = get_server_stats(&transport, &server_addr) + let server_stats = get_server_stats(client_factory) .await .expect("Failed to get server stats"); diff --git a/core/bench/src/args/mod.rs b/core/bench/src/args/mod.rs index 458b051b..bfc04b31 100644 --- a/core/bench/src/args/mod.rs +++ b/core/bench/src/args/mod.rs @@ -20,8 +20,8 @@ pub mod common; pub mod defaults; pub mod kind; pub mod kinds; +pub mod transport; mod examples; mod output; mod props; -mod transport; diff --git a/core/bench/src/args/transport.rs b/core/bench/src/args/transport.rs index 18ca668d..c8bbc484 100644 --- a/core/bench/src/args/transport.rs +++ b/core/bench/src/args/transport.rs @@ -127,6 +127,22 @@ pub struct TcpArgs { #[arg(long, default_value_t = false)] pub nodelay: bool, + /// Enable TLS encryption + #[arg(long, default_value_t = false)] + pub tls: bool, + + /// TLS domain name + #[arg(long, default_value_t = String::from("localhost"), requires = "tls")] + pub tls_domain: String, + + /// Validate TLS certificate + #[arg(long, default_value_t = false, requires = "tls")] + pub tls_validate_certificate: bool, + + /// Path to CA certificate file for TLS validation + #[arg(long, requires = "tls")] + pub tls_ca_file: Option<String>, + /// Optional output command, used to output results (charts, raw json data) to a directory #[command(subcommand)] output: Option<BenchmarkOutputCommand>, @@ -142,7 +158,7 @@ impl BenchmarkTransportProps for TcpArgs { } fn validate_certificate(&self) -> bool { - panic!("Cannot validate certificate for TCP transport!") + self.tls_validate_certificate } fn client_address(&self) -> &str { diff --git a/core/bench/src/runner.rs b/core/bench/src/runner.rs index 49c5e696..4494557f 100644 --- a/core/bench/src/runner.rs +++ b/core/bench/src/runner.rs @@ -71,14 +71,13 @@ impl BenchmarkRunner { let hardware = BenchmarkHardware::get_system_info_with_identifier(benchmark.args().identifier()); let params = params_from_args_and_metrics(benchmark.args(), &individual_metrics); - let transport = params.transport; - let server_addr = params.server_address.clone(); let report = BenchmarkReportBuilder::build( hardware, params, individual_metrics, benchmark.args().moving_average_window(), + benchmark.client_factory(), ) .await; @@ -100,8 +99,7 @@ impl BenchmarkRunner { report.dump_to_json(&full_output_path); if let Err(e) = collect_server_logs_and_save_to_file( - &transport, - &server_addr, + benchmark.client_factory(), Path::new(&full_output_path), ) .await diff --git a/core/bench/src/utils/client_factory.rs b/core/bench/src/utils/client_factory.rs index 525d66dd..41d1f54f 100644 --- a/core/bench/src/utils/client_factory.rs +++ b/core/bench/src/utils/client_factory.rs @@ -17,6 +17,7 @@ */ use crate::args::common::IggyBenchArgs; +use crate::args::transport::BenchmarkTransportCommand; use integration::http_client::HttpClientFactory; use integration::quic_client::QuicClientFactory; use integration::tcp_client::TcpClientFactory; @@ -28,10 +29,21 @@ pub fn create_client_factory(args: &IggyBenchArgs) -> Arc<dyn ClientFactory> { Transport::Http => Arc::new(HttpClientFactory { server_addr: args.server_address().to_owned(), }), - Transport::Tcp => Arc::new(TcpClientFactory { - server_addr: args.server_address().to_owned(), - nodelay: args.nodelay(), - }), + Transport::Tcp => { + let transport_command = args.transport_command(); + if let BenchmarkTransportCommand::Tcp(tcp_args) = transport_command { + Arc::new(TcpClientFactory { + server_addr: args.server_address().to_owned(), + nodelay: args.nodelay(), + tls_enabled: tcp_args.tls, + tls_domain: tcp_args.tls_domain.clone(), + tls_ca_file: tcp_args.tls_ca_file.clone(), + tls_validate_certificate: tcp_args.tls_validate_certificate, + }) + } else { + unreachable!("Transport is TCP but transport command is not TcpArgs") + } + } Transport::Quic => Arc::new(QuicClientFactory { server_addr: args.server_address().to_owned(), }), diff --git a/core/bench/src/utils/mod.rs b/core/bench/src/utils/mod.rs index 42d316f0..cc6d5fc4 100644 --- a/core/bench/src/utils/mod.rs +++ b/core/bench/src/utils/mod.rs @@ -22,8 +22,8 @@ use bench_report::{ transport::BenchmarkTransport, }; use iggy::prelude::*; -use integration::test_server::Transport; -use std::{fs, path::Path}; +use integration::test_server::{ClientFactory, Transport}; +use std::{fs, path::Path, sync::Arc}; use tracing::{error, info}; use crate::args::{ @@ -61,26 +61,9 @@ pub fn batch_user_size_bytes(polled_messages: &PolledMessages) -> u64 { .sum() } -pub async fn get_server_stats( - transport: &BenchmarkTransport, - server_address: &str, -) -> Result<Stats, IggyError> { - let client = IggyClientBuilder::new(); - - let client = match transport { - BenchmarkTransport::Tcp => client - .with_tcp() - .with_server_address(server_address.to_string()) - .build()?, - BenchmarkTransport::Http => client - .with_http() - .with_api_url(format!("http://{server_address}")) - .build()?, - BenchmarkTransport::Quic => client - .with_quic() - .with_server_address(server_address.to_string()) - .build()?, - }; +pub async fn get_server_stats(client_factory: &Arc<dyn ClientFactory>) -> Result<Stats, IggyError> { + let client = client_factory.create_client().await; + let client = IggyClient::create(client, None, None); client.connect().await?; client @@ -91,26 +74,11 @@ pub async fn get_server_stats( } pub async fn collect_server_logs_and_save_to_file( - transport: &BenchmarkTransport, - server_address: &str, + client_factory: &Arc<dyn ClientFactory>, output_dir: &Path, ) -> Result<(), IggyError> { - let client = IggyClientBuilder::new(); - - let client = match transport { - BenchmarkTransport::Tcp => client - .with_tcp() - .with_server_address(server_address.to_string()) - .build()?, - BenchmarkTransport::Http => client - .with_http() - .with_api_url(format!("http://{server_address}")) - .build()?, - BenchmarkTransport::Quic => client - .with_quic() - .with_server_address(server_address.to_string()) - .build()?, - }; + let client = client_factory.create_client().await; + let client = IggyClient::create(client, None, None); client.connect().await?; client diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index 660d70f4..443b0150 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -46,6 +46,8 @@ crc32fast = { workspace = true } derive_more = { workspace = true } fast-async-mutex = { version = "0.6.7", optional = true } humantime = { workspace = true } +rcgen = "0.13.2" +rustls = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true, features = ["base64"] } diff --git a/core/bench/src/args/mod.rs b/core/common/src/certificates.rs similarity index 58% copy from core/bench/src/args/mod.rs copy to core/common/src/certificates.rs index 458b051b..03056e99 100644 --- a/core/bench/src/args/mod.rs +++ b/core/common/src/certificates.rs @@ -16,12 +16,16 @@ * under the License. */ -pub mod common; -pub mod defaults; -pub mod kind; -pub mod kinds; +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; -mod examples; -mod output; -mod props; -mod transport; +/// Generates a self-signed certificate for the given domain. +/// Returns a tuple of (certificate chain, private key). +pub fn generate_self_signed_certificate( + domain: &str, +) -> Result<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>), Box<dyn std::error::Error>> { + let cert = rcgen::generate_simple_self_signed(vec![domain.to_string()])?; + let cert_der = cert.cert.der(); + let key_der = cert.key_pair.serialize_der(); + let key = PrivateKeyDer::try_from(key_der)?; + Ok((vec![cert_der.clone()], key)) +} diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index bd83fa66..6603a7d5 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +mod certificates; mod commands; mod error; mod traits; @@ -81,6 +82,7 @@ pub use types::user::user_identity_info::*; pub use types::user::user_info::*; pub use types::user::user_status::*; // Utils +pub use certificates::generate_self_signed_certificate; pub use utils::byte_size::IggyByteSize; pub use utils::checksum::*; pub use utils::crypto::*; diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs index 9ebc6c1c..2976f499 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs @@ -32,6 +32,8 @@ pub struct TcpClientConfig { pub tls_domain: String, /// The path to the CA file for TLS. pub tls_ca_file: Option<String>, + /// Whether to validate the TLS certificate. + pub tls_validate_certificate: bool, /// Whether to automatically login user after establishing connection. pub auto_login: AutoLogin, /// Whether to automatically reconnect when disconnected. @@ -49,6 +51,7 @@ impl Default for TcpClientConfig { tls_enabled: false, tls_domain: "localhost".to_string(), tls_ca_file: None, + tls_validate_certificate: true, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), auto_login: AutoLogin::Disabled, reconnection: TcpClientReconnectionConfig::default(), @@ -65,6 +68,8 @@ impl From<ConnectionString<TcpConnectionStringOptions>> for TcpClientConfig { tls_enabled: connection_string.options().tls_enabled(), tls_domain: connection_string.options().tls_domain().into(), tls_ca_file: connection_string.options().tls_ca_file().to_owned(), + // Always validate TLS certificate for connection strings, we don't want to allow self-signed certificates for connection strings + tls_validate_certificate: true, reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), nodelay: connection_string.options().nodelay(), diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs index 097e9501..6844bbb4 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs @@ -83,6 +83,12 @@ impl TcpClientConfigBuilder { self } + /// Sets whether to validate the TLS certificate. + pub fn with_tls_validate_certificate(mut self, tls_validate_certificate: bool) -> Self { + self.config.tls_validate_certificate = tls_validate_certificate; + self + } + /// Sets the nodelay option for the TCP socket. pub fn with_no_delay(mut self) -> Self { self.config.nodelay = true; diff --git a/core/configs/server.toml b/core/configs/server.toml index 42a558a6..5445ff18 100644 --- a/core/configs/server.toml +++ b/core/configs/server.toml @@ -168,10 +168,10 @@ endpoint = "/metrics" enabled = false # Path to the TLS certificate file. -cert_file = "certs/iggy_cert.pem" +cert_file = "core/certs/iggy_cert.pem" # Path to the TLS key file. -key_file = "certs/iggy_key.pem" +key_file = "core/certs/iggy_key.pem" # TCP server configuration. [tcp] @@ -194,11 +194,16 @@ ipv6 = false # `false` leaves TCP connections unencrypted. enabled = false +# Enables or disables self-signed certificate generation. +# `true` generates a self-signed certificate if cert files don't exist. +# `false` requires certificate files to exist at the specified paths. +self_signed = true + # Path to the TLS certificate file. -cert_file = "certs/iggy_cert.pem" +cert_file = "core/certs/iggy_cert.pem" # Path to the TLS key file. -key_file = "certs/iggy_key.pem" +key_file = "core/certs/iggy_key.pem" # Configuration for the TCP socket [tcp.socket] @@ -261,10 +266,10 @@ max_idle_timeout = "10 s" self_signed = true # Path to the QUIC TLS certificate file. -cert_file = "certs/iggy_cert.pem" +cert_file = "core/certs/iggy_cert.pem" # Path to the QUIC TLS key file. -key_file = "certs/iggy_key.pem" +key_file = "core/certs/iggy_key.pem" # Message cleaner configuration. [message_cleaner] diff --git a/core/connectors/runtime/README.md b/core/connectors/runtime/README.md index b502bf67..18da28d5 100644 --- a/core/connectors/runtime/README.md +++ b/core/connectors/runtime/README.md @@ -45,8 +45,8 @@ allow_private_network = false [http_api.tls] # Optional TLS configuration for HTTP API enabled = false -cert_file = "certs/iggy_cert.pem" -key_file = "certs/iggy_key.pem" +cert_file = "core/certs/iggy_cert.pem" +key_file = "core/certs/iggy_key.pem" ``` Currently, it does expose the following endpoints: diff --git a/core/connectors/runtime/config.toml b/core/connectors/runtime/config.toml index 171e3a32..7f811ffc 100644 --- a/core/connectors/runtime/config.toml +++ b/core/connectors/runtime/config.toml @@ -31,8 +31,8 @@ allow_private_network = false [http_api.tls] # Optional TLS configuration for HTTP API enabled = false -cert_file = "certs/iggy_cert.pem" -key_file = "certs/iggy_key.pem" +cert_file = "core/certs/iggy_cert.pem" +key_file = "core/certs/iggy_key.pem" [iggy] address = "localhost:8090" diff --git a/core/integration/src/tcp_client.rs b/core/integration/src/tcp_client.rs index a5a4ae04..87a0cec3 100644 --- a/core/integration/src/tcp_client.rs +++ b/core/integration/src/tcp_client.rs @@ -25,6 +25,10 @@ use std::sync::Arc; pub struct TcpClientFactory { pub server_addr: String, pub nodelay: bool, + pub tls_enabled: bool, + pub tls_domain: String, + pub tls_ca_file: Option<String>, + pub tls_validate_certificate: bool, } #[async_trait] @@ -33,6 +37,10 @@ impl ClientFactory for TcpClientFactory { let config = TcpClientConfig { server_address: self.server_addr.clone(), nodelay: self.nodelay, + tls_enabled: self.tls_enabled, + tls_domain: self.tls_domain.clone(), + tls_ca_file: self.tls_ca_file.clone(), + tls_validate_certificate: self.tls_validate_certificate, ..TcpClientConfig::default() }; let client = TcpClient::create(Arc::new(config)).unwrap_or_else(|e| { @@ -42,10 +50,20 @@ impl ClientFactory for TcpClientFactory { ) }); Client::connect(&client).await.unwrap_or_else(|e| { - panic!( - "Failed to connect to iggy-server at {}, error: {:?}", - self.server_addr, e - ) + if self.tls_enabled { + panic!( + "Failed to connect to iggy-server at {} with TLS enabled, error: {:?}\n\ + Hint: Make sure the server is started with TLS enabled and self-signed certificate:\n\ + IGGY_TCP_TLS_ENABLED=true IGGY_TCP_TLS_SELF_SIGNED=true\n + or start iggy-bench with relevant tcp tls arguments: --tls --tls-domain <domain> --tls-ca-file <ca_file>\n", + self.server_addr, e + ) + } else { + panic!( + "Failed to connect to iggy-server at {}, error: {:?}", + self.server_addr, e + ) + } }); Box::new(client) } diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs index d2d28e37..6d2c7606 100644 --- a/core/integration/tests/server/specific.rs +++ b/core/integration/tests/server/specific.rs @@ -93,6 +93,41 @@ async fn tcp_tls_scenario_should_be_valid() { tcp_tls_scenario::run(&client).await; } +#[tokio::test] +#[parallel] +async fn tcp_tls_self_signed_scenario_should_be_valid() { + use iggy::clients::client_builder::IggyClientBuilder; + + let mut extra_envs = HashMap::new(); + extra_envs.insert("IGGY_TCP_TLS_ENABLED".to_string(), "true".to_string()); + extra_envs.insert("IGGY_TCP_TLS_SELF_SIGNED".to_string(), "true".to_string()); + + let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4); + test_server.start(); + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + let server_addr = test_server.get_raw_tcp_addr().unwrap(); + + let client = IggyClientBuilder::new() + .with_tcp() + .with_server_address(server_addr) + .with_tls_enabled(true) + .with_tls_domain("localhost".to_string()) + .with_tls_validate_certificate(false) + .build() + .expect("Failed to create TLS client"); + + client + .connect() + .await + .expect("Failed to connect TLS client with self-signed cert"); + + let client = iggy::clients::client::IggyClient::create(Box::new(client), None, None); + + tcp_tls_scenario::run(&client).await; +} + // Message size scenario is specific to TCP transport to test the behavior around the maximum message size. // When run on other transports, it will fail because both QUIC and HTTP have different message size limits. #[tokio::test] diff --git a/core/integration/tests/streaming/messages.rs b/core/integration/tests/streaming/messages.rs index 96e4c10a..78a04642 100644 --- a/core/integration/tests/streaming/messages.rs +++ b/core/integration/tests/streaming/messages.rs @@ -28,159 +28,6 @@ use std::str::FromStr; use std::sync::Arc; use std::sync::atomic::{AtomicU32, AtomicU64}; -#[tokio::test] -async fn should_persist_messages_and_then_load_them_by_timestamp() { - let setup = TestSetup::init().await; - let stream_id = 1; - let topic_id = 1; - let partition_id = 1; - let messages_count = 100; - let config = Arc::new(SystemConfig { - path: setup.config.path.to_string(), - partition: PartitionConfig { - messages_required_to_save: messages_count, - enforce_fsync: true, - ..Default::default() - }, - ..Default::default() - }); - let mut partition = Partition::create( - stream_id, - topic_id, - partition_id, - true, - config.clone(), - setup.storage.clone(), - IggyExpiry::NeverExpire, - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU32::new(0)), - IggyTimestamp::now(), - ) - .await; - - let mut messages = Vec::with_capacity(messages_count as usize); - let mut appended_messages = Vec::with_capacity(messages_count as usize); - let mut messages_two = Vec::with_capacity(messages_count as usize); - for i in 1..=messages_count { - let id = i as u128; - let payload = Bytes::from(format!("message {}", i)); - let mut headers = HashMap::new(); - headers.insert( - HeaderKey::new("key_1").unwrap(), - HeaderValue::from_str("Value 1").unwrap(), - ); - headers.insert( - HeaderKey::new("key 2").unwrap(), - HeaderValue::from_bool(true).unwrap(), - ); - headers.insert( - HeaderKey::new("key-3").unwrap(), - HeaderValue::from_uint64(123456).unwrap(), - ); - let message = IggyMessage::builder() - .id(id) - .payload(payload) - .user_headers(headers) - .build() - .expect("Failed to create message with valid payload and headers"); - - messages.push(message); - } - - for i in (messages_count + 1)..=(messages_count * 2) { - let id = i as u128; - let payload = Bytes::from(format!("message {}", i)); - let mut headers = HashMap::new(); - headers.insert( - HeaderKey::new("key_1").unwrap(), - HeaderValue::from_str("Value 1").unwrap(), - ); - headers.insert( - HeaderKey::new("key 2").unwrap(), - HeaderValue::from_bool(true).unwrap(), - ); - headers.insert( - HeaderKey::new("key-3").unwrap(), - HeaderValue::from_uint64(123456).unwrap(), - ); - - let message = IggyMessage::builder() - .id(id) - .payload(payload.clone()) - .user_headers(headers.clone()) - .build() - .expect("Failed to create message with valid payload and headers"); - - let message_clone = IggyMessage::builder() - .id(id) - .payload(payload) - .user_headers(headers) - .build() - .expect("Failed to create message with valid payload and headers"); - - appended_messages.push(message); - messages_two.push(message_clone); - } - - setup.create_partitions_directory(stream_id, topic_id).await; - partition.persist().await.unwrap(); - let messages_size = messages - .iter() - .map(|msg| msg.get_size_bytes().as_bytes_u32()) - .sum::<u32>(); - let batch = IggyMessagesBatchMut::from_messages(&messages, messages_size); - - let messages_size_two = messages_two - .iter() - .map(|msg| msg.get_size_bytes().as_bytes_u32()) - .sum::<u32>(); - let batch_two = IggyMessagesBatchMut::from_messages(&messages_two, messages_size_two); - - partition.append_messages(batch, None).await.unwrap(); - let test_timestamp = IggyTimestamp::now(); - partition.append_messages(batch_two, None).await.unwrap(); - - let loaded_messages = partition - .get_messages_by_timestamp(test_timestamp, messages_count) - .await - .unwrap(); - - assert_eq!( - loaded_messages.count(), - messages_count, - "Unexpected loaded messages count" - ); - (0..loaded_messages.count() as usize).for_each(|i| { - let loaded_message = &loaded_messages.get(i).unwrap(); - let appended_message = &appended_messages[i]; - assert_eq!( - loaded_message.header().id(), - appended_message.header.id, - "Message ID mismatch at position {i}" - ); - assert_eq!( - loaded_message.payload(), - appended_message.payload, - "Payload mismatch at position {i}", - ); - assert!( - loaded_message.header().timestamp() >= test_timestamp.as_micros(), - "Message timestamp {} at position {} is less than test timestamp {}", - loaded_message.header().timestamp(), - i, - test_timestamp - ); - assert_eq!( - loaded_message.user_headers_map().unwrap().unwrap(), - appended_message.user_headers_map().unwrap().unwrap(), - "Headers mismatch at position {i}", - ); - }); -} - #[tokio::test] async fn should_persist_messages_and_then_load_them_from_disk() { let setup = TestSetup::init().await; diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs index 9bad4f16..bde7757f 100644 --- a/core/sdk/src/client_provider.rs +++ b/core/sdk/src/client_provider.rs @@ -130,6 +130,7 @@ impl ClientProviderConfig { tls_enabled: args.tcp_tls_enabled, tls_domain: args.tcp_tls_domain, tls_ca_file: args.tcp_tls_ca_file, + tls_validate_certificate: true, nodelay: args.tcp_nodelay, heartbeat_interval: IggyDuration::from_str(&args.tcp_heartbeat_interval) .unwrap(), diff --git a/core/sdk/src/clients/client_builder.rs b/core/sdk/src/clients/client_builder.rs index 04f8cbea..1698c126 100644 --- a/core/sdk/src/clients/client_builder.rs +++ b/core/sdk/src/clients/client_builder.rs @@ -182,6 +182,14 @@ impl TcpClientBuilder { self } + /// Sets whether to validate the TLS certificate. + pub fn with_tls_validate_certificate(mut self, tls_validate_certificate: bool) -> Self { + self.config = self + .config + .with_tls_validate_certificate(tls_validate_certificate); + self + } + /// Sets the nodelay option for the TCP socket. pub fn with_no_delay(mut self) -> Self { self.config = self.config.with_no_delay(); diff --git a/core/sdk/src/tcp/mod.rs b/core/sdk/src/tcp/mod.rs index 2424bec9..c073ff0e 100644 --- a/core/sdk/src/tcp/mod.rs +++ b/core/sdk/src/tcp/mod.rs @@ -21,3 +21,4 @@ pub(crate) mod tcp_connection_stream; pub(crate) mod tcp_connection_stream_kind; pub(crate) mod tcp_stream; pub(crate) mod tcp_tls_connection_stream; +pub(crate) mod tcp_tls_verifier; diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index 8db50940..692d7689 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -351,32 +351,42 @@ impl TcpClient { let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - let mut root_cert_store = rustls::RootCertStore::empty(); - if let Some(certificate_path) = &self.config.tls_ca_file { - for cert in CertificateDer::pem_file_iter(certificate_path).map_err(|error| { - error!("Failed to read the CA file: {certificate_path}. {error}",); - IggyError::InvalidTlsCertificatePath - })? { - let certificate = cert.map_err(|error| { - error!( - "Failed to read a certificate from the CA file: {certificate_path}. {error}", - ); - IggyError::InvalidTlsCertificate - })?; - root_cert_store.add(certificate).map_err(|error| { - error!( - "Failed to add a certificate to the root certificate store. {error}", - ); - IggyError::InvalidTlsCertificate - })?; + let config = if self.config.tls_validate_certificate { + let mut root_cert_store = rustls::RootCertStore::empty(); + if let Some(certificate_path) = &self.config.tls_ca_file { + for cert in + CertificateDer::pem_file_iter(certificate_path).map_err(|error| { + error!("Failed to read the CA file: {certificate_path}. {error}",); + IggyError::InvalidTlsCertificatePath + })? + { + let certificate = cert.map_err(|error| { + error!( + "Failed to read a certificate from the CA file: {certificate_path}. {error}", + ); + IggyError::InvalidTlsCertificate + })?; + root_cert_store.add(certificate).map_err(|error| { + error!( + "Failed to add a certificate to the root certificate store. {error}", + ); + IggyError::InvalidTlsCertificate + })?; + } + } else { + root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); } - } else { - root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); - } - let config = rustls::ClientConfig::builder() - .with_root_certificates(root_cert_store) - .with_no_client_auth(); + rustls::ClientConfig::builder() + .with_root_certificates(root_cert_store) + .with_no_client_auth() + } else { + use crate::tcp::tcp_tls_verifier::NoServerVerification; + rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoServerVerification)) + .with_no_client_auth() + }; let connector = TlsConnector::from(Arc::new(config)); let tls_domain = self.config.tls_domain.to_owned(); let domain = ServerName::try_from(tls_domain).map_err(|error| { diff --git a/core/sdk/src/tcp/tcp_tls_verifier.rs b/core/sdk/src/tcp/tcp_tls_verifier.rs new file mode 100644 index 00000000..0a1e710f --- /dev/null +++ b/core/sdk/src/tcp/tcp_tls_verifier.rs @@ -0,0 +1,59 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}; +use rustls::pki_types::{CertificateDer, ServerName, UnixTime}; +use rustls::{DigitallySignedStruct, Error, SignatureScheme}; + +#[derive(Debug)] +pub struct NoServerVerification; + +impl ServerCertVerifier for NoServerVerification { + fn verify_server_cert( + &self, + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &ServerName<'_>, + _ocsp_response: &[u8], + _now: UnixTime, + ) -> Result<ServerCertVerified, Error> { + Ok(ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result<HandshakeSignatureValid, Error> { + Ok(HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result<HandshakeSignatureValid, Error> { + Ok(HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec<SignatureScheme> { + vec![SignatureScheme::ECDSA_NISTP256_SHA256] + } +} diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 34ab57fb..b0e46455 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -88,7 +88,6 @@ opentelemetry_sdk = { version = "0.30.0", features = [ ] } prometheus-client = "0.23.1" quinn = { workspace = true } -rcgen = "0.13.2" reqwest = { workspace = true, features = ["rustls-tls-no-provider"] } ring = "0.17.14" rust-s3 = { workspace = true } diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs index d0d4f261..82b560cf 100644 --- a/core/server/src/configs/defaults.rs +++ b/core/server/src/configs/defaults.rs @@ -153,6 +153,7 @@ impl Default for TcpTlsConfig { fn default() -> TcpTlsConfig { TcpTlsConfig { enabled: SERVER_CONFIG.tcp.tls.enabled, + self_signed: SERVER_CONFIG.tcp.tls.self_signed, cert_file: SERVER_CONFIG.tcp.tls.cert_file.parse().unwrap(), key_file: SERVER_CONFIG.tcp.tls.key_file.parse().unwrap(), } diff --git a/core/server/src/configs/tcp.rs b/core/server/src/configs/tcp.rs index e5cfed12..eeec7779 100644 --- a/core/server/src/configs/tcp.rs +++ b/core/server/src/configs/tcp.rs @@ -33,6 +33,7 @@ pub struct TcpConfig { #[derive(Debug, Deserialize, Serialize, Clone)] pub struct TcpTlsConfig { pub enabled: bool, + pub self_signed: bool, pub cert_file: String, pub key_file: String, } diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index 96d78d96..e024312e 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -97,16 +97,11 @@ fn configure_quic(config: QuicConfig) -> Result<quinn::ServerConfig, QuicError> fn generate_self_signed_cert<'a>() -> Result<(Vec<CertificateDer<'a>>, PrivateKeyDer<'a>), QuicError> { - let certificate = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap(); - let certificate_der = certificate.cert.der(); - let private_key = certificate.key_pair.serialize_der(); - let private_key = PrivateKeyDer::try_from(private_key) + iggy_common::generate_self_signed_certificate("localhost") .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to parse private key") + format!("{COMPONENT} (error: {error}) - failed to generate self-signed certificate") }) - .map_err(|_| QuicError::CertGenerationError)?; - let cert_chain = vec![certificate_der.clone()]; - Ok((cert_chain, private_key)) + .map_err(|_| QuicError::CertGenerationError) } fn load_certificates( diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs index 91094e48..b333841c 100644 --- a/core/server/src/tcp/tcp_tls_listener.rs +++ b/core/server/src/tcp/tcp_tls_listener.rs @@ -22,6 +22,7 @@ use crate::streaming::clients::client_manager::Transport; use crate::streaming::systems::system::SharedSystem; use crate::tcp::connection_handler::{handle_connection, handle_error}; use rustls::ServerConfig; +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; use rustls_pemfile::{certs, private_key}; use std::io::BufReader; use std::net::SocketAddr; @@ -42,36 +43,20 @@ pub(crate) async fn start( tokio::spawn(async move { let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - let cert_file_path = &config.cert_file; - let cert_file = std::fs::File::open(cert_file_path) - .unwrap_or_else(|e| panic!("Unable to open certificate file '{cert_file_path}': {e}")); - let mut cert_reader = BufReader::new(cert_file); - - let certs: Vec<_> = certs(&mut cert_reader) - .collect::<Result<Vec<_>, _>>() - .unwrap_or_else(|e| { - panic!("Unable to parse certificates from '{cert_file_path}': {e}") - }); - - if certs.is_empty() { - panic!("No certificates found in certificate file '{cert_file_path}'"); - } - - let key_file_path = &config.key_file; - let key_file = std::fs::File::open(key_file_path) - .unwrap_or_else(|e| panic!("Unable to open key file '{key_file_path}': {e}")); - let mut key_reader = BufReader::new(key_file); - - let key = private_key(&mut key_reader) - .unwrap_or_else(|e| panic!("Unable to parse private key from '{key_file_path}': {e}")) - .unwrap_or_else(|| panic!("No private key found in key file '{key_file_path}'")); + let (certs, key) = + if config.self_signed && !std::path::Path::new(&config.cert_file).exists() { + info!("Generating self-signed certificate for TCP TLS server"); + generate_self_signed_cert() + .unwrap_or_else(|e| panic!("Failed to generate self-signed certificate: {e}")) + } else { + load_certificates(&config.cert_file, &config.key_file) + .unwrap_or_else(|e| panic!("Failed to load certificates: {e}")) + }; let server_config = ServerConfig::builder() .with_no_client_auth() .with_single_cert(certs, key) - .unwrap_or_else(|e| { - panic!("Unable to create TLS server config with cert '{cert_file_path}' and key '{key_file_path}': {e}") - }); + .unwrap_or_else(|e| panic!("Unable to create TLS server config: {e}")); let acceptor = TlsAcceptor::from(Arc::new(server_config)); @@ -109,28 +94,34 @@ pub(crate) async fn start( let client_id = session.client_id; let acceptor = acceptor.clone(); - let stream = acceptor.accept(stream).await.unwrap_or_else(|e| { - panic!("Failed to accept TLS connection from '{address}': {e}",); - }); - let system = system.clone(); - let mut sender = SenderKind::get_tcp_tls_sender(stream); - tokio::spawn(async move { - if let Err(error) = - handle_connection(session, &mut sender, system.clone()).await - { - handle_error(error); - system.read().await.delete_client(client_id).await; - if let Err(error) = sender.shutdown().await { - error!( - "Failed to shutdown TCP stream for client: {client_id}, address: {address}. {error}" - ); - } else { - info!( - "Successfully closed TCP stream for client: {client_id}, address: {address}." - ); - } + let system_clone = system.clone(); + match acceptor.accept(stream).await { + Ok(stream) => { + let mut sender = SenderKind::get_tcp_tls_sender(stream); + tokio::spawn(async move { + if let Err(error) = + handle_connection(session, &mut sender, system_clone.clone()) + .await + { + handle_error(error); + system_clone.read().await.delete_client(client_id).await; + if let Err(error) = sender.shutdown().await { + error!( + "Failed to shutdown TCP stream for client: {client_id}, address: {address}. {error}" + ); + } else { + info!( + "Successfully closed TCP stream for client: {client_id}, address: {address}." + ); + } + } + }); + } + Err(e) => { + error!("Failed to accept TLS connection from '{address}': {e}"); + system_clone.read().await.delete_client(client_id).await; } - }); + } } Err(error) => error!("Unable to accept TCP TLS socket. {error}"), } @@ -141,3 +132,27 @@ pub(crate) async fn start( Err(_) => panic!("Failed to get the local address for TCP TLS listener."), } } + +fn generate_self_signed_cert() +-> Result<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>), Box<dyn std::error::Error>> { + iggy_common::generate_self_signed_certificate("localhost") +} + +fn load_certificates( + cert_file: &str, + key_file: &str, +) -> Result<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>), Box<dyn std::error::Error>> { + let cert_file = std::fs::File::open(cert_file)?; + let mut cert_reader = BufReader::new(cert_file); + let certs: Vec<_> = certs(&mut cert_reader).collect::<Result<Vec<_>, _>>()?; + + if certs.is_empty() { + return Err("No certificates found in certificate file".into()); + } + + let key_file = std::fs::File::open(key_file)?; + let mut key_reader = BufReader::new(key_file); + let key = private_key(&mut key_reader)?.ok_or("No private key found in key file")?; + + Ok((certs, key)) +} diff --git a/foreign/cpp/tests/e2e/server.toml b/foreign/cpp/tests/e2e/server.toml index d765b88e..9be27112 100644 --- a/foreign/cpp/tests/e2e/server.toml +++ b/foreign/cpp/tests/e2e/server.toml @@ -104,10 +104,10 @@ endpoint = "/metrics" enabled = false # Path to the TLS certificate file. -cert_file = "certs/iggy_cert.pem" +cert_file = "core/certs/iggy_cert.pem" # Path to the TLS key file. -key_file = "certs/iggy_key.pem" +key_file = "core/certs/iggy_key.pem" # TCP server configuration. [tcp] @@ -173,10 +173,10 @@ max_idle_timeout = "10 s" self_signed = true # Path to the QUIC TLS certificate file. -cert_file = "certs/iggy_cert.pem" +cert_file = "core/certs/iggy_cert.pem" # Path to the QUIC TLS key file. -key_file = "certs/iggy_key.pem" +key_file = "core/certs/iggy_key.pem" # Message cleaner configuration. [message_cleaner]
