This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch bench-stress in repository https://gitbox.apache.org/repos/asf/iggy.git
commit c999d2af074c8dd100bb53ac5d9a6f755855e23a Author: Hubert Gruszecki <[email protected]> AuthorDate: Wed Feb 11 18:37:22 2026 +0100 feat(bench): add stress test with chaos churning The bench suite lacked a way to surface concurrency bugs in the server's CRUD and data-plane paths under simultaneous load. Adds `iggy-bench stress` — a phased benchmark (baseline → chaos → drain → verify) that: - Runs producers/consumers alongside control-plane churners that create/delete topics, partitions, consumer groups, and segments concurrently - Classifies errors into expected (races) vs unexpected via a three-tier error classifier - Verifies message ordering post-chaos with a gap and duplicate detector - Supports ApiMix modes (Mixed, DataPlaneOnly, ControlPlaneHeavy, All) to gate destructive ops like delete-and-recreate-topic and stream purge - Exercises untested polling strategies (First, Last, Timestamp) that race against segment mutations - Tightens defaults (50MiB max topic, 3s churn interval) to increase race window density --- .../src/components/selectors/benchmark_selector.rs | 1 + core/bench/report/src/lib.rs | 2 + core/bench/report/src/prints.rs | 1 + core/bench/report/src/types/actor_kind.rs | 4 + core/bench/report/src/types/benchmark_kind.rs | 3 + core/bench/report/src/types/group_metrics_kind.rs | 4 + core/bench/report/src/types/individual_metrics.rs | 43 ++ core/bench/report/src/types/params.rs | 6 + core/bench/src/actors/mod.rs | 1 + core/bench/src/actors/stress/admin_exerciser.rs | 206 +++++++ .../src/actors/stress/control_plane_churner.rs | 653 +++++++++++++++++++++ core/bench/src/actors/stress/error_classifier.rs | 72 +++ core/bench/src/actors/stress/health_monitor.rs | 143 +++++ .../bench/src/{args/kinds => actors/stress}/mod.rs | 9 +- core/bench/src/actors/stress/stress_context.rs | 177 ++++++ core/bench/src/actors/stress/verifier.rs | 163 +++++ core/bench/src/analytics/metrics/group.rs | 1 + core/bench/src/args/common.rs | 44 +- 
core/bench/src/args/kind.rs | 34 ++ core/bench/src/args/kinds/mod.rs | 1 + core/bench/src/args/kinds/stress/args.rs | 160 +++++ core/bench/src/args/kinds/{ => stress}/mod.rs | 4 +- core/bench/src/benchmarks/benchmark.rs | 4 + core/bench/src/benchmarks/common.rs | 6 +- core/bench/src/benchmarks/mod.rs | 2 + core/bench/src/benchmarks/stress.rs | 281 +++++++++ core/bench/src/benchmarks/stress_report.rs | 206 +++++++ core/bench/src/utils/finish_condition.rs | 103 +++- core/bench/src/utils/mod.rs | 4 +- 29 files changed, 2292 insertions(+), 46 deletions(-) diff --git a/core/bench/dashboard/frontend/src/components/selectors/benchmark_selector.rs b/core/bench/dashboard/frontend/src/components/selectors/benchmark_selector.rs index d93daedf4..e6b3c6afb 100644 --- a/core/bench/dashboard/frontend/src/components/selectors/benchmark_selector.rs +++ b/core/bench/dashboard/frontend/src/components/selectors/benchmark_selector.rs @@ -66,6 +66,7 @@ pub fn benchmark_selector(props: &BenchmarkSelectorProps) -> Html { | BenchmarkKind::EndToEndProducingConsumerGroup ) } + BenchmarkKind::Stress => matches!(k, BenchmarkKind::Stress), }) .cloned() .collect(); diff --git a/core/bench/report/src/lib.rs b/core/bench/report/src/lib.rs index 8a180752f..b01960d87 100644 --- a/core/bench/report/src/lib.rs +++ b/core/bench/report/src/lib.rs @@ -48,6 +48,7 @@ pub fn create_throughput_chart( ActorKind::Producer => "Producer", ActorKind::Consumer => "Consumer", ActorKind::ProducingConsumer => "Producing Consumer", + ActorKind::StressActor => "Stress Actor", }; chart = chart.add_dual_time_line_series( @@ -113,6 +114,7 @@ pub fn create_latency_chart( ActorKind::Producer => "Producer", ActorKind::Consumer => "Consumer", ActorKind::ProducingConsumer => "Producing Consumer", + ActorKind::StressActor => "Stress Actor", }; chart = chart.add_time_series( diff --git a/core/bench/report/src/prints.rs b/core/bench/report/src/prints.rs index 045560ec3..8cc32ab9c 100644 --- a/core/bench/report/src/prints.rs +++ 
b/core/bench/report/src/prints.rs @@ -142,6 +142,7 @@ impl BenchmarkGroupMetrics { GroupMetricsKind::Consumers => ("Consumers Results", Color::Green), GroupMetricsKind::ProducersAndConsumers => ("Aggregate Results", Color::Red), GroupMetricsKind::ProducingConsumers => ("Producing Consumer Results", Color::Red), + GroupMetricsKind::StressActors => ("Stress Actor Results", Color::Yellow), }; let actor = self.summary.kind.actor(); diff --git a/core/bench/report/src/types/actor_kind.rs b/core/bench/report/src/types/actor_kind.rs index 7376d3ac4..13b20d81e 100644 --- a/core/bench/report/src/types/actor_kind.rs +++ b/core/bench/report/src/types/actor_kind.rs @@ -30,6 +30,9 @@ pub enum ActorKind { #[display("Producing Consumer")] #[serde(rename = "producing_consumer")] ProducingConsumer, + #[display("Stress Actor")] + #[serde(rename = "stress_actor")] + StressActor, } impl ActorKind { @@ -38,6 +41,7 @@ impl ActorKind { ActorKind::Producer => "Producers", ActorKind::Consumer => "Consumers", ActorKind::ProducingConsumer => "Producing Consumers", + ActorKind::StressActor => "Stress Actors", } } } diff --git a/core/bench/report/src/types/benchmark_kind.rs b/core/bench/report/src/types/benchmark_kind.rs index 43393e532..8747be59a 100644 --- a/core/bench/report/src/types/benchmark_kind.rs +++ b/core/bench/report/src/types/benchmark_kind.rs @@ -59,4 +59,7 @@ pub enum BenchmarkKind { #[display("End To End Producing Consumer Group")] #[serde(rename = "end_to_end_producing_consumer_group")] EndToEndProducingConsumerGroup, + #[display("Stress")] + #[serde(rename = "stress")] + Stress, } diff --git a/core/bench/report/src/types/group_metrics_kind.rs b/core/bench/report/src/types/group_metrics_kind.rs index 928f28489..8334e66bf 100644 --- a/core/bench/report/src/types/group_metrics_kind.rs +++ b/core/bench/report/src/types/group_metrics_kind.rs @@ -34,6 +34,9 @@ pub enum GroupMetricsKind { #[display("Producing Consumers")] #[serde(rename = "producing_consumers")] ProducingConsumers, + 
#[display("Stress Actors")] + #[serde(rename = "stress_actors")] + StressActors, } impl GroupMetricsKind { @@ -43,6 +46,7 @@ impl GroupMetricsKind { GroupMetricsKind::Consumers => "Consumer", GroupMetricsKind::ProducersAndConsumers => "Actor", GroupMetricsKind::ProducingConsumers => "Producing Consumer", + GroupMetricsKind::StressActors => "Stress Actor", } } } diff --git a/core/bench/report/src/types/individual_metrics.rs b/core/bench/report/src/types/individual_metrics.rs index 2ba13c3c8..24d0d9e2d 100644 --- a/core/bench/report/src/types/individual_metrics.rs +++ b/core/bench/report/src/types/individual_metrics.rs @@ -17,6 +17,7 @@ */ use super::{ + actor_kind::ActorKind, benchmark_kind::BenchmarkKind, individual_metrics_summary::BenchmarkIndividualMetricsSummary, time_series::TimeSeries, }; use crate::utils::{max, min, std_dev}; @@ -32,6 +33,48 @@ pub struct BenchmarkIndividualMetrics { pub latency_ts: TimeSeries, } +impl BenchmarkIndividualMetrics { + /// Creates a zero-valued placeholder for stress actors that don't produce + /// standard throughput/latency metrics (churners, monitors, etc.). 
+ pub fn placeholder(actor_name: &str) -> Self { + Self { + summary: BenchmarkIndividualMetricsSummary { + benchmark_kind: BenchmarkKind::Stress, + actor_kind: ActorKind::StressActor, + actor_id: u32::from_ne_bytes( + actor_name + .as_bytes() + .get(..4) + .unwrap_or(&[0; 4]) + .try_into() + .unwrap_or([0; 4]), + ), + total_time_secs: 0.0, + total_user_data_bytes: 0, + total_bytes: 0, + total_messages: 0, + total_message_batches: 0, + throughput_megabytes_per_second: 0.0, + throughput_messages_per_second: 0.0, + p50_latency_ms: 0.0, + p90_latency_ms: 0.0, + p95_latency_ms: 0.0, + p99_latency_ms: 0.0, + p999_latency_ms: 0.0, + p9999_latency_ms: 0.0, + avg_latency_ms: 0.0, + median_latency_ms: 0.0, + min_latency_ms: 0.0, + max_latency_ms: 0.0, + std_dev_latency_ms: 0.0, + }, + throughput_mb_ts: TimeSeries::default(), + throughput_msg_ts: TimeSeries::default(), + latency_ts: TimeSeries::default(), + } + } +} + // Custom deserializer implementation impl<'de> Deserialize<'de> for BenchmarkIndividualMetrics { fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> diff --git a/core/bench/report/src/types/params.rs b/core/bench/report/src/types/params.rs index 7e90942ba..13e7b9a4a 100644 --- a/core/bench/report/src/types/params.rs +++ b/core/bench/report/src/types/params.rs @@ -73,6 +73,12 @@ impl BenchmarkParams { self.producers, self.consumer_groups ) } + BenchmarkKind::Stress => { + format!( + "{} Producers/{} Consumers (Stress)", + self.producers, self.consumers + ) + } } } } diff --git a/core/bench/src/actors/mod.rs b/core/bench/src/actors/mod.rs index d992ad5ac..6770941ec 100644 --- a/core/bench/src/actors/mod.rs +++ b/core/bench/src/actors/mod.rs @@ -23,6 +23,7 @@ use iggy::prelude::IggyError; pub mod consumer; pub mod producer; pub mod producing_consumer; +pub mod stress; #[derive(Debug, Clone)] pub struct BatchMetrics { diff --git a/core/bench/src/actors/stress/admin_exerciser.rs b/core/bench/src/actors/stress/admin_exerciser.rs new file mode 100644 index 
000000000..36faa0758 --- /dev/null +++ b/core/bench/src/actors/stress/admin_exerciser.rs @@ -0,0 +1,206 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::error_classifier; +use super::stress_context::StressContext; +use crate::utils::{ClientFactory, login_root}; +use iggy::clients::client::IggyClient; +use iggy::prelude::*; +use std::sync::Arc; +use std::sync::atomic::Ordering; +use tracing::{debug, warn}; + +const ADMIN_CYCLE_INTERVAL_SECS: u64 = 15; + +/// Exercises user management and PAT lifecycle APIs at lower frequency. +/// +/// Each cycle: create user -> create PAT -> delete PAT -> delete user. +/// Also exercises consumer offset store/get and `flush_unsaved_buffer`. 
+pub struct AdminExerciser { + client_factory: Arc<dyn ClientFactory>, + ctx: Arc<StressContext>, +} + +impl AdminExerciser { + pub fn new(client_factory: Arc<dyn ClientFactory>, ctx: Arc<StressContext>) -> Self { + Self { + client_factory, + ctx, + } + } + + pub async fn run(self) { + let client = self.client_factory.create_client().await; + let client = IggyClient::create(client, None, None); + login_root(&client).await; + + let mut cycle = 0u64; + while !self.ctx.is_cancelled() { + self.user_pat_lifecycle(&client, cycle).await; + self.offset_lifecycle(&client, cycle).await; + self.flush_buffers(&client).await; + + cycle += 1; + tokio::time::sleep(std::time::Duration::from_secs(ADMIN_CYCLE_INTERVAL_SECS)).await; + } + } + + async fn user_pat_lifecycle(&self, client: &IggyClient, cycle: u64) { + let username = format!("stress-user-{cycle}"); + let password = "StressP@ss123!"; + + // Create user + match client + .create_user(&username, password, UserStatus::Active, None) + .await + { + Ok(_) => { + self.ctx + .stats + .create_user_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .create_user_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + warn!("Admin: create user failed: {e}"); + return; + } + } + + // Create PAT for current session (root user) + let pat_name = format!("stress-pat-{cycle}"); + match client + .create_personal_access_token(&pat_name, IggyExpiry::NeverExpire) + .await + { + Ok(_) => { + self.ctx.stats.create_pat_ok.fetch_add(1, Ordering::Relaxed); + + // Delete PAT + match client.delete_personal_access_token(&pat_name).await { + Ok(()) => { + self.ctx.stats.delete_pat_ok.fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .delete_pat_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!("Admin: delete PAT failed: {e}"); + } + } + } + Err(e) => { + self.ctx + .stats + .create_pat_err + .fetch_add(1, 
Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!("Admin: create PAT failed: {e}"); + } + } + + // Delete user + let user_id: Identifier = username.as_str().try_into().expect("valid identifier"); + match client.delete_user(&user_id).await { + Ok(()) => { + self.ctx + .stats + .delete_user_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .delete_user_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!("Admin: delete user failed: {e}"); + } + } + } + + async fn offset_lifecycle(&self, client: &IggyClient, cycle: u64) { + let stream_id: Identifier = "bench-stream-1".try_into().expect("valid identifier"); + let topic_id: Identifier = "topic-1".try_into().expect("valid identifier"); + let consumer_id = u32::try_from(cycle % 1_000_000).unwrap_or(0) + 1000; + let consumer = Consumer::new(Identifier::numeric(consumer_id).expect("valid id")); + + // Store offset + match client + .store_consumer_offset(&consumer, &stream_id, &topic_id, Some(1), cycle) + .await + { + Ok(()) => { + self.ctx + .stats + .store_offset_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .store_offset_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + return; + } + } + + // Get offset + match client + .get_consumer_offset(&consumer, &stream_id, &topic_id, Some(1)) + .await + { + Ok(_) => { + self.ctx.stats.get_offset_ok.fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .get_offset_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + } + } + } + + async fn flush_buffers(&self, client: &IggyClient) { + let stream_id: Identifier = "bench-stream-1".try_into().expect("valid identifier"); + let topic_id: Identifier = "topic-1".try_into().expect("valid identifier"); + + match client + .flush_unsaved_buffer(&stream_id, &topic_id, 1, false) + .await + { + 
Ok(()) => { + self.ctx.stats.flush_ok.fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx.stats.flush_err.fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!("Admin: flush_unsaved_buffer failed: {e}"); + } + } + } +} diff --git a/core/bench/src/actors/stress/control_plane_churner.rs b/core/bench/src/actors/stress/control_plane_churner.rs new file mode 100644 index 000000000..39df92259 --- /dev/null +++ b/core/bench/src/actors/stress/control_plane_churner.rs @@ -0,0 +1,653 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::error_classifier; +use super::stress_context::StressContext; +use crate::args::kinds::stress::args::ApiMix; +use crate::benchmarks::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX}; +use crate::utils::{ClientFactory, login_root}; +use iggy::clients::client::IggyClient; +use iggy::prelude::*; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use std::sync::Arc; +use std::sync::atomic::Ordering; +use tracing::{debug, warn}; + +/// CRUD lifecycle operations exercised during the chaos phase. 
+#[derive(Debug, Clone, Copy)] +enum ChurnOp { + CreateDeleteTopic, + AddRemovePartitions, + ConsumerGroupJoinLeave, + PurgeTopic, + DeleteSegments, + UpdateTopic, + StressPoll, + // Destructive ops — only enabled with ApiMix::All + DeleteAndRecreateTopic, + PurgeStream, + DisruptConsumerGroup, +} + +const STANDARD_OPS: [ChurnOp; 7] = [ + ChurnOp::CreateDeleteTopic, + ChurnOp::AddRemovePartitions, + ChurnOp::ConsumerGroupJoinLeave, + ChurnOp::PurgeTopic, + ChurnOp::DeleteSegments, + ChurnOp::UpdateTopic, + ChurnOp::StressPoll, +]; + +const ALL_OPS: [ChurnOp; 10] = [ + ChurnOp::CreateDeleteTopic, + ChurnOp::AddRemovePartitions, + ChurnOp::ConsumerGroupJoinLeave, + ChurnOp::PurgeTopic, + ChurnOp::DeleteSegments, + ChurnOp::UpdateTopic, + ChurnOp::StressPoll, + ChurnOp::DeleteAndRecreateTopic, + ChurnOp::PurgeStream, + ChurnOp::DisruptConsumerGroup, +]; + +/// Topic configuration shared between the benchmark setup and the churner, +/// so that `DeleteAndRecreateTopic` can recreate with the same parameters. +pub struct ChurnerConfig { + pub api_mix: ApiMix, + pub partitions: u32, + pub message_expiry: IggyExpiry, + pub max_topic_size: MaxTopicSize, +} + +/// Periodically executes CRUD lifecycle operations against the server. +/// +/// Targets the TOCTOU race in consumer group rebalance and exercises +/// create/delete paths under concurrent data-plane load. In `ApiMix::All` +/// mode, also exercises destructive ops (topic deletion, stream purge) +/// that race against active data-plane actors. 
+pub struct ControlPlaneChurner { + churner_id: u32, + client_factory: Arc<dyn ClientFactory>, + ctx: Arc<StressContext>, + churn_interval: std::time::Duration, + rng: StdRng, + api_mix: ApiMix, + partitions: u32, + message_expiry: IggyExpiry, + max_topic_size: MaxTopicSize, +} + +impl ControlPlaneChurner { + pub fn new( + churner_id: u32, + client_factory: Arc<dyn ClientFactory>, + ctx: Arc<StressContext>, + churn_interval: IggyDuration, + chaos_seed: u64, + config: &ChurnerConfig, + ) -> Self { + let rng = StdRng::seed_from_u64(chaos_seed.wrapping_add(u64::from(churner_id))); + Self { + churner_id, + client_factory, + ctx, + churn_interval: churn_interval.get_duration(), + rng, + api_mix: config.api_mix, + partitions: config.partitions, + message_expiry: config.message_expiry, + max_topic_size: config.max_topic_size, + } + } + + fn available_ops(&self) -> &'static [ChurnOp] { + match self.api_mix { + ApiMix::Mixed | ApiMix::ControlPlaneHeavy => &STANDARD_OPS, + ApiMix::All => &ALL_OPS, + ApiMix::DataPlaneOnly => unreachable!("churner not spawned for DataPlaneOnly"), + } + } + + pub async fn run(mut self) { + let client = self.client_factory.create_client().await; + let client = IggyClient::create(client, None, None); + login_root(&client).await; + + let mut cycle = 0u64; + while !self.ctx.is_cancelled() { + let ops = self.available_ops(); + let op = ops[self.rng.random_range(0..ops.len())]; + debug!( + "Churner #{} cycle {cycle}: executing {:?}", + self.churner_id, op + ); + + match op { + ChurnOp::CreateDeleteTopic => { + self.create_delete_topic(&client, cycle).await; + } + ChurnOp::AddRemovePartitions => { + self.add_remove_partitions(&client).await; + } + ChurnOp::ConsumerGroupJoinLeave => { + self.consumer_group_join_leave(&client, cycle).await; + } + ChurnOp::PurgeTopic => { + self.purge_random_topic(&client).await; + } + ChurnOp::DeleteSegments => { + self.delete_segments(&client).await; + } + ChurnOp::UpdateTopic => { + self.update_topic(&client).await; + 
} + ChurnOp::StressPoll => { + self.stress_poll(&client).await; + } + ChurnOp::DeleteAndRecreateTopic => { + self.delete_and_recreate_topic(&client).await; + } + ChurnOp::PurgeStream => { + self.purge_stream(&client).await; + } + ChurnOp::DisruptConsumerGroup => { + self.disrupt_consumer_group(&client).await; + } + } + + cycle += 1; + tokio::time::sleep(self.churn_interval).await; + } + } + + fn random_stream_id(&self) -> Identifier { + let stream_idx = self.rng.clone().random_range(1..=2u32); + format!("bench-stream-{stream_idx}") + .as_str() + .try_into() + .expect("valid identifier") + } + + fn topic_id() -> Identifier { + "topic-1".try_into().expect("valid identifier") + } + + // --- Original ops --- + + async fn create_delete_topic(&self, client: &IggyClient, cycle: u64) { + let stream_id: Identifier = "bench-stream-1".try_into().expect("valid identifier"); + let topic_name = format!("churn-{}-{cycle}", self.churner_id); + + match client + .create_topic( + &stream_id, + &topic_name, + 1, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + { + Ok(_) => { + self.ctx + .stats + .create_topic_ok + .fetch_add(1, Ordering::Relaxed); + + self.ctx + .ephemeral_topics + .lock() + .await + .push((stream_id.clone(), topic_name.clone())); + + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + let topic_id: Identifier = + topic_name.as_str().try_into().expect("valid identifier"); + match client.delete_topic(&stream_id, &topic_id).await { + Ok(()) => { + self.ctx + .stats + .delete_topic_ok + .fetch_add(1, Ordering::Relaxed); + let mut topics = self.ctx.ephemeral_topics.lock().await; + topics.retain(|(_, name)| name != &topic_name); + } + Err(e) => { + self.ctx + .stats + .delete_topic_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + warn!("Churner #{}: delete topic failed: {e}", self.churner_id); + } + } + } + Err(e) => { + self.ctx + .stats + 
.create_topic_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + warn!("Churner #{}: create topic failed: {e}", self.churner_id); + } + } + } + + async fn add_remove_partitions(&self, client: &IggyClient) { + let stream_id: Identifier = "bench-stream-1".try_into().expect("valid identifier"); + let topic_id = Self::topic_id(); + + match client.create_partitions(&stream_id, &topic_id, 1).await { + Ok(()) => { + self.ctx + .stats + .create_partitions_ok + .fetch_add(1, Ordering::Relaxed); + + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + + match client.delete_partitions(&stream_id, &topic_id, 1).await { + Ok(()) => { + self.ctx + .stats + .delete_partitions_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + ..stats + .delete_partitions_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!( + "Churner #{}: delete partitions failed: {e}", + self.churner_id + ); + } + } + } + Err(e) => { + self.ctx + .stats + .create_partitions_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!( + "Churner #{}: create partitions failed: {e}", + self.churner_id + ); + } + } + } + + /// Rapid create -> join -> leave -> delete cycle targeting the TOCTOU race + /// in `resolve_consumer_with_partition_id()`. 
+ async fn consumer_group_join_leave(&self, client: &IggyClient, cycle: u64) { + let stream_id: Identifier = "bench-stream-1".try_into().expect("valid identifier"); + let topic_id = Self::topic_id(); + let cg_name = format!("churn-cg-{}-{cycle}", self.churner_id); + + match client + .create_consumer_group(&stream_id, &topic_id, &cg_name) + .await + { + Ok(_) => { + self.ctx + .stats + .create_consumer_group_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .create_consumer_group_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + return; + } + } + + let cg_id: Identifier = cg_name.as_str().try_into().expect("valid identifier"); + + match client + .join_consumer_group(&stream_id, &topic_id, &cg_id) + .await + { + Ok(()) => { + self.ctx + .stats + .join_consumer_group_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .join_consumer_group_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + } + } + + match client + .leave_consumer_group(&stream_id, &topic_id, &cg_id) + .await + { + Ok(()) => { + self.ctx + .stats + .leave_consumer_group_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .leave_consumer_group_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + } + } + + match client + .delete_consumer_group(&stream_id, &topic_id, &cg_id) + .await + { + Ok(()) => { + self.ctx + .stats + .delete_consumer_group_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .delete_consumer_group_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + } + } + } + + async fn purge_random_topic(&self, client: &IggyClient) { + let stream_id = self.random_stream_id(); + let topic_id = Self::topic_id(); + + match client.purge_topic(&stream_id, &topic_id).await { + Ok(()) => { + self.ctx + .stats + 
.purge_topic_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .purge_topic_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!("Churner #{}: purge topic failed: {e}", self.churner_id); + } + } + } + + // --- New safe ops --- + + async fn delete_segments(&self, client: &IggyClient) { + let stream_id = self.random_stream_id(); + let topic_id = Self::topic_id(); + let partition_id = self.rng.clone().random_range(1..=self.partitions); + + match client + .delete_segments(&stream_id, &topic_id, partition_id, 1) + .await + { + Ok(()) => { + self.ctx + .stats + .delete_segments_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .delete_segments_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!("Churner #{}: delete segments failed: {e}", self.churner_id); + } + } + } + + async fn update_topic(&self, client: &IggyClient) { + let stream_id = self.random_stream_id(); + let topic_id = Self::topic_id(); + + let compression = if self.rng.clone().random_bool(0.5) { + CompressionAlgorithm::None + } else { + CompressionAlgorithm::Gzip + }; + + match client + .update_topic( + &stream_id, + &topic_id, + "topic-1", + compression, + None, + self.message_expiry, + self.max_topic_size, + ) + .await + { + Ok(()) => { + self.ctx + .stats + .update_topic_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .update_topic_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!("Churner #{}: update topic failed: {e}", self.churner_id); + } + } + } + + /// One-off polls with First/Last/Timestamp strategies — these are "victim" + /// operations that race against concurrent segment mutations. 
+ async fn stress_poll(&self, client: &IggyClient) { + let stream_id = self.random_stream_id(); + let topic_id = Self::topic_id(); + let partition_id = self.rng.clone().random_range(1..=self.partitions); + let consumer = Consumer::new(Identifier::numeric(8888).expect("valid consumer id")); + + let strategy = match self.rng.clone().random_range(0..3u32) { + 0 => PollingStrategy::first(), + 1 => PollingStrategy::last(), + _ => PollingStrategy::timestamp(IggyTimestamp::now()), + }; + + match client + .poll_messages( + &stream_id, + &topic_id, + Some(partition_id), + &consumer, + &strategy, + 10, + false, + ) + .await + { + Ok(_) => { + self.ctx + .stats + .stress_poll_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .stress_poll_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!("Churner #{}: stress poll failed: {e}", self.churner_id); + } + } + } + + // --- Destructive ops (ApiMix::All only) --- + + /// Deletes and immediately recreates topic-1 on a random stream. + /// Exercises 23+ `expect()` panic sites reachable when consumers + /// hold stale topic references during deletion. 
+ async fn delete_and_recreate_topic(&self, client: &IggyClient) { + let stream_id = self.random_stream_id(); + let topic_id = Self::topic_id(); + + match client.delete_topic(&stream_id, &topic_id).await { + Ok(()) => { + self.ctx + .stats + .delete_topic_ok + .fetch_add(1, Ordering::Relaxed); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + match client + .create_topic( + &stream_id, + "topic-1", + self.partitions, + CompressionAlgorithm::default(), + None, + self.message_expiry, + self.max_topic_size, + ) + .await + { + Ok(_) => { + self.ctx + .stats + .create_topic_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .create_topic_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + warn!("Churner #{}: recreate topic failed: {e}", self.churner_id); + } + } + } + Err(e) => { + self.ctx + .stats + .delete_topic_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!( + "Churner #{}: delete topic (for recreate) failed: {e}", + self.churner_id + ); + } + } + } + + async fn purge_stream(&self, client: &IggyClient) { + let stream_id = self.random_stream_id(); + + match client.purge_stream(&stream_id).await { + Ok(()) => { + self.ctx + .stats + .purge_stream_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .purge_stream_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!("Churner #{}: purge stream failed: {e}", self.churner_id); + } + } + } + + /// Joins and leaves the consumer group that active consumers are using, + /// forcing a rebalance mid-poll (TOCTOU in CG resolution). 
+ async fn disrupt_consumer_group(&self, client: &IggyClient) { + let stream_id: Identifier = "bench-stream-1".try_into().expect("valid identifier"); + let topic_id = Self::topic_id(); + let cg_name = format!("{CONSUMER_GROUP_NAME_PREFIX}-{CONSUMER_GROUP_BASE_ID}"); + let cg_id: Identifier = cg_name.as_str().try_into().expect("valid identifier"); + + match client + .join_consumer_group(&stream_id, &topic_id, &cg_id) + .await + { + Ok(()) => { + self.ctx + .stats + .join_consumer_group_ok + .fetch_add(1, Ordering::Relaxed); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + match client + .leave_consumer_group(&stream_id, &topic_id, &cg_id) + .await + { + Ok(()) => { + self.ctx + .stats + .leave_consumer_group_ok + .fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx + .stats + .leave_consumer_group_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + } + } + } + Err(e) => { + self.ctx + .stats + .join_consumer_group_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!("Churner #{}: disrupt CG join failed: {e}", self.churner_id); + } + } + } +} diff --git a/core/bench/src/actors/stress/error_classifier.rs b/core/bench/src/actors/stress/error_classifier.rs new file mode 100644 index 000000000..7e853a571 --- /dev/null +++ b/core/bench/src/actors/stress/error_classifier.rs @@ -0,0 +1,72 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy::prelude::IggyError; +use std::sync::atomic::Ordering; + +use super::stress_context::StressStats; + +/// Two-tier error classification for stress test chaos tolerance. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ErrorSeverity { + /// Expected during chaos: races, stale references, already-exists conflicts. + Expected, + /// Not anticipated but non-fatal: server-side validation, capacity limits. + Unexpected, +} + +/// Classifies an `IggyError` into expected vs unexpected during a stress test. +/// +/// During chaos (CRUD churn + concurrent data-plane), certain errors are normal: +/// - Resource-not-found errors when a churner deletes a topic another actor references +/// - Already-exists errors from concurrent create attempts +/// - Consumer group member-not-found during rebalance races +pub const fn classify(error: &IggyError) -> ErrorSeverity { + match error { + // Resource races, already-exists conflicts, user/PAT concurrency + IggyError::StreamIdNotFound(_) + | IggyError::TopicIdNotFound(_, _) + | IggyError::PartitionNotFound(_, _, _) + | IggyError::ConsumerGroupIdNotFound(_, _) + | IggyError::ConsumerGroupNameNotFound(_, _) + | IggyError::ConsumerGroupMemberNotFound(_, _, _) + | IggyError::ResourceNotFound(_) + | IggyError::StreamNameAlreadyExists(_) + | IggyError::TopicNameAlreadyExists(_, _) + | IggyError::ConsumerGroupNameAlreadyExists(_, _) + | IggyError::UserAlreadyExists + | IggyError::PersonalAccessTokenAlreadyExists(_, _) + | IggyError::InvalidPersonalAccessToken + | IggyError::SegmentNotFound + |
IggyError::SegmentClosed(_, _) => ErrorSeverity::Expected, + + _ => ErrorSeverity::Unexpected, + } +} + +/// Records an error in the stress stats based on its severity. +pub fn record_error(stats: &StressStats, error: &IggyError) { + match classify(error) { + ErrorSeverity::Expected => { + stats.expected_errors.fetch_add(1, Ordering::Relaxed); + } + ErrorSeverity::Unexpected => { + stats.unexpected_errors.fetch_add(1, Ordering::Relaxed); + } + } +} diff --git a/core/bench/src/actors/stress/health_monitor.rs b/core/bench/src/actors/stress/health_monitor.rs new file mode 100644 index 000000000..601fab220 --- /dev/null +++ b/core/bench/src/actors/stress/health_monitor.rs @@ -0,0 +1,143 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::error_classifier; +use super::stress_context::StressContext; +use crate::utils::{ClientFactory, login_root}; +use iggy::clients::client::IggyClient; +use iggy::prelude::*; +use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::time::Instant; +use tracing::{debug, info, warn}; + +const PING_INTERVAL_SECS: u64 = 5; +const STATS_INTERVAL_SECS: u64 = 10; +const METADATA_INTERVAL_SECS: u64 = 30; + +/// Periodically probes server health and metadata convergence. 
+/// +/// Runs `ping`, `get_stats`, `get_me`, `get_clients`, and metadata queries +/// at different frequencies. Reports latency degradation. +pub struct HealthMonitor { + client_factory: Arc<dyn ClientFactory>, + ctx: Arc<StressContext>, +} + +impl HealthMonitor { + pub fn new(client_factory: Arc<dyn ClientFactory>, ctx: Arc<StressContext>) -> Self { + Self { + client_factory, + ctx, + } + } + + pub async fn run(self) { + let client = self.client_factory.create_client().await; + let client = IggyClient::create(client, None, None); + login_root(&client).await; + + let mut tick = 0u64; + while !self.ctx.is_cancelled() { + // Ping every cycle (5s) + self.probe_ping(&client).await; + + // Stats every 2 cycles (10s) + if tick.is_multiple_of(STATS_INTERVAL_SECS / PING_INTERVAL_SECS) { + self.probe_stats(&client).await; + } + + if tick.is_multiple_of(METADATA_INTERVAL_SECS / PING_INTERVAL_SECS) { + self.probe_metadata(&client).await; + } + + tick += 1; + tokio::time::sleep(std::time::Duration::from_secs(PING_INTERVAL_SECS)).await; + } + } + + async fn probe_ping(&self, client: &IggyClient) { + let start = Instant::now(); + match client.ping().await { + Ok(()) => { + let latency = start.elapsed(); + self.ctx.stats.ping_ok.fetch_add(1, Ordering::Relaxed); + if latency.as_millis() > 500 { + warn!("Health: ping latency {latency:?} exceeds 500ms"); + } + } + Err(e) => { + self.ctx.stats.ping_err.fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + warn!("Health: ping failed: {e}"); + } + } + } + + async fn probe_stats(&self, client: &IggyClient) { + let start = Instant::now(); + match client.get_stats().await { + Ok(stats) => { + let latency = start.elapsed(); + self.ctx.stats.get_stats_ok.fetch_add(1, Ordering::Relaxed); + info!( + "Health: server stats in {latency:?} - messages: {}, streams: {}, topics: {}", + stats.messages_count, stats.streams_count, stats.topics_count + ); + } + Err(e) => { + self.ctx.stats.get_stats_err.fetch_add(1, 
Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + warn!("Health: get_stats failed: {e}"); + } + } + } + + async fn probe_metadata(&self, client: &IggyClient) { + // get_me + match client.get_me().await { + Ok(_) => { + self.ctx.stats.get_me_ok.fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + self.ctx.stats.get_me_err.fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!("Health: get_me failed: {e}"); + } + } + + // get_clients + match client.get_clients().await { + Ok(clients) => { + self.ctx + .stats + .get_clients_ok + .fetch_add(1, Ordering::Relaxed); + debug!("Health: {} connected clients", clients.len()); + } + Err(e) => { + self.ctx + .stats + .get_clients_err + .fetch_add(1, Ordering::Relaxed); + error_classifier::record_error(&self.ctx.stats, &e); + debug!("Health: get_clients failed: {e}"); + } + } + } +} diff --git a/core/bench/src/args/kinds/mod.rs b/core/bench/src/actors/stress/mod.rs similarity index 84% copy from core/bench/src/args/kinds/mod.rs copy to core/bench/src/actors/stress/mod.rs index 66e522a58..1b146595f 100644 --- a/core/bench/src/args/kinds/mod.rs +++ b/core/bench/src/actors/stress/mod.rs @@ -16,6 +16,9 @@ * under the License. */ -pub mod balanced; -pub mod end_to_end; -pub mod pinned; +pub mod admin_exerciser; +pub mod control_plane_churner; +pub mod error_classifier; +pub mod health_monitor; +pub mod stress_context; +pub mod verifier; diff --git a/core/bench/src/actors/stress/stress_context.rs b/core/bench/src/actors/stress/stress_context.rs new file mode 100644 index 000000000..c1540a81d --- /dev/null +++ b/core/bench/src/actors/stress/stress_context.rs @@ -0,0 +1,177 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy::prelude::Identifier; +use std::sync::{ + Arc, + atomic::{AtomicBool, AtomicU64, Ordering}, +}; +use tokio::sync::Mutex; + +/// Shared state across all stress test actors. +pub struct StressContext { + pub cancelled: Arc<AtomicBool>, + pub stats: Arc<StressStats>, + /// Tracks ephemeral topics created by churners for cleanup: `(stream_id, topic_name)` + pub ephemeral_topics: Arc<Mutex<Vec<(Identifier, String)>>>, +} + +impl StressContext { + pub fn new() -> Self { + Self { + cancelled: Arc::new(AtomicBool::new(false)), + stats: Arc::new(StressStats::default()), + ephemeral_topics: Arc::new(Mutex::new(Vec::new())), + } + } + + pub fn cancel(&self) { + self.cancelled.store(true, Ordering::Release); + } + + pub fn is_cancelled(&self) -> bool { + self.cancelled.load(Ordering::Acquire) + } +} + +/// Per-API atomic counters for stress test telemetry. 
+#[derive(Default)] +pub struct StressStats { + pub send_messages_ok: AtomicU64, + pub send_messages_err: AtomicU64, + pub poll_messages_ok: AtomicU64, + pub poll_messages_err: AtomicU64, + pub create_stream_ok: AtomicU64, + pub create_stream_err: AtomicU64, + pub delete_stream_ok: AtomicU64, + pub delete_stream_err: AtomicU64, + pub create_topic_ok: AtomicU64, + pub create_topic_err: AtomicU64, + pub delete_topic_ok: AtomicU64, + pub delete_topic_err: AtomicU64, + pub create_partitions_ok: AtomicU64, + pub create_partitions_err: AtomicU64, + pub delete_partitions_ok: AtomicU64, + pub delete_partitions_err: AtomicU64, + pub create_consumer_group_ok: AtomicU64, + pub create_consumer_group_err: AtomicU64, + pub delete_consumer_group_ok: AtomicU64, + pub delete_consumer_group_err: AtomicU64, + pub join_consumer_group_ok: AtomicU64, + pub join_consumer_group_err: AtomicU64, + pub leave_consumer_group_ok: AtomicU64, + pub leave_consumer_group_err: AtomicU64, + pub purge_topic_ok: AtomicU64, + pub purge_topic_err: AtomicU64, + pub delete_segments_ok: AtomicU64, + pub delete_segments_err: AtomicU64, + pub update_topic_ok: AtomicU64, + pub update_topic_err: AtomicU64, + pub purge_stream_ok: AtomicU64, + pub purge_stream_err: AtomicU64, + pub stress_poll_ok: AtomicU64, + pub stress_poll_err: AtomicU64, + pub create_user_ok: AtomicU64, + pub create_user_err: AtomicU64, + pub delete_user_ok: AtomicU64, + pub delete_user_err: AtomicU64, + pub create_pat_ok: AtomicU64, + pub create_pat_err: AtomicU64, + pub delete_pat_ok: AtomicU64, + pub delete_pat_err: AtomicU64, + pub store_offset_ok: AtomicU64, + pub store_offset_err: AtomicU64, + pub get_offset_ok: AtomicU64, + pub get_offset_err: AtomicU64, + pub ping_ok: AtomicU64, + pub ping_err: AtomicU64, + pub get_stats_ok: AtomicU64, + pub get_stats_err: AtomicU64, + pub get_me_ok: AtomicU64, + pub get_me_err: AtomicU64, + pub get_clients_ok: AtomicU64, + pub get_clients_err: AtomicU64, + pub flush_ok: AtomicU64, + pub flush_err: 
AtomicU64, + pub expected_errors: AtomicU64, + pub unexpected_errors: AtomicU64, +} + +impl StressStats { + pub fn total_ok(&self) -> u64 { + self.send_messages_ok.load(Ordering::Relaxed) + + self.poll_messages_ok.load(Ordering::Relaxed) + + self.create_stream_ok.load(Ordering::Relaxed) + + self.delete_stream_ok.load(Ordering::Relaxed) + + self.create_topic_ok.load(Ordering::Relaxed) + + self.delete_topic_ok.load(Ordering::Relaxed) + + self.create_partitions_ok.load(Ordering::Relaxed) + + self.delete_partitions_ok.load(Ordering::Relaxed) + + self.create_consumer_group_ok.load(Ordering::Relaxed) + + self.delete_consumer_group_ok.load(Ordering::Relaxed) + + self.join_consumer_group_ok.load(Ordering::Relaxed) + + self.leave_consumer_group_ok.load(Ordering::Relaxed) + + self.purge_topic_ok.load(Ordering::Relaxed) + + self.delete_segments_ok.load(Ordering::Relaxed) + + self.update_topic_ok.load(Ordering::Relaxed) + + self.purge_stream_ok.load(Ordering::Relaxed) + + self.stress_poll_ok.load(Ordering::Relaxed) + + self.create_user_ok.load(Ordering::Relaxed) + + self.delete_user_ok.load(Ordering::Relaxed) + + self.create_pat_ok.load(Ordering::Relaxed) + + self.delete_pat_ok.load(Ordering::Relaxed) + + self.store_offset_ok.load(Ordering::Relaxed) + + self.get_offset_ok.load(Ordering::Relaxed) + + self.ping_ok.load(Ordering::Relaxed) + + self.get_stats_ok.load(Ordering::Relaxed) + + self.get_me_ok.load(Ordering::Relaxed) + + self.get_clients_ok.load(Ordering::Relaxed) + + self.flush_ok.load(Ordering::Relaxed) + } + + pub fn total_err(&self) -> u64 { + self.send_messages_err.load(Ordering::Relaxed) + + self.poll_messages_err.load(Ordering::Relaxed) + + self.create_stream_err.load(Ordering::Relaxed) + + self.delete_stream_err.load(Ordering::Relaxed) + + self.create_topic_err.load(Ordering::Relaxed) + + self.delete_topic_err.load(Ordering::Relaxed) + + self.create_partitions_err.load(Ordering::Relaxed) + + self.delete_partitions_err.load(Ordering::Relaxed) + + 
self.create_consumer_group_err.load(Ordering::Relaxed) + + self.delete_consumer_group_err.load(Ordering::Relaxed) + + self.join_consumer_group_err.load(Ordering::Relaxed) + + self.leave_consumer_group_err.load(Ordering::Relaxed) + + self.purge_topic_err.load(Ordering::Relaxed) + + self.delete_segments_err.load(Ordering::Relaxed) + + self.update_topic_err.load(Ordering::Relaxed) + + self.purge_stream_err.load(Ordering::Relaxed) + + self.stress_poll_err.load(Ordering::Relaxed) + + self.create_user_err.load(Ordering::Relaxed) + + self.delete_user_err.load(Ordering::Relaxed) + + self.create_pat_err.load(Ordering::Relaxed) + + self.delete_pat_err.load(Ordering::Relaxed) + + self.store_offset_err.load(Ordering::Relaxed) + + self.get_offset_err.load(Ordering::Relaxed) + + self.ping_err.load(Ordering::Relaxed) + + self.get_stats_err.load(Ordering::Relaxed) + + self.get_me_err.load(Ordering::Relaxed) + + self.get_clients_err.load(Ordering::Relaxed) + + self.flush_err.load(Ordering::Relaxed) + } +} diff --git a/core/bench/src/actors/stress/verifier.rs b/core/bench/src/actors/stress/verifier.rs new file mode 100644 index 000000000..cb9ff753c --- /dev/null +++ b/core/bench/src/actors/stress/verifier.rs @@ -0,0 +1,163 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::utils::{ClientFactory, login_root}; +use iggy::clients::client::IggyClient; +use iggy::prelude::*; +use std::collections::BTreeSet; +use std::sync::Arc; +use tracing::{info, warn}; + +/// Post-test verification results. +#[derive(Debug, Default)] +pub struct VerificationResult { + pub partitions_checked: u32, + pub total_messages: u64, + pub gaps_found: u64, + pub duplicates_found: u64, + pub passed: bool, +} + +/// Runs drain-phase verification: polls all partitions and checks offset continuity. +/// +/// During the stress test, messages may expire, so we verify that within each +/// partition the offsets we can still poll are monotonically increasing with no +/// gaps in the remaining range. +pub struct StressVerifier { + client_factory: Arc<dyn ClientFactory>, + streams: u32, + partitions: u32, +} + +impl StressVerifier { + pub fn new(client_factory: Arc<dyn ClientFactory>, streams: u32, partitions: u32) -> Self { + Self { + client_factory, + streams, + partitions, + } + } + + pub async fn verify(&self) -> VerificationResult { + let client = self.client_factory.create_client().await; + let client = IggyClient::create(client, None, None); + login_root(&client).await; + + let mut result = VerificationResult::default(); + + for stream_idx in 1..=self.streams { + let stream_id: Identifier = format!("bench-stream-{stream_idx}") + .as_str() + .try_into() + .expect("valid identifier"); + let topic_id: Identifier = "topic-1".try_into().expect("valid identifier"); + + for partition_id in 0..self.partitions { + let partition_result = self + .verify_partition(&client, &stream_id, &topic_id, partition_id) + .await; + result.partitions_checked += 1; + result.total_messages += partition_result.total_messages; + result.gaps_found += partition_result.gaps_found; + result.duplicates_found += partition_result.duplicates_found; + } + } + + result.passed = 
result.gaps_found == 0 && result.duplicates_found == 0; + + if result.passed { + info!( + "Verification PASSED: {} partitions, {} messages, 0 gaps, 0 duplicates", + result.partitions_checked, result.total_messages + ); + } else { + warn!( + "Verification FAILED: {} partitions, {} messages, {} gaps, {} duplicates", + result.partitions_checked, + result.total_messages, + result.gaps_found, + result.duplicates_found + ); + } + + result + } + + async fn verify_partition( + &self, + client: &IggyClient, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: u32, + ) -> VerificationResult { + let mut result = VerificationResult::default(); + let mut seen_offsets = BTreeSet::new(); + let mut current_offset = 0u64; + let consumer = Consumer::new(Identifier::numeric(9999).expect("valid id")); + let batch_size = 1000u32; + + loop { + let strategy = PollingStrategy::offset(current_offset); + match client + .poll_messages( + stream_id, + topic_id, + Some(partition_id), + &consumer, + &strategy, + batch_size, + false, + ) + .await + { + Ok(polled) => { + if polled.messages.is_empty() { + break; + } + + for msg in &polled.messages { + let offset = msg.header.offset; + if !seen_offsets.insert(offset) { + result.duplicates_found += 1; + } + result.total_messages += 1; + } + + current_offset = polled.messages.last().expect("non-empty").header.offset + 1; + } + Err(e) => { + warn!( + "Verifier: poll partition {partition_id} at offset {current_offset} failed: {e}" + ); + break; + } + } + } + + // Check for gaps in the seen offsets + if let (Some(&min), Some(&max)) = (seen_offsets.first(), seen_offsets.last()) { + let expected_count = max - min + 1; + let actual_count = seen_offsets.len() as u64; + if actual_count < expected_count { + result.gaps_found = expected_count - actual_count; + } + } + + result + } +} diff --git a/core/bench/src/analytics/metrics/group.rs b/core/bench/src/analytics/metrics/group.rs index 9e0bd4cf6..e46b05a6e 100644 --- 
a/core/bench/src/analytics/metrics/group.rs +++ b/core/bench/src/analytics/metrics/group.rs @@ -154,6 +154,7 @@ fn determine_group_kind(stats: &[BenchmarkIndividualMetrics]) -> GroupMetricsKin ActorKind::Producer => GroupMetricsKind::Producers, ActorKind::Consumer => GroupMetricsKind::Consumers, ActorKind::ProducingConsumer => GroupMetricsKind::ProducingConsumers, + ActorKind::StressActor => GroupMetricsKind::StressActors, } } diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs index be983c209..d60cf7faa 100644 --- a/core/bench/src/args/common.rs +++ b/core/bench/src/args/common.rs @@ -118,7 +118,10 @@ impl IggyBenchArgs { .exit(); } - if (self.message_batches, self.total_data) == (None, None) { + // Stress uses --duration instead of --message-batches/--total-data + let is_stress = matches!(self.benchmark_kind, BenchmarkKindCommand::Stress(_)); + + if !is_stress && (self.message_batches, self.total_data) == (None, None) { self.message_batches = Some(DEFAULT_MESSAGE_BATCHES); } @@ -163,11 +166,17 @@ impl IggyBenchArgs { } // Used only for generation of unique directory name + #[allow(clippy::option_if_let_else)] pub fn data_volume_identifier(&self) -> String { - self.total_data().map_or_else( - || self.message_batches().unwrap().to_string(), - |total_messages_size| format!("{}B", total_messages_size.as_bytes_u64()), - ) + if let Some(total_messages_size) = self.total_data() { + format!("{}B", total_messages_size.as_bytes_u64()) + } else if let Some(batches) = self.message_batches() { + batches.to_string() + } else if let BenchmarkKindCommand::Stress(args) = &self.benchmark_kind { + format!("{}s", args.duration().as_secs()) + } else { + "unknown".to_string() + } } pub fn streams(&self) -> u32 { @@ -318,6 +327,7 @@ impl IggyBenchArgs { BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => { "end_to_end_producing_consumer_group" } + BenchmarkKindCommand::Stress(_) => "stress", BenchmarkKindCommand::Examples => unreachable!(), }; @@ -336,16 
+346,18 @@ impl IggyBenchArgs { BenchmarkKindCommand::PinnedConsumer(_) | BenchmarkKindCommand::BalancedConsumerGroup(_) => self.consumers(), BenchmarkKindCommand::PinnedProducerAndConsumer(_) - | BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => { - self.producers() + self.consumers() - } + | BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) + | BenchmarkKindCommand::Stress(_) => self.producers() + self.consumers(), BenchmarkKindCommand::Examples => unreachable!(), }; - let data_volume_arg = match (self.total_data, self.message_batches) { - (Some(total), None) => format!("{total}"), - (None, Some(batches)) => format!("{batches}"), - _ => unreachable!(), + let data_volume_arg = match &self.benchmark_kind { + BenchmarkKindCommand::Stress(args) => format!("{}", args.duration()), + _ => match (self.total_data, self.message_batches) { + (Some(total), None) => format!("{total}"), + (None, Some(batches)) => format!("{batches}"), + _ => unreachable!(), + }, }; let mut parts = vec![ @@ -398,6 +410,14 @@ impl IggyBenchArgs { self.consumers() ) } + BenchmarkKindCommand::Stress(args) => { + format!( + "stress {} producers/{} consumers for {}", + self.producers(), + self.consumers(), + args.duration() + ) + } BenchmarkKindCommand::Examples => unreachable!(), }; diff --git a/core/bench/src/args/kind.rs b/core/bench/src/args/kind.rs index 7cbdfb7b8..91d8b7ab6 100644 --- a/core/bench/src/args/kind.rs +++ b/core/bench/src/args/kind.rs @@ -21,6 +21,7 @@ use super::kinds::balanced::producer::BalancedProducerArgs; use super::kinds::balanced::producer_and_consumer_group::BalancedProducerAndConsumerGroupArgs; use super::kinds::end_to_end::producing_consumer::EndToEndProducingConsumerArgs; use super::kinds::end_to_end::producing_consumer_group::EndToEndProducingConsumerGroupArgs; +use super::kinds::stress::args::StressArgs; use super::props::BenchmarkKindProps; use super::transport::BenchmarkTransportCommand; use 
crate::args::kinds::balanced::consumer_group::BalancedConsumerGroupArgs; @@ -93,6 +94,37 @@ pub enum BenchmarkKindCommand { )] EndToEndProducingConsumerGroup(EndToEndProducingConsumerGroupArgs), + #[command( + about = "Comprehensive stress test with heterogeneous actors", + long_about = "Duration-based stress test mixing data-plane (produce/consume), control-plane \ + (CRUD churn), admin (user/PAT), and health monitoring actors.\n\n\ + Phases: Baseline (15%) -> Chaos (65%) -> Drain (20% or max 5min)", + after_long_help = "\ +Recommended server environment variables: + + IGGY_SYSTEM_SEGMENT_SIZE=16MiB + IGGY_SYSTEM_TOPIC_MESSAGE_EXPIRY=30s + IGGY_SYSTEM_TOPIC_MAX_SIZE=200MiB + IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED=true + IGGY_DATA_MAINTENANCE_MESSAGES_INTERVAL=5s + IGGY_MESSAGE_SAVER_INTERVAL=5s + +Example: + + IGGY_SYSTEM_SEGMENT_SIZE=16MiB \\ + IGGY_SYSTEM_TOPIC_MESSAGE_EXPIRY=30s \\ + IGGY_SYSTEM_TOPIC_MAX_SIZE=200MiB \\ + IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED=true \\ + IGGY_DATA_MAINTENANCE_MESSAGES_INTERVAL=5s \\ + IGGY_MESSAGE_SAVER_INTERVAL=5s \\ + cargo run --bin iggy-server -- --fresh --with-default-root-credentials + + cargo run --bin iggy-bench -- stress -d 2m tcp", + visible_alias = "st", + verbatim_doc_comment + )] + Stress(StressArgs), + #[command(about = "Print examples", visible_alias = "e", verbatim_doc_comment)] Examples, } @@ -112,6 +144,7 @@ impl BenchmarkKindCommand { Self::EndToEndProducingConsumerGroup(_) => { BenchmarkKind::EndToEndProducingConsumerGroup } + Self::Stress(_) => BenchmarkKind::Stress, Self::Examples => { print_examples(); std::process::exit(0); @@ -163,6 +196,7 @@ impl BenchmarkKindProps for BenchmarkKindCommand { Self::BalancedProducerAndConsumerGroup(args) => args, Self::EndToEndProducingConsumer(args) => args, Self::EndToEndProducingConsumerGroup(args) => args, + Self::Stress(args) => args, Self::Examples => { print_examples(); std::process::exit(0); diff --git a/core/bench/src/args/kinds/mod.rs 
b/core/bench/src/args/kinds/mod.rs index 66e522a58..2238ffd5b 100644 --- a/core/bench/src/args/kinds/mod.rs +++ b/core/bench/src/args/kinds/mod.rs @@ -19,3 +19,4 @@ pub mod balanced; pub mod end_to_end; pub mod pinned; +pub mod stress; diff --git a/core/bench/src/args/kinds/stress/args.rs b/core/bench/src/args/kinds/stress/args.rs new file mode 100644 index 000000000..2b8c4a920 --- /dev/null +++ b/core/bench/src/args/kinds/stress/args.rs @@ -0,0 +1,160 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::args::{props::BenchmarkKindProps, transport::BenchmarkTransportCommand}; +use clap::{CommandFactory, Parser, ValueEnum, error::ErrorKind}; +use iggy::prelude::{IggyByteSize, IggyDuration, IggyExpiry}; +use std::num::NonZeroU32; +use std::str::FromStr; + +const DEFAULT_PRODUCERS: NonZeroU32 = nonzero_lit::u32!(4); +const DEFAULT_CONSUMERS: NonZeroU32 = nonzero_lit::u32!(4); +const DEFAULT_CHURN_CONCURRENCY: NonZeroU32 = nonzero_lit::u32!(1); +const DEFAULT_STREAMS: u32 = 2; +const DEFAULT_PARTITIONS: u32 = 4; +const DEFAULT_CONSUMER_GROUPS: u32 = 2; + +/// Determines the mix of API operations exercised during the stress test. 
+#[derive(Debug, Clone, Copy, ValueEnum, Default)] +pub enum ApiMix { + /// Data-plane + control-plane CRUD + admin operations + #[default] + Mixed, + /// Only `send_messages` and `poll_messages` + DataPlaneOnly, + /// Heavy CRUD churn with minimal data-plane + ControlPlaneHeavy, + /// All available APIs including admin operations + All, +} + +#[derive(Parser, Debug, Clone)] +pub struct StressArgs { + #[command(subcommand)] + pub transport: BenchmarkTransportCommand, + + /// Total test duration (e.g. "2m", "10m", "1h") + #[arg(long, short = 'd', value_parser = IggyDuration::from_str)] + pub duration: IggyDuration, + + /// Number of data-plane producer actors + #[arg(long, short = 'p', default_value_t = DEFAULT_PRODUCERS)] + pub producers: NonZeroU32, + + /// Number of data-plane consumer actors + #[arg(long, short = 'c', default_value_t = DEFAULT_CONSUMERS)] + pub consumers: NonZeroU32, + + /// Number of control-plane churner actors + #[arg(long, default_value_t = DEFAULT_CHURN_CONCURRENCY)] + pub churn_concurrency: NonZeroU32, + + /// Interval between CRUD churn operations (e.g. "3s", "10s") + #[arg(long, default_value = "3s", value_parser = IggyDuration::from_str)] + pub churn_interval: IggyDuration, + + /// Max topic size to bound disk usage. For maximum race density, also run + /// the server with `IGGY_SYSTEM_SEGMENT_SIZE="1MiB"`. 
+ #[arg(long, default_value = "50MiB")] + pub max_topic_size: IggyByteSize, + + /// Message TTL for automatic cleanup + #[arg(long, default_value = "30s", value_parser = IggyExpiry::from_str)] + pub message_expiry: IggyExpiry, + + /// API operation mix + #[arg(long, value_enum, default_value_t = ApiMix::Mixed)] + pub api_mix: ApiMix, + + /// RNG seed for reproducible chaos operations + #[arg(long)] + pub chaos_seed: Option<u64>, +} + +impl BenchmarkKindProps for StressArgs { + fn streams(&self) -> u32 { + DEFAULT_STREAMS + } + + fn partitions(&self) -> u32 { + DEFAULT_PARTITIONS + } + + fn consumers(&self) -> u32 { + self.consumers.get() + } + + fn producers(&self) -> u32 { + self.producers.get() + } + + fn transport_command(&self) -> &BenchmarkTransportCommand { + &self.transport + } + + fn number_of_consumer_groups(&self) -> u32 { + DEFAULT_CONSUMER_GROUPS + } + + fn max_topic_size(&self) -> Option<IggyByteSize> { + Some(self.max_topic_size) + } + + fn message_expiry(&self) -> IggyExpiry { + self.message_expiry + } + + fn validate(&self) { + if self.duration.as_secs() < 10 { + crate::args::common::IggyBenchArgs::command() + .error( + ErrorKind::ValueValidation, + "Stress test duration must be at least 10 seconds", + ) + .exit(); + } + } +} + +impl StressArgs { + pub const fn duration(&self) -> IggyDuration { + self.duration + } + + pub const fn churn_concurrency(&self) -> NonZeroU32 { + self.churn_concurrency + } + + pub const fn churn_interval(&self) -> IggyDuration { + self.churn_interval + } + + pub const fn api_mix(&self) -> ApiMix { + self.api_mix + } + + #[allow(clippy::cast_possible_truncation)] + pub fn chaos_seed(&self) -> u64 { + self.chaos_seed.unwrap_or_else(|| { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock before epoch") + .as_nanos() as u64 + }) + } +} diff --git a/core/bench/src/args/kinds/mod.rs b/core/bench/src/args/kinds/stress/mod.rs similarity index 93% copy from 
core/bench/src/args/kinds/mod.rs copy to core/bench/src/args/kinds/stress/mod.rs index 66e522a58..6de087720 100644 --- a/core/bench/src/args/kinds/mod.rs +++ b/core/bench/src/args/kinds/stress/mod.rs @@ -16,6 +16,4 @@ * under the License. */ -pub mod balanced; -pub mod end_to_end; -pub mod pinned; +pub mod args; diff --git a/core/bench/src/benchmarks/benchmark.rs b/core/bench/src/benchmarks/benchmark.rs index 594a00101..e35558cc6 100644 --- a/core/bench/src/benchmarks/benchmark.rs +++ b/core/bench/src/benchmarks/benchmark.rs @@ -36,6 +36,7 @@ use super::end_to_end_producing_consumer_group::EndToEndProducingConsumerGroupBe use super::pinned_consumer::PinnedConsumerBenchmark; use super::pinned_producer::PinnedProducerBenchmark; use super::pinned_producer_and_consumer::PinnedProducerAndConsumerBenchmark; +use super::stress::StressBenchmark; impl From<IggyBenchArgs> for Box<dyn Benchmarkable> { fn from(args: IggyBenchArgs) -> Self { @@ -74,6 +75,9 @@ impl From<IggyBenchArgs> for Box<dyn Benchmarkable> { BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => Box::new( EndToEndProducingConsumerGroupBenchmark::new(Arc::new(args), client_factory), ), + BenchmarkKindCommand::Stress(_) => { + Box::new(StressBenchmark::new(Arc::new(args), client_factory)) + } BenchmarkKindCommand::Examples => { unreachable!("Examples should be handled before this point") } diff --git a/core/bench/src/benchmarks/common.rs b/core/bench/src/benchmarks/common.rs index 7858f6fca..548a2f876 100644 --- a/core/bench/src/benchmarks/common.rs +++ b/core/bench/src/benchmarks/common.rs @@ -171,7 +171,8 @@ pub fn build_consumer_futures( let origin_timestamp_latency_calculation = match args.kind() { BenchmarkKind::PinnedConsumer | BenchmarkKind::BalancedConsumerGroup => false, BenchmarkKind::PinnedProducerAndConsumer - | BenchmarkKind::BalancedProducerAndConsumerGroup => true, + | BenchmarkKind::BalancedProducerAndConsumerGroup + | BenchmarkKind::Stress => true, _ => unreachable!(), }; @@ -209,8 +210,9 
@@ pub fn build_consumer_futures( consumer_id }; let stream_id = format!("bench-stream-{stream_idx}"); + // Each stream has exactly one CG, server assigns IDs starting from 0 let consumer_group_id = if cg_count > 0 { - Some(CONSUMER_GROUP_BASE_ID + (consumer_id % cg_count)) + Some(CONSUMER_GROUP_BASE_ID) } else { None }; diff --git a/core/bench/src/benchmarks/mod.rs b/core/bench/src/benchmarks/mod.rs index 6e829b362..cd1b5eac0 100644 --- a/core/bench/src/benchmarks/mod.rs +++ b/core/bench/src/benchmarks/mod.rs @@ -27,6 +27,8 @@ pub mod end_to_end_producing_consumer_group; pub mod pinned_consumer; pub mod pinned_producer; pub mod pinned_producer_and_consumer; +pub mod stress; +pub mod stress_report; pub const CONSUMER_GROUP_BASE_ID: u32 = 0; pub const CONSUMER_GROUP_NAME_PREFIX: &str = "cg"; diff --git a/core/bench/src/benchmarks/stress.rs b/core/bench/src/benchmarks/stress.rs new file mode 100644 index 000000000..8fc1dc13c --- /dev/null +++ b/core/bench/src/benchmarks/stress.rs @@ -0,0 +1,281 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use super::benchmark::Benchmarkable; +use super::common::{build_consumer_futures, build_producer_futures, init_consumer_groups}; +use super::stress_report::StressReport; +use crate::actors::stress::admin_exerciser::AdminExerciser; +use crate::actors::stress::control_plane_churner::{ChurnerConfig, ControlPlaneChurner}; +use crate::actors::stress::health_monitor::HealthMonitor; +use crate::actors::stress::stress_context::StressContext; +use crate::actors::stress::verifier::StressVerifier; +use crate::args::common::IggyBenchArgs; +use crate::args::kind::BenchmarkKindCommand; +use crate::args::kinds::stress::args::ApiMix; +use crate::utils::ClientFactory; +use crate::utils::finish_condition::BenchmarkFinishCondition; +use async_trait::async_trait; +use bench_report::{benchmark_kind::BenchmarkKind, individual_metrics::BenchmarkIndividualMetrics}; +use iggy::prelude::*; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::task::JoinSet; +use tracing::{info, warn}; + +/// Phase durations as fractions of total test time. +const BASELINE_FRACTION: f64 = 0.15; +const CHAOS_FRACTION: f64 = 0.65; +/// Max drain phase duration. 
+const MAX_DRAIN_SECS: u64 = 300; + +pub struct StressBenchmark { + args: Arc<IggyBenchArgs>, + client_factory: Arc<dyn ClientFactory>, +} + +impl StressBenchmark { + pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self { + Self { + args, + client_factory, + } + } + + fn stress_args(&self) -> &crate::args::kinds::stress::args::StressArgs { + match &self.args.benchmark_kind { + BenchmarkKindCommand::Stress(args) => args, + _ => unreachable!("StressBenchmark only used with Stress variant"), + } + } + + fn compute_phase_durations(&self) -> (Duration, Duration, Duration) { + let total = self.stress_args().duration().get_duration(); + let total_secs = total.as_secs_f64(); + + let baseline = Duration::from_secs_f64(total_secs * BASELINE_FRACTION); + let chaos = Duration::from_secs_f64(total_secs * CHAOS_FRACTION); + #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)] + let drain_secs = (total_secs * (1.0 - BASELINE_FRACTION - CHAOS_FRACTION)) as u64; + let drain = Duration::from_secs(drain_secs.min(MAX_DRAIN_SECS)); + + (baseline, chaos, drain) + } + + /// Spawns chaos actors: control-plane churner, admin exerciser, health monitor. 
+ fn spawn_chaos_actors( + &self, + tasks: &mut JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, + ctx: &Arc<StressContext>, + ) { + let stress_args = self.stress_args(); + let api_mix = stress_args.api_mix(); + let chaos_seed = stress_args.chaos_seed(); + + // Health monitor always runs + let monitor = HealthMonitor::new(self.client_factory.clone(), ctx.clone()); + tasks.spawn(async move { + monitor.run().await; + Ok(BenchmarkIndividualMetrics::placeholder("health_monitor")) + }); + + // Control-plane churner(s) unless data-plane-only + if !matches!(api_mix, ApiMix::DataPlaneOnly) { + let churn_concurrency = stress_args.churn_concurrency().get(); + let churn_interval = stress_args.churn_interval(); + + let churner_config = ChurnerConfig { + api_mix, + partitions: self.args.number_of_partitions(), + message_expiry: stress_args.message_expiry, + max_topic_size: MaxTopicSize::Custom(stress_args.max_topic_size), + }; + + for i in 0..churn_concurrency { + let churner = ControlPlaneChurner::new( + i + 1, + self.client_factory.clone(), + ctx.clone(), + churn_interval, + chaos_seed, + &churner_config, + ); + tasks.spawn(async move { + churner.run().await; + Ok(BenchmarkIndividualMetrics::placeholder( + "control_plane_churner", + )) + }); + } + } + + // Admin exerciser for mixed/all modes + if matches!(api_mix, ApiMix::Mixed | ApiMix::All) { + let admin = AdminExerciser::new(self.client_factory.clone(), ctx.clone()); + tasks.spawn(async move { + admin.run().await; + Ok(BenchmarkIndividualMetrics::placeholder("admin_exerciser")) + }); + } + } +} + +#[async_trait] +impl Benchmarkable for StressBenchmark { + async fn run( + &mut self, + ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, IggyError> { + let overall_start = Instant::now(); + let (baseline_duration, chaos_duration, drain_max) = self.compute_phase_durations(); + + info!( + "Stress test starting: baseline={baseline_duration:?}, chaos={chaos_duration:?}, drain_max={drain_max:?}" + ); + + // 
Setup: create streams, topics, consumer groups + self.init_streams().await?; + init_consumer_groups(&self.client_factory, &self.args).await?; + + let ctx = Arc::new(StressContext::new()); + + let mut tasks: JoinSet<Result<BenchmarkIndividualMetrics, IggyError>> = JoinSet::new(); + + // === PHASE 1: Baseline (data-plane only) === + info!("=== Phase 1: Baseline ({baseline_duration:?}) ==="); + + // NOTE(review): this condition is created but never handed to any actor — dead code? + let baseline_finish = + BenchmarkFinishCondition::new_duration(IggyDuration::from(baseline_duration)); + + // Spawn producers/consumers; build_*_futures derive their own duration-based conditions from args + let producer_futures = build_producer_futures(&self.client_factory, &self.args); + let consumer_futures = build_consumer_futures(&self.client_factory, &self.args); + + for fut in producer_futures { + tasks.spawn(fut); + } + for fut in consumer_futures { + tasks.spawn(fut); + } + + // Wait for baseline to complete (time-based) + tokio::time::sleep(baseline_duration).await; + let baseline_elapsed = overall_start.elapsed(); + info!("Baseline phase completed in {baseline_elapsed:?}"); + + // NOTE(review): dropping `baseline_finish` cancels nothing — it was never shared + // with any actor. The data-plane actors run on conditions built inside + // build_*_futures, so phases intentionally overlap: baseline producers and + // consumers keep running into the chaos phase.
+ drop(baseline_finish); + + // === PHASE 2: Chaos (add churners + admin + health alongside ongoing data-plane) === + info!("=== Phase 2: Chaos ({chaos_duration:?}) ==="); + + self.spawn_chaos_actors(&mut tasks, &ctx); + + tokio::time::sleep(chaos_duration).await; + let chaos_elapsed = overall_start.elapsed(); + info!("Chaos phase completed in {chaos_elapsed:?}"); + + // === PHASE 3: Drain === + info!("=== Phase 3: Drain (max {drain_max:?}) ==="); + + // Signal all chaos actors to stop + ctx.cancel(); + + // Wait for remaining tasks with a timeout + let drain_start = Instant::now(); + let drain_deadline = drain_start + drain_max; + while !tasks.is_empty() { + let remaining = drain_deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + warn!("Drain phase timed out, aborting remaining tasks"); + tasks.abort_all(); + break; + } + + match tokio::time::timeout(remaining, tasks.join_next()).await { + Ok(Some(result)) => { + if let Err(e) = result { + warn!("Actor join failed: {e}"); + } + } + Ok(None) => break, + Err(_) => { + warn!("Drain phase timed out"); + tasks.abort_all(); + break; + } + } + } + let drain_elapsed = drain_start.elapsed(); + + // === Verification === + info!("Running post-test verification..."); + let verifier = StressVerifier::new( + self.client_factory.clone(), + self.args.streams(), + self.args.number_of_partitions(), + ); + let verification = verifier.verify().await; + + // Build and print the stress report + let report = StressReport::build( + &ctx.stats, + &verification, + overall_start.elapsed(), + baseline_duration, + chaos_duration, + drain_elapsed, + ); + report.print_summary(); + + if !verification.passed { + warn!("Stress test verification FAILED"); + } + + // Return an empty JoinSet since we already joined everything + Ok(JoinSet::new()) + } + + fn kind(&self) -> BenchmarkKind { + BenchmarkKind::Stress + } + + fn args(&self) -> &IggyBenchArgs { + &self.args + } + + fn client_factory(&self) -> &Arc<dyn 
ClientFactory> { + &self.client_factory + } + + fn print_info(&self) { + let stress_args = self.stress_args(); + info!( + "Starting Stress benchmark: duration={}, producers={}, consumers={}, churn_concurrency={}, churn_interval={}, api_mix={:?}, chaos_seed={}", + stress_args.duration(), + stress_args.producers.get(), + stress_args.consumers.get(), + stress_args.churn_concurrency().get(), + stress_args.churn_interval(), + stress_args.api_mix(), + stress_args.chaos_seed(), + ); + } +} diff --git a/core/bench/src/benchmarks/stress_report.rs b/core/bench/src/benchmarks/stress_report.rs new file mode 100644 index 000000000..29c8fdc25 --- /dev/null +++ b/core/bench/src/benchmarks/stress_report.rs @@ -0,0 +1,206 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::actors::stress::{stress_context::StressStats, verifier::VerificationResult}; +use serde::Serialize; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; + +/// Structured stress test report, serializable to JSON for `--output-dir`. 
+#[derive(Debug, Serialize)] +pub struct StressReport { + pub total_duration_secs: f64, + pub baseline_duration_secs: f64, + pub chaos_duration_secs: f64, + pub drain_duration_secs: f64, + pub api_calls: ApiCallSummary, + pub error_tiers: ErrorTierSummary, + pub verification: VerificationSummary, +} + +#[derive(Debug, Serialize)] +pub struct ApiCallSummary { + pub send_messages: CallCount, + pub poll_messages: CallCount, + pub create_topic: CallCount, + pub delete_topic: CallCount, + pub create_partitions: CallCount, + pub delete_partitions: CallCount, + pub create_consumer_group: CallCount, + pub delete_consumer_group: CallCount, + pub join_consumer_group: CallCount, + pub leave_consumer_group: CallCount, + pub purge_topic: CallCount, + pub delete_segments: CallCount, + pub update_topic: CallCount, + pub purge_stream: CallCount, + pub stress_poll: CallCount, + pub create_user: CallCount, + pub delete_user: CallCount, + pub create_pat: CallCount, + pub delete_pat: CallCount, + pub store_offset: CallCount, + pub get_offset: CallCount, + pub ping: CallCount, + pub get_stats: CallCount, + pub get_me: CallCount, + pub get_clients: CallCount, + pub flush: CallCount, + pub total_ok: u64, + pub total_err: u64, +} + +#[derive(Debug, Serialize)] +pub struct CallCount { + pub ok: u64, + pub err: u64, +} + +impl CallCount { + fn load(ok: &AtomicU64, err: &AtomicU64) -> Self { + Self { + ok: ok.load(Ordering::Relaxed), + err: err.load(Ordering::Relaxed), + } + } +} + +#[derive(Debug, Serialize)] +pub struct ErrorTierSummary { + pub expected: u64, + pub unexpected: u64, +} + +#[derive(Debug, Serialize)] +pub struct VerificationSummary { + pub partitions_checked: u32, + pub total_messages: u64, + pub gaps_found: u64, + pub duplicates_found: u64, + pub passed: bool, +} + +impl StressReport { + pub fn build( + stats: &StressStats, + verification: &VerificationResult, + total_duration: Duration, + baseline_duration: Duration, + chaos_duration: Duration, + drain_duration: Duration, 
+ ) -> Self { + let api_calls = ApiCallSummary::from_stats(stats); + Self { + total_duration_secs: total_duration.as_secs_f64(), + baseline_duration_secs: baseline_duration.as_secs_f64(), + chaos_duration_secs: chaos_duration.as_secs_f64(), + drain_duration_secs: drain_duration.as_secs_f64(), + error_tiers: ErrorTierSummary { + expected: stats.expected_errors.load(Ordering::Relaxed), + unexpected: stats.unexpected_errors.load(Ordering::Relaxed), + }, + api_calls, + verification: VerificationSummary { + partitions_checked: verification.partitions_checked, + total_messages: verification.total_messages, + gaps_found: verification.gaps_found, + duplicates_found: verification.duplicates_found, + passed: verification.passed, + }, + } + } + + pub fn print_summary(&self) { + println!("\n=== STRESS TEST REPORT ==="); + println!( + "Duration: {:.1}s (baseline: {:.1}s, chaos: {:.1}s, drain: {:.1}s)", + self.total_duration_secs, + self.baseline_duration_secs, + self.chaos_duration_secs, + self.drain_duration_secs + ); + println!( + "API calls: {} ok, {} err", + self.api_calls.total_ok, self.api_calls.total_err + ); + println!( + "Errors: {} expected, {} unexpected", + self.error_tiers.expected, self.error_tiers.unexpected + ); + println!( + "Verification: {} partitions, {} messages, {} gaps, {} duplicates -> {}", + self.verification.partitions_checked, + self.verification.total_messages, + self.verification.gaps_found, + self.verification.duplicates_found, + if self.verification.passed { + "PASSED" + } else { + "FAILED" + } + ); + println!("==========================\n"); + } +} + +impl ApiCallSummary { + fn from_stats(s: &StressStats) -> Self { + Self { + send_messages: CallCount::load(&s.send_messages_ok, &s.send_messages_err), + poll_messages: CallCount::load(&s.poll_messages_ok, &s.poll_messages_err), + create_topic: CallCount::load(&s.create_topic_ok, &s.create_topic_err), + delete_topic: CallCount::load(&s.delete_topic_ok, &s.delete_topic_err), + create_partitions: 
CallCount::load(&s.create_partitions_ok, &s.create_partitions_err), + delete_partitions: CallCount::load(&s.delete_partitions_ok, &s.delete_partitions_err), + create_consumer_group: CallCount::load( + &s.create_consumer_group_ok, + &s.create_consumer_group_err, + ), + delete_consumer_group: CallCount::load( + &s.delete_consumer_group_ok, + &s.delete_consumer_group_err, + ), + join_consumer_group: CallCount::load( + &s.join_consumer_group_ok, + &s.join_consumer_group_err, + ), + leave_consumer_group: CallCount::load( + &s.leave_consumer_group_ok, + &s.leave_consumer_group_err, + ), + purge_topic: CallCount::load(&s.purge_topic_ok, &s.purge_topic_err), + delete_segments: CallCount::load(&s.delete_segments_ok, &s.delete_segments_err), + update_topic: CallCount::load(&s.update_topic_ok, &s.update_topic_err), + purge_stream: CallCount::load(&s.purge_stream_ok, &s.purge_stream_err), + stress_poll: CallCount::load(&s.stress_poll_ok, &s.stress_poll_err), + create_user: CallCount::load(&s.create_user_ok, &s.create_user_err), + delete_user: CallCount::load(&s.delete_user_ok, &s.delete_user_err), + create_pat: CallCount::load(&s.create_pat_ok, &s.create_pat_err), + delete_pat: CallCount::load(&s.delete_pat_ok, &s.delete_pat_err), + store_offset: CallCount::load(&s.store_offset_ok, &s.store_offset_err), + get_offset: CallCount::load(&s.get_offset_ok, &s.get_offset_err), + ping: CallCount::load(&s.ping_ok, &s.ping_err), + get_stats: CallCount::load(&s.get_stats_ok, &s.get_stats_err), + get_me: CallCount::load(&s.get_me_ok, &s.get_me_err), + get_clients: CallCount::load(&s.get_clients_ok, &s.get_clients_err), + flush: CallCount::load(&s.flush_ok, &s.flush_err), + total_ok: s.total_ok(), + total_err: s.total_err(), + } + } +} diff --git a/core/bench/src/utils/finish_condition.rs b/core/bench/src/utils/finish_condition.rs index 01bd174ef..858ca569f 100644 --- a/core/bench/src/utils/finish_condition.rs +++ b/core/bench/src/utils/finish_condition.rs @@ -17,12 +17,14 @@ use 
crate::args::{common::IggyBenchArgs, kind::BenchmarkKindCommand}; use human_repr::HumanCount; +use iggy::prelude::IggyDuration; use std::{ fmt::Display, sync::{ Arc, atomic::{AtomicI64, Ordering}, }, + time::Instant, }; const MINIMUM_MSG_PAYLOAD_SIZE: usize = 20; @@ -48,8 +50,9 @@ pub enum BenchmarkFinishConditionMode { #[derive(Debug, Clone, Copy, PartialEq)] enum BenchmarkFinishConditionType { - ByTotalData, - ByMessageBatchesCount, + TotalData, + MessageBatchesCount, + Duration, } pub struct BenchmarkFinishCondition { @@ -57,6 +60,7 @@ pub struct BenchmarkFinishCondition { total: u64, left_total: Arc<AtomicI64>, mode: BenchmarkFinishConditionMode, + start_time: Option<Instant>, } impl BenchmarkFinishCondition { @@ -94,64 +98,99 @@ impl BenchmarkFinishCondition { } BenchmarkKindCommand::EndToEndProducingConsumer(_) | BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => args.producers() * 2, + BenchmarkKindCommand::Stress(_) => args.producers() + args.consumers(), BenchmarkKindCommand::Examples => unreachable!(), }; - Arc::new(match (total_data, batches_count) { + match (total_data, batches_count) { (None, Some(count)) => { let count_per_actor = (count.get() * total_data_multiplier) / total_data_factor; - Self { - kind: BenchmarkFinishConditionType::ByMessageBatchesCount, + Arc::new(Self { + kind: BenchmarkFinishConditionType::MessageBatchesCount, total: u64::from(count_per_actor), left_total: Arc::new(AtomicI64::new(i64::from(count_per_actor))), mode, - } + start_time: None, + }) } (Some(size), None) => { let bytes_per_actor = size.as_bytes_u64() / u64::from(total_data_factor); - Self { - kind: BenchmarkFinishConditionType::ByTotalData, + Arc::new(Self { + kind: BenchmarkFinishConditionType::TotalData, total: bytes_per_actor, left_total: Arc::new(AtomicI64::new( i64::try_from(bytes_per_actor).unwrap_or(i64::MAX), )), mode, + start_time: None, + }) + } + (None, None) => { + // Stress benchmark uses --duration; extract it from args + if let 
BenchmarkKindCommand::Stress(stress_args) = &args.benchmark_kind { + Self::new_duration(stress_args.duration()) + } else { + panic!("Either --total-messages-size or --message-batches must be provided") } } - _ => unreachable!(), - }) + (Some(_), Some(_)) => { + panic!("Cannot specify both --total-messages-size and --message-batches") + } + } } /// Creates an "empty" benchmark finish condition that is already satisfied. /// This is useful for consumer-only actors that don't need to produce any messages. pub fn new_empty() -> Arc<Self> { Arc::new(Self { - kind: BenchmarkFinishConditionType::ByMessageBatchesCount, + kind: BenchmarkFinishConditionType::MessageBatchesCount, total: 0, left_total: Arc::new(AtomicI64::new(0)), mode: BenchmarkFinishConditionMode::Shared, + start_time: None, + }) + } + + /// Creates a duration-based finish condition that completes after the given time elapses. + pub fn new_duration(duration: IggyDuration) -> Arc<Self> { + Arc::new(Self { + kind: BenchmarkFinishConditionType::Duration, + total: duration.as_micros(), + left_total: Arc::new(AtomicI64::new(i64::MAX)), + mode: BenchmarkFinishConditionMode::Shared, + start_time: Some(Instant::now()), }) } pub fn account_and_check(&self, size_to_subtract: u64) -> bool { match self.kind { - BenchmarkFinishConditionType::ByTotalData => { + BenchmarkFinishConditionType::TotalData => { self.left_total.fetch_sub( i64::try_from(size_to_subtract).unwrap_or(i64::MAX), Ordering::AcqRel, ); + self.left_total.load(Ordering::Acquire) <= 0 } - BenchmarkFinishConditionType::ByMessageBatchesCount => { + BenchmarkFinishConditionType::MessageBatchesCount => { self.left_total.fetch_sub(1, Ordering::AcqRel); + self.left_total.load(Ordering::Acquire) <= 0 } + BenchmarkFinishConditionType::Duration => self.is_elapsed(), } - self.left_total.load(Ordering::Acquire) <= 0 } pub fn is_done(&self) -> bool { - self.left() <= 0 + match self.kind { + BenchmarkFinishConditionType::Duration => self.is_elapsed(), + _ => 
self.left() <= 0, + } + } + + fn is_elapsed(&self) -> bool { + self.start_time + .is_some_and(|start| start.elapsed() >= std::time::Duration::from_micros(self.total)) } pub const fn total(&self) -> u64 { @@ -160,17 +199,20 @@ impl BenchmarkFinishCondition { pub fn total_str(&self) -> String { match self.kind { - BenchmarkFinishConditionType::ByTotalData => { + BenchmarkFinishConditionType::TotalData => { format!( "messages of size: {} ({})", self.total.human_count_bytes(), self.mode ) } - - BenchmarkFinishConditionType::ByMessageBatchesCount => { + BenchmarkFinishConditionType::MessageBatchesCount => { format!("{} batches ({})", self.total.human_count_bare(), self.mode) } + BenchmarkFinishConditionType::Duration => { + let secs = self.total / 1_000_000; + format!("duration: {secs}s ({mode})", mode = self.mode) + } } } @@ -182,7 +224,7 @@ impl BenchmarkFinishCondition { let done = i64::try_from(self.total()).unwrap_or(i64::MAX) - self.left(); let total = i64::try_from(self.total()).unwrap_or(i64::MAX); match self.kind { - BenchmarkFinishConditionType::ByTotalData => { + BenchmarkFinishConditionType::TotalData => { format!( "{}/{} ({})", done.human_count_bytes(), @@ -190,7 +232,7 @@ impl BenchmarkFinishCondition { self.mode ) } - BenchmarkFinishConditionType::ByMessageBatchesCount => { + BenchmarkFinishConditionType::MessageBatchesCount => { format!( "{}/{} ({})", done.human_count_bare(), @@ -198,15 +240,26 @@ impl BenchmarkFinishCondition { self.mode ) } + BenchmarkFinishConditionType::Duration => { + let elapsed_secs = self.start_time.map_or(0, |s| s.elapsed().as_secs()); + let total_secs = self.total / 1_000_000; + format!("{elapsed_secs}s/{total_secs}s ({mode})", mode = self.mode) + } } } pub fn max_capacity(&self) -> usize { - let value = self.left_total.load(Ordering::Relaxed); - if self.kind == BenchmarkFinishConditionType::ByTotalData { - usize::try_from(value).unwrap_or(0) / MINIMUM_MSG_PAYLOAD_SIZE - } else { - usize::try_from(value).unwrap_or(0) + match 
self.kind { + BenchmarkFinishConditionType::TotalData => { + let value = self.left_total.load(Ordering::Relaxed); + usize::try_from(value).unwrap_or(0) / MINIMUM_MSG_PAYLOAD_SIZE + } + BenchmarkFinishConditionType::MessageBatchesCount => { + let value = self.left_total.load(Ordering::Relaxed); + usize::try_from(value).unwrap_or(0) + } + // Duration-based conditions use a reasonable default buffer size + BenchmarkFinishConditionType::Duration => 10_000, } } } diff --git a/core/bench/src/utils/mod.rs b/core/bench/src/utils/mod.rs index e8ee8ebd3..1b76f03a5 100644 --- a/core/bench/src/utils/mod.rs +++ b/core/bench/src/utils/mod.rs @@ -256,6 +256,7 @@ fn add_benchmark_kind_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) { BenchmarkKind::BalancedProducerAndConsumerGroup => "balanced-producer-and-consumer-group", BenchmarkKind::EndToEndProducingConsumer => "end-to-end-producing-consumer", BenchmarkKind::EndToEndProducingConsumerGroup => "end-to-end-producing-consumer-group", + BenchmarkKind::Stress => "stress", }; parts.push(kind_str.to_string()); @@ -281,7 +282,8 @@ fn add_actor_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) { } } BenchmarkKind::PinnedProducerAndConsumer - | BenchmarkKind::BalancedProducerAndConsumerGroup => { + | BenchmarkKind::BalancedProducerAndConsumerGroup + | BenchmarkKind::Stress => { if producers != DEFAULT_NUMBER_OF_PRODUCERS.get() { parts.push(format!("--producers {producers}")); }
