This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch lab
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 3e8290a14f1cbd34459f1cb38e4d87913c5a94e8
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Feb 13 21:13:59 2026 +0100

    lab
---
 Cargo.lock                                 |  18 ++
 Cargo.toml                                 |   1 +
 DEPENDENCIES.md                            |   1 +
 core/lab/Cargo.toml                        |  43 ++++
 core/lab/src/args.rs                       | 169 +++++++++++++
 core/lab/src/client.rs                     |  66 +++++
 core/lab/src/invariants.rs                 | 297 +++++++++++++++++++++++
 core/lab/src/main.rs                       |  94 +++++++
 core/lab/src/ops/classify.rs               | 111 +++++++++
 core/lab/src/ops/execute.rs                | 285 ++++++++++++++++++++++
 core/lab/src/ops/generate.rs               | 189 +++++++++++++++
 core/lab/src/ops/mod.rs                    | 197 +++++++++++++++
 core/lab/src/ops/precondition.rs           |  98 ++++++++
 core/lab/src/ops/shadow_update.rs          |  89 +++++++
 core/lab/src/replay.rs                     | 306 +++++++++++++++++++++++
 core/lab/src/report.rs                     | 101 ++++++++
 core/lab/src/runner.rs                     | 378 +++++++++++++++++++++++++++++
 core/lab/src/safe_name.rs                  | 114 +++++++++
 core/lab/src/scenarios/concurrent_crud.rs  |  50 ++++
 core/lab/src/scenarios/mixed_workload.rs   |  57 +++++
 core/lab/src/scenarios/mod.rs              |  55 +++++
 core/lab/src/scenarios/segment_rotation.rs |  46 ++++
 core/lab/src/shadow.rs                     | 244 +++++++++++++++++++
 core/lab/src/trace.rs                      | 129 ++++++++++
 core/lab/src/worker.rs                     | 244 +++++++++++++++++++
 25 files changed, 3382 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index d7ffd97b4..afe8e2a45 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4621,6 +4621,24 @@ dependencies = [
  "tracing-subscriber",
 ]
 
