This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 870e738a chore(bench): fix clippy pedantic and nursery lints in bench (#1843)
870e738a is described below
commit 870e738a2723196b887d6b2107546edc5c22351a
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sat Jun 7 16:13:46 2025 +0200
chore(bench): fix clippy pedantic and nursery lints in bench (#1843)
---
core/bench/Cargo.toml | 5 +
core/bench/src/actors/consumer.rs | 218 ++++++++++------
core/bench/src/actors/producer.rs | 11 +-
core/bench/src/actors/producing_consumer.rs | 276 +++++++++++++--------
core/bench/src/analytics/metrics/group.rs | 220 +++++++++-------
core/bench/src/analytics/metrics/individual.rs | 258 ++++++++++++-------
core/bench/src/analytics/record.rs | 2 +-
core/bench/src/analytics/report_builder.rs | 10 +-
core/bench/src/analytics/time_series/calculator.rs | 24 +-
.../analytics/time_series/calculators/latency.rs | 3 +
.../time_series/calculators/throughput.rs | 6 +-
.../time_series/processors/moving_average.rs | 4 +-
core/bench/src/args/common.rs | 75 +++---
core/bench/src/args/defaults.rs | 2 +-
core/bench/src/args/examples.rs | 2 +-
core/bench/src/args/kind.rs | 42 ++--
.../src/args/kinds/balanced/consumer_group.rs | 10 +-
core/bench/src/args/kinds/balanced/producer.rs | 7 +-
.../kinds/balanced/producer_and_consumer_group.rs | 8 +-
.../args/kinds/end_to_end/producing_consumer.rs | 4 +-
.../kinds/end_to_end/producing_consumer_group.rs | 11 +-
core/bench/src/args/kinds/pinned/consumer.rs | 2 +-
core/bench/src/args/kinds/pinned/producer.rs | 2 +-
.../src/args/kinds/pinned/producer_and_consumer.rs | 7 +-
core/bench/src/args/props.rs | 2 +-
core/bench/src/args/transport.rs | 31 +--
.../src/benchmarks/balanced_consumer_group.rs | 13 +-
core/bench/src/benchmarks/balanced_producer.rs | 18 +-
.../balanced_producer_and_consumer_group.rs | 23 +-
core/bench/src/benchmarks/benchmark.rs | 82 +++---
core/bench/src/benchmarks/common.rs | 70 +++---
.../benchmarks/end_to_end_producing_consumer.rs | 18 +-
.../end_to_end_producing_consumer_group.rs | 10 +-
core/bench/src/benchmarks/pinned_consumer.rs | 10 +-
core/bench/src/benchmarks/pinned_producer.rs | 18 +-
.../src/benchmarks/pinned_producer_and_consumer.rs | 20 +-
core/bench/src/main.rs | 4 +-
core/bench/src/plot.rs | 24 +-
core/bench/src/runner.rs | 12 +-
core/bench/src/utils/cpu_name.rs | 6 +-
core/bench/src/utils/finish_condition.rs | 60 ++---
core/bench/src/utils/mod.rs | 106 +++++---
core/bench/src/utils/rate_limiter.rs | 6 +-
core/bench/src/utils/server_starter.rs | 105 ++++----
44 files changed, 1088 insertions(+), 759 deletions(-)
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index 3b2c6c64..ab5ec18d 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -50,3 +50,8 @@ uuid = { workspace = true }
[[bin]]
name = "iggy-bench"
path = "src/main.rs"
+
+[lints.clippy]
+enum_glob_use = "deny"
+pedantic = "deny"
+nursery = "deny"
diff --git a/core/bench/src/actors/consumer.rs b/core/bench/src/actors/consumer.rs
index a00a7e80..8ea71282 100644
--- a/core/bench/src/actors/consumer.rs
+++ b/core/bench/src/actors/consumer.rs
@@ -85,6 +85,44 @@ impl BenchmarkConsumer {
}
pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> {
+ let (client, stream_id, topic_id, partition_id, consumer,
rate_limiter) =
+ self.setup_client().await;
+
+ if self.warmup_time.get_duration() != Duration::from_millis(0) {
+ self.run_warmup(
+ &client,
+ &stream_id,
+ &topic_id,
+ partition_id,
+ &consumer,
+ rate_limiter.as_ref(),
+ )
+ .await?;
+ }
+
+ let metrics = self
+ .run_benchmark(
+ &client,
+ &stream_id,
+ &topic_id,
+ partition_id,
+ &consumer,
+ rate_limiter.as_ref(),
+ )
+ .await?;
+ Ok(metrics)
+ }
+
+ async fn setup_client(
+ &self,
+ ) -> (
+ IggyClient,
+ Identifier,
+ Identifier,
+ Option<u32>,
+ Consumer,
+ Option<BenchmarkRateLimiter>,
+ ) {
let topic_id: u32 = 1;
let default_partition_id: u32 = 1;
let client = self.client_factory.create_client().await;
@@ -99,77 +137,103 @@ impl BenchmarkConsumer {
Some(default_partition_id)
};
let cg_id = self.consumer_group_id;
- let consumer =
- create_consumer(&client, &cg_id, &stream_id, &topic_id,
self.consumer_id).await;
-
+ let consumer = create_consumer(
+ &client,
+ cg_id.as_ref(),
+ &stream_id,
+ &topic_id,
+ self.consumer_id,
+ )
+ .await;
let rate_limiter =
self.limit_bytes_per_second.map(BenchmarkRateLimiter::new);
- // -----------------------
- // WARM-UP
- // -----------------------
+ (
+ client,
+ stream_id,
+ topic_id,
+ partition_id,
+ consumer,
+ rate_limiter,
+ )
+ }
- if self.warmup_time.get_duration() != Duration::from_millis(0) {
- if let Some(cg_id) = self.consumer_group_id {
- info!(
- "Consumer #{}, part of consumer group #{}, → warming up
for {}...",
- self.consumer_id, cg_id, self.warmup_time
- );
- } else {
- info!(
- "Consumer #{} → warming up for {}...",
- self.consumer_id, self.warmup_time
- );
- }
+ async fn run_warmup(
+ &self,
+ client: &IggyClient,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: Option<u32>,
+ consumer: &Consumer,
+ rate_limiter: Option<&BenchmarkRateLimiter>,
+ ) -> Result<(), IggyError> {
+ if let Some(cg_id) = self.consumer_group_id {
+ info!(
+ "Consumer #{}, part of consumer group #{}, → warming up for
{}...",
+ self.consumer_id, cg_id, self.warmup_time
+ );
+ } else {
+ info!(
+ "Consumer #{} → warming up for {}...",
+ self.consumer_id, self.warmup_time
+ );
+ }
- let warmup_end = Instant::now() + self.warmup_time.get_duration();
- let mut messages_processed = 0;
- let mut last_batch_user_size_bytes = 0;
- while Instant::now() < warmup_end {
- if let Some(rate_limiter) = &rate_limiter {
- if last_batch_user_size_bytes > 0 {
- rate_limiter
- .wait_until_necessary(last_batch_user_size_bytes)
- .await;
- }
+ let warmup_end = Instant::now() + self.warmup_time.get_duration();
+ let mut messages_processed = 0;
+ let mut last_batch_user_size_bytes = 0;
+ while Instant::now() < warmup_end {
+ if let Some(rate_limiter) = rate_limiter {
+ if last_batch_user_size_bytes > 0 {
+ rate_limiter
+ .wait_until_necessary(last_batch_user_size_bytes)
+ .await;
}
- let messages_to_receive = self.messages_per_batch.get();
- let offset = messages_processed;
- let (strategy, auto_commit) = match self.polling_kind {
- PollingKind::Offset => (PollingStrategy::offset(offset),
false),
- PollingKind::Next => (PollingStrategy::next(), true),
- _ => panic!(
- "Unsupported polling kind for benchmark: {:?}",
- self.polling_kind
- ),
- };
- let polled_messages = client
- .poll_messages(
- &stream_id,
- &topic_id,
- partition_id,
- &consumer,
- &strategy,
- messages_to_receive,
- auto_commit,
- )
- .await?;
+ }
+ let messages_to_receive = self.messages_per_batch.get();
+ let offset = messages_processed;
+ let (strategy, auto_commit) = match self.polling_kind {
+ PollingKind::Offset => (PollingStrategy::offset(offset),
false),
+ PollingKind::Next => (PollingStrategy::next(), true),
+ _ => panic!(
+ "Unsupported polling kind for benchmark: {:?}",
+ self.polling_kind
+ ),
+ };
+ let polled_messages = client
+ .poll_messages(
+ stream_id,
+ topic_id,
+ partition_id,
+ consumer,
+ &strategy,
+ messages_to_receive,
+ auto_commit,
+ )
+ .await?;
- if polled_messages.messages.is_empty() {
- warn!(
- "Consumer #{} → Messages are empty for offset: {},
retrying...",
- self.consumer_id, offset
- );
- continue;
- }
- messages_processed = polled_messages.messages.len() as u64;
- last_batch_user_size_bytes =
batch_user_size_bytes(&polled_messages);
+ if polled_messages.messages.is_empty() {
+ warn!(
+ "Consumer #{} → Messages are empty for offset: {},
retrying...",
+ self.consumer_id, offset
+ );
+ continue;
}
+ messages_processed = polled_messages.messages.len() as u64;
+ last_batch_user_size_bytes =
batch_user_size_bytes(&polled_messages);
}
+ Ok(())
+ }
- // -----------------------
- // MAIN BENCHMARK
- // -----------------------
-
+ #[allow(clippy::too_many_lines)]
+ async fn run_benchmark(
+ &self,
+ client: &IggyClient,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: Option<u32>,
+ consumer: &Consumer,
+ rate_limiter: Option<&BenchmarkRateLimiter>,
+ ) -> Result<BenchmarkIndividualMetrics, IggyError> {
if let Some(cg_id) = self.consumer_group_id {
info!(
"Consumer #{}, part of consumer group #{} → polling {} in {}
messages per batch from stream {}, rate limit: {:?}...",
@@ -193,7 +257,6 @@ impl BenchmarkConsumer {
let max_capacity = self.finish_condition.max_capacity();
let mut records = Vec::with_capacity(max_capacity);
- let mut latencies: Vec<Duration> = Vec::with_capacity(max_capacity);
let mut skipped_warnings_count: u32 = 0;
let mut topic_not_found_counter = 0;
let mut initial_poll_timestamp: Option<Instant> = None;
@@ -222,10 +285,10 @@ impl BenchmarkConsumer {
let before_poll = Instant::now();
let polled_messages = client
.poll_messages(
- &stream_id,
- &topic_id,
+ stream_id,
+ topic_id,
partition_id,
- &consumer,
+ consumer,
&strategy,
messages_to_receive,
auto_commit,
@@ -249,9 +312,8 @@ impl BenchmarkConsumer {
if initial_poll_timestamp.is_none() {
initial_poll_timestamp = Some(before_poll);
}
- let should_warn = last_warning_time
- .map(|t| t.elapsed() >= Duration::from_secs(1))
- .unwrap_or(true);
+ let should_warn =
+ last_warning_time.is_none_or(|t| t.elapsed() >=
Duration::from_secs(1));
if should_warn {
warn!(
@@ -270,9 +332,8 @@ impl BenchmarkConsumer {
}
if polled_messages.messages.len() != messages_to_receive as usize {
- let should_warn = last_warning_time
- .map(|t| t.elapsed() >= Duration::from_secs(1))
- .unwrap_or(true);
+ let should_warn =
+ last_warning_time.is_none_or(|t| t.elapsed() >=
Duration::from_secs(1));
if should_warn {
warn!(
@@ -296,8 +357,6 @@ impl BenchmarkConsumer {
initial_poll_timestamp.unwrap_or(before_poll).elapsed()
};
- latencies.push(latency);
-
let batch_user_size_bytes =
batch_user_size_bytes(&polled_messages);
let batch_total_size_bytes =
batch_total_size_bytes(&polled_messages);
@@ -309,15 +368,16 @@ impl BenchmarkConsumer {
user_data_bytes_processed += batch_user_size_bytes;
records.push(BenchmarkRecord {
- elapsed_time_us: start_timestamp.elapsed().as_micros() as u64,
- latency_us: latency.as_micros() as u64,
+ elapsed_time_us:
u64::try_from(start_timestamp.elapsed().as_micros())
+ .unwrap_or(u64::MAX),
+ latency_us:
u64::try_from(latency.as_micros()).unwrap_or(u64::MAX),
messages: messages_processed,
message_batches: batches_processed,
user_data_bytes: user_data_bytes_processed,
total_bytes: bytes_processed,
});
- if let Some(rate_limiter) = &rate_limiter {
+ if let Some(rate_limiter) = rate_limiter {
rate_limiter
.wait_until_necessary(batch_user_size_bytes)
.await;
@@ -332,7 +392,7 @@ impl BenchmarkConsumer {
}
let metrics = from_records(
- records,
+ &records,
self.benchmark_kind,
ActorKind::Consumer,
self.consumer_id,
@@ -343,7 +403,7 @@ impl BenchmarkConsumer {
Self::log_statistics(
self.consumer_id,
messages_processed,
- batches_processed as u32,
+ u32::try_from(batches_processed).unwrap_or(u32::MAX),
&self.messages_per_batch,
&metrics,
);
diff --git a/core/bench/src/actors/producer.rs b/core/bench/src/actors/producer.rs
index 972aed22..c32191ab 100644
--- a/core/bench/src/actors/producer.rs
+++ b/core/bench/src/actors/producer.rs
@@ -64,7 +64,7 @@ impl BenchmarkProducer {
moving_average_window: u32,
limit_bytes_per_second: Option<IggyByteSize>,
) -> Self {
- BenchmarkProducer {
+ Self {
client_factory,
benchmark_kind,
producer_id,
@@ -134,7 +134,6 @@ impl BenchmarkProducer {
);
let max_capacity = self.finish_condition.max_capacity();
- let mut latencies: Vec<Duration> = Vec::with_capacity(max_capacity);
let mut records: Vec<BenchmarkRecord> =
Vec::with_capacity(max_capacity);
let mut messages_processed = 0;
let mut batches_processed = 0;
@@ -158,10 +157,10 @@ impl BenchmarkProducer {
user_data_bytes_processed += batch.user_data_bytes;
total_bytes_processed += batch.total_bytes;
- latencies.push(latency);
records.push(BenchmarkRecord {
- elapsed_time_us: start_timestamp.elapsed().as_micros() as u64,
- latency_us: latency.as_micros() as u64,
+ elapsed_time_us:
u64::try_from(start_timestamp.elapsed().as_micros())
+ .unwrap_or(u64::MAX),
+ latency_us:
u64::try_from(latency.as_micros()).unwrap_or(u64::MAX),
messages: messages_processed,
message_batches: batches_processed,
user_data_bytes: user_data_bytes_processed,
@@ -183,7 +182,7 @@ impl BenchmarkProducer {
}
let metrics = from_records(
- records,
+ &records,
self.benchmark_kind,
ActorKind::Producer,
self.producer_id,
diff --git a/core/bench/src/actors/producing_consumer.rs b/core/bench/src/actors/producing_consumer.rs
index f99d62e0..bc102ffd 100644
--- a/core/bench/src/actors/producing_consumer.rs
+++ b/core/bench/src/actors/producing_consumer.rs
@@ -93,12 +93,65 @@ impl BenchmarkProducingConsumer {
}
pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> {
+ let (
+ client,
+ stream_id,
+ topic_id,
+ partitioning,
+ partition_id,
+ consumer,
+ mut batch_generator,
+ rate_limiter,
+ ) = self.setup_client().await?;
+
+ if self.warmup_time.get_duration() != Duration::from_millis(0) {
+ self.run_warmup(
+ &client,
+ &stream_id,
+ &topic_id,
+ &partitioning,
+ partition_id,
+ &consumer,
+ &mut batch_generator,
+ )
+ .await?;
+ }
+
+ let metrics = self
+ .run_benchmark(
+ client,
+ stream_id,
+ topic_id,
+ partitioning,
+ partition_id,
+ consumer,
+ batch_generator,
+ rate_limiter,
+ )
+ .await?;
+ Ok(metrics)
+ }
+
+ async fn setup_client(
+ &self,
+ ) -> Result<
+ (
+ IggyClient,
+ Identifier,
+ Identifier,
+ Partitioning,
+ Option<u32>,
+ Consumer,
+ BenchmarkBatchGenerator,
+ Option<BenchmarkRateLimiter>,
+ ),
+ IggyError,
+ > {
let topic_id: u32 = 1;
let default_partition_id: u32 = 1;
let client = self.client_factory.create_client().await;
let client = IggyClient::create(client, None, None);
-
login_root(&client).await;
let stream_id = self.stream_id.try_into()?;
@@ -116,91 +169,116 @@ impl BenchmarkProducingConsumer {
let consumer = create_consumer(
&client,
- &self.consumer_group_id,
+ self.consumer_group_id.as_ref(),
&stream_id,
&topic_id,
self.actor_id,
)
.await;
- let mut batch_generator =
+ let batch_generator =
BenchmarkBatchGenerator::new(self.message_size,
self.messages_per_batch);
-
let rate_limiter =
self.limit_bytes_per_second.map(BenchmarkRateLimiter::new);
- // -----------------------
- // WARM-UP
- // -----------------------
-
- if self.warmup_time.get_duration() != Duration::from_millis(0) {
- let warmup_end = Instant::now() + self.warmup_time.get_duration();
- let mut offset: u64 = 0;
- let mut last_warning_time: Option<Instant> = None;
- let mut skipped_warnings_count: u32 = 0;
-
- if let Some(cg_id) = self.consumer_group_id {
- info!(
- "ProducingConsumer #{}, part of consumer group #{}, →
warming up for {}...",
- self.actor_id, cg_id, self.warmup_time
- );
- } else {
- info!(
- "ProducingConsumer #{} → warming up for {}...",
- self.actor_id, self.warmup_time
- );
- }
- while Instant::now() < warmup_end {
- let batch = batch_generator.generate_batch();
- client
- .send_messages(&stream_id, &topic_id, &partitioning, &mut
batch.messages)
- .await?;
-
- let (strategy, auto_commit) = match self.polling_kind {
- PollingKind::Offset => (PollingStrategy::offset(offset),
false),
- PollingKind::Next => (PollingStrategy::next(), true),
- other => panic!("Unsupported polling kind for warmup:
{:?}", other),
- };
-
- let polled_messages = client
- .poll_messages(
- &stream_id,
- &topic_id,
- partition_id,
- &consumer,
- &strategy,
- batch.messages.len() as u32,
- auto_commit,
- )
- .await?;
+ Ok((
+ client,
+ stream_id,
+ topic_id,
+ partitioning,
+ partition_id,
+ consumer,
+ batch_generator,
+ rate_limiter,
+ ))
+ }
- if polled_messages.messages.is_empty() {
- let should_warn = last_warning_time
- .map(|t| t.elapsed() >= Duration::from_secs(1))
- .unwrap_or(true);
+ #[allow(clippy::too_many_arguments)]
+ async fn run_warmup(
+ &self,
+ client: &IggyClient,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partitioning: &Partitioning,
+ partition_id: Option<u32>,
+ consumer: &Consumer,
+ batch_generator: &mut BenchmarkBatchGenerator,
+ ) -> Result<(), IggyError> {
+ let warmup_end = Instant::now() + self.warmup_time.get_duration();
+ let mut offset: u64 = 0;
+ let mut last_warning_time: Option<Instant> = None;
+ let mut skipped_warnings_count: u32 = 0;
- if should_warn {
- warn!(
- "ProducingConsumer #{} (warmup) → expected {}
messages but got {}, retrying... ({} warnings skipped)",
- self.actor_id,
- self.messages_per_batch,
- polled_messages.messages.len(),
- skipped_warnings_count
- );
- last_warning_time = Some(Instant::now());
- skipped_warnings_count = 0;
- } else {
- skipped_warnings_count += 1;
- }
- continue;
+ if let Some(cg_id) = self.consumer_group_id {
+ info!(
+ "ProducingConsumer #{}, part of consumer group #{}, → warming
up for {}...",
+ self.actor_id, cg_id, self.warmup_time
+ );
+ } else {
+ info!(
+ "ProducingConsumer #{} → warming up for {}...",
+ self.actor_id, self.warmup_time
+ );
+ }
+ while Instant::now() < warmup_end {
+ let batch = batch_generator.generate_batch();
+ client
+ .send_messages(stream_id, topic_id, partitioning, &mut
batch.messages)
+ .await?;
+
+ let (strategy, auto_commit) = match self.polling_kind {
+ PollingKind::Offset => (PollingStrategy::offset(offset),
false),
+ PollingKind::Next => (PollingStrategy::next(), true),
+ other => panic!("Unsupported polling kind for warmup:
{other:?}"),
+ };
+
+ let polled_messages = client
+ .poll_messages(
+ stream_id,
+ topic_id,
+ partition_id,
+ consumer,
+ &strategy,
+ u32::try_from(batch.messages.len()).unwrap_or(u32::MAX),
+ auto_commit,
+ )
+ .await?;
+
+ if polled_messages.messages.is_empty() {
+ let should_warn =
+ last_warning_time.is_none_or(|t| t.elapsed() >=
Duration::from_secs(1));
+
+ if should_warn {
+ warn!(
+ "ProducingConsumer #{actor_id} (warmup) → expected
{messages_per_batch} messages but got {actual_count}, retrying...
({skipped_warnings_count} warnings skipped)",
+ actor_id = self.actor_id,
+ messages_per_batch = self.messages_per_batch,
+ actual_count = polled_messages.messages.len()
+ );
+ last_warning_time = Some(Instant::now());
+ skipped_warnings_count = 0;
+ } else {
+ skipped_warnings_count += 1;
}
-
- offset += batch.messages.len() as u64;
+ continue;
}
+
+ offset += u64::try_from(batch.messages.len()).unwrap_or(u64::MAX);
}
+ Ok(())
+ }
- // --------------------------------
- // MAIN BENCHMARK LOOP
- // --------------------------------
+ #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
+ async fn run_benchmark(
+ self,
+ client: IggyClient,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ partitioning: Partitioning,
+ partition_id: Option<u32>,
+ consumer: Consumer,
+ mut batch_generator: BenchmarkBatchGenerator,
+ rate_limiter: Option<BenchmarkRateLimiter>,
+ ) -> Result<BenchmarkIndividualMetrics, IggyError> {
info!(
"ProducingConsumer #{} → sending {} and polling {} ({} msgs/batch)
on stream {}, rate limit: {:?}",
self.actor_id,
@@ -215,7 +293,6 @@ impl BenchmarkProducingConsumer {
.send_finish_condition
.max_capacity()
.max(self.poll_finish_condition.max_capacity());
- let mut latencies: Vec<Duration> = Vec::with_capacity(max_capacity);
let mut records: Vec<BenchmarkRecord> =
Vec::with_capacity(max_capacity);
let mut offset = 0;
let mut last_warning_time: Option<Instant> = None;
@@ -243,7 +320,7 @@ impl BenchmarkProducingConsumer {
let is_consumer = self.poll_finish_condition.total() > 0;
let require_reply = is_producer && is_consumer &&
self.consumer_group_id.is_none();
- let mut awaiting_reply = false; // meaningful only if require_reply ==
true
+ let mut awaiting_reply = false;
let start_timestamp = Instant::now();
@@ -262,7 +339,7 @@ impl BenchmarkProducingConsumer {
sent_total_bytes_processed += batch.total_bytes;
sent_user_bytes_processed += batch.user_data_bytes;
- sent_messages += batch.messages.len() as u32;
+ sent_messages +=
u32::try_from(batch.messages.len()).unwrap_or(u32::MAX);
sent_batches += 1;
awaiting_reply = is_consumer;
@@ -272,14 +349,14 @@ impl BenchmarkProducingConsumer {
.account_and_check(batch.user_data_bytes)
{
info!(
- "ProducingConsumer #{} → finished sending {} messages
in {} batches ({} bytes of user data, {} bytes of total data), send finish
condition: {}, poll finish condition: {}",
- self.actor_id,
- sent_messages.human_count_bare(),
- sent_batches.human_count_bare(),
- sent_user_bytes_processed.human_count_bytes(),
- sent_total_bytes_processed.human_count_bytes(),
- self.send_finish_condition.status(),
- self.poll_finish_condition.status()
+ "ProducingConsumer #{actor_id} → finished sending
{sent_messages} messages in {sent_batches} batches ({sent_user_bytes_processed}
bytes of user data, {sent_total_bytes_processed} bytes of total data), send
finish condition: {send_status}, poll finish condition: {poll_status}",
+ actor_id = self.actor_id,
+ sent_messages = sent_messages.human_count_bare(),
+ sent_batches = sent_batches.human_count_bare(),
+ sent_user_bytes_processed =
sent_user_bytes_processed.human_count_bytes(),
+ sent_total_bytes_processed =
sent_total_bytes_processed.human_count_bytes(),
+ send_status = self.send_finish_condition.status(),
+ poll_status = self.poll_finish_condition.status()
);
}
}
@@ -288,7 +365,7 @@ impl BenchmarkProducingConsumer {
let (strategy, auto_commit) = match self.polling_kind {
PollingKind::Offset => (PollingStrategy::offset(offset),
false),
PollingKind::Next => (PollingStrategy::next(), true),
- other => panic!("Unsupported polling kind for benchmark:
{:?}", other),
+ other => panic!("Unsupported polling kind for benchmark:
{other:?}"),
};
let polled_messages = client
@@ -304,18 +381,16 @@ impl BenchmarkProducingConsumer {
.await?;
if polled_messages.messages.is_empty() {
- let should_warn = last_warning_time
- .map(|t| t.elapsed() >= Duration::from_secs(1))
- .unwrap_or(true);
+ let should_warn =
+ last_warning_time.is_none_or(|t| t.elapsed() >=
Duration::from_secs(1));
if should_warn {
warn!(
- "ProducingConsumer #{} → received empty batch,
sent: {}, polled: {}, polling kind: {:?}, retrying... ({} warnings skipped in
last second)",
- self.actor_id,
- self.send_finish_condition.status(),
- self.poll_finish_condition.status(),
- self.polling_kind,
- skipped_warnings_count
+ "ProducingConsumer #{actor_id} → received empty
batch, sent: {send_status}, polled: {poll_status}, polling kind:
{polling_kind:?}, retrying... ({skipped_warnings_count} warnings skipped in
last second)",
+ actor_id = self.actor_id,
+ send_status = self.send_finish_condition.status(),
+ poll_status = self.poll_finish_condition.status(),
+ polling_kind = self.polling_kind
);
last_warning_time = Some(Instant::now());
skipped_warnings_count = 0;
@@ -330,17 +405,17 @@ impl BenchmarkProducingConsumer {
let latency = Duration::from_micros(
now - polled_messages.messages[0].header.origin_timestamp,
);
- latencies.push(latency);
last_received_batch_user_data_bytes =
batch_user_size_bytes(&polled_messages);
rl_value += last_received_batch_user_data_bytes;
received_user_bytes_processed +=
last_received_batch_user_data_bytes;
received_total_bytes_processed +=
batch_total_size_bytes(&polled_messages);
- received_messages += polled_messages.messages.len() as u32;
+ received_messages +=
+
u32::try_from(polled_messages.messages.len()).unwrap_or(u32::MAX);
received_batches += 1;
- offset += polled_messages.messages.len() as u64;
+ offset +=
u64::try_from(polled_messages.messages.len()).unwrap_or(u64::MAX);
total_user_data_bytes_processed =
received_user_bytes_processed + sent_user_bytes_processed;
@@ -349,9 +424,10 @@ impl BenchmarkProducingConsumer {
total_batches_processed = received_batches + sent_batches;
records.push(BenchmarkRecord {
- elapsed_time_us: start_timestamp.elapsed().as_micros() as
u64,
- latency_us: latency.as_micros() as u64,
- messages: total_messages_processed as u64,
+ elapsed_time_us:
u64::try_from(start_timestamp.elapsed().as_micros())
+ .unwrap_or(u64::MAX),
+ latency_us:
u64::try_from(latency.as_micros()).unwrap_or(u64::MAX),
+ messages: u64::from(total_messages_processed),
message_batches: total_batches_processed,
user_data_bytes: total_user_data_bytes_processed,
total_bytes: total_bytes_processed,
@@ -372,7 +448,7 @@ impl BenchmarkProducingConsumer {
}
let metrics = from_records(
- records,
+ &records,
self.benchmark_kind,
ActorKind::ProducingConsumer,
self.actor_id,
@@ -382,7 +458,7 @@ impl BenchmarkProducingConsumer {
Self::log_statistics(
self.actor_id,
- total_messages_processed as u64,
+ u64::from(total_messages_processed),
total_batches_processed,
&self.messages_per_batch,
&metrics,
diff --git a/core/bench/src/analytics/metrics/group.rs b/core/bench/src/analytics/metrics/group.rs
index 47aaf83e..dd585e37 100644
--- a/core/bench/src/analytics/metrics/group.rs
+++ b/core/bench/src/analytics/metrics/group.rs
@@ -16,14 +16,20 @@
* under the License.
*/
+#![allow(clippy::cast_precision_loss)]
+#![allow(clippy::struct_field_names)]
+
use crate::analytics::time_series::{
calculator::TimeSeriesCalculator,
processors::{TimeSeriesProcessor, moving_average::MovingAverageProcessor},
};
use bench_report::{
- actor_kind::ActorKind, group_metrics::BenchmarkGroupMetrics,
- group_metrics_kind::GroupMetricsKind,
group_metrics_summary::BenchmarkGroupMetricsSummary,
- individual_metrics::BenchmarkIndividualMetrics, utils::*,
+ actor_kind::ActorKind,
+ group_metrics::BenchmarkGroupMetrics,
+ group_metrics_kind::GroupMetricsKind,
+ group_metrics_summary::BenchmarkGroupMetricsSummary,
+ individual_metrics::BenchmarkIndividualMetrics,
+ utils::{max, min, std_dev},
};
pub fn from_producers_and_consumers_statistics(
@@ -46,78 +52,132 @@ pub fn from_individual_metrics(
if stats.is_empty() {
return None;
}
- let count = stats.len() as f64;
- // Compute aggregate throughput MB/s
- let total_throughput_megabytes_per_second: f64 = stats
+ let throughput_metrics = calculate_throughput_metrics(stats);
+ let latency_metrics = calculate_latency_metrics(stats);
+ let kind = determine_group_kind(stats);
+ let time_series = calculate_group_time_series(stats,
moving_average_window);
+ let (min_latency_ms_value, max_latency_ms_value) =
+ calculate_min_max_latencies(stats, &time_series.2);
+
+ let summary = BenchmarkGroupMetricsSummary {
+ kind,
+ total_throughput_megabytes_per_second:
throughput_metrics.total_megabytes_per_sec,
+ total_throughput_messages_per_second:
throughput_metrics.total_messages_per_sec,
+ average_throughput_megabytes_per_second:
throughput_metrics.average_megabytes_per_sec,
+ average_throughput_messages_per_second:
throughput_metrics.average_messages_per_sec,
+ average_p50_latency_ms: latency_metrics.p50_latency,
+ average_p90_latency_ms: latency_metrics.p90_latency,
+ average_p95_latency_ms: latency_metrics.p95_latency,
+ average_p99_latency_ms: latency_metrics.p99_latency,
+ average_p999_latency_ms: latency_metrics.p999_latency,
+ average_p9999_latency_ms: latency_metrics.p9999_latency,
+ average_latency_ms: latency_metrics.average_latency,
+ average_median_latency_ms: latency_metrics.median_latency,
+ min_latency_ms: min_latency_ms_value,
+ max_latency_ms: max_latency_ms_value,
+ std_dev_latency_ms: std_dev(&time_series.2).unwrap_or(0.0),
+ };
+
+ Some(BenchmarkGroupMetrics {
+ summary,
+ avg_throughput_mb_ts: time_series.0,
+ avg_throughput_msg_ts: time_series.1,
+ avg_latency_ts: time_series.2,
+ })
+}
+
+struct ThroughputMetrics {
+ total_megabytes_per_sec: f64,
+ total_messages_per_sec: f64,
+ average_megabytes_per_sec: f64,
+ average_messages_per_sec: f64,
+}
+
+fn calculate_throughput_metrics(stats: &[BenchmarkIndividualMetrics]) ->
ThroughputMetrics {
+ let count = stats.len() as f64;
+ let total_mb_per_sec = stats
.iter()
.map(|r| r.summary.throughput_megabytes_per_second)
.sum();
-
- // Compute aggregate throughput messages/s
- let total_throughput_messages_per_second: f64 = stats
+ let total_msg_per_sec = stats
.iter()
.map(|r| r.summary.throughput_messages_per_second)
.sum();
- // Compute average throughput MB/s
- let average_throughput_megabytes_per_second =
total_throughput_megabytes_per_second / count;
-
- // Compute average throughput messages/s
- let average_throughput_messages_per_second =
total_throughput_messages_per_second / count;
-
- // Compute average latencies
- let average_p50_latency_ms =
- stats.iter().map(|r| r.summary.p50_latency_ms).sum::<f64>() / count;
- let average_p90_latency_ms =
- stats.iter().map(|r| r.summary.p90_latency_ms).sum::<f64>() / count;
- let average_p95_latency_ms =
- stats.iter().map(|r| r.summary.p95_latency_ms).sum::<f64>() / count;
- let average_p99_latency_ms =
- stats.iter().map(|r| r.summary.p99_latency_ms).sum::<f64>() / count;
- let average_p999_latency_ms: f64 =
- stats.iter().map(|r| r.summary.p999_latency_ms).sum::<f64>() / count;
- let average_p9999_latency_ms: f64 = stats
- .iter()
- .map(|r| r.summary.p9999_latency_ms)
- .sum::<f64>()
- / count;
- let average_avg_latency_ms =
- stats.iter().map(|r| r.summary.avg_latency_ms).sum::<f64>() / count;
- let average_median_latency_ms = stats
- .iter()
- .map(|r| r.summary.median_latency_ms)
- .sum::<f64>()
- / count;
+ ThroughputMetrics {
+ total_megabytes_per_sec: total_mb_per_sec,
+ total_messages_per_sec: total_msg_per_sec,
+ average_megabytes_per_sec: total_mb_per_sec / count,
+ average_messages_per_sec: total_msg_per_sec / count,
+ }
+}
+
+struct LatencyMetrics {
+ p50_latency: f64,
+ p90_latency: f64,
+ p95_latency: f64,
+ p99_latency: f64,
+ p999_latency: f64,
+ p9999_latency: f64,
+ average_latency: f64,
+ median_latency: f64,
+}
+
+fn calculate_latency_metrics(stats: &[BenchmarkIndividualMetrics]) ->
LatencyMetrics {
+ let count = stats.len() as f64;
+
+ LatencyMetrics {
+ p50_latency: stats.iter().map(|r|
r.summary.p50_latency_ms).sum::<f64>() / count,
+ p90_latency: stats.iter().map(|r|
r.summary.p90_latency_ms).sum::<f64>() / count,
+ p95_latency: stats.iter().map(|r|
r.summary.p95_latency_ms).sum::<f64>() / count,
+ p99_latency: stats.iter().map(|r|
r.summary.p99_latency_ms).sum::<f64>() / count,
+ p999_latency: stats.iter().map(|r|
r.summary.p999_latency_ms).sum::<f64>() / count,
+ p9999_latency: stats
+ .iter()
+ .map(|r| r.summary.p9999_latency_ms)
+ .sum::<f64>()
+ / count,
+ average_latency: stats.iter().map(|r|
r.summary.avg_latency_ms).sum::<f64>() / count,
+ median_latency: stats
+ .iter()
+ .map(|r| r.summary.median_latency_ms)
+ .sum::<f64>()
+ / count,
+ }
+}
- let kind = match stats.iter().next().unwrap().summary.actor_kind {
+fn determine_group_kind(stats: &[BenchmarkIndividualMetrics]) ->
GroupMetricsKind {
+ match stats.iter().next().unwrap().summary.actor_kind {
ActorKind::Producer => GroupMetricsKind::Producers,
ActorKind::Consumer => GroupMetricsKind::Consumers,
ActorKind::ProducingConsumer => GroupMetricsKind::ProducingConsumers,
- };
+ }
+}
- let calculator = TimeSeriesCalculator::new();
+use bench_report::time_series::TimeSeries;
- let mut avg_throughput_mb_ts = calculator.aggregate_sum(
- stats
+fn calculate_group_time_series(
+ stats: &[BenchmarkIndividualMetrics],
+ moving_average_window: u32,
+) -> (TimeSeries, TimeSeries, TimeSeries) {
+ let mut avg_throughput_mb_ts = TimeSeriesCalculator::aggregate_sum(
+ &stats
.iter()
.map(|r| r.throughput_mb_ts.clone())
- .collect::<Vec<_>>()
- .as_slice(),
+ .collect::<Vec<_>>(),
);
- let mut avg_throughput_msg_ts = calculator.aggregate_sum(
- stats
+ let mut avg_throughput_msg_ts = TimeSeriesCalculator::aggregate_sum(
+ &stats
.iter()
.map(|r| r.throughput_msg_ts.clone())
- .collect::<Vec<_>>()
- .as_slice(),
+ .collect::<Vec<_>>(),
);
- let mut avg_latency_ts = calculator.aggregate_avg(
- stats
+ let mut avg_latency_ts = TimeSeriesCalculator::aggregate_avg(
+ &stats
.iter()
.map(|r| r.latency_ts.clone())
- .collect::<Vec<_>>()
- .as_slice(),
+ .collect::<Vec<_>>(),
);
let sma = MovingAverageProcessor::new(moving_average_window as usize);
@@ -125,55 +185,33 @@ pub fn from_individual_metrics(
avg_throughput_msg_ts = sma.process(&avg_throughput_msg_ts);
avg_latency_ts = sma.process(&avg_latency_ts);
- let min_latency_ms = if !stats.is_empty() {
+ (avg_throughput_mb_ts, avg_throughput_msg_ts, avg_latency_ts)
+}
+
+fn calculate_min_max_latencies(
+ stats: &[BenchmarkIndividualMetrics],
+ avg_latency_ts: &TimeSeries,
+) -> (f64, f64) {
+ let min_latency_ms = if stats.is_empty() {
+ None
+ } else {
stats
.iter()
.map(|s| s.summary.min_latency_ms)
.min_by(|a, b|
a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
- } else {
- None
};
- let max_latency_ms = if !stats.is_empty() {
+ let max_latency_ms = if stats.is_empty() {
+ None
+ } else {
stats
.iter()
.map(|s| s.summary.max_latency_ms)
.max_by(|a, b|
a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
- } else {
- None
};
- let min_latency_ms_value =
- min_latency_ms.unwrap_or_else(|| min(&avg_latency_ts).unwrap_or(0.0));
+ let min_latency_ms_value = min_latency_ms.unwrap_or_else(||
min(avg_latency_ts).unwrap_or(0.0));
+ let max_latency_ms_value = max_latency_ms.unwrap_or_else(||
max(avg_latency_ts).unwrap_or(0.0));
- let max_latency_ms_value =
- max_latency_ms.unwrap_or_else(|| max(&avg_latency_ts).unwrap_or(0.0));
-
- let std_dev_latency_ms = std_dev(&avg_latency_ts);
-
- let summary = BenchmarkGroupMetricsSummary {
- kind,
- total_throughput_megabytes_per_second,
- total_throughput_messages_per_second,
- average_throughput_megabytes_per_second,
- average_throughput_messages_per_second,
- average_p50_latency_ms,
- average_p90_latency_ms,
- average_p95_latency_ms,
- average_p99_latency_ms,
- average_p999_latency_ms,
- average_p9999_latency_ms,
- average_latency_ms: average_avg_latency_ms,
- average_median_latency_ms,
- min_latency_ms: min_latency_ms_value,
- max_latency_ms: max_latency_ms_value,
- std_dev_latency_ms: std_dev_latency_ms.unwrap_or(0.0),
- };
-
- Some(BenchmarkGroupMetrics {
- summary,
- avg_throughput_mb_ts,
- avg_throughput_msg_ts,
- avg_latency_ts,
- })
+ (min_latency_ms_value, max_latency_ms_value)
}
diff --git a/core/bench/src/analytics/metrics/individual.rs
b/core/bench/src/analytics/metrics/individual.rs
index 30ec44af..f9746a0e 100644
--- a/core/bench/src/analytics/metrics/individual.rs
+++ b/core/bench/src/analytics/metrics/individual.rs
@@ -16,6 +16,10 @@
* under the License.
*/
+#![allow(clippy::cast_precision_loss)]
+#![allow(clippy::cast_possible_truncation)]
+#![allow(clippy::cast_sign_loss)]
+
use crate::analytics::record::BenchmarkRecord;
use crate::analytics::time_series::calculator::TimeSeriesCalculator;
use crate::analytics::time_series::processors::TimeSeriesProcessor;
@@ -25,11 +29,11 @@ use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
use
bench_report::individual_metrics_summary::BenchmarkIndividualMetricsSummary;
use bench_report::time_series::TimeSeries;
-use bench_report::utils::*;
+use bench_report::utils::{max, min, std_dev};
use iggy::prelude::IggyDuration;
pub fn from_records(
- records: Vec<BenchmarkRecord>,
+ records: &[BenchmarkRecord],
benchmark_kind: BenchmarkKind,
actor_kind: ActorKind,
actor_id: u32,
@@ -37,52 +41,131 @@ pub fn from_records(
moving_average_window: u32,
) -> BenchmarkIndividualMetrics {
if records.is_empty() {
- return BenchmarkIndividualMetrics {
- summary: BenchmarkIndividualMetricsSummary {
- benchmark_kind,
- actor_kind,
- actor_id,
- total_time_secs: 0.0,
- total_user_data_bytes: 0,
- total_bytes: 0,
- total_messages: 0,
- total_message_batches: 0,
- throughput_megabytes_per_second: 0.0,
- throughput_messages_per_second: 0.0,
- p50_latency_ms: 0.0,
- p90_latency_ms: 0.0,
- p95_latency_ms: 0.0,
- p99_latency_ms: 0.0,
- p999_latency_ms: 0.0,
- p9999_latency_ms: 0.0,
- avg_latency_ms: 0.0,
- median_latency_ms: 0.0,
- min_latency_ms: 0.0,
- max_latency_ms: 0.0,
- std_dev_latency_ms: 0.0,
- },
- throughput_mb_ts: TimeSeries::default(),
- throughput_msg_ts: TimeSeries::default(),
- latency_ts: TimeSeries::default(),
- };
+ return create_empty_metrics(benchmark_kind, actor_kind, actor_id);
}
- let total_time_secs = records.last().unwrap().elapsed_time_us as f64 /
1_000_000.0;
+ let (
+ total_time_secs,
+ total_user_data_bytes,
+ total_bytes,
+ total_messages,
+ total_message_batches,
+ ) = extract_totals(records);
+
+ let (throughput_mb_ts, throughput_msg_ts, latency_ts) =
+ calculate_time_series(records, sampling_time, moving_average_window);
+
+ let (throughput_megabytes_per_second, throughput_messages_per_second) =
calculate_throughput(
+ &throughput_mb_ts,
+ &throughput_msg_ts,
+ total_time_secs,
+ total_user_data_bytes,
+ total_messages,
+ );
+
+ let latency_metrics = calculate_latency_metrics(records, &latency_ts);
- let total_user_data_bytes = records.iter().last().unwrap().user_data_bytes;
- let total_bytes = records.iter().last().unwrap().total_bytes;
- let total_messages = records.iter().last().unwrap().messages;
- let total_message_batches = records.iter().last().unwrap().message_batches;
+ BenchmarkIndividualMetrics {
+ summary: BenchmarkIndividualMetricsSummary {
+ benchmark_kind,
+ actor_kind,
+ actor_id,
+ total_time_secs,
+ total_user_data_bytes,
+ total_bytes,
+ total_messages,
+ total_message_batches,
+ throughput_megabytes_per_second,
+ throughput_messages_per_second,
+ p50_latency_ms: latency_metrics.p50,
+ p90_latency_ms: latency_metrics.p90,
+ p95_latency_ms: latency_metrics.p95,
+ p99_latency_ms: latency_metrics.p99,
+ p999_latency_ms: latency_metrics.p999,
+ p9999_latency_ms: latency_metrics.p9999,
+ avg_latency_ms: latency_metrics.avg,
+ median_latency_ms: latency_metrics.median,
+ min_latency_ms: latency_metrics.min,
+ max_latency_ms: latency_metrics.max,
+ std_dev_latency_ms: latency_metrics.std_dev,
+ },
+ throughput_mb_ts,
+ throughput_msg_ts,
+ latency_ts,
+ }
+}
+
+fn create_empty_metrics(
+ benchmark_kind: BenchmarkKind,
+ actor_kind: ActorKind,
+ actor_id: u32,
+) -> BenchmarkIndividualMetrics {
+ BenchmarkIndividualMetrics {
+ summary: BenchmarkIndividualMetricsSummary {
+ benchmark_kind,
+ actor_kind,
+ actor_id,
+ total_time_secs: 0.0,
+ total_user_data_bytes: 0,
+ total_bytes: 0,
+ total_messages: 0,
+ total_message_batches: 0,
+ throughput_megabytes_per_second: 0.0,
+ throughput_messages_per_second: 0.0,
+ p50_latency_ms: 0.0,
+ p90_latency_ms: 0.0,
+ p95_latency_ms: 0.0,
+ p99_latency_ms: 0.0,
+ p999_latency_ms: 0.0,
+ p9999_latency_ms: 0.0,
+ avg_latency_ms: 0.0,
+ median_latency_ms: 0.0,
+ min_latency_ms: 0.0,
+ max_latency_ms: 0.0,
+ std_dev_latency_ms: 0.0,
+ },
+ throughput_mb_ts: TimeSeries::default(),
+ throughput_msg_ts: TimeSeries::default(),
+ latency_ts: TimeSeries::default(),
+ }
+}
- let calculator = TimeSeriesCalculator::new();
+fn extract_totals(records: &[BenchmarkRecord]) -> (f64, u64, u64, u64, u64) {
+ let last_record = records.last().unwrap();
+ let total_time_secs = last_record.elapsed_time_us as f64 / 1_000_000.0;
+ (
+ total_time_secs,
+ last_record.user_data_bytes,
+ last_record.total_bytes,
+ last_record.messages,
+ last_record.message_batches,
+ )
+}
- let throughput_mb_ts = calculator.throughput_mb(&records, sampling_time);
- let throughput_msg_ts = calculator.throughput_msg(&records, sampling_time);
+fn calculate_time_series(
+ records: &[BenchmarkRecord],
+ sampling_time: IggyDuration,
+ moving_average_window: u32,
+) -> (TimeSeries, TimeSeries, TimeSeries) {
+ let throughput_mb_ts = TimeSeriesCalculator::throughput_mb(records,
sampling_time);
+ let throughput_msg_ts = TimeSeriesCalculator::throughput_msg(records,
sampling_time);
let sma = MovingAverageProcessor::new(moving_average_window as usize);
let throughput_mb_ts = sma.process(&throughput_mb_ts);
let throughput_msg_ts = sma.process(&throughput_msg_ts);
+ let latency_ts = TimeSeriesCalculator::latency(records, sampling_time);
+
+ (throughput_mb_ts, throughput_msg_ts, latency_ts)
+}
+
+fn calculate_throughput(
+ throughput_mb_ts: &TimeSeries,
+ throughput_msg_ts: &TimeSeries,
+ total_time_secs: f64,
+ total_user_data_bytes: u64,
+ total_messages: u64,
+) -> (f64, f64) {
let throughput_megabytes_per_second = if
!throughput_mb_ts.points.is_empty() {
throughput_mb_ts
.points
@@ -91,7 +174,7 @@ pub fn from_records(
.sum::<f64>()
/ throughput_mb_ts.points.len() as f64
} else if total_time_secs > 0.0 {
- (total_user_data_bytes as f64) / 1_000_000.0 / total_time_secs
+ total_user_data_bytes as f64 / 1_000_000.0 / total_time_secs
} else {
0.0
};
@@ -104,65 +187,72 @@ pub fn from_records(
.sum::<f64>()
/ throughput_msg_ts.points.len() as f64
} else if total_time_secs > 0.0 {
- (total_messages as f64) / total_time_secs
+ total_messages as f64 / total_time_secs
} else {
0.0
};
+ (
+ throughput_megabytes_per_second,
+ throughput_messages_per_second,
+ )
+}
+
+struct LatencyMetrics {
+ p50: f64,
+ p90: f64,
+ p95: f64,
+ p99: f64,
+ p999: f64,
+ p9999: f64,
+ avg: f64,
+ median: f64,
+ min: f64,
+ max: f64,
+ std_dev: f64,
+}
+
+fn calculate_latency_metrics(
+ records: &[BenchmarkRecord],
+ latency_ts: &TimeSeries,
+) -> LatencyMetrics {
let mut latencies_ms: Vec<f64> = records
.iter()
.map(|r| r.latency_us as f64 / 1_000.0)
.collect();
latencies_ms.sort_by(|a, b| a.partial_cmp(b).unwrap());
- let p50_latency_ms = calculate_percentile(&latencies_ms, 50.0);
- let p90_latency_ms = calculate_percentile(&latencies_ms, 90.0);
- let p95_latency_ms = calculate_percentile(&latencies_ms, 95.0);
- let p99_latency_ms = calculate_percentile(&latencies_ms, 99.0);
- let p999_latency_ms = calculate_percentile(&latencies_ms, 99.9);
- let p9999_latency_ms = calculate_percentile(&latencies_ms, 99.99);
+ let p50 = calculate_percentile(&latencies_ms, 50.0);
+ let p90 = calculate_percentile(&latencies_ms, 90.0);
+ let p95 = calculate_percentile(&latencies_ms, 95.0);
+ let p99 = calculate_percentile(&latencies_ms, 99.0);
+ let p999 = calculate_percentile(&latencies_ms, 99.9);
+ let p9999 = calculate_percentile(&latencies_ms, 99.99);
- let avg_latency_ms = latencies_ms.iter().sum::<f64>() / latencies_ms.len()
as f64;
+ let avg = latencies_ms.iter().sum::<f64>() / latencies_ms.len() as f64;
let len = latencies_ms.len() / 2;
- let median_latency_ms = if latencies_ms.len() % 2 == 0 {
- (latencies_ms[len - 1] + latencies_ms[len]) / 2.0
+ let median = if latencies_ms.len() % 2 == 0 {
+ f64::midpoint(latencies_ms[len - 1], latencies_ms[len])
} else {
latencies_ms[len]
};
- let latency_ts = calculator.latency(&records, sampling_time);
+ let min = min(latency_ts).unwrap_or(0.0);
+ let max = max(latency_ts).unwrap_or(0.0);
+ let std_dev = std_dev(latency_ts).unwrap_or(0.0);
- let min_latency_ms = min(&latency_ts).unwrap_or(0.0);
- let max_latency_ms = max(&latency_ts).unwrap_or(0.0);
- let std_dev_latency_ms = std_dev(&latency_ts).unwrap_or(0.0);
-
- BenchmarkIndividualMetrics {
- summary: BenchmarkIndividualMetricsSummary {
- benchmark_kind,
- actor_kind,
- actor_id,
- total_time_secs,
- total_user_data_bytes,
- total_bytes,
- total_messages,
- total_message_batches,
- throughput_megabytes_per_second,
- throughput_messages_per_second,
- p50_latency_ms,
- p90_latency_ms,
- p95_latency_ms,
- p99_latency_ms,
- p999_latency_ms,
- p9999_latency_ms,
- avg_latency_ms,
- median_latency_ms,
- min_latency_ms,
- max_latency_ms,
- std_dev_latency_ms,
- },
- throughput_mb_ts,
- throughput_msg_ts,
- latency_ts,
+ LatencyMetrics {
+ p50,
+ p90,
+ p95,
+ p99,
+ p999,
+ p9999,
+ avg,
+ median,
+ min,
+ max,
+ std_dev,
}
}
@@ -172,13 +262,13 @@ fn calculate_percentile(sorted_data: &[f64], percentile:
f64) -> f64 {
}
let rank = percentile / 100.0 * (sorted_data.len() - 1) as f64;
- let lower = rank.floor() as usize;
- let upper = rank.ceil() as usize;
+ let lower = rank.floor().clamp(0.0, (sorted_data.len() - 1) as f64) as
usize;
+ let upper = rank.ceil().clamp(0.0, (sorted_data.len() - 1) as f64) as
usize;
if upper >= sorted_data.len() {
return sorted_data[sorted_data.len() - 1];
}
let weight = rank - lower as f64;
- sorted_data[lower] * (1.0 - weight) + sorted_data[upper] * weight
+ sorted_data[lower].mul_add(1.0 - weight, sorted_data[upper] * weight)
}
diff --git a/core/bench/src/analytics/record.rs
b/core/bench/src/analytics/record.rs
index 979bf051..168e8633 100644
--- a/core/bench/src/analytics/record.rs
+++ b/core/bench/src/analytics/record.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BenchmarkRecord {
pub elapsed_time_us: u64,
pub latency_us: u64,
diff --git a/core/bench/src/analytics/report_builder.rs
b/core/bench/src/analytics/report_builder.rs
index ffd4dc74..248708e0 100644
--- a/core/bench/src/analytics/report_builder.rs
+++ b/core/bench/src/analytics/report_builder.rs
@@ -35,6 +35,7 @@ use iggy::prelude::{CacheMetrics, CacheMetricsKey,
IggyTimestamp, Stats};
pub struct BenchmarkReportBuilder;
impl BenchmarkReportBuilder {
+ #[allow(clippy::cast_possible_wrap)]
pub async fn build(
hardware: BenchmarkHardware,
mut params: BenchmarkParams,
@@ -45,8 +46,7 @@ impl BenchmarkReportBuilder {
let timestamp =
DateTime::<Utc>::from_timestamp_micros(IggyTimestamp::now().as_micros() as i64)
- .map(|dt| dt.to_rfc3339())
- .unwrap_or_else(|| String::from("unknown"));
+ .map_or_else(|| String::from("unknown"), |dt| dt.to_rfc3339());
let transport = params.transport;
let server_addr = params.server_address.clone();
@@ -57,7 +57,7 @@ impl BenchmarkReportBuilder {
if params.gitref.is_none() {
params.gitref = Some(server_stats.iggy_server_version.clone());
- };
+ }
if params.gitref_date.is_none() {
params.gitref_date = Some(timestamp.clone());
@@ -134,7 +134,7 @@ impl BenchmarkReportBuilder {
}
/// This function is a workaround.
-/// See server_stats.rs in `bench_report` crate for more details.
+/// See `server_stats.rs` in `bench_report` crate for more details.
fn stats_to_benchmark_server_stats(stats: Stats) -> BenchmarkServerStats {
BenchmarkServerStats {
process_id: stats.process_id,
@@ -166,7 +166,7 @@ fn stats_to_benchmark_server_stats(stats: Stats) ->
BenchmarkServerStats {
}
/// This function is a workaround.
-/// See server_stats.rs in `bench_report` crate for more details.
+/// See `server_stats.rs` in `bench_report` crate for more details.
fn cache_metrics_to_benchmark_cache_metrics(
cache_metrics: HashMap<CacheMetricsKey, CacheMetrics>,
) -> HashMap<BenchmarkCacheMetricsKey, BenchmarkCacheMetrics> {
diff --git a/core/bench/src/analytics/time_series/calculator.rs
b/core/bench/src/analytics/time_series/calculator.rs
index 8d1a6df1..42cf211d 100644
--- a/core/bench/src/analytics/time_series/calculator.rs
+++ b/core/bench/src/analytics/time_series/calculator.rs
@@ -16,6 +16,8 @@
* under the License.
*/
+#![allow(clippy::cast_precision_loss)]
+
use super::calculators::{
LatencyTimeSeriesCalculator, MBThroughputCalculator,
MessageThroughputCalculator,
ThroughputTimeSeriesCalculator, TimeSeriesCalculation,
@@ -29,34 +31,22 @@ use tracing::warn;
pub struct TimeSeriesCalculator;
impl TimeSeriesCalculator {
- pub fn new() -> Self {
- Self
- }
-
- pub fn throughput_mb(
- &self,
- records: &[BenchmarkRecord],
- bucket_size: IggyDuration,
- ) -> TimeSeries {
+ pub fn throughput_mb(records: &[BenchmarkRecord], bucket_size:
IggyDuration) -> TimeSeries {
let calculator =
ThroughputTimeSeriesCalculator::new(MBThroughputCalculator);
calculator.calculate(records, bucket_size)
}
- pub fn throughput_msg(
- &self,
- records: &[BenchmarkRecord],
- bucket_size: IggyDuration,
- ) -> TimeSeries {
+ pub fn throughput_msg(records: &[BenchmarkRecord], bucket_size:
IggyDuration) -> TimeSeries {
let calculator =
ThroughputTimeSeriesCalculator::new(MessageThroughputCalculator);
calculator.calculate(records, bucket_size)
}
- pub fn latency(&self, records: &[BenchmarkRecord], bucket_size:
IggyDuration) -> TimeSeries {
+ pub fn latency(records: &[BenchmarkRecord], bucket_size: IggyDuration) ->
TimeSeries {
let calculator = LatencyTimeSeriesCalculator;
calculator.calculate(records, bucket_size)
}
- pub fn aggregate_sum(&self, series: &[TimeSeries]) -> TimeSeries {
+ pub fn aggregate_sum(series: &[TimeSeries]) -> TimeSeries {
if series.is_empty() {
warn!("Attempting to aggregate empty series");
return TimeSeries {
@@ -92,7 +82,7 @@ impl TimeSeriesCalculator {
TimeSeries { points, kind }
}
- pub fn aggregate_avg(&self, series: &[TimeSeries]) -> TimeSeries {
+ pub fn aggregate_avg(series: &[TimeSeries]) -> TimeSeries {
if series.is_empty() {
warn!("Attempting to aggregate empty series");
return TimeSeries {
diff --git a/core/bench/src/analytics/time_series/calculators/latency.rs
b/core/bench/src/analytics/time_series/calculators/latency.rs
index 2bca3d84..93cb9fdc 100644
--- a/core/bench/src/analytics/time_series/calculators/latency.rs
+++ b/core/bench/src/analytics/time_series/calculators/latency.rs
@@ -16,6 +16,9 @@
* under the License.
*/
+#![allow(clippy::cast_precision_loss)]
+#![allow(clippy::cast_possible_truncation)]
+
use super::TimeSeriesCalculation;
use crate::analytics::record::BenchmarkRecord;
use bench_report::time_series::{TimePoint, TimeSeries, TimeSeriesKind};
diff --git a/core/bench/src/analytics/time_series/calculators/throughput.rs
b/core/bench/src/analytics/time_series/calculators/throughput.rs
index b6708fa2..f2b76c1a 100644
--- a/core/bench/src/analytics/time_series/calculators/throughput.rs
+++ b/core/bench/src/analytics/time_series/calculators/throughput.rs
@@ -16,6 +16,10 @@
* under the License.
*/
+#![allow(clippy::cast_precision_loss)]
+#![allow(clippy::cast_possible_truncation)]
+#![allow(clippy::cast_sign_loss)]
+
use super::TimeSeriesCalculation;
use crate::analytics::record::BenchmarkRecord;
use bench_report::time_series::{TimePoint, TimeSeries, TimeSeriesKind};
@@ -68,7 +72,7 @@ pub struct ThroughputTimeSeriesCalculator<T:
ThroughputCalculation> {
}
impl<T: ThroughputCalculation> ThroughputTimeSeriesCalculator<T> {
- pub fn new(calculator: T) -> Self {
+ pub const fn new(calculator: T) -> Self {
Self { calculator }
}
}
diff --git a/core/bench/src/analytics/time_series/processors/moving_average.rs
b/core/bench/src/analytics/time_series/processors/moving_average.rs
index 3a0034d3..292fbba1 100644
--- a/core/bench/src/analytics/time_series/processors/moving_average.rs
+++ b/core/bench/src/analytics/time_series/processors/moving_average.rs
@@ -16,6 +16,8 @@
* under the License.
*/
+#![allow(clippy::cast_precision_loss)]
+
use super::TimeSeriesProcessor;
use bench_report::time_series::{TimePoint, TimeSeries};
use std::collections::VecDeque;
@@ -27,7 +29,7 @@ pub struct MovingAverageProcessor {
}
impl MovingAverageProcessor {
- pub fn new(window_size: usize) -> Self {
+ pub const fn new(window_size: usize) -> Self {
Self { window_size }
}
}
diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index dec4ac3b..1ca93f0c 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -19,7 +19,15 @@
use super::kind::BenchmarkKindCommand;
use super::output::BenchmarkOutputCommand;
use super::props::{BenchmarkKindProps, BenchmarkTransportProps};
-use super::{defaults::*, transport::BenchmarkTransportCommand};
+use super::{
+ defaults::{
+ DEFAULT_MESSAGE_BATCHES, DEFAULT_MESSAGE_SIZE,
DEFAULT_MESSAGES_PER_BATCH,
+ DEFAULT_MOVING_AVERAGE_WINDOW, DEFAULT_PERFORM_CLEANUP,
DEFAULT_SAMPLING_TIME,
+ DEFAULT_SERVER_STDOUT_VISIBILITY, DEFAULT_SKIP_SERVER_START,
DEFAULT_START_STREAM_ID,
+ DEFAULT_WARMUP_TIME,
+ },
+ transport::BenchmarkTransportCommand,
+};
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::numeric_parameter::BenchmarkNumericParameter;
use clap::error::ErrorKind;
@@ -81,7 +89,7 @@ pub struct IggyBenchArgs {
#[arg(long, short = 'W', default_value_t = DEFAULT_MOVING_AVERAGE_WINDOW)]
pub moving_average_window: u32,
- /// Shutdown iggy-server and remove server local_data directory after the
benchmark is finished.
+ /// Shutdown iggy-server and remove server `local_data` directory after
the benchmark is finished.
/// Only applicable to local benchmarks.
#[arg(long, default_value_t = DEFAULT_PERFORM_CLEANUP,
verbatim_doc_comment)]
pub cleanup: bool,
@@ -125,14 +133,14 @@ impl IggyBenchArgs {
.server_address()
}
- pub fn start_stream_id(&self) -> u32 {
+ pub const fn start_stream_id(&self) -> u32 {
self.start_stream_id.get()
}
pub fn validate(&mut self) {
let server_address =
self.server_address().parse::<SocketAddr>().unwrap();
if (self.cleanup || self.verbose) &&
!server_address.ip().is_loopback() {
- IggyBenchArgs::command()
+ Self::command()
.error(
ErrorKind::ArgumentConflict,
format!(
@@ -150,7 +158,7 @@ impl IggyBenchArgs {
|| self.extra_info().is_some()
|| self.gitref_date().is_some())
{
- IggyBenchArgs::command()
+ Self::command()
.error(
ErrorKind::ArgumentConflict,
"--git-ref, --git-ref-date, --identifier, --remark,
--extra-info can only be used with --output-dir",
@@ -158,14 +166,14 @@ impl IggyBenchArgs {
.exit();
}
- if let (None, None) = (self.message_batches, self.total_data) {
+ if (self.message_batches, self.total_data) == (None, None) {
self.message_batches = Some(DEFAULT_MESSAGE_BATCHES);
}
if let Some(total_data) = self.total_data {
- let samples = total_data.as_bytes_u64() /
self.message_size().min() as u64;
+ let samples = total_data.as_bytes_u64() /
u64::from(self.message_size().min());
if samples <= 1 {
- IggyBenchArgs::command()
+ Self::command()
.error(
ErrorKind::ArgumentConflict,
"--total-messages-size must be at least 2x greater
than --message-size",
@@ -174,32 +182,31 @@ impl IggyBenchArgs {
}
}
- self.benchmark_kind.inner().validate()
+ self.benchmark_kind.inner().validate();
}
- pub fn messages_per_batch(&self) -> BenchmarkNumericParameter {
+ pub const fn messages_per_batch(&self) -> BenchmarkNumericParameter {
self.messages_per_batch
}
- pub fn message_batches(&self) -> Option<NonZeroU32> {
+ pub const fn message_batches(&self) -> Option<NonZeroU32> {
self.message_batches
}
- pub fn message_size(&self) -> BenchmarkNumericParameter {
+ pub const fn message_size(&self) -> BenchmarkNumericParameter {
self.message_size
}
- pub fn total_data(&self) -> Option<IggyByteSize> {
+ pub const fn total_data(&self) -> Option<IggyByteSize> {
self.total_data
}
// Used only for generation of unique directory name
pub fn data_volume_identifier(&self) -> String {
- if let Some(total_messages_size) = self.total_data() {
- format!("{}B", total_messages_size.as_bytes_u64())
- } else {
- self.message_batches().unwrap().to_string()
- }
+ self.total_data().map_or_else(
+ || self.message_batches().unwrap().to_string(),
+ |total_messages_size| format!("{}B",
total_messages_size.as_bytes_u64()),
+ )
}
pub fn streams(&self) -> u32 {
@@ -226,19 +233,19 @@ impl IggyBenchArgs {
self.benchmark_kind.inner().number_of_consumer_groups()
}
- pub fn warmup_time(&self) -> IggyDuration {
+ pub const fn warmup_time(&self) -> IggyDuration {
self.warmup_time
}
- pub fn sampling_time(&self) -> IggyDuration {
+ pub const fn sampling_time(&self) -> IggyDuration {
self.sampling_time
}
- pub fn moving_average_window(&self) -> u32 {
+ pub const fn moving_average_window(&self) -> u32 {
self.moving_average_window
}
- pub fn rate_limit(&self) -> Option<IggyByteSize> {
+ pub const fn rate_limit(&self) -> Option<IggyByteSize> {
self.rate_limit
}
@@ -348,24 +355,22 @@ impl IggyBenchArgs {
};
let actors = match &self.benchmark_kind {
- BenchmarkKindCommand::PinnedProducer(_) => self.producers(),
- BenchmarkKindCommand::PinnedConsumer(_) => self.consumers(),
- BenchmarkKindCommand::PinnedProducerAndConsumer(_) => {
- self.producers() + self.consumers()
- }
- BenchmarkKindCommand::BalancedProducer(_) => self.producers(),
- BenchmarkKindCommand::BalancedConsumerGroup(_) => self.consumers(),
- BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => {
+ BenchmarkKindCommand::PinnedProducer(_)
+ | BenchmarkKindCommand::BalancedProducer(_)
+ | BenchmarkKindCommand::EndToEndProducingConsumer(_)
+ | BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) =>
self.producers(),
+ BenchmarkKindCommand::PinnedConsumer(_)
+ | BenchmarkKindCommand::BalancedConsumerGroup(_) =>
self.consumers(),
+ BenchmarkKindCommand::PinnedProducerAndConsumer(_)
+ | BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => {
self.producers() + self.consumers()
}
- BenchmarkKindCommand::EndToEndProducingConsumer(_) =>
self.producers(),
- BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) =>
self.producers(),
BenchmarkKindCommand::Examples => unreachable!(),
};
let data_volume_arg = match (self.total_data, self.message_batches) {
- (Some(total), None) => format!("{}", total),
- (None, Some(batches)) => format!("{}", batches),
+ (Some(total), None) => format!("{total}"),
+ (None, Some(batches)) => format!("{batches}"),
_ => unreachable!(),
};
@@ -430,7 +435,7 @@ impl IggyBenchArgs {
);
if let Some(remark) = &self.remark() {
- name.push_str(&format!(" ({})", remark));
+ name = format!("{name} ({remark})");
}
name
diff --git a/core/bench/src/args/defaults.rs b/core/bench/src/args/defaults.rs
index b49c31c9..2e3f2b1e 100644
--- a/core/bench/src/args/defaults.rs
+++ b/core/bench/src/args/defaults.rs
@@ -33,7 +33,7 @@ pub const DEFAULT_MESSAGES_PER_BATCH: NonZeroU32 = u32!(1000);
pub const DEFAULT_MESSAGE_BATCHES: NonZeroU32 = u32!(1000);
pub const DEFAULT_MESSAGE_SIZE: NonZeroU32 = u32!(1000);
pub const DEFAULT_TOTAL_MESSAGES_SIZE: IggyByteSize =
IggyByteSize::new(8_000_000);
-pub const DEFAULT_START_STREAM_ID: NonZeroU32 = u32!(3000000);
+pub const DEFAULT_START_STREAM_ID: NonZeroU32 = u32!(3_000_000);
pub const DEFAULT_PINNED_NUMBER_OF_STREAMS: NonZeroU32 = u32!(8);
pub const DEFAULT_BALANCED_NUMBER_OF_STREAMS: NonZeroU32 = u32!(1);
diff --git a/core/bench/src/args/examples.rs b/core/bench/src/args/examples.rs
index b187a12e..84a7839d 100644
--- a/core/bench/src/args/examples.rs
+++ b/core/bench/src/args/examples.rs
@@ -169,5 +169,5 @@ const EXAMPLES: &str = r#"EXAMPLES:
"#;
pub fn print_examples() {
- println!("{}", EXAMPLES);
+ println!("{EXAMPLES}");
}
diff --git a/core/bench/src/args/kind.rs b/core/bench/src/args/kind.rs
index deb78815..9ffa1acf 100644
--- a/core/bench/src/args/kind.rs
+++ b/core/bench/src/args/kind.rs
@@ -100,23 +100,19 @@ pub enum BenchmarkKindCommand {
impl BenchmarkKindCommand {
pub fn as_simple_kind(&self) -> BenchmarkKind {
match self {
- BenchmarkKindCommand::PinnedProducer(_) =>
BenchmarkKind::PinnedProducer,
- BenchmarkKindCommand::PinnedConsumer(_) =>
BenchmarkKind::PinnedConsumer,
- BenchmarkKindCommand::PinnedProducerAndConsumer(_) => {
- BenchmarkKind::PinnedProducerAndConsumer
- }
- BenchmarkKindCommand::BalancedProducer(_) =>
BenchmarkKind::BalancedProducer,
- BenchmarkKindCommand::BalancedConsumerGroup(_) =>
BenchmarkKind::BalancedConsumerGroup,
- BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => {
+ Self::PinnedProducer(_) => BenchmarkKind::PinnedProducer,
+ Self::PinnedConsumer(_) => BenchmarkKind::PinnedConsumer,
+ Self::PinnedProducerAndConsumer(_) =>
BenchmarkKind::PinnedProducerAndConsumer,
+ Self::BalancedProducer(_) => BenchmarkKind::BalancedProducer,
+ Self::BalancedConsumerGroup(_) =>
BenchmarkKind::BalancedConsumerGroup,
+ Self::BalancedProducerAndConsumerGroup(_) => {
BenchmarkKind::BalancedProducerAndConsumerGroup
}
- BenchmarkKindCommand::EndToEndProducingConsumer(_) => {
- BenchmarkKind::EndToEndProducingConsumer
- }
- BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => {
+ Self::EndToEndProducingConsumer(_) =>
BenchmarkKind::EndToEndProducingConsumer,
+ Self::EndToEndProducingConsumerGroup(_) => {
BenchmarkKind::EndToEndProducingConsumerGroup
}
- BenchmarkKindCommand::Examples => {
+ Self::Examples => {
print_examples();
std::process::exit(0);
}
@@ -155,15 +151,15 @@ impl BenchmarkKindProps for BenchmarkKindCommand {
fn inner(&self) -> &dyn BenchmarkKindProps {
match self {
- BenchmarkKindCommand::PinnedProducer(args) => args,
- BenchmarkKindCommand::PinnedConsumer(args) => args,
- BenchmarkKindCommand::PinnedProducerAndConsumer(args) => args,
- BenchmarkKindCommand::BalancedProducer(args) => args,
- BenchmarkKindCommand::BalancedConsumerGroup(args) => args,
- BenchmarkKindCommand::BalancedProducerAndConsumerGroup(args) =>
args,
- BenchmarkKindCommand::EndToEndProducingConsumer(args) => args,
- BenchmarkKindCommand::EndToEndProducingConsumerGroup(args) => args,
- BenchmarkKindCommand::Examples => {
+ Self::PinnedProducer(args) => args,
+ Self::PinnedConsumer(args) => args,
+ Self::PinnedProducerAndConsumer(args) => args,
+ Self::BalancedProducer(args) => args,
+ Self::BalancedConsumerGroup(args) => args,
+ Self::BalancedProducerAndConsumerGroup(args) => args,
+ Self::EndToEndProducingConsumer(args) => args,
+ Self::EndToEndProducingConsumerGroup(args) => args,
+ Self::Examples => {
print_examples();
std::process::exit(0);
}
@@ -171,6 +167,6 @@ impl BenchmarkKindProps for BenchmarkKindCommand {
}
fn validate(&self) {
- self.inner().validate()
+ self.inner().validate();
}
}
diff --git a/core/bench/src/args/kinds/balanced/consumer_group.rs
b/core/bench/src/args/kinds/balanced/consumer_group.rs
index d9068c51..64602bdf 100644
--- a/core/bench/src/args/kinds/balanced/consumer_group.rs
+++ b/core/bench/src/args/kinds/balanced/consumer_group.rs
@@ -17,7 +17,12 @@
*/
use crate::args::{
- common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+ common::IggyBenchArgs,
+ defaults::{
+ DEFAULT_BALANCED_NUMBER_OF_STREAMS, DEFAULT_NUMBER_OF_CONSUMER_GROUPS,
+ DEFAULT_NUMBER_OF_CONSUMERS,
+ },
+ props::BenchmarkKindProps,
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
@@ -77,8 +82,7 @@ impl BenchmarkKindProps for BalancedConsumerGroupArgs {
cmd.error(
ErrorKind::ArgumentConflict,
format!(
- "In balanced consumer group, consumer groups number ({})
must be less than the number of streams ({})",
- cg_number, streams
+ "In balanced consumer group, consumer groups number
({cg_number}) must be less than the number of streams ({streams})"
),
)
.exit();
diff --git a/core/bench/src/args/kinds/balanced/producer.rs
b/core/bench/src/args/kinds/balanced/producer.rs
index 10ffdd2f..cc58e9eb 100644
--- a/core/bench/src/args/kinds/balanced/producer.rs
+++ b/core/bench/src/args/kinds/balanced/producer.rs
@@ -17,7 +17,12 @@
*/
use crate::args::{
- common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+ common::IggyBenchArgs,
+ defaults::{
+ DEFAULT_BALANCED_NUMBER_OF_PARTITIONS,
DEFAULT_BALANCED_NUMBER_OF_STREAMS,
+ DEFAULT_NUMBER_OF_PRODUCERS,
+ },
+ props::BenchmarkKindProps,
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
diff --git a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
index 6a288f3b..ab7ced0f 100644
--- a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
+++ b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
@@ -17,7 +17,13 @@
*/
use crate::args::{
- common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+ common::IggyBenchArgs,
+ defaults::{
+ DEFAULT_BALANCED_NUMBER_OF_PARTITIONS,
DEFAULT_BALANCED_NUMBER_OF_STREAMS,
+ DEFAULT_NUMBER_OF_CONSUMER_GROUPS, DEFAULT_NUMBER_OF_CONSUMERS,
+ DEFAULT_NUMBER_OF_PRODUCERS,
+ },
+ props::BenchmarkKindProps,
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
diff --git a/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
b/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
index b33ee05e..dc0826df 100644
--- a/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
+++ b/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
@@ -17,7 +17,9 @@
*/
use crate::args::{
- common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+ common::IggyBenchArgs,
+ defaults::{DEFAULT_NUMBER_OF_PRODUCERS, DEFAULT_PINNED_NUMBER_OF_STREAMS},
+ props::BenchmarkKindProps,
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
diff --git a/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
b/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
index 218b4e3d..af7829b5 100644
--- a/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
+++ b/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
@@ -17,7 +17,13 @@
*/
use crate::args::{
- common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+ common::IggyBenchArgs,
+ defaults::{
+ DEFAULT_BALANCED_NUMBER_OF_PARTITIONS,
DEFAULT_BALANCED_NUMBER_OF_STREAMS,
+ DEFAULT_NUMBER_OF_CONSUMER_GROUPS, DEFAULT_NUMBER_OF_CONSUMERS,
+ DEFAULT_NUMBER_OF_PRODUCERS,
+ },
+ props::BenchmarkKindProps,
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
@@ -100,8 +106,7 @@ impl BenchmarkKindProps for
EndToEndProducingConsumerGroupArgs {
cmd.error(
ErrorKind::ArgumentConflict,
format!(
- "For producing consumer group benchmark, consumer groups
number ({}) must be less than the number of streams ({})",
- cg_number, streams
+ "For producing consumer group benchmark, consumer groups
number ({cg_number}) must be less than the number of streams ({streams})"
),
)
.exit();
diff --git a/core/bench/src/args/kinds/pinned/consumer.rs
b/core/bench/src/args/kinds/pinned/consumer.rs
index 5d29d395..c6a1b347 100644
--- a/core/bench/src/args/kinds/pinned/consumer.rs
+++ b/core/bench/src/args/kinds/pinned/consumer.rs
@@ -17,7 +17,7 @@
*/
use crate::args::{
- common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+ common::IggyBenchArgs, defaults::DEFAULT_NUMBER_OF_PRODUCERS,
props::BenchmarkKindProps,
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
diff --git a/core/bench/src/args/kinds/pinned/producer.rs
b/core/bench/src/args/kinds/pinned/producer.rs
index 5ae60eea..138988fb 100644
--- a/core/bench/src/args/kinds/pinned/producer.rs
+++ b/core/bench/src/args/kinds/pinned/producer.rs
@@ -17,7 +17,7 @@
*/
use crate::args::{
- common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+ common::IggyBenchArgs, defaults::DEFAULT_NUMBER_OF_PRODUCERS,
props::BenchmarkKindProps,
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
diff --git a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
index bb21073e..6bf6314d 100644
--- a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
+++ b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
@@ -17,7 +17,12 @@
*/
use crate::args::{
- common::IggyBenchArgs, defaults::*, props::BenchmarkKindProps,
+ common::IggyBenchArgs,
+ defaults::{
+ DEFAULT_NUMBER_OF_CONSUMERS, DEFAULT_NUMBER_OF_PRODUCERS,
+ DEFAULT_PINNED_NUMBER_OF_PARTITIONS,
+ },
+ props::BenchmarkKindProps,
transport::BenchmarkTransportCommand,
};
use clap::{CommandFactory, Parser, error::ErrorKind};
diff --git a/core/bench/src/args/props.rs b/core/bench/src/args/props.rs
index 67368ed7..c3386320 100644
--- a/core/bench/src/args/props.rs
+++ b/core/bench/src/args/props.rs
@@ -42,7 +42,7 @@ pub trait BenchmarkTransportProps {
fn server_address(&self) -> &str;
fn client_address(&self) -> &str;
fn validate_certificate(&self) -> bool;
- fn output_command(&self) -> &Option<BenchmarkOutputCommand>;
+ fn output_command(&self) -> Option<&BenchmarkOutputCommand>;
fn nodelay(&self) -> bool;
fn inner(&self) -> &dyn BenchmarkTransportProps
where
diff --git a/core/bench/src/args/transport.rs b/core/bench/src/args/transport.rs
index f9702579..18ca668d 100644
--- a/core/bench/src/args/transport.rs
+++ b/core/bench/src/args/transport.rs
@@ -16,7 +16,10 @@
* under the License.
*/
-use super::defaults::*;
+use super::defaults::{
+ DEFAULT_HTTP_SERVER_ADDRESS, DEFAULT_QUIC_CLIENT_ADDRESS,
DEFAULT_QUIC_SERVER_ADDRESS,
+ DEFAULT_QUIC_SERVER_NAME, DEFAULT_QUIC_VALIDATE_CERTIFICATE,
DEFAULT_TCP_SERVER_ADDRESS,
+};
use super::{output::BenchmarkOutputCommand, props::BenchmarkTransportProps};
use clap::{Parser, Subcommand};
use integration::test_server::Transport;
@@ -35,9 +38,9 @@ impl Serialize for BenchmarkTransportCommand {
S: Serializer,
{
let variant_str = match self {
- BenchmarkTransportCommand::Http(_) => "http",
- BenchmarkTransportCommand::Tcp(_) => "tcp",
- BenchmarkTransportCommand::Quic(_) => "quic",
+ Self::Http(_) => "http",
+ Self::Tcp(_) => "tcp",
+ Self::Quic(_) => "quic",
};
serializer.serialize_str(variant_str)
}
@@ -66,13 +69,13 @@ impl BenchmarkTransportProps for BenchmarkTransportCommand {
fn inner(&self) -> &dyn BenchmarkTransportProps {
match self {
- BenchmarkTransportCommand::Http(args) => args,
- BenchmarkTransportCommand::Tcp(args) => args,
- BenchmarkTransportCommand::Quic(args) => args,
+ Self::Http(args) => args,
+ Self::Tcp(args) => args,
+ Self::Quic(args) => args,
}
}
- fn output_command(&self) -> &Option<BenchmarkOutputCommand> {
+ fn output_command(&self) -> Option<&BenchmarkOutputCommand> {
self.inner().output_command()
}
}
@@ -109,8 +112,8 @@ impl BenchmarkTransportProps for HttpArgs {
panic!("Setting nodelay for HTTP transport is not supported!")
}
- fn output_command(&self) -> &Option<BenchmarkOutputCommand> {
- &self.output
+ fn output_command(&self) -> Option<&BenchmarkOutputCommand> {
+ self.output.as_ref()
}
}
@@ -150,8 +153,8 @@ impl BenchmarkTransportProps for TcpArgs {
self.nodelay
}
- fn output_command(&self) -> &Option<BenchmarkOutputCommand> {
- &self.output
+ fn output_command(&self) -> Option<&BenchmarkOutputCommand> {
+ self.output.as_ref()
}
}
@@ -199,7 +202,7 @@ impl BenchmarkTransportProps for QuicArgs {
panic!("Setting nodelay for QUIC transport is not supported!")
}
- fn output_command(&self) -> &Option<BenchmarkOutputCommand> {
- &self.output
+ fn output_command(&self) -> Option<&BenchmarkOutputCommand> {
+ self.output.as_ref()
}
}
diff --git a/core/bench/src/benchmarks/balanced_consumer_group.rs
b/core/bench/src/benchmarks/balanced_consumer_group.rs
index bb044806..ac60fc6f 100644
--- a/core/bench/src/benchmarks/balanced_consumer_group.rs
+++ b/core/bench/src/benchmarks/balanced_consumer_group.rs
@@ -17,7 +17,10 @@
*/
use super::benchmark::Benchmarkable;
-use crate::{args::common::IggyBenchArgs, benchmarks::common::*};
+use crate::{
+ args::common::IggyBenchArgs,
+ benchmarks::common::{build_consumer_futures, init_consumer_groups},
+};
use async_trait::async_trait;
use bench_report::{benchmark_kind::BenchmarkKind,
individual_metrics::BenchmarkIndividualMetrics};
use iggy::prelude::*;
@@ -32,11 +35,11 @@ pub struct BalancedConsumerGroupBenchmark {
}
impl BalancedConsumerGroupBenchmark {
- pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Box<Self> {
- Box::new(Self {
+ pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Self {
+ Self {
args,
client_factory,
- })
+ }
}
}
@@ -52,7 +55,7 @@ impl Benchmarkable for BalancedConsumerGroupBenchmark {
init_consumer_groups(cf, &args).await?;
- let consumer_futures = build_consumer_futures(cf, &args)?;
+ let consumer_futures = build_consumer_futures(cf, &args);
for fut in consumer_futures {
tasks.spawn(fut);
}
diff --git a/core/bench/src/benchmarks/balanced_producer.rs
b/core/bench/src/benchmarks/balanced_producer.rs
index 1f8261e6..6e81ab5e 100644
--- a/core/bench/src/benchmarks/balanced_producer.rs
+++ b/core/bench/src/benchmarks/balanced_producer.rs
@@ -18,7 +18,7 @@
use super::benchmark::Benchmarkable;
use crate::args::common::IggyBenchArgs;
-use crate::benchmarks::common::*;
+use crate::benchmarks::common::build_producer_futures;
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -34,11 +34,11 @@ pub struct BalancedProducerBenchmark {
}
impl BalancedProducerBenchmark {
- pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Box<Self> {
- Box::new(Self {
+ pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Self {
+ Self {
args,
client_factory,
- })
+ }
}
}
@@ -50,7 +50,7 @@ impl Benchmarkable for BalancedProducerBenchmark {
self.init_streams().await?;
let cf = &self.client_factory;
let args = self.args.clone();
- let producer_futures = build_producer_futures(cf, &args)?;
+ let producer_futures = build_producer_futures(cf, &args);
let mut tasks = JoinSet::new();
for fut in producer_futures {
@@ -76,10 +76,10 @@ impl Benchmarkable for BalancedProducerBenchmark {
let streams = format!("streams: {}", self.args.streams());
let partitions = format!("partitions: {}",
self.args.number_of_partitions());
let producers = format!("producers: {}", self.args.producers());
- let max_topic_size = match self.args.max_topic_size() {
- Some(size) => format!(" max topic size: {}", size),
- None => format!(" max topic size: {}",
MaxTopicSize::ServerDefault),
- };
+ let max_topic_size = self.args.max_topic_size().map_or_else(
+ || format!(" max topic size: {}", MaxTopicSize::ServerDefault),
+ |size| format!(" max topic size: {size}"),
+ );
let common_params = self.common_params_str();
info!(
diff --git a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
index 8c86afc3..0eab5db8 100644
--- a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
+++ b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
@@ -17,7 +17,10 @@
*/
use super::benchmark::Benchmarkable;
-use crate::{args::common::IggyBenchArgs, benchmarks::common::*};
+use crate::{
+ args::common::IggyBenchArgs,
+ benchmarks::common::{build_consumer_futures, build_producer_futures,
init_consumer_groups},
+};
use async_trait::async_trait;
use bench_report::{benchmark_kind::BenchmarkKind,
individual_metrics::BenchmarkIndividualMetrics};
use iggy::prelude::*;
@@ -32,11 +35,11 @@ pub struct BalancedProducerAndConsumerGroupBenchmark {
}
impl BalancedProducerAndConsumerGroupBenchmark {
- pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Box<Self> {
- Box::new(Self {
+ pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Self {
+ Self {
args,
client_factory,
- })
+ }
}
}
@@ -52,8 +55,8 @@ impl Benchmarkable for
BalancedProducerAndConsumerGroupBenchmark {
init_consumer_groups(cf, &args).await?;
- let producer_futures = build_producer_futures(cf, &args)?;
- let consumer_futures = build_consumer_futures(cf, &args)?;
+ let producer_futures = build_producer_futures(cf, &args);
+ let consumer_futures = build_consumer_futures(cf, &args);
for fut in producer_futures {
tasks.spawn(fut);
@@ -84,10 +87,10 @@ impl Benchmarkable for
BalancedProducerAndConsumerGroupBenchmark {
let producers = format!("producers: {}", self.args.producers());
let consumers = format!("consumers: {}", self.args.consumers());
let cg_count = format!("consumer groups: {}",
self.args.number_of_consumer_groups());
- let max_topic_size = match self.args.max_topic_size() {
- Some(size) => format!(" max topic size: {}", size),
- None => format!(" max topic size: {}",
MaxTopicSize::ServerDefault),
- };
+ let max_topic_size = self.args.max_topic_size().map_or_else(
+ || format!(" max topic size: {}", MaxTopicSize::ServerDefault),
+ |size| format!(" max topic size: {size}"),
+ );
let common_params = self.common_params_str();
info!(
diff --git a/core/bench/src/benchmarks/benchmark.rs
b/core/bench/src/benchmarks/benchmark.rs
index e14fd132..9b96a1b8 100644
--- a/core/bench/src/benchmarks/benchmark.rs
+++ b/core/bench/src/benchmarks/benchmark.rs
@@ -43,43 +43,46 @@ impl From<IggyBenchArgs> for Box<dyn Benchmarkable> {
match args.benchmark_kind {
BenchmarkKindCommand::PinnedProducer(_) => {
- PinnedProducerBenchmark::new(Arc::new(args), client_factory)
+ Box::new(PinnedProducerBenchmark::new(Arc::new(args),
client_factory))
}
BenchmarkKindCommand::PinnedConsumer(_) => {
- PinnedConsumerBenchmark::new(Arc::new(args), client_factory)
+ Box::new(PinnedConsumerBenchmark::new(Arc::new(args),
client_factory))
}
- BenchmarkKindCommand::PinnedProducerAndConsumer(_) => {
- PinnedProducerAndConsumerBenchmark::new(Arc::new(args),
client_factory)
- }
+ BenchmarkKindCommand::PinnedProducerAndConsumer(_) => Box::new(
+ PinnedProducerAndConsumerBenchmark::new(Arc::new(args),
client_factory),
+ ),
- BenchmarkKindCommand::BalancedProducer(_) => {
- BalancedProducerBenchmark::new(Arc::new(args), client_factory)
- }
+ BenchmarkKindCommand::BalancedProducer(_) =>
Box::new(BalancedProducerBenchmark::new(
+ Arc::new(args),
+ client_factory,
+ )),
- BenchmarkKindCommand::BalancedConsumerGroup(_) => {
- BalancedConsumerGroupBenchmark::new(Arc::new(args),
client_factory)
- }
+ BenchmarkKindCommand::BalancedConsumerGroup(_) => Box::new(
+ BalancedConsumerGroupBenchmark::new(Arc::new(args),
client_factory),
+ ),
- BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => {
- BalancedProducerAndConsumerGroupBenchmark::new(Arc::new(args),
client_factory)
- }
+ BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) =>
Box::new(
+ BalancedProducerAndConsumerGroupBenchmark::new(Arc::new(args),
client_factory),
+ ),
- BenchmarkKindCommand::EndToEndProducingConsumer(_) => {
- EndToEndProducingConsumerBenchmark::new(Arc::new(args),
client_factory)
- }
+ BenchmarkKindCommand::EndToEndProducingConsumer(_) => Box::new(
+ EndToEndProducingConsumerBenchmark::new(Arc::new(args),
client_factory),
+ ),
- BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => {
- EndToEndProducingConsumerGroupBenchmark::new(Arc::new(args),
client_factory)
+ BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) =>
Box::new(
+ EndToEndProducingConsumerGroupBenchmark::new(Arc::new(args),
client_factory),
+ ),
+ BenchmarkKindCommand::Examples => {
+ unreachable!("Examples should be handled before this point")
}
- _ => todo!(),
}
}
}
#[async_trait]
-pub trait Benchmarkable {
+pub trait Benchmarkable: Send {
async fn run(
&mut self,
) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>,
IggyError>;
@@ -104,13 +107,13 @@ pub trait Benchmarkable {
let stream_id = start_stream_id + i;
if streams.iter().all(|s| s.id != stream_id) {
info!("Creating the test stream {}", stream_id);
- let name = format!("stream {}", stream_id);
+ let name = format!("stream {stream_id}");
client.create_stream(&name, Some(stream_id)).await?;
- let name = format!("topic {}", topic_id);
- let max_topic_size = match self.args().max_topic_size() {
- Some(size) => MaxTopicSize::Custom(size),
- None => MaxTopicSize::Unlimited,
- };
+ let name = format!("topic {topic_id}");
+ let max_topic_size = self
+ .args()
+ .max_topic_size()
+ .map_or(MaxTopicSize::Unlimited, MaxTopicSize::Custom);
info!(
"Creating the test topic {} for stream {} with max topic
size: {:?}",
@@ -145,8 +148,7 @@ pub trait Benchmarkable {
let stream_id = start_stream_id + i;
if streams.iter().all(|s| s.id != stream_id) {
return Err(IggyError::ResourceNotFound(format!(
- "Streams for testing are not properly initialized. Stream
with id: {} is missing.",
- stream_id
+ "Streams for testing are not properly initialized. Stream
with id: {stream_id} is missing."
)));
}
}
@@ -159,23 +161,21 @@ pub trait Benchmarkable {
" messages per batch: {} b,",
self.args().messages_per_batch()
);
- let data = if let Some(data) = self.args().total_data() {
- format!(" total data to send: {},", data)
- } else {
- format!(
- " total batches to send: {},",
- self.args().message_batches().unwrap()
- )
- };
+ let data = self.args().total_data().map_or_else(
+ || {
+ format!(
+ " total batches to send: {},",
+ self.args().message_batches().unwrap()
+ )
+ },
+ |data| format!(" total data to send: {data},"),
+ );
let rate_limit = self
.args()
.rate_limit()
.map(|rl| format!(" global rate limit: {rl}/s"))
.unwrap_or_default();
- format!(
- "{}{}{}{}",
- message_size, messages_per_batch, data, rate_limit,
- )
+ format!("{message_size}{messages_per_batch}{data}{rate_limit}",)
}
}
diff --git a/core/bench/src/benchmarks/common.rs
b/core/bench/src/benchmarks/common.rs
index de6356d2..22c9ad22 100644
--- a/core/bench/src/benchmarks/common.rs
+++ b/core/bench/src/benchmarks/common.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use super::*;
+use super::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX};
use crate::{
actors::{
consumer::BenchmarkConsumer, producer::BenchmarkProducer,
@@ -33,7 +33,7 @@ use tracing::{error, info};
pub async fn create_consumer(
client: &IggyClient,
- consumer_group_id: &Option<u32>,
+ consumer_group_id: Option<&u32>,
stream_id: &Identifier,
topic_id: &Identifier,
consumer_id: u32,
@@ -56,7 +56,14 @@ pub async fn create_consumer(
}
pub fn rate_limit_per_actor(total_rate: Option<IggyByteSize>, actors: u32) ->
Option<IggyByteSize> {
- total_rate.map(|rl| (rl.as_bytes_u64() / (actors as u64)).into())
+ total_rate.and_then(|rl| {
+ let per_actor = rl.as_bytes_u64() / u64::from(actors);
+ if per_actor > 0 {
+ Some(per_actor.into())
+ } else {
+ None
+ }
+ })
}
pub async fn init_consumer_groups(
@@ -73,7 +80,7 @@ pub async fn init_consumer_groups(
for i in 1..=cg_count {
let consumer_group_id = CONSUMER_GROUP_BASE_ID + i;
let stream_id = start_stream_id + i;
- let consumer_group_name = format!("{}-{}", CONSUMER_GROUP_NAME_PREFIX,
consumer_group_id);
+ let consumer_group_name =
format!("{CONSUMER_GROUP_NAME_PREFIX}-{consumer_group_id}");
info!(
"Creating test consumer group: name={}, id={}, stream={},
topic={}",
consumer_group_name, consumer_group_id, stream_id, topic_id
@@ -105,10 +112,7 @@ pub async fn init_consumer_groups(
pub fn build_producer_futures(
client_factory: &Arc<dyn ClientFactory>,
args: &IggyBenchArgs,
-) -> Result<
- Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> +
Send + use<>>,
- IggyError,
-> {
+) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> +
Send + use<>> {
let streams = args.streams();
let partitions = args.number_of_partitions();
let start_stream_id = args.start_stream_id();
@@ -124,7 +128,7 @@ pub fn build_producer_futures(
BenchmarkFinishCondition::new(args,
BenchmarkFinishConditionMode::Shared);
let rate_limit = rate_limit_per_actor(args.rate_limit(), actors);
- let futures = (1..=producers)
+ (1..=producers)
.map(|producer_id| {
let client_factory = client_factory.clone();
@@ -154,18 +158,13 @@ pub fn build_producer_futures(
producer.run().await
}
})
- .collect();
-
- Ok(futures)
+ .collect()
}
pub fn build_consumer_futures(
client_factory: &Arc<dyn ClientFactory>,
args: &IggyBenchArgs,
-) -> Result<
- Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> +
Send + use<>>,
- IggyError,
-> {
+) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> +
Send + use<>> {
let start_stream_id = args.start_stream_id();
let cg_count = args.number_of_consumer_groups();
let consumers = args.consumers();
@@ -181,9 +180,9 @@ pub fn build_consumer_futures(
PollingKind::Offset
};
let origin_timestamp_latency_calculation = match args.kind() {
- BenchmarkKind::PinnedConsumer => false,
- BenchmarkKind::PinnedProducerAndConsumer => false, // TODO(hubcio): in
future, it can also be true
- BenchmarkKind::BalancedConsumerGroup => false,
+ BenchmarkKind::PinnedConsumer
+ | BenchmarkKind::PinnedProducerAndConsumer
+ | BenchmarkKind::BalancedConsumerGroup => false, // TODO(hubcio): in
future, PinnedProducerAndConsumer can also be true
BenchmarkKind::BalancedProducerAndConsumerGroup => true,
_ => unreachable!(),
};
@@ -192,7 +191,7 @@ pub fn build_consumer_futures(
BenchmarkFinishCondition::new(args,
BenchmarkFinishConditionMode::Shared);
let rate_limit = rate_limit_per_actor(args.rate_limit(), actors);
- let futures = (1..=consumers)
+ (1..=consumers)
.map(|consumer_id| {
let client_factory = client_factory.clone();
let finish_condition = if cg_count > 0 {
@@ -230,19 +229,14 @@ pub fn build_consumer_futures(
consumer.run().await
}
})
- .collect();
-
- Ok(futures)
+ .collect()
}
-#[allow(clippy::too_many_arguments)]
+#[allow(clippy::needless_pass_by_value)]
pub fn build_producing_consumers_futures(
client_factory: Arc<dyn ClientFactory>,
args: Arc<IggyBenchArgs>,
-) -> Result<
- Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> +
Send>,
- IggyError,
-> {
+) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> +
Send> {
let producing_consumers = args.producers();
let streams = args.streams();
let partitions = args.number_of_partitions();
@@ -252,7 +246,7 @@ pub fn build_producing_consumers_futures(
let start_stream_id = args.start_stream_id();
let polling_kind = PollingKind::Offset;
- let futures = (1..=producing_consumers)
+ (1..=producing_consumers)
.map(|actor_id| {
let client_factory_clone = client_factory.clone();
let args_clone = args.clone();
@@ -294,18 +288,14 @@ pub fn build_producing_consumers_futures(
actor.run().await
}
})
- .collect();
-
- Ok(futures)
+ .collect()
}
+#[allow(clippy::needless_pass_by_value)]
pub fn build_producing_consumer_groups_futures(
client_factory: Arc<dyn ClientFactory>,
args: Arc<IggyBenchArgs>,
-) -> Result<
- Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> +
Send>,
- IggyError,
-> {
+) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> +
Send> {
let producers = args.producers();
let consumers = args.consumers();
let total_actors = producers.max(consumers);
@@ -324,7 +314,7 @@ pub fn build_producing_consumer_groups_futures(
let shared_poll_finish_condition =
BenchmarkFinishCondition::new(&args,
BenchmarkFinishConditionMode::SharedHalf);
- let futures = (1..=total_actors)
+ (1..=total_actors)
.map(|actor_id| {
let client_factory_clone = client_factory.clone();
let args_clone = args.clone();
@@ -371,7 +361,7 @@ pub fn build_producing_consumer_groups_futures(
if should_consume {
format!(" in group={}", consumer_group_id.unwrap())
} else {
- "".to_string()
+ String::new()
},
stream_id,
actor_type
@@ -396,7 +386,5 @@ pub fn build_producing_consumer_groups_futures(
actor.run().await
}
})
- .collect();
-
- Ok(futures)
+ .collect()
}
diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
b/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
index a7be281f..2818fea6 100644
--- a/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
+++ b/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
@@ -17,7 +17,7 @@
*/
use crate::args::common::IggyBenchArgs;
-use crate::benchmarks::common::*;
+use crate::benchmarks::common::build_producing_consumers_futures;
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -35,11 +35,11 @@ pub struct EndToEndProducingConsumerBenchmark {
}
impl EndToEndProducingConsumerBenchmark {
- pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Box<Self> {
- Box::new(Self {
+ pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Self {
+ Self {
args,
client_factory,
- })
+ }
}
}
@@ -53,7 +53,7 @@ impl Benchmarkable for EndToEndProducingConsumerBenchmark {
let args = self.args.clone();
let mut tasks = JoinSet::new();
- let futures = build_producing_consumers_futures(cf, args.clone())?;
+ let futures = build_producing_consumers_futures(cf, args);
for fut in futures {
tasks.spawn(fut);
}
@@ -76,10 +76,10 @@ impl Benchmarkable for EndToEndProducingConsumerBenchmark {
fn print_info(&self) {
let streams = format!("streams: {}", self.args.streams());
let producers = format!("producers: {}", self.args.producers());
- let max_topic_size = match self.args.max_topic_size() {
- Some(size) => format!(" max topic size: {}", size),
- None => format!(" max topic size: {}",
MaxTopicSize::ServerDefault),
- };
+ let max_topic_size = self.args.max_topic_size().map_or_else(
+ || format!(" max topic size: {}", MaxTopicSize::ServerDefault),
+ |size| format!(" max topic size: {size}"),
+ );
let common_params = self.common_params_str();
info!(
diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
index 3545cee9..2adda4d5 100644
--- a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
+++ b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
@@ -17,7 +17,7 @@
*/
use crate::args::common::IggyBenchArgs;
-use crate::benchmarks::common::*;
+use crate::benchmarks::common::{build_producing_consumer_groups_futures,
init_consumer_groups};
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -35,11 +35,11 @@ pub struct EndToEndProducingConsumerGroupBenchmark {
}
impl EndToEndProducingConsumerGroupBenchmark {
- pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Box<Self> {
- Box::new(Self {
+ pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Self {
+ Self {
args,
client_factory,
- })
+ }
}
}
@@ -55,7 +55,7 @@ impl Benchmarkable for
EndToEndProducingConsumerGroupBenchmark {
init_consumer_groups(&cf, &args).await?;
- let futures = build_producing_consumer_groups_futures(cf, args)?;
+ let futures = build_producing_consumer_groups_futures(cf, args);
for fut in futures {
tasks.spawn(fut);
}
diff --git a/core/bench/src/benchmarks/pinned_consumer.rs
b/core/bench/src/benchmarks/pinned_consumer.rs
index 15da8676..870df77a 100644
--- a/core/bench/src/benchmarks/pinned_consumer.rs
+++ b/core/bench/src/benchmarks/pinned_consumer.rs
@@ -18,7 +18,7 @@
use crate::args::common::IggyBenchArgs;
use crate::benchmarks::benchmark::Benchmarkable;
-use crate::benchmarks::common::*;
+use crate::benchmarks::common::build_consumer_futures;
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -34,11 +34,11 @@ pub struct PinnedConsumerBenchmark {
}
impl PinnedConsumerBenchmark {
- pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Box<Self> {
- Box::new(Self {
+ pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Self {
+ Self {
args,
client_factory,
- })
+ }
}
}
@@ -52,7 +52,7 @@ impl Benchmarkable for PinnedConsumerBenchmark {
let args = self.args.clone();
let mut tasks: JoinSet<_> = JoinSet::new();
- let futures = build_consumer_futures(cf, &args)?;
+ let futures = build_consumer_futures(cf, &args);
for fut in futures {
tasks.spawn(fut);
}
diff --git a/core/bench/src/benchmarks/pinned_producer.rs
b/core/bench/src/benchmarks/pinned_producer.rs
index 578819c8..3514e24d 100644
--- a/core/bench/src/benchmarks/pinned_producer.rs
+++ b/core/bench/src/benchmarks/pinned_producer.rs
@@ -18,7 +18,7 @@
use crate::args::common::IggyBenchArgs;
use crate::benchmarks::benchmark::Benchmarkable;
-use crate::benchmarks::common::*;
+use crate::benchmarks::common::build_producer_futures;
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -34,11 +34,11 @@ pub struct PinnedProducerBenchmark {
}
impl PinnedProducerBenchmark {
- pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Box<Self> {
- Box::new(Self {
+ pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Self {
+ Self {
args,
client_factory,
- })
+ }
}
}
@@ -52,7 +52,7 @@ impl Benchmarkable for PinnedProducerBenchmark {
let args = self.args.clone();
let mut tasks = JoinSet::new();
- let producer_futures = build_producer_futures(client_factory, &args)?;
+ let producer_futures = build_producer_futures(client_factory, &args);
for fut in producer_futures {
tasks.spawn(fut);
@@ -76,10 +76,10 @@ impl Benchmarkable for PinnedProducerBenchmark {
fn print_info(&self) {
let streams = format!("streams: {}", self.args.streams());
let producers = format!("producers: {}", self.args.producers());
- let max_topic_size = match self.args.max_topic_size() {
- Some(size) => format!(" max topic size: {}", size),
- None => format!(" max topic size: {}",
MaxTopicSize::ServerDefault),
- };
+ let max_topic_size = self.args.max_topic_size().map_or_else(
+ || format!(" max topic size: {}", MaxTopicSize::ServerDefault),
+ |size| format!(" max topic size: {size}"),
+ );
let common_params = self.common_params_str();
info!(
diff --git a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
index 1940dc9b..cde6632c 100644
--- a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
+++ b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
@@ -17,7 +17,7 @@
use crate::args::common::IggyBenchArgs;
use crate::benchmarks::benchmark::Benchmarkable;
-use crate::benchmarks::common::*;
+use crate::benchmarks::common::{build_consumer_futures,
build_producer_futures};
use async_trait::async_trait;
use bench_report::benchmark_kind::BenchmarkKind;
use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -33,11 +33,11 @@ pub struct PinnedProducerAndConsumerBenchmark {
}
impl PinnedProducerAndConsumerBenchmark {
- pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Box<Self> {
- Box::new(Self {
+ pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn
ClientFactory>) -> Self {
+ Self {
args,
client_factory,
- })
+ }
}
}
@@ -51,8 +51,8 @@ impl Benchmarkable for PinnedProducerAndConsumerBenchmark {
let args = self.args.clone();
let mut tasks = JoinSet::new();
- let producer_futures = build_producer_futures(cf, &args)?;
- let consumer_futures = build_consumer_futures(cf, &args)?;
+ let producer_futures = build_producer_futures(cf, &args);
+ let consumer_futures = build_consumer_futures(cf, &args);
for fut in producer_futures {
tasks.spawn(fut);
@@ -82,10 +82,10 @@ impl Benchmarkable for PinnedProducerAndConsumerBenchmark {
let partitions = format!("partitions: {}",
self.args.number_of_partitions());
let producers = format!("producers: {}", self.args.producers());
let consumers = format!("consumers: {}", self.args.consumers());
- let max_topic_size = match self.args.max_topic_size() {
- Some(size) => format!(" max topic size: {}", size),
- None => format!(" max topic size: {}",
MaxTopicSize::ServerDefault),
- };
+ let max_topic_size = self.args.max_topic_size().map_or_else(
+ || format!(" max topic size: {}", MaxTopicSize::ServerDefault),
+ |size| format!(" max topic size: {size}"),
+ );
let common_params = self.common_params_str();
info!(
diff --git a/core/bench/src/main.rs b/core/bench/src/main.rs
index fc23cf5f..77d7d66b 100644
--- a/core/bench/src/main.rs
+++ b/core/bench/src/main.rs
@@ -56,7 +56,7 @@ async fn main() -> Result<(), IggyError> {
});
// Configure logging
- let env_filter =
EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"));
+ let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_|
EnvFilter::new("INFO"));
let stdout_layer = fmt::layer().with_ansi(true);
// If output directory is specified, also log to file
@@ -78,7 +78,7 @@ async fn main() -> Result<(), IggyError> {
.init();
}
- let mut benchmark_runner = BenchmarkRunner::new(args);
+ let benchmark_runner = BenchmarkRunner::new(args);
info!("Starting the benchmarks...");
let ctrl_c = tokio::signal::ctrl_c();
diff --git a/core/bench/src/plot.rs b/core/bench/src/plot.rs
index 3d75be77..fae733aa 100644
--- a/core/bench/src/plot.rs
+++ b/core/bench/src/plot.rs
@@ -31,28 +31,28 @@ pub enum ChartType {
}
impl ChartType {
- fn name(&self) -> &'static str {
+ const fn name(&self) -> &'static str {
match self {
- ChartType::Throughput => "throughput",
- ChartType::Latency => "latency",
+ Self::Throughput => "throughput",
+ Self::Latency => "latency",
}
}
fn create_chart(&self) -> fn(&BenchmarkReport, bool, bool) -> Chart {
match self {
- ChartType::Throughput => bench_report::create_throughput_chart,
- ChartType::Latency => bench_report::create_latency_chart,
+ Self::Throughput => bench_report::create_throughput_chart,
+ Self::Latency => bench_report::create_latency_chart,
}
}
fn get_samples(&self, report: &BenchmarkReport) -> usize {
match self {
- ChartType::Throughput => report
+ Self::Throughput => report
.individual_metrics
.iter()
.map(|m| m.throughput_mb_ts.points.len())
.sum(),
- ChartType::Latency => report
+ Self::Latency => report
.individual_metrics
.iter()
.filter(|m| !m.latency_ts.points.is_empty())
@@ -85,7 +85,7 @@ fn open_in_browser(path: &str) -> std::io::Result<()> {
pub fn plot_chart(
report: &BenchmarkReport,
output_directory: &str,
- chart_type: ChartType,
+ chart_type: &ChartType,
should_open_in_browser: bool,
) -> std::io::Result<()> {
let data_processing_start = Instant::now();
@@ -97,12 +97,12 @@ pub fn plot_chart(
save_chart(&chart, file_name, output_directory, 1600, 1200)?;
if should_open_in_browser {
- let chart_path = format!("{}/{}.html", output_directory, file_name);
+ let chart_path = format!("{output_directory}/{file_name}.html");
open_in_browser(&chart_path)?;
}
let total_samples = chart_type.get_samples(report);
- let report_path = format!("{}/report.json", output_directory);
+ let report_path = format!("{output_directory}/report.json");
let report_size =
IggyByteSize::from(std::fs::metadata(&report_path)?.len());
let chart_render_time = chart_render_start.elapsed();
@@ -129,10 +129,10 @@ fn save_chart(
) -> std::io::Result<()> {
let parent = Path::new(output_directory).parent().unwrap();
std::fs::create_dir_all(parent)?;
- let full_output_path = Path::new(output_directory).join(format!("{}.html",
file_name));
+ let full_output_path =
Path::new(output_directory).join(format!("{file_name}.html"));
let mut renderer = HtmlRenderer::new(file_name, width,
height).theme(Theme::Dark);
renderer
.save(chart, &full_output_path)
- .map_err(|e| std::io::Error::other(format!("Failed to save HTML plot:
{}", e)))
+ .map_err(|e| std::io::Error::other(format!("Failed to save HTML plot:
{e}")))
}
diff --git a/core/bench/src/runner.rs b/core/bench/src/runner.rs
index db0feaf6..49c5e696 100644
--- a/core/bench/src/runner.rs
+++ b/core/bench/src/runner.rs
@@ -37,17 +37,17 @@ pub struct BenchmarkRunner {
}
impl BenchmarkRunner {
- pub fn new(args: IggyBenchArgs) -> Self {
+ pub const fn new(args: IggyBenchArgs) -> Self {
Self {
args: Some(args),
test_server: None,
}
}
- pub async fn run(&mut self) -> Result<(), IggyError> {
- let mut args = self.args.take().unwrap();
+ pub async fn run(mut self) -> Result<(), IggyError> {
+ let args = self.args.take().unwrap();
let should_open_charts = args.open_charts();
- self.test_server = start_server_if_needed(&mut args).await;
+ self.test_server = start_server_if_needed(&args).await;
let transport = args.transport();
let server_addr = args.server_address();
@@ -113,7 +113,7 @@ impl BenchmarkRunner {
plot_chart(
&report,
&full_output_path,
- ChartType::Throughput,
+ &ChartType::Throughput,
should_open_charts,
)
.map_err(|e| {
@@ -123,7 +123,7 @@ impl BenchmarkRunner {
plot_chart(
&report,
&full_output_path,
- ChartType::Latency,
+ &ChartType::Latency,
should_open_charts,
)
.map_err(|e| {
diff --git a/core/bench/src/utils/cpu_name.rs b/core/bench/src/utils/cpu_name.rs
index e36ff2c6..adf08d95 100644
--- a/core/bench/src/utils/cpu_name.rs
+++ b/core/bench/src/utils/cpu_name.rs
@@ -25,8 +25,10 @@ pub fn append_cpu_name_lowercase(to: &mut String) {
let cpu = sys
.cpus()
.first()
- .map(|cpu| (cpu.brand().to_string(), cpu.frequency()))
- .unwrap_or_else(|| (String::from("unknown"), 0))
+ .map_or_else(
+ || (String::from("unknown"), 0),
+ |cpu| (cpu.brand().to_string(), cpu.frequency()),
+ )
.0
.to_lowercase()
.replace(' ', "_");
diff --git a/core/bench/src/utils/finish_condition.rs
b/core/bench/src/utils/finish_condition.rs
index 102e65ab..01bd174e 100644
--- a/core/bench/src/utils/finish_condition.rs
+++ b/core/bench/src/utils/finish_condition.rs
@@ -28,7 +28,7 @@ use std::{
const MINIMUM_MSG_PAYLOAD_SIZE: usize = 20;
/// Determines how to calculate the finish condition's workload division
-#[derive(Debug, Clone, Copy, PartialEq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BenchmarkFinishConditionMode {
/// Global condition shares work across all actors
Shared,
@@ -68,9 +68,9 @@ impl BenchmarkFinishCondition {
///
/// The mode parameter automatically determines the appropriate workload
division factor:
/// - Global: factor = 1 (total workload is shared across all actors)
- /// - PerProducer: factor = number of producers
- /// - PerConsumer: factor = number of consumers
- /// - PerProducingConsumer: factor = number of producing consumers * 2
+ /// - `PerProducer`: factor = number of producers
+ /// - `PerConsumer`: factor = number of consumers
+ /// - `PerProducingConsumer`: factor = number of producing consumers * 2
pub fn new(args: &IggyBenchArgs, mode: BenchmarkFinishConditionMode) ->
Arc<Self> {
let total_data = args.total_data();
let batches_count = args.message_batches();
@@ -84,17 +84,17 @@ impl BenchmarkFinishCondition {
};
let total_data_multiplier = match args.benchmark_kind {
- BenchmarkKindCommand::PinnedProducer(_) => args.producers(),
- BenchmarkKindCommand::PinnedConsumer(_) => args.consumers(),
+ BenchmarkKindCommand::PinnedProducer(_)
+ | BenchmarkKindCommand::BalancedProducer(_)
+ | BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) =>
args.producers(),
+ BenchmarkKindCommand::PinnedConsumer(_)
+ | BenchmarkKindCommand::BalancedConsumerGroup(_) =>
args.consumers(),
BenchmarkKindCommand::PinnedProducerAndConsumer(_) => {
args.producers() + args.consumers()
}
- BenchmarkKindCommand::BalancedProducer(_) => args.producers(),
- BenchmarkKindCommand::BalancedConsumerGroup(_) => args.consumers(),
- BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) =>
args.producers(),
- BenchmarkKindCommand::EndToEndProducingConsumer(_) =>
args.producers() * 2,
- BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) =>
args.producers() * 2,
- _ => unreachable!(),
+ BenchmarkKindCommand::EndToEndProducingConsumer(_)
+ | BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) =>
args.producers() * 2,
+ BenchmarkKindCommand::Examples => unreachable!(),
};
Arc::new(match (total_data, batches_count) {
@@ -103,18 +103,20 @@ impl BenchmarkFinishCondition {
Self {
kind: BenchmarkFinishConditionType::ByMessageBatchesCount,
- total: count_per_actor as u64,
- left_total: Arc::new(AtomicI64::new(count_per_actor as
i64)),
+ total: u64::from(count_per_actor),
+ left_total:
Arc::new(AtomicI64::new(i64::from(count_per_actor))),
mode,
}
}
(Some(size), None) => {
- let bytes_per_actor = size.as_bytes_u64() / total_data_factor
as u64;
+ let bytes_per_actor = size.as_bytes_u64() /
u64::from(total_data_factor);
Self {
kind: BenchmarkFinishConditionType::ByTotalData,
total: bytes_per_actor,
- left_total: Arc::new(AtomicI64::new(bytes_per_actor as
i64)),
+ left_total: Arc::new(AtomicI64::new(
+ i64::try_from(bytes_per_actor).unwrap_or(i64::MAX),
+ )),
mode,
}
}
@@ -136,8 +138,10 @@ impl BenchmarkFinishCondition {
pub fn account_and_check(&self, size_to_subtract: u64) -> bool {
match self.kind {
BenchmarkFinishConditionType::ByTotalData => {
- self.left_total
- .fetch_sub(size_to_subtract as i64, Ordering::AcqRel);
+ self.left_total.fetch_sub(
+ i64::try_from(size_to_subtract).unwrap_or(i64::MAX),
+ Ordering::AcqRel,
+ );
}
BenchmarkFinishConditionType::ByMessageBatchesCount => {
self.left_total.fetch_sub(1, Ordering::AcqRel);
@@ -150,7 +154,7 @@ impl BenchmarkFinishCondition {
self.left() <= 0
}
- pub fn total(&self) -> u64 {
+ pub const fn total(&self) -> u64 {
self.total
}
@@ -175,8 +179,8 @@ impl BenchmarkFinishCondition {
}
pub fn status(&self) -> String {
- let done = self.total() as i64 - self.left();
- let total = self.total() as i64;
+ let done = i64::try_from(self.total()).unwrap_or(i64::MAX) -
self.left();
+ let total = i64::try_from(self.total()).unwrap_or(i64::MAX);
match self.kind {
BenchmarkFinishConditionType::ByTotalData => {
format!(
@@ -200,9 +204,9 @@ impl BenchmarkFinishCondition {
pub fn max_capacity(&self) -> usize {
let value = self.left_total.load(Ordering::Relaxed);
if self.kind == BenchmarkFinishConditionType::ByTotalData {
- value as usize / MINIMUM_MSG_PAYLOAD_SIZE
+ usize::try_from(value).unwrap_or(0) / MINIMUM_MSG_PAYLOAD_SIZE
} else {
- value as usize
+ usize::try_from(value).unwrap_or(0)
}
}
}
@@ -210,11 +214,11 @@ impl BenchmarkFinishCondition {
impl Display for BenchmarkFinishConditionMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
- BenchmarkFinishConditionMode::Shared => write!(f, "shared"),
- BenchmarkFinishConditionMode::SharedHalf => write!(f,
"shared-half"),
- BenchmarkFinishConditionMode::PerProducer => write!(f,
"per-producer"),
- BenchmarkFinishConditionMode::PerConsumer => write!(f,
"per-consumer"),
- BenchmarkFinishConditionMode::PerProducingConsumer => {
+ Self::Shared => write!(f, "shared"),
+ Self::SharedHalf => write!(f, "shared-half"),
+ Self::PerProducer => write!(f, "per-producer"),
+ Self::PerConsumer => write!(f, "per-consumer"),
+ Self::PerProducingConsumer => {
write!(f, "per-producing-consumer")
}
}
diff --git a/core/bench/src/utils/mod.rs b/core/bench/src/utils/mod.rs
index c27cb3ae..42d316f0 100644
--- a/core/bench/src/utils/mod.rs
+++ b/core/bench/src/utils/mod.rs
@@ -26,7 +26,17 @@ use integration::test_server::Transport;
use std::{fs, path::Path};
use tracing::{error, info};
-use crate::args::{common::IggyBenchArgs, defaults::*};
+use crate::args::{
+ common::IggyBenchArgs,
+ defaults::{
+ DEFAULT_BALANCED_NUMBER_OF_PARTITIONS,
DEFAULT_BALANCED_NUMBER_OF_STREAMS,
+ DEFAULT_HTTP_SERVER_ADDRESS, DEFAULT_MESSAGE_BATCHES,
DEFAULT_MESSAGE_SIZE,
+ DEFAULT_MESSAGES_PER_BATCH, DEFAULT_NUMBER_OF_CONSUMER_GROUPS,
DEFAULT_NUMBER_OF_CONSUMERS,
+ DEFAULT_NUMBER_OF_PRODUCERS, DEFAULT_PINNED_NUMBER_OF_PARTITIONS,
+ DEFAULT_PINNED_NUMBER_OF_STREAMS, DEFAULT_QUIC_SERVER_ADDRESS,
DEFAULT_TCP_SERVER_ADDRESS,
+ DEFAULT_TOTAL_MESSAGES_SIZE, DEFAULT_WARMUP_TIME,
+ },
+};
pub mod batch_generator;
pub mod client_factory;
@@ -64,7 +74,7 @@ pub async fn get_server_stats(
.build()?,
BenchmarkTransport::Http => client
.with_http()
- .with_api_url(format!("http://{}", server_address))
+ .with_api_url(format!("http://{server_address}"))
.build()?,
BenchmarkTransport::Quic => client
.with_quic()
@@ -94,7 +104,7 @@ pub async fn collect_server_logs_and_save_to_file(
.build()?,
BenchmarkTransport::Http => client
.with_http()
- .with_api_url(format!("http://{}", server_address))
+ .with_api_url(format!("http://{server_address}"))
.build()?,
BenchmarkTransport::Quic => client
.with_quic()
@@ -159,7 +169,7 @@ pub fn params_from_args_and_metrics(
let remark_for_identifier = remark
.clone()
- .unwrap_or("no_remark".to_string())
+ .unwrap_or_else(|| "no_remark".to_string())
.replace(' ', "_");
let data_volume_identifier = args.data_volume_identifier();
@@ -206,60 +216,68 @@ pub fn params_from_args_and_metrics(
fn recreate_bench_command(args: &IggyBenchArgs) -> String {
let mut parts = Vec::new();
- // If using localhost, add env vars
- let server_address = args.server_address();
+ add_environment_variables(&mut parts, args.server_address());
+ parts.push("iggy-bench".to_string());
+
+ add_basic_arguments(&mut parts, args);
+ add_benchmark_kind_arguments(&mut parts, args);
+ add_infrastructure_arguments(&mut parts, args);
+ add_output_arguments(&mut parts, args);
+
+ parts.join(" ")
+}
+
+fn add_environment_variables(parts: &mut Vec<String>, server_address: &str) {
let is_localhost = server_address
.split(':')
.next()
- .map(|host| host == "localhost" || host == "127.0.0.1")
- .unwrap_or(false);
+ .is_some_and(|host| host == "localhost" || host == "127.0.0.1");
if is_localhost {
- // Get all env vars starting with IGGY_
let iggy_vars: Vec<_> = std::env::vars()
.filter(|(k, _)| k.starts_with("IGGY_"))
.collect();
if !iggy_vars.is_empty() {
info!("Found env vars starting with IGGY_: {:?}", iggy_vars);
- parts.extend(iggy_vars.into_iter().map(|(k, v)| format!("{}={}",
k, v)));
+ parts.extend(iggy_vars.into_iter().map(|(k, v)|
format!("{k}={v}")));
}
}
+}
- parts.push("iggy-bench".to_string());
-
+fn add_basic_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
let messages_per_batch = args.messages_per_batch();
if messages_per_batch !=
BenchmarkNumericParameter::Value(DEFAULT_MESSAGES_PER_BATCH.get()) {
- parts.push(format!("--messages-per-batch {}", messages_per_batch));
+ parts.push(format!("--messages-per-batch {messages_per_batch}"));
}
- let message_batches = args.message_batches();
- if let Some(message_batches) = message_batches {
+ if let Some(message_batches) = args.message_batches() {
if message_batches != DEFAULT_MESSAGE_BATCHES {
- parts.push(format!("--message-batches {}", message_batches));
+ parts.push(format!("--message-batches {message_batches}"));
}
}
- let total_messages_size = args.total_data();
- if let Some(total_messages_size) = total_messages_size {
+ if let Some(total_messages_size) = args.total_data() {
if total_messages_size != DEFAULT_TOTAL_MESSAGES_SIZE {
- parts.push(format!("--total-messages-size {}",
total_messages_size));
+ parts.push(format!("--total-messages-size {total_messages_size}"));
}
}
let message_size = args.message_size();
if message_size !=
BenchmarkNumericParameter::Value(DEFAULT_MESSAGE_SIZE.get()) {
- parts.push(format!("--message-size {}", message_size));
+ parts.push(format!("--message-size {message_size}"));
}
if let Some(rate_limit) = args.rate_limit() {
- parts.push(format!("--rate-limit \'{}\'", rate_limit));
+ parts.push(format!("--rate-limit \'{rate_limit}\'"));
}
if args.warmup_time().to_string() != DEFAULT_WARMUP_TIME {
parts.push(format!("--warmup-time \'{}\'", args.warmup_time()));
}
+}
+fn add_benchmark_kind_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs)
{
let kind_str = match args.benchmark_kind.as_simple_kind() {
BenchmarkKind::PinnedProducer => "pinned-producer",
BenchmarkKind::PinnedConsumer => "pinned-consumer",
@@ -272,6 +290,10 @@ fn recreate_bench_command(args: &IggyBenchArgs) -> String {
};
parts.push(kind_str.to_string());
+ add_actor_arguments(parts, args);
+}
+
+fn add_actor_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
let producers = args.producers();
let consumers = args.consumers();
let number_of_consumer_groups = args.number_of_consumer_groups();
@@ -281,36 +303,38 @@ fn recreate_bench_command(args: &IggyBenchArgs) -> String
{
| BenchmarkKind::BalancedProducer
| BenchmarkKind::EndToEndProducingConsumer => {
if producers != DEFAULT_NUMBER_OF_PRODUCERS.get() {
- parts.push(format!("--producers {}", producers));
+ parts.push(format!("--producers {producers}"));
}
}
BenchmarkKind::PinnedConsumer | BenchmarkKind::BalancedConsumerGroup
=> {
if consumers != DEFAULT_NUMBER_OF_CONSUMERS.get() {
- parts.push(format!("--consumers {}", consumers));
+ parts.push(format!("--consumers {consumers}"));
}
}
BenchmarkKind::PinnedProducerAndConsumer
| BenchmarkKind::BalancedProducerAndConsumerGroup => {
if producers != DEFAULT_NUMBER_OF_PRODUCERS.get() {
- parts.push(format!("--producers {}", producers));
+ parts.push(format!("--producers {producers}"));
}
if consumers != DEFAULT_NUMBER_OF_CONSUMERS.get() {
- parts.push(format!("--consumers {}", consumers));
+ parts.push(format!("--consumers {consumers}"));
}
}
BenchmarkKind::EndToEndProducingConsumerGroup => {
if producers != DEFAULT_NUMBER_OF_PRODUCERS.get() {
- parts.push(format!("--producers {}", producers));
+ parts.push(format!("--producers {producers}"));
}
if consumers != DEFAULT_NUMBER_OF_CONSUMERS.get() {
- parts.push(format!("--consumers {}", consumers));
+ parts.push(format!("--consumers {consumers}"));
}
if number_of_consumer_groups !=
DEFAULT_NUMBER_OF_CONSUMER_GROUPS.get() {
- parts.push(format!("--consumer-groups {}",
number_of_consumer_groups));
+ parts.push(format!("--consumer-groups
{number_of_consumer_groups}"));
}
}
}
+}
+fn add_infrastructure_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs)
{
let streams = args.streams();
let default_streams = match args.benchmark_kind.as_simple_kind() {
BenchmarkKind::BalancedProducerAndConsumerGroup
@@ -319,7 +343,7 @@ fn recreate_bench_command(args: &IggyBenchArgs) -> String {
_ => DEFAULT_PINNED_NUMBER_OF_STREAMS.get(),
};
if streams != default_streams {
- parts.push(format!("--streams {}", streams));
+ parts.push(format!("--streams {streams}"));
}
let partitions = args.number_of_partitions();
@@ -330,24 +354,25 @@ fn recreate_bench_command(args: &IggyBenchArgs) -> String
{
_ => DEFAULT_PINNED_NUMBER_OF_PARTITIONS.get(),
};
if partitions != default_partitions {
- parts.push(format!("--partitions {}", partitions));
+ parts.push(format!("--partitions {partitions}"));
}
let consumer_groups = args.number_of_consumer_groups();
- if args.benchmark_kind.as_simple_kind() ==
BenchmarkKind::BalancedConsumerGroup
- || args.benchmark_kind.as_simple_kind() ==
BenchmarkKind::BalancedProducerAndConsumerGroup
- && consumer_groups != DEFAULT_NUMBER_OF_CONSUMER_GROUPS.get()
+ if (args.benchmark_kind.as_simple_kind() ==
BenchmarkKind::BalancedConsumerGroup
+ || args.benchmark_kind.as_simple_kind() ==
BenchmarkKind::BalancedProducerAndConsumerGroup)
+ && consumer_groups != DEFAULT_NUMBER_OF_CONSUMER_GROUPS.get()
{
- parts.push(format!("--consumer-groups {}", consumer_groups));
+ parts.push(format!("--consumer-groups {consumer_groups}"));
}
if let Some(max_topic_size) = args.max_topic_size() {
- parts.push(format!("--max-topic-size \'{}\'", max_topic_size));
+ parts.push(format!("--max-topic-size \'{max_topic_size}\'"));
}
let transport = args.transport().to_string().to_lowercase();
parts.push(transport.clone());
+ let server_address = args.server_address();
let default_address = match transport.as_str() {
"tcp" => DEFAULT_TCP_SERVER_ADDRESS,
"quic" => DEFAULT_QUIC_SERVER_ADDRESS,
@@ -356,16 +381,15 @@ fn recreate_bench_command(args: &IggyBenchArgs) -> String
{
};
if server_address != default_address {
- parts.push(format!("--server-address {}", server_address));
+ parts.push(format!("--server-address {server_address}"));
}
+}
+fn add_output_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
parts.push("output".to_string());
-
parts.push("-o performance_results".to_string());
- let remark = args.remark();
- if let Some(remark) = remark {
- parts.push(format!("--remark \'{}\'", remark));
+ if let Some(remark) = args.remark() {
+ parts.push(format!("--remark \'{remark}\'"));
}
- parts.join(" ")
}
diff --git a/core/bench/src/utils/rate_limiter.rs
b/core/bench/src/utils/rate_limiter.rs
index f365c958..fdc86193 100644
--- a/core/bench/src/utils/rate_limiter.rs
+++ b/core/bench/src/utils/rate_limiter.rs
@@ -29,7 +29,9 @@ pub struct BenchmarkRateLimiter {
impl BenchmarkRateLimiter {
pub fn new(bytes_per_second: IggyByteSize) -> Self {
- let bytes_per_second = NonZeroU32::new(bytes_per_second.as_bytes_u64()
as u32).unwrap();
+ let bytes_per_second =
+
NonZeroU32::new(u32::try_from(bytes_per_second.as_bytes_u64()).unwrap_or(u32::MAX))
+ .unwrap();
let rate_limiter =
GovernorRateLimiter::direct(Quota::per_second(bytes_per_second));
// Fill the bucket to avoid burst
@@ -40,7 +42,7 @@ impl BenchmarkRateLimiter {
pub async fn wait_until_necessary(&self, bytes: u64) {
self.rate_limiter
- .until_n_ready(NonZeroU32::new(bytes as u32).unwrap())
+
.until_n_ready(NonZeroU32::new(u32::try_from(bytes).unwrap_or(u32::MAX)).unwrap())
.await
.unwrap();
}
diff --git a/core/bench/src/utils/server_starter.rs
b/core/bench/src/utils/server_starter.rs
index c3d0df52..17dc8fc5 100644
--- a/core/bench/src/utils/server_starter.rs
+++ b/core/bench/src/utils/server_starter.rs
@@ -36,22 +36,72 @@ struct ConfigAddress {
address: String,
}
-pub async fn start_server_if_needed(args: &mut IggyBenchArgs) ->
Option<TestServer> {
+pub async fn start_server_if_needed(args: &IggyBenchArgs) ->
Option<TestServer> {
if args.skip_server_start {
info!("Skipping iggy-server start");
return None;
}
+ let (should_start, mut envs) = evaluate_server_start_condition(args).await;
+
+ if should_start {
+ envs.insert(SYSTEM_PATH_ENV_VAR.to_owned(), "local_data".to_owned());
+
+ if args.verbose {
+ envs.insert("IGGY_TEST_VERBOSE".to_owned(), "true".to_owned());
+ info!("Enabling verbose output - iggy-server will logs print to
stdout");
+ } else {
+ info!("Disabling verbose output - iggy-server will print logs to
files");
+ }
+
+ info!(
+ "Starting test server, transport: {}, cleanup: {}, verbosity: {}",
+ args.transport(),
+ args.cleanup,
+ args.verbose
+ );
+ let mut test_server = TestServer::new(
+ Some(envs),
+ args.cleanup,
+ args.server_executable_path.clone(),
+ IpAddrKind::V4,
+ );
+ let now = Instant::now();
+ test_server.start();
+ let elapsed = now.elapsed();
+ if elapsed.as_millis() > 1000 {
+ warn!(
+ "Test iggy-server started, pid: {}, startup took {} ms because
it had to load messages from disk to cache",
+ test_server.pid(),
+ elapsed.as_millis()
+ );
+ } else {
+ info!(
+ "Test iggy-server started, pid: {}, startup time: {} ms",
+ test_server.pid(),
+ elapsed.as_millis()
+ );
+ }
+
+ Some(test_server)
+ } else {
+ info!("Skipping iggy-server start");
+ None
+ }
+}
+
+async fn evaluate_server_start_condition(args: &IggyBenchArgs) -> (bool,
HashMap<String, String>) {
let default_config: ServerConfig =
toml::from_str(include_str!("../../../configs/server.toml")).unwrap();
- let (should_start, mut envs) = match &args.transport() {
+
+ match &args.transport() {
Transport::Http => {
let args_http_address =
args.server_address().parse::<SocketAddr>().unwrap();
let config_http_address =
default_config.http.address.parse::<SocketAddr>().unwrap();
let envs = HashMap::from([
(
"IGGY_HTTP_ADDRESS".to_owned(),
- default_config.http.address.to_owned(),
+ default_config.http.address.clone(),
),
("IGGY_TCP_ENABLED".to_owned(), "false".to_owned()),
("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned()),
@@ -69,7 +119,7 @@ pub async fn start_server_if_needed(args: &mut
IggyBenchArgs) -> Option<TestServ
let envs = HashMap::from([
(
"IGGY_TCP_ADDRESS".to_owned(),
- default_config.tcp.address.to_owned(),
+ default_config.tcp.address.clone(),
),
("IGGY_HTTP_ENABLED".to_owned(), "false".to_owned()),
("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned()),
@@ -86,7 +136,7 @@ pub async fn start_server_if_needed(args: &mut
IggyBenchArgs) -> Option<TestServ
let envs = HashMap::from([
(
"IGGY_QUIC_ADDRESS".to_owned(),
- default_config.quic.address.to_owned(),
+ default_config.quic.address.clone(),
),
("IGGY_HTTP_ENABLED".to_owned(), "false".to_owned()),
("IGGY_TCP_ENABLED".to_owned(), "false".to_owned()),
@@ -98,51 +148,6 @@ pub async fn start_server_if_needed(args: &mut
IggyBenchArgs) -> Option<TestServ
envs,
)
}
- };
-
- if should_start {
- envs.insert(SYSTEM_PATH_ENV_VAR.to_owned(), "local_data".to_owned());
-
- if args.verbose {
- envs.insert("IGGY_TEST_VERBOSE".to_owned(), "true".to_owned());
- info!("Enabling verbose output - iggy-server will logs print to
stdout")
- } else {
- info!("Disabling verbose output - iggy-server will print logs to
files")
- }
-
- info!(
- "Starting test server, transport: {}, cleanup: {}, verbosity: {}",
- args.transport(),
- args.cleanup,
- args.verbose
- );
- let mut test_server = TestServer::new(
- Some(envs),
- args.cleanup,
- args.server_executable_path.clone(),
- IpAddrKind::V4,
- );
- let now = Instant::now();
- test_server.start();
- let elapsed = now.elapsed();
- if elapsed.as_millis() > 1000 {
- warn!(
- "Test iggy-server started, pid: {}, startup took {} ms because
it had to load messages from disk to cache",
- test_server.pid(),
- elapsed.as_millis()
- );
- } else {
- info!(
- "Test iggy-server started, pid: {}, startup time: {} ms",
- test_server.pid(),
- elapsed.as_millis()
- )
- }
-
- Some(test_server)
- } else {
- info!("Skipping iggy-server start");
- None
}
}