This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch bench-stress
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c333352a97530582a3db652db67141fbd85e04a2
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Feb 11 18:37:22 2026 +0100

    feat(bench): add stress test with chaos churning
    
    The bench suite lacked a way to surface concurrency bugs
    in the server's CRUD and data-plane paths under
    simultaneous load.
    
    Adds `iggy-bench stress` — a phased benchmark
    (baseline → chaos → drain → verify) that:
    
    - Runs producers/consumers alongside control-plane
      churners that create/delete topics, partitions,
      consumer groups, and segments concurrently
    - Classifies errors into expected (races) vs unexpected
      via a three-tier error classifier
    - Verifies message ordering post-chaos with a gap and
      duplicate detector
    - Supports ApiMix modes (Mixed, DataPlaneOnly,
      ControlPlaneHeavy, All) to gate destructive ops like
      delete-and-recreate-topic and stream purge
    - Exercises untested polling strategies (First, Last,
      Timestamp) that race against segment mutations
    - Tightens defaults (50MiB max topic, 3s churn interval)
      to increase race window density
---
 Cargo.lock                                         |   1 +
 core/bench/Cargo.toml                              |   1 +
 .../src/components/selectors/benchmark_selector.rs |   1 +
 core/bench/report/src/lib.rs                       |   2 +
 core/bench/report/src/prints.rs                    |   1 +
 core/bench/report/src/types/actor_kind.rs          |   4 +
 core/bench/report/src/types/benchmark_kind.rs      |   3 +
 core/bench/report/src/types/group_metrics_kind.rs  |   4 +
 core/bench/report/src/types/individual_metrics.rs  |  43 ++
 core/bench/report/src/types/params.rs              |   6 +
 core/bench/src/actors/mod.rs                       |   1 +
 .../bench/src/actors/producer/client/high_level.rs |  11 +-
 core/bench/src/actors/producer/client/low_level.rs |   8 +
 core/bench/src/actors/stress/admin_exerciser.rs    | 206 +++++++
 .../src/actors/stress/control_plane_churner.rs     | 653 +++++++++++++++++++++
 core/bench/src/actors/stress/error_classifier.rs   |  72 +++
 core/bench/src/actors/stress/health_monitor.rs     | 143 +++++
 .../bench/src/{args/kinds => actors/stress}/mod.rs |   9 +-
 core/bench/src/actors/stress/stress_context.rs     | 177 ++++++
 core/bench/src/actors/stress/verifier.rs           | 198 +++++++
 core/bench/src/analytics/metrics/group.rs          |   1 +
 core/bench/src/args/common.rs                      |  44 +-
 core/bench/src/args/kind.rs                        |  34 ++
 core/bench/src/args/kinds/mod.rs                   |   1 +
 core/bench/src/args/kinds/stress/args.rs           | 160 +++++
 core/bench/src/args/kinds/{ => stress}/mod.rs      |   4 +-
 core/bench/src/benchmarks/benchmark.rs             |   4 +
 core/bench/src/benchmarks/common.rs                |   6 +-
 core/bench/src/benchmarks/mod.rs                   |   2 +
 core/bench/src/benchmarks/stress.rs                | 281 +++++++++
 core/bench/src/benchmarks/stress_report.rs         | 215 +++++++
 core/bench/src/utils/finish_condition.rs           | 103 +++-
 core/bench/src/utils/mod.rs                        |   4 +-
 33 files changed, 2356 insertions(+), 47 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4dbeac128..ab7370199 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4464,6 +4464,7 @@ dependencies = [
  "hostname",
  "human-repr",
  "iggy",
+ "iggy_common",
  "nonzero_lit",
  "rand 0.9.2",
  "rayon",
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index 294a47dd7..9f020e73b 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -43,6 +43,7 @@ governor = { workspace = true }
 hostname = { workspace = true }
 human-repr = { workspace = true }
 iggy = { workspace = true }
+iggy_common = { workspace = true }
 nonzero_lit = { workspace = true }
 rand = { workspace = true }
 rayon = { workspace = true }
diff --git 
a/core/bench/dashboard/frontend/src/components/selectors/benchmark_selector.rs 
b/core/bench/dashboard/frontend/src/components/selectors/benchmark_selector.rs
index d93daedf4..e6b3c6afb 100644
--- 
a/core/bench/dashboard/frontend/src/components/selectors/benchmark_selector.rs
+++ 
b/core/bench/dashboard/frontend/src/components/selectors/benchmark_selector.rs
@@ -66,6 +66,7 @@ pub fn benchmark_selector(props: &BenchmarkSelectorProps) -> 
Html {
                         | BenchmarkKind::EndToEndProducingConsumerGroup
                 )
             }
+            BenchmarkKind::Stress => matches!(k, BenchmarkKind::Stress),
         })
         .cloned()
         .collect();
diff --git a/core/bench/report/src/lib.rs b/core/bench/report/src/lib.rs
index 8a180752f..b01960d87 100644
--- a/core/bench/report/src/lib.rs
+++ b/core/bench/report/src/lib.rs
@@ -48,6 +48,7 @@ pub fn create_throughput_chart(
             ActorKind::Producer => "Producer",
             ActorKind::Consumer => "Consumer",
             ActorKind::ProducingConsumer => "Producing Consumer",
+            ActorKind::StressActor => "Stress Actor",
         };
 
         chart = chart.add_dual_time_line_series(
@@ -113,6 +114,7 @@ pub fn create_latency_chart(
             ActorKind::Producer => "Producer",
             ActorKind::Consumer => "Consumer",
             ActorKind::ProducingConsumer => "Producing Consumer",
+            ActorKind::StressActor => "Stress Actor",
         };
 
         chart = chart.add_time_series(
diff --git a/core/bench/report/src/prints.rs b/core/bench/report/src/prints.rs
index 045560ec3..8cc32ab9c 100644
--- a/core/bench/report/src/prints.rs
+++ b/core/bench/report/src/prints.rs
@@ -142,6 +142,7 @@ impl BenchmarkGroupMetrics {
             GroupMetricsKind::Consumers => ("Consumers Results", Color::Green),
             GroupMetricsKind::ProducersAndConsumers => ("Aggregate Results", 
Color::Red),
             GroupMetricsKind::ProducingConsumers => ("Producing Consumer 
Results", Color::Red),
+            GroupMetricsKind::StressActors => ("Stress Actor Results", 
Color::Yellow),
         };
 
         let actor = self.summary.kind.actor();
diff --git a/core/bench/report/src/types/actor_kind.rs 
b/core/bench/report/src/types/actor_kind.rs
index 7376d3ac4..13b20d81e 100644
--- a/core/bench/report/src/types/actor_kind.rs
+++ b/core/bench/report/src/types/actor_kind.rs
@@ -30,6 +30,9 @@ pub enum ActorKind {
     #[display("Producing Consumer")]
     #[serde(rename = "producing_consumer")]
     ProducingConsumer,
+    #[display("Stress Actor")]
+    #[serde(rename = "stress_actor")]
+    StressActor,
 }
 
 impl ActorKind {
@@ -38,6 +41,7 @@ impl ActorKind {
             ActorKind::Producer => "Producers",
             ActorKind::Consumer => "Consumers",
             ActorKind::ProducingConsumer => "Producing Consumers",
+            ActorKind::StressActor => "Stress Actors",
         }
     }
 }
diff --git a/core/bench/report/src/types/benchmark_kind.rs 
b/core/bench/report/src/types/benchmark_kind.rs
index 43393e532..8747be59a 100644
--- a/core/bench/report/src/types/benchmark_kind.rs
+++ b/core/bench/report/src/types/benchmark_kind.rs
@@ -59,4 +59,7 @@ pub enum BenchmarkKind {
     #[display("End To End Producing Consumer Group")]
     #[serde(rename = "end_to_end_producing_consumer_group")]
     EndToEndProducingConsumerGroup,
+    #[display("Stress")]
+    #[serde(rename = "stress")]
+    Stress,
 }
diff --git a/core/bench/report/src/types/group_metrics_kind.rs 
b/core/bench/report/src/types/group_metrics_kind.rs
index 928f28489..8334e66bf 100644
--- a/core/bench/report/src/types/group_metrics_kind.rs
+++ b/core/bench/report/src/types/group_metrics_kind.rs
@@ -34,6 +34,9 @@ pub enum GroupMetricsKind {
     #[display("Producing Consumers")]
     #[serde(rename = "producing_consumers")]
     ProducingConsumers,
+    #[display("Stress Actors")]
+    #[serde(rename = "stress_actors")]
+    StressActors,
 }
 
 impl GroupMetricsKind {
@@ -43,6 +46,7 @@ impl GroupMetricsKind {
             GroupMetricsKind::Consumers => "Consumer",
             GroupMetricsKind::ProducersAndConsumers => "Actor",
             GroupMetricsKind::ProducingConsumers => "Producing Consumer",
+            GroupMetricsKind::StressActors => "Stress Actor",
         }
     }
 }
diff --git a/core/bench/report/src/types/individual_metrics.rs 
b/core/bench/report/src/types/individual_metrics.rs
index 2ba13c3c8..24d0d9e2d 100644
--- a/core/bench/report/src/types/individual_metrics.rs
+++ b/core/bench/report/src/types/individual_metrics.rs
@@ -17,6 +17,7 @@
  */
 
 use super::{
+    actor_kind::ActorKind, benchmark_kind::BenchmarkKind,
     individual_metrics_summary::BenchmarkIndividualMetricsSummary, 
time_series::TimeSeries,
 };
 use crate::utils::{max, min, std_dev};
@@ -32,6 +33,48 @@ pub struct BenchmarkIndividualMetrics {
     pub latency_ts: TimeSeries,
 }
 
+impl BenchmarkIndividualMetrics {
+    /// Creates a zero-valued placeholder for stress actors that don't produce
+    /// standard throughput/latency metrics (churners, monitors, etc.).
+    pub fn placeholder(actor_name: &str) -> Self {
+        Self {
+            summary: BenchmarkIndividualMetricsSummary {
+                benchmark_kind: BenchmarkKind::Stress,
+                actor_kind: ActorKind::StressActor,
+                actor_id: u32::from_ne_bytes(
+                    actor_name
+                        .as_bytes()
+                        .get(..4)
+                        .unwrap_or(&[0; 4])
+                        .try_into()
+                        .unwrap_or([0; 4]),
+                ),
+                total_time_secs: 0.0,
+                total_user_data_bytes: 0,
+                total_bytes: 0,
+                total_messages: 0,
+                total_message_batches: 0,
+                throughput_megabytes_per_second: 0.0,
+                throughput_messages_per_second: 0.0,
+                p50_latency_ms: 0.0,
+                p90_latency_ms: 0.0,
+                p95_latency_ms: 0.0,
+                p99_latency_ms: 0.0,
+                p999_latency_ms: 0.0,
+                p9999_latency_ms: 0.0,
+                avg_latency_ms: 0.0,
+                median_latency_ms: 0.0,
+                min_latency_ms: 0.0,
+                max_latency_ms: 0.0,
+                std_dev_latency_ms: 0.0,
+            },
+            throughput_mb_ts: TimeSeries::default(),
+            throughput_msg_ts: TimeSeries::default(),
+            latency_ts: TimeSeries::default(),
+        }
+    }
+}
+
 // Custom deserializer implementation
 impl<'de> Deserialize<'de> for BenchmarkIndividualMetrics {
     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
diff --git a/core/bench/report/src/types/params.rs 
b/core/bench/report/src/types/params.rs
index 7e90942ba..13e7b9a4a 100644
--- a/core/bench/report/src/types/params.rs
+++ b/core/bench/report/src/types/params.rs
@@ -73,6 +73,12 @@ impl BenchmarkParams {
                     self.producers, self.consumer_groups
                 )
             }
+            BenchmarkKind::Stress => {
+                format!(
+                    "{} Producers/{} Consumers (Stress)",
+                    self.producers, self.consumers
+                )
+            }
         }
     }
 }
diff --git a/core/bench/src/actors/mod.rs b/core/bench/src/actors/mod.rs
index d992ad5ac..6770941ec 100644
--- a/core/bench/src/actors/mod.rs
+++ b/core/bench/src/actors/mod.rs
@@ -23,6 +23,7 @@ use iggy::prelude::IggyError;
 pub mod consumer;
 pub mod producer;
 pub mod producing_consumer;
+pub mod stress;
 
 #[derive(Debug, Clone)]
 pub struct BatchMetrics {
diff --git a/core/bench/src/actors/producer/client/high_level.rs 
b/core/bench/src/actors/producer/client/high_level.rs
index 7fdf923b1..f8c2d1547 100644
--- a/core/bench/src/actors/producer/client/high_level.rs
+++ b/core/bench/src/actors/producer/client/high_level.rs
@@ -35,6 +35,7 @@ pub struct HighLevelProducerClient {
     client_factory: Arc<dyn ClientFactory>,
     config: BenchmarkProducerConfig,
     producer: Option<IggyProducer>,
+    next_sequence: u64,
 }
 
 impl HighLevelProducerClient {
@@ -43,6 +44,7 @@ impl HighLevelProducerClient {
             client_factory,
             config,
             producer: None,
+            next_sequence: 0,
         }
     }
 }