+[[package]]
+name = "iggy-lab"
+version = "0.1.0"
+dependencies = [
+ "bytes",
+ "chrono",
+ "clap",
+ "dashmap",
+ "iggy",
+ "iggy_common",
+ "rand 0.10.0",
+ "serde",
+ "serde_json",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "iggy-mcp"
 version = "0.2.3-edge.1"
diff --git a/Cargo.toml b/Cargo.toml
index e306352c1..0d4dbc4fe 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -45,6 +45,7 @@ members = [
     "core/harness_derive",
     "core/integration",
     "core/journal",
+    "core/lab",
     "core/message_bus",
     "core/metadata",
     "core/partitions",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index a9e14e6d4..94d445ae9 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -393,6 +393,7 @@ iggy-bench: 0.3.3-edge.1, "Apache-2.0",
 iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
 iggy-cli: 0.10.3-edge.1, "Apache-2.0",
 iggy-connectors: 0.2.4-edge.1, "Apache-2.0",
+iggy-lab: 0.1.0, "Apache-2.0",
 iggy-mcp: 0.2.3-edge.1, "Apache-2.0",
 iggy_binary_protocol: 0.8.3-edge.1, "Apache-2.0",
 iggy_common: 0.8.3-edge.1, "Apache-2.0",
diff --git a/core/lab/Cargo.toml b/core/lab/Cargo.toml
new file mode 100644
index 000000000..64e628a6b
--- /dev/null
+++ b/core/lab/Cargo.toml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy-lab"
+version = "0.1.0"
+edition = "2024"
+license = "Apache-2.0"
+repository = "https://github.com/apache/iggy"
+homepage = "https://iggy.apache.org"
+description = "Chaos stress testing CLI for Iggy message streaming platform"
+
+[[bin]]
+name = "iggy-lab"
+path = "src/main.rs"
+
+[dependencies]
+bytes = { workspace = true }
+chrono = { workspace = true }
+clap = { workspace = true }
+dashmap = { workspace = true }
+iggy = { workspace = true }
+iggy_common = { workspace = true }
+rand = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
diff --git a/core/lab/src/args.rs b/core/lab/src/args.rs
new file mode 100644
index 000000000..acc35a349
--- /dev/null
+++ b/core/lab/src/args.rs
@@ -0,0 +1,169 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use clap::{Parser, Subcommand, ValueEnum};
+use iggy_common::IggyDuration;
+use std::path::PathBuf;
+use std::str::FromStr;
+
+/// Default message payload size in bytes.
+const DEFAULT_MESSAGE_SIZE: u32 = 256;
+
+#[derive(Parser)]
+#[command(
+    name = "iggy-lab",
+    about = "Chaos stress testing CLI for Iggy message streaming platform"
+)]
+pub struct IggyLabArgs {
+    #[command(subcommand)]
+    pub command: Command,
+}
+
+#[derive(Subcommand)]
+pub enum Command {
+    /// Run a chaos scenario against a live Iggy server
+    Run(RunArgs),
+    /// Replay a recorded trace against a live Iggy server
+    Replay(ReplayArgs),
+    /// List available scenarios
+    List,
+    /// Show detailed description of a scenario
+    Explain {
+        /// Scenario name to explain
+        scenario: ScenarioName,
+    },
+}
+
+#[derive(Parser)]
+pub struct RunArgs {
+    /// Which scenario to run
+    pub scenario: ScenarioName,
+
+    /// Server address (host:port)
+    #[arg(long, default_value = "127.0.0.1:8090")]
+    pub server_address: String,
+
+    /// Transport protocol
+    #[arg(long, default_value = "tcp")]
+    pub transport: Transport,
+
+    /// PRNG seed for reproducibility (random if omitted)
+    #[arg(long)]
+    pub seed: Option<u64>,
+
+    /// How long to run the chaos phase
+    #[arg(long, default_value = "30s", value_parser = IggyDuration::from_str)]
+    pub duration: IggyDuration,
+
+    /// Stop after this many total ops (across all workers). Duration still acts as safety timeout.
+    #[arg(long)]
+    pub ops: Option<u64>,
+
+    /// Resource name prefix for safety isolation
+    #[arg(long, default_value = "lab-")]
+    pub prefix: String,
+
+    /// Directory for trace artifacts (stdout-only if omitted)
+    #[arg(long)]
+    pub output_dir: Option<PathBuf>,
+
+    /// Number of concurrent worker tasks
+    #[arg(long, default_value = "8")]
+    pub workers: u32,
+
+    /// Payload size in bytes per message
+    #[arg(long, default_value = "256")]
+    pub message_size: u32,
+
+    /// Messages per send batch
+    #[arg(long, default_value = "10")]
+    pub messages_per_batch: u32,
+
+    /// Continue running after ServerBug classification instead of stopping
+    #[arg(long)]
+    pub no_fail_fast: bool,
+
+    /// Delete stale lab-prefixed resources before starting
+    #[arg(long)]
+    pub force_cleanup: bool,
+
+    /// Skip post-run resource deletion
+    #[arg(long)]
+    pub no_cleanup: bool,
+
+    /// Skip post-run invariant verification
+    #[arg(long)]
+    pub skip_post_run_verify: bool,
+}
+
+#[derive(Parser)]
+pub struct ReplayArgs {
+    /// Directory containing trace-worker-*.jsonl files
+    pub trace_dir: PathBuf,
+
+    /// Server address (host:port)
+    #[arg(long, default_value = "127.0.0.1:8090")]
+    pub server_address: String,
+
+    /// Transport protocol
+    #[arg(long, default_value = "tcp")]
+    pub transport: Transport,
+
+    /// Payload size in bytes per message
+    #[arg(long, default_value_t = DEFAULT_MESSAGE_SIZE)]
+    pub message_size: u32,
+
+    /// Resource name prefix for cleanup
+    #[arg(long, default_value = "lab-")]
+    pub prefix: String,
+
+    /// Stop on first concerning divergence
+    #[arg(long)]
+    pub fail_fast: bool,
+
+    /// Delete stale lab-prefixed resources before replaying
+    #[arg(long)]
+    pub force_cleanup: bool,
+}
+
+/// Maps to TransportProtocol but implements clap::ValueEnum
+#[derive(Debug, Clone, Copy, ValueEnum)]
+pub enum Transport {
+    Tcp,
+    Quic,
+    Http,
+    #[value(alias = "ws")]
+    WebSocket,
+}
+
+#[derive(Debug, Clone, Copy, ValueEnum)]
+pub enum ScenarioName {
+    ConcurrentCrud,
+    SegmentRotation,
+    MixedWorkload,
+}
+
+impl std::fmt::Display for ScenarioName {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::ConcurrentCrud => write!(f, "concurrent-crud"),
+            Self::SegmentRotation => write!(f, "segment-rotation"),
+            Self::MixedWorkload => write!(f, "mixed-workload"),
+        }
+    }
+}
diff --git a/core/lab/src/client.rs b/core/lab/src/client.rs
new file mode 100644
index 000000000..ed7ee5d27
--- /dev/null
+++ b/core/lab/src/client.rs
@@ -0,0 +1,66 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::args::Transport;
+use iggy::prelude::*;
+use tracing::warn;
+
+pub async fn create_client(
+    server_address: &str,
+    transport: Transport,
+) -> Result<IggyClient, IggyError> {
+    let client = match transport {
+        Transport::Tcp => IggyClient::builder()
+            .with_tcp()
+            .with_server_address(server_address.to_owned())
+            .build()?,
+        Transport::Quic => IggyClient::builder()
+            .with_quic()
+            .with_server_address(server_address.to_owned())
+            .build()?,
+        Transport::Http => {
+            let api_url = format!("http://{server_address}");
+            IggyClient::builder()
+                .with_http()
+                .with_api_url(api_url)
+                .build()?
+        }
+        Transport::WebSocket => IggyClient::builder()
+            .with_websocket()
+            .with_server_address(server_address.to_owned())
+            .build()?,
+    };
+    client.connect().await?;
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await?;
+    Ok(client)
+}
+
+/// Delete all streams whose name starts with `prefix`.
+pub async fn cleanup_prefixed(client: &IggyClient, prefix: &str) {
+    let Ok(streams) = client.get_streams().await else {
+        return;
+    };
+    for stream in streams.iter().filter(|s| s.name.starts_with(prefix)) {
+        let id = Identifier::numeric(stream.id).unwrap();
+        if let Err(e) = client.delete_stream(&id).await {
+            warn!("Cleanup: failed to delete stream '{}': {e}", stream.name);
+        }
+    }
+}
diff --git a/core/lab/src/invariants.rs b/core/lab/src/invariants.rs
new file mode 100644
index 000000000..403acd7a0
--- /dev/null
+++ b/core/lab/src/invariants.rs
@@ -0,0 +1,297 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::args::Transport;
+use crate::client::create_client;
+use crate::ops::{Op, OpOutcome};
+use crate::shadow::{ShadowState, Watermarks};
+use iggy::prelude::*;
+use serde::Serialize;
+use std::collections::HashMap;
+
+pub type PartitionKey = (String, String, u32);
+
+#[derive(Debug, Clone, Serialize)]
+pub struct InvariantViolation {
+    pub kind: &'static str,
+    pub description: String,
+    pub context: String,
+}
+
+/// Check that polled offsets are monotonically increasing per partition.
+/// Offset regressions after a recent purge are expected and not violations.
+pub fn check_offset_monotonicity(
+    last_offsets: &mut HashMap<PartitionKey, u64>,
+    op: &Op,
+    outcome: &OpOutcome,
+    shadow: &ShadowState,
+) -> Result<(), InvariantViolation> {
+    let (stream, topic, partition) = match op {
+        Op::PollMessages {
+            stream,
+            topic,
+            partition,
+            ..
+        } => (stream, topic, partition),
+        _ => return Ok(()),
+    };
+    let offset = match outcome {
+        OpOutcome::Success {
+            detail: Some(detail),
+        } => match parse_poll_offset(detail) {
+            Some(o) => o,
+            None => return Ok(()),
+        },
+        _ => return Ok(()),
+    };
+
+    let key = (stream.clone(), topic.clone(), *partition);
+    if let Some(&last) = last_offsets.get(&key)
+        && offset < last
+    {
+        if shadow.was_recently_purged(stream, topic) {
+            last_offsets.insert(key, offset);
+            return Ok(());
+        }
+        return Err(InvariantViolation {
+            kind: "offset_regression",
+            description: format!("Offset went backwards: {last} -> {offset}"),
+            context: format!("stream={stream}, topic={topic}, partition={partition}"),
+        });
+    }
+    last_offsets.insert(key, offset);
+    Ok(())
+}
+
+/// Check that polled offsets don't exceed send watermarks.
+pub fn check_watermark(
+    watermarks: &Watermarks,
+    op: &Op,
+    outcome: &OpOutcome,
+) -> Result<(), InvariantViolation> {
+    let (stream, topic, partition) = match op {
+        Op::PollMessages {
+            stream,
+            topic,
+            partition,
+            ..
+        } => (stream, topic, partition),
+        _ => return Ok(()),
+    };
+    let offset = match outcome {
+        OpOutcome::Success {
+            detail: Some(detail),
+        } => match parse_poll_offset(detail) {
+            Some(o) => o,
+            None => return Ok(()),
+        },
+        _ => return Ok(()),
+    };
+
+    if let Some(wm) = watermarks.get_watermark(stream, topic, *partition)
+        && offset > wm
+    {
+        return Err(InvariantViolation {
+            kind: "watermark_exceeded",
+            description: format!("Poll offset {offset} exceeds send watermark {wm}"),
+            context: format!("stream={stream}, topic={topic}, partition={partition}"),
+        });
+    }
+    Ok(())
+}
+
+/// Post-run: verify server state matches shadow state for all prefixed resources.
+pub async fn post_run_verify(
+    client: &IggyClient,
+    shadow: &ShadowState,
+    prefix: &str,
+) -> Vec<InvariantViolation> {
+    let mut violations = Vec::new();
+
+    let streams = match client.get_streams().await {
+        Ok(s) => s,
+        Err(e) => {
+            violations.push(InvariantViolation {
+                kind: "post_run_fetch_failed",
+                description: format!("Failed to get streams: {e}"),
+                context: String::new(),
+            });
+            return violations;
+        }
+    };
+
+    let server_streams: HashMap<&str, &Stream> = streams
+        .iter()
+        .filter(|s| s.name.starts_with(prefix))
+        .map(|s| (s.name.as_str(), s))
+        .collect();
+
+    // Check all shadow streams exist on server
+    for name in shadow.streams.keys() {
+        if !server_streams.contains_key(name.as_str()) {
+            violations.push(InvariantViolation {
+                kind: "missing_stream",
+                description: format!("Shadow has stream '{name}' but server does not"),
+                context: String::new(),
+            });
+        }
+    }
+
+    // Check no extra prefixed streams on server
+    for &name in server_streams.keys() {
+        if !shadow.streams.contains_key(name) {
+            violations.push(InvariantViolation {
+                kind: "extra_stream",
+                description: format!("Server has stream '{name}' not in shadow"),
+                context: String::new(),
+            });
+        }
+    }
+
+    // Deep-check topics for each matching stream
+    for (name, shadow_stream) in &shadow.streams {
+        let Some(server_stream) = server_streams.get(name.as_str()) else {
+            continue;
+        };
+
+        let stream_id = Identifier::numeric(server_stream.id).unwrap();
+        let Ok(Some(details)) = client.get_stream(&stream_id).await else {
+            violations.push(InvariantViolation {
+                kind: "stream_details_fetch_failed",
+                description: format!("Failed to get details for stream '{name}'"),
+                context: String::new(),
+            });
+            continue;
+        };
+
+        let server_topics: HashMap<&str, &Topic> = details
+            .topics
+            .iter()
+            .map(|t| (t.name.as_str(), t))
+            .collect();
+
+        for topic_name in shadow_stream.topics.keys() {
+            if !server_topics.contains_key(topic_name.as_str()) {
+                violations.push(InvariantViolation {
+                    kind: "missing_topic",
+                    description: format!(
+                        "Shadow has topic '{topic_name}' in stream '{name}' but server does not"
+                    ),
+                    context: String::new(),
+                });
+            }
+        }
+
+        for &topic_name in server_topics.keys() {
+            if !shadow_stream.topics.contains_key(topic_name) {
+                violations.push(InvariantViolation {
+                    kind: "extra_topic",
+                    description: format!(
+                        "Server has topic '{topic_name}' in stream '{name}' not in shadow"
+                    ),
+                    context: String::new(),
+                });
+            }
+        }
+    }
+
+    violations
+}
+
+/// Cross-client consistency: spawn a fresh client and compare get_streams results.
+pub async fn cross_client_consistency(
+    addr: &str,
+    transport: Transport,
+    prefix: &str,
+) -> Vec<InvariantViolation> {
+    let mut violations = Vec::new();
+
+    let client_a = match create_client(addr, transport).await {
+        Ok(c) => c,
+        Err(e) => {
+            violations.push(InvariantViolation {
+                kind: "cross_client_connect_failed",
+                description: format!("Client A: {e}"),
+                context: String::new(),
+            });
+            return violations;
+        }
+    };
+    let client_b = match create_client(addr, transport).await {
+        Ok(c) => c,
+        Err(e) => {
+            violations.push(InvariantViolation {
+                kind: "cross_client_connect_failed",
+                description: format!("Client B: {e}"),
+                context: String::new(),
+            });
+            return violations;
+        }
+    };
+
+    let (a_result, b_result) = tokio::join!(client_a.get_streams(), client_b.get_streams());
+
+    let (streams_a, streams_b) = match (a_result, b_result) {
+        (Ok(a), Ok(b)) => (a, b),
+        (Err(e), _) | (_, Err(e)) => {
+            violations.push(InvariantViolation {
+                kind: "cross_client_fetch_failed",
+                description: format!("Failed to get streams: {e}"),
+                context: String::new(),
+            });
+            return violations;
+        }
+    };
+
+    let names_a: std::collections::HashSet<&str> = streams_a
+        .iter()
+        .filter(|s| s.name.starts_with(prefix))
+        .map(|s| s.name.as_str())
+        .collect();
+    let names_b: std::collections::HashSet<&str> = streams_b
+        .iter()
+        .filter(|s| s.name.starts_with(prefix))
+        .map(|s| s.name.as_str())
+        .collect();
+
+    if names_a != names_b {
+        violations.push(InvariantViolation {
+            kind: "cross_client_inconsistency",
+            description: format!(
+                "Client A sees {} streams, Client B sees {}",
+                names_a.len(),
+                names_b.len()
+            ),
+            context: format!(
+                "only_in_a={:?}, only_in_b={:?}",
+                names_a.difference(&names_b).collect::<Vec<_>>(),
+                names_b.difference(&names_a).collect::<Vec<_>>()
+            ),
+        });
+    }
+
+    violations
+}
+
+fn parse_poll_offset(detail: &str) -> Option<u64> {
+    // detail format: "received=N, offset=M"
+    detail
+        .split(", ")
+        .find_map(|part| part.strip_prefix("offset="))
+        .and_then(|v| v.parse().ok())
+}
diff --git a/core/lab/src/main.rs b/core/lab/src/main.rs
new file mode 100644
index 000000000..aa78ce9c4
--- /dev/null
+++ b/core/lab/src/main.rs
@@ -0,0 +1,94 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+mod args;
+mod client;
+mod invariants;
+mod ops;
+mod replay;
+mod report;
+mod runner;
+mod safe_name;
+mod scenarios;
+mod shadow;
+mod trace;
+mod worker;
+
+use args::{Command, IggyLabArgs};
+use clap::Parser;
+use iggy_common::IggyError;
+use replay::LabReplay;
+use runner::LabRunner;
+use tracing_subscriber::EnvFilter;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+
+#[tokio::main]
+async fn main() -> Result<(), IggyError> {
+    let args = IggyLabArgs::parse();
+
+    match args.command {
+        Command::List => {
+            println!("Available scenarios:\n");
+            for (name, description) in scenarios::list_scenarios() {
+                let first_line = description.lines().next().unwrap_or(description);
+                println!("  {name:<25} {first_line}");
+            }
+        }
+        Command::Explain { scenario } => {
+            let s = scenarios::create_scenario(scenario);
+            println!("{}\n", s.name());
+            println!("{}", s.describe());
+        }
+        Command::Run(run_args) => {
+            init_tracing();
+
+            let runner = LabRunner::new(run_args);
+            let outcome = runner.run().await?;
+
+            if !outcome.passed {
+                std::process::exit(1);
+            }
+        }
+        Command::Replay(replay_args) => {
+            init_tracing();
+
+            let outcome = LabReplay::new(replay_args).run().await?;
+
+            println!(
+                "Replayed {} ops — {} divergences ({} concerning)",
+                outcome.total_ops, outcome.divergences_total, outcome.divergences_concerning
+            );
+
+            if outcome.divergences_concerning > 0 {
+                std::process::exit(1);
+            }
+        }
+    }
+
+    Ok(())
+}
+
+fn init_tracing() {
+    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
+
+    tracing_subscriber::registry()
+        .with(env_filter)
+        .with(tracing_subscriber::fmt::layer().with_ansi(true))
+        .init();
+}
diff --git a/core/lab/src/ops/classify.rs b/core/lab/src/ops/classify.rs
new file mode 100644
index 000000000..9c4350bda
--- /dev/null
+++ b/core/lab/src/ops/classify.rs
@@ -0,0 +1,111 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{ErrorClass, Op};
+use crate::shadow::ShadowState;
+use iggy_common::IggyError;
+
+#[allow(dead_code)]
+impl Op {
+    /// Classify an IggyError using shadow state knowledge to distinguish
+    /// between concurrent races (benign), server bugs, and transient failures.
+    pub fn classify_error(&self, err: &IggyError, state: &ShadowState) -> ErrorClass {
+        match err {
+            IggyError::Disconnected
+            | IggyError::NotConnected
+            | IggyError::CannotEstablishConnection
+            | IggyError::Unauthenticated => ErrorClass::Transient,
+
+            IggyError::StreamNameNotFound(name) | IggyError::StreamNameAlreadyExists(name) => {
+                if state.was_recently_deleted(name) || !state.stream_exists(name) {
+                    ErrorClass::ExpectedConcurrent
+                } else {
+                    ErrorClass::ServerBug
+                }
+            }
+
+            IggyError::StreamIdNotFound(_) => {
+                if let Some(stream_name) = self.stream_name()
+                    && (state.was_recently_deleted(stream_name)
+                        || !state.stream_exists(stream_name))
+                {
+                    return ErrorClass::ExpectedConcurrent;
+                }
+                ErrorClass::ServerBug
+            }
+
+            IggyError::TopicNameNotFound(_, _)
+            | IggyError::TopicNameAlreadyExists(_, _)
+            | IggyError::TopicIdNotFound(_, _) => {
+                if let (Some(s), Some(t)) = (self.stream_name(), self.topic_name())
+                    && (state.was_recently_deleted(&format!("{s}/{t}"))
+                        || state.was_recently_deleted(s)
+                        || !state.topic_exists(s, t))
+                {
+                    return ErrorClass::ExpectedConcurrent;
+                }
+                ErrorClass::ServerBug
+            }
+
+            IggyError::PartitionNotFound(_, _, _)
+            | IggyError::SegmentNotFound
+            | IggyError::ConsumerGroupIdNotFound(_, _)
+            | IggyError::ConsumerGroupNameNotFound(_, _)
+            | IggyError::ConsumerGroupMemberNotFound(_, _, _)
+            | IggyError::ResourceNotFound(_) => ErrorClass::ExpectedConcurrent,
+
+            _ => ErrorClass::ServerBug,
+        }
+    }
+
+    fn stream_name(&self) -> Option<&str> {
+        match self {
+            Op::CreateStream { name } | Op::DeleteStream { name } | Op::PurgeStream { name } => {
+                Some(name)
+            }
+            Op::CreateTopic { stream, .. }
+            | Op::DeleteTopic { stream, .. }
+            | Op::PurgeTopic { stream, .. }
+            | Op::SendMessages { stream, .. }
+            | Op::PollMessages { stream, .. }
+            | Op::DeleteSegments { stream, .. }
+            | Op::CreatePartitions { stream, .. }
+            | Op::DeletePartitions { stream, .. }
+            | Op::CreateConsumerGroup { stream, .. }
+            | Op::DeleteConsumerGroup { stream, .. }
+            | Op::StoreConsumerOffset { stream, .. } => Some(stream),
+        }
+    }
+
+    fn topic_name(&self) -> Option<&str> {
+        match self {
+            Op::CreateStream { .. } | Op::DeleteStream { .. } | Op::PurgeStream { .. } => None,
+            Op::CreateTopic { name, .. } => Some(name),
+            Op::DeleteTopic { topic, .. }
+            | Op::PurgeTopic { topic, .. }
+            | Op::SendMessages { topic, .. }
+            | Op::PollMessages { topic, .. }
+            | Op::DeleteSegments { topic, .. }
+            | Op::CreatePartitions { topic, .. }
+            | Op::DeletePartitions { topic, .. }
+            | Op::CreateConsumerGroup { topic, .. }
+            | Op::DeleteConsumerGroup { topic, .. }
+            | Op::StoreConsumerOffset { topic, .. } => Some(topic),
+        }
+    }
+}
diff --git a/core/lab/src/ops/execute.rs b/core/lab/src/ops/execute.rs
new file mode 100644
index 000000000..295c719d1
--- /dev/null
+++ b/core/lab/src/ops/execute.rs
@@ -0,0 +1,285 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{Op, OpOutcome};
+use bytes::Bytes;
+use iggy::prelude::*;
+use rand::{Rng, RngExt};
+
+impl Op {
+    pub async fn execute(
+        &self,
+        client: &IggyClient,
+        msg_size: u32,
+        rng: &mut (impl Rng + ?Sized),
+    ) -> OpOutcome {
+        match self {
+            Op::CreateStream { name } => match 
client.create_stream(name).await {
+                Ok(details) => OpOutcome::success_with(format!("id={}", 
details.id)),
+                Err(e) => classify_sdk_error(e, "create_stream"),
+            },
+
+            Op::DeleteStream { name } => {
+                let id = Identifier::from_str_value(name).unwrap();
+                match client.delete_stream(&id).await {
+                    Ok(()) => OpOutcome::success(),
+                    Err(e) => classify_sdk_error(e, "delete_stream"),
+                }
+            }
+
+            Op::PurgeStream { name } => {
+                let id = Identifier::from_str_value(name).unwrap();
+                match client.purge_stream(&id).await {
+                    Ok(()) => OpOutcome::success(),
+                    Err(e) => classify_sdk_error(e, "purge_stream"),
+                }
+            }
+
+            Op::CreateTopic {
+                stream,
+                name,
+                partitions,
+            } => {
+                let stream_id = Identifier::from_str_value(stream).unwrap();
+                match client
+                    .create_topic(
+                        &stream_id,
+                        name,
+                        *partitions,
+                        CompressionAlgorithm::None,
+                        None,
+                        IggyExpiry::NeverExpire,
+                        MaxTopicSize::Unlimited,
+                    )
+                    .await
+                {
+                    Ok(details) => OpOutcome::success_with(format!("id={}", 
details.id)),
+                    Err(e) => classify_sdk_error(e, "create_topic"),
+                }
+            }
+
+            Op::DeleteTopic { stream, topic } => {
+                let stream_id = Identifier::from_str_value(stream).unwrap();
+                let topic_id = Identifier::from_str_value(topic).unwrap();
+                match client.delete_topic(&stream_id, &topic_id).await {
+                    Ok(()) => OpOutcome::success(),
+                    Err(e) => classify_sdk_error(e, "delete_topic"),
+                }
+            }
+
+            Op::PurgeTopic { stream, topic } => {
+                let stream_id = Identifier::from_str_value(stream).unwrap();
+                let topic_id = Identifier::from_str_value(topic).unwrap();
+                match client.purge_topic(&stream_id, &topic_id).await {
+                    Ok(()) => OpOutcome::success(),
+                    Err(e) => classify_sdk_error(e, "purge_topic"),
+                }
+            }
+
+            Op::SendMessages {
+                stream,
+                topic,
+                partition,
+                count,
+            } => {
+                let stream_id = Identifier::from_str_value(stream).unwrap();
+                let topic_id = Identifier::from_str_value(topic).unwrap();
+                let partitioning = Partitioning::partition_id(*partition);
+                let mut messages: Vec<IggyMessage> = Vec::with_capacity(*count 
as usize);
+                for _ in 0..*count {
+                    let mut payload = vec![0u8; msg_size as usize];
+                    rng.fill(&mut payload[..]);
+                    messages.push(
+                        IggyMessage::builder()
+                            .payload(Bytes::from(payload))
+                            .build()
+                            .unwrap(),
+                    );
+                }
+                match client
+                    .send_messages(&stream_id, &topic_id, &partitioning, &mut 
messages)
+                    .await
+                {
+                    Ok(()) => 
OpOutcome::success_with(format!("count={count}")),
+                    Err(e) => classify_sdk_error(e, "send_messages"),
+                }
+            }
+
+            Op::PollMessages {
+                stream,
+                topic,
+                partition,
+                count,
+            } => {
+                let stream_id = Identifier::from_str_value(stream).unwrap();
+                let topic_id = Identifier::from_str_value(topic).unwrap();
+                let consumer = Consumer::new(Identifier::numeric(1).unwrap());
+                let strategy = PollingStrategy::next();
+                match client
+                    .poll_messages(
+                        &stream_id,
+                        &topic_id,
+                        Some(*partition),
+                        &consumer,
+                        &strategy,
+                        *count,
+                        false,
+                    )
+                    .await
+                {
+                    Ok(polled) => OpOutcome::success_with(format!(
+                        "received={}, offset={}",
+                        polled.messages.len(),
+                        polled.current_offset
+                    )),
+                    Err(e) => classify_sdk_error(e, "poll_messages"),
+                }
+            }
+
+            Op::DeleteSegments {
+                stream,
+                topic,
+                partition,
+                count,
+            } => {
+                let stream_id = Identifier::from_str_value(stream).unwrap();
+                let topic_id = Identifier::from_str_value(topic).unwrap();
+                match client
+                    .delete_segments(&stream_id, &topic_id, *partition, *count)
+                    .await
+                {
+                    Ok(()) => OpOutcome::success(),
+                    Err(e) => classify_sdk_error(e, "delete_segments"),
+                }
+            }
+
+            Op::CreatePartitions {
+                stream,
+                topic,
+                count,
+            } => {
+                let stream_id = Identifier::from_str_value(stream).unwrap();
+                let topic_id = Identifier::from_str_value(topic).unwrap();
+                match client
+                    .create_partitions(&stream_id, &topic_id, *count)
+                    .await
+                {
+                    Ok(()) => OpOutcome::success(),
+                    Err(e) => classify_sdk_error(e, "create_partitions"),
+                }
+            }
+
+            Op::DeletePartitions {
+                stream,
+                topic,
+                count,
+            } => {
+                let stream_id = Identifier::from_str_value(stream).unwrap();
+                let topic_id = Identifier::from_str_value(topic).unwrap();
+                match client
+                    .delete_partitions(&stream_id, &topic_id, *count)
+                    .await
+                {
+                    Ok(()) => OpOutcome::success(),
+                    Err(e) => classify_sdk_error(e, "delete_partitions"),
+                }
+            }
+
+            Op::CreateConsumerGroup {
+                stream,
+                topic,
+                name,
+            } => {
+                let stream_id = Identifier::from_str_value(stream).unwrap();
+                let topic_id = Identifier::from_str_value(topic).unwrap();
+                match client
+                    .create_consumer_group(&stream_id, &topic_id, name)
+                    .await
+                {
+                    Ok(details) => OpOutcome::success_with(format!("id={}", 
details.id)),
+                    Err(e) => classify_sdk_error(e, "create_consumer_group"),
+                }
+            }
+
+            Op::DeleteConsumerGroup {
+                stream,
+                topic,
+                name,
+            } => {
+                let stream_id = Identifier::from_str_value(stream).unwrap();
+                let topic_id = Identifier::from_str_value(topic).unwrap();
+                let group_id = Identifier::from_str_value(name).unwrap();
+                match client
+                    .delete_consumer_group(&stream_id, &topic_id, &group_id)
+                    .await
+                {
+                    Ok(()) => OpOutcome::success(),
+                    Err(e) => classify_sdk_error(e, "delete_consumer_group"),
+                }
+            }
+
+            Op::StoreConsumerOffset {
+                stream,
+                topic,
+                partition,
+                offset,
+            } => {
+                let stream_id = Identifier::from_str_value(stream).unwrap();
+                let topic_id = Identifier::from_str_value(topic).unwrap();
+                let consumer = Consumer::new(Identifier::numeric(1).unwrap());
+                match client
+                    .store_consumer_offset(
+                        &consumer,
+                        &stream_id,
+                        &topic_id,
+                        Some(*partition),
+                        *offset,
+                    )
+                    .await
+                {
+                    Ok(()) => OpOutcome::success(),
+                    Err(e) => classify_sdk_error(e, "store_consumer_offset"),
+                }
+            }
+        }
+    }
+}
+
+fn classify_sdk_error(err: IggyError, op_name: &str) -> OpOutcome {
+    match &err {
+        IggyError::StreamNameAlreadyExists(_)
+        | IggyError::TopicNameAlreadyExists(_, _)
+        | IggyError::StreamNameNotFound(_)
+        | IggyError::TopicNameNotFound(_, _)
+        | IggyError::StreamIdNotFound(_)
+        | IggyError::TopicIdNotFound(_, _)
+        | IggyError::PartitionNotFound(_, _, _)
+        | IggyError::SegmentNotFound
+        | IggyError::ConsumerGroupIdNotFound(_, _)
+        | IggyError::ConsumerGroupNameNotFound(_, _)
+        | IggyError::ConsumerGroupMemberNotFound(_, _, _)
+        | IggyError::ResourceNotFound(_) => 
OpOutcome::expected(err.to_string(), "concurrent_race"),
+        IggyError::Disconnected
+        | IggyError::NotConnected
+        | IggyError::CannotEstablishConnection
+        | IggyError::Unauthenticated => {
+            OpOutcome::unexpected(err.to_string(), format!("{op_name}: 
transient"))
+        }
+        _ => OpOutcome::unexpected(err.to_string(), op_name.to_string()),
+    }
+}
diff --git a/core/lab/src/ops/generate.rs b/core/lab/src/ops/generate.rs
new file mode 100644
index 000000000..03f5e880f
--- /dev/null
+++ b/core/lab/src/ops/generate.rs
@@ -0,0 +1,189 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{Op, OpKind};
+use crate::safe_name::SafeResourceName;
+use crate::shadow::ShadowState;
+use rand::{Rng, RngExt};
+
/// Partition count for every freshly created topic.
const DEFAULT_PARTITIONS: u32 = 3;

impl Op {
    /// Generate a concrete Op from an OpKind, resolving targets from the shadow state.
    /// Returns None if the precondition for this kind cannot be met.
    ///
    /// NOTE(review): `rng` is presumably seeded for reproducible runs, so the
    /// order and number of RNG draws in each arm is part of the observable
    /// behavior — keep draw order stable when modifying this function.
    pub fn generate(
        kind: OpKind,
        state: &ShadowState,
        rng: &mut impl Rng,
        prefix: &str,
        messages_per_batch: u32,
    ) -> Option<Self> {
        match kind {
            // Creation ops invent a fresh prefixed name; they need no existing target.
            OpKind::CreateStream => {
                let name = SafeResourceName::random(prefix, rng);
                Some(Op::CreateStream {
                    name: name.into_inner(),
                })
            }

            OpKind::DeleteStream => {
                let name = state.random_stream(rng)?;
                Some(Op::DeleteStream {
                    name: name.to_owned(),
                })
            }

            OpKind::PurgeStream => {
                let name = state.random_stream(rng)?;
                Some(Op::PurgeStream {
                    name: name.to_owned(),
                })
            }

            OpKind::CreateTopic => {
                let stream = state.random_stream(rng)?;
                let name = SafeResourceName::random(prefix, rng);
                Some(Op::CreateTopic {
                    stream: stream.to_owned(),
                    name: name.into_inner(),
                    partitions: DEFAULT_PARTITIONS,
                })
            }

            OpKind::DeleteTopic => {
                let (stream, topic) = state.random_topic(rng)?;
                Some(Op::DeleteTopic {
                    stream: stream.to_owned(),
                    topic: topic.to_owned(),
                })
            }

            OpKind::PurgeTopic => {
                let (stream, topic) = state.random_topic(rng)?;
                Some(Op::PurgeTopic {
                    stream: stream.to_owned(),
                    topic: topic.to_owned(),
                })
            }

            OpKind::SendMessages => {
                let (stream, topic, partitions) = state.random_topic_with_partitions(rng)?;
                if partitions == 0 {
                    return None;
                }
                // Partition ids are 1-based.
                let partition = rng.random_range(1..=partitions);
                Some(Op::SendMessages {
                    stream: stream.to_owned(),
                    topic: topic.to_owned(),
                    partition,
                    count: messages_per_batch,
                })
            }

            OpKind::PollMessages => {
                let (stream, topic, partitions) = state.random_topic_with_partitions(rng)?;
                if partitions == 0 {
                    return None;
                }
                let partition = rng.random_range(1..=partitions);
                Some(Op::PollMessages {
                    stream: stream.to_owned(),
                    topic: topic.to_owned(),
                    partition,
                    count: messages_per_batch,
                })
            }

            OpKind::DeleteSegments => {
                let (stream, topic, partitions) = state.random_topic_with_partitions(rng)?;
                if partitions == 0 {
                    return None;
                }
                let partition = rng.random_range(1..=partitions);
                Some(Op::DeleteSegments {
                    stream: stream.to_owned(),
                    topic: topic.to_owned(),
                    partition,
                    count: 1,
                })
            }

            OpKind::CreatePartitions => {
                let (stream, topic) = state.random_topic(rng)?;
                Some(Op::CreatePartitions {
                    stream: stream.to_owned(),
                    topic: topic.to_owned(),
                    count: 1,
                })
            }

            OpKind::DeletePartitions => {
                let (stream, topic, partitions) = state.random_topic_with_partitions(rng)?;
                // Server requires at least 1 partition to remain
                if partitions <= 1 {
                    return None;
                }
                Some(Op::DeletePartitions {
                    stream: stream.to_owned(),
                    topic: topic.to_owned(),
                    count: 1,
                })
            }

            OpKind::CreateConsumerGroup => {
                let (stream, topic) = state.random_topic(rng)?;
                let name = SafeResourceName::random(prefix, rng);
                Some(Op::CreateConsumerGroup {
                    stream: stream.to_owned(),
                    topic: topic.to_owned(),
                    name: name.into_inner(),
                })
            }

            OpKind::DeleteConsumerGroup => {
                let (stream, topic) = state.random_topic(rng)?;
                let groups = &state.streams[stream].topics[topic].consumer_groups;
                if groups.is_empty() {
                    return None;
                }
                // `nth` is used because the collection is not indexable; the
                // bound above makes the `?` unreachable in practice.
                let idx = rng.random_range(0..groups.len());
                let name = groups.iter().nth(idx)?.to_owned();
                Some(Op::DeleteConsumerGroup {
                    stream: stream.to_owned(),
                    topic: topic.to_owned(),
                    name,
                })
            }

            OpKind::StoreConsumerOffset => {
                let (stream, topic, partitions) = state.random_topic_with_partitions(rng)?;
                if partitions == 0 {
                    return None;
                }
                let partition = rng.random_range(1..=partitions);
                // Arbitrary offset; the server may reject out-of-range values,
                // which the error classifier treats accordingly.
                let offset = rng.random_range(0..1000);
                Some(Op::StoreConsumerOffset {
                    stream: stream.to_owned(),
                    topic: topic.to_owned(),
                    partition,
                    offset,
                })
            }
        }
    }
}
diff --git a/core/lab/src/ops/mod.rs b/core/lab/src/ops/mod.rs
new file mode 100644
index 000000000..ff9d1a926
--- /dev/null
+++ b/core/lab/src/ops/mod.rs
@@ -0,0 +1,197 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod classify;
+pub mod execute;
+pub mod generate;
+pub mod precondition;
+pub mod shadow_update;
+
+use serde::{Deserialize, Serialize};
+
/// A single concrete operation the lab harness can run against the server.
///
/// Serialized (internally tagged by `type`) into trace records; resources are
/// addressed by name rather than numeric id.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum Op {
    /// Create a stream with the given name.
    CreateStream {
        name: String,
    },
    /// Delete the named stream.
    DeleteStream {
        name: String,
    },
    /// Purge the named stream.
    PurgeStream {
        name: String,
    },
    /// Create a topic under `stream` with `partitions` partitions.
    CreateTopic {
        stream: String,
        name: String,
        partitions: u32,
    },
    /// Delete `topic` from `stream`.
    DeleteTopic {
        stream: String,
        topic: String,
    },
    /// Purge `topic` in `stream`.
    PurgeTopic {
        stream: String,
        topic: String,
    },
    /// Send `count` random-payload messages to one partition.
    SendMessages {
        stream: String,
        topic: String,
        partition: u32,
        count: u32,
    },
    /// Poll up to `count` messages from one partition.
    PollMessages {
        stream: String,
        topic: String,
        partition: u32,
        count: u32,
    },
    /// Delete `count` segments from one partition.
    DeleteSegments {
        stream: String,
        topic: String,
        partition: u32,
        count: u32,
    },
    /// Add `count` partitions to the topic.
    CreatePartitions {
        stream: String,
        topic: String,
        count: u32,
    },
    /// Remove `count` partitions from the topic.
    DeletePartitions {
        stream: String,
        topic: String,
        count: u32,
    },
    /// Create a consumer group named `name` on the topic.
    CreateConsumerGroup {
        stream: String,
        topic: String,
        name: String,
    },
    /// Delete the consumer group named `name` from the topic.
    DeleteConsumerGroup {
        stream: String,
        topic: String,
        name: String,
    },
    /// Store `offset` for a consumer on one partition.
    StoreConsumerOffset {
        stream: String,
        topic: String,
        partition: u32,
        offset: u64,
    },
}
+
/// Recorded result of executing an [`Op`], serialized (tagged by `status`)
/// into trace records.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "status")]
pub enum OpOutcome {
    /// Operation completed; `detail` optionally carries extra info (e.g. a created id).
    Success { detail: Option<String> },
    /// Operation failed in a way the harness anticipated; `reason` explains why.
    ExpectedError { error: String, reason: String },
    /// Operation failed in a way the harness cannot explain; `context` names the call site.
    UnexpectedError { error: String, context: String },
}
+
+impl OpOutcome {
+    pub fn success() -> Self {
+        Self::Success { detail: None }
+    }
+
+    pub fn success_with(detail: impl Into<String>) -> Self {
+        Self::Success {
+            detail: Some(detail.into()),
+        }
+    }
+
+    pub fn expected(error: impl Into<String>, reason: impl Into<String>) -> 
Self {
+        Self::ExpectedError {
+            error: error.into(),
+            reason: reason.into(),
+        }
+    }
+
+    pub fn unexpected(error: impl Into<String>, context: impl Into<String>) -> 
Self {
+        Self::UnexpectedError {
+            error: error.into(),
+            context: context.into(),
+        }
+    }
+
+    pub fn is_success(&self) -> bool {
+        matches!(self, Self::Success { .. })
+    }
+
+    pub fn is_unexpected(&self) -> bool {
+        matches!(self, Self::UnexpectedError { .. })
+    }
+
+    pub fn result_tag(&self) -> &'static str {
+        match self {
+            Self::Success { .. } => "ok",
+            Self::ExpectedError { .. } => "expected_error",
+            Self::UnexpectedError { .. } => "unexpected_error",
+        }
+    }
+}
+
/// Coarse classification of a server error, used when deciding whether a
/// failure is benign or worth flagging.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
pub enum ErrorClass {
    /// Another worker deleted the resource concurrently — benign
    ExpectedConcurrent,
    /// Server returned something that contradicts shadow state
    ServerBug,
    /// Transient network/timeout error — retry-safe
    Transient,
}
+
/// Selects which kind of operation to generate (before target resolution).
///
/// `Op::generate` turns an `OpKind` into a concrete [`Op`] by resolving
/// targets from the shadow state; variants mirror [`Op`] one-to-one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpKind {
    CreateStream,
    DeleteStream,
    PurgeStream,
    CreateTopic,
    DeleteTopic,
    PurgeTopic,
    SendMessages,
    PollMessages,
    DeleteSegments,
    CreatePartitions,
    DeletePartitions,
    CreateConsumerGroup,
    DeleteConsumerGroup,
    StoreConsumerOffset,
}
+
impl Op {
    /// Stable snake_case tag naming this operation's kind.
    ///
    /// The strings are part of the recorded output format — do not change them.
    pub fn kind_tag(&self) -> &'static str {
        match self {
            Self::CreateStream { .. } => "create_stream",
            Self::DeleteStream { .. } => "delete_stream",
            Self::PurgeStream { .. } => "purge_stream",
            Self::CreateTopic { .. } => "create_topic",
            Self::DeleteTopic { .. } => "delete_topic",
            Self::PurgeTopic { .. } => "purge_topic",
            Self::SendMessages { .. } => "send_messages",
            Self::PollMessages { .. } => "poll_messages",
            Self::DeleteSegments { .. } => "delete_segments",
            Self::CreatePartitions { .. } => "create_partitions",
            Self::DeletePartitions { .. } => "delete_partitions",
            Self::CreateConsumerGroup { .. } => "create_consumer_group",
            Self::DeleteConsumerGroup { .. } => "delete_consumer_group",
            Self::StoreConsumerOffset { .. } => "store_consumer_offset",
        }
    }
}
diff --git a/core/lab/src/ops/precondition.rs b/core/lab/src/ops/precondition.rs
new file mode 100644
index 000000000..812726bf8
--- /dev/null
+++ b/core/lab/src/ops/precondition.rs
@@ -0,0 +1,98 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::Op;
+use crate::shadow::ShadowState;
+
+#[allow(dead_code)]
+impl Op {
+    /// Check whether this op's preconditions are met according to our shadow 
state.
+    /// A false return means the op would definitely fail, so the worker 
should skip it.
+    pub fn precondition_met(&self, state: &ShadowState) -> bool {
+        match self {
+            Op::CreateStream { name } => !state.stream_exists(name),
+            Op::DeleteStream { name } | Op::PurgeStream { name } => 
state.stream_exists(name),
+
+            Op::CreateTopic { stream, name, .. } => {
+                state.stream_exists(stream) && !state.topic_exists(stream, 
name)
+            }
+            Op::DeleteTopic { stream, topic } | Op::PurgeTopic { stream, topic 
} => {
+                state.topic_exists(stream, topic)
+            }
+
+            Op::SendMessages {
+                stream,
+                topic,
+                partition,
+                ..
+            }
+            | Op::PollMessages {
+                stream,
+                topic,
+                partition,
+                ..
+            } => state
+                .get_topic(stream, topic)
+                .is_some_and(|t| *partition >= 1 && *partition <= 
t.partitions),
+
+            Op::DeleteSegments {
+                stream,
+                topic,
+                partition,
+                ..
+            } => state
+                .get_topic(stream, topic)
+                .is_some_and(|t| *partition >= 1 && *partition <= 
t.partitions),
+
+            Op::CreatePartitions { stream, topic, .. } => 
state.topic_exists(stream, topic),
+
+            Op::DeletePartitions {
+                stream,
+                topic,
+                count,
+            } => state
+                .get_topic(stream, topic)
+                .is_some_and(|t| t.partitions > *count),
+
+            Op::CreateConsumerGroup {
+                stream,
+                topic,
+                name,
+            } => state
+                .get_topic(stream, topic)
+                .is_some_and(|t| !t.consumer_groups.contains(name)),
+
+            Op::DeleteConsumerGroup {
+                stream,
+                topic,
+                name,
+            } => state
+                .get_topic(stream, topic)
+                .is_some_and(|t| t.consumer_groups.contains(name)),
+
+            Op::StoreConsumerOffset {
+                stream,
+                topic,
+                partition,
+                ..
+            } => state
+                .get_topic(stream, topic)
+                .is_some_and(|t| *partition >= 1 && *partition <= 
t.partitions),
+        }
+    }
+}
diff --git a/core/lab/src/ops/shadow_update.rs 
b/core/lab/src/ops/shadow_update.rs
new file mode 100644
index 000000000..68acd1edf
--- /dev/null
+++ b/core/lab/src/ops/shadow_update.rs
@@ -0,0 +1,89 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{Op, OpOutcome};
+use crate::shadow::ShadowState;
+
+impl Op {
+    /// Update the shadow state to reflect a completed operation.
+    /// Only mutates on success — failed ops leave shadow unchanged.
+    pub fn update_shadow(&self, state: &mut ShadowState, outcome: &OpOutcome) {
+        if !outcome.is_success() {
+            return;
+        }
+
+        match self {
+            Op::CreateStream { name } => {
+                state.apply_create_stream(name.clone());
+            }
+            Op::DeleteStream { name } => {
+                state.apply_delete_stream(name);
+            }
+            Op::PurgeStream { name } => {
+                state.record_purge(name.clone(), None);
+            }
+            Op::CreateTopic {
+                stream,
+                name,
+                partitions,
+            } => {
+                state.apply_create_topic(stream, name.clone(), *partitions);
+            }
+            Op::DeleteTopic { stream, topic } => {
+                state.apply_delete_topic(stream, topic);
+            }
+            Op::PurgeTopic { stream, topic } => {
+                state.record_purge(stream.clone(), Some(topic.clone()));
+            }
+            Op::SendMessages { .. } => {}
+            Op::PollMessages { .. } => {}
+            Op::DeleteSegments { .. } => {}
+            Op::CreatePartitions {
+                stream,
+                topic,
+                count,
+            } => {
+                state.apply_create_partitions(stream, topic, *count);
+            }
+            Op::DeletePartitions {
+                stream,
+                topic,
+                count,
+            } => {
+                state.apply_delete_partitions(stream, topic, *count);
+            }
+            Op::CreateConsumerGroup {
+                stream,
+                topic,
+                name,
+            } => {
+                state.apply_create_consumer_group(stream, topic, name.clone());
+            }
+            Op::DeleteConsumerGroup {
+                stream,
+                topic,
+                name,
+            } => {
+                state.apply_delete_consumer_group(stream, topic, name);
+            }
+            Op::StoreConsumerOffset { .. } => {}
+        }
+
+        state.prune_tombstones();
+    }
+}
diff --git a/core/lab/src/replay.rs b/core/lab/src/replay.rs
new file mode 100644
index 000000000..bf799019a
--- /dev/null
+++ b/core/lab/src/replay.rs
@@ -0,0 +1,306 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::args::ReplayArgs;
+use crate::client::{cleanup_prefixed, create_client};
+use crate::ops::Op;
+use iggy_common::IggyError;
+use rand::SeedableRng;
+use rand::rngs::StdRng;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::fmt;
+use std::fs::File;
+use std::io::{BufRead, BufReader};
+use tracing::{error, info, warn};
+
/// Sequential replayer for a previously recorded lab run.
///
/// Holds only the CLI arguments; all work happens in [`LabReplay::run`].
pub struct LabReplay {
    args: ReplayArgs,
}
+
/// Summary counters produced by a replay run.
pub struct ReplayOutcome {
    // Number of intents replayed (one op per intent).
    pub total_ops: u64,
    // All divergences, benign and concerning alike.
    pub divergences_total: u64,
    // Subset where the original succeeded but the replay did not.
    pub divergences_concerning: u64,
}
+
/// Raw JSON line from a trace file — intent and outcome share the same shape
/// with optional fields.
#[derive(Deserialize)]
struct RawTraceEntry {
    // Sequence number linking an intent record to its outcome record.
    seq: u64,
    // "intent" or "outcome"; any other value is ignored by the loader.
    phase: String,
    // Intent fields
    op: Option<Op>,
    // Outcome fields
    result: Option<String>,
    detail: Option<String>,
}
+
/// An operation to re-execute, keyed by its original sequence number.
struct ReplayIntent {
    seq: u64,
    op: Op,
}
+
/// Result recorded for an op during the original run.
struct OriginalOutcome {
    // Result tag as written to the trace (e.g. "ok", "expected_error").
    result: String,
    // Optional free-form detail captured alongside the result.
    detail: Option<String>,
}
+
/// A single op whose replay result tag differed from the recorded one.
struct Divergence {
    seq: u64,
    kind: DivergenceKind,
    op: Op,
    // Result tags as recorded/observed (see `classify_divergence`).
    original_result: String,
    replay_result: String,
}
+
/// Classification of an original-vs-replay result mismatch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DivergenceKind {
    /// Original succeeded, replay failed — concerning
    OriginalOkReplayFailed,
    /// Original succeeded, replay returned expected error — concerning
    OriginalOkReplayExpected,
    /// Original got expected error, replay succeeded — benign (no concurrent race in sequential replay)
    OriginalExpectedReplayOk,
    /// Original got unexpected error, replay succeeded — interesting
    OriginalUnexpectedReplayOk,
    /// Both failed with different classifications
    BothFailed,
}
+
+impl DivergenceKind {
+    fn is_concerning(self) -> bool {
+        matches!(
+            self,
+            Self::OriginalOkReplayFailed | Self::OriginalOkReplayExpected
+        )
+    }
+}
+
+impl fmt::Display for DivergenceKind {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::OriginalOkReplayFailed => write!(f, 
"original_ok→replay_failed"),
+            Self::OriginalOkReplayExpected => write!(f, 
"original_ok→replay_expected_error"),
+            Self::OriginalExpectedReplayOk => write!(f, 
"original_expected→replay_ok"),
+            Self::OriginalUnexpectedReplayOk => write!(f, 
"original_unexpected→replay_ok"),
+            Self::BothFailed => write!(f, "both_failed"),
+        }
+    }
+}
+
impl LabReplay {
    /// Creates a replayer from parsed CLI arguments; no I/O happens here.
    pub fn new(args: ReplayArgs) -> Self {
        Self { args }
    }

