This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new e46f294b7 refactor(bench): remove TestServer startup capability (#2611)
e46f294b7 is described below

commit e46f294b7af4f86b0d7e26d984205a019a8885f8
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jan 26 15:40:15 2026 +0100

    refactor(bench): remove TestServer startup capability (#2611)
    
    The integration crate dependency existed solely for TestServer
    functionality that was never used in practice - users always
    run iggy-server separately before benchmarking.
---
 Cargo.lock                                         |   2 -
 core/bench/Cargo.toml                              |   4 -
 .../bench/src/actors/consumer/client/high_level.rs |   2 +-
 core/bench/src/actors/consumer/client/low_level.rs |   2 +-
 .../actors/consumer/typed_benchmark_consumer.rs    |   2 +-
 .../bench/src/actors/producer/client/high_level.rs |   2 +-
 core/bench/src/actors/producer/client/low_level.rs |   2 +-
 .../actors/producer/typed_benchmark_producer.rs    |   2 +-
 .../typed_banchmark_producing_consumer.rs          |   2 +-
 core/bench/src/analytics/report_builder.rs         |   2 +-
 core/bench/src/args/common.rs                      |  45 +----
 core/bench/src/args/defaults.rs                    |   4 -
 core/bench/src/args/examples.rs                    |   3 -
 .../src/benchmarks/balanced_consumer_group.rs      |   2 +-
 core/bench/src/benchmarks/balanced_producer.rs     |   2 +-
 .../balanced_producer_and_consumer_group.rs        |   2 +-
 core/bench/src/benchmarks/benchmark.rs             |   2 +-
 core/bench/src/benchmarks/common.rs                |   2 +-
 .../benchmarks/end_to_end_producing_consumer.rs    |   2 +-
 .../end_to_end_producing_consumer_group.rs         |   2 +-
 core/bench/src/benchmarks/pinned_consumer.rs       |   2 +-
 core/bench/src/benchmarks/pinned_producer.rs       |   2 +-
 .../src/benchmarks/pinned_producer_and_consumer.rs |   2 +-
 core/bench/src/runner.rs                           |  11 +-
 core/bench/src/utils/client_factory.rs             | 163 +++++++++++++++++-
 core/bench/src/utils/mod.rs                        |   4 +-
 core/bench/src/utils/server_starter.rs             | 188 ---------------------
 scripts/check-backwards-compat.sh                  |   6 +-
 28 files changed, 183 insertions(+), 283 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 26de47328..1167a2ea9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4466,14 +4466,12 @@ dependencies = [
  "hostname",
  "human-repr",
  "iggy",
- "integration",
  "nonzero_lit",
  "rand 0.9.2",
  "rayon",
  "serde",
  "sysinfo",
  "tokio",
- "toml 0.9.11+spec-1.1.0",
  "tracing",
  "tracing-appender",
  "tracing-subscriber",
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index 98fe0570a..42ed50e44 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -29,8 +29,6 @@ readme = "../../README.md"
 [[bin]]
 name = "iggy-bench"
 path = "src/main.rs"
-# Due to dependency to integration, which has a dependency to server, setting
-# mimalloc on server is also setting it on bench.
 
 [dependencies]
 async-trait = { workspace = true }
@@ -45,14 +43,12 @@ governor = "0.10.4"
 hostname = "0.4.2"
 human-repr = { workspace = true }
 iggy = { workspace = true }
-integration = { workspace = true }
 nonzero_lit = { workspace = true }
 rand = { workspace = true }
 rayon = "1.11.0"
 serde = { workspace = true }
 sysinfo = { workspace = true }
 tokio = { workspace = true }
-toml = { workspace = true }
 tracing = { workspace = true }
 tracing-appender = { workspace = true }
 tracing-subscriber = { workspace = true }
diff --git a/core/bench/src/actors/consumer/client/high_level.rs 
b/core/bench/src/actors/consumer/client/high_level.rs
index b1f3a4f04..22b04679f 100644
--- a/core/bench/src/actors/consumer/client/high_level.rs
+++ b/core/bench/src/actors/consumer/client/high_level.rs
@@ -24,9 +24,9 @@ use crate::actors::{
     },
 };
 
+use crate::utils::{ClientFactory, login_root};
 use futures_util::StreamExt;
 use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
 use std::{sync::Arc, time::Duration};
 use tokio::time::{Instant, timeout};
 use tracing::{error, warn};
diff --git a/core/bench/src/actors/consumer/client/low_level.rs 
b/core/bench/src/actors/consumer/client/low_level.rs
index 27feb3a4e..b501942a0 100644
--- a/core/bench/src/actors/consumer/client/low_level.rs
+++ b/core/bench/src/actors/consumer/client/low_level.rs
@@ -20,9 +20,9 @@ use crate::actors::consumer::client::BenchmarkConsumerClient;
 use crate::actors::consumer::client::interface::{BenchmarkConsumerConfig, 
ConsumerClient};
 use crate::actors::{ApiLabel, BatchMetrics, BenchmarkInit};
 use crate::benchmarks::common::create_consumer;
+use crate::utils::{ClientFactory, login_root};
 use crate::utils::{batch_total_size_bytes, batch_user_size_bytes};
 use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::time::Instant;
diff --git a/core/bench/src/actors/consumer/typed_benchmark_consumer.rs 
b/core/bench/src/actors/consumer/typed_benchmark_consumer.rs
index 723adc35b..4455e6895 100644
--- a/core/bench/src/actors/consumer/typed_benchmark_consumer.rs
+++ b/core/bench/src/actors/consumer/typed_benchmark_consumer.rs
@@ -18,6 +18,7 @@
 
 use std::sync::Arc;
 
+use crate::utils::ClientFactory;
 use crate::{
     actors::consumer::{
         BenchmarkConsumer,
@@ -33,7 +34,6 @@ use bench_report::{
     numeric_parameter::BenchmarkNumericParameter,
 };
 use iggy::prelude::*;
-use integration::test_server::ClientFactory;
 
 pub enum TypedBenchmarkConsumer {
     High(BenchmarkConsumer<HighLevelConsumerClient>),
diff --git a/core/bench/src/actors/producer/client/high_level.rs 
b/core/bench/src/actors/producer/client/high_level.rs
index 7544d2340..7fdf923b1 100644
--- a/core/bench/src/actors/producer/client/high_level.rs
+++ b/core/bench/src/actors/producer/client/high_level.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use crate::utils::{ClientFactory, login_root};
 use crate::{
     actors::{
         ApiLabel, BatchMetrics, BenchmarkInit,
@@ -27,7 +28,6 @@ use crate::{
     utils::batch_generator::BenchmarkBatchGenerator,
 };
 use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
 use std::sync::Arc;
 use tokio::time::Instant;
 
diff --git a/core/bench/src/actors/producer/client/low_level.rs 
b/core/bench/src/actors/producer/client/low_level.rs
index fc2f483ed..b132f9a97 100644
--- a/core/bench/src/actors/producer/client/low_level.rs
+++ b/core/bench/src/actors/producer/client/low_level.rs
@@ -18,6 +18,7 @@
 
 use std::sync::Arc;
 
+use crate::utils::{ClientFactory, login_root};
 use crate::{
     actors::{
         ApiLabel, BatchMetrics, BenchmarkInit,
@@ -29,7 +30,6 @@ use crate::{
     utils::batch_generator::BenchmarkBatchGenerator,
 };
 use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
 use tokio::time::Instant;
 
 pub struct LowLevelProducerClient {
diff --git a/core/bench/src/actors/producer/typed_benchmark_producer.rs 
b/core/bench/src/actors/producer/typed_benchmark_producer.rs
index 2a27fd33a..32655ee83 100644
--- a/core/bench/src/actors/producer/typed_benchmark_producer.rs
+++ b/core/bench/src/actors/producer/typed_benchmark_producer.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use crate::utils::ClientFactory;
 use crate::{
     actors::producer::{
         BenchmarkProducer,
@@ -31,7 +32,6 @@ use bench_report::{
     numeric_parameter::BenchmarkNumericParameter,
 };
 use iggy::prelude::*;
-use integration::test_server::ClientFactory;
 use std::sync::Arc;
 
 pub enum TypedBenchmarkProducer {
diff --git a/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs b/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs
index ac7e121b6..16e12a59d 100644
--- a/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs
+++ b/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs
@@ -18,6 +18,7 @@
 
 use std::sync::Arc;
 
+use crate::utils::ClientFactory;
 use crate::{
     actors::{
         consumer::client::{
@@ -38,7 +39,6 @@ use bench_report::{
 };
 
 use iggy::prelude::*;
-use integration::test_server::ClientFactory;
 
 pub enum TypedBenchmarkProducingConsumer {
     High(BenchmarkProducingConsumer<HighLevelProducerClient, 
HighLevelConsumerClient>),
diff --git a/core/bench/src/analytics/report_builder.rs 
b/core/bench/src/analytics/report_builder.rs
index 8a00c8ccf..2ec001091 100644
--- a/core/bench/src/analytics/report_builder.rs
+++ b/core/bench/src/analytics/report_builder.rs
@@ -19,6 +19,7 @@
 use std::{collections::HashMap, thread};
 
 use super::metrics::group::{from_individual_metrics, 
from_producers_and_consumers_statistics};
+use crate::utils::ClientFactory;
 use crate::utils::get_server_stats;
 use bench_report::{
     actor_kind::ActorKind,
@@ -31,7 +32,6 @@ use bench_report::{
 };
 use chrono::{DateTime, Utc};
 use iggy::prelude::{CacheMetrics, CacheMetricsKey, IggyTimestamp, Stats};
-use integration::test_server::ClientFactory;
 use std::sync::Arc;
 
 pub struct BenchmarkReportBuilder;
diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index c3a6f2bd4..ebc54e48b 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -22,8 +22,7 @@ use super::props::{BenchmarkKindProps, 
BenchmarkTransportProps};
 use super::{
     defaults::{
         DEFAULT_MESSAGE_BATCHES, DEFAULT_MESSAGE_SIZE, 
DEFAULT_MESSAGES_PER_BATCH,
-        DEFAULT_MOVING_AVERAGE_WINDOW, DEFAULT_PERFORM_CLEANUP, 
DEFAULT_SAMPLING_TIME,
-        DEFAULT_SERVER_STDOUT_VISIBILITY, DEFAULT_SKIP_SERVER_START, 
DEFAULT_WARMUP_TIME,
+        DEFAULT_MOVING_AVERAGE_WINDOW, DEFAULT_SAMPLING_TIME, 
DEFAULT_WARMUP_TIME,
     },
     transport::BenchmarkTransportCommand,
 };
@@ -32,9 +31,7 @@ use 
bench_report::numeric_parameter::BenchmarkNumericParameter;
 use clap::error::ErrorKind;
 use clap::{CommandFactory, Parser};
 use iggy::prelude::{IggyByteSize, IggyDuration, TransportProtocol};
-use std::net::SocketAddr;
 use std::num::NonZeroU32;
-use std::path::Path;
 use std::str::FromStr;
 
 #[derive(Parser, Debug)]
@@ -72,10 +69,6 @@ pub struct IggyBenchArgs {
     #[arg(long, short = 'w', default_value_t = 
IggyDuration::from_str(DEFAULT_WARMUP_TIME).unwrap())]
     pub warmup_time: IggyDuration,
 
-    /// Server stdout visibility
-    #[arg(long, short = 'v', default_value_t = 
DEFAULT_SERVER_STDOUT_VISIBILITY)]
-    pub verbose: bool,
-
     /// Sampling time for metrics collection. It is also used as bucket size 
for time series calculations.
     #[arg(long, short = 't', default_value_t = 
IggyDuration::from_str(DEFAULT_SAMPLING_TIME).unwrap(), value_parser = 
IggyDuration::from_str)]
     pub sampling_time: IggyDuration,
@@ -84,34 +77,11 @@ pub struct IggyBenchArgs {
     #[arg(long, short = 'W', default_value_t = DEFAULT_MOVING_AVERAGE_WINDOW)]
     pub moving_average_window: u32,
 
-    /// Shutdown iggy-server and remove server `local_data` directory after 
the benchmark is finished.
-    /// Only applicable to local benchmarks.
-    #[arg(long, default_value_t = DEFAULT_PERFORM_CLEANUP, 
verbatim_doc_comment)]
-    pub cleanup: bool,
-
-    /// iggy-server executable path.
-    /// Only applicable to local benchmarks.
-    #[arg(long, short='e', default_value = None, value_parser = 
validate_server_executable_path)]
-    pub server_executable_path: Option<String>,
-
-    /// Skip server start.
-    /// Only applicable to local benchmarks.
-    #[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START, 
verbatim_doc_comment)]
-    pub skip_server_start: bool,
-
     /// Use high-level API for actors
     #[arg(long, short = 'H', default_value_t = false)]
     pub high_level_api: bool,
 }
 
-fn validate_server_executable_path(v: &str) -> Result<String, String> {
-    if Path::new(v).exists() {
-        Ok(v.to_owned())
-    } else {
-        Err(format!("Provided server executable '{v}' does not exist."))
-    }
-}
-
 impl IggyBenchArgs {
     pub fn transport_command(&self) -> &BenchmarkTransportCommand {
         self.benchmark_kind.transport_command()
@@ -133,19 +103,6 @@ impl IggyBenchArgs {
     }
 
     pub fn validate(&mut self) {
-        let server_address = 
self.server_address().parse::<SocketAddr>().unwrap();
-        if (self.cleanup || self.verbose) && 
!server_address.ip().is_loopback() {
-            Self::command()
-                .error(
-                    ErrorKind::ArgumentConflict,
-                    format!(
-                        "Cannot use cleanup or verbose flags with a 
non-loopback server address: {}",
-                        self.server_address()
-                    ),
-                )
-                .exit();
-        }
-
         if self.output_dir().is_none()
             && (self.gitref().is_some()
                 || self.identifier().is_some()
diff --git a/core/bench/src/args/defaults.rs b/core/bench/src/args/defaults.rs
index 7695782fb..bfd265159 100644
--- a/core/bench/src/args/defaults.rs
+++ b/core/bench/src/args/defaults.rs
@@ -46,11 +46,7 @@ pub const DEFAULT_NUMBER_OF_CONSUMERS: NonZeroU32 = u32!(8);
 pub const DEFAULT_NUMBER_OF_CONSUMER_GROUPS: NonZeroU32 = u32!(1);
 pub const DEFAULT_NUMBER_OF_PRODUCERS: NonZeroU32 = u32!(8);
 
-pub const DEFAULT_PERFORM_CLEANUP: bool = false;
-pub const DEFAULT_SERVER_STDOUT_VISIBILITY: bool = false;
-
 pub const DEFAULT_WARMUP_TIME: &str = "0s";
-pub const DEFAULT_SKIP_SERVER_START: bool = false;
 
 pub const DEFAULT_SAMPLING_TIME: &str = "10ms";
 pub const DEFAULT_MOVING_AVERAGE_WINDOW: u32 = 20;
diff --git a/core/bench/src/args/examples.rs b/core/bench/src/args/examples.rs
index 84a7839df..7a00929e5 100644
--- a/core/bench/src/args/examples.rs
+++ b/core/bench/src/args/examples.rs
@@ -57,13 +57,10 @@ const EXAMPLES: &str = r#"EXAMPLES:
                                 Mutually exclusive with --message-batches
     --message-size (-m): Message size in bytes [default: 1000]
                         For random sizes, use range format: "100..1000"
-    --start-stream-id (-S): Start stream ID [default: 1]
     --rate-limit (-r): Optional throughput limit per producer (e.g., "50KB", 
"10MB")
     --warmup-time (-w): Warmup duration [default: 0s]
     --sampling-time (-t): Metrics sampling interval [default: 10ms]
     --moving-average-window (-W): Window size for moving average [default: 20]
-    --cleanup: Remove server data after benchmark
-    --verbose (-v): Show server output (only applicable for local server)
 
     Benchmark-specific options (after the benchmark command):
     --streams (-s): Number of streams
diff --git a/core/bench/src/benchmarks/balanced_consumer_group.rs 
b/core/bench/src/benchmarks/balanced_consumer_group.rs
index ac60fc6f6..5bf6026e3 100644
--- a/core/bench/src/benchmarks/balanced_consumer_group.rs
+++ b/core/bench/src/benchmarks/balanced_consumer_group.rs
@@ -17,6 +17,7 @@
  */
 
 use super::benchmark::Benchmarkable;
+use crate::utils::ClientFactory;
 use crate::{
     args::common::IggyBenchArgs,
     benchmarks::common::{build_consumer_futures, init_consumer_groups},
@@ -24,7 +25,6 @@ use crate::{
 use async_trait::async_trait;
 use bench_report::{benchmark_kind::BenchmarkKind, 
individual_metrics::BenchmarkIndividualMetrics};
 use iggy::prelude::*;
-use integration::test_server::ClientFactory;
 use std::sync::Arc;
 use tokio::task::JoinSet;
 use tracing::info;
diff --git a/core/bench/src/benchmarks/balanced_producer.rs 
b/core/bench/src/benchmarks/balanced_producer.rs
index 6e81ab5e3..b9e7aa892 100644
--- a/core/bench/src/benchmarks/balanced_producer.rs
+++ b/core/bench/src/benchmarks/balanced_producer.rs
@@ -19,11 +19,11 @@
 use super::benchmark::Benchmarkable;
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::common::build_producer_futures;
+use crate::utils::ClientFactory;
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use iggy::prelude::*;
-use integration::test_server::ClientFactory;
 use std::sync::Arc;
 use tokio::task::JoinSet;
 use tracing::info;
diff --git a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs 
b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
index 0eab5db8b..10ed9bc4b 100644
--- a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
+++ b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
@@ -17,6 +17,7 @@
  */
 
 use super::benchmark::Benchmarkable;
+use crate::utils::ClientFactory;
 use crate::{
     args::common::IggyBenchArgs,
     benchmarks::common::{build_consumer_futures, build_producer_futures, 
init_consumer_groups},
@@ -24,7 +25,6 @@ use crate::{
 use async_trait::async_trait;
 use bench_report::{benchmark_kind::BenchmarkKind, 
individual_metrics::BenchmarkIndividualMetrics};
 use iggy::prelude::*;
-use integration::test_server::ClientFactory;
 use std::sync::Arc;
 use tokio::task::JoinSet;
 use tracing::info;
diff --git a/core/bench/src/benchmarks/benchmark.rs 
b/core/bench/src/benchmarks/benchmark.rs
index 31f1828c3..6a334b4f0 100644
--- a/core/bench/src/benchmarks/benchmark.rs
+++ b/core/bench/src/benchmarks/benchmark.rs
@@ -17,13 +17,13 @@
  */
 
 use crate::args::kind::BenchmarkKindCommand;
+use crate::utils::{ClientFactory, login_root};
 use crate::{args::common::IggyBenchArgs, 
utils::client_factory::create_client_factory};
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use iggy::clients::client::IggyClient;
 use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
 use std::sync::Arc;
 use tokio::task::JoinSet;
 use tracing::info;
diff --git a/core/bench/src/benchmarks/common.rs 
b/core/bench/src/benchmarks/common.rs
index 97c66fb7b..7858f6fca 100644
--- a/core/bench/src/benchmarks/common.rs
+++ b/core/bench/src/benchmarks/common.rs
@@ -17,6 +17,7 @@
  */
 
 use super::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX};
+use crate::utils::{ClientFactory, login_root};
 use crate::{
     actors::{
         consumer::typed_benchmark_consumer::TypedBenchmarkConsumer,
@@ -28,7 +29,6 @@ use crate::{
 };
 use bench_report::{benchmark_kind::BenchmarkKind, 
individual_metrics::BenchmarkIndividualMetrics};
 use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
 use std::{future::Future, sync::Arc};
 use tracing::{error, info};
 
diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer.rs 
b/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
index 2818fea6d..1867ca2d2 100644
--- a/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
+++ b/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
@@ -18,11 +18,11 @@
 
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::common::build_producing_consumers_futures;
+use crate::utils::ClientFactory;
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use iggy::prelude::*;
-use integration::test_server::ClientFactory;
 use std::sync::Arc;
 use tokio::task::JoinSet;
 use tracing::info;
diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs 
b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
index 2adda4d55..2ef5cd190 100644
--- a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
+++ b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
@@ -18,11 +18,11 @@
 
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::common::{build_producing_consumer_groups_futures, 
init_consumer_groups};
+use crate::utils::ClientFactory;
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use iggy::prelude::*;
-use integration::test_server::ClientFactory;
 use std::sync::Arc;
 use tokio::task::JoinSet;
 use tracing::info;
diff --git a/core/bench/src/benchmarks/pinned_consumer.rs 
b/core/bench/src/benchmarks/pinned_consumer.rs
index 870df77a7..4c44b9fec 100644
--- a/core/bench/src/benchmarks/pinned_consumer.rs
+++ b/core/bench/src/benchmarks/pinned_consumer.rs
@@ -19,11 +19,11 @@
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::benchmark::Benchmarkable;
 use crate::benchmarks::common::build_consumer_futures;
+use crate::utils::ClientFactory;
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use iggy::prelude::IggyError;
-use integration::test_server::ClientFactory;
 use std::sync::Arc;
 use tokio::task::JoinSet;
 use tracing::info;
diff --git a/core/bench/src/benchmarks/pinned_producer.rs 
b/core/bench/src/benchmarks/pinned_producer.rs
index 3514e24d9..8630eb573 100644
--- a/core/bench/src/benchmarks/pinned_producer.rs
+++ b/core/bench/src/benchmarks/pinned_producer.rs
@@ -19,11 +19,11 @@
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::benchmark::Benchmarkable;
 use crate::benchmarks::common::build_producer_futures;
+use crate::utils::ClientFactory;
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use iggy::prelude::{IggyError, MaxTopicSize};
-use integration::test_server::ClientFactory;
 use std::sync::Arc;
 use tokio::task::JoinSet;
 use tracing::info;
diff --git a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs 
b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
index cde6632cc..8f2630396 100644
--- a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
+++ b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
@@ -18,11 +18,11 @@
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::benchmark::Benchmarkable;
 use crate::benchmarks::common::{build_consumer_futures, 
build_producer_futures};
+use crate::utils::ClientFactory;
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use iggy::prelude::*;
-use integration::test_server::ClientFactory;
 use std::sync::Arc;
 use tokio::task::JoinSet;
 use tracing::info;
diff --git a/core/bench/src/runner.rs b/core/bench/src/runner.rs
index ab9553b32..d9007bfd7 100644
--- a/core/bench/src/runner.rs
+++ b/core/bench/src/runner.rs
@@ -21,34 +21,27 @@ use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::benchmark::Benchmarkable;
 use crate::plot::{ChartType, plot_chart};
 use crate::utils::cpu_name::append_cpu_name_lowercase;
-use crate::utils::server_starter::start_server_if_needed;
 use crate::utils::{collect_server_logs_and_save_to_file, 
params_from_args_and_metrics};
 use bench_report::hardware::BenchmarkHardware;
 use iggy::prelude::IggyError;
-use integration::test_server::TestServer;
 use std::path::Path;
 use std::time::Duration;
 use tokio::time::sleep;
 use tracing::{error, info};
 
 pub struct BenchmarkRunner {
-    pub args: Option<IggyBenchArgs>,
-    pub test_server: Option<TestServer>,
+    args: Option<IggyBenchArgs>,
 }
 
 impl BenchmarkRunner {
     pub const fn new(args: IggyBenchArgs) -> Self {
-        Self {
-            args: Some(args),
-            test_server: None,
-        }
+        Self { args: Some(args) }
     }
 
     #[allow(clippy::cognitive_complexity)]
     pub async fn run(mut self) -> Result<(), IggyError> {
         let args = self.args.take().unwrap();
         let should_open_charts = args.open_charts();
-        self.test_server = start_server_if_needed(&args).await;
 
         let transport = args.transport();
         let server_addr = args.server_address();
diff --git a/core/bench/src/utils/client_factory.rs 
b/core/bench/src/utils/client_factory.rs
index 513d22300..3fda38d77 100644
--- a/core/bench/src/utils/client_factory.rs
+++ b/core/bench/src/utils/client_factory.rs
@@ -18,14 +18,165 @@
 
 use crate::args::common::IggyBenchArgs;
 use crate::args::transport::BenchmarkTransportCommand;
-use iggy::prelude::TransportProtocol;
-use integration::http_client::HttpClientFactory;
-use integration::quic_client::QuicClientFactory;
-use integration::tcp_client::TcpClientFactory;
-use integration::test_server::ClientFactory;
-use integration::websocket_client::WebSocketClientFactory;
+use async_trait::async_trait;
+use iggy::http::http_client::HttpClient;
+use iggy::prelude::{
+    Client, ClientWrapper, DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, 
HttpClientConfig,
+    IdentityInfo, IggyClient, QuicClientConfig, TcpClient, TcpClientConfig, 
TransportProtocol,
+    UserClient, WebSocketClientConfig,
+};
+use iggy::quic::quic_client::QuicClient;
+use iggy::websocket::websocket_client::WebSocketClient;
 use std::sync::Arc;
 
+#[async_trait]
+pub trait ClientFactory: Sync + Send {
+    async fn create_client(&self) -> ClientWrapper;
+    fn transport(&self) -> TransportProtocol;
+    fn server_addr(&self) -> String;
+}
+
+pub async fn login_root(client: &IggyClient) -> IdentityInfo {
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap()
+}
+
+#[derive(Debug, Clone)]
+pub struct HttpClientFactory {
+    pub server_addr: String,
+}
+
+#[async_trait]
+impl ClientFactory for HttpClientFactory {
+    async fn create_client(&self) -> ClientWrapper {
+        let config = HttpClientConfig {
+            api_url: format!("http://{}", self.server_addr.clone()),
+            ..HttpClientConfig::default()
+        };
+        let client = HttpClient::create(Arc::new(config)).unwrap();
+        ClientWrapper::Http(client)
+    }
+
+    fn transport(&self) -> TransportProtocol {
+        TransportProtocol::Http
+    }
+
+    fn server_addr(&self) -> String {
+        self.server_addr.clone()
+    }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct TcpClientFactory {
+    pub server_addr: String,
+    pub nodelay: bool,
+    pub tls_enabled: bool,
+    pub tls_domain: String,
+    pub tls_ca_file: Option<String>,
+    pub tls_validate_certificate: bool,
+}
+
+#[async_trait]
+impl ClientFactory for TcpClientFactory {
+    async fn create_client(&self) -> ClientWrapper {
+        let config = TcpClientConfig {
+            server_address: self.server_addr.clone(),
+            nodelay: self.nodelay,
+            tls_enabled: self.tls_enabled,
+            tls_domain: self.tls_domain.clone(),
+            tls_ca_file: self.tls_ca_file.clone(),
+            tls_validate_certificate: self.tls_validate_certificate,
+            ..TcpClientConfig::default()
+        };
+        let client = TcpClient::create(Arc::new(config)).unwrap_or_else(|e| {
+            panic!(
+                "Failed to create TcpClient, iggy-server has address {}, error: {:?}",
+                self.server_addr, e
+            )
+        });
+        Client::connect(&client).await.unwrap_or_else(|e| {
+            if self.tls_enabled {
+                panic!(
+                    "Failed to connect to iggy-server at {} with TLS enabled, error: {:?}\n\
+                    Hint: Make sure the server is started with TLS enabled and self-signed certificate:\n\
+                    IGGY_TCP_TLS_ENABLED=true IGGY_TCP_TLS_SELF_SIGNED=true\n
+                    or start iggy-bench with relevant tcp tls arguments: --tls --tls-domain <domain> --tls-ca-file <ca_file>\n",
+                    self.server_addr, e
+                )
+            } else {
+                panic!(
+                    "Failed to connect to iggy-server at {}, error: {:?}",
+                    self.server_addr, e
+                )
+            }
+        });
+        ClientWrapper::Tcp(client)
+    }
+
+    fn transport(&self) -> TransportProtocol {
+        TransportProtocol::Tcp
+    }
+
+    fn server_addr(&self) -> String {
+        self.server_addr.clone()
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct QuicClientFactory {
+    pub server_addr: String,
+}
+
+#[async_trait]
+impl ClientFactory for QuicClientFactory {
+    async fn create_client(&self) -> ClientWrapper {
+        let config = QuicClientConfig {
+            server_address: self.server_addr.clone(),
+            max_idle_timeout: 2_000_000,
+            ..QuicClientConfig::default()
+        };
+        let client = QuicClient::create(Arc::new(config)).unwrap();
+        Client::connect(&client).await.unwrap();
+        ClientWrapper::Quic(client)
+    }
+
+    fn transport(&self) -> TransportProtocol {
+        TransportProtocol::Quic
+    }
+
+    fn server_addr(&self) -> String {
+        self.server_addr.clone()
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct WebSocketClientFactory {
+    pub server_addr: String,
+}
+
+#[async_trait]
+impl ClientFactory for WebSocketClientFactory {
+    async fn create_client(&self) -> ClientWrapper {
+        let config = WebSocketClientConfig {
+            server_address: self.server_addr.clone(),
+            ..WebSocketClientConfig::default()
+        };
+        let client = WebSocketClient::create(Arc::new(config)).unwrap();
+        Client::connect(&client).await.unwrap();
+        ClientWrapper::WebSocket(client)
+    }
+
+    fn transport(&self) -> TransportProtocol {
+        TransportProtocol::WebSocket
+    }
+
+    fn server_addr(&self) -> String {
+        self.server_addr.clone()
+    }
+}
+
 pub fn create_client_factory(args: &IggyBenchArgs) -> Arc<dyn ClientFactory> {
     match &args.transport() {
         TransportProtocol::Http => Arc::new(HttpClientFactory {
diff --git a/core/bench/src/utils/mod.rs b/core/bench/src/utils/mod.rs
index 2559a6a3a..e8ee8ebd3 100644
--- a/core/bench/src/utils/mod.rs
+++ b/core/bench/src/utils/mod.rs
@@ -22,7 +22,6 @@ use bench_report::{
     transport::BenchmarkTransport,
 };
 use iggy::prelude::*;
-use integration::test_server::ClientFactory;
 use std::{fs, path::Path, sync::Arc};
 use tracing::{error, info};
 
@@ -38,12 +37,13 @@ use crate::args::{
     },
 };
 
+pub use client_factory::{ClientFactory, login_root};
+
 pub mod batch_generator;
 pub mod client_factory;
 pub mod cpu_name;
 pub mod finish_condition;
 pub mod rate_limiter;
-pub mod server_starter;
 
 pub fn batch_total_size_bytes(polled_messages: &PolledMessages) -> u64 {
     polled_messages
diff --git a/core/bench/src/utils/server_starter.rs 
b/core/bench/src/utils/server_starter.rs
deleted file mode 100644
index 1ee66d9b6..000000000
--- a/core/bench/src/utils/server_starter.rs
+++ /dev/null
@@ -1,188 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::args::common::IggyBenchArgs;
-use iggy::prelude::TransportProtocol;
-use integration::test_server::{IpAddrKind, SYSTEM_PATH_ENV_VAR, TestServer};
-use serde::Deserialize;
-use std::net::SocketAddr;
-use std::{collections::HashMap, time::Instant};
-use tokio::net::{TcpStream, UdpSocket};
-use tracing::{info, warn};
-
-#[derive(Debug, Deserialize)]
-struct ServerConfig {
-    http: ConfigAddress,
-    tcp: ConfigAddress,
-    quic: ConfigAddress,
-    websocket: ConfigAddress,
-}
-
-#[derive(Debug, Deserialize)]
-struct ConfigAddress {
-    address: String,
-}
-
-#[allow(clippy::cognitive_complexity)]
-pub async fn start_server_if_needed(args: &IggyBenchArgs) -> Option<TestServer> {
-    if args.skip_server_start {
-        info!("Skipping iggy-server start");
-        return None;
-    }
-
-    let (should_start, mut envs) = evaluate_server_start_condition(args).await;
-
-    if should_start {
-        envs.insert(SYSTEM_PATH_ENV_VAR.to_owned(), "local_data".to_owned());
-
-        if args.verbose {
-            envs.insert("IGGY_TEST_VERBOSE".to_owned(), "true".to_owned());
-            info!("Enabling verbose output - iggy-server will logs print to stdout");
-        } else {
-            info!("Disabling verbose output - iggy-server will print logs to files");
-        }
-
-        info!(
-            "Starting test server, transport: {}, cleanup: {}, verbosity: {}",
-            args.transport(),
-            args.cleanup,
-            args.verbose
-        );
-        let mut test_server = TestServer::new(
-            Some(envs),
-            args.cleanup,
-            args.server_executable_path.clone(),
-            IpAddrKind::V4,
-        );
-        let now = Instant::now();
-        test_server.start();
-        let elapsed = now.elapsed();
-        if elapsed.as_millis() > 1000 {
-            warn!(
-                "Test iggy-server started, pid: {}, startup took {} ms because it had to load messages from disk to cache",
-                test_server.pid(),
-                elapsed.as_millis()
-            );
-        } else {
-            info!(
-                "Test iggy-server started, pid: {}, startup time: {} ms",
-                test_server.pid(),
-                elapsed.as_millis()
-            );
-        }
-
-        Some(test_server)
-    } else {
-        info!("Skipping iggy-server start");
-        None
-    }
-}
-
-async fn evaluate_server_start_condition(args: &IggyBenchArgs) -> (bool, HashMap<String, String>) {
-    let default_config: ServerConfig =
-        toml::from_str(include_str!("../../../configs/server.toml")).unwrap();
-
-    match &args.transport() {
-        TransportProtocol::Http => {
-            let args_http_address = args.server_address().parse::<SocketAddr>().unwrap();
-            let config_http_address = default_config.http.address.parse::<SocketAddr>().unwrap();
-            let envs = HashMap::from([
-                (
-                    "IGGY_HTTP_ADDRESS".to_owned(),
-                    default_config.http.address.clone(),
-                ),
-                ("IGGY_TCP_ENABLED".to_owned(), "false".to_owned()),
-                ("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned()),
-            ]);
-            (
-                addresses_are_equivalent(&args_http_address, &config_http_address)
-                    && !is_tcp_addr_in_use(&args_http_address).await,
-                envs,
-            )
-        }
-        TransportProtocol::Tcp => {
-            let args_tcp_address = args.server_address().parse::<SocketAddr>().unwrap();
-            let config_tcp_address = default_config.tcp.address.parse::<SocketAddr>().unwrap();
-
-            let envs = HashMap::from([
-                (
-                    "IGGY_TCP_ADDRESS".to_owned(),
-                    default_config.tcp.address.clone(),
-                ),
-                ("IGGY_HTTP_ENABLED".to_owned(), "false".to_owned()),
-                ("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned()),
-            ]);
-            (
-                addresses_are_equivalent(&args_tcp_address, &config_tcp_address)
-                    && !is_tcp_addr_in_use(&args_tcp_address).await,
-                envs,
-            )
-        }
-        TransportProtocol::Quic => {
-            let args_quic_address = args.server_address().parse::<SocketAddr>().unwrap();
-            let config_quic_address = default_config.quic.address.parse::<SocketAddr>().unwrap();
-            let envs = HashMap::from([
-                (
-                    "IGGY_QUIC_ADDRESS".to_owned(),
-                    default_config.quic.address.clone(),
-                ),
-                ("IGGY_HTTP_ENABLED".to_owned(), "false".to_owned()),
-                ("IGGY_TCP_ENABLED".to_owned(), "false".to_owned()),
-            ]);
-
-            (
-                addresses_are_equivalent(&args_quic_address, &config_quic_address)
-                    && !is_udp_addr_in_use(&args_quic_address).await,
-                envs,
-            )
-        }
-        TransportProtocol::WebSocket => {
-            let args_websocket_address = args.server_address().parse::<SocketAddr>().unwrap();
-            let config_websocket_address = default_config
-                .websocket
-                .address
-                .parse::<SocketAddr>()
-                .unwrap();
-            let envs = HashMap::from([(
-                "IGGY_WEBSOCKET_ADDRESS".to_owned(),
-                default_config.websocket.address.clone(),
-            )]);
-            (
-                addresses_are_equivalent(&args_websocket_address, &config_websocket_address)
-                    && !is_tcp_addr_in_use(&args_websocket_address).await,
-                envs,
-            )
-        }
-    }
-}
-
-async fn is_tcp_addr_in_use(addr: &SocketAddr) -> bool {
-    TcpStream::connect(addr).await.is_ok()
-}
-
-async fn is_udp_addr_in_use(addr: &SocketAddr) -> bool {
-    UdpSocket::bind(addr).await.is_err()
-}
-
-fn addresses_are_equivalent(first: &SocketAddr, second: &SocketAddr) -> bool {
-    if first.ip().is_unspecified() || second.ip().is_unspecified() {
-        first.port() == second.port()
-    } else {
-        first == second
-    }
-}
diff --git a/scripts/check-backwards-compat.sh b/scripts/check-backwards-compat.sh
index eb1d3bafc..470c6f300 100755
--- a/scripts/check-backwards-compat.sh
+++ b/scripts/check-backwards-compat.sh
@@ -184,13 +184,13 @@ ok "server is ready"
 
 # Producer bench (baseline)
 info "Running producer bench on baseline"
-BENCH_CMD=( target/debug/iggy-bench --verbose --message-batches "$BATCHES" --messages-per-batch "$MSGS_PER_BATCH" pinned-producer tcp )
+BENCH_CMD=( target/debug/iggy-bench --message-batches "$BATCHES" --messages-per-batch "$MSGS_PER_BATCH" pinned-producer tcp )
 if command -v timeout >/dev/null 2>&1; then timeout 60s "${BENCH_CMD[@]}"; else "${BENCH_CMD[@]}"; fi
 ok "producer bench done"
 
 # Consumer bench (baseline)
 info "Running consumer bench on baseline"
-BENCH_CMD=( target/debug/iggy-bench --verbose --message-batches "$BATCHES" --messages-per-batch "$MSGS_PER_BATCH" pinned-consumer tcp )
+BENCH_CMD=( target/debug/iggy-bench --message-batches "$BATCHES" --messages-per-batch "$MSGS_PER_BATCH" pinned-consumer tcp )
 if command -v timeout >/dev/null 2>&1; then timeout 60s "${BENCH_CMD[@]}"; else "${BENCH_CMD[@]}"; fi
 ok "consumer bench done (baseline)"
 
@@ -249,7 +249,7 @@ ok "PR server is ready"
 
 # Only consumer bench against PR
 info "Running consumer bench on PR (compat check)"
-BENCH_CMD=( target/debug/iggy-bench --verbose --message-batches "$BATCHES" --messages-per-batch "$MSGS_PER_BATCH" pinned-consumer tcp )
+BENCH_CMD=( target/debug/iggy-bench --message-batches "$BATCHES" --messages-per-batch "$MSGS_PER_BATCH" pinned-consumer tcp )
 if command -v timeout >/dev/null 2>&1; then timeout 60s "${BENCH_CMD[@]}"; else "${BENCH_CMD[@]}"; fi
 ok "consumer bench done (PR)"
 


Reply via email to