@@ -52,10 +54,17 @@ impl ProducerClient for HighLevelProducerClient {
         &mut self,
         batch_generator: &mut BenchmarkBatchGenerator,
     ) -> Result<Option<BatchMetrics>, IggyError> {
-        let batch = batch_generator.generate_owned_batch();
+        let mut batch = batch_generator.generate_owned_batch();
         if batch.messages.is_empty() {
             return Ok(None);
         }
+
+        for msg in &mut batch.messages {
+            msg.header.id =
+                (u128::from(self.config.producer_id) << 64) | 
u128::from(self.next_sequence);
+            self.next_sequence += 1;
+        }
+
         let message_count = u32::try_from(batch.messages.len()).unwrap();
         let user_data_bytes = batch.user_data_bytes;
         let total_bytes = batch.total_bytes;
diff --git a/core/bench/src/actors/producer/client/low_level.rs 
b/core/bench/src/actors/producer/client/low_level.rs
index b132f9a97..77fbad4b7 100644
--- a/core/bench/src/actors/producer/client/low_level.rs
+++ b/core/bench/src/actors/producer/client/low_level.rs
@@ -39,6 +39,7 @@ pub struct LowLevelProducerClient {
     stream_id: Identifier,
     topic_id: Identifier,
     partitioning: Partitioning,
+    next_sequence: u64,
 }
 
 impl LowLevelProducerClient {
@@ -50,6 +51,7 @@ impl LowLevelProducerClient {
             stream_id: Identifier::default(),
             topic_id: Identifier::default(),
             partitioning: Partitioning::partition_id(0),
+            next_sequence: 0,
         }
     }
 }
@@ -65,6 +67,12 @@ impl ProducerClient for LowLevelProducerClient {
             return Ok(None);
         }
 
+        for msg in &mut batch.messages {
+            msg.header.id =
+                (u128::from(self.config.producer_id) << 64) | 
u128::from(self.next_sequence);
+            self.next_sequence += 1;
+        }
+
         let before_send = Instant::now();
         client
             .send_messages(
diff --git a/core/bench/src/actors/stress/admin_exerciser.rs 
b/core/bench/src/actors/stress/admin_exerciser.rs
new file mode 100644
index 000000000..36faa0758
--- /dev/null
+++ b/core/bench/src/actors/stress/admin_exerciser.rs
@@ -0,0 +1,206 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::error_classifier;
+use super::stress_context::StressContext;
+use crate::utils::{ClientFactory, login_root};
+use iggy::clients::client::IggyClient;
+use iggy::prelude::*;
+use std::sync::Arc;
+use std::sync::atomic::Ordering;
+use tracing::{debug, warn};
+
+const ADMIN_CYCLE_INTERVAL_SECS: u64 = 15;
+
+/// Exercises user management and PAT lifecycle APIs at lower frequency.
+///
+/// Each cycle: create user -> create PAT -> delete PAT -> delete user.
+/// Also exercises consumer offset store/get and `flush_unsaved_buffer`.
+pub struct AdminExerciser {
+    client_factory: Arc<dyn ClientFactory>,
+    ctx: Arc<StressContext>,
+}
+
+impl AdminExerciser {
+    pub fn new(client_factory: Arc<dyn ClientFactory>, ctx: 
Arc<StressContext>) -> Self {
+        Self {
+            client_factory,
+            ctx,
+        }
+    }
+
+    pub async fn run(self) {
+        let client = self.client_factory.create_client().await;
+        let client = IggyClient::create(client, None, None);
+        login_root(&client).await;
+
+        let mut cycle = 0u64;
+        while !self.ctx.is_cancelled() {
+            self.user_pat_lifecycle(&client, cycle).await;
+            self.offset_lifecycle(&client, cycle).await;
+            self.flush_buffers(&client).await;
+
+            cycle += 1;
+            
tokio::time::sleep(std::time::Duration::from_secs(ADMIN_CYCLE_INTERVAL_SECS)).await;
+        }
+    }
+
+    async fn user_pat_lifecycle(&self, client: &IggyClient, cycle: u64) {
+        let username = format!("stress-user-{cycle}");
+        let password = "StressP@ss123!";
+
+        // Create user
+        match client
+            .create_user(&username, password, UserStatus::Active, None)
+            .await
+        {
+            Ok(_) => {
+                self.ctx
+                    .stats
+                    .create_user_ok
+                    .fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .create_user_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                warn!("Admin: create user failed: {e}");
+                return;
+            }
+        }
+
+        // Create PAT for current session (root user)
+        let pat_name = format!("stress-pat-{cycle}");
+        match client
+            .create_personal_access_token(&pat_name, IggyExpiry::NeverExpire)
+            .await
+        {
+            Ok(_) => {
+                self.ctx.stats.create_pat_ok.fetch_add(1, Ordering::Relaxed);
+
+                // Delete PAT
+                match client.delete_personal_access_token(&pat_name).await {
+                    Ok(()) => {
+                        self.ctx.stats.delete_pat_ok.fetch_add(1, 
Ordering::Relaxed);
+                    }
+                    Err(e) => {
+                        self.ctx
+                            .stats
+                            .delete_pat_err
+                            .fetch_add(1, Ordering::Relaxed);
+                        error_classifier::record_error(&self.ctx.stats, &e);
+                        debug!("Admin: delete PAT failed: {e}");
+                    }
+                }
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .create_pat_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                debug!("Admin: create PAT failed: {e}");
+            }
+        }
+
+        // Delete user
+        let user_id: Identifier = username.as_str().try_into().expect("valid 
identifier");
+        match client.delete_user(&user_id).await {
+            Ok(()) => {
+                self.ctx
+                    .stats
+                    .delete_user_ok
+                    .fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .delete_user_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                debug!("Admin: delete user failed: {e}");
+            }
+        }
+    }
+
+    async fn offset_lifecycle(&self, client: &IggyClient, cycle: u64) {
+        let stream_id: Identifier = "bench-stream-1".try_into().expect("valid 
identifier");
+        let topic_id: Identifier = "topic-1".try_into().expect("valid 
identifier");
+        let consumer_id = u32::try_from(cycle % 1_000_000).unwrap_or(0) + 1000;
+        let consumer = 
Consumer::new(Identifier::numeric(consumer_id).expect("valid id"));
+
+        // Store offset
+        match client
+            .store_consumer_offset(&consumer, &stream_id, &topic_id, Some(1), 
cycle)
+            .await
+        {
+            Ok(()) => {
+                self.ctx
+                    .stats
+                    .store_offset_ok
+                    .fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .store_offset_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                return;
+            }
+        }
+
+        // Get offset
+        match client
+            .get_consumer_offset(&consumer, &stream_id, &topic_id, Some(1))
+            .await
+        {
+            Ok(_) => {
+                self.ctx.stats.get_offset_ok.fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .get_offset_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+            }
+        }
+    }
+
+    async fn flush_buffers(&self, client: &IggyClient) {
+        let stream_id: Identifier = "bench-stream-1".try_into().expect("valid 
identifier");
+        let topic_id: Identifier = "topic-1".try_into().expect("valid 
identifier");
+
+        match client
+            .flush_unsaved_buffer(&stream_id, &topic_id, 1, false)
+            .await
+        {
+            Ok(()) => {
+                self.ctx.stats.flush_ok.fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx.stats.flush_err.fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                debug!("Admin: flush_unsaved_buffer failed: {e}");
+            }
+        }
+    }
+}
diff --git a/core/bench/src/actors/stress/control_plane_churner.rs 
b/core/bench/src/actors/stress/control_plane_churner.rs
new file mode 100644
index 000000000..39df92259
--- /dev/null
+++ b/core/bench/src/actors/stress/control_plane_churner.rs
@@ -0,0 +1,653 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::error_classifier;
+use super::stress_context::StressContext;
+use crate::args::kinds::stress::args::ApiMix;
+use crate::benchmarks::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX};
+use crate::utils::{ClientFactory, login_root};
+use iggy::clients::client::IggyClient;
+use iggy::prelude::*;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use std::sync::Arc;
+use std::sync::atomic::Ordering;
+use tracing::{debug, warn};
+
+/// CRUD lifecycle operations exercised during the chaos phase.
+#[derive(Debug, Clone, Copy)]
+enum ChurnOp {
+    CreateDeleteTopic,
+    AddRemovePartitions,
+    ConsumerGroupJoinLeave,
+    PurgeTopic,
+    DeleteSegments,
+    UpdateTopic,
+    StressPoll,
+    // Destructive ops — only enabled with ApiMix::All
+    DeleteAndRecreateTopic,
+    PurgeStream,
+    DisruptConsumerGroup,
+}
+
+const STANDARD_OPS: [ChurnOp; 7] = [
+    ChurnOp::CreateDeleteTopic,
+    ChurnOp::AddRemovePartitions,
+    ChurnOp::ConsumerGroupJoinLeave,
+    ChurnOp::PurgeTopic,
+    ChurnOp::DeleteSegments,
+    ChurnOp::UpdateTopic,
+    ChurnOp::StressPoll,
+];
+
+const ALL_OPS: [ChurnOp; 10] = [
+    ChurnOp::CreateDeleteTopic,
+    ChurnOp::AddRemovePartitions,
+    ChurnOp::ConsumerGroupJoinLeave,
+    ChurnOp::PurgeTopic,
+    ChurnOp::DeleteSegments,
+    ChurnOp::UpdateTopic,
+    ChurnOp::StressPoll,
+    ChurnOp::DeleteAndRecreateTopic,
+    ChurnOp::PurgeStream,
+    ChurnOp::DisruptConsumerGroup,
+];
+
+/// Topic configuration shared between the benchmark setup and the churner,
+/// so that `DeleteAndRecreateTopic` can recreate with the same parameters.
+pub struct ChurnerConfig {
+    pub api_mix: ApiMix,
+    pub partitions: u32,
+    pub message_expiry: IggyExpiry,
+    pub max_topic_size: MaxTopicSize,
+}
+
+/// Periodically executes CRUD lifecycle operations against the server.
+///
+/// Targets the TOCTOU race in consumer group rebalance and exercises
+/// create/delete paths under concurrent data-plane load. In `ApiMix::All`
+/// mode, also exercises destructive ops (topic deletion, stream purge)
+/// that race against active data-plane actors.
+pub struct ControlPlaneChurner {
+    churner_id: u32,
+    client_factory: Arc<dyn ClientFactory>,
+    ctx: Arc<StressContext>,
+    churn_interval: std::time::Duration,
+    rng: StdRng,
+    api_mix: ApiMix,
+    partitions: u32,
+    message_expiry: IggyExpiry,
+    max_topic_size: MaxTopicSize,
+}
+
+impl ControlPlaneChurner {
+    pub fn new(
+        churner_id: u32,
+        client_factory: Arc<dyn ClientFactory>,
+        ctx: Arc<StressContext>,
+        churn_interval: IggyDuration,
+        chaos_seed: u64,
+        config: &ChurnerConfig,
+    ) -> Self {
+        let rng = 
StdRng::seed_from_u64(chaos_seed.wrapping_add(u64::from(churner_id)));
+        Self {
+            churner_id,
+            client_factory,
+            ctx,
+            churn_interval: churn_interval.get_duration(),
+            rng,
+            api_mix: config.api_mix,
+            partitions: config.partitions,
+            message_expiry: config.message_expiry,
+            max_topic_size: config.max_topic_size,
+        }
+    }
+
+    fn available_ops(&self) -> &'static [ChurnOp] {
+        match self.api_mix {
+            ApiMix::Mixed | ApiMix::ControlPlaneHeavy => &STANDARD_OPS,
+            ApiMix::All => &ALL_OPS,
+            ApiMix::DataPlaneOnly => unreachable!("churner not spawned for 
DataPlaneOnly"),
+        }
+    }
+
+    pub async fn run(mut self) {
+        let client = self.client_factory.create_client().await;
+        let client = IggyClient::create(client, None, None);
+        login_root(&client).await;
+
+        let mut cycle = 0u64;
+        while !self.ctx.is_cancelled() {
+            let ops = self.available_ops();
+            let op = ops[self.rng.random_range(0..ops.len())];
+            debug!(
+                "Churner #{} cycle {cycle}: executing {:?}",
+                self.churner_id, op
+            );
+
+            match op {
+                ChurnOp::CreateDeleteTopic => {
+                    self.create_delete_topic(&client, cycle).await;
+                }
+                ChurnOp::AddRemovePartitions => {
+                    self.add_remove_partitions(&client).await;
+                }
+                ChurnOp::ConsumerGroupJoinLeave => {
+                    self.consumer_group_join_leave(&client, cycle).await;
+                }
+                ChurnOp::PurgeTopic => {
+                    self.purge_random_topic(&client).await;
+                }
+                ChurnOp::DeleteSegments => {
+                    self.delete_segments(&client).await;
+                }
+                ChurnOp::UpdateTopic => {
+                    self.update_topic(&client).await;
+                }
+                ChurnOp::StressPoll => {
+                    self.stress_poll(&client).await;
+                }
+                ChurnOp::DeleteAndRecreateTopic => {
+                    self.delete_and_recreate_topic(&client).await;
+                }
+                ChurnOp::PurgeStream => {
+                    self.purge_stream(&client).await;
+                }
+                ChurnOp::DisruptConsumerGroup => {
+                    self.disrupt_consumer_group(&client).await;
+                }
+            }
+
+            cycle += 1;
+            tokio::time::sleep(self.churn_interval).await;
+        }
+    }
+
+    fn random_stream_id(&self) -> Identifier {
+        let stream_idx = self.rng.clone().random_range(1..=2u32);
+        format!("bench-stream-{stream_idx}")
+            .as_str()
+            .try_into()
+            .expect("valid identifier")
+    }
+
+    fn topic_id() -> Identifier {
+        "topic-1".try_into().expect("valid identifier")
+    }
+
+    // --- Original ops ---
+
+    async fn create_delete_topic(&self, client: &IggyClient, cycle: u64) {
+        let stream_id: Identifier = "bench-stream-1".try_into().expect("valid 
identifier");
+        let topic_name = format!("churn-{}-{cycle}", self.churner_id);
+
+        match client
+            .create_topic(
+                &stream_id,
+                &topic_name,
+                1,
+                CompressionAlgorithm::default(),
+                None,
+                IggyExpiry::NeverExpire,
+                MaxTopicSize::ServerDefault,
+            )
+            .await
+        {
+            Ok(_) => {
+                self.ctx
+                    .stats
+                    .create_topic_ok
+                    .fetch_add(1, Ordering::Relaxed);
+
+                self.ctx
+                    .ephemeral_topics
+                    .lock()
+                    .await
+                    .push((stream_id.clone(), topic_name.clone()));
+
+                
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
+
+                let topic_id: Identifier =
+                    topic_name.as_str().try_into().expect("valid identifier");
+                match client.delete_topic(&stream_id, &topic_id).await {
+                    Ok(()) => {
+                        self.ctx
+                            .stats
+                            .delete_topic_ok
+                            .fetch_add(1, Ordering::Relaxed);
+                        let mut topics = 
self.ctx.ephemeral_topics.lock().await;
+                        topics.retain(|(_, name)| name != &topic_name);
+                    }
+                    Err(e) => {
+                        self.ctx
+                            .stats
+                            .delete_topic_err
+                            .fetch_add(1, Ordering::Relaxed);
+                        error_classifier::record_error(&self.ctx.stats, &e);
+                        warn!("Churner #{}: delete topic failed: {e}", 
self.churner_id);
+                    }
+                }
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .create_topic_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                warn!("Churner #{}: create topic failed: {e}", 
self.churner_id);
+            }
+        }
+    }
+
+    async fn add_remove_partitions(&self, client: &IggyClient) {
+        let stream_id: Identifier = "bench-stream-1".try_into().expect("valid 
identifier");
+        let topic_id = Self::topic_id();
+
+        match client.create_partitions(&stream_id, &topic_id, 1).await {
+            Ok(()) => {
+                self.ctx
+                    .stats
+                    .create_partitions_ok
+                    .fetch_add(1, Ordering::Relaxed);
+
+                
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+
+                match client.delete_partitions(&stream_id, &topic_id, 1).await 
{
+                    Ok(()) => {
+                        self.ctx
+                            .stats
+                            .delete_partitions_ok
+                            .fetch_add(1, Ordering::Relaxed);
+                    }
+                    Err(e) => {
+                        self.ctx
+                            .stats
+                            .delete_partitions_err
+                            .fetch_add(1, Ordering::Relaxed);
+                        error_classifier::record_error(&self.ctx.stats, &e);
+                        debug!(
+                            "Churner #{}: delete partitions failed: {e}",
+                            self.churner_id
+                        );
+                    }
+                }
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .create_partitions_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                debug!(
+                    "Churner #{}: create partitions failed: {e}",
+                    self.churner_id
+                );
+            }
+        }
+    }
+
+    /// Rapid join -> poll once -> leave cycle targeting the TOCTOU race
+    /// in `resolve_consumer_with_partition_id()`.
+    async fn consumer_group_join_leave(&self, client: &IggyClient, cycle: u64) 
{
+        let stream_id: Identifier = "bench-stream-1".try_into().expect("valid 
identifier");
+        let topic_id = Self::topic_id();
+        let cg_name = format!("churn-cg-{}-{cycle}", self.churner_id);
+
+        match client
+            .create_consumer_group(&stream_id, &topic_id, &cg_name)
+            .await
+        {
+            Ok(_) => {
+                self.ctx
+                    .stats
+                    .create_consumer_group_ok
+                    .fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .create_consumer_group_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                return;
+            }
+        }
+
+        let cg_id: Identifier = cg_name.as_str().try_into().expect("valid 
identifier");
+
+        match client
+            .join_consumer_group(&stream_id, &topic_id, &cg_id)
+            .await
+        {
+            Ok(()) => {
+                self.ctx
+                    .stats
+                    .join_consumer_group_ok
+                    .fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .join_consumer_group_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+            }
+        }
+
+        match client
+            .leave_consumer_group(&stream_id, &topic_id, &cg_id)
+            .await
+        {
+            Ok(()) => {
+                self.ctx
+                    .stats
+                    .leave_consumer_group_ok
+                    .fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .leave_consumer_group_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+            }
+        }
+
+        match client
+            .delete_consumer_group(&stream_id, &topic_id, &cg_id)
+            .await
+        {
+            Ok(()) => {
+                self.ctx
+                    .stats
+                    .delete_consumer_group_ok
+                    .fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .delete_consumer_group_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+            }
+        }
+    }
+
+    async fn purge_random_topic(&self, client: &IggyClient) {
+        let stream_id = self.random_stream_id();
+        let topic_id = Self::topic_id();
+
+        match client.purge_topic(&stream_id, &topic_id).await {
+            Ok(()) => {
+                self.ctx
+                    .stats
+                    .purge_topic_ok
+                    .fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .purge_topic_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                debug!("Churner #{}: purge topic failed: {e}", 
self.churner_id);
+            }
+        }
+    }
+
+    // --- New safe ops ---
+
+    async fn delete_segments(&self, client: &IggyClient) {
+        let stream_id = self.random_stream_id();
+        let topic_id = Self::topic_id();
+        let partition_id = self.rng.clone().random_range(1..=self.partitions);
+
+        match client
+            .delete_segments(&stream_id, &topic_id, partition_id, 1)
+            .await
+        {
+            Ok(()) => {
+                self.ctx
+                    .stats
+                    .delete_segments_ok
+                    .fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .delete_segments_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                debug!("Churner #{}: delete segments failed: {e}", 
self.churner_id);
+            }
+        }
+    }
+
+    async fn update_topic(&self, client: &IggyClient) {
+        let stream_id = self.random_stream_id();
+        let topic_id = Self::topic_id();
+
+        let compression = if self.rng.clone().random_bool(0.5) {
+            CompressionAlgorithm::None
+        } else {
+            CompressionAlgorithm::Gzip
+        };
+
+        match client
+            .update_topic(
+                &stream_id,
+                &topic_id,
+                "topic-1",
+                compression,
+                None,
+                self.message_expiry,
+                self.max_topic_size,
+            )
+            .await
+        {
+            Ok(()) => {
+                self.ctx
+                    .stats
+                    .update_topic_ok
+                    .fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .update_topic_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                debug!("Churner #{}: update topic failed: {e}", 
self.churner_id);
+            }
+        }
+    }
+
+    /// One-off polls with First/Last/Timestamp strategies — these are "victim"
+    /// operations that race against concurrent segment mutations.
+    async fn stress_poll(&self, client: &IggyClient) {
+        let stream_id = self.random_stream_id();
+        let topic_id = Self::topic_id();
+        let partition_id = self.rng.clone().random_range(1..=self.partitions);
+        let consumer = Consumer::new(Identifier::numeric(8888).expect("valid 
consumer id"));
+
+        let strategy = match self.rng.clone().random_range(0..3u32) {
+            0 => PollingStrategy::first(),
+            1 => PollingStrategy::last(),
+            _ => PollingStrategy::timestamp(IggyTimestamp::now()),
+        };
+
+        match client
+            .poll_messages(
+                &stream_id,
+                &topic_id,
+                Some(partition_id),
+                &consumer,
+                &strategy,
+                10,
+                false,
+            )
+            .await
+        {
+            Ok(_) => {
+                self.ctx
+                    .stats
+                    .stress_poll_ok
+                    .fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .stress_poll_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                debug!("Churner #{}: stress poll failed: {e}", 
self.churner_id);
+            }
+        }
+    }
+
+    // --- Destructive ops (ApiMix::All only) ---
+
+    /// Deletes and immediately recreates topic-1 on a random stream.
+    /// Exercises 23+ `expect()` panic sites reachable when consumers
+    /// hold stale topic references during deletion.
+    async fn delete_and_recreate_topic(&self, client: &IggyClient) {
+        let stream_id = self.random_stream_id();
+        let topic_id = Self::topic_id();
+
+        match client.delete_topic(&stream_id, &topic_id).await {
+            Ok(()) => {
+                self.ctx
+                    .stats
+                    .delete_topic_ok
+                    .fetch_add(1, Ordering::Relaxed);
+
+                
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+                match client
+                    .create_topic(
+                        &stream_id,
+                        "topic-1",
+                        self.partitions,
+                        CompressionAlgorithm::default(),
+                        None,
+                        self.message_expiry,
+                        self.max_topic_size,
+                    )
+                    .await
+                {
+                    Ok(_) => {
+                        self.ctx
+                            .stats
+                            .create_topic_ok
+                            .fetch_add(1, Ordering::Relaxed);
+                    }
+                    Err(e) => {
+                        self.ctx
+                            .stats
+                            .create_topic_err
+                            .fetch_add(1, Ordering::Relaxed);
+                        error_classifier::record_error(&self.ctx.stats, &e);
+                        warn!("Churner #{}: recreate topic failed: {e}", 
self.churner_id);
+                    }
+                }
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .delete_topic_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                debug!(
+                    "Churner #{}: delete topic (for recreate) failed: {e}",
+                    self.churner_id
+                );
+            }
+        }
+    }
+
+    async fn purge_stream(&self, client: &IggyClient) {
+        let stream_id = self.random_stream_id();
+
+        match client.purge_stream(&stream_id).await {
+            Ok(()) => {
+                self.ctx
+                    .stats
+                    .purge_stream_ok
+                    .fetch_add(1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .purge_stream_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                debug!("Churner #{}: purge stream failed: {e}", 
self.churner_id);
+            }
+        }
+    }
+
+    /// Joins and leaves the consumer group that active consumers are using,
+    /// forcing a rebalance mid-poll (TOCTOU in CG resolution).
+    async fn disrupt_consumer_group(&self, client: &IggyClient) {
+        let stream_id: Identifier = "bench-stream-1".try_into().expect("valid 
identifier");
+        let topic_id = Self::topic_id();
+        let cg_name = 
format!("{CONSUMER_GROUP_NAME_PREFIX}-{CONSUMER_GROUP_BASE_ID}");
+        let cg_id: Identifier = cg_name.as_str().try_into().expect("valid 
identifier");
+
+        match client
+            .join_consumer_group(&stream_id, &topic_id, &cg_id)
+            .await
+        {
+            Ok(()) => {
+                self.ctx
+                    .stats
+                    .join_consumer_group_ok
+                    .fetch_add(1, Ordering::Relaxed);
+
+                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+
+                match client
+                    .leave_consumer_group(&stream_id, &topic_id, &cg_id)
+                    .await
+                {
+                    Ok(()) => {
+                        self.ctx
+                            .stats
+                            .leave_consumer_group_ok
+                            .fetch_add(1, Ordering::Relaxed);
+                    }
+                    Err(e) => {
+                        self.ctx
+                            .stats
+                            .leave_consumer_group_err
+                            .fetch_add(1, Ordering::Relaxed);
+                        error_classifier::record_error(&self.ctx.stats, &e);
+                    }
+                }
+            }
+            Err(e) => {
+                self.ctx
+                    .stats
+                    .join_consumer_group_err
+                    .fetch_add(1, Ordering::Relaxed);
+                error_classifier::record_error(&self.ctx.stats, &e);
+                debug!("Churner #{}: disrupt CG join failed: {e}", 
self.churner_id);
+            }
+        }
+    }
+}
diff --git a/core/bench/src/actors/stress/error_classifier.rs 
b/core/bench/src/actors/stress/error_classifier.rs
new file mode 100644
index 000000000..7e853a571
--- /dev/null
+++ b/core/bench/src/actors/stress/error_classifier.rs
@@ -0,0 +1,72 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use iggy::prelude::IggyError;
+use std::sync::atomic::Ordering;
+
+use super::stress_context::StressStats;
+
/// Three-tier error classification for stress test chaos tolerance.
///
/// Produced by [`classify`] and consumed by [`record_error`], which routes
/// each error into the matching counter on `StressStats`. Only unexpected
/// errors should count against the test's pass/fail verdict.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorSeverity {
    /// Expected during chaos: races, stale references, already-exists conflicts.
    Expected,
    /// Not anticipated but non-fatal: server-side validation, capacity limits.
    Unexpected,
}
+
+/// Classifies an `IggyError` into expected vs unexpected during a stress test.
+///
+/// During chaos (CRUD churn + concurrent data-plane), certain errors are 
normal:
+/// - Resource-not-found errors when a churner deletes a topic another actor 
references
+/// - Already-exists errors from concurrent create attempts
+/// - Consumer group member-not-found during rebalance races
+pub const fn classify(error: &IggyError) -> ErrorSeverity {
+    match error {
+        // Resource races, already-exists conflicts, user/PAT concurrency
+        IggyError::StreamIdNotFound(_)
+        | IggyError::TopicIdNotFound(_, _)
+        | IggyError::PartitionNotFound(_, _, _)
+        | IggyError::ConsumerGroupIdNotFound(_, _)
+        | IggyError::ConsumerGroupNameNotFound(_, _)
+        | IggyError::ConsumerGroupMemberNotFound(_, _, _)
+        | IggyError::ResourceNotFound(_)
+        | IggyError::StreamNameAlreadyExists(_)
+        | IggyError::TopicNameAlreadyExists(_, _)
+        | IggyError::ConsumerGroupNameAlreadyExists(_, _)
+        | IggyError::UserAlreadyExists
+        | IggyError::PersonalAccessTokenAlreadyExists(_, _)
+        | IggyError::InvalidPersonalAccessToken
+        | IggyError::SegmentNotFound
+        | IggyError::SegmentClosed(_, _) => ErrorSeverity::Expected,
+
+        _ => ErrorSeverity::Unexpected,
+    }
+}
+
+/// Records an error in the stress stats based on its severity.
+pub fn record_error(stats: &StressStats, error: &IggyError) {
+    match classify(error) {
+        ErrorSeverity::Expected => {
+            stats.expected_errors.fetch_add(1, Ordering::Relaxed);
+        }
+        ErrorSeverity::Unexpected => {
+            stats.unexpected_errors.fetch_add(1, Ordering::Relaxed);
+        }
+    }
+}
diff --git a/core/bench/src/actors/stress/health_monitor.rs 
b/core/bench/src/actors/stress/health_monitor.rs
new file mode 100644
index 000000000..601fab220
--- /dev/null
+++ b/core/bench/src/actors/stress/health_monitor.rs
@@ -0,0 +1,143 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::error_classifier;
+use super::stress_context::StressContext;
+use crate::utils::{ClientFactory, login_root};
+use iggy::clients::client::IggyClient;
+use iggy::prelude::*;
+use std::sync::Arc;
+use std::sync::atomic::Ordering;
+use std::time::Instant;
+use tracing::{debug, info, warn};
+
+const PING_INTERVAL_SECS: u64 = 5;
+const STATS_INTERVAL_SECS: u64 = 10;
+const METADATA_INTERVAL_SECS: u64 = 30;
+
/// Periodically probes server health and metadata convergence.
///
/// Runs `ping`, `get_stats`, `get_me`, `get_clients`, and metadata queries
/// at different frequencies. Reports latency degradation.
pub struct HealthMonitor {
    // Used once in `run` to create the monitor's own dedicated client.
    client_factory: Arc<dyn ClientFactory>,
    // Shared stress-test state: cancellation flag + per-API counters.
    ctx: Arc<StressContext>,
}
+
impl HealthMonitor {
    /// Builds a monitor; the client itself is created lazily in [`Self::run`].
    pub fn new(client_factory: Arc<dyn ClientFactory>, ctx: Arc<StressContext>) -> Self {
        Self {
            client_factory,
            ctx,
        }
    }

    /// Main probe loop; returns once the shared context is cancelled.
    ///
    /// Tick cadence is `PING_INTERVAL_SECS`; the stats and metadata probes run
    /// on every 2nd and 6th tick respectively (10s / 30s). Tick 0 satisfies
    /// both `is_multiple_of` checks, so all probes fire on the first cycle.
    /// NOTE(review): the cancellation flag is only re-checked after a full
    /// sleep, so shutdown can lag by up to `PING_INTERVAL_SECS`.
    pub async fn run(self) {
        let client = self.client_factory.create_client().await;
        let client = IggyClient::create(client, None, None);
        login_root(&client).await;

        let mut tick = 0u64;
        while !self.ctx.is_cancelled() {
            // Ping every cycle (5s)
            self.probe_ping(&client).await;

            // Stats every 2 cycles (10s)
            if tick.is_multiple_of(STATS_INTERVAL_SECS / PING_INTERVAL_SECS) {
                self.probe_stats(&client).await;
            }

            // Metadata every 6 cycles (30s)
            if tick.is_multiple_of(METADATA_INTERVAL_SECS / PING_INTERVAL_SECS) {
                self.probe_metadata(&client).await;
            }

            tick += 1;
            tokio::time::sleep(std::time::Duration::from_secs(PING_INTERVAL_SECS)).await;
        }
    }

    /// Round-trip liveness probe; warns when latency exceeds 500ms.
    async fn probe_ping(&self, client: &IggyClient) {
        let start = Instant::now();
        match client.ping().await {
            Ok(()) => {
                let latency = start.elapsed();
                self.ctx.stats.ping_ok.fetch_add(1, Ordering::Relaxed);
                // 500ms is a degradation heuristic, not a failure: only warn.
                if latency.as_millis() > 500 {
                    warn!("Health: ping latency {latency:?} exceeds 500ms");
                }
            }
            Err(e) => {
                self.ctx.stats.ping_err.fetch_add(1, Ordering::Relaxed);
                error_classifier::record_error(&self.ctx.stats, &e);
                warn!("Health: ping failed: {e}");
            }
        }
    }

    /// Fetches server-wide stats and logs the headline figures with latency.
    async fn probe_stats(&self, client: &IggyClient) {
        let start = Instant::now();
        match client.get_stats().await {
            Ok(stats) => {
                let latency = start.elapsed();
                self.ctx.stats.get_stats_ok.fetch_add(1, Ordering::Relaxed);
                info!(
                    "Health: server stats in {latency:?} - messages: {}, streams: {}, topics: {}",
                    stats.messages_count, stats.streams_count, stats.topics_count
                );
            }
            Err(e) => {
                self.ctx.stats.get_stats_err.fetch_add(1, Ordering::Relaxed);
                error_classifier::record_error(&self.ctx.stats, &e);
                warn!("Health: get_stats failed: {e}");
            }
        }
    }

    /// Low-frequency metadata probes (`get_me`, `get_clients`); failures are
    /// logged at debug level because they are non-critical to the test.
    async fn probe_metadata(&self, client: &IggyClient) {
        // get_me
        match client.get_me().await {
            Ok(_) => {
                self.ctx.stats.get_me_ok.fetch_add(1, Ordering::Relaxed);
            }
            Err(e) => {
                self.ctx.stats.get_me_err.fetch_add(1, Ordering::Relaxed);
                error_classifier::record_error(&self.ctx.stats, &e);
                debug!("Health: get_me failed: {e}");
            }
        }

        // get_clients
        match client.get_clients().await {
            Ok(clients) => {
                self.ctx
                    .stats
                    .get_clients_ok
                    .fetch_add(1, Ordering::Relaxed);
                debug!("Health: {} connected clients", clients.len());
            }
            Err(e) => {
                self.ctx
                    .stats
                    .get_clients_err
                    .fetch_add(1, Ordering::Relaxed);
                error_classifier::record_error(&self.ctx.stats, &e);
                debug!("Health: get_clients failed: {e}");
            }
        }
    }
}
diff --git a/core/bench/src/args/kinds/mod.rs 
b/core/bench/src/actors/stress/mod.rs
similarity index 84%
copy from core/bench/src/args/kinds/mod.rs
copy to core/bench/src/actors/stress/mod.rs
index 66e522a58..1b146595f 100644
--- a/core/bench/src/args/kinds/mod.rs
+++ b/core/bench/src/actors/stress/mod.rs
@@ -16,6 +16,9 @@
  * under the License.
  */
 
-pub mod balanced;
-pub mod end_to_end;
-pub mod pinned;
+pub mod admin_exerciser;
+pub mod control_plane_churner;
+pub mod error_classifier;
+pub mod health_monitor;
+pub mod stress_context;
+pub mod verifier;
diff --git a/core/bench/src/actors/stress/stress_context.rs 
b/core/bench/src/actors/stress/stress_context.rs
new file mode 100644
index 000000000..c1540a81d
--- /dev/null
+++ b/core/bench/src/actors/stress/stress_context.rs
@@ -0,0 +1,177 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use iggy::prelude::Identifier;
+use std::sync::{
+    Arc,
+    atomic::{AtomicBool, AtomicU64, Ordering},
+};
+use tokio::sync::Mutex;
+
/// Shared state across all stress test actors.
pub struct StressContext {
    // Cooperative shutdown flag; set via `cancel`, polled via `is_cancelled`.
    pub cancelled: Arc<AtomicBool>,
    // Per-API success/error counters shared by every actor.
    pub stats: Arc<StressStats>,
    /// Tracks ephemeral topics created by churners for cleanup: `(stream_id, topic_name)`
    pub ephemeral_topics: Arc<Mutex<Vec<(Identifier, String)>>>,
}
+
+impl StressContext {
+    pub fn new() -> Self {
+        Self {
+            cancelled: Arc::new(AtomicBool::new(false)),
+            stats: Arc::new(StressStats::default()),
+            ephemeral_topics: Arc::new(Mutex::new(Vec::new())),
+        }
+    }
+
+    pub fn cancel(&self) {
+        self.cancelled.store(true, Ordering::Release);
+    }
+
+    pub fn is_cancelled(&self) -> bool {
+        self.cancelled.load(Ordering::Acquire)
+    }
+}
+
/// Per-API atomic counters for stress test telemetry.
///
/// Every field is a lock-free `AtomicU64` bumped with `Ordering::Relaxed`;
/// counters are independent, so no cross-field consistency is implied.
/// Fields come in `_ok`/`_err` pairs, one pair per exercised API call, plus
/// the two severity totals maintained by the error classifier.
#[derive(Default)]
pub struct StressStats {
    // Data plane
    pub send_messages_ok: AtomicU64,
    pub send_messages_err: AtomicU64,
    pub poll_messages_ok: AtomicU64,
    pub poll_messages_err: AtomicU64,
    // Stream / topic CRUD
    pub create_stream_ok: AtomicU64,
    pub create_stream_err: AtomicU64,
    pub delete_stream_ok: AtomicU64,
    pub delete_stream_err: AtomicU64,
    pub create_topic_ok: AtomicU64,
    pub create_topic_err: AtomicU64,
    pub delete_topic_ok: AtomicU64,
    pub delete_topic_err: AtomicU64,
    // Partition CRUD
    pub create_partitions_ok: AtomicU64,
    pub create_partitions_err: AtomicU64,
    pub delete_partitions_ok: AtomicU64,
    pub delete_partitions_err: AtomicU64,
    // Consumer group lifecycle
    pub create_consumer_group_ok: AtomicU64,
    pub create_consumer_group_err: AtomicU64,
    pub delete_consumer_group_ok: AtomicU64,
    pub delete_consumer_group_err: AtomicU64,
    pub join_consumer_group_ok: AtomicU64,
    pub join_consumer_group_err: AtomicU64,
    pub leave_consumer_group_ok: AtomicU64,
    pub leave_consumer_group_err: AtomicU64,
    // Destructive / maintenance ops
    pub purge_topic_ok: AtomicU64,
    pub purge_topic_err: AtomicU64,
    pub delete_segments_ok: AtomicU64,
    pub delete_segments_err: AtomicU64,
    pub update_topic_ok: AtomicU64,
    pub update_topic_err: AtomicU64,
    pub purge_stream_ok: AtomicU64,
    pub purge_stream_err: AtomicU64,
    pub stress_poll_ok: AtomicU64,
    pub stress_poll_err: AtomicU64,
    // Admin exerciser: users and personal access tokens
    pub create_user_ok: AtomicU64,
    pub create_user_err: AtomicU64,
    pub delete_user_ok: AtomicU64,
    pub delete_user_err: AtomicU64,
    pub create_pat_ok: AtomicU64,
    pub create_pat_err: AtomicU64,
    pub delete_pat_ok: AtomicU64,
    pub delete_pat_err: AtomicU64,
    // Offset management
    pub store_offset_ok: AtomicU64,
    pub store_offset_err: AtomicU64,
    pub get_offset_ok: AtomicU64,
    pub get_offset_err: AtomicU64,
    // Health monitor probes
    pub ping_ok: AtomicU64,
    pub ping_err: AtomicU64,
    pub get_stats_ok: AtomicU64,
    pub get_stats_err: AtomicU64,
    pub get_me_ok: AtomicU64,
    pub get_me_err: AtomicU64,
    pub get_clients_ok: AtomicU64,
    pub get_clients_err: AtomicU64,
    pub flush_ok: AtomicU64,
    pub flush_err: AtomicU64,
    // Severity totals maintained by `error_classifier::record_error`
    pub expected_errors: AtomicU64,
    pub unexpected_errors: AtomicU64,
}
+
+impl StressStats {
+    pub fn total_ok(&self) -> u64 {
+        self.send_messages_ok.load(Ordering::Relaxed)
+            + self.poll_messages_ok.load(Ordering::Relaxed)
+            + self.create_stream_ok.load(Ordering::Relaxed)
+            + self.delete_stream_ok.load(Ordering::Relaxed)
+            + self.create_topic_ok.load(Ordering::Relaxed)
+            + self.delete_topic_ok.load(Ordering::Relaxed)
+            + self.create_partitions_ok.load(Ordering::Relaxed)
+            + self.delete_partitions_ok.load(Ordering::Relaxed)
+            + self.create_consumer_group_ok.load(Ordering::Relaxed)
+            + self.delete_consumer_group_ok.load(Ordering::Relaxed)
+            + self.join_consumer_group_ok.load(Ordering::Relaxed)
+            + self.leave_consumer_group_ok.load(Ordering::Relaxed)
+            + self.purge_topic_ok.load(Ordering::Relaxed)
+            + self.delete_segments_ok.load(Ordering::Relaxed)
+            + self.update_topic_ok.load(Ordering::Relaxed)
+            + self.purge_stream_ok.load(Ordering::Relaxed)
+            + self.stress_poll_ok.load(Ordering::Relaxed)
+            + self.create_user_ok.load(Ordering::Relaxed)
+            + self.delete_user_ok.load(Ordering::Relaxed)
+            + self.create_pat_ok.load(Ordering::Relaxed)
+            + self.delete_pat_ok.load(Ordering::Relaxed)
+            + self.store_offset_ok.load(Ordering::Relaxed)
+            + self.get_offset_ok.load(Ordering::Relaxed)
+            + self.ping_ok.load(Ordering::Relaxed)
+            + self.get_stats_ok.load(Ordering::Relaxed)
+            + self.get_me_ok.load(Ordering::Relaxed)
+            + self.get_clients_ok.load(Ordering::Relaxed)
+            + self.flush_ok.load(Ordering::Relaxed)
+    }
+
+    pub fn total_err(&self) -> u64 {
+        self.send_messages_err.load(Ordering::Relaxed)
+            + self.poll_messages_err.load(Ordering::Relaxed)
+            + self.create_stream_err.load(Ordering::Relaxed)
+            + self.delete_stream_err.load(Ordering::Relaxed)
+            + self.create_topic_err.load(Ordering::Relaxed)
+            + self.delete_topic_err.load(Ordering::Relaxed)
+            + self.create_partitions_err.load(Ordering::Relaxed)
+            + self.delete_partitions_err.load(Ordering::Relaxed)
+            + self.create_consumer_group_err.load(Ordering::Relaxed)
+            + self.delete_consumer_group_err.load(Ordering::Relaxed)
+            + self.join_consumer_group_err.load(Ordering::Relaxed)
+            + self.leave_consumer_group_err.load(Ordering::Relaxed)
+            + self.purge_topic_err.load(Ordering::Relaxed)
+            + self.delete_segments_err.load(Ordering::Relaxed)
+            + self.update_topic_err.load(Ordering::Relaxed)
+            + self.purge_stream_err.load(Ordering::Relaxed)
+            + self.stress_poll_err.load(Ordering::Relaxed)
+            + self.create_user_err.load(Ordering::Relaxed)
+            + self.delete_user_err.load(Ordering::Relaxed)
+            + self.create_pat_err.load(Ordering::Relaxed)
+            + self.delete_pat_err.load(Ordering::Relaxed)
+            + self.store_offset_err.load(Ordering::Relaxed)
+            + self.get_offset_err.load(Ordering::Relaxed)
+            + self.ping_err.load(Ordering::Relaxed)
+            + self.get_stats_err.load(Ordering::Relaxed)
+            + self.get_me_err.load(Ordering::Relaxed)
+            + self.get_clients_err.load(Ordering::Relaxed)
+            + self.flush_err.load(Ordering::Relaxed)
+    }
+}
diff --git a/core/bench/src/actors/stress/verifier.rs 
b/core/bench/src/actors/stress/verifier.rs
new file mode 100644
index 000000000..bc10a22c2
--- /dev/null
+++ b/core/bench/src/actors/stress/verifier.rs
@@ -0,0 +1,198 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::utils::{ClientFactory, login_root};
+use iggy::clients::client::IggyClient;
+use iggy::prelude::*;
+use iggy_common::calculate_checksum;
+use std::collections::BTreeSet;
+use std::sync::Arc;
+use tracing::{info, warn};
+
/// Post-test verification results, aggregated across all checked partitions.
#[derive(Debug, Default)]
pub struct VerificationResult {
    /// Number of (stream, partition) pairs that were polled.
    pub partitions_checked: u32,
    /// Total messages inspected across all partitions.
    pub total_messages: u64,
    /// Missing offsets inside the [min, max] offset range seen per partition.
    pub gaps_found: u64,
    /// Offsets observed more than once within a single partition.
    pub duplicates_found: u64,
    /// Messages whose recomputed checksum differs from the stored header checksum.
    pub checksum_mismatches: u64,
    /// Messages whose header `payload_length` disagrees with the actual payload size.
    pub payload_length_mismatches: u64,
    /// Messages with a zero ID (no producer fingerprint; presumably
    /// server-assigned IDs — warned about, but does not fail verification).
    pub id_missing_fingerprint: u64,
    /// True when no gaps, duplicates, checksum, or payload-length issues were found.
    pub passed: bool,
}
+
/// Runs drain-phase verification: polls all partitions and checks offset continuity.
///
/// During the stress test, messages may expire, so we verify that within each
/// partition the offsets we can still poll are monotonically increasing with no
/// gaps in the remaining range.
pub struct StressVerifier {
    // Factory used to create a dedicated verification client.
    client_factory: Arc<dyn ClientFactory>,
    // Number of benchmark streams ("bench-stream-1"..="bench-stream-N") to check.
    streams: u32,
    // Partitions per topic; each is polled independently.
    partitions: u32,
}
+
impl StressVerifier {
    /// Creates a verifier that will inspect `streams` benchmark streams, each
    /// with `partitions` partitions, using a fresh client from `client_factory`.
    pub fn new(client_factory: Arc<dyn ClientFactory>, streams: u32, partitions: u32) -> Self {
        Self {
            client_factory,
            streams,
            partitions,
        }
    }

    /// Polls every partition of every benchmark stream and aggregates the
    /// per-partition results into a single [`VerificationResult`].
    ///
    /// `passed` requires zero gaps, duplicates, checksum mismatches, and
    /// payload-length mismatches. Missing producer-ID fingerprints are only
    /// warned about — they do not fail the run.
    pub async fn verify(&self) -> VerificationResult {
        let client = self.client_factory.create_client().await;
        let client = IggyClient::create(client, None, None);
        login_root(&client).await;

        let mut result = VerificationResult::default();

        // Stream names follow the bench convention "bench-stream-<1-based idx>",
        // each containing a single topic named "topic-1".
        for stream_idx in 1..=self.streams {
            let stream_id: Identifier = format!("bench-stream-{stream_idx}")
                .as_str()
                .try_into()
                .expect("valid identifier");
            let topic_id: Identifier = "topic-1".try_into().expect("valid identifier");

            // NOTE(review): partitions are iterated 0-based here — confirm this
            // matches the server's partition-ID numbering for these topics.
            for partition_id in 0..self.partitions {
                let partition_result = self
                    .verify_partition(&client, &stream_id, &topic_id, partition_id)
                    .await;
                // Only the counters are aggregated; the per-partition `passed`
                // flag is recomputed globally below.
                result.partitions_checked += 1;
                result.total_messages += partition_result.total_messages;
                result.gaps_found += partition_result.gaps_found;
                result.duplicates_found += partition_result.duplicates_found;
                result.checksum_mismatches += partition_result.checksum_mismatches;
                result.payload_length_mismatches += partition_result.payload_length_mismatches;
                result.id_missing_fingerprint += partition_result.id_missing_fingerprint;
            }
        }

        result.passed = result.gaps_found == 0
            && result.duplicates_found == 0
            && result.checksum_mismatches == 0
            && result.payload_length_mismatches == 0;

        if result.passed {
            info!(
                "Verification PASSED: {} partitions, {} msgs, 0 gaps, 0 dups, 0 checksum, 0 len",
                result.partitions_checked, result.total_messages
            );
        } else {
            warn!(
                "Verification FAILED: {} partitions, {} msgs, {} gaps, {} dups, {} checksum, {} len",
                result.partitions_checked,
                result.total_messages,
                result.gaps_found,
                result.duplicates_found,
                result.checksum_mismatches,
                result.payload_length_mismatches,
            );
        }

        if result.id_missing_fingerprint > 0 {
            warn!(
                "Verification: {} messages missing producer ID fingerprint (server-assigned IDs)",
                result.id_missing_fingerprint
            );
        }

        result
    }

    /// Scans one partition from offset 0 in batches of 1000, recording
    /// duplicates, checksum mismatches, payload-length mismatches and zero IDs,
    /// then derives the gap count from the distinct offsets seen.
    ///
    /// Poll errors terminate the scan for this partition with a warning (the
    /// partition may legitimately have been deleted by a churner).
    async fn verify_partition(
        &self,
        client: &IggyClient,
        stream_id: &Identifier,
        topic_id: &Identifier,
        partition_id: u32,
    ) -> VerificationResult {
        let mut result = VerificationResult::default();
        let mut seen_offsets = BTreeSet::new();
        let mut current_offset = 0u64;
        // Fixed consumer id 9999 keeps the verifier distinct from bench actors.
        let consumer = Consumer::new(Identifier::numeric(9999).expect("valid id"));
        let batch_size = 1000u32;

        loop {
            let strategy = PollingStrategy::offset(current_offset);
            // NOTE(review): final `false` arg is presumably auto-commit — confirm
            // against the poll_messages signature.
            match client
                .poll_messages(
                    stream_id,
                    topic_id,
                    Some(partition_id),
                    &consumer,
                    &strategy,
                    batch_size,
                    false,
                )
                .await
            {
                Ok(polled) => {
                    if polled.messages.is_empty() {
                        break;
                    }

                    for msg in &polled.messages {
                        let offset = msg.header.offset;
                        if !seen_offsets.insert(offset) {
                            result.duplicates_found += 1;
                        }

                        // Checksum re-verification: serialize and hash everything
                        // after the checksum field. Assumes the checksum occupies
                        // the first 8 bytes of the serialized form — TODO confirm
                        // against the wire format.
                        let raw = msg.to_bytes();
                        let recomputed = calculate_checksum(&raw[8..]);
                        if msg.header.checksum != recomputed {
                            result.checksum_mismatches += 1;
                        }

                        if msg.header.payload_length as usize != msg.payload.len() {
                            result.payload_length_mismatches += 1;
                        }

                        // id == 0 means no producer fingerprint was stamped.
                        if msg.header.id == 0 {
                            result.id_missing_fingerprint += 1;
                        }

                        result.total_messages += 1;
                    }

                    // Resume just past the last message of this batch.
                    current_offset = polled.messages.last().expect("non-empty").header.offset + 1;
                }
                Err(e) => {
                    warn!(
                        "Verifier: poll partition {partition_id} at offset {current_offset} failed: {e}"
                    );
                    break;
                }
            }
        }

        // Check for gaps in the seen offsets: within [min, max] every offset
        // should be present exactly once; the shortfall is the gap count.
        if let (Some(&min), Some(&max)) = (seen_offsets.first(), seen_offsets.last()) {
            let expected_count = max - min + 1;
            let actual_count = seen_offsets.len() as u64;
            if actual_count < expected_count {
                result.gaps_found = expected_count - actual_count;
            }
        }

        result
    }
}
diff --git a/core/bench/src/analytics/metrics/group.rs 
b/core/bench/src/analytics/metrics/group.rs
index 9e0bd4cf6..e46b05a6e 100644
--- a/core/bench/src/analytics/metrics/group.rs
+++ b/core/bench/src/analytics/metrics/group.rs
@@ -154,6 +154,7 @@ fn determine_group_kind(stats: 
&[BenchmarkIndividualMetrics]) -> GroupMetricsKin
         ActorKind::Producer => GroupMetricsKind::Producers,
         ActorKind::Consumer => GroupMetricsKind::Consumers,
         ActorKind::ProducingConsumer => GroupMetricsKind::ProducingConsumers,
+        ActorKind::StressActor => GroupMetricsKind::StressActors,
     }
 }
 
diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index be983c209..d60cf7faa 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -118,7 +118,10 @@ impl IggyBenchArgs {
                 .exit();
         }
 
-        if (self.message_batches, self.total_data) == (None, None) {
+        // Stress uses --duration instead of --message-batches/--total-data
+        let is_stress = matches!(self.benchmark_kind, 
BenchmarkKindCommand::Stress(_));
+
+        if !is_stress && (self.message_batches, self.total_data) == (None, 
None) {
             self.message_batches = Some(DEFAULT_MESSAGE_BATCHES);
         }
 
@@ -163,11 +166,17 @@ impl IggyBenchArgs {
     }
 
     // Used only for generation of unique directory name
+    #[allow(clippy::option_if_let_else)]
     pub fn data_volume_identifier(&self) -> String {
-        self.total_data().map_or_else(
-            || self.message_batches().unwrap().to_string(),
-            |total_messages_size| format!("{}B", 
total_messages_size.as_bytes_u64()),
-        )
+        if let Some(total_messages_size) = self.total_data() {
+            format!("{}B", total_messages_size.as_bytes_u64())
+        } else if let Some(batches) = self.message_batches() {
+            batches.to_string()
+        } else if let BenchmarkKindCommand::Stress(args) = 
&self.benchmark_kind {
+            format!("{}s", args.duration().as_secs())
+        } else {
+            "unknown".to_string()
+        }
     }
 
     pub fn streams(&self) -> u32 {
@@ -318,6 +327,7 @@ impl IggyBenchArgs {
             BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => {
                 "end_to_end_producing_consumer_group"
             }
+            BenchmarkKindCommand::Stress(_) => "stress",
             BenchmarkKindCommand::Examples => unreachable!(),
         };
 
@@ -336,16 +346,18 @@ impl IggyBenchArgs {
             BenchmarkKindCommand::PinnedConsumer(_)
             | BenchmarkKindCommand::BalancedConsumerGroup(_) => 
self.consumers(),
             BenchmarkKindCommand::PinnedProducerAndConsumer(_)
-            | BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => {
-                self.producers() + self.consumers()
-            }
+            | BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_)
+            | BenchmarkKindCommand::Stress(_) => self.producers() + 
self.consumers(),
             BenchmarkKindCommand::Examples => unreachable!(),
         };
 
-        let data_volume_arg = match (self.total_data, self.message_batches) {
-            (Some(total), None) => format!("{total}"),
-            (None, Some(batches)) => format!("{batches}"),
-            _ => unreachable!(),
+        let data_volume_arg = match &self.benchmark_kind {
+            BenchmarkKindCommand::Stress(args) => format!("{}", 
args.duration()),
+            _ => match (self.total_data, self.message_batches) {
+                (Some(total), None) => format!("{total}"),
+                (None, Some(batches)) => format!("{batches}"),
+                _ => unreachable!(),
+            },
         };
 
         let mut parts = vec![
@@ -398,6 +410,14 @@ impl IggyBenchArgs {
                     self.consumers()
                 )
             }
+            BenchmarkKindCommand::Stress(args) => {
+                format!(
+                    "stress {} producers/{} consumers for {}",
+                    self.producers(),
+                    self.consumers(),
+                    args.duration()
+                )
+            }
             BenchmarkKindCommand::Examples => unreachable!(),
         };
 
diff --git a/core/bench/src/args/kind.rs b/core/bench/src/args/kind.rs
index 7cbdfb7b8..91d8b7ab6 100644
--- a/core/bench/src/args/kind.rs
+++ b/core/bench/src/args/kind.rs
@@ -21,6 +21,7 @@ use super::kinds::balanced::producer::BalancedProducerArgs;
 use 
super::kinds::balanced::producer_and_consumer_group::BalancedProducerAndConsumerGroupArgs;
 use 
super::kinds::end_to_end::producing_consumer::EndToEndProducingConsumerArgs;
 use 
super::kinds::end_to_end::producing_consumer_group::EndToEndProducingConsumerGroupArgs;
+use super::kinds::stress::args::StressArgs;
 use super::props::BenchmarkKindProps;
 use super::transport::BenchmarkTransportCommand;
 use crate::args::kinds::balanced::consumer_group::BalancedConsumerGroupArgs;
@@ -93,6 +94,37 @@ pub enum BenchmarkKindCommand {
     )]
     EndToEndProducingConsumerGroup(EndToEndProducingConsumerGroupArgs),
 
+    #[command(
+        about = "Comprehensive stress test with heterogeneous actors",
+        long_about = "Duration-based stress test mixing data-plane 
(produce/consume), control-plane \
+            (CRUD churn), admin (user/PAT), and health monitoring actors.\n\n\
+            Phases: Baseline (15%) -> Chaos (65%) -> Drain (20% or max 5min)",
+        after_long_help = "\
+Recommended server environment variables:
+
+  IGGY_SYSTEM_SEGMENT_SIZE=16MiB
+  IGGY_SYSTEM_TOPIC_MESSAGE_EXPIRY=30s
+  IGGY_SYSTEM_TOPIC_MAX_SIZE=200MiB
+  IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED=true
+  IGGY_DATA_MAINTENANCE_MESSAGES_INTERVAL=5s
+  IGGY_MESSAGE_SAVER_INTERVAL=5s
+
+Example:
+
+  IGGY_SYSTEM_SEGMENT_SIZE=16MiB \\
+  IGGY_SYSTEM_TOPIC_MESSAGE_EXPIRY=30s \\
+  IGGY_SYSTEM_TOPIC_MAX_SIZE=200MiB \\
+  IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED=true \\
+  IGGY_DATA_MAINTENANCE_MESSAGES_INTERVAL=5s \\
+  IGGY_MESSAGE_SAVER_INTERVAL=5s \\
+    cargo run --bin iggy-server -- --fresh --with-default-root-credentials
+
+  cargo run --bin iggy-bench -- stress -d 2m tcp",
+        visible_alias = "st",
+        verbatim_doc_comment
+    )]
+    Stress(StressArgs),
+
     #[command(about = "Print examples", visible_alias = "e", 
verbatim_doc_comment)]
     Examples,
 }
@@ -112,6 +144,7 @@ impl BenchmarkKindCommand {
             Self::EndToEndProducingConsumerGroup(_) => {
                 BenchmarkKind::EndToEndProducingConsumerGroup
             }
+            Self::Stress(_) => BenchmarkKind::Stress,
             Self::Examples => {
                 print_examples();
                 std::process::exit(0);
@@ -163,6 +196,7 @@ impl BenchmarkKindProps for BenchmarkKindCommand {
             Self::BalancedProducerAndConsumerGroup(args) => args,
             Self::EndToEndProducingConsumer(args) => args,
             Self::EndToEndProducingConsumerGroup(args) => args,
+            Self::Stress(args) => args,
             Self::Examples => {
                 print_examples();
                 std::process::exit(0);
diff --git a/core/bench/src/args/kinds/mod.rs b/core/bench/src/args/kinds/mod.rs
index 66e522a58..2238ffd5b 100644
--- a/core/bench/src/args/kinds/mod.rs
+++ b/core/bench/src/args/kinds/mod.rs
@@ -19,3 +19,4 @@
 pub mod balanced;
 pub mod end_to_end;
 pub mod pinned;
+pub mod stress;
diff --git a/core/bench/src/args/kinds/stress/args.rs 
b/core/bench/src/args/kinds/stress/args.rs
new file mode 100644
index 000000000..2b8c4a920
--- /dev/null
+++ b/core/bench/src/args/kinds/stress/args.rs
@@ -0,0 +1,160 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::args::{props::BenchmarkKindProps, 
transport::BenchmarkTransportCommand};
+use clap::{CommandFactory, Parser, ValueEnum, error::ErrorKind};
+use iggy::prelude::{IggyByteSize, IggyDuration, IggyExpiry};
+use std::num::NonZeroU32;
+use std::str::FromStr;
+
// Default actor counts; only these three are overridable from the CLI.
const DEFAULT_PRODUCERS: NonZeroU32 = nonzero_lit::u32!(4);
const DEFAULT_CONSUMERS: NonZeroU32 = nonzero_lit::u32!(4);
const DEFAULT_CHURN_CONCURRENCY: NonZeroU32 = nonzero_lit::u32!(1);
// Fixed topology for stress runs: streams/partitions/consumer groups are
// returned as constants by the BenchmarkKindProps impl below.
const DEFAULT_STREAMS: u32 = 2;
const DEFAULT_PARTITIONS: u32 = 4;
const DEFAULT_CONSUMER_GROUPS: u32 = 2;
+
/// Determines the mix of API operations exercised during the stress test.
// Gates which chaos actors are spawned (health monitor always runs; churners
// are skipped for DataPlaneOnly; the admin exerciser runs for Mixed/All).
// Variant `///` docs below double as clap value help — keep them user-facing.
#[derive(Debug, Clone, Copy, ValueEnum, Default)]
pub enum ApiMix {
    /// Data-plane + control-plane CRUD + admin operations
    #[default]
    Mixed,
    /// Only `send_messages` and `poll_messages`
    DataPlaneOnly,
    /// Heavy CRUD churn with minimal data-plane
    ControlPlaneHeavy,
    /// All available APIs including admin operations
    All,
}
+
// CLI arguments for the `stress` benchmark kind. Field `///` docs double as
// clap help text, so only non-doc comments are added here.
#[derive(Parser, Debug, Clone)]
pub struct StressArgs {
    #[command(subcommand)]
    pub transport: BenchmarkTransportCommand,

    /// Total test duration (e.g. "2m", "10m", "1h")
    // validate() rejects anything under 10 seconds.
    #[arg(long, short = 'd', value_parser = IggyDuration::from_str)]
    pub duration: IggyDuration,

    /// Number of data-plane producer actors
    #[arg(long, short = 'p', default_value_t = DEFAULT_PRODUCERS)]
    pub producers: NonZeroU32,

    /// Number of data-plane consumer actors
    #[arg(long, short = 'c', default_value_t = DEFAULT_CONSUMERS)]
    pub consumers: NonZeroU32,

    /// Number of control-plane churner actors
    #[arg(long, default_value_t = DEFAULT_CHURN_CONCURRENCY)]
    pub churn_concurrency: NonZeroU32,

    /// Interval between CRUD churn operations (e.g. "3s", "10s")
    #[arg(long, default_value = "3s", value_parser = IggyDuration::from_str)]
    pub churn_interval: IggyDuration,

    /// Max topic size to bound disk usage. For maximum race density, also run
    /// the server with `IGGY_SYSTEM_SEGMENT_SIZE="1MiB"`.
    #[arg(long, default_value = "50MiB")]
    pub max_topic_size: IggyByteSize,

    /// Message TTL for automatic cleanup
    #[arg(long, default_value = "30s", value_parser = IggyExpiry::from_str)]
    pub message_expiry: IggyExpiry,

    /// API operation mix
    #[arg(long, value_enum, default_value_t = ApiMix::Mixed)]
    pub api_mix: ApiMix,

    /// RNG seed for reproducible chaos operations
    // None -> a time-derived seed is generated in chaos_seed().
    #[arg(long)]
    pub chaos_seed: Option<u64>,
}
+
impl BenchmarkKindProps for StressArgs {
    // Topology is fixed for stress runs (see the DEFAULT_* consts above);
    // only actor counts and timing are CLI-configurable.
    fn streams(&self) -> u32 {
        DEFAULT_STREAMS
    }

    fn partitions(&self) -> u32 {
        DEFAULT_PARTITIONS
    }

    fn consumers(&self) -> u32 {
        self.consumers.get()
    }

    fn producers(&self) -> u32 {
        self.producers.get()
    }

    fn transport_command(&self) -> &BenchmarkTransportCommand {
        &self.transport
    }

    fn number_of_consumer_groups(&self) -> u32 {
        DEFAULT_CONSUMER_GROUPS
    }

    // Always bounded: the stress test churns data indefinitely, so an
    // unbounded topic would exhaust disk.
    fn max_topic_size(&self) -> Option<IggyByteSize> {
        Some(self.max_topic_size)
    }

    fn message_expiry(&self) -> IggyExpiry {
        self.message_expiry
    }

    // Rejects durations too short to fit meaningful baseline/chaos/drain
    // phases; exits with a clap validation error rather than returning.
    fn validate(&self) {
        if self.duration.as_secs() < 10 {
            crate::args::common::IggyBenchArgs::command()
                .error(
                    ErrorKind::ValueValidation,
                    "Stress test duration must be at least 10 seconds",
                )
                .exit();
        }
    }
}
+
+impl StressArgs {
+    pub const fn duration(&self) -> IggyDuration {
+        self.duration
+    }
+
+    pub const fn churn_concurrency(&self) -> NonZeroU32 {
+        self.churn_concurrency
+    }
+
+    pub const fn churn_interval(&self) -> IggyDuration {
+        self.churn_interval
+    }
+
+    pub const fn api_mix(&self) -> ApiMix {
+        self.api_mix
+    }
+
+    #[allow(clippy::cast_possible_truncation)]
+    pub fn chaos_seed(&self) -> u64 {
+        self.chaos_seed.unwrap_or_else(|| {
+            std::time::SystemTime::now()
+                .duration_since(std::time::UNIX_EPOCH)
+                .expect("system clock before epoch")
+                .as_nanos() as u64
+        })
+    }
+}
diff --git a/core/bench/src/args/kinds/mod.rs 
b/core/bench/src/args/kinds/stress/mod.rs
similarity index 93%
copy from core/bench/src/args/kinds/mod.rs
copy to core/bench/src/args/kinds/stress/mod.rs
index 66e522a58..6de087720 100644
--- a/core/bench/src/args/kinds/mod.rs
+++ b/core/bench/src/args/kinds/stress/mod.rs
@@ -16,6 +16,4 @@
  * under the License.
  */
 
-pub mod balanced;
-pub mod end_to_end;
-pub mod pinned;
+pub mod args;
diff --git a/core/bench/src/benchmarks/benchmark.rs 
b/core/bench/src/benchmarks/benchmark.rs
index 594a00101..e35558cc6 100644
--- a/core/bench/src/benchmarks/benchmark.rs
+++ b/core/bench/src/benchmarks/benchmark.rs
@@ -36,6 +36,7 @@ use 
super::end_to_end_producing_consumer_group::EndToEndProducingConsumerGroupBe
 use super::pinned_consumer::PinnedConsumerBenchmark;
 use super::pinned_producer::PinnedProducerBenchmark;
 use super::pinned_producer_and_consumer::PinnedProducerAndConsumerBenchmark;
+use super::stress::StressBenchmark;
 
 impl From<IggyBenchArgs> for Box<dyn Benchmarkable> {
     fn from(args: IggyBenchArgs) -> Self {
@@ -74,6 +75,9 @@ impl From<IggyBenchArgs> for Box<dyn Benchmarkable> {
             BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => 
Box::new(
                 EndToEndProducingConsumerGroupBenchmark::new(Arc::new(args), 
client_factory),
             ),
+            BenchmarkKindCommand::Stress(_) => {
+                Box::new(StressBenchmark::new(Arc::new(args), client_factory))
+            }
             BenchmarkKindCommand::Examples => {
                 unreachable!("Examples should be handled before this point")
             }
diff --git a/core/bench/src/benchmarks/common.rs 
b/core/bench/src/benchmarks/common.rs
index 7858f6fca..548a2f876 100644
--- a/core/bench/src/benchmarks/common.rs
+++ b/core/bench/src/benchmarks/common.rs
@@ -171,7 +171,8 @@ pub fn build_consumer_futures(
     let origin_timestamp_latency_calculation = match args.kind() {
         BenchmarkKind::PinnedConsumer | BenchmarkKind::BalancedConsumerGroup 
=> false,
         BenchmarkKind::PinnedProducerAndConsumer
-        | BenchmarkKind::BalancedProducerAndConsumerGroup => true,
+        | BenchmarkKind::BalancedProducerAndConsumerGroup
+        | BenchmarkKind::Stress => true,
         _ => unreachable!(),
     };
 
@@ -209,8 +210,9 @@ pub fn build_consumer_futures(
                 consumer_id
             };
             let stream_id = format!("bench-stream-{stream_idx}");
+            // Each stream has exactly one CG, server assigns IDs starting 
from 0
             let consumer_group_id = if cg_count > 0 {
-                Some(CONSUMER_GROUP_BASE_ID + (consumer_id % cg_count))
+                Some(CONSUMER_GROUP_BASE_ID)
             } else {
                 None
             };
diff --git a/core/bench/src/benchmarks/mod.rs b/core/bench/src/benchmarks/mod.rs
index 6e829b362..cd1b5eac0 100644
--- a/core/bench/src/benchmarks/mod.rs
+++ b/core/bench/src/benchmarks/mod.rs
@@ -27,6 +27,8 @@ pub mod end_to_end_producing_consumer_group;
 pub mod pinned_consumer;
 pub mod pinned_producer;
 pub mod pinned_producer_and_consumer;
+pub mod stress;
+pub mod stress_report;
 
 pub const CONSUMER_GROUP_BASE_ID: u32 = 0;
 pub const CONSUMER_GROUP_NAME_PREFIX: &str = "cg";
diff --git a/core/bench/src/benchmarks/stress.rs 
b/core/bench/src/benchmarks/stress.rs
new file mode 100644
index 000000000..8fc1dc13c
--- /dev/null
+++ b/core/bench/src/benchmarks/stress.rs
@@ -0,0 +1,281 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::benchmark::Benchmarkable;
+use super::common::{build_consumer_futures, build_producer_futures, 
init_consumer_groups};
+use super::stress_report::StressReport;
+use crate::actors::stress::admin_exerciser::AdminExerciser;
+use crate::actors::stress::control_plane_churner::{ChurnerConfig, 
ControlPlaneChurner};
+use crate::actors::stress::health_monitor::HealthMonitor;
+use crate::actors::stress::stress_context::StressContext;
+use crate::actors::stress::verifier::StressVerifier;
+use crate::args::common::IggyBenchArgs;
+use crate::args::kind::BenchmarkKindCommand;
+use crate::args::kinds::stress::args::ApiMix;
+use crate::utils::ClientFactory;
+use crate::utils::finish_condition::BenchmarkFinishCondition;
+use async_trait::async_trait;
+use bench_report::{benchmark_kind::BenchmarkKind, 
individual_metrics::BenchmarkIndividualMetrics};
+use iggy::prelude::*;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use tokio::task::JoinSet;
+use tracing::{info, warn};
+
/// Phase durations as fractions of total test time; the remainder
/// (1 - baseline - chaos = 20%) becomes the drain phase.
const BASELINE_FRACTION: f64 = 0.15;
const CHAOS_FRACTION: f64 = 0.65;
/// Max drain phase duration — caps the 20% remainder for very long runs.
const MAX_DRAIN_SECS: u64 = 300;
+
/// Duration-based stress benchmark: baseline -> chaos -> drain -> verify,
/// mixing data-plane actors with control-plane/admin churners.
pub struct StressBenchmark {
    // Full parsed CLI args; constructed only for the Stress benchmark kind
    // (see the From<IggyBenchArgs> dispatch in benchmarks::benchmark).
    args: Arc<IggyBenchArgs>,
    // Transport-specific factory handed to every spawned actor.
    client_factory: Arc<dyn ClientFactory>,
}
+
impl StressBenchmark {
    /// Builds the benchmark from parsed CLI args and a transport client factory.
    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self {
        Self {
            args,
            client_factory,
        }
    }

    /// Extracts the stress-specific args; panics for any other benchmark kind
    /// (this type is only constructed for the `Stress` variant).
    fn stress_args(&self) -> &crate::args::kinds::stress::args::StressArgs {
        match &self.args.benchmark_kind {
            BenchmarkKindCommand::Stress(args) => args,
            _ => unreachable!("StressBenchmark only used with Stress variant"),
        }
    }

    /// Splits the requested total duration into (baseline, chaos, drain).
    ///
    /// Baseline and chaos keep fractional seconds; drain is truncated to whole
    /// seconds (the allowed cast) and capped at MAX_DRAIN_SECS so very long
    /// runs don't spend forever draining.
    fn compute_phase_durations(&self) -> (Duration, Duration, Duration) {
        let total = self.stress_args().duration().get_duration();
        let total_secs = total.as_secs_f64();

        let baseline = Duration::from_secs_f64(total_secs * BASELINE_FRACTION);
        let chaos = Duration::from_secs_f64(total_secs * CHAOS_FRACTION);
        #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
        let drain_secs = (total_secs * (1.0 - BASELINE_FRACTION - CHAOS_FRACTION)) as u64;
        let drain = Duration::from_secs(drain_secs.min(MAX_DRAIN_SECS));

        (baseline, chaos, drain)
    }

    /// Spawns chaos actors: control-plane churner, admin exerciser, health monitor.
    ///
    /// Gating by the configured ApiMix:
    /// - health monitor: always
    /// - control-plane churner(s): unless DataPlaneOnly
    /// - admin exerciser: only for Mixed and All
    fn spawn_chaos_actors(
        &self,
        tasks: &mut JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>,
        ctx: &Arc<StressContext>,
    ) {
        let stress_args = self.stress_args();
        let api_mix = stress_args.api_mix();
        let chaos_seed = stress_args.chaos_seed();

        // Health monitor always runs.
        let monitor = HealthMonitor::new(self.client_factory.clone(), ctx.clone());
        tasks.spawn(async move {
            monitor.run().await;
            Ok(BenchmarkIndividualMetrics::placeholder("health_monitor"))
        });

        // Control-plane churner(s) unless data-plane-only.
        if !matches!(api_mix, ApiMix::DataPlaneOnly) {
            let churn_concurrency = stress_args.churn_concurrency().get();
            let churn_interval = stress_args.churn_interval();

            let churner_config = ChurnerConfig {
                api_mix,
                partitions: self.args.number_of_partitions(),
                message_expiry: stress_args.message_expiry,
                max_topic_size: MaxTopicSize::Custom(stress_args.max_topic_size),
            };

            // Churner IDs are 1-based; all share one seed for reproducible chaos.
            for i in 0..churn_concurrency {
                let churner = ControlPlaneChurner::new(
                    i + 1,
                    self.client_factory.clone(),
                    ctx.clone(),
                    churn_interval,
                    chaos_seed,
                    &churner_config,
                );
                tasks.spawn(async move {
                    churner.run().await;
                    Ok(BenchmarkIndividualMetrics::placeholder(
                        "control_plane_churner",
                    ))
                });
            }
        }

        // Admin exerciser for mixed/all modes.
        if matches!(api_mix, ApiMix::Mixed | ApiMix::All) {
            let admin = AdminExerciser::new(self.client_factory.clone(), ctx.clone());
            tasks.spawn(async move {
                admin.run().await;
                Ok(BenchmarkIndividualMetrics::placeholder("admin_exerciser"))
            });
        }
    }
}
+
#[async_trait]
impl Benchmarkable for StressBenchmark {
    /// Orchestrates the phased stress run:
    /// 1. Baseline — data-plane producers/consumers only.
    /// 2. Chaos — churner/admin/health actors added alongside the data plane.
    /// 3. Drain — chaos actors cancelled; remaining tasks joined under a deadline.
    /// Then polls everything back, verifies ordering/integrity, and prints the
    /// stress report. Returns an empty JoinSet: all tasks are joined in-line.
    async fn run(
        &mut self,
    ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, IggyError> {
        let overall_start = Instant::now();
        let (baseline_duration, chaos_duration, drain_max) = self.compute_phase_durations();

        info!(
            "Stress test starting: baseline={baseline_duration:?}, chaos={chaos_duration:?}, drain_max={drain_max:?}"
        );

        // Setup: create streams, topics, consumer groups.
        self.init_streams().await?;
        init_consumer_groups(&self.client_factory, &self.args).await?;

        // Shared cancellation flag + stats counters for all chaos actors.
        let ctx = Arc::new(StressContext::new());

        let mut tasks: JoinSet<Result<BenchmarkIndividualMetrics, IggyError>> = JoinSet::new();

        // === PHASE 1: Baseline (data-plane only) ===
        info!("=== Phase 1: Baseline ({baseline_duration:?}) ===");

        let baseline_finish =
            BenchmarkFinishCondition::new_duration(IggyDuration::from(baseline_duration));

        // Spawn producers and consumers; they intentionally keep running past
        // the baseline window and into the chaos phase.
        let producer_futures = build_producer_futures(&self.client_factory, &self.args);
        let consumer_futures = build_consumer_futures(&self.client_factory, &self.args);

        for fut in producer_futures {
            tasks.spawn(fut);
        }
        for fut in consumer_futures {
            tasks.spawn(fut);
        }

        // Wait for baseline to complete (time-based).
        tokio::time::sleep(baseline_duration).await;
        let baseline_elapsed = overall_start.elapsed();
        info!("Baseline phase completed in {baseline_elapsed:?}");

        // NOTE(review): `baseline_finish` is never handed to any actor — the
        // producers/consumers spawned above receive their own finish conditions
        // inside build_*_futures — so constructing and dropping it here is dead
        // code. Consider deleting it (and the BenchmarkFinishCondition import)
        // unless `new_duration` has side effects; phase overlap is unaffected.
        drop(baseline_finish);

        // === PHASE 2: Chaos (add churners + admin + health alongside ongoing data-plane) ===
        info!("=== Phase 2: Chaos ({chaos_duration:?}) ===");

        self.spawn_chaos_actors(&mut tasks, &ctx);

        tokio::time::sleep(chaos_duration).await;
        let chaos_elapsed = overall_start.elapsed();
        info!("Chaos phase completed in {chaos_elapsed:?}");

        // === PHASE 3: Drain ===
        info!("=== Phase 3: Drain (max {drain_max:?}) ===");

        // Signal all chaos actors to stop.
        ctx.cancel();

        // Join remaining tasks until the set is empty or the drain deadline
        // expires; on timeout, abort whatever is still running.
        let drain_start = Instant::now();
        let drain_deadline = drain_start + drain_max;
        while !tasks.is_empty() {
            let remaining = drain_deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                warn!("Drain phase timed out, aborting remaining tasks");
                tasks.abort_all();
                break;
            }

            match tokio::time::timeout(remaining, tasks.join_next()).await {
                Ok(Some(result)) => {
                    // Per-actor metrics are discarded; the stress report is
                    // built from the shared ctx.stats counters instead.
                    if let Err(e) = result {
                        warn!("Actor join failed: {e}");
                    }
                }
                Ok(None) => break,
                Err(_) => {
                    warn!("Drain phase timed out");
                    tasks.abort_all();
                    break;
                }
            }
        }
        let drain_elapsed = drain_start.elapsed();

        // === Verification ===
        info!("Running post-test verification...");
        let verifier = StressVerifier::new(
            self.client_factory.clone(),
            self.args.streams(),
            self.args.number_of_partitions(),
        );
        let verification = verifier.verify().await;

        // Build and print the stress report (planned baseline/chaos durations,
        // measured drain/total).
        let report = StressReport::build(
            &ctx.stats,
            &verification,
            overall_start.elapsed(),
            baseline_duration,
            chaos_duration,
            drain_elapsed,
        );
        report.print_summary();

        // A failed verification is logged, not propagated as an error.
        if !verification.passed {
            warn!("Stress test verification FAILED");
        }

        // Return an empty JoinSet since we already joined everything.
        Ok(JoinSet::new())
    }

    fn kind(&self) -> BenchmarkKind {
        BenchmarkKind::Stress
    }

    fn args(&self) -> &IggyBenchArgs {
        &self.args
    }

    fn client_factory(&self) -> &Arc<dyn ClientFactory> {
        &self.client_factory
    }

    /// Logs the effective stress configuration before the run starts.
    fn print_info(&self) {
        let stress_args = self.stress_args();
        info!(
            "Starting Stress benchmark: duration={}, producers={}, consumers={}, churn_concurrency={}, churn_interval={}, api_mix={:?}, chaos_seed={}",
            stress_args.duration(),
            stress_args.producers.get(),
            stress_args.consumers.get(),
            stress_args.churn_concurrency().get(),
            stress_args.churn_interval(),
            stress_args.api_mix(),
            stress_args.chaos_seed(),
        );
    }
}
diff --git a/core/bench/src/benchmarks/stress_report.rs 
b/core/bench/src/benchmarks/stress_report.rs
new file mode 100644
index 000000000..6ff549f0c
--- /dev/null
+++ b/core/bench/src/benchmarks/stress_report.rs
@@ -0,0 +1,215 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::actors::stress::{stress_context::StressStats, verifier::VerificationResult};
+use serde::Serialize;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::Duration;
+
+/// Structured stress test report, serializable to JSON for `--output-dir`.
+#[derive(Debug, Serialize)]
+pub struct StressReport {
+    pub total_duration_secs: f64,
+    pub baseline_duration_secs: f64,
+    pub chaos_duration_secs: f64,
+    pub drain_duration_secs: f64,
+    pub api_calls: ApiCallSummary,
+    pub error_tiers: ErrorTierSummary,
+    pub verification: VerificationSummary,
+}
+
+#[derive(Debug, Serialize)]
+pub struct ApiCallSummary {
+    pub send_messages: CallCount,
+    pub poll_messages: CallCount,
+    pub create_topic: CallCount,
+    pub delete_topic: CallCount,
+    pub create_partitions: CallCount,
+    pub delete_partitions: CallCount,
+    pub create_consumer_group: CallCount,
+    pub delete_consumer_group: CallCount,
+    pub join_consumer_group: CallCount,
+    pub leave_consumer_group: CallCount,
+    pub purge_topic: CallCount,
+    pub delete_segments: CallCount,
+    pub update_topic: CallCount,
+    pub purge_stream: CallCount,
+    pub stress_poll: CallCount,
+    pub create_user: CallCount,
+    pub delete_user: CallCount,
+    pub create_pat: CallCount,
+    pub delete_pat: CallCount,
+    pub store_offset: CallCount,
+    pub get_offset: CallCount,
+    pub ping: CallCount,
+    pub get_stats: CallCount,
+    pub get_me: CallCount,
+    pub get_clients: CallCount,
+    pub flush: CallCount,
+    pub total_ok: u64,
+    pub total_err: u64,
+}
+
+#[derive(Debug, Serialize)]
+pub struct CallCount {
+    pub ok: u64,
+    pub err: u64,
+}
+
+impl CallCount {
+    fn load(ok: &AtomicU64, err: &AtomicU64) -> Self {
+        Self {
+            ok: ok.load(Ordering::Relaxed),
+            err: err.load(Ordering::Relaxed),
+        }
+    }
+}
+
+#[derive(Debug, Serialize)]
+pub struct ErrorTierSummary {
+    pub expected: u64,
+    pub unexpected: u64,
+}
+
+#[derive(Debug, Serialize)]
+pub struct VerificationSummary {
+    pub partitions_checked: u32,
+    pub total_messages: u64,
+    pub gaps_found: u64,
+    pub duplicates_found: u64,
+    pub checksum_mismatches: u64,
+    pub payload_length_mismatches: u64,
+    pub id_missing_fingerprint: u64,
+    pub passed: bool,
+}
+
+impl StressReport {
+    pub fn build(
+        stats: &StressStats,
+        verification: &VerificationResult,
+        total_duration: Duration,
+        baseline_duration: Duration,
+        chaos_duration: Duration,
+        drain_duration: Duration,
+    ) -> Self {
+        let api_calls = ApiCallSummary::from_stats(stats);
+        Self {
+            total_duration_secs: total_duration.as_secs_f64(),
+            baseline_duration_secs: baseline_duration.as_secs_f64(),
+            chaos_duration_secs: chaos_duration.as_secs_f64(),
+            drain_duration_secs: drain_duration.as_secs_f64(),
+            error_tiers: ErrorTierSummary {
+                expected: stats.expected_errors.load(Ordering::Relaxed),
+                unexpected: stats.unexpected_errors.load(Ordering::Relaxed),
+            },
+            api_calls,
+            verification: VerificationSummary {
+                partitions_checked: verification.partitions_checked,
+                total_messages: verification.total_messages,
+                gaps_found: verification.gaps_found,
+                duplicates_found: verification.duplicates_found,
+                checksum_mismatches: verification.checksum_mismatches,
+                payload_length_mismatches: verification.payload_length_mismatches,
+                id_missing_fingerprint: verification.id_missing_fingerprint,
+                passed: verification.passed,
+            },
+        }
+    }
+
+    pub fn print_summary(&self) {
+        println!("\n=== STRESS TEST REPORT ===");
+        println!(
+            "Duration: {:.1}s (baseline: {:.1}s, chaos: {:.1}s, drain: {:.1}s)",
+            self.total_duration_secs,
+            self.baseline_duration_secs,
+            self.chaos_duration_secs,
+            self.drain_duration_secs
+        );
+        println!(
+            "API calls: {} ok, {} err",
+            self.api_calls.total_ok, self.api_calls.total_err
+        );
+        println!(
+            "Errors: {} expected, {} unexpected",
+            self.error_tiers.expected, self.error_tiers.unexpected
+        );
+        println!(
+            "Verification: {} partitions, {} msgs, {} gaps, {} dups, {} checksum, {} len, {} id -> {}",
+            self.verification.partitions_checked,
+            self.verification.total_messages,
+            self.verification.gaps_found,
+            self.verification.duplicates_found,
+            self.verification.checksum_mismatches,
+            self.verification.payload_length_mismatches,
+            self.verification.id_missing_fingerprint,
+            if self.verification.passed {
+                "PASSED"
+            } else {
+                "FAILED"
+            }
+        );
+        println!("==========================\n");
+    }
+}
+
+impl ApiCallSummary {
+    fn from_stats(s: &StressStats) -> Self {
+        Self {
+            send_messages: CallCount::load(&s.send_messages_ok, &s.send_messages_err),
+            poll_messages: CallCount::load(&s.poll_messages_ok, &s.poll_messages_err),
+            create_topic: CallCount::load(&s.create_topic_ok, &s.create_topic_err),
+            delete_topic: CallCount::load(&s.delete_topic_ok, &s.delete_topic_err),
+            create_partitions: CallCount::load(&s.create_partitions_ok, &s.create_partitions_err),
+            delete_partitions: CallCount::load(&s.delete_partitions_ok, &s.delete_partitions_err),
+            create_consumer_group: CallCount::load(
+                &s.create_consumer_group_ok,
+                &s.create_consumer_group_err,
+            ),
+            delete_consumer_group: CallCount::load(
+                &s.delete_consumer_group_ok,
+                &s.delete_consumer_group_err,
+            ),
+            join_consumer_group: CallCount::load(
+                &s.join_consumer_group_ok,
+                &s.join_consumer_group_err,
+            ),
+            leave_consumer_group: CallCount::load(
+                &s.leave_consumer_group_ok,
+                &s.leave_consumer_group_err,
+            ),
+            purge_topic: CallCount::load(&s.purge_topic_ok, &s.purge_topic_err),
+            delete_segments: CallCount::load(&s.delete_segments_ok, &s.delete_segments_err),
+            update_topic: CallCount::load(&s.update_topic_ok, &s.update_topic_err),
+            purge_stream: CallCount::load(&s.purge_stream_ok, &s.purge_stream_err),
+            stress_poll: CallCount::load(&s.stress_poll_ok, &s.stress_poll_err),
+            create_user: CallCount::load(&s.create_user_ok, &s.create_user_err),
+            delete_user: CallCount::load(&s.delete_user_ok, &s.delete_user_err),
+            create_pat: CallCount::load(&s.create_pat_ok, &s.create_pat_err),
+            delete_pat: CallCount::load(&s.delete_pat_ok, &s.delete_pat_err),
+            store_offset: CallCount::load(&s.store_offset_ok, &s.store_offset_err),
+            get_offset: CallCount::load(&s.get_offset_ok, &s.get_offset_err),
+            ping: CallCount::load(&s.ping_ok, &s.ping_err),
+            get_stats: CallCount::load(&s.get_stats_ok, &s.get_stats_err),
+            get_me: CallCount::load(&s.get_me_ok, &s.get_me_err),
+            get_clients: CallCount::load(&s.get_clients_ok, &s.get_clients_err),
+            flush: CallCount::load(&s.flush_ok, &s.flush_err),
+            total_ok: s.total_ok(),
+            total_err: s.total_err(),
+        }
+    }
+}
diff --git a/core/bench/src/utils/finish_condition.rs b/core/bench/src/utils/finish_condition.rs
index 01bd174ef..858ca569f 100644
--- a/core/bench/src/utils/finish_condition.rs
+++ b/core/bench/src/utils/finish_condition.rs
@@ -17,12 +17,14 @@
 
 use crate::args::{common::IggyBenchArgs, kind::BenchmarkKindCommand};
 use human_repr::HumanCount;
+use iggy::prelude::IggyDuration;
 use std::{
     fmt::Display,
     sync::{
         Arc,
         atomic::{AtomicI64, Ordering},
     },
+    time::Instant,
 };
 
 const MINIMUM_MSG_PAYLOAD_SIZE: usize = 20;
@@ -48,8 +50,9 @@ pub enum BenchmarkFinishConditionMode {
 
 #[derive(Debug, Clone, Copy, PartialEq)]
 enum BenchmarkFinishConditionType {
-    ByTotalData,
-    ByMessageBatchesCount,
+    TotalData,
+    MessageBatchesCount,
+    Duration,
 }
 
 pub struct BenchmarkFinishCondition {
@@ -57,6 +60,7 @@ pub struct BenchmarkFinishCondition {
     total: u64,
     left_total: Arc<AtomicI64>,
     mode: BenchmarkFinishConditionMode,
+    start_time: Option<Instant>,
 }
 
 impl BenchmarkFinishCondition {
@@ -94,64 +98,99 @@ impl BenchmarkFinishCondition {
             }
             BenchmarkKindCommand::EndToEndProducingConsumer(_)
             | BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => args.producers() * 2,
+            BenchmarkKindCommand::Stress(_) => args.producers() + args.consumers(),
             BenchmarkKindCommand::Examples => unreachable!(),
         };
 
-        Arc::new(match (total_data, batches_count) {
+        match (total_data, batches_count) {
             (None, Some(count)) => {
                let count_per_actor = (count.get() * total_data_multiplier) / total_data_factor;
 
-                Self {
-                    kind: BenchmarkFinishConditionType::ByMessageBatchesCount,
+                Arc::new(Self {
+                    kind: BenchmarkFinishConditionType::MessageBatchesCount,
                     total: u64::from(count_per_actor),
                    left_total: Arc::new(AtomicI64::new(i64::from(count_per_actor))),
                     mode,
-                }
+                    start_time: None,
+                })
             }
             (Some(size), None) => {
                let bytes_per_actor = size.as_bytes_u64() / u64::from(total_data_factor);
 
-                Self {
-                    kind: BenchmarkFinishConditionType::ByTotalData,
+                Arc::new(Self {
+                    kind: BenchmarkFinishConditionType::TotalData,
                     total: bytes_per_actor,
                     left_total: Arc::new(AtomicI64::new(
                         i64::try_from(bytes_per_actor).unwrap_or(i64::MAX),
                     )),
                     mode,
+                    start_time: None,
+                })
+            }
+            (None, None) => {
+                // Stress benchmark uses --duration; extract it from args
+                if let BenchmarkKindCommand::Stress(stress_args) = &args.benchmark_kind {
+                    Self::new_duration(stress_args.duration())
+                } else {
+                    panic!("Either --total-messages-size or --message-batches must be provided")
                 }
             }
-            _ => unreachable!(),
-        })
+            (Some(_), Some(_)) => {
+                panic!("Cannot specify both --total-messages-size and --message-batches")
+            }
+        }
     }
 
    /// Creates an "empty" benchmark finish condition that is already satisfied.
    /// This is useful for consumer-only actors that don't need to produce any messages.
     pub fn new_empty() -> Arc<Self> {
         Arc::new(Self {
-            kind: BenchmarkFinishConditionType::ByMessageBatchesCount,
+            kind: BenchmarkFinishConditionType::MessageBatchesCount,
             total: 0,
             left_total: Arc::new(AtomicI64::new(0)),
             mode: BenchmarkFinishConditionMode::Shared,
+            start_time: None,
+        })
+    }
+
+    /// Creates a duration-based finish condition that completes after the given time elapses.
+    pub fn new_duration(duration: IggyDuration) -> Arc<Self> {
+        Arc::new(Self {
+            kind: BenchmarkFinishConditionType::Duration,
+            total: duration.as_micros(),
+            left_total: Arc::new(AtomicI64::new(i64::MAX)),
+            mode: BenchmarkFinishConditionMode::Shared,
+            start_time: Some(Instant::now()),
         })
     }
 
     pub fn account_and_check(&self, size_to_subtract: u64) -> bool {
         match self.kind {
-            BenchmarkFinishConditionType::ByTotalData => {
+            BenchmarkFinishConditionType::TotalData => {
                 self.left_total.fetch_sub(
                     i64::try_from(size_to_subtract).unwrap_or(i64::MAX),
                     Ordering::AcqRel,
                 );
+                self.left_total.load(Ordering::Acquire) <= 0
             }
-            BenchmarkFinishConditionType::ByMessageBatchesCount => {
+            BenchmarkFinishConditionType::MessageBatchesCount => {
                 self.left_total.fetch_sub(1, Ordering::AcqRel);
+                self.left_total.load(Ordering::Acquire) <= 0
             }
+            BenchmarkFinishConditionType::Duration => self.is_elapsed(),
         }
-        self.left_total.load(Ordering::Acquire) <= 0
     }
 
     pub fn is_done(&self) -> bool {
-        self.left() <= 0
+        match self.kind {
+            BenchmarkFinishConditionType::Duration => self.is_elapsed(),
+            _ => self.left() <= 0,
+        }
+    }
+
+    fn is_elapsed(&self) -> bool {
+        self.start_time
+            .is_some_and(|start| start.elapsed() >= std::time::Duration::from_micros(self.total))
     }
 
     pub const fn total(&self) -> u64 {
@@ -160,17 +199,20 @@ impl BenchmarkFinishCondition {
 
     pub fn total_str(&self) -> String {
         match self.kind {
-            BenchmarkFinishConditionType::ByTotalData => {
+            BenchmarkFinishConditionType::TotalData => {
                 format!(
                     "messages of size: {} ({})",
                     self.total.human_count_bytes(),
                     self.mode
                 )
             }
-
-            BenchmarkFinishConditionType::ByMessageBatchesCount => {
+            BenchmarkFinishConditionType::MessageBatchesCount => {
                format!("{} batches ({})", self.total.human_count_bare(), self.mode)
             }
+            BenchmarkFinishConditionType::Duration => {
+                let secs = self.total / 1_000_000;
+                format!("duration: {secs}s ({mode})", mode = self.mode)
+            }
         }
     }
 
@@ -182,7 +224,7 @@ impl BenchmarkFinishCondition {
         let done = i64::try_from(self.total()).unwrap_or(i64::MAX) - self.left();
         let total = i64::try_from(self.total()).unwrap_or(i64::MAX);
         match self.kind {
-            BenchmarkFinishConditionType::ByTotalData => {
+            BenchmarkFinishConditionType::TotalData => {
                 format!(
                     "{}/{} ({})",
                     done.human_count_bytes(),
@@ -190,7 +232,7 @@ impl BenchmarkFinishCondition {
                     self.mode
                 )
             }
-            BenchmarkFinishConditionType::ByMessageBatchesCount => {
+            BenchmarkFinishConditionType::MessageBatchesCount => {
                 format!(
                     "{}/{} ({})",
                     done.human_count_bare(),
@@ -198,15 +240,26 @@ impl BenchmarkFinishCondition {
                     self.mode
                 )
             }
+            BenchmarkFinishConditionType::Duration => {
+                let elapsed_secs = self.start_time.map_or(0, |s| s.elapsed().as_secs());
+                let total_secs = self.total / 1_000_000;
+                format!("{elapsed_secs}s/{total_secs}s ({mode})", mode = self.mode)
+            }
         }
     }
 
     pub fn max_capacity(&self) -> usize {
-        let value = self.left_total.load(Ordering::Relaxed);
-        if self.kind == BenchmarkFinishConditionType::ByTotalData {
-            usize::try_from(value).unwrap_or(0) / MINIMUM_MSG_PAYLOAD_SIZE
-        } else {
-            usize::try_from(value).unwrap_or(0)
+        match self.kind {
+            BenchmarkFinishConditionType::TotalData => {
+                let value = self.left_total.load(Ordering::Relaxed);
+                usize::try_from(value).unwrap_or(0) / MINIMUM_MSG_PAYLOAD_SIZE
+            }
+            BenchmarkFinishConditionType::MessageBatchesCount => {
+                let value = self.left_total.load(Ordering::Relaxed);
+                usize::try_from(value).unwrap_or(0)
+            }
+            // Duration-based conditions use a reasonable default buffer size
+            BenchmarkFinishConditionType::Duration => 10_000,
         }
     }
 }
diff --git a/core/bench/src/utils/mod.rs b/core/bench/src/utils/mod.rs
index e8ee8ebd3..1b76f03a5 100644
--- a/core/bench/src/utils/mod.rs
+++ b/core/bench/src/utils/mod.rs
@@ -256,6 +257,7 @@ fn add_benchmark_kind_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
         BenchmarkKind::BalancedProducerAndConsumerGroup => "balanced-producer-and-consumer-group",
         BenchmarkKind::EndToEndProducingConsumer => "end-to-end-producing-consumer",
         BenchmarkKind::EndToEndProducingConsumerGroup => "end-to-end-producing-consumer-group",
+        BenchmarkKind::Stress => "stress",
     };
     parts.push(kind_str.to_string());
 
@@ -281,7 +282,8 @@ fn add_actor_arguments(parts: &mut Vec<String>, args: &IggyBenchArgs) {
             }
         }
         BenchmarkKind::PinnedProducerAndConsumer
-        | BenchmarkKind::BalancedProducerAndConsumerGroup => {
+        | BenchmarkKind::BalancedProducerAndConsumerGroup
+        | BenchmarkKind::Stress => {
             if producers != DEFAULT_NUMBER_OF_PRODUCERS.get() {
                 parts.push(format!("--producers {producers}"));
             }

Reply via email to