    /// Replays every recorded intent sequentially against the target server
    /// and compares each result tag against the originally recorded outcome.
    ///
    /// Intents are replayed in `seq` order on a single connection, so races
    /// from the original concurrent run cannot reoccur; mismatched result
    /// tags are classified by `classify_divergence` and only "original ok,
    /// replay failed" kinds are treated as concerning.
    ///
    /// # Errors
    /// Returns `IggyError::InvalidConfiguration` when the trace directory
    /// yields no intents; client/connection errors are propagated.
    pub async fn run(self) -> Result<ReplayOutcome, IggyError> {
        let (intents, outcomes) = self.load_traces()?;
        if intents.is_empty() {
            error!("No intent entries found in trace directory");
            return Err(IggyError::InvalidConfiguration);
        }

        info!("=== iggy-lab replay ===");
        info!("Trace dir: {:?}", self.args.trace_dir);
        info!("Intents loaded: {}", intents.len());
        info!(
            "Server: {} ({})",
            self.args.server_address,
            transport_name(self.args.transport)
        );

        let client = create_client(&self.args.server_address, self.args.transport).await?;

        // Optionally wipe any stale prefixed resources so the replay starts
        // from the same clean slate as the original run.
        if self.args.force_cleanup {
            info!("Cleaning up prefixed resources...");
            cleanup_prefixed(&client, &self.args.prefix).await;
        }

        // Deterministic RNG for message payloads (content is irrelevant to correctness)
        let mut rng = StdRng::seed_from_u64(0);
        let mut divergences: Vec<Divergence> = Vec::new();
        let total_ops = intents.len() as u64;

        for intent in &intents {
            let replay_outcome = intent
                .op
                .execute(&client, self.args.message_size, &mut rng)
                .await;
            let replay_tag = replay_outcome.result_tag();

            // Intents with no recorded outcome are executed but not compared.
            if let Some(original) = outcomes.get(&intent.seq) {
                if replay_tag == original.result {
                    continue;
                }

                let kind = classify_divergence(&original.result, replay_tag);
                let divergence = Divergence {
                    seq: intent.seq,
                    kind,
                    op: intent.op.clone(),
                    original_result: original.result.clone(),
                    replay_result: replay_tag.to_owned(),
                };

                if kind.is_concerning() {
                    warn!(
                        seq = intent.seq,
                        kind = %kind,
                        op = intent.op.kind_tag(),
                        original = %original.result,
                        replay = replay_tag,
                        detail = original.detail.as_deref().unwrap_or(""),
                        "Concerning divergence"
                    );
                } else {
                    info!(
                        seq = intent.seq,
                        kind = %kind,
                        op = intent.op.kind_tag(),
                        "Benign divergence"
                    );
                }

                // Capture the flag before `divergence` is moved into the vec.
                let is_concerning = kind.is_concerning();
                divergences.push(divergence);

                if self.args.fail_fast && is_concerning {
                    error!(
                        "Fail-fast: stopping on concerning divergence at seq={}",
                        intent.seq
                    );
                    break;
                }
            }
        }

        let divergences_concerning = divergences
            .iter()
            .filter(|d| d.kind.is_concerning())
            .count() as u64;
        let divergences_total = divergences.len() as u64;

        info!("--- Replay Summary ---");
        info!("Total ops replayed: {total_ops}");
        info!("Total divergences: {divergences_total}");
        info!("Concerning divergences: {divergences_concerning}");

        if divergences_concerning > 0 {
            error!("REPLAY FAILED — {divergences_concerning} concerning divergence(s):");
            for d in divergences.iter().filter(|d| d.kind.is_concerning()) {
                error!(
                    "  seq={} op={} {} (original={}, replay={})",
                    d.seq,
                    d.op.kind_tag(),
                    d.kind,
                    d.original_result,
                    d.replay_result,
                );
            }
        } else if divergences_total > 0 {
            info!("REPLAY OK — {divergences_total} benign divergence(s)");
        } else {
            info!("REPLAY OK — exact match");
        }

        // Cleanup after replay
        if self.args.force_cleanup {
            info!("Post-replay cleanup...");
            cleanup_prefixed(&client, &self.args.prefix).await;
        }

        Ok(ReplayOutcome {
            total_ops,
            divergences_total,
            divergences_concerning,
        })
    }

    /// Loads all `trace-worker-*.jsonl` files from the trace directory,
    /// splitting entries into intents (sorted by `seq` across all workers)
    /// and a `seq -> outcome` map.
    ///
    /// # Errors
    /// All I/O and JSON parse failures are collapsed into
    /// `IggyError::InvalidConfiguration`.
    fn load_traces(&self) -> Result<(Vec<ReplayIntent>, HashMap<u64, OriginalOutcome>), IggyError> {
        let entries =
            std::fs::read_dir(&self.args.trace_dir).map_err(|_| IggyError::InvalidConfiguration)?;

        let mut intents: Vec<ReplayIntent> = Vec::new();
        let mut outcomes: HashMap<u64, OriginalOutcome> = HashMap::new();

        for entry in entries {
            let entry = entry.map_err(|_| IggyError::InvalidConfiguration)?;
            let path = entry.path();
            let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
            // Only per-worker trace files are considered; anything else in
            // the directory (artifacts, summaries) is skipped.
            if !name.starts_with("trace-worker-") || !name.ends_with(".jsonl") {
                continue;
            }

            let file = File::open(&path).map_err(|_| IggyError::InvalidConfiguration)?;
            let reader = BufReader::new(file);

            for line in reader.lines() {
                let line = line.map_err(|_| IggyError::InvalidConfiguration)?;
                if line.is_empty() {
                    continue;
                }
                let raw: RawTraceEntry =
                    serde_json::from_str(&line).map_err(|_| IggyError::InvalidConfiguration)?;

                match raw.phase.as_str() {
                    "intent" => {
                        if let Some(op) = raw.op {
                            intents.push(ReplayIntent { seq: raw.seq, op });
                        }
                    }
                    "outcome" => {
                        if let Some(result) = raw.result {
                            outcomes.insert(
                                raw.seq,
                                OriginalOutcome {
                                    result,
                                    detail: raw.detail,
                                },
                            );
                        }
                    }
                    // Unknown phases are ignored rather than treated as errors.
                    _ => {}
                }
            }
        }

        // Restore the global op order across all worker files.
        intents.sort_by_key(|i| i.seq);
        Ok((intents, outcomes))
    }
}
+
+fn classify_divergence(original_result: &str, replay_result: &str) -> 
DivergenceKind {
+    match (original_result, replay_result) {
+        ("ok", "unexpected_error") => DivergenceKind::OriginalOkReplayFailed,
+        ("ok", "expected_error") => DivergenceKind::OriginalOkReplayExpected,
+        ("expected_error", "ok") => DivergenceKind::OriginalExpectedReplayOk,
+        ("unexpected_error", "ok") => 
DivergenceKind::OriginalUnexpectedReplayOk,
+        _ => DivergenceKind::BothFailed,
+    }
+}
+
+fn transport_name(t: crate::args::Transport) -> &'static str {
+    match t {
+        crate::args::Transport::Tcp => "tcp",
+        crate::args::Transport::Quic => "quic",
+        crate::args::Transport::Http => "http",
+        crate::args::Transport::WebSocket => "websocket",
+    }
+}
diff --git a/core/lab/src/report.rs b/core/lab/src/report.rs
new file mode 100644
index 000000000..13c6d249e
--- /dev/null
+++ b/core/lab/src/report.rs
@@ -0,0 +1,101 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::invariants::InvariantViolation;
+use serde::Serialize;
+use std::fs;
+use std::io::{self, BufWriter, Write};
+use std::path::Path;
+use std::time::Duration;
+
/// Everything needed to persist a run's artifacts; borrows from the runner
/// so no data is copied just to write it out.
pub struct ArtifactBundle<'a> {
    pub seed: u64,
    pub scenario_name: &'a str,
    pub server_address: &'a str,
    pub transport: &'a str,
    pub workers: u32,
    // Actual elapsed wall-clock time of the run.
    pub duration: Duration,
    pub total_ops: u64,
    pub passed: bool,
    pub violations: &'a [InvariantViolation],
    // Optional op-count stop condition, if one was configured.
    pub ops_target: Option<u64>,
}
+
/// Serialized shape of `config.json` — the parameters needed to reproduce a run.
#[derive(Serialize)]
struct ConfigJson<'a> {
    seed: u64,
    scenario: &'a str,
    server_address: &'a str,
    transport: &'a str,
    workers: u32,
    // Omitted from the JSON entirely when no op target was set.
    #[serde(skip_serializing_if = "Option::is_none")]
    ops_target: Option<u64>,
}
+
/// Serialized shape of `summary.json` — the verdict and headline numbers.
#[derive(Serialize)]
struct SummaryJson<'a> {
    passed: bool,
    total_ops: u64,
    duration_secs: f64,
    violation_count: usize,
    scenario: &'a str,
}
+
+impl<'a> ArtifactBundle<'a> {
+    pub fn write(&self, output_dir: &Path) -> io::Result<()> {
+        fs::create_dir_all(output_dir)?;
+
+        // config.json
+        let config = ConfigJson {
+            seed: self.seed,
+            scenario: self.scenario_name,
+            server_address: self.server_address,
+            transport: self.transport,
+            workers: self.workers,
+            ops_target: self.ops_target,
+        };
+        let config_path = output_dir.join("config.json");
+        let file = fs::File::create(config_path)?;
+        serde_json::to_writer_pretty(BufWriter::new(file), 
&config).map_err(io::Error::other)?;
+
+        // summary.json
+        let summary = SummaryJson {
+            passed: self.passed,
+            total_ops: self.total_ops,
+            duration_secs: self.duration.as_secs_f64(),
+            violation_count: self.violations.len(),
+            scenario: self.scenario_name,
+        };
+        let summary_path = output_dir.join("summary.json");
+        let file = fs::File::create(summary_path)?;
+        serde_json::to_writer_pretty(BufWriter::new(file), 
&summary).map_err(io::Error::other)?;
+
+        // violations.jsonl
+        if !self.violations.is_empty() {
+            let violations_path = output_dir.join("violations.jsonl");
+            let file = fs::File::create(violations_path)?;
+            let mut writer = BufWriter::new(file);
+            for v in self.violations {
+                serde_json::to_writer(&mut writer, 
v).map_err(io::Error::other)?;
+                writer.write_all(b"\n")?;
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/core/lab/src/runner.rs b/core/lab/src/runner.rs
new file mode 100644
index 000000000..47764eba1
--- /dev/null
+++ b/core/lab/src/runner.rs
@@ -0,0 +1,378 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::args::RunArgs;
+use crate::client::create_client;
+use crate::invariants::{self, InvariantViolation};
+use crate::report::ArtifactBundle;
+use crate::safe_name::SafeResourceName;
+use crate::scenarios::{self, Scenario};
+use crate::shadow::{ShadowState, Watermarks};
+use crate::trace::WorkerTraceWriter;
+use crate::worker::{WorkerConfig, WorkerInit, WorkerResult, run_worker};
+use iggy::prelude::*;
+use rand::SeedableRng;
+use rand::rngs::StdRng;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::time::{Duration, Instant};
+use tokio::task::JoinSet;
+use tracing::{error, info, warn};
+
/// Orchestrator for a full chaos run: preflight, setup, worker fan-out,
/// verification, artifact writing and cleanup. See [`LabRunner::run`].
pub struct LabRunner {
    args: RunArgs,
}
+
/// Final result of a run, returned to the caller alongside log output.
#[allow(dead_code)]
pub struct RunOutcome {
    // True iff no invariant violations were collected.
    pub passed: bool,
    pub ops_total: u64,
    pub violations: Vec<InvariantViolation>,
    // Actual elapsed wall-clock time, not the configured duration.
    pub duration: Duration,
}
+
impl LabRunner {
    /// Creates a runner from parsed CLI arguments; no I/O happens here.
    pub fn new(args: RunArgs) -> Self {
        Self { args }
    }

    /// Executes the full run lifecycle:
    /// preflight → scenario setup → seed shadow state → spawn workers →
    /// wait (duration / signal / op target) → collect results →
    /// post-run verification → artifacts → cleanup.
    ///
    /// A missing `--seed` is replaced with a random one (logged for replay);
    /// each worker gets a deterministic per-worker RNG derived from it.
    ///
    /// # Errors
    /// Propagates client/setup failures; a run that completes but finds
    /// violations still returns `Ok` with `passed == false`.
    pub async fn run(self) -> Result<RunOutcome, IggyError> {
        let seed = self.args.seed.unwrap_or_else(rand::random);
        let scenario = scenarios::create_scenario(self.args.scenario);

        info!("=== iggy-lab ===");
        info!("Scenario: {}", scenario.name());
        info!("Seed: {seed}");
        info!(
            "Server: {} ({})",
            self.args.server_address,
            transport_name(self.args.transport)
        );
        info!("Workers: {}", self.args.workers);
        info!("Duration: {}", self.args.duration);
        if let Some(ops) = self.args.ops {
            info!("Ops target: {ops}");
        }
        info!("Prefix: {:?}", self.args.prefix);

        // Pre-flight check: refuse to run over stale prefixed resources
        // unless --force-cleanup was given.
        let admin_client = create_client(&self.args.server_address, self.args.transport).await?;
        self.preflight_check(&admin_client, &self.args.prefix, self.args.force_cleanup)
            .await?;

        // Setup phase for scenarios that need it
        self.setup_phase(&admin_client, scenario.as_ref()).await?;

        if let Some(ref dir) = self.args.output_dir {
            std::fs::create_dir_all(dir).map_err(|_| IggyError::InvalidConfiguration)?;
        }

        // Shared mutable state and counters used by all workers.
        let shared_state = Arc::new(tokio::sync::RwLock::new(ShadowState::new()));
        let watermarks = Arc::new(Watermarks::new());
        let stop = Arc::new(AtomicBool::new(false));
        let seq_counter = Arc::new(AtomicU64::new(0));
        let ops_counter = Arc::new(AtomicU64::new(0));

        // Populate initial shadow state from setup so invariant checks see
        // the streams/topics created by setup_phase.
        {
            let mut state = shared_state.write().await;
            let streams = admin_client.get_streams().await?;
            for stream in streams
                .iter()
                .filter(|s| s.name.starts_with(&self.args.prefix))
            {
                state.apply_create_stream(stream.name.clone());
                let stream_id = Identifier::numeric(stream.id).unwrap();
                if let Ok(Some(details)) = admin_client.get_stream(&stream_id).await {
                    for topic in &details.topics {
                        state.apply_create_topic(
                            &stream.name,
                            topic.name.clone(),
                            topic.partitions_count,
                        );
                    }
                }
            }
        }

        let op_weights = scenario.op_weights();

        // Spawn workers, each with its own client connection and RNG stream.
        let mut join_set = JoinSet::new();
        for worker_id in 0..self.args.workers {
            let worker_rng = StdRng::seed_from_u64(seed.wrapping_add(worker_id as u64));
            let client = create_client(&self.args.server_address, self.args.transport).await?;
            let trace_writer = self.args.output_dir.as_ref().map(|dir| {
                WorkerTraceWriter::new(dir, worker_id).expect("Failed to create trace writer")
            });
            let config = WorkerConfig {
                msg_size: self.args.message_size,
                messages_per_batch: self.args.messages_per_batch,
                prefix: self.args.prefix.clone(),
                fail_fast: !self.args.no_fail_fast,
                ops_target: self.args.ops,
            };
            let init = WorkerInit {
                id: worker_id,
                client,
                rng: worker_rng,
                shared_state: Arc::clone(&shared_state),
                watermarks: Arc::clone(&watermarks),
                trace_writer,
                seq_counter: Arc::clone(&seq_counter),
                ops_counter: Arc::clone(&ops_counter),
            };
            let weights = op_weights.clone();
            let stop_clone = Arc::clone(&stop);
            join_set.spawn(async move { run_worker(init, &weights, config, stop_clone).await });
        }

        let run_start = Instant::now();
        let duration: Duration = self.args.duration.get_duration();

        // Stop on whichever comes first: configured duration, SIGINT/SIGTERM,
        // or the optional op-count target.
        let ops_target = self.args.ops;
        tokio::select! {
            _ = tokio::time::sleep(duration) => {
                info!("Duration elapsed, stopping workers...");
            }
            _ = shutdown_signal() => {
                info!("Received shutdown signal, stopping workers...");
            }
            _ = wait_for_ops_target(&ops_counter, ops_target) => {
                info!("Op count target reached, stopping workers...");
            }
        }
        stop.store(true, Ordering::Relaxed);

        // Collect results; a panicked worker is logged but does not abort
        // collection of the remaining workers.
        let mut total_ops = 0u64;
        let mut violations = Vec::new();
        while let Some(result) = join_set.join_next().await {
            match result {
                Ok(WorkerResult::Ok { ops_executed }) => {
                    total_ops += ops_executed;
                }
                Ok(WorkerResult::ServerBug(v)) => {
                    violations.push(v);
                }
                Err(e) => {
                    error!("Worker panicked: {e}");
                }
            }
        }

        let run_duration = run_start.elapsed();
        info!("All workers stopped. Total ops: {total_ops}, duration: {run_duration:.1?}");

        // Post-run verification: compare server state to shadow state, then
        // cross-check consistency from a fresh client.
        if !self.args.skip_post_run_verify {
            info!("Running post-run verification...");
            let state = shared_state.read().await;
            let post_violations =
                invariants::post_run_verify(&admin_client, &state, &self.args.prefix).await;
            if !post_violations.is_empty() {
                warn!("Post-run violations: {}", post_violations.len());
            }
            violations.extend(post_violations);

            let cross_violations = invariants::cross_client_consistency(
                &self.args.server_address,
                self.args.transport,
                &self.args.prefix,
            )
            .await;
            if !cross_violations.is_empty() {
                warn!("Cross-client violations: {}", cross_violations.len());
            }
            violations.extend(cross_violations);
        }

        let passed = violations.is_empty();

        // Write artifacts (config/summary/violations) when an output dir is set.
        if let Some(ref dir) = self.args.output_dir {
            let _state = shared_state.read().await;
            let bundle = ArtifactBundle {
                seed,
                scenario_name: scenario.name(),
                server_address: &self.args.server_address,
                transport: transport_name(self.args.transport),
                workers: self.args.workers,
                duration: run_duration,
                total_ops,
                passed,
                violations: &violations,
                ops_target: self.args.ops,
            };
            if let Err(e) = bundle.write(dir) {
                error!("Failed to write artifacts: {e}");
            }
        }

        // Cleanup
        if !self.args.no_cleanup {
            info!("Cleaning up lab resources...");
            self.cleanup(&admin_client, &self.args.prefix).await;
        }

        if passed {
            info!("PASSED — {total_ops} ops, 0 violations");
        } else {
            error!("FAILED — {total_ops} ops, {} violations", violations.len());
            for v in &violations {
                error!("  [{}] {}: {}", v.kind, v.description, v.context);
            }
        }

        Ok(RunOutcome {
            passed,
            ops_total: total_ops,
            violations,
            duration: run_duration,
        })
    }

    /// Ensures no prefixed streams survive from a previous run.
    ///
    /// With `force` set, stale streams are deleted (failures logged, not
    /// fatal); otherwise their presence aborts the run with
    /// `IggyError::InvalidConfiguration`.
    async fn preflight_check(
        &self,
        client: &IggyClient,
        prefix: &str,
        force: bool,
    ) -> Result<(), IggyError> {
        let streams = client.get_streams().await?;
        let stale: Vec<_> = streams
            .iter()
            .filter(|s| s.name.starts_with(prefix))
            .collect();

        if stale.is_empty() {
            return Ok(());
        }

        if force {
            info!(
                "Force-cleaning {} stale resources with prefix '{prefix}'",
                stale.len()
            );
            for stream in &stale {
                let id = Identifier::numeric(stream.id).unwrap();
                if let Err(e) = client.delete_stream(&id).await {
                    warn!("Failed to delete stale stream '{}': {e}", stream.name);
                }
            }
            Ok(())
        } else {
            error!(
                "Found {} stale resources with prefix '{prefix}'. Use --force-cleanup to remove them.",
                stale.len()
            );
            Err(IggyError::InvalidConfiguration)
        }
    }

    /// Per-scenario initial resource creation, dispatched on the scenario
    /// name; scenarios without a setup phase fall through to the no-op arm.
    async fn setup_phase(
        &self,
        client: &IggyClient,
        scenario: &dyn Scenario,
    ) -> Result<(), IggyError> {
        match scenario.name() {
            "concurrent-crud" => {
                info!("Setup: creating 5 streams with 2 topics each...");
                for i in 0..5 {
                    let stream_name =
                        SafeResourceName::new(&self.args.prefix, &format!("setup-s{i}"));
                    client.create_stream(&stream_name).await?;
                    for j in 0..2 {
                        let topic_name =
                            SafeResourceName::new(&self.args.prefix, &format!("setup-t{i}-{j}"));
                        let stream_id = Identifier::from_str_value(&stream_name).unwrap();
                        client
                            .create_topic(
                                &stream_id,
                                &topic_name,
                                3,
                                CompressionAlgorithm::None,
                                None,
                                IggyExpiry::NeverExpire,
                                MaxTopicSize::Unlimited,
                            )
                            .await?;
                    }
                }
            }
            "segment-rotation" => {
                info!("Setup: creating 1 stream with 1 topic and 1 partition...");
                let stream_name = SafeResourceName::new(&self.args.prefix, "seg-rot-stream");
                client.create_stream(&stream_name).await?;
                let stream_id = Identifier::from_str_value(&stream_name).unwrap();
                let topic_name = SafeResourceName::new(&self.args.prefix, "seg-rot-topic");
                client
                    .create_topic(
                        &stream_id,
                        &topic_name,
                        1,
                        CompressionAlgorithm::None,
                        None,
                        IggyExpiry::NeverExpire,
                        MaxTopicSize::Unlimited,
                    )
                    .await?;
            }
            _ => {}
        }
        Ok(())
    }

    /// Deletes every prefixed resource; thin wrapper over the shared helper.
    async fn cleanup(&self, client: &IggyClient, prefix: &str) {
        crate::client::cleanup_prefixed(client, prefix).await;
    }
}
+
+async fn wait_for_ops_target(counter: &AtomicU64, target: Option<u64>) {
+    let Some(target) = target else {
+        std::future::pending::<()>().await;
+        return;
+    };
+    let mut interval = tokio::time::interval(Duration::from_millis(50));
+    loop {
+        interval.tick().await;
+        if counter.load(Ordering::Relaxed) >= target {
+            return;
+        }
+    }
+}
+
/// Resolves when either SIGINT (Ctrl+C) or SIGTERM is received.
///
/// NOTE(review): uses `tokio::signal::unix`, so this compiles on Unix
/// targets only — confirm Windows support is out of scope for the lab binary.
async fn shutdown_signal() {
    use tokio::signal::unix::{SignalKind, signal};

    // Registration failing means the runtime has no signal driver — a setup
    // bug, so panicking with context is acceptable here.
    let mut sigterm = signal(SignalKind::terminate()).expect("Failed to register SIGTERM handler");

    tokio::select! {
        _ = tokio::signal::ctrl_c() => {}
        _ = sigterm.recv() => {}
    }
}
+
+fn transport_name(t: crate::args::Transport) -> &'static str {
+    match t {
+        crate::args::Transport::Tcp => "tcp",
+        crate::args::Transport::Quic => "quic",
+        crate::args::Transport::Http => "http",
+        crate::args::Transport::WebSocket => "websocket",
+    }
+}
diff --git a/core/lab/src/safe_name.rs b/core/lab/src/safe_name.rs
new file mode 100644
index 000000000..75cef378d
--- /dev/null
+++ b/core/lab/src/safe_name.rs
@@ -0,0 +1,114 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use rand::distr::Alphanumeric;
+use rand::{Rng, RngExt};
+use serde::{Deserialize, Serialize};
+use std::fmt;
+use std::ops::Deref;
+
// Length of the random alphanumeric suffix appended by `random`.
const RANDOM_SUFFIX_LEN: usize = 8;

/// Resource name that is guaranteed to start with a configurable prefix.
///
/// This prevents chaos tests from accidentally operating on non-lab resources.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct SafeResourceName(String);
+
+impl SafeResourceName {
+    pub fn new(prefix: &str, name: &str) -> Self {
+        let full = if name.starts_with(prefix) {
+            name.to_owned()
+        } else {
+            format!("{prefix}{name}")
+        };
+        Self(full)
+    }
+
+    pub fn random(prefix: &str, rng: &mut impl Rng) -> Self {
+        let suffix: String = (0..RANDOM_SUFFIX_LEN)
+            .map(|_| rng.sample(Alphanumeric) as char)
+            .collect();
+        Self(format!("{prefix}{suffix}"))
+    }
+
+    pub fn has_prefix(&self, prefix: &str) -> bool {
+        self.0.starts_with(prefix)
+    }
+
+    pub fn into_inner(self) -> String {
+        self.0
+    }
+}
+
// Deref to `str` so `&SafeResourceName` can be passed wherever `&str` is
// expected (e.g. client APIs and `starts_with` checks).
impl Deref for SafeResourceName {
    type Target = str;

    fn deref(&self) -> &str {
        &self.0
    }
}
+
// Display prints the raw name; Debug (derived) is for diagnostics.
impl fmt::Display for SafeResourceName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}
+
// Cheap unwrap into the owned `String` (no clone).
impl From<SafeResourceName> for String {
    fn from(name: SafeResourceName) -> Self {
        name.0
    }
}
+
#[cfg(test)]
mod tests {
    use super::*;
    use rand::SeedableRng;
    use rand::rngs::StdRng;

    #[test]
    fn new_prepends_prefix_when_missing() {
        let name = SafeResourceName::new("lab-", "stream1");
        assert_eq!(&*name, "lab-stream1");
    }

    #[test]
    fn new_preserves_existing_prefix() {
        let name = SafeResourceName::new("lab-", "lab-stream1");
        assert_eq!(&*name, "lab-stream1");
    }

    #[test]
    fn random_produces_prefixed_name() {
        const PREFIX: &str = "lab-";
        let mut rng = StdRng::seed_from_u64(42);
        let name = SafeResourceName::random(PREFIX, &mut rng);
        assert!(name.starts_with(PREFIX));
        // Derive the expected length from the prefix instead of a magic `4`
        // so the assertion stays correct if the test prefix ever changes.
        assert_eq!(name.len(), PREFIX.len() + RANDOM_SUFFIX_LEN);
    }

    #[test]
    fn random_is_deterministic_with_same_seed() {
        let mut rng1 = StdRng::seed_from_u64(99);
        let mut rng2 = StdRng::seed_from_u64(99);
        let a = SafeResourceName::random("lab-", &mut rng1);
        let b = SafeResourceName::random("lab-", &mut rng2);
        assert_eq!(a, b);
    }
}
diff --git a/core/lab/src/scenarios/concurrent_crud.rs 
b/core/lab/src/scenarios/concurrent_crud.rs
new file mode 100644
index 000000000..ab634710b
--- /dev/null
+++ b/core/lab/src/scenarios/concurrent_crud.rs
@@ -0,0 +1,50 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::Scenario;
+use crate::ops::OpKind;
+
+/// Scenario focused on metadata races: heavy create/delete/purge of streams
+/// and topics, with a small amount of message traffic mixed in.
+pub struct ConcurrentCrud;
+
+impl Scenario for ConcurrentCrud {
+    /// CLI-facing scenario identifier.
+    fn name(&self) -> &'static str {
+        "concurrent-crud"
+    }
+
+    /// Multi-phase human-readable description shown when listing scenarios.
+    fn describe(&self) -> &'static str {
+        "Concurrent stream and topic CRUD operations.\n\n\
+         Phase 1: Setup — creates 5 streams with 2 topics each.\n\
+         Phase 2: Chaos — N workers run weighted create/delete/purge ops.\n\
+         Phase 3: Verify — post-run invariant checks.\n\n\
+         Focuses on detecting race conditions in metadata operations \
+         (create-while-deleting, purge-while-creating, etc.)."
+    }
+
+    /// Relative operation weights (sum = 100, so each reads as a percent).
+    /// Metadata ops dominate at 90%; send/poll are 5% each.
+    fn op_weights(&self) -> Vec<(OpKind, f64)> {
+        vec![
+            (OpKind::CreateStream, 20.0),
+            (OpKind::DeleteStream, 15.0),
+            (OpKind::PurgeStream, 10.0),
+            (OpKind::CreateTopic, 20.0),
+            (OpKind::DeleteTopic, 15.0),
+            (OpKind::PurgeTopic, 10.0),
+            (OpKind::SendMessages, 5.0),
+            (OpKind::PollMessages, 5.0),
+        ]
+    }
+}
diff --git a/core/lab/src/scenarios/mixed_workload.rs 
b/core/lab/src/scenarios/mixed_workload.rs
new file mode 100644
index 000000000..073bbe44a
--- /dev/null
+++ b/core/lab/src/scenarios/mixed_workload.rs
@@ -0,0 +1,57 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::Scenario;
+use crate::ops::OpKind;
+
+/// Scenario exercising every supported operation type at once.
+pub struct MixedWorkload;
+
+impl Scenario for MixedWorkload {
+    /// CLI-facing scenario identifier.
+    fn name(&self) -> &'static str {
+        "mixed-workload"
+    }
+
+    /// Human-readable description. The weight summary below is kept in sync
+    /// with `op_weights`: the original text claimed "Offsets 5%", but the
+    /// actual weights are partition ops 3% (2 + 1) and offsets 2%.
+    fn describe(&self) -> &'static str {
+        "Full chaos: all operation types interleaved.\n\n\
+         All workers independently generate a weighted mix of every operation type: \
+         create/delete streams and topics, send/poll messages, purge, partition \
+         management, consumer groups, and consumer offsets.\n\n\
+         Default weights: SendMessages 40%, PollMessages 25%, \
+         CreateStream/Topic 10%, DeleteStream/Topic 8%, \
+         Purge/Segments 7%, ConsumerGroup 5%, Partitions 3%, Offsets 2%."
+    }
+
+    /// Relative operation weights (sum = 100, so each reads as a percent).
+    fn op_weights(&self) -> Vec<(OpKind, f64)> {
+        vec![
+            (OpKind::SendMessages, 40.0),
+            (OpKind::PollMessages, 25.0),
+            (OpKind::CreateStream, 5.0),
+            (OpKind::CreateTopic, 5.0),
+            (OpKind::DeleteStream, 4.0),
+            (OpKind::DeleteTopic, 4.0),
+            (OpKind::PurgeStream, 2.0),
+            (OpKind::PurgeTopic, 2.0),
+            (OpKind::DeleteSegments, 3.0),
+            (OpKind::CreatePartitions, 2.0),
+            (OpKind::DeletePartitions, 1.0),
+            (OpKind::CreateConsumerGroup, 3.0),
+            (OpKind::DeleteConsumerGroup, 2.0),
+            (OpKind::StoreConsumerOffset, 2.0),
+        ]
+    }
+}
diff --git a/core/lab/src/scenarios/mod.rs b/core/lab/src/scenarios/mod.rs
new file mode 100644
index 000000000..8f6e52990
--- /dev/null
+++ b/core/lab/src/scenarios/mod.rs
@@ -0,0 +1,55 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod concurrent_crud;
+pub mod mixed_workload;
+pub mod segment_rotation;
+
+use crate::args::ScenarioName;
+use crate::ops::OpKind;
+
+/// A named chaos scenario: a description plus a weighted operation mix.
+pub trait Scenario: Send + Sync {
+    /// CLI-facing identifier (kebab-case).
+    fn name(&self) -> &'static str;
+    /// Multi-line human-readable description for `list_scenarios`.
+    fn describe(&self) -> &'static str;
+    /// `(op, weight)` pairs used to build the workers' weighted sampler.
+    fn op_weights(&self) -> Vec<(OpKind, f64)>;
+}
+
+/// Maps a parsed `ScenarioName` (from the CLI args) to its implementation.
+pub fn create_scenario(name: ScenarioName) -> Box<dyn Scenario> {
+    match name {
+        ScenarioName::ConcurrentCrud => Box::new(concurrent_crud::ConcurrentCrud),
+        ScenarioName::SegmentRotation => Box::new(segment_rotation::SegmentRotation),
+        ScenarioName::MixedWorkload => Box::new(mixed_workload::MixedWorkload),
+    }
+}
+
+/// Returns `(name, description)` pairs for every available scenario,
+/// in the same order as the original hand-written list.
+pub fn list_scenarios() -> Vec<(&'static str, &'static str)> {
+    let all: [&dyn Scenario; 3] = [
+        &concurrent_crud::ConcurrentCrud,
+        &segment_rotation::SegmentRotation,
+        &mixed_workload::MixedWorkload,
+    ];
+    all.iter().map(|s| (s.name(), s.describe())).collect()
+}
diff --git a/core/lab/src/scenarios/segment_rotation.rs 
b/core/lab/src/scenarios/segment_rotation.rs
new file mode 100644
index 000000000..cecbdecde
--- /dev/null
+++ b/core/lab/src/scenarios/segment_rotation.rs
@@ -0,0 +1,46 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::Scenario;
+use crate::ops::OpKind;
+
+/// Scenario concentrating send/poll traffic on a single partition to
+/// exercise segment rotation.
+pub struct SegmentRotation;
+
+impl Scenario for SegmentRotation {
+    /// CLI-facing scenario identifier.
+    fn name(&self) -> &'static str {
+        "segment-rotation"
+    }
+
+    /// Multi-phase human-readable description shown when listing scenarios.
+    fn describe(&self) -> &'static str {
+        "Segment rotation stress test.\n\n\
+         Phase 1: Setup — creates 1 stream with 1 topic and 1 partition.\n\
+         Phase 2: Chaos — all workers send small messages to the same partition \
+         while concurrently polling.\n\
+         Phase 3: Verify — offset monotonicity, no gaps, watermark violations.\n\n\
+         Targets segment rotation logic by producing high message throughput \
+         to a single partition."
+    }
+
+    /// Relative operation weights (sum = 100): mostly sends and polls,
+    /// with occasional segment deletion.
+    fn op_weights(&self) -> Vec<(OpKind, f64)> {
+        vec![
+            (OpKind::SendMessages, 50.0),
+            (OpKind::PollMessages, 40.0),
+            (OpKind::DeleteSegments, 10.0),
+        ]
+    }
+}
diff --git a/core/lab/src/shadow.rs b/core/lab/src/shadow.rs
new file mode 100644
index 000000000..d13774bdb
--- /dev/null
+++ b/core/lab/src/shadow.rs
@@ -0,0 +1,244 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dashmap::DashMap;
+use rand::{Rng, RngExt};
+use std::collections::{BTreeMap, BTreeSet, VecDeque};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, Instant};
+
+const TOMBSTONE_RETENTION: Duration = Duration::from_secs(10);
+
+/// In-memory model of the expected stream/topic hierarchy, shared by all
+/// workers (behind a lock) and updated after each executed operation.
+pub struct ShadowState {
+    /// Streams by name; `BTreeMap` keeps iteration order deterministic.
+    pub streams: BTreeMap<String, ShadowStream>,
+    /// Recently deleted resource names with deletion timestamp, for error 
+classification.
+    recently_deleted: VecDeque<(String, Instant)>,
+    /// Recently purged (stream, topic) pairs — offsets reset after purge.
+    recently_purged: VecDeque<(String, Option<String>, Instant)>,
+}
+
+/// Shadow of a single stream: its topics by name.
+pub struct ShadowStream {
+    pub topics: BTreeMap<String, ShadowTopic>,
+}
+
+/// Shadow of a single topic: its partition count and consumer-group names.
+pub struct ShadowTopic {
+    pub partitions: u32,
+    pub consumer_groups: BTreeSet<String>,
+}
+
+#[allow(dead_code)]
+impl ShadowState {
+    /// Creates an empty shadow state with no streams and no tombstones.
+    pub fn new() -> Self {
+        Self {
+            streams: BTreeMap::new(),
+            recently_deleted: VecDeque::new(),
+            recently_purged: VecDeque::new(),
+        }
+    }
+
+    /// Whether a stream with this exact name is currently tracked.
+    pub fn stream_exists(&self, name: &str) -> bool {
+        self.streams.contains_key(name)
+    }
+
+    /// Whether `topic` exists under `stream`.
+    pub fn topic_exists(&self, stream: &str, topic: &str) -> bool {
+        self.streams
+            .get(stream)
+            .is_some_and(|s| s.topics.contains_key(topic))
+    }
+
+    /// Looks up a topic's shadow entry, if both stream and topic exist.
+    pub fn get_topic(&self, stream: &str, topic: &str) -> Option<&ShadowTopic> {
+        self.streams.get(stream).and_then(|s| s.topics.get(topic))
+    }
+
+    /// Picks a uniformly random tracked stream name, if any exist.
+    pub fn random_stream(&self, rng: &mut impl Rng) -> Option<&str> {
+        if self.streams.is_empty() {
+            return None;
+        }
+        let idx = rng.random_range(0..self.streams.len());
+        self.streams.keys().nth(idx).map(|s| s.as_str())
+    }
+
+    /// Returns (stream_name, topic_name) for a random topic that exists.
+    pub fn random_topic(&self, rng: &mut impl Rng) -> Option<(&str, &str)> {
+        // Only streams that actually contain topics are candidates.
+        let streams_with_topics: Vec<_> = self
+            .streams
+            .iter()
+            .filter(|(_, s)| !s.topics.is_empty())
+            .collect();
+        if streams_with_topics.is_empty() {
+            return None;
+        }
+        let (sname, stream) = streams_with_topics[rng.random_range(0..streams_with_topics.len())];
+        let idx = rng.random_range(0..stream.topics.len());
+        let tname = stream.topics.keys().nth(idx)?;
+        Some((sname.as_str(), tname.as_str()))
+    }
+
+    /// Returns (stream_name, topic_name, partition_count) for a random topic.
+    pub fn random_topic_with_partitions(&self, rng: &mut impl Rng) -> Option<(&str, &str, u32)> {
+        let (s, t) = self.random_topic(rng)?;
+        let partitions = self.streams[s].topics[t].partitions;
+        Some((s, t, partitions))
+    }
+
+    /// Records a newly created stream with no topics.
+    pub fn apply_create_stream(&mut self, name: String) {
+        self.streams.insert(
+            name,
+            ShadowStream {
+                topics: BTreeMap::new(),
+            },
+        );
+    }
+
+    /// Removes a stream and leaves a tombstone for error classification.
+    pub fn apply_delete_stream(&mut self, name: &str) {
+        if self.streams.remove(name).is_some() {
+            self.recently_deleted
+                .push_back((name.to_owned(), Instant::now()));
+        }
+    }
+
+    /// Records a new topic under `stream` (no-op if the stream is gone).
+    pub fn apply_create_topic(&mut self, stream: &str, name: String, partitions: u32) {
+        if let Some(s) = self.streams.get_mut(stream) {
+            s.topics.insert(
+                name,
+                ShadowTopic {
+                    partitions,
+                    consumer_groups: BTreeSet::new(),
+                },
+            );
+        }
+    }
+
+    /// Removes a topic and leaves a "stream/topic" tombstone.
+    pub fn apply_delete_topic(&mut self, stream: &str, topic: &str) {
+        if let Some(s) = self.streams.get_mut(stream)
+            && s.topics.remove(topic).is_some()
+        {
+            let key = format!("{stream}/{topic}");
+            self.recently_deleted.push_back((key, Instant::now()));
+        }
+    }
+
+    /// Adds `count` partitions to a topic (no-op if it no longer exists).
+    pub fn apply_create_partitions(&mut self, stream: &str, topic: &str, count: u32) {
+        if let Some(t) = self
+            .streams
+            .get_mut(stream)
+            .and_then(|s| s.topics.get_mut(topic))
+        {
+            t.partitions += count;
+        }
+    }
+
+    /// Removes up to `count` partitions, saturating at zero.
+    pub fn apply_delete_partitions(&mut self, stream: &str, topic: &str, count: u32) {
+        if let Some(t) = self
+            .streams
+            .get_mut(stream)
+            .and_then(|s| s.topics.get_mut(topic))
+        {
+            t.partitions = t.partitions.saturating_sub(count);
+        }
+    }
+
+    /// Registers a consumer group on a topic (no-op if the topic is gone).
+    pub fn apply_create_consumer_group(&mut self, stream: &str, topic: &str, name: String) {
+        if let Some(t) = self
+            .streams
+            .get_mut(stream)
+            .and_then(|s| s.topics.get_mut(topic))
+        {
+            t.consumer_groups.insert(name);
+        }
+    }
+
+    /// Unregisters a consumer group from a topic.
+    pub fn apply_delete_consumer_group(&mut self, stream: &str, topic: &str, name: &str) {
+        if let Some(t) = self
+            .streams
+            .get_mut(stream)
+            .and_then(|s| s.topics.get_mut(topic))
+        {
+            t.consumer_groups.remove(name);
+        }
+    }
+
+    /// Whether `name` (a stream name or a "stream/topic" key) still has an
+    /// unexpired deletion tombstone.
+    pub fn was_recently_deleted(&self, name: &str) -> bool {
+        self.recently_deleted.iter().any(|(n, _)| n == name)
+    }
+
+    /// Record that a stream or topic was purged (offsets reset).
+    /// `topic` is `None` for stream-level purge (all topics affected).
+    pub fn record_purge(&mut self, stream: String, topic: Option<String>) {
+        self.recently_purged
+            .push_back((stream, topic, Instant::now()));
+    }
+
+    /// Check whether offsets for this partition may have been reset by a recent purge.
+    pub fn was_recently_purged(&self, stream: &str, topic: &str) -> bool {
+        self.recently_purged
+            .iter()
+            .any(|(s, t, _)| s == stream && (t.is_none() || t.as_deref() == Some(topic)))
+    }
+
+    /// Drops tombstones older than `TOMBSTONE_RETENTION`. Both queues are
+    /// appended in time order, so popping from the front until the first
+    /// fresh entry is sufficient.
+    pub fn prune_tombstones(&mut self) {
+        // `Instant - Duration` panics if the result would precede the clock's
+        // unspecified epoch (possible when the monotonic clock is younger than
+        // the retention window, e.g. shortly after boot). Use checked_sub and
+        // keep everything when no entry can possibly be old enough to expire.
+        let Some(cutoff) = Instant::now().checked_sub(TOMBSTONE_RETENTION) else {
+            return;
+        };
+        while self
+            .recently_deleted
+            .front()
+            .is_some_and(|(_, ts)| *ts < cutoff)
+        {
+            self.recently_deleted.pop_front();
+        }
+        while self
+            .recently_purged
+            .front()
+            .is_some_and(|(_, _, ts)| *ts < cutoff)
+        {
+            self.recently_purged.pop_front();
+        }
+    }
+}
+
+/// Per-partition watermarks tracking the highest offset successfully sent,
+/// used to detect transient visibility violations during polls.
+pub struct Watermarks {
+    /// Concurrent map keyed by partition; values are updated lock-free.
+    inner: DashMap<PartitionKey, AtomicU64>,
+}
+
+/// `(stream, topic, partition)` triple identifying one partition.
+type PartitionKey = (String, String, u32);
+
+#[allow(dead_code)]
+impl Watermarks {
+    /// Creates an empty watermark table.
+    pub fn new() -> Self {
+        Self {
+            inner: DashMap::new(),
+        }
+    }
+
+    /// Records a successful send, keeping the maximum offset seen for the
+    /// partition. Concurrent callers race harmlessly via `fetch_max`.
+    pub fn record_send(&self, stream: &str, topic: &str, partition: u32, offset: u64) {
+        self.inner
+            .entry((stream.to_owned(), topic.to_owned(), partition))
+            .or_insert_with(|| AtomicU64::new(0))
+            .fetch_max(offset, Ordering::Relaxed);
+    }
+
+    /// Returns the highest recorded send offset for a partition.
+    pub fn get_watermark(&self, stream: &str, topic: &str, partition: u32) -> Option<u64> {
+        self.inner
+            .get(&(stream.to_owned(), topic.to_owned(), partition))
+            .map(|entry| entry.load(Ordering::Relaxed))
+    }
+}
diff --git a/core/lab/src/trace.rs b/core/lab/src/trace.rs
new file mode 100644
index 000000000..f5ca17f8b
--- /dev/null
+++ b/core/lab/src/trace.rs
@@ -0,0 +1,129 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::ops::{Op, OpOutcome};
+use serde::Serialize;
+use std::fs::File;
+use std::io::{self, BufWriter, Write};
+use std::path::Path;
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+
+const FLUSH_INTERVAL_ENTRIES: usize = 64;
+const FLUSH_INTERVAL: Duration = Duration::from_millis(100);
+
+/// Buffered JSONL writer for one worker's operation trace.
+pub struct WorkerTraceWriter {
+    writer: BufWriter<File>,
+    worker_id: u32,
+    /// Entries written since the last flush.
+    pending: usize,
+    last_flush: Instant,
+}
+
+/// One trace line: a common envelope plus an intent- or outcome-specific payload.
+#[derive(Serialize)]
+struct TraceEntry<'a> {
+    seq: u64,
+    /// Wall-clock microseconds since the Unix epoch.
+    ts_us: u128,
+    worker: u32,
+    /// Either "intent" or "outcome".
+    phase: &'static str,
+    #[serde(flatten)]
+    data: TraceData<'a>,
+}
+
+/// Payload variants; `untagged` so the fields serialize inline without a tag.
+#[derive(Serialize)]
+#[serde(untagged)]
+enum TraceData<'a> {
+    Intent {
+        op: &'a Op,
+    },
+    Outcome {
+        result: &'static str,
+        latency_us: u64,
+        detail: Option<&'a str>,
+    },
+}
+
+impl WorkerTraceWriter {
+    /// Creates a JSONL trace writer at `<output_dir>/trace-worker-<id>.jsonl`.
+    ///
+    /// # Errors
+    /// Returns any I/O error from creating the file.
+    pub fn new(output_dir: &Path, worker_id: u32) -> io::Result<Self> {
+        let path = output_dir.join(format!("trace-worker-{worker_id}.jsonl"));
+        let file = File::create(path)?;
+        Ok(Self {
+            writer: BufWriter::new(file),
+            worker_id,
+            pending: 0,
+            last_flush: Instant::now(),
+        })
+    }
+
+    /// Records the intent to execute `op`, before it runs.
+    pub fn write_intent(&mut self, seq: u64, op: &Op) {
+        let entry = TraceEntry {
+            seq,
+            ts_us: now_micros(),
+            worker: self.worker_id,
+            phase: "intent",
+            data: TraceData::Intent { op },
+        };
+        self.write_entry(&entry);
+    }
+
+    /// Records the outcome and latency of a previously traced operation.
+    pub fn write_outcome(&mut self, seq: u64, outcome: &OpOutcome, latency: Duration) {
+        let detail = match outcome {
+            OpOutcome::Success { detail } => detail.as_deref(),
+            OpOutcome::ExpectedError { reason, .. } => Some(reason.as_str()),
+            OpOutcome::UnexpectedError { context, .. } => Some(context.as_str()),
+        };
+        let entry = TraceEntry {
+            seq,
+            ts_us: now_micros(),
+            worker: self.worker_id,
+            phase: "outcome",
+            data: TraceData::Outcome {
+                result: outcome.result_tag(),
+                latency_us: latency.as_micros() as u64,
+                detail,
+            },
+        };
+        self.write_entry(&entry);
+    }
+
+    /// Serializes one entry as a single JSONL line.
+    ///
+    /// The entry is serialized into a buffer first so a mid-entry
+    /// serialization failure can never leave a truncated JSON fragment
+    /// (with no trailing newline) in the trace file — serializing straight
+    /// into the writer could corrupt the JSONL stream on error. I/O errors
+    /// are deliberately ignored: tracing is best-effort during chaos runs.
+    fn write_entry<T: Serialize>(&mut self, entry: &T) {
+        if let Ok(mut line) = serde_json::to_vec(entry) {
+            line.push(b'\n');
+            let _ = self.writer.write_all(&line);
+            self.pending += 1;
+            self.maybe_flush();
+        }
+    }
+
+    /// Flushes when enough entries accumulated or enough time has passed.
+    fn maybe_flush(&mut self) {
+        if self.pending >= FLUSH_INTERVAL_ENTRIES || self.last_flush.elapsed() >= FLUSH_INTERVAL {
+            self.flush();
+        }
+    }
+
+    /// Forces buffered entries to disk and resets the flush bookkeeping.
+    pub fn flush(&mut self) {
+        let _ = self.writer.flush();
+        self.pending = 0;
+        self.last_flush = Instant::now();
+    }
+}
+
+/// Current wall-clock time in microseconds since the Unix epoch; returns 0
+/// if the system clock reads earlier than the epoch.
+fn now_micros() -> u128 {
+    match SystemTime::now().duration_since(UNIX_EPOCH) {
+        Ok(elapsed) => elapsed.as_micros(),
+        Err(_) => 0,
+    }
+}
diff --git a/core/lab/src/worker.rs b/core/lab/src/worker.rs
new file mode 100644
index 000000000..c758323e3
--- /dev/null
+++ b/core/lab/src/worker.rs
@@ -0,0 +1,244 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::invariants::{self, InvariantViolation, PartitionKey};
+use crate::ops::{ErrorClass, Op, OpKind, OpOutcome};
+use crate::shadow::{ShadowState, Watermarks};
+use crate::trace::WorkerTraceWriter;
+use iggy::prelude::IggyClient;
+use rand::distr::weighted::WeightedIndex;
+use rand::prelude::Distribution;
+use rand::rngs::StdRng;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::time::Instant;
+use tokio::sync::RwLock;
+use tracing::{debug, error, warn};
+
+/// Tunables applied uniformly to every worker.
+pub struct WorkerConfig {
+    /// Message payload size passed to `Op::execute`.
+    pub msg_size: u32,
+    /// Batch size passed to `Op::generate`.
+    pub messages_per_batch: u32,
+    /// Resource-name prefix passed to `Op::generate`.
+    pub prefix: String,
+    /// Abort the worker on the first invariant violation / server bug.
+    pub fail_fast: bool,
+    /// Global op budget (compared against the shared `ops_counter`); `None` = unbounded.
+    pub ops_target: Option<u64>,
+}
+
+/// Per-worker handles and shared infrastructure, created by the runner.
+pub struct WorkerInit {
+    pub id: u32,
+    pub client: IggyClient,
+    /// Per-worker RNG (seeded by the runner for reproducibility).
+    pub rng: StdRng,
+    /// Shadow model shared across all workers.
+    pub shared_state: Arc<RwLock<ShadowState>>,
+    pub watermarks: Arc<Watermarks>,
+    pub trace_writer: Option<WorkerTraceWriter>,
+    /// Global sequence counter for trace entries.
+    pub seq_counter: Arc<AtomicU64>,
+    /// Global count of executed ops across all workers.
+    pub ops_counter: Arc<AtomicU64>,
+}
+
+// Internal worker state: `WorkerInit` plus the scenario's sampled op mix.
+struct Worker {
+    id: u32,
+    client: IggyClient,
+    rng: StdRng,
+    shared_state: Arc<RwLock<ShadowState>>,
+    watermarks: Arc<Watermarks>,
+    trace_writer: Option<WorkerTraceWriter>,
+    /// Op kinds parallel to `weighted_dist`'s weights.
+    op_kinds: Vec<OpKind>,
+    weighted_dist: WeightedIndex<f64>,
+    config: WorkerConfig,
+    seq_counter: Arc<AtomicU64>,
+    ops_counter: Arc<AtomicU64>,
+}
+
+/// Terminal state of a worker: clean completion or a detected server bug.
+pub enum WorkerResult {
+    Ok { ops_executed: u64 },
+    ServerBug(InvariantViolation),
+}
+
+impl Worker {
+    fn new(init: WorkerInit, op_weights: &[(OpKind, f64)], config: 
WorkerConfig) -> Self {
+        let op_kinds: Vec<OpKind> = op_weights.iter().map(|(k, _)| 
*k).collect();
+        let weights: Vec<f64> = op_weights.iter().map(|(_, w)| *w).collect();
+        let weighted_dist = WeightedIndex::new(&weights).unwrap();
+
+        Self {
+            id: init.id,
+            client: init.client,
+            rng: init.rng,
+            shared_state: init.shared_state,
+            watermarks: init.watermarks,
+            trace_writer: init.trace_writer,
+            op_kinds,
+            weighted_dist,
+            config,
+            seq_counter: init.seq_counter,
+            ops_counter: init.ops_counter,
+        }
+    }
+
+    async fn run(mut self, stop: Arc<AtomicBool>) -> WorkerResult {
+        let mut last_offsets: HashMap<PartitionKey, u64> = HashMap::new();
+        let mut ops_executed: u64 = 0;
+
+        while !stop.load(Ordering::Relaxed) && !self.ops_limit_reached() {
+            let kind = self.pick_op_kind();
+
+            let op = {
+                let state = self.shared_state.read().await;
+                Op::generate(
+                    kind,
+                    &state,
+                    &mut self.rng,
+                    &self.config.prefix,
+                    self.config.messages_per_batch,
+                )
+            };
+
+            let Some(op) = op else {
+                continue;
+            };
+
+            let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
+            if let Some(tw) = &mut self.trace_writer {
+                tw.write_intent(seq, &op);
+            }
+
+            let start = Instant::now();
+            let outcome = op
+                .execute(&self.client, self.config.msg_size, &mut self.rng)
+                .await;
+            let latency = start.elapsed();
+
+            if let Some(tw) = &mut self.trace_writer {
+                tw.write_outcome(seq, &outcome, latency);
+            }
+
+            debug!(
+                worker = self.id,
+                op = op.kind_tag(),
+                result = outcome.result_tag(),
+                latency_us = latency.as_micros() as u64,
+            );
+
+            // Update shadow state first (records purge events for invariant 
checks)
+            {
+                let mut state = self.shared_state.write().await;
+                op.update_shadow(&mut state, &outcome);
+            }
+
+            // Invariant checks (after shadow update so purge records are 
visible)
+            {
+                let state = self.shared_state.read().await;
+                if let Err(violation) =
+                    invariants::check_offset_monotonicity(&mut last_offsets, 
&op, &outcome, &state)
+                {
+                    error!(worker = self.id, violation = ?violation, "Offset 
monotonicity violated");
+                    if self.config.fail_fast {
+                        return WorkerResult::ServerBug(violation);
+                    }
+                }
+            }
+            if let Err(violation) = 
invariants::check_watermark(&self.watermarks, &op, &outcome) {
+                warn!(worker = self.id, violation = ?violation, "Watermark 
violation (transient?)");
+            }
+
+            // Classify unexpected errors
+            if outcome.is_unexpected() {
+                let state = self.shared_state.read().await;
+                if let OpOutcome::UnexpectedError { ref error, .. } = outcome {
+                    // Try to parse back to IggyError for classification
+                    // Since we only have the string, we classify based on 
shadow state context
+                    let class =
+                        if 
state.was_recently_deleted(op.stream_name_ref().unwrap_or_default()) {
+                            ErrorClass::ExpectedConcurrent
+                        } else {
+                            ErrorClass::ServerBug
+                        };
+
+                    match class {
+                        ErrorClass::ExpectedConcurrent => {
+                            debug!(worker = self.id, "Downgraded to expected: 
{error}");
+                        }
+                        ErrorClass::ServerBug => {
+                            error!(worker = self.id, "Server bug: {error}");
+                            if self.config.fail_fast {
+                                return 
WorkerResult::ServerBug(InvariantViolation {
+                                    kind: "server_bug",
+                                    description: error.clone(),
+                                    context: format!("op={:?}", op),
+                                });
+                            }
+                        }
+                        ErrorClass::Transient => {
+                            warn!(worker = self.id, "Transient error: 
{error}");
+                        }
+                    }
+                }
+            }
+
+            ops_executed += 1;
+            self.ops_counter.fetch_add(1, Ordering::Relaxed);
+        }
+
+        if let Some(tw) = &mut self.trace_writer {
+            tw.flush();
+        }
+
+        WorkerResult::Ok { ops_executed }
+    }
+
+    fn ops_limit_reached(&self) -> bool {
+        self.config
+            .ops_target
+            .is_some_and(|target| self.ops_counter.load(Ordering::Relaxed) >= 
target)
+    }
+
+    fn pick_op_kind(&mut self) -> OpKind {
+        let idx = self.weighted_dist.sample(&mut self.rng);
+        self.op_kinds[idx]
+    }
+}
+
+/// Entry point used by the runner: constructs a `Worker` and drives it
+/// until `stop` is set or the shared ops target is reached.
+pub async fn run_worker(
+    init: WorkerInit,
+    op_weights: &[(OpKind, f64)],
+    config: WorkerConfig,
+    stop: Arc<AtomicBool>,
+) -> WorkerResult {
+    Worker::new(init, op_weights, config).run(stop).await
+}
+
+impl Op {
+    /// Returns the stream name targeted by this op, if any — used when
+    /// classifying unexpected errors against recently deleted resources.
+    fn stream_name_ref(&self) -> Option<&str> {
+        match self {
+            // Stream-level ops carry the stream name directly.
+            Op::CreateStream { name } | Op::DeleteStream { name } | Op::PurgeStream { name } => {
+                Some(name)
+            }
+            // Every other op targets a resource inside a stream.
+            Op::CreateTopic { stream, .. }
+            | Op::DeleteTopic { stream, .. }
+            | Op::PurgeTopic { stream, .. }
+            | Op::SendMessages { stream, .. }
+            | Op::PollMessages { stream, .. }
+            | Op::DeleteSegments { stream, .. }
+            | Op::CreatePartitions { stream, .. }
+            | Op::DeletePartitions { stream, .. }
+            | Op::CreateConsumerGroup { stream, .. }
+            | Op::DeleteConsumerGroup { stream, .. }
+            | Op::StoreConsumerOffset { stream, .. } => Some(stream),
+        }
+    }
+}

Reply via email to