This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new d4288aaf feat(bench): add TCP TLS support to `iggy-bench` (#1899)
d4288aaf is described below
commit d4288aaf4bba76f6df945d4fcce52e80d4f712df
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Jun 24 13:24:55 2025 +0200
feat(bench): add TCP TLS support to `iggy-bench` (#1899)
This commit introduces TLS support for the benchmark binary, enhancing
security by enabling encrypted communication. The changes include:
- Integration of `rcgen` and `rustls` for TLS certificate handling.
- Modifications to the `TcpClientFactory` to support TLS configuration.
- Addition of TLS-related arguments in the `TcpArgs` struct.
- Implementation of self-signed certificate generation for testing.
- Updates to the server configuration to support TLS and self-signed
certificates.
- New tests to validate TLS and self-signed certificate scenarios.
---
Cargo.lock | 11 +-
Cargo.toml | 4 +-
DEPENDENCIES.md | 4 +-
core/bench/src/analytics/report_builder.rs | 8 +-
core/bench/src/args/mod.rs | 2 +-
core/bench/src/args/transport.rs | 18 ++-
core/bench/src/runner.rs | 6 +-
core/bench/src/utils/client_factory.rs | 20 ++-
core/bench/src/utils/mod.rs | 48 ++-----
core/common/Cargo.toml | 2 +
.../src/args/mod.rs => common/src/certificates.rs} | 20 +--
core/common/src/lib.rs | 2 +
.../configuration/tcp_config/tcp_client_config.rs | 5 +
.../tcp_config/tcp_client_config_builder.rs | 6 +
core/configs/server.toml | 17 ++-
core/connectors/runtime/README.md | 4 +-
core/connectors/runtime/config.toml | 4 +-
core/integration/src/tcp_client.rs | 26 +++-
core/integration/tests/server/specific.rs | 35 +++++
core/integration/tests/streaming/messages.rs | 153 ---------------------
core/sdk/src/client_provider.rs | 1 +
core/sdk/src/clients/client_builder.rs | 8 ++
core/sdk/src/tcp/mod.rs | 1 +
core/sdk/src/tcp/tcp_client.rs | 58 ++++----
core/sdk/src/tcp/tcp_tls_verifier.rs | 59 ++++++++
core/server/Cargo.toml | 1 -
core/server/src/configs/defaults.rs | 1 +
core/server/src/configs/tcp.rs | 1 +
core/server/src/quic/quic_server.rs | 11 +-
core/server/src/tcp/tcp_tls_listener.rs | 109 ++++++++-------
foreign/cpp/tests/e2e/server.toml | 8 +-
31 files changed, 331 insertions(+), 322 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 0406b044..ea6eb0e1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3827,6 +3827,8 @@ dependencies = [
"derive_more 2.0.1",
"fast-async-mutex",
"humantime",
+ "rcgen",
+ "rustls",
"serde",
"serde_json",
"serde_with",
@@ -4366,9 +4368,9 @@ dependencies = [
[[package]]
name = "libgit2-sys"
-version = "0.18.1+1.9.0"
+version = "0.18.2+1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e1dcb20f84ffcdd825c7a311ae347cce604a6f084a767dec4a4929829645290e"
+checksum = "1c42fe03df2bd3c53a3a9c7317ad91d80c81cd1fb0caec8d7cc4cd2bfa10c222"
dependencies = [
"cc",
"libc",
@@ -6747,7 +6749,6 @@ dependencies = [
"opentelemetry_sdk",
"prometheus-client",
"quinn",
- "rcgen",
"reqwest",
"ring",
"rust-s3",
@@ -8909,9 +8910,9 @@ dependencies = [
[[package]]
name = "zip"
-version = "4.1.0"
+version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "af7dcdb4229c0e79c2531a24de7726a0e980417a74fb4d030a35f535665439a0"
+checksum = "95ab361742de920c5535880f89bbd611ee62002bf11341d16a5f057bb8ba6899"
dependencies = [
"aes",
"arbitrary",
diff --git a/Cargo.toml b/Cargo.toml
index 71715d67..479e12c3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -157,8 +157,8 @@ tower-http = { version = "0.6.6", features = [
"trace",
] }
trait-variant = "0.1.2"
-webpki-roots = "1.0.0"
-zip = "4.1.0"
+webpki-roots = "1.0.1"
+zip = "4.2.0"
# Optional dependencies
mimalloc = "0.1"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index b1621ca2..40c5dd69 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -384,7 +384,7 @@ lending-iterator-proc_macros: 0.1.7, "Apache-2.0 OR MIT OR
Zlib",
libbz2-rs-sys: 0.2.1, "bzip2-1.0.6",
libc: 0.2.174, "Apache-2.0 OR MIT",
libdbus-sys: 0.2.5, "Apache-2.0 OR MIT",
-libgit2-sys: 0.18.1+1.9.0, "Apache-2.0 OR MIT",
+libgit2-sys: 0.18.2+1.9.1, "Apache-2.0 OR MIT",
libloading: 0.8.8, "ISC",
liblzma: 0.4.2, "Apache-2.0 OR MIT",
liblzma-sys: 0.4.4, "Apache-2.0 OR MIT",
@@ -829,7 +829,7 @@ zeroize_derive: 1.4.2, "Apache-2.0 OR MIT",
zerotrie: 0.2.2, "Unicode-3.0",
zerovec: 0.11.2, "Unicode-3.0",
zerovec-derive: 0.11.1, "Unicode-3.0",
-zip: 4.1.0, "MIT",
+zip: 4.2.0, "MIT",
zlib-rs: 0.5.1, "Zlib",
zopfli: 0.8.2, "Apache-2.0",
zstd: 0.13.3, "MIT",
diff --git a/core/bench/src/analytics/report_builder.rs b/core/bench/src/analytics/report_builder.rs
index 248708e0..8e778919 100644
--- a/core/bench/src/analytics/report_builder.rs
+++ b/core/bench/src/analytics/report_builder.rs
@@ -31,6 +31,8 @@ use bench_report::{
};
use chrono::{DateTime, Utc};
use iggy::prelude::{CacheMetrics, CacheMetricsKey, IggyTimestamp, Stats};
+use integration::test_server::ClientFactory;
+use std::sync::Arc;
pub struct BenchmarkReportBuilder;
@@ -41,6 +43,7 @@ impl BenchmarkReportBuilder {
mut params: BenchmarkParams,
mut individual_metrics: Vec<BenchmarkIndividualMetrics>,
moving_average_window: u32,
+ client_factory: &Arc<dyn ClientFactory>,
) -> BenchmarkReport {
let uuid = uuid::Uuid::new_v4();
@@ -48,10 +51,7 @@ impl BenchmarkReportBuilder {
DateTime::<Utc>::from_timestamp_micros(IggyTimestamp::now().as_micros() as i64)
.map_or_else(|| String::from("unknown"), |dt| dt.to_rfc3339());
- let transport = params.transport;
- let server_addr = params.server_address.clone();
-
- let server_stats = get_server_stats(&transport, &server_addr)
+ let server_stats = get_server_stats(client_factory)
.await
.expect("Failed to get server stats");
diff --git a/core/bench/src/args/mod.rs b/core/bench/src/args/mod.rs
index 458b051b..bfc04b31 100644
--- a/core/bench/src/args/mod.rs
+++ b/core/bench/src/args/mod.rs
@@ -20,8 +20,8 @@ pub mod common;
pub mod defaults;
pub mod kind;
pub mod kinds;
+pub mod transport;
mod examples;
mod output;
mod props;
-mod transport;
diff --git a/core/bench/src/args/transport.rs b/core/bench/src/args/transport.rs
index 18ca668d..c8bbc484 100644
--- a/core/bench/src/args/transport.rs
+++ b/core/bench/src/args/transport.rs
@@ -127,6 +127,22 @@ pub struct TcpArgs {
#[arg(long, default_value_t = false)]
pub nodelay: bool,
+ /// Enable TLS encryption
+ #[arg(long, default_value_t = false)]
+ pub tls: bool,
+
+ /// TLS domain name
+ #[arg(long, default_value_t = String::from("localhost"), requires = "tls")]
+ pub tls_domain: String,
+
+ /// Validate TLS certificate
+ #[arg(long, default_value_t = false, requires = "tls")]
+ pub tls_validate_certificate: bool,
+
+ /// Path to CA certificate file for TLS validation
+ #[arg(long, requires = "tls")]
+ pub tls_ca_file: Option<String>,
+
/// Optional output command, used to output results (charts, raw json data) to a directory
#[command(subcommand)]
output: Option<BenchmarkOutputCommand>,
@@ -142,7 +158,7 @@ impl BenchmarkTransportProps for TcpArgs {
}
fn validate_certificate(&self) -> bool {
- panic!("Cannot validate certificate for TCP transport!")
+ self.tls_validate_certificate
}
fn client_address(&self) -> &str {
diff --git a/core/bench/src/runner.rs b/core/bench/src/runner.rs
index 49c5e696..4494557f 100644
--- a/core/bench/src/runner.rs
+++ b/core/bench/src/runner.rs
@@ -71,14 +71,13 @@ impl BenchmarkRunner {
let hardware =
BenchmarkHardware::get_system_info_with_identifier(benchmark.args().identifier());
let params = params_from_args_and_metrics(benchmark.args(),
&individual_metrics);
- let transport = params.transport;
- let server_addr = params.server_address.clone();
let report = BenchmarkReportBuilder::build(
hardware,
params,
individual_metrics,
benchmark.args().moving_average_window(),
+ benchmark.client_factory(),
)
.await;
@@ -100,8 +99,7 @@ impl BenchmarkRunner {
report.dump_to_json(&full_output_path);
if let Err(e) = collect_server_logs_and_save_to_file(
- &transport,
- &server_addr,
+ benchmark.client_factory(),
Path::new(&full_output_path),
)
.await
diff --git a/core/bench/src/utils/client_factory.rs b/core/bench/src/utils/client_factory.rs
index 525d66dd..41d1f54f 100644
--- a/core/bench/src/utils/client_factory.rs
+++ b/core/bench/src/utils/client_factory.rs
@@ -17,6 +17,7 @@
*/
use crate::args::common::IggyBenchArgs;
+use crate::args::transport::BenchmarkTransportCommand;
use integration::http_client::HttpClientFactory;
use integration::quic_client::QuicClientFactory;
use integration::tcp_client::TcpClientFactory;
@@ -28,10 +29,21 @@ pub fn create_client_factory(args: &IggyBenchArgs) ->
Arc<dyn ClientFactory> {
Transport::Http => Arc::new(HttpClientFactory {
server_addr: args.server_address().to_owned(),
}),
- Transport::Tcp => Arc::new(TcpClientFactory {
- server_addr: args.server_address().to_owned(),
- nodelay: args.nodelay(),
- }),
+ Transport::Tcp => {
+ let transport_command = args.transport_command();
+ if let BenchmarkTransportCommand::Tcp(tcp_args) =
transport_command {
+ Arc::new(TcpClientFactory {
+ server_addr: args.server_address().to_owned(),
+ nodelay: args.nodelay(),
+ tls_enabled: tcp_args.tls,
+ tls_domain: tcp_args.tls_domain.clone(),
+ tls_ca_file: tcp_args.tls_ca_file.clone(),
+ tls_validate_certificate:
tcp_args.tls_validate_certificate,
+ })
+ } else {
+ unreachable!("Transport is TCP but transport command is not
TcpArgs")
+ }
+ }
Transport::Quic => Arc::new(QuicClientFactory {
server_addr: args.server_address().to_owned(),
}),
diff --git a/core/bench/src/utils/mod.rs b/core/bench/src/utils/mod.rs
index 42d316f0..cc6d5fc4 100644
--- a/core/bench/src/utils/mod.rs
+++ b/core/bench/src/utils/mod.rs
@@ -22,8 +22,8 @@ use bench_report::{
transport::BenchmarkTransport,
};
use iggy::prelude::*;
-use integration::test_server::Transport;
-use std::{fs, path::Path};
+use integration::test_server::{ClientFactory, Transport};
+use std::{fs, path::Path, sync::Arc};
use tracing::{error, info};
use crate::args::{
@@ -61,26 +61,9 @@ pub fn batch_user_size_bytes(polled_messages:
&PolledMessages) -> u64 {
.sum()
}
-pub async fn get_server_stats(
- transport: &BenchmarkTransport,
- server_address: &str,
-) -> Result<Stats, IggyError> {
- let client = IggyClientBuilder::new();
-
- let client = match transport {
- BenchmarkTransport::Tcp => client
- .with_tcp()
- .with_server_address(server_address.to_string())
- .build()?,
- BenchmarkTransport::Http => client
- .with_http()
- .with_api_url(format!("http://{server_address}"))
- .build()?,
- BenchmarkTransport::Quic => client
- .with_quic()
- .with_server_address(server_address.to_string())
- .build()?,
- };
+pub async fn get_server_stats(client_factory: &Arc<dyn ClientFactory>) ->
Result<Stats, IggyError> {
+ let client = client_factory.create_client().await;
+ let client = IggyClient::create(client, None, None);
client.connect().await?;
client
@@ -91,26 +74,11 @@ pub async fn get_server_stats(
}
pub async fn collect_server_logs_and_save_to_file(
- transport: &BenchmarkTransport,
- server_address: &str,
+ client_factory: &Arc<dyn ClientFactory>,
output_dir: &Path,
) -> Result<(), IggyError> {
- let client = IggyClientBuilder::new();
-
- let client = match transport {
- BenchmarkTransport::Tcp => client
- .with_tcp()
- .with_server_address(server_address.to_string())
- .build()?,
- BenchmarkTransport::Http => client
- .with_http()
- .with_api_url(format!("http://{server_address}"))
- .build()?,
- BenchmarkTransport::Quic => client
- .with_quic()
- .with_server_address(server_address.to_string())
- .build()?,
- };
+ let client = client_factory.create_client().await;
+ let client = IggyClient::create(client, None, None);
client.connect().await?;
client
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index 660d70f4..443b0150 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -46,6 +46,8 @@ crc32fast = { workspace = true }
derive_more = { workspace = true }
fast-async-mutex = { version = "0.6.7", optional = true }
humantime = { workspace = true }
+rcgen = "0.13.2"
+rustls = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true, features = ["base64"] }
diff --git a/core/bench/src/args/mod.rs b/core/common/src/certificates.rs
similarity index 58%
copy from core/bench/src/args/mod.rs
copy to core/common/src/certificates.rs
index 458b051b..03056e99 100644
--- a/core/bench/src/args/mod.rs
+++ b/core/common/src/certificates.rs
@@ -16,12 +16,16 @@
* under the License.
*/
-pub mod common;
-pub mod defaults;
-pub mod kind;
-pub mod kinds;
+use rustls::pki_types::{CertificateDer, PrivateKeyDer};
-mod examples;
-mod output;
-mod props;
-mod transport;
+/// Generates a self-signed certificate for the given domain.
+/// Returns a tuple of (certificate chain, private key).
+pub fn generate_self_signed_certificate(
+ domain: &str,
+) -> Result<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>), Box<dyn
std::error::Error>> {
+ let cert = rcgen::generate_simple_self_signed(vec![domain.to_string()])?;
+ let cert_der = cert.cert.der();
+ let key_der = cert.key_pair.serialize_der();
+ let key = PrivateKeyDer::try_from(key_der)?;
+ Ok((vec![cert_der.clone()], key))
+}
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index bd83fa66..6603a7d5 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+mod certificates;
mod commands;
mod error;
mod traits;
@@ -81,6 +82,7 @@ pub use types::user::user_identity_info::*;
pub use types::user::user_info::*;
pub use types::user::user_status::*;
// Utils
+pub use certificates::generate_self_signed_certificate;
pub use utils::byte_size::IggyByteSize;
pub use utils::checksum::*;
pub use utils::crypto::*;
diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs
index 9ebc6c1c..2976f499 100644
--- a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs
+++ b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs
@@ -32,6 +32,8 @@ pub struct TcpClientConfig {
pub tls_domain: String,
/// The path to the CA file for TLS.
pub tls_ca_file: Option<String>,
+ /// Whether to validate the TLS certificate.
+ pub tls_validate_certificate: bool,
/// Whether to automatically login user after establishing connection.
pub auto_login: AutoLogin,
/// Whether to automatically reconnect when disconnected.
@@ -49,6 +51,7 @@ impl Default for TcpClientConfig {
tls_enabled: false,
tls_domain: "localhost".to_string(),
tls_ca_file: None,
+ tls_validate_certificate: true,
heartbeat_interval: IggyDuration::from_str("5s").unwrap(),
auto_login: AutoLogin::Disabled,
reconnection: TcpClientReconnectionConfig::default(),
@@ -65,6 +68,8 @@ impl From<ConnectionString<TcpConnectionStringOptions>> for
TcpClientConfig {
tls_enabled: connection_string.options().tls_enabled(),
tls_domain: connection_string.options().tls_domain().into(),
tls_ca_file: connection_string.options().tls_ca_file().to_owned(),
+ // Always validate TLS certificate for connection strings, we don't want to allow self-signed certificates for connection strings
+ tls_validate_certificate: true,
reconnection:
connection_string.options().reconnection().to_owned(),
heartbeat_interval:
connection_string.options().heartbeat_interval(),
nodelay: connection_string.options().nodelay(),
diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs
index 097e9501..6844bbb4 100644
--- a/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs
+++ b/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs
@@ -83,6 +83,12 @@ impl TcpClientConfigBuilder {
self
}
+ /// Sets whether to validate the TLS certificate.
+ pub fn with_tls_validate_certificate(mut self, tls_validate_certificate:
bool) -> Self {
+ self.config.tls_validate_certificate = tls_validate_certificate;
+ self
+ }
+
/// Sets the nodelay option for the TCP socket.
pub fn with_no_delay(mut self) -> Self {
self.config.nodelay = true;
diff --git a/core/configs/server.toml b/core/configs/server.toml
index 42a558a6..5445ff18 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -168,10 +168,10 @@ endpoint = "/metrics"
enabled = false
# Path to the TLS certificate file.
-cert_file = "certs/iggy_cert.pem"
+cert_file = "core/certs/iggy_cert.pem"
# Path to the TLS key file.
-key_file = "certs/iggy_key.pem"
+key_file = "core/certs/iggy_key.pem"
# TCP server configuration.
[tcp]
@@ -194,11 +194,16 @@ ipv6 = false
# `false` leaves TCP connections unencrypted.
enabled = false
+# Enables or disables self-signed certificate generation.
+# `true` generates a self-signed certificate if cert files don't exist.
+# `false` requires certificate files to exist at the specified paths.
+self_signed = true
+
# Path to the TLS certificate file.
-cert_file = "certs/iggy_cert.pem"
+cert_file = "core/certs/iggy_cert.pem"
# Path to the TLS key file.
-key_file = "certs/iggy_key.pem"
+key_file = "core/certs/iggy_key.pem"
# Configuration for the TCP socket
[tcp.socket]
@@ -261,10 +266,10 @@ max_idle_timeout = "10 s"
self_signed = true
# Path to the QUIC TLS certificate file.
-cert_file = "certs/iggy_cert.pem"
+cert_file = "core/certs/iggy_cert.pem"
# Path to the QUIC TLS key file.
-key_file = "certs/iggy_key.pem"
+key_file = "core/certs/iggy_key.pem"
# Message cleaner configuration.
[message_cleaner]
diff --git a/core/connectors/runtime/README.md b/core/connectors/runtime/README.md
index b502bf67..18da28d5 100644
--- a/core/connectors/runtime/README.md
+++ b/core/connectors/runtime/README.md
@@ -45,8 +45,8 @@ allow_private_network = false
[http_api.tls] # Optional TLS configuration for HTTP API
enabled = false
-cert_file = "certs/iggy_cert.pem"
-key_file = "certs/iggy_key.pem"
+cert_file = "core/certs/iggy_cert.pem"
+key_file = "core/certs/iggy_key.pem"
```
Currently, it does expose the following endpoints:
diff --git a/core/connectors/runtime/config.toml b/core/connectors/runtime/config.toml
index 171e3a32..7f811ffc 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -31,8 +31,8 @@ allow_private_network = false
[http_api.tls] # Optional TLS configuration for HTTP API
enabled = false
-cert_file = "certs/iggy_cert.pem"
-key_file = "certs/iggy_key.pem"
+cert_file = "core/certs/iggy_cert.pem"
+key_file = "core/certs/iggy_key.pem"
[iggy]
address = "localhost:8090"
diff --git a/core/integration/src/tcp_client.rs b/core/integration/src/tcp_client.rs
index a5a4ae04..87a0cec3 100644
--- a/core/integration/src/tcp_client.rs
+++ b/core/integration/src/tcp_client.rs
@@ -25,6 +25,10 @@ use std::sync::Arc;
pub struct TcpClientFactory {
pub server_addr: String,
pub nodelay: bool,
+ pub tls_enabled: bool,
+ pub tls_domain: String,
+ pub tls_ca_file: Option<String>,
+ pub tls_validate_certificate: bool,
}
#[async_trait]
@@ -33,6 +37,10 @@ impl ClientFactory for TcpClientFactory {
let config = TcpClientConfig {
server_address: self.server_addr.clone(),
nodelay: self.nodelay,
+ tls_enabled: self.tls_enabled,
+ tls_domain: self.tls_domain.clone(),
+ tls_ca_file: self.tls_ca_file.clone(),
+ tls_validate_certificate: self.tls_validate_certificate,
..TcpClientConfig::default()
};
let client = TcpClient::create(Arc::new(config)).unwrap_or_else(|e| {
@@ -42,10 +50,20 @@ impl ClientFactory for TcpClientFactory {
)
});
Client::connect(&client).await.unwrap_or_else(|e| {
- panic!(
- "Failed to connect to iggy-server at {}, error: {:?}",
- self.server_addr, e
- )
+ if self.tls_enabled {
+ panic!(
+ "Failed to connect to iggy-server at {} with TLS enabled,
error: {:?}\n\
+ Hint: Make sure the server is started with TLS enabled and
self-signed certificate:\n\
+ IGGY_TCP_TLS_ENABLED=true IGGY_TCP_TLS_SELF_SIGNED=true\n
+ or start iggy-bench with relevant tcp tls arguments: --tls
--tls-domain <domain> --tls-ca-file <ca_file>\n",
+ self.server_addr, e
+ )
+ } else {
+ panic!(
+ "Failed to connect to iggy-server at {}, error: {:?}",
+ self.server_addr, e
+ )
+ }
});
Box::new(client)
}
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
index d2d28e37..6d2c7606 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -93,6 +93,41 @@ async fn tcp_tls_scenario_should_be_valid() {
tcp_tls_scenario::run(&client).await;
}
+#[tokio::test]
+#[parallel]
+async fn tcp_tls_self_signed_scenario_should_be_valid() {
+ use iggy::clients::client_builder::IggyClientBuilder;
+
+ let mut extra_envs = HashMap::new();
+ extra_envs.insert("IGGY_TCP_TLS_ENABLED".to_string(), "true".to_string());
+ extra_envs.insert("IGGY_TCP_TLS_SELF_SIGNED".to_string(),
"true".to_string());
+
+ let mut test_server = TestServer::new(Some(extra_envs), true, None,
IpAddrKind::V4);
+ test_server.start();
+
+ tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+
+ let client = IggyClientBuilder::new()
+ .with_tcp()
+ .with_server_address(server_addr)
+ .with_tls_enabled(true)
+ .with_tls_domain("localhost".to_string())
+ .with_tls_validate_certificate(false)
+ .build()
+ .expect("Failed to create TLS client");
+
+ client
+ .connect()
+ .await
+ .expect("Failed to connect TLS client with self-signed cert");
+
+ let client = iggy::clients::client::IggyClient::create(Box::new(client),
None, None);
+
+ tcp_tls_scenario::run(&client).await;
+}
+
// Message size scenario is specific to TCP transport to test the behavior around the maximum message size.
// When run on other transports, it will fail because both QUIC and HTTP have different message size limits.
#[tokio::test]
diff --git a/core/integration/tests/streaming/messages.rs b/core/integration/tests/streaming/messages.rs
index 96e4c10a..78a04642 100644
--- a/core/integration/tests/streaming/messages.rs
+++ b/core/integration/tests/streaming/messages.rs
@@ -28,159 +28,6 @@ use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};
-#[tokio::test]
-async fn should_persist_messages_and_then_load_them_by_timestamp() {
- let setup = TestSetup::init().await;
- let stream_id = 1;
- let topic_id = 1;
- let partition_id = 1;
- let messages_count = 100;
- let config = Arc::new(SystemConfig {
- path: setup.config.path.to_string(),
- partition: PartitionConfig {
- messages_required_to_save: messages_count,
- enforce_fsync: true,
- ..Default::default()
- },
- ..Default::default()
- });
- let mut partition = Partition::create(
- stream_id,
- topic_id,
- partition_id,
- true,
- config.clone(),
- setup.storage.clone(),
- IggyExpiry::NeverExpire,
- Arc::new(AtomicU64::new(0)),
- Arc::new(AtomicU64::new(0)),
- Arc::new(AtomicU64::new(0)),
- Arc::new(AtomicU64::new(0)),
- Arc::new(AtomicU32::new(0)),
- IggyTimestamp::now(),
- )
- .await;
-
- let mut messages = Vec::with_capacity(messages_count as usize);
- let mut appended_messages = Vec::with_capacity(messages_count as usize);
- let mut messages_two = Vec::with_capacity(messages_count as usize);
- for i in 1..=messages_count {
- let id = i as u128;
- let payload = Bytes::from(format!("message {}", i));
- let mut headers = HashMap::new();
- headers.insert(
- HeaderKey::new("key_1").unwrap(),
- HeaderValue::from_str("Value 1").unwrap(),
- );
- headers.insert(
- HeaderKey::new("key 2").unwrap(),
- HeaderValue::from_bool(true).unwrap(),
- );
- headers.insert(
- HeaderKey::new("key-3").unwrap(),
- HeaderValue::from_uint64(123456).unwrap(),
- );
- let message = IggyMessage::builder()
- .id(id)
- .payload(payload)
- .user_headers(headers)
- .build()
- .expect("Failed to create message with valid payload and headers");
-
- messages.push(message);
- }
-
- for i in (messages_count + 1)..=(messages_count * 2) {
- let id = i as u128;
- let payload = Bytes::from(format!("message {}", i));
- let mut headers = HashMap::new();
- headers.insert(
- HeaderKey::new("key_1").unwrap(),
- HeaderValue::from_str("Value 1").unwrap(),
- );
- headers.insert(
- HeaderKey::new("key 2").unwrap(),
- HeaderValue::from_bool(true).unwrap(),
- );
- headers.insert(
- HeaderKey::new("key-3").unwrap(),
- HeaderValue::from_uint64(123456).unwrap(),
- );
-
- let message = IggyMessage::builder()
- .id(id)
- .payload(payload.clone())
- .user_headers(headers.clone())
- .build()
- .expect("Failed to create message with valid payload and headers");
-
- let message_clone = IggyMessage::builder()
- .id(id)
- .payload(payload)
- .user_headers(headers)
- .build()
- .expect("Failed to create message with valid payload and headers");
-
- appended_messages.push(message);
- messages_two.push(message_clone);
- }
-
- setup.create_partitions_directory(stream_id, topic_id).await;
- partition.persist().await.unwrap();
- let messages_size = messages
- .iter()
- .map(|msg| msg.get_size_bytes().as_bytes_u32())
- .sum::<u32>();
- let batch = IggyMessagesBatchMut::from_messages(&messages, messages_size);
-
- let messages_size_two = messages_two
- .iter()
- .map(|msg| msg.get_size_bytes().as_bytes_u32())
- .sum::<u32>();
- let batch_two = IggyMessagesBatchMut::from_messages(&messages_two,
messages_size_two);
-
- partition.append_messages(batch, None).await.unwrap();
- let test_timestamp = IggyTimestamp::now();
- partition.append_messages(batch_two, None).await.unwrap();
-
- let loaded_messages = partition
- .get_messages_by_timestamp(test_timestamp, messages_count)
- .await
- .unwrap();
-
- assert_eq!(
- loaded_messages.count(),
- messages_count,
- "Unexpected loaded messages count"
- );
- (0..loaded_messages.count() as usize).for_each(|i| {
- let loaded_message = &loaded_messages.get(i).unwrap();
- let appended_message = &appended_messages[i];
- assert_eq!(
- loaded_message.header().id(),
- appended_message.header.id,
- "Message ID mismatch at position {i}"
- );
- assert_eq!(
- loaded_message.payload(),
- appended_message.payload,
- "Payload mismatch at position {i}",
- );
- assert!(
- loaded_message.header().timestamp() >= test_timestamp.as_micros(),
- "Message timestamp {} at position {} is less than test timestamp
{}",
- loaded_message.header().timestamp(),
- i,
- test_timestamp
- );
- assert_eq!(
- loaded_message.user_headers_map().unwrap().unwrap(),
- appended_message.user_headers_map().unwrap().unwrap(),
- "Headers mismatch at position {i}",
- );
- });
-}
-
#[tokio::test]
async fn should_persist_messages_and_then_load_them_from_disk() {
let setup = TestSetup::init().await;
diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs
index 9bad4f16..bde7757f 100644
--- a/core/sdk/src/client_provider.rs
+++ b/core/sdk/src/client_provider.rs
@@ -130,6 +130,7 @@ impl ClientProviderConfig {
tls_enabled: args.tcp_tls_enabled,
tls_domain: args.tcp_tls_domain,
tls_ca_file: args.tcp_tls_ca_file,
+ tls_validate_certificate: true,
nodelay: args.tcp_nodelay,
heartbeat_interval:
IggyDuration::from_str(&args.tcp_heartbeat_interval)
.unwrap(),
diff --git a/core/sdk/src/clients/client_builder.rs b/core/sdk/src/clients/client_builder.rs
index 04f8cbea..1698c126 100644
--- a/core/sdk/src/clients/client_builder.rs
+++ b/core/sdk/src/clients/client_builder.rs
@@ -182,6 +182,14 @@ impl TcpClientBuilder {
self
}
+ /// Sets whether to validate the TLS certificate.
+ pub fn with_tls_validate_certificate(mut self, tls_validate_certificate:
bool) -> Self {
+ self.config = self
+ .config
+ .with_tls_validate_certificate(tls_validate_certificate);
+ self
+ }
+
/// Sets the nodelay option for the TCP socket.
pub fn with_no_delay(mut self) -> Self {
self.config = self.config.with_no_delay();
diff --git a/core/sdk/src/tcp/mod.rs b/core/sdk/src/tcp/mod.rs
index 2424bec9..c073ff0e 100644
--- a/core/sdk/src/tcp/mod.rs
+++ b/core/sdk/src/tcp/mod.rs
@@ -21,3 +21,4 @@ pub(crate) mod tcp_connection_stream;
pub(crate) mod tcp_connection_stream_kind;
pub(crate) mod tcp_stream;
pub(crate) mod tcp_tls_connection_stream;
+pub(crate) mod tcp_tls_verifier;
diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs
index 8db50940..692d7689 100644
--- a/core/sdk/src/tcp/tcp_client.rs
+++ b/core/sdk/src/tcp/tcp_client.rs
@@ -351,32 +351,42 @@ impl TcpClient {
let _ =
rustls::crypto::aws_lc_rs::default_provider().install_default();
- let mut root_cert_store = rustls::RootCertStore::empty();
- if let Some(certificate_path) = &self.config.tls_ca_file {
- for cert in
CertificateDer::pem_file_iter(certificate_path).map_err(|error| {
- error!("Failed to read the CA file: {certificate_path}.
{error}",);
- IggyError::InvalidTlsCertificatePath
- })? {
- let certificate = cert.map_err(|error| {
- error!(
- "Failed to read a certificate from the CA file:
{certificate_path}. {error}",
- );
- IggyError::InvalidTlsCertificate
- })?;
- root_cert_store.add(certificate).map_err(|error| {
- error!(
- "Failed to add a certificate to the root
certificate store. {error}",
- );
- IggyError::InvalidTlsCertificate
- })?;
+ let config = if self.config.tls_validate_certificate {
+ let mut root_cert_store = rustls::RootCertStore::empty();
+ if let Some(certificate_path) = &self.config.tls_ca_file {
+ for cert in
+
CertificateDer::pem_file_iter(certificate_path).map_err(|error| {
+ error!("Failed to read the CA file:
{certificate_path}. {error}",);
+ IggyError::InvalidTlsCertificatePath
+ })?
+ {
+ let certificate = cert.map_err(|error| {
+ error!(
+ "Failed to read a certificate from the CA
file: {certificate_path}. {error}",
+ );
+ IggyError::InvalidTlsCertificate
+ })?;
+ root_cert_store.add(certificate).map_err(|error| {
+ error!(
+ "Failed to add a certificate to the root
certificate store. {error}",
+ );
+ IggyError::InvalidTlsCertificate
+ })?;
+ }
+ } else {
+
root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
}
- } else {
-
root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
- }
- let config = rustls::ClientConfig::builder()
- .with_root_certificates(root_cert_store)
- .with_no_client_auth();
+ rustls::ClientConfig::builder()
+ .with_root_certificates(root_cert_store)
+ .with_no_client_auth()
+ } else {
+ use crate::tcp::tcp_tls_verifier::NoServerVerification;
+ rustls::ClientConfig::builder()
+ .dangerous()
+
.with_custom_certificate_verifier(Arc::new(NoServerVerification))
+ .with_no_client_auth()
+ };
let connector = TlsConnector::from(Arc::new(config));
let tls_domain = self.config.tls_domain.to_owned();
let domain = ServerName::try_from(tls_domain).map_err(|error| {
diff --git a/core/sdk/src/tcp/tcp_tls_verifier.rs b/core/sdk/src/tcp/tcp_tls_verifier.rs
new file mode 100644
index 00000000..0a1e710f
--- /dev/null
+++ b/core/sdk/src/tcp/tcp_tls_verifier.rs
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified,
ServerCertVerifier};
+use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
+use rustls::{DigitallySignedStruct, Error, SignatureScheme};
+
+/// Server certificate verifier that accepts every certificate and every
+/// handshake signature without inspecting them.
+///
+/// The TCP client installs it when `tls_validate_certificate` is disabled.
+/// The connection is still encrypted, but the peer is NOT authenticated, so
+/// it offers no protection against man-in-the-middle attacks. Intended for
+/// benchmarks and tests against self-signed certificates only.
+#[derive(Debug)]
+pub struct NoServerVerification;
+
+impl ServerCertVerifier for NoServerVerification {
+    /// Unconditionally reports the presented certificate chain as trusted.
+    fn verify_server_cert(
+        &self,
+        _end_entity: &CertificateDer<'_>,
+        _intermediates: &[CertificateDer<'_>],
+        _server_name: &ServerName<'_>,
+        _ocsp_response: &[u8],
+        _now: UnixTime,
+    ) -> Result<ServerCertVerified, Error> {
+        Ok(ServerCertVerified::assertion())
+    }
+
+    /// Unconditionally reports the TLS 1.2 handshake signature as valid.
+    fn verify_tls12_signature(
+        &self,
+        _message: &[u8],
+        _cert: &CertificateDer<'_>,
+        _dss: &DigitallySignedStruct,
+    ) -> Result<HandshakeSignatureValid, Error> {
+        Ok(HandshakeSignatureValid::assertion())
+    }
+
+    /// Unconditionally reports the TLS 1.3 handshake signature as valid.
+    fn verify_tls13_signature(
+        &self,
+        _message: &[u8],
+        _cert: &CertificateDer<'_>,
+        _dss: &DigitallySignedStruct,
+    ) -> Result<HandshakeSignatureValid, Error> {
+        Ok(HandshakeSignatureValid::assertion())
+    }
+
+    /// Signature schemes advertised to the server during the handshake.
+    ///
+    /// Signatures are never actually checked, so every common scheme is
+    /// offered. Advertising only ECDSA P-256 would make the handshake fail
+    /// against servers whose certificate uses an RSA, Ed25519 or other
+    /// ECDSA key, defeating the purpose of disabling validation.
+    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
+        vec![
+            SignatureScheme::ECDSA_NISTP256_SHA256,
+            SignatureScheme::ECDSA_NISTP384_SHA384,
+            SignatureScheme::ECDSA_NISTP521_SHA512,
+            SignatureScheme::ED25519,
+            SignatureScheme::ED448,
+            SignatureScheme::RSA_PSS_SHA256,
+            SignatureScheme::RSA_PSS_SHA384,
+            SignatureScheme::RSA_PSS_SHA512,
+            SignatureScheme::RSA_PKCS1_SHA256,
+            SignatureScheme::RSA_PKCS1_SHA384,
+            SignatureScheme::RSA_PKCS1_SHA512,
+        ]
+    }
+}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 34ab57fb..b0e46455 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -88,7 +88,6 @@ opentelemetry_sdk = { version = "0.30.0", features = [
] }
prometheus-client = "0.23.1"
quinn = { workspace = true }
-rcgen = "0.13.2"
reqwest = { workspace = true, features = ["rustls-tls-no-provider"] }
ring = "0.17.14"
rust-s3 = { workspace = true }
diff --git a/core/server/src/configs/defaults.rs
b/core/server/src/configs/defaults.rs
index d0d4f261..82b560cf 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -153,6 +153,7 @@ impl Default for TcpTlsConfig {
fn default() -> TcpTlsConfig {
TcpTlsConfig {
enabled: SERVER_CONFIG.tcp.tls.enabled,
+ self_signed: SERVER_CONFIG.tcp.tls.self_signed,
cert_file: SERVER_CONFIG.tcp.tls.cert_file.parse().unwrap(),
key_file: SERVER_CONFIG.tcp.tls.key_file.parse().unwrap(),
}
diff --git a/core/server/src/configs/tcp.rs b/core/server/src/configs/tcp.rs
index e5cfed12..eeec7779 100644
--- a/core/server/src/configs/tcp.rs
+++ b/core/server/src/configs/tcp.rs
@@ -33,6 +33,7 @@ pub struct TcpConfig {
/// TLS settings of the TCP server, (de)serialized as part of the server configuration.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TcpTlsConfig {
// TLS on/off switch; NOTE(review): the code consuming this flag is not visible in this hunk.
pub enabled: bool,
// When true and `cert_file` does not exist on disk, the TCP TLS listener
// generates a self-signed certificate instead of loading one from files.
+ pub self_signed: bool,
// Path of the certificate file handed to the listener's certificate loader.
pub cert_file: String,
// Path of the private key file handed to the listener's certificate loader.
pub key_file: String,
}
diff --git a/core/server/src/quic/quic_server.rs
b/core/server/src/quic/quic_server.rs
index 96d78d96..e024312e 100644
--- a/core/server/src/quic/quic_server.rs
+++ b/core/server/src/quic/quic_server.rs
@@ -97,16 +97,11 @@ fn configure_quic(config: QuicConfig) ->
Result<quinn::ServerConfig, QuicError>
/// Produces a self-signed certificate chain and matching private key for the
/// QUIC server. After this change the work is delegated to
/// `iggy_common::generate_self_signed_certificate("localhost")`; any failure
/// gets an error context attached and is then mapped to
/// `QuicError::CertGenerationError`.
fn generate_self_signed_cert<'a>() -> Result<(Vec<CertificateDer<'a>>,
PrivateKeyDer<'a>), QuicError>
{
- let certificate =
rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap();
- let certificate_der = certificate.cert.der();
- let private_key = certificate.key_pair.serialize_der();
- let private_key = PrivateKeyDer::try_from(private_key)
+ iggy_common::generate_self_signed_certificate("localhost")
.with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to parse private
key")
+ format!("{COMPONENT} (error: {error}) - failed to generate
self-signed certificate")
})
- .map_err(|_| QuicError::CertGenerationError)?;
- let cert_chain = vec![certificate_der.clone()];
- Ok((cert_chain, private_key))
+ .map_err(|_| QuicError::CertGenerationError)
}
fn load_certificates(
diff --git a/core/server/src/tcp/tcp_tls_listener.rs
b/core/server/src/tcp/tcp_tls_listener.rs
index 91094e48..b333841c 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -22,6 +22,7 @@ use crate::streaming::clients::client_manager::Transport;
use crate::streaming::systems::system::SharedSystem;
use crate::tcp::connection_handler::{handle_connection, handle_error};
use rustls::ServerConfig;
+use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls_pemfile::{certs, private_key};
use std::io::BufReader;
use std::net::SocketAddr;
@@ -42,36 +43,20 @@ pub(crate) async fn start(
tokio::spawn(async move {
let _ =
rustls::crypto::aws_lc_rs::default_provider().install_default();
- let cert_file_path = &config.cert_file;
- let cert_file = std::fs::File::open(cert_file_path)
- .unwrap_or_else(|e| panic!("Unable to open certificate file
'{cert_file_path}': {e}"));
- let mut cert_reader = BufReader::new(cert_file);
-
- let certs: Vec<_> = certs(&mut cert_reader)
- .collect::<Result<Vec<_>, _>>()
- .unwrap_or_else(|e| {
- panic!("Unable to parse certificates from '{cert_file_path}':
{e}")
- });
-
- if certs.is_empty() {
- panic!("No certificates found in certificate file
'{cert_file_path}'");
- }
-
- let key_file_path = &config.key_file;
- let key_file = std::fs::File::open(key_file_path)
- .unwrap_or_else(|e| panic!("Unable to open key file
'{key_file_path}': {e}"));
- let mut key_reader = BufReader::new(key_file);
-
- let key = private_key(&mut key_reader)
- .unwrap_or_else(|e| panic!("Unable to parse private key from
'{key_file_path}': {e}"))
- .unwrap_or_else(|| panic!("No private key found in key file
'{key_file_path}'"));
+ let (certs, key) =
+ if config.self_signed &&
!std::path::Path::new(&config.cert_file).exists() {
+ info!("Generating self-signed certificate for TCP TLS server");
+ generate_self_signed_cert()
+ .unwrap_or_else(|e| panic!("Failed to generate self-signed
certificate: {e}"))
+ } else {
+ load_certificates(&config.cert_file, &config.key_file)
+ .unwrap_or_else(|e| panic!("Failed to load certificates:
{e}"))
+ };
let server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
- .unwrap_or_else(|e| {
- panic!("Unable to create TLS server config with cert
'{cert_file_path}' and key '{key_file_path}': {e}")
- });
+ .unwrap_or_else(|e| panic!("Unable to create TLS server config:
{e}"));
let acceptor = TlsAcceptor::from(Arc::new(server_config));
@@ -109,28 +94,34 @@ pub(crate) async fn start(
let client_id = session.client_id;
let acceptor = acceptor.clone();
- let stream =
acceptor.accept(stream).await.unwrap_or_else(|e| {
- panic!("Failed to accept TLS connection from
'{address}': {e}",);
- });
- let system = system.clone();
- let mut sender = SenderKind::get_tcp_tls_sender(stream);
- tokio::spawn(async move {
- if let Err(error) =
- handle_connection(session, &mut sender,
system.clone()).await
- {
- handle_error(error);
- system.read().await.delete_client(client_id).await;
- if let Err(error) = sender.shutdown().await {
- error!(
- "Failed to shutdown TCP stream for client:
{client_id}, address: {address}. {error}"
- );
- } else {
- info!(
- "Successfully closed TCP stream for
client: {client_id}, address: {address}."
- );
- }
+ let system_clone = system.clone();
+ match acceptor.accept(stream).await {
+ Ok(stream) => {
+ let mut sender =
SenderKind::get_tcp_tls_sender(stream);
+ tokio::spawn(async move {
+ if let Err(error) =
+ handle_connection(session, &mut sender,
system_clone.clone())
+ .await
+ {
+ handle_error(error);
+
system_clone.read().await.delete_client(client_id).await;
+ if let Err(error) =
sender.shutdown().await {
+ error!(
+ "Failed to shutdown TCP stream for
client: {client_id}, address: {address}. {error}"
+ );
+ } else {
+ info!(
+ "Successfully closed TCP stream
for client: {client_id}, address: {address}."
+ );
+ }
+ }
+ });
+ }
+ Err(e) => {
+ error!("Failed to accept TLS connection from
'{address}': {e}");
+
system_clone.read().await.delete_client(client_id).await;
}
- });
+ }
}
Err(error) => error!("Unable to accept TCP TLS socket.
{error}"),
}
@@ -141,3 +132,27 @@ pub(crate) async fn start(
Err(_) => panic!("Failed to get the local address for TCP TLS
listener."),
}
}
+
+/// Creates an in-memory self-signed certificate chain and private key for
+/// the TCP TLS listener by delegating to the shared helper in `iggy_common`.
+///
+/// The certificate is issued for the host name `localhost`. Errors from the
+/// helper are returned to the caller unchanged.
+fn generate_self_signed_cert()
+-> Result<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>), Box<dyn std::error::Error>> {
+    let host_name = "localhost";
+    iggy_common::generate_self_signed_certificate(host_name)
+}
+
+/// Loads a PEM certificate chain and its private key from disk.
+///
+/// * `cert_file` - path of the PEM file holding one or more certificates.
+/// * `key_file` - path of the PEM file holding the private key.
+///
+/// Returns the parsed certificates together with the first private key found
+/// in `key_file`.
+///
+/// # Errors
+///
+/// Fails when either file cannot be opened or parsed, when `cert_file`
+/// contains no certificate, or when `key_file` contains no private key.
+/// Every error message names the offending path, because the caller only
+/// prints the error and a bare I/O message does not say which file was at fault.
+fn load_certificates(
+    cert_file: &str,
+    key_file: &str,
+) -> Result<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>), Box<dyn std::error::Error>> {
+    let cert_handle = std::fs::File::open(cert_file)
+        .map_err(|e| format!("Unable to open certificate file '{cert_file}': {e}"))?;
+    let mut cert_reader = BufReader::new(cert_handle);
+    let certs: Vec<_> = certs(&mut cert_reader)
+        .collect::<Result<Vec<_>, _>>()
+        .map_err(|e| format!("Unable to parse certificates from '{cert_file}': {e}"))?;
+
+    if certs.is_empty() {
+        return Err(format!("No certificates found in certificate file '{cert_file}'").into());
+    }
+
+    let key_handle = std::fs::File::open(key_file)
+        .map_err(|e| format!("Unable to open key file '{key_file}': {e}"))?;
+    let mut key_reader = BufReader::new(key_handle);
+    let key = private_key(&mut key_reader)
+        .map_err(|e| format!("Unable to parse private key from '{key_file}': {e}"))?
+        .ok_or_else(|| format!("No private key found in key file '{key_file}'"))?;
+
+    Ok((certs, key))
+}
diff --git a/foreign/cpp/tests/e2e/server.toml
b/foreign/cpp/tests/e2e/server.toml
index d765b88e..9be27112 100644
--- a/foreign/cpp/tests/e2e/server.toml
+++ b/foreign/cpp/tests/e2e/server.toml
@@ -104,10 +104,10 @@ endpoint = "/metrics"
enabled = false
# Path to the TLS certificate file.
-cert_file = "certs/iggy_cert.pem"
+cert_file = "core/certs/iggy_cert.pem"
# Path to the TLS key file.
-key_file = "certs/iggy_key.pem"
+key_file = "core/certs/iggy_key.pem"
# TCP server configuration.
[tcp]
@@ -173,10 +173,10 @@ max_idle_timeout = "10 s"
self_signed = true
# Path to the QUIC TLS certificate file.
-cert_file = "certs/iggy_cert.pem"
+cert_file = "core/certs/iggy_cert.pem"
# Path to the QUIC TLS key file.
-key_file = "certs/iggy_key.pem"
+key_file = "core/certs/iggy_key.pem"
# Message cleaner configuration.
[message_cleaner]