This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new e46f294b7 refactor(bench): remove TestServer startup capability (#2611)
e46f294b7 is described below
commit e46f294b7af4f86b0d7e26d984205a019a8885f8
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jan 26 15:40:15 2026 +0100
refactor(bench): remove TestServer startup capability (#2611)
The integration crate dependency existed solely for TestServer
functionality that was never used in practice - users always
run iggy-server separately before benchmarking.
---
Cargo.lock | 2 -
core/bench/Cargo.toml | 4 -
.../bench/src/actors/consumer/client/high_level.rs | 2 +-
core/bench/src/actors/consumer/client/low_level.rs | 2 +-
.../actors/consumer/typed_benchmark_consumer.rs | 2 +-
.../bench/src/actors/producer/client/high_level.rs | 2 +-
core/bench/src/actors/producer/client/low_level.rs | 2 +-
.../actors/producer/typed_benchmark_producer.rs | 2 +-
.../typed_banchmark_producing_consumer.rs | 2 +-
core/bench/src/analytics/report_builder.rs | 2 +-
core/bench/src/args/common.rs | 45 +----
core/bench/src/args/defaults.rs | 4 -
core/bench/src/args/examples.rs | 3 -
.../src/benchmarks/balanced_consumer_group.rs | 2 +-
core/bench/src/benchmarks/balanced_producer.rs | 2 +-
.../balanced_producer_and_consumer_group.rs | 2 +-
core/bench/src/benchmarks/benchmark.rs | 2 +-
core/bench/src/benchmarks/common.rs | 2 +-
.../benchmarks/end_to_end_producing_consumer.rs | 2 +-
.../end_to_end_producing_consumer_group.rs | 2 +-
core/bench/src/benchmarks/pinned_consumer.rs | 2 +-
core/bench/src/benchmarks/pinned_producer.rs | 2 +-
.../src/benchmarks/pinned_producer_and_consumer.rs | 2 +-
core/bench/src/runner.rs | 11 +-
core/bench/src/utils/client_factory.rs | 163 +++++++++++++++++-
core/bench/src/utils/mod.rs | 4 +-
core/bench/src/utils/server_starter.rs | 188 ---------------------
scripts/check-backwards-compat.sh | 6 +-
28 files changed, 183 insertions(+), 283 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 26de47328..1167a2ea9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4466,14 +4466,12 @@ dependencies = [
"hostname",
"human-repr",
"iggy",
- "integration",
"nonzero_lit",
"rand 0.9.2",
"rayon",
"serde",
"sysinfo",
"tokio",
- "toml 0.9.11+spec-1.1.0",
"tracing",
"tracing-appender",
"tracing-subscriber",
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index 98fe0570a..42ed50e44 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -29,8 +29,6 @@ readme = "../../README.md"
[[bin]]
name = "iggy-bench"
path = "src/main.rs"
-# Due to dependency to integration, which has a dependency to server, setting
-# mimalloc on server is also setting it on bench.
[dependencies]
async-trait = { workspace = true }
@@ -45,14 +43,12 @@ governor = "0.10.4"
hostname = "0.4.2"
human-repr = { workspace = true }
iggy = { workspace = true }
-integration = { workspace = true }
nonzero_lit = { workspace = true }
rand = { workspace = true }
rayon = "1.11.0"
serde = { workspace = true }
sysinfo = { workspace = true }
tokio = { workspace = true }
-toml = { workspace = true }
tracing = { workspace = true }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true }
diff --git a/core/bench/src/actors/consumer/client/high_level.rs
b/core/bench/src/actors/consumer/client/high_level.rs
index b1f3a4f04..22b04679f 100644
--- a/core/bench/src/actors/consumer/client/high_level.rs
+++ b/core/bench/src/actors/consumer/client/high_level.rs
@@ -24,9 +24,9 @@ use crate::actors::{
},
};
+use crate::utils::{ClientFactory, login_root};
use futures_util::StreamExt;
use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
use std::{sync::Arc, time::Duration};
use tokio::time::{Instant, timeout};
use tracing::{error, warn};
diff --git a/core/bench/src/actors/consumer/client/low_level.rs
b/core/bench/src/actors/consumer/client/low_level.rs
index 27feb3a4e..b501942a0 100644
--- a/core/bench/src/actors/consumer/client/low_level.rs
+++ b/core/bench/src/actors/consumer/client/low_level.rs
@@ -20,9 +20,9 @@ use crate::actors::consumer::client::BenchmarkConsumerClient;
use crate::actors::consumer::client::interface::{BenchmarkConsumerConfig,
ConsumerClient};
use crate::actors::{ApiLabel, BatchMetrics, BenchmarkInit};
use crate::benchmarks::common::create_consumer;
+use crate::utils::{ClientFactory, login_root};
use crate::utils::{batch_total_size_bytes, batch_user_size_bytes};
use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
diff --git a/core/bench/src/actors/consumer/typed_benchmark_consumer.rs
b/core/bench/src/actors/consumer/typed_benchmark_consumer.rs
index 723adc35b..4455e6895 100644
--- a/core/bench/src/actors/consumer/typed_benchmark_consumer.rs
+++ b/core/bench/src/actors/consumer/typed_benchmark_consumer.rs
@@ -18,6 +18,7 @@
use std::sync::Arc;
+use crate::utils::ClientFactory;
use crate::{
actors::consumer::{
BenchmarkConsumer,
@@ -33,7 +34,6 @@ use bench_report::{
numeric_parameter::BenchmarkNumericParameter,
};
use iggy::prelude::*;
-use integration::test_server::ClientFactory;
pub enum TypedBenchmarkConsumer {
High(BenchmarkConsumer<HighLevelConsumerClient>),
diff --git a/core/bench/src/actors/producer/client/high_level.rs
b/core/bench/src/actors/producer/client/high_level.rs
index 7544d2340..7fdf923b1 100644
--- a/core/bench/src/actors/producer/client/high_level.rs
+++ b/core/bench/src/actors/producer/client/high_level.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+use crate::utils::{ClientFactory, login_root};
use crate::{
actors::{
ApiLabel, BatchMetrics, BenchmarkInit,
@@ -27,7 +28,6 @@ use crate::{
utils::batch_generator::BenchmarkBatchGenerator,
};
use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
use std::sync::Arc;
use tokio::time::Instant;
diff --git a/core/bench/src/actors/producer/client/low_level.rs
b/core/bench/src/actors/producer/client/low_level.rs
index fc2f483ed..b132f9a97 100644
--- a/core/bench/src/actors/producer/client/low_level.rs
+++ b/core/bench/src/actors/producer/client/low_level.rs
@@ -18,6 +18,7 @@
use std::sync::Arc;
+use crate::utils::{ClientFactory, login_root};
use crate::{
actors::{
ApiLabel, BatchMetrics, BenchmarkInit,
@@ -29,7 +30,6 @@ use crate::{
utils::batch_generator::BenchmarkBatchGenerator,
};
use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
use tokio::time::Instant;
pub struct LowLevelProducerClient {
diff --git a/core/bench/src/actors/producer/typed_benchmark_producer.rs
b/core/bench/src/actors/producer/typed_benchmark_producer.rs
index 2a27fd33a..32655ee83 100644
--- a/core/bench/src/actors/producer/typed_benchmark_producer.rs
+++ b/core/bench/src/actors/producer/typed_benchmark_producer.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+use crate::utils::ClientFactory;
use crate::{
actors::producer::{
BenchmarkProducer,
@@ -31,7 +32,6 @@ use bench_report::{
numeric_parameter::BenchmarkNumericParameter,
};
use iggy::prelude::*;
-use integration::test_server::ClientFactory;
use std::sync::Arc;
pub enum TypedBenchmarkProducer {
diff --git
a/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs
b/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs
index ac7e121b6..16e12a59d 100644
---
a/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs
+++
b/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs
@@ -18,6 +18,7 @@
use std::sync::Arc;
+use crate::utils::ClientFactory;
use crate::{
actors::{
consumer::client::{
@@ -38,7 +39,6 @@ use bench_report::{
};
use iggy::prelude::*;
-use integration::test_server::ClientFactory;
pub enum TypedBenchmarkProducingConsumer {
High(BenchmarkProducingConsumer<HighLevelProducerClient,
HighLevelConsumerClient>),
diff --git a/core/bench/src/analytics/report_builder.rs
b/core/bench/src/analytics/report_builder.rs
index 8a00c8ccf..2ec001091 100644
--- a/core/bench/src/analytics/report_builder.rs
+++ b/core/bench/src/analytics/report_builder.rs
@@ -19,6 +19,7 @@
use std::{collections::HashMap, thread};
use super::metrics::group::{from_individual_metrics,
from_producers_and_consumers_statistics};
+use crate::utils::ClientFactory;
use crate::utils::get_server_stats;
use bench_report::{
actor_kind::ActorKind,
@@ -31,7 +32,6 @@ use bench_report::{
};
use chrono::{DateTime, Utc};
use iggy::prelude::{CacheMetrics, CacheMetricsKey, IggyTimestamp, Stats};
-use integration::test_server::ClientFactory;
use std::sync::Arc;
pub struct BenchmarkReportBuilder;
diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index c3a6f2bd4..ebc54e48b 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -22,8 +22,7 @@ use super::props::{BenchmarkKindProps,
BenchmarkTransportProps};
use super::{
defaults::{
DEFAULT_MESSAGE_BATCHES, DEFAULT_MESSAGE_SIZE,
DEFAULT_MESSAGES_PER_BATCH,
- DEFAULT_MOVING_AVERAGE_WINDOW, DEFAULT_PERFORM_CLEANUP,
DEFAULT_SAMPLING_TIME,
- DEFAULT_SERVER_STDOUT_VISIBILITY, DEFAULT_SKIP_SERVER_START,
DEFAULT_WARMUP_TIME,
+ DEFAULT_MOVING_AVERAGE_WINDOW, DEFAULT_SAMPLING_TIME,
DEFAULT_WARMUP_TIME,
},
transport::BenchmarkTransportCommand,
};
@@ -32,9 +31,7 @@ use
bench_report::numeric_parameter::BenchmarkNumericParameter;
use clap::error::ErrorKind;
use clap::{CommandFactory, Parser};
use iggy::prelude::{IggyByteSize, IggyDuration, TransportProtocol};
-use std::net::SocketAddr;
use std::num::NonZeroU32;
-use std::path::Path;
use std::str::FromStr;
#[derive(Parser, Debug)]
@@ -72,10 +69,6 @@ pub struct IggyBenchArgs {
#[arg(long, short = 'w', default_value_t =
IggyDuration::from_str(DEFAULT_WARMUP_TIME).unwrap())]
pub warmup_time: IggyDuration,
- /// Server stdout visibility
- #[arg(long, short = 'v', default_value_t =
DEFAULT_SERVER_STDOUT_VISIBILITY)]
- pub verbose: bool,
-
/// Sampling time for metrics collection. It is also used as bucket size
for time series calculations.
#[arg(long, short = 't', default_value_t =
IggyDuration::from_str(DEFAULT_SAMPLING_TIME).unwrap(), value_parser =
IggyDuration::from_str)]
pub sampling_time: IggyDuration,
@@ -84,34 +77,11 @@ pub struct IggyBenchArgs {
#[arg(long, short = 'W', default_value_t = DEFAULT_MOVING_AVERAGE_WINDOW)]
pub moving_average_window: u32,
- /// Shutdown iggy-server and remove server `local_data` directory after
the benchmark is finished.
- /// Only applicable to local benchmarks.
- #[arg(long, default_value_t = DEFAULT_PERFORM_CLEANUP,
verbatim_doc_comment)]
- pub cleanup: bool,
-
- /// iggy-server executable path.
- /// Only applicable to local benchmarks.
- #[arg(long, short='e', default_value = None, value_parser =
validate_server_executable_path)]
- pub server_executable_path: Option<String>,
-
- /// Skip server start.
- /// Only applicable to local benchmarks.
- #[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START,
verbatim_doc_comment)]
- pub skip_server_start: bool,
-
/// Use high-level API for actors
#[arg(long, short = 'H', default_value_t = false)]
pub high_level_api: bool,
}
-fn validate_server_executable_path(v: &str) -> Result<String, String> {
- if Path::new(v).exists() {
- Ok(v.to_owned())
- } else {
- Err(format!("Provided server executable '{v}' does not exist."))
- }
-}
-
impl IggyBenchArgs {
pub fn transport_command(&self) -> &BenchmarkTransportCommand {
self.benchmark_kind.transport_command()
@@ -133,19 +103,6 @@ impl IggyBenchArgs {
}
pub fn validate(&mut self) {
- let server_address =
self.server_address().parse::<SocketAddr>().unwrap();
- if (self.cleanup || self.verbose) &&
!server_address.ip().is_loopback() {
- Self::command()
- .error(
- ErrorKind::ArgumentConflict,
- format!(
- "Cannot use cleanup or verbose flags with a
non-loopback server address: {}",
- self.server_address()
- ),
- )
- .exit();
- }
-
if self.output_dir().is_none()
&& (self.gitref().is_some()
|| self.identifier().is_some()
diff --git a/core/bench/src/args/defaults.rs b/core/bench/src/args/defaults.rs
index 7695782fb..bfd265159 100644
--- a/core/bench/src/args/defaults.rs
+++ b/core/bench/src/args/defaults.rs
@@ -46,11 +46,7 @@ pub const DEFAULT_NUMBER_OF_CONSUMERS: NonZeroU32 = u32!(8);
pub const DEFAULT_NUMBER_OF_CONSUMER_GROUPS: NonZeroU32 = u32!(1);
pub const DEFAULT_NUMBER_OF_PRODUCERS: NonZeroU32 = u32!(8);
-pub const DEFAULT_PERFORM_CLEANUP: bool = false;
-pub const DEFAULT_SERVER_STDOUT_VISIBILITY: bool = false;
-
pub const DEFAULT_WARMUP_TIME: &str = "0s";
-pub const DEFAULT_SKIP_SERVER_START: bool = false;
pub const DEFAULT_SAMPLING_TIME: &str = "10ms";
pub const DEFAULT_MOVING_AVERAGE_WINDOW: u32 = 20;
diff --git a/core/bench/src/args/examples.rs b/core/bench/src/args/examples.rs
index 84a7839df..7a00929e5 100644
--- a/core/bench/src/args/examples.rs
+++ b/core/bench/src/args/examples.rs
@@ -57,13 +57,10 @@ const EXAMPLES: &str = r#"EXAMPLES:
Mutually exclusive with --message-batches
--message-size (-m): Message size in bytes [default: 1000]
For random sizes, use range format: "100..1000"
- --start-stream-id (-S): Start stream ID [default: 1]
--rate-limit (-r): Optional throughput limit per producer (e.g., "50KB",
"10MB")
--warmup-time (-w): Warmup duration [default: 0s]
--sampling-time (-t): Metrics sampling interval [default: 10ms]
--moving-average-window (-W): Window size for moving average [default: 20]
- --cleanup: Remove server data after benchmark
- --verbose (-v): Show server output (only applicable for local server)
Benchmark-specific options (after the benchmark command):
--streams (-s): Number of streams
diff --git a/core/bench/src/benchmarks/balanced_consumer_group.rs
b/core/bench/src/benchmarks/balanced_consumer_group.rs
index ac60fc6f6..5bf6026e3 100644
--- a/core/bench/src/benchmarks/balanced_consumer_group.rs
+++ b/core/bench/src/benchmarks/balanced_consumer_group.rs
@@ -17,6 +17,7 @@
*/
use super::benchmark::Benchmarkable;
+use crate::utils::ClientFactory;
use crate::{
args::common::IggyBenchArgs,
benchmarks::common::{build_consumer_futures, init_consumer_groups},
@@ -24,7 +25,6 @@ use crate::{
use async_trait::async_trait;
use bench_report::{benchmark_kind::BenchmarkKind,
individual_metrics::BenchmarkIndividualMetrics};
use iggy::prelude::*;
-use integration::test_server::ClientFactory;
use std::sync::Arc;
use tokio::task::JoinSet;
use tracing::info;
diff --git a/core/bench/src/benchmarks/balanced_producer.rs
b/core/bench/src/benchmarks/balanced_producer.rs
index 6e81ab5e3..b9e7aa892 100644
--- a/core/bench/src/benchmarks/balanced_producer.rs
+++ b/core/bench/src/benchmarks/balanced_producer.rs
@@ -19,11 +19,11 @@
use super::benchmark::Benchmarkable;
use crate::args::common::IggyBenchArgs;
use crate::benchmarks::common::build_producer_futures;
+use crate::utils::ClientFactory;
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
use iggy::prelude::*;
-use integration::test_server::ClientFactory;
use std::sync::Arc;
use tokio::task::JoinSet;
use tracing::info;
diff --git a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
index 0eab5db8b..10ed9bc4b 100644
--- a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
+++ b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
@@ -17,6 +17,7 @@
*/
use super::benchmark::Benchmarkable;
+use crate::utils::ClientFactory;
use crate::{
args::common::IggyBenchArgs,
benchmarks::common::{build_consumer_futures, build_producer_futures,
init_consumer_groups},
@@ -24,7 +25,6 @@ use crate::{
use async_trait::async_trait;
use bench_report::{benchmark_kind::BenchmarkKind,
individual_metrics::BenchmarkIndividualMetrics};
use iggy::prelude::*;
-use integration::test_server::ClientFactory;
use std::sync::Arc;
use tokio::task::JoinSet;
use tracing::info;
diff --git a/core/bench/src/benchmarks/benchmark.rs
b/core/bench/src/benchmarks/benchmark.rs
index 31f1828c3..6a334b4f0 100644
--- a/core/bench/src/benchmarks/benchmark.rs
+++ b/core/bench/src/benchmarks/benchmark.rs
@@ -17,13 +17,13 @@
*/
use crate::args::kind::BenchmarkKindCommand;
+use crate::utils::{ClientFactory, login_root};
use crate::{args::common::IggyBenchArgs,
utils::client_factory::create_client_factory};
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
use iggy::clients::client::IggyClient;
use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
use std::sync::Arc;
use tokio::task::JoinSet;
use tracing::info;
diff --git a/core/bench/src/benchmarks/common.rs
b/core/bench/src/benchmarks/common.rs
index 97c66fb7b..7858f6fca 100644
--- a/core/bench/src/benchmarks/common.rs
+++ b/core/bench/src/benchmarks/common.rs
@@ -17,6 +17,7 @@
*/
use super::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX};
+use crate::utils::{ClientFactory, login_root};
use crate::{
actors::{
consumer::typed_benchmark_consumer::TypedBenchmarkConsumer,
@@ -28,7 +29,6 @@ use crate::{
};
use bench_report::{benchmark_kind::BenchmarkKind,
individual_metrics::BenchmarkIndividualMetrics};
use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
use std::{future::Future, sync::Arc};
use tracing::{error, info};
diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
b/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
index 2818fea6d..1867ca2d2 100644
--- a/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
+++ b/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
@@ -18,11 +18,11 @@
use crate::args::common::IggyBenchArgs;
use crate::benchmarks::common::build_producing_consumers_futures;
+use crate::utils::ClientFactory;
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
use iggy::prelude::*;
-use integration::test_server::ClientFactory;
use std::sync::Arc;
use tokio::task::JoinSet;
use tracing::info;
diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
index 2adda4d55..2ef5cd190 100644
--- a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
+++ b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
@@ -18,11 +18,11 @@
use crate::args::common::IggyBenchArgs;
use crate::benchmarks::common::{build_producing_consumer_groups_futures,
init_consumer_groups};
+use crate::utils::ClientFactory;
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
use iggy::prelude::*;
-use integration::test_server::ClientFactory;
use std::sync::Arc;
use tokio::task::JoinSet;
use tracing::info;
diff --git a/core/bench/src/benchmarks/pinned_consumer.rs
b/core/bench/src/benchmarks/pinned_consumer.rs
index 870df77a7..4c44b9fec 100644
--- a/core/bench/src/benchmarks/pinned_consumer.rs
+++ b/core/bench/src/benchmarks/pinned_consumer.rs
@@ -19,11 +19,11 @@
use crate::args::common::IggyBenchArgs;
use crate::benchmarks::benchmark::Benchmarkable;
use crate::benchmarks::common::build_consumer_futures;
+use crate::utils::ClientFactory;
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
use iggy::prelude::IggyError;
-use integration::test_server::ClientFactory;
use std::sync::Arc;
use tokio::task::JoinSet;
use tracing::info;
diff --git a/core/bench/src/benchmarks/pinned_producer.rs
b/core/bench/src/benchmarks/pinned_producer.rs
index 3514e24d9..8630eb573 100644
--- a/core/bench/src/benchmarks/pinned_producer.rs
+++ b/core/bench/src/benchmarks/pinned_producer.rs
@@ -19,11 +19,11 @@
use crate::args::common::IggyBenchArgs;
use crate::benchmarks::benchmark::Benchmarkable;
use crate::benchmarks::common::build_producer_futures;
+use crate::utils::ClientFactory;
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
use iggy::prelude::{IggyError, MaxTopicSize};
-use integration::test_server::ClientFactory;
use std::sync::Arc;
use tokio::task::JoinSet;
use tracing::info;
diff --git a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
index cde6632cc..8f2630396 100644
--- a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
+++ b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
@@ -18,11 +18,11 @@
use crate::args::common::IggyBenchArgs;
use crate::benchmarks::benchmark::Benchmarkable;
use crate::benchmarks::common::{build_consumer_futures,
build_producer_futures};
+use crate::utils::ClientFactory;
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
use iggy::prelude::*;
-use integration::test_server::ClientFactory;
use std::sync::Arc;
use tokio::task::JoinSet;
use tracing::info;
diff --git a/core/bench/src/runner.rs b/core/bench/src/runner.rs
index ab9553b32..d9007bfd7 100644
--- a/core/bench/src/runner.rs
+++ b/core/bench/src/runner.rs
@@ -21,34 +21,27 @@ use crate::args::common::IggyBenchArgs;
use crate::benchmarks::benchmark::Benchmarkable;
use crate::plot::{ChartType, plot_chart};
use crate::utils::cpu_name::append_cpu_name_lowercase;
-use crate::utils::server_starter::start_server_if_needed;
use crate::utils::{collect_server_logs_and_save_to_file,
params_from_args_and_metrics};
use bench_report::hardware::BenchmarkHardware;
use iggy::prelude::IggyError;
-use integration::test_server::TestServer;
use std::path::Path;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{error, info};
pub struct BenchmarkRunner {
- pub args: Option<IggyBenchArgs>,
- pub test_server: Option<TestServer>,
+ args: Option<IggyBenchArgs>,
}
impl BenchmarkRunner {
pub const fn new(args: IggyBenchArgs) -> Self {
- Self {
- args: Some(args),
- test_server: None,
- }
+ Self { args: Some(args) }
}
#[allow(clippy::cognitive_complexity)]
pub async fn run(mut self) -> Result<(), IggyError> {
let args = self.args.take().unwrap();
let should_open_charts = args.open_charts();
- self.test_server = start_server_if_needed(&args).await;
let transport = args.transport();
let server_addr = args.server_address();
diff --git a/core/bench/src/utils/client_factory.rs
b/core/bench/src/utils/client_factory.rs
index 513d22300..3fda38d77 100644
--- a/core/bench/src/utils/client_factory.rs
+++ b/core/bench/src/utils/client_factory.rs
@@ -18,14 +18,165 @@
use crate::args::common::IggyBenchArgs;
use crate::args::transport::BenchmarkTransportCommand;
-use iggy::prelude::TransportProtocol;
-use integration::http_client::HttpClientFactory;
-use integration::quic_client::QuicClientFactory;
-use integration::tcp_client::TcpClientFactory;
-use integration::test_server::ClientFactory;
-use integration::websocket_client::WebSocketClientFactory;
+use async_trait::async_trait;
+use iggy::http::http_client::HttpClient;
+use iggy::prelude::{
+ Client, ClientWrapper, DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME,
HttpClientConfig,
+ IdentityInfo, IggyClient, QuicClientConfig, TcpClient, TcpClientConfig,
TransportProtocol,
+ UserClient, WebSocketClientConfig,
+};
+use iggy::quic::quic_client::QuicClient;
+use iggy::websocket::websocket_client::WebSocketClient;
use std::sync::Arc;
+#[async_trait]
+pub trait ClientFactory: Sync + Send {
+ async fn create_client(&self) -> ClientWrapper;
+ fn transport(&self) -> TransportProtocol;
+ fn server_addr(&self) -> String;
+}
+
+pub async fn login_root(client: &IggyClient) -> IdentityInfo {
+ client
+ .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+ .await
+ .unwrap()
+}
+
+#[derive(Debug, Clone)]
+pub struct HttpClientFactory {
+ pub server_addr: String,
+}
+
+#[async_trait]
+impl ClientFactory for HttpClientFactory {
+ async fn create_client(&self) -> ClientWrapper {
+ let config = HttpClientConfig {
+ api_url: format!("http://{}", self.server_addr.clone()),
+ ..HttpClientConfig::default()
+ };
+ let client = HttpClient::create(Arc::new(config)).unwrap();
+ ClientWrapper::Http(client)
+ }
+
+ fn transport(&self) -> TransportProtocol {
+ TransportProtocol::Http
+ }
+
+ fn server_addr(&self) -> String {
+ self.server_addr.clone()
+ }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct TcpClientFactory {
+ pub server_addr: String,
+ pub nodelay: bool,
+ pub tls_enabled: bool,
+ pub tls_domain: String,
+ pub tls_ca_file: Option<String>,
+ pub tls_validate_certificate: bool,
+}
+
+#[async_trait]
+impl ClientFactory for TcpClientFactory {
+ async fn create_client(&self) -> ClientWrapper {
+ let config = TcpClientConfig {
+ server_address: self.server_addr.clone(),
+ nodelay: self.nodelay,
+ tls_enabled: self.tls_enabled,
+ tls_domain: self.tls_domain.clone(),
+ tls_ca_file: self.tls_ca_file.clone(),
+ tls_validate_certificate: self.tls_validate_certificate,
+ ..TcpClientConfig::default()
+ };
+ let client = TcpClient::create(Arc::new(config)).unwrap_or_else(|e| {
+ panic!(
+ "Failed to create TcpClient, iggy-server has address {},
error: {:?}",
+ self.server_addr, e
+ )
+ });
+ Client::connect(&client).await.unwrap_or_else(|e| {
+ if self.tls_enabled {
+ panic!(
+ "Failed to connect to iggy-server at {} with TLS enabled,
error: {:?}\n\
+ Hint: Make sure the server is started with TLS enabled and
self-signed certificate:\n\
+ IGGY_TCP_TLS_ENABLED=true IGGY_TCP_TLS_SELF_SIGNED=true\n
+ or start iggy-bench with relevant tcp tls arguments: --tls
--tls-domain <domain> --tls-ca-file <ca_file>\n",
+ self.server_addr, e
+ )
+ } else {
+ panic!(
+ "Failed to connect to iggy-server at {}, error: {:?}",
+ self.server_addr, e
+ )
+ }
+ });
+ ClientWrapper::Tcp(client)
+ }
+
+ fn transport(&self) -> TransportProtocol {
+ TransportProtocol::Tcp
+ }
+
+ fn server_addr(&self) -> String {
+ self.server_addr.clone()
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct QuicClientFactory {
+ pub server_addr: String,
+}
+
+#[async_trait]
+impl ClientFactory for QuicClientFactory {
+ async fn create_client(&self) -> ClientWrapper {
+ let config = QuicClientConfig {
+ server_address: self.server_addr.clone(),
+ max_idle_timeout: 2_000_000,
+ ..QuicClientConfig::default()
+ };
+ let client = QuicClient::create(Arc::new(config)).unwrap();
+ Client::connect(&client).await.unwrap();
+ ClientWrapper::Quic(client)
+ }
+
+ fn transport(&self) -> TransportProtocol {
+ TransportProtocol::Quic
+ }
+
+ fn server_addr(&self) -> String {
+ self.server_addr.clone()
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct WebSocketClientFactory {
+ pub server_addr: String,
+}
+
+#[async_trait]
+impl ClientFactory for WebSocketClientFactory {
+ async fn create_client(&self) -> ClientWrapper {
+ let config = WebSocketClientConfig {
+ server_address: self.server_addr.clone(),
+ ..WebSocketClientConfig::default()
+ };
+ let client = WebSocketClient::create(Arc::new(config)).unwrap();
+ Client::connect(&client).await.unwrap();
+ ClientWrapper::WebSocket(client)
+ }
+
+ fn transport(&self) -> TransportProtocol {
+ TransportProtocol::WebSocket
+ }
+
+ fn server_addr(&self) -> String {
+ self.server_addr.clone()
+ }
+}
+
pub fn create_client_factory(args: &IggyBenchArgs) -> Arc<dyn ClientFactory> {
match &args.transport() {
TransportProtocol::Http => Arc::new(HttpClientFactory {
diff --git a/core/bench/src/utils/mod.rs b/core/bench/src/utils/mod.rs
index 2559a6a3a..e8ee8ebd3 100644
--- a/core/bench/src/utils/mod.rs
+++ b/core/bench/src/utils/mod.rs
@@ -22,7 +22,6 @@ use bench_report::{
transport::BenchmarkTransport,
};
use iggy::prelude::*;
-use integration::test_server::ClientFactory;
use std::{fs, path::Path, sync::Arc};
use tracing::{error, info};
@@ -38,12 +37,13 @@ use crate::args::{
},
};
+pub use client_factory::{ClientFactory, login_root};
+
pub mod batch_generator;
pub mod client_factory;
pub mod cpu_name;
pub mod finish_condition;
pub mod rate_limiter;
-pub mod server_starter;
pub fn batch_total_size_bytes(polled_messages: &PolledMessages) -> u64 {
polled_messages
diff --git a/core/bench/src/utils/server_starter.rs
b/core/bench/src/utils/server_starter.rs
deleted file mode 100644
index 1ee66d9b6..000000000
--- a/core/bench/src/utils/server_starter.rs
+++ /dev/null
@@ -1,188 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::args::common::IggyBenchArgs;
-use iggy::prelude::TransportProtocol;
-use integration::test_server::{IpAddrKind, SYSTEM_PATH_ENV_VAR, TestServer};
-use serde::Deserialize;
-use std::net::SocketAddr;
-use std::{collections::HashMap, time::Instant};
-use tokio::net::{TcpStream, UdpSocket};
-use tracing::{info, warn};
-
-#[derive(Debug, Deserialize)]
-struct ServerConfig {
- http: ConfigAddress,
- tcp: ConfigAddress,
- quic: ConfigAddress,
- websocket: ConfigAddress,
-}
-
-#[derive(Debug, Deserialize)]
-struct ConfigAddress {
- address: String,
-}
-
-#[allow(clippy::cognitive_complexity)]
-pub async fn start_server_if_needed(args: &IggyBenchArgs) ->
Option<TestServer> {
- if args.skip_server_start {
- info!("Skipping iggy-server start");
- return None;
- }
-
- let (should_start, mut envs) = evaluate_server_start_condition(args).await;
-
- if should_start {
- envs.insert(SYSTEM_PATH_ENV_VAR.to_owned(), "local_data".to_owned());
-
- if args.verbose {
- envs.insert("IGGY_TEST_VERBOSE".to_owned(), "true".to_owned());
- info!("Enabling verbose output - iggy-server will logs print to
stdout");
- } else {
- info!("Disabling verbose output - iggy-server will print logs to
files");
- }
-
- info!(
- "Starting test server, transport: {}, cleanup: {}, verbosity: {}",
- args.transport(),
- args.cleanup,
- args.verbose
- );
- let mut test_server = TestServer::new(
- Some(envs),
- args.cleanup,
- args.server_executable_path.clone(),
- IpAddrKind::V4,
- );
- let now = Instant::now();
- test_server.start();
- let elapsed = now.elapsed();
- if elapsed.as_millis() > 1000 {
- warn!(
- "Test iggy-server started, pid: {}, startup took {} ms because
it had to load messages from disk to cache",
- test_server.pid(),
- elapsed.as_millis()
- );
- } else {
- info!(
- "Test iggy-server started, pid: {}, startup time: {} ms",
- test_server.pid(),
- elapsed.as_millis()
- );
- }
-
- Some(test_server)
- } else {
- info!("Skipping iggy-server start");
- None
- }
-}
-
-async fn evaluate_server_start_condition(args: &IggyBenchArgs) -> (bool,
HashMap<String, String>) {
- let default_config: ServerConfig =
- toml::from_str(include_str!("../../../configs/server.toml")).unwrap();
-
- match &args.transport() {
- TransportProtocol::Http => {
- let args_http_address =
args.server_address().parse::<SocketAddr>().unwrap();
- let config_http_address =
default_config.http.address.parse::<SocketAddr>().unwrap();
- let envs = HashMap::from([
- (
- "IGGY_HTTP_ADDRESS".to_owned(),
- default_config.http.address.clone(),
- ),
- ("IGGY_TCP_ENABLED".to_owned(), "false".to_owned()),
- ("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned()),
- ]);
- (
- addresses_are_equivalent(&args_http_address,
&config_http_address)
- && !is_tcp_addr_in_use(&args_http_address).await,
- envs,
- )
- }
- TransportProtocol::Tcp => {
- let args_tcp_address =
args.server_address().parse::<SocketAddr>().unwrap();
- let config_tcp_address =
default_config.tcp.address.parse::<SocketAddr>().unwrap();
-
- let envs = HashMap::from([
- (
- "IGGY_TCP_ADDRESS".to_owned(),
- default_config.tcp.address.clone(),
- ),
- ("IGGY_HTTP_ENABLED".to_owned(), "false".to_owned()),
- ("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned()),
- ]);
- (
- addresses_are_equivalent(&args_tcp_address,
&config_tcp_address)
- && !is_tcp_addr_in_use(&args_tcp_address).await,
- envs,
- )
- }
- TransportProtocol::Quic => {
- let args_quic_address =
args.server_address().parse::<SocketAddr>().unwrap();
- let config_quic_address =
default_config.quic.address.parse::<SocketAddr>().unwrap();
- let envs = HashMap::from([
- (
- "IGGY_QUIC_ADDRESS".to_owned(),
- default_config.quic.address.clone(),
- ),
- ("IGGY_HTTP_ENABLED".to_owned(), "false".to_owned()),
- ("IGGY_TCP_ENABLED".to_owned(), "false".to_owned()),
- ]);
-
- (
- addresses_are_equivalent(&args_quic_address,
&config_quic_address)
- && !is_udp_addr_in_use(&args_quic_address).await,
- envs,
- )
- }
- TransportProtocol::WebSocket => {
- let args_websocket_address =
args.server_address().parse::<SocketAddr>().unwrap();
- let config_websocket_address = default_config
- .websocket
- .address
- .parse::<SocketAddr>()
- .unwrap();
- let envs = HashMap::from([(
- "IGGY_WEBSOCKET_ADDRESS".to_owned(),
- default_config.websocket.address.clone(),
- )]);
- (
- addresses_are_equivalent(&args_websocket_address,
&config_websocket_address)
- && !is_tcp_addr_in_use(&args_websocket_address).await,
- envs,
- )
- }
- }
-}
-
-async fn is_tcp_addr_in_use(addr: &SocketAddr) -> bool {
- TcpStream::connect(addr).await.is_ok()
-}
-
-async fn is_udp_addr_in_use(addr: &SocketAddr) -> bool {
- UdpSocket::bind(addr).await.is_err()
-}
-
-fn addresses_are_equivalent(first: &SocketAddr, second: &SocketAddr) -> bool {
- if first.ip().is_unspecified() || second.ip().is_unspecified() {
- first.port() == second.port()
- } else {
- first == second
- }
-}
diff --git a/scripts/check-backwards-compat.sh
b/scripts/check-backwards-compat.sh
index eb1d3bafc..470c6f300 100755
--- a/scripts/check-backwards-compat.sh
+++ b/scripts/check-backwards-compat.sh
@@ -184,13 +184,13 @@ ok "server is ready"
# Producer bench (baseline)
info "Running producer bench on baseline"
-BENCH_CMD=( target/debug/iggy-bench --verbose --message-batches "$BATCHES"
--messages-per-batch "$MSGS_PER_BATCH" pinned-producer tcp )
+BENCH_CMD=( target/debug/iggy-bench --message-batches "$BATCHES"
--messages-per-batch "$MSGS_PER_BATCH" pinned-producer tcp )
if command -v timeout >/dev/null 2>&1; then timeout 60s "${BENCH_CMD[@]}";
else "${BENCH_CMD[@]}"; fi
ok "producer bench done"
# Consumer bench (baseline)
info "Running consumer bench on baseline"
-BENCH_CMD=( target/debug/iggy-bench --verbose --message-batches "$BATCHES"
--messages-per-batch "$MSGS_PER_BATCH" pinned-consumer tcp )
+BENCH_CMD=( target/debug/iggy-bench --message-batches "$BATCHES"
--messages-per-batch "$MSGS_PER_BATCH" pinned-consumer tcp )
if command -v timeout >/dev/null 2>&1; then timeout 60s "${BENCH_CMD[@]}";
else "${BENCH_CMD[@]}"; fi
ok "consumer bench done (baseline)"
@@ -249,7 +249,7 @@ ok "PR server is ready"
# Only consumer bench against PR
info "Running consumer bench on PR (compat check)"
-BENCH_CMD=( target/debug/iggy-bench --verbose --message-batches "$BATCHES"
--messages-per-batch "$MSGS_PER_BATCH" pinned-consumer tcp )
+BENCH_CMD=( target/debug/iggy-bench --message-batches "$BATCHES"
--messages-per-batch "$MSGS_PER_BATCH" pinned-consumer tcp )
if command -v timeout >/dev/null 2>&1; then timeout 60s "${BENCH_CMD[@]}";
else "${BENCH_CMD[@]}"; fi
ok "consumer bench done (PR)"