This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch lab in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 3e8290a14f1cbd34459f1cb38e4d87913c5a94e8 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Feb 13 21:13:59 2026 +0100 lab --- Cargo.lock | 18 ++ Cargo.toml | 1 + DEPENDENCIES.md | 1 + core/lab/Cargo.toml | 43 ++++ core/lab/src/args.rs | 169 +++++++++++++ core/lab/src/client.rs | 66 +++++ core/lab/src/invariants.rs | 297 +++++++++++++++++++++++ core/lab/src/main.rs | 94 +++++++ core/lab/src/ops/classify.rs | 111 +++++++++ core/lab/src/ops/execute.rs | 285 ++++++++++++++++++++++ core/lab/src/ops/generate.rs | 189 +++++++++++++++ core/lab/src/ops/mod.rs | 197 +++++++++++++++ core/lab/src/ops/precondition.rs | 98 ++++++++ core/lab/src/ops/shadow_update.rs | 89 +++++++ core/lab/src/replay.rs | 306 +++++++++++++++++++++++ core/lab/src/report.rs | 101 ++++++++ core/lab/src/runner.rs | 378 +++++++++++++++++++++++++++++ core/lab/src/safe_name.rs | 114 +++++++++ core/lab/src/scenarios/concurrent_crud.rs | 50 ++++ core/lab/src/scenarios/mixed_workload.rs | 57 +++++ core/lab/src/scenarios/mod.rs | 55 +++++ core/lab/src/scenarios/segment_rotation.rs | 46 ++++ core/lab/src/shadow.rs | 244 +++++++++++++++++++ core/lab/src/trace.rs | 129 ++++++++++ core/lab/src/worker.rs | 244 +++++++++++++++++++ 25 files changed, 3382 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index d7ffd97b4..afe8e2a45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4621,6 +4621,24 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "iggy-lab" +version = "0.1.0" +dependencies = [ + "bytes", + "chrono", + "clap", + "dashmap", + "iggy", + "iggy_common", + "rand 0.10.0", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "iggy-mcp" version = "0.2.3-edge.1" diff --git a/Cargo.toml b/Cargo.toml index e306352c1..0d4dbc4fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "core/harness_derive", "core/integration", "core/journal", + "core/lab", "core/message_bus", "core/metadata", "core/partitions", diff --git 
a/DEPENDENCIES.md b/DEPENDENCIES.md index a9e14e6d4..94d445ae9 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -393,6 +393,7 @@ iggy-bench: 0.3.3-edge.1, "Apache-2.0", iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0", iggy-cli: 0.10.3-edge.1, "Apache-2.0", iggy-connectors: 0.2.4-edge.1, "Apache-2.0", +iggy-lab: 0.1.0, "Apache-2.0", iggy-mcp: 0.2.3-edge.1, "Apache-2.0", iggy_binary_protocol: 0.8.3-edge.1, "Apache-2.0", iggy_common: 0.8.3-edge.1, "Apache-2.0", diff --git a/core/lab/Cargo.toml b/core/lab/Cargo.toml new file mode 100644 index 000000000..64e628a6b --- /dev/null +++ b/core/lab/Cargo.toml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "iggy-lab" +version = "0.1.0" +edition = "2024" +license = "Apache-2.0" +repository = "https://github.com/apache/iggy" +homepage = "https://iggy.apache.org" +description = "Chaos stress testing CLI for Iggy message streaming platform" + +[[bin]] +name = "iggy-lab" +path = "src/main.rs" + +[dependencies] +bytes = { workspace = true } +chrono = { workspace = true } +clap = { workspace = true } +dashmap = { workspace = true } +iggy = { workspace = true } +iggy_common = { workspace = true } +rand = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } diff --git a/core/lab/src/args.rs b/core/lab/src/args.rs new file mode 100644 index 000000000..acc35a349 --- /dev/null +++ b/core/lab/src/args.rs @@ -0,0 +1,169 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use clap::{Parser, Subcommand, ValueEnum}; +use iggy_common::IggyDuration; +use std::path::PathBuf; +use std::str::FromStr; + +/// Default message payload size in bytes. 
+const DEFAULT_MESSAGE_SIZE: u32 = 256; + +#[derive(Parser)] +#[command( + name = "iggy-lab", + about = "Chaos stress testing CLI for Iggy message streaming platform" +)] +pub struct IggyLabArgs { + #[command(subcommand)] + pub command: Command, +} + +#[derive(Subcommand)] +pub enum Command { + /// Run a chaos scenario against a live Iggy server + Run(RunArgs), + /// Replay a recorded trace against a live Iggy server + Replay(ReplayArgs), + /// List available scenarios + List, + /// Show detailed description of a scenario + Explain { + /// Scenario name to explain + scenario: ScenarioName, + }, +} + +#[derive(Parser)] +pub struct RunArgs { + /// Which scenario to run + pub scenario: ScenarioName, + + /// Server address (host:port) + #[arg(long, default_value = "127.0.0.1:8090")] + pub server_address: String, + + /// Transport protocol + #[arg(long, default_value = "tcp")] + pub transport: Transport, + + /// PRNG seed for reproducibility (random if omitted) + #[arg(long)] + pub seed: Option<u64>, + + /// How long to run the chaos phase + #[arg(long, default_value = "30s", value_parser = IggyDuration::from_str)] + pub duration: IggyDuration, + + /// Stop after this many total ops (across all workers). Duration still acts as safety timeout. 
+ #[arg(long)] + pub ops: Option<u64>, + + /// Resource name prefix for safety isolation + #[arg(long, default_value = "lab-")] + pub prefix: String, + + /// Directory for trace artifacts (stdout-only if omitted) + #[arg(long)] + pub output_dir: Option<PathBuf>, + + /// Number of concurrent worker tasks + #[arg(long, default_value = "8")] + pub workers: u32, + + /// Payload size in bytes per message + #[arg(long, default_value = "256")] + pub message_size: u32, + + /// Messages per send batch + #[arg(long, default_value = "10")] + pub messages_per_batch: u32, + + /// Continue running after ServerBug classification instead of stopping + #[arg(long)] + pub no_fail_fast: bool, + + /// Delete stale lab-prefixed resources before starting + #[arg(long)] + pub force_cleanup: bool, + + /// Skip post-run resource deletion + #[arg(long)] + pub no_cleanup: bool, + + /// Skip post-run invariant verification + #[arg(long)] + pub skip_post_run_verify: bool, +} + +#[derive(Parser)] +pub struct ReplayArgs { + /// Directory containing trace-worker-*.jsonl files + pub trace_dir: PathBuf, + + /// Server address (host:port) + #[arg(long, default_value = "127.0.0.1:8090")] + pub server_address: String, + + /// Transport protocol + #[arg(long, default_value = "tcp")] + pub transport: Transport, + + /// Payload size in bytes per message + #[arg(long, default_value_t = DEFAULT_MESSAGE_SIZE)] + pub message_size: u32, + + /// Resource name prefix for cleanup + #[arg(long, default_value = "lab-")] + pub prefix: String, + + /// Stop on first concerning divergence + #[arg(long)] + pub fail_fast: bool, + + /// Delete stale lab-prefixed resources before replaying + #[arg(long)] + pub force_cleanup: bool, +} + +/// Maps to TransportProtocol but implements clap::ValueEnum +#[derive(Debug, Clone, Copy, ValueEnum)] +pub enum Transport { + Tcp, + Quic, + Http, + #[value(alias = "ws")] + WebSocket, +} + +#[derive(Debug, Clone, Copy, ValueEnum)] +pub enum ScenarioName { + ConcurrentCrud, + 
SegmentRotation, + MixedWorkload, +} + +impl std::fmt::Display for ScenarioName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::ConcurrentCrud => write!(f, "concurrent-crud"), + Self::SegmentRotation => write!(f, "segment-rotation"), + Self::MixedWorkload => write!(f, "mixed-workload"), + } + } +} diff --git a/core/lab/src/client.rs b/core/lab/src/client.rs new file mode 100644 index 000000000..ed7ee5d27 --- /dev/null +++ b/core/lab/src/client.rs @@ -0,0 +1,66 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::args::Transport; +use iggy::prelude::*; +use tracing::warn; + +pub async fn create_client( + server_address: &str, + transport: Transport, +) -> Result<IggyClient, IggyError> { + let client = match transport { + Transport::Tcp => IggyClient::builder() + .with_tcp() + .with_server_address(server_address.to_owned()) + .build()?, + Transport::Quic => IggyClient::builder() + .with_quic() + .with_server_address(server_address.to_owned()) + .build()?, + Transport::Http => { + let api_url = format!("http://{server_address}"); + IggyClient::builder() + .with_http() + .with_api_url(api_url) + .build()? 
+ } + Transport::WebSocket => IggyClient::builder() + .with_websocket() + .with_server_address(server_address.to_owned()) + .build()?, + }; + client.connect().await?; + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await?; + Ok(client) +} + +/// Delete all streams whose name starts with `prefix`. +pub async fn cleanup_prefixed(client: &IggyClient, prefix: &str) { + let Ok(streams) = client.get_streams().await else { + return; + }; + for stream in streams.iter().filter(|s| s.name.starts_with(prefix)) { + let id = Identifier::numeric(stream.id).unwrap(); + if let Err(e) = client.delete_stream(&id).await { + warn!("Cleanup: failed to delete stream '{}': {e}", stream.name); + } + } +} diff --git a/core/lab/src/invariants.rs b/core/lab/src/invariants.rs new file mode 100644 index 000000000..403acd7a0 --- /dev/null +++ b/core/lab/src/invariants.rs @@ -0,0 +1,297 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::args::Transport; +use crate::client::create_client; +use crate::ops::{Op, OpOutcome}; +use crate::shadow::{ShadowState, Watermarks}; +use iggy::prelude::*; +use serde::Serialize; +use std::collections::HashMap; + +pub type PartitionKey = (String, String, u32); + +#[derive(Debug, Clone, Serialize)] +pub struct InvariantViolation { + pub kind: &'static str, + pub description: String, + pub context: String, +} + +/// Check that polled offsets are monotonically increasing per partition. +/// Offset regressions after a recent purge are expected and not violations. +pub fn check_offset_monotonicity( + last_offsets: &mut HashMap<PartitionKey, u64>, + op: &Op, + outcome: &OpOutcome, + shadow: &ShadowState, +) -> Result<(), InvariantViolation> { + let (stream, topic, partition) = match op { + Op::PollMessages { + stream, + topic, + partition, + .. + } => (stream, topic, partition), + _ => return Ok(()), + }; + let offset = match outcome { + OpOutcome::Success { + detail: Some(detail), + } => match parse_poll_offset(detail) { + Some(o) => o, + None => return Ok(()), + }, + _ => return Ok(()), + }; + + let key = (stream.clone(), topic.clone(), *partition); + if let Some(&last) = last_offsets.get(&key) + && offset < last + { + if shadow.was_recently_purged(stream, topic) { + last_offsets.insert(key, offset); + return Ok(()); + } + return Err(InvariantViolation { + kind: "offset_regression", + description: format!("Offset went backwards: {last} -> {offset}"), + context: format!("stream={stream}, topic={topic}, partition={partition}"), + }); + } + last_offsets.insert(key, offset); + Ok(()) +} + +/// Check that polled offsets don't exceed send watermarks. +pub fn check_watermark( + watermarks: &Watermarks, + op: &Op, + outcome: &OpOutcome, +) -> Result<(), InvariantViolation> { + let (stream, topic, partition) = match op { + Op::PollMessages { + stream, + topic, + partition, + .. 
+ } => (stream, topic, partition), + _ => return Ok(()), + }; + let offset = match outcome { + OpOutcome::Success { + detail: Some(detail), + } => match parse_poll_offset(detail) { + Some(o) => o, + None => return Ok(()), + }, + _ => return Ok(()), + }; + + if let Some(wm) = watermarks.get_watermark(stream, topic, *partition) + && offset > wm + { + return Err(InvariantViolation { + kind: "watermark_exceeded", + description: format!("Poll offset {offset} exceeds send watermark {wm}"), + context: format!("stream={stream}, topic={topic}, partition={partition}"), + }); + } + Ok(()) +} + +/// Post-run: verify server state matches shadow state for all prefixed resources. +pub async fn post_run_verify( + client: &IggyClient, + shadow: &ShadowState, + prefix: &str, +) -> Vec<InvariantViolation> { + let mut violations = Vec::new(); + + let streams = match client.get_streams().await { + Ok(s) => s, + Err(e) => { + violations.push(InvariantViolation { + kind: "post_run_fetch_failed", + description: format!("Failed to get streams: {e}"), + context: String::new(), + }); + return violations; + } + }; + + let server_streams: HashMap<&str, &Stream> = streams + .iter() + .filter(|s| s.name.starts_with(prefix)) + .map(|s| (s.name.as_str(), s)) + .collect(); + + // Check all shadow streams exist on server + for name in shadow.streams.keys() { + if !server_streams.contains_key(name.as_str()) { + violations.push(InvariantViolation { + kind: "missing_stream", + description: format!("Shadow has stream '{name}' but server does not"), + context: String::new(), + }); + } + } + + // Check no extra prefixed streams on server + for &name in server_streams.keys() { + if !shadow.streams.contains_key(name) { + violations.push(InvariantViolation { + kind: "extra_stream", + description: format!("Server has stream '{name}' not in shadow"), + context: String::new(), + }); + } + } + + // Deep-check topics for each matching stream + for (name, shadow_stream) in &shadow.streams { + let 
Some(server_stream) = server_streams.get(name.as_str()) else { + continue; + }; + + let stream_id = Identifier::numeric(server_stream.id).unwrap(); + let Ok(Some(details)) = client.get_stream(&stream_id).await else { + violations.push(InvariantViolation { + kind: "stream_details_fetch_failed", + description: format!("Failed to get details for stream '{name}'"), + context: String::new(), + }); + continue; + }; + + let server_topics: HashMap<&str, &Topic> = details + .topics + .iter() + .map(|t| (t.name.as_str(), t)) + .collect(); + + for topic_name in shadow_stream.topics.keys() { + if !server_topics.contains_key(topic_name.as_str()) { + violations.push(InvariantViolation { + kind: "missing_topic", + description: format!( + "Shadow has topic '{topic_name}' in stream '{name}' but server does not" + ), + context: String::new(), + }); + } + } + + for &topic_name in server_topics.keys() { + if !shadow_stream.topics.contains_key(topic_name) { + violations.push(InvariantViolation { + kind: "extra_topic", + description: format!( + "Server has topic '{topic_name}' in stream '{name}' not in shadow" + ), + context: String::new(), + }); + } + } + } + + violations +} + +/// Cross-client consistency: spawn a fresh client and compare get_streams results. 
+pub async fn cross_client_consistency( + addr: &str, + transport: Transport, + prefix: &str, +) -> Vec<InvariantViolation> { + let mut violations = Vec::new(); + + let client_a = match create_client(addr, transport).await { + Ok(c) => c, + Err(e) => { + violations.push(InvariantViolation { + kind: "cross_client_connect_failed", + description: format!("Client A: {e}"), + context: String::new(), + }); + return violations; + } + }; + let client_b = match create_client(addr, transport).await { + Ok(c) => c, + Err(e) => { + violations.push(InvariantViolation { + kind: "cross_client_connect_failed", + description: format!("Client B: {e}"), + context: String::new(), + }); + return violations; + } + }; + + let (a_result, b_result) = tokio::join!(client_a.get_streams(), client_b.get_streams()); + + let (streams_a, streams_b) = match (a_result, b_result) { + (Ok(a), Ok(b)) => (a, b), + (Err(e), _) | (_, Err(e)) => { + violations.push(InvariantViolation { + kind: "cross_client_fetch_failed", + description: format!("Failed to get streams: {e}"), + context: String::new(), + }); + return violations; + } + }; + + let names_a: std::collections::HashSet<&str> = streams_a + .iter() + .filter(|s| s.name.starts_with(prefix)) + .map(|s| s.name.as_str()) + .collect(); + let names_b: std::collections::HashSet<&str> = streams_b + .iter() + .filter(|s| s.name.starts_with(prefix)) + .map(|s| s.name.as_str()) + .collect(); + + if names_a != names_b { + violations.push(InvariantViolation { + kind: "cross_client_inconsistency", + description: format!( + "Client A sees {} streams, Client B sees {}", + names_a.len(), + names_b.len() + ), + context: format!( + "only_in_a={:?}, only_in_b={:?}", + names_a.difference(&names_b).collect::<Vec<_>>(), + names_b.difference(&names_a).collect::<Vec<_>>() + ), + }); + } + + violations +} + +fn parse_poll_offset(detail: &str) -> Option<u64> { + // detail format: "received=N, offset=M" + detail + .split(", ") + .find_map(|part| part.strip_prefix("offset=")) 
+ .and_then(|v| v.parse().ok()) +} diff --git a/core/lab/src/main.rs b/core/lab/src/main.rs new file mode 100644 index 000000000..aa78ce9c4 --- /dev/null +++ b/core/lab/src/main.rs @@ -0,0 +1,94 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +mod args; +mod client; +mod invariants; +mod ops; +mod replay; +mod report; +mod runner; +mod safe_name; +mod scenarios; +mod shadow; +mod trace; +mod worker; + +use args::{Command, IggyLabArgs}; +use clap::Parser; +use iggy_common::IggyError; +use replay::LabReplay; +use runner::LabRunner; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +#[tokio::main] +async fn main() -> Result<(), IggyError> { + let args = IggyLabArgs::parse(); + + match args.command { + Command::List => { + println!("Available scenarios:\n"); + for (name, description) in scenarios::list_scenarios() { + let first_line = description.lines().next().unwrap_or(description); + println!(" {name:<25} {first_line}"); + } + } + Command::Explain { scenario } => { + let s = scenarios::create_scenario(scenario); + println!("{}\n", s.name()); + println!("{}", s.describe()); + } + Command::Run(run_args) => { + init_tracing(); + + let runner = LabRunner::new(run_args); + let outcome = runner.run().await?; + + if !outcome.passed { + std::process::exit(1); + } + } + Command::Replay(replay_args) => { + init_tracing(); + + let outcome = LabReplay::new(replay_args).run().await?; + + println!( + "Replayed {} ops — {} divergences ({} concerning)", + outcome.total_ops, outcome.divergences_total, outcome.divergences_concerning + ); + + if outcome.divergences_concerning > 0 { + std::process::exit(1); + } + } + } + + Ok(()) +} + +fn init_tracing() { + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + tracing_subscriber::registry() + .with(env_filter) + .with(tracing_subscriber::fmt::layer().with_ansi(true)) + .init(); +} diff --git a/core/lab/src/ops/classify.rs b/core/lab/src/ops/classify.rs new file mode 100644 index 000000000..9c4350bda --- /dev/null +++ b/core/lab/src/ops/classify.rs @@ -0,0 +1,111 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::{ErrorClass, Op}; +use crate::shadow::ShadowState; +use iggy_common::IggyError; + +#[allow(dead_code)] +impl Op { + /// Classify an IggyError using shadow state knowledge to distinguish + /// between concurrent races (benign), server bugs, and transient failures. 
+ pub fn classify_error(&self, err: &IggyError, state: &ShadowState) -> ErrorClass { + match err { + IggyError::Disconnected + | IggyError::NotConnected + | IggyError::CannotEstablishConnection + | IggyError::Unauthenticated => ErrorClass::Transient, + + IggyError::StreamNameNotFound(name) | IggyError::StreamNameAlreadyExists(name) => { + if state.was_recently_deleted(name) || !state.stream_exists(name) { + ErrorClass::ExpectedConcurrent + } else { + ErrorClass::ServerBug + } + } + + IggyError::StreamIdNotFound(_) => { + if let Some(stream_name) = self.stream_name() + && (state.was_recently_deleted(stream_name) + || !state.stream_exists(stream_name)) + { + return ErrorClass::ExpectedConcurrent; + } + ErrorClass::ServerBug + } + + IggyError::TopicNameNotFound(_, _) + | IggyError::TopicNameAlreadyExists(_, _) + | IggyError::TopicIdNotFound(_, _) => { + if let (Some(s), Some(t)) = (self.stream_name(), self.topic_name()) + && (state.was_recently_deleted(&format!("{s}/{t}")) + || state.was_recently_deleted(s) + || !state.topic_exists(s, t)) + { + return ErrorClass::ExpectedConcurrent; + } + ErrorClass::ServerBug + } + + IggyError::PartitionNotFound(_, _, _) + | IggyError::SegmentNotFound + | IggyError::ConsumerGroupIdNotFound(_, _) + | IggyError::ConsumerGroupNameNotFound(_, _) + | IggyError::ConsumerGroupMemberNotFound(_, _, _) + | IggyError::ResourceNotFound(_) => ErrorClass::ExpectedConcurrent, + + _ => ErrorClass::ServerBug, + } + } + + fn stream_name(&self) -> Option<&str> { + match self { + Op::CreateStream { name } | Op::DeleteStream { name } | Op::PurgeStream { name } => { + Some(name) + } + Op::CreateTopic { stream, .. } + | Op::DeleteTopic { stream, .. } + | Op::PurgeTopic { stream, .. } + | Op::SendMessages { stream, .. } + | Op::PollMessages { stream, .. } + | Op::DeleteSegments { stream, .. } + | Op::CreatePartitions { stream, .. } + | Op::DeletePartitions { stream, .. } + | Op::CreateConsumerGroup { stream, .. } + | Op::DeleteConsumerGroup { stream, .. 
} + | Op::StoreConsumerOffset { stream, .. } => Some(stream), + } + } + + fn topic_name(&self) -> Option<&str> { + match self { + Op::CreateStream { .. } | Op::DeleteStream { .. } | Op::PurgeStream { .. } => None, + Op::CreateTopic { name, .. } => Some(name), + Op::DeleteTopic { topic, .. } + | Op::PurgeTopic { topic, .. } + | Op::SendMessages { topic, .. } + | Op::PollMessages { topic, .. } + | Op::DeleteSegments { topic, .. } + | Op::CreatePartitions { topic, .. } + | Op::DeletePartitions { topic, .. } + | Op::CreateConsumerGroup { topic, .. } + | Op::DeleteConsumerGroup { topic, .. } + | Op::StoreConsumerOffset { topic, .. } => Some(topic), + } + } +} diff --git a/core/lab/src/ops/execute.rs b/core/lab/src/ops/execute.rs new file mode 100644 index 000000000..295c719d1 --- /dev/null +++ b/core/lab/src/ops/execute.rs @@ -0,0 +1,285 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use super::{Op, OpOutcome}; +use bytes::Bytes; +use iggy::prelude::*; +use rand::{Rng, RngExt}; + +impl Op { + pub async fn execute( + &self, + client: &IggyClient, + msg_size: u32, + rng: &mut (impl Rng + ?Sized), + ) -> OpOutcome { + match self { + Op::CreateStream { name } => match client.create_stream(name).await { + Ok(details) => OpOutcome::success_with(format!("id={}", details.id)), + Err(e) => classify_sdk_error(e, "create_stream"), + }, + + Op::DeleteStream { name } => { + let id = Identifier::from_str_value(name).unwrap(); + match client.delete_stream(&id).await { + Ok(()) => OpOutcome::success(), + Err(e) => classify_sdk_error(e, "delete_stream"), + } + } + + Op::PurgeStream { name } => { + let id = Identifier::from_str_value(name).unwrap(); + match client.purge_stream(&id).await { + Ok(()) => OpOutcome::success(), + Err(e) => classify_sdk_error(e, "purge_stream"), + } + } + + Op::CreateTopic { + stream, + name, + partitions, + } => { + let stream_id = Identifier::from_str_value(stream).unwrap(); + match client + .create_topic( + &stream_id, + name, + *partitions, + CompressionAlgorithm::None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::Unlimited, + ) + .await + { + Ok(details) => OpOutcome::success_with(format!("id={}", details.id)), + Err(e) => classify_sdk_error(e, "create_topic"), + } + } + + Op::DeleteTopic { stream, topic } => { + let stream_id = Identifier::from_str_value(stream).unwrap(); + let topic_id = Identifier::from_str_value(topic).unwrap(); + match client.delete_topic(&stream_id, &topic_id).await { + Ok(()) => OpOutcome::success(), + Err(e) => classify_sdk_error(e, "delete_topic"), + } + } + + Op::PurgeTopic { stream, topic } => { + let stream_id = Identifier::from_str_value(stream).unwrap(); + let topic_id = Identifier::from_str_value(topic).unwrap(); + match client.purge_topic(&stream_id, &topic_id).await { + Ok(()) => OpOutcome::success(), + Err(e) => classify_sdk_error(e, "purge_topic"), + } + } + + Op::SendMessages { + 
stream, + topic, + partition, + count, + } => { + let stream_id = Identifier::from_str_value(stream).unwrap(); + let topic_id = Identifier::from_str_value(topic).unwrap(); + let partitioning = Partitioning::partition_id(*partition); + let mut messages: Vec<IggyMessage> = Vec::with_capacity(*count as usize); + for _ in 0..*count { + let mut payload = vec![0u8; msg_size as usize]; + rng.fill(&mut payload[..]); + messages.push( + IggyMessage::builder() + .payload(Bytes::from(payload)) + .build() + .unwrap(), + ); + } + match client + .send_messages(&stream_id, &topic_id, &partitioning, &mut messages) + .await + { + Ok(()) => OpOutcome::success_with(format!("count={count}")), + Err(e) => classify_sdk_error(e, "send_messages"), + } + } + + Op::PollMessages { + stream, + topic, + partition, + count, + } => { + let stream_id = Identifier::from_str_value(stream).unwrap(); + let topic_id = Identifier::from_str_value(topic).unwrap(); + let consumer = Consumer::new(Identifier::numeric(1).unwrap()); + let strategy = PollingStrategy::next(); + match client + .poll_messages( + &stream_id, + &topic_id, + Some(*partition), + &consumer, + &strategy, + *count, + false, + ) + .await + { + Ok(polled) => OpOutcome::success_with(format!( + "received={}, offset={}", + polled.messages.len(), + polled.current_offset + )), + Err(e) => classify_sdk_error(e, "poll_messages"), + } + } + + Op::DeleteSegments { + stream, + topic, + partition, + count, + } => { + let stream_id = Identifier::from_str_value(stream).unwrap(); + let topic_id = Identifier::from_str_value(topic).unwrap(); + match client + .delete_segments(&stream_id, &topic_id, *partition, *count) + .await + { + Ok(()) => OpOutcome::success(), + Err(e) => classify_sdk_error(e, "delete_segments"), + } + } + + Op::CreatePartitions { + stream, + topic, + count, + } => { + let stream_id = Identifier::from_str_value(stream).unwrap(); + let topic_id = Identifier::from_str_value(topic).unwrap(); + match client + .create_partitions(&stream_id, 
&topic_id, *count)
                    .await
                {
                    Ok(()) => OpOutcome::success(),
                    Err(e) => classify_sdk_error(e, "create_partitions"),
                }
            }

            Op::DeletePartitions {
                stream,
                topic,
                count,
            } => {
                let stream_id = ident(stream);
                let topic_id = ident(topic);
                match client
                    .delete_partitions(&stream_id, &topic_id, *count)
                    .await
                {
                    Ok(()) => OpOutcome::success(),
                    Err(e) => classify_sdk_error(e, "delete_partitions"),
                }
            }

            Op::CreateConsumerGroup {
                stream,
                topic,
                name,
            } => {
                let stream_id = ident(stream);
                let topic_id = ident(topic);
                match client
                    .create_consumer_group(&stream_id, &topic_id, name)
                    .await
                {
                    Ok(details) => OpOutcome::success_with(format!("id={}", details.id)),
                    Err(e) => classify_sdk_error(e, "create_consumer_group"),
                }
            }

            Op::DeleteConsumerGroup {
                stream,
                topic,
                name,
            } => {
                let stream_id = ident(stream);
                let topic_id = ident(topic);
                let group_id = ident(name);
                match client
                    .delete_consumer_group(&stream_id, &topic_id, &group_id)
                    .await
                {
                    Ok(()) => OpOutcome::success(),
                    Err(e) => classify_sdk_error(e, "delete_consumer_group"),
                }
            }

            Op::StoreConsumerOffset {
                stream,
                topic,
                partition,
                offset,
            } => {
                let stream_id = ident(stream);
                let topic_id = ident(topic);
                // The lab always acts as numeric consumer id 1 (same as polling).
                let consumer = Consumer::new(Identifier::numeric(1).unwrap());
                match client
                    .store_consumer_offset(
                        &consumer,
                        &stream_id,
                        &topic_id,
                        Some(*partition),
                        *offset,
                    )
                    .await
                {
                    Ok(()) => OpOutcome::success(),
                    Err(e) => classify_sdk_error(e, "store_consumer_offset"),
                }
            }
        }
    }
}

/// Parse a lab-generated resource name into an [`Identifier`].
///
/// Names come from the lab's own generator, so a parse failure here is a bug
/// in the lab itself rather than in the server — hence the `unwrap`.
fn ident(name: &str) -> Identifier {
    Identifier::from_str_value(name).unwrap()
}

/// Map an SDK error to an [`OpOutcome`].
///
/// "Not found" / "already exists" errors are expected under concurrent CRUD
/// (another worker may have raced us on the same resource). Connection-level
/// failures are tagged as transient. Everything else is surfaced as an
/// unexpected error attributed to `op_name`.
fn classify_sdk_error(err: IggyError, op_name: &str) -> OpOutcome {
    use IggyError::*;
    match &err {
        StreamNameAlreadyExists(_)
        | TopicNameAlreadyExists(_, _)
        | StreamNameNotFound(_)
        | TopicNameNotFound(_, _)
        | StreamIdNotFound(_)
        | TopicIdNotFound(_, _)
        | PartitionNotFound(_, _, _)
        | SegmentNotFound
        | ConsumerGroupIdNotFound(_, _)
        | ConsumerGroupNameNotFound(_, _)
        | ConsumerGroupMemberNotFound(_, _, _)
        | ResourceNotFound(_) => OpOutcome::expected(err.to_string(), "concurrent_race"),
        Disconnected | NotConnected | CannotEstablishConnection | Unauthenticated => {
            OpOutcome::unexpected(err.to_string(), format!("{op_name}: transient"))
        }
        _ => OpOutcome::unexpected(err.to_string(), op_name.to_string()),
    }
}
diff --git a/core/lab/src/ops/generate.rs b/core/lab/src/ops/generate.rs
new file mode 100644
index 000000000..03f5e880f
--- /dev/null
+++ b/core/lab/src/ops/generate.rs
@@ -0,0 +1,189 @@
/* Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
+ */ + +use super::{Op, OpKind}; +use crate::safe_name::SafeResourceName; +use crate::shadow::ShadowState; +use rand::{Rng, RngExt}; + +const DEFAULT_PARTITIONS: u32 = 3; + +impl Op { + /// Generate a concrete Op from an OpKind, resolving targets from the shadow state. + /// Returns None if the precondition for this kind cannot be met. + pub fn generate( + kind: OpKind, + state: &ShadowState, + rng: &mut impl Rng, + prefix: &str, + messages_per_batch: u32, + ) -> Option<Self> { + match kind { + OpKind::CreateStream => { + let name = SafeResourceName::random(prefix, rng); + Some(Op::CreateStream { + name: name.into_inner(), + }) + } + + OpKind::DeleteStream => { + let name = state.random_stream(rng)?; + Some(Op::DeleteStream { + name: name.to_owned(), + }) + } + + OpKind::PurgeStream => { + let name = state.random_stream(rng)?; + Some(Op::PurgeStream { + name: name.to_owned(), + }) + } + + OpKind::CreateTopic => { + let stream = state.random_stream(rng)?; + let name = SafeResourceName::random(prefix, rng); + Some(Op::CreateTopic { + stream: stream.to_owned(), + name: name.into_inner(), + partitions: DEFAULT_PARTITIONS, + }) + } + + OpKind::DeleteTopic => { + let (stream, topic) = state.random_topic(rng)?; + Some(Op::DeleteTopic { + stream: stream.to_owned(), + topic: topic.to_owned(), + }) + } + + OpKind::PurgeTopic => { + let (stream, topic) = state.random_topic(rng)?; + Some(Op::PurgeTopic { + stream: stream.to_owned(), + topic: topic.to_owned(), + }) + } + + OpKind::SendMessages => { + let (stream, topic, partitions) = state.random_topic_with_partitions(rng)?; + if partitions == 0 { + return None; + } + let partition = rng.random_range(1..=partitions); + Some(Op::SendMessages { + stream: stream.to_owned(), + topic: topic.to_owned(), + partition, + count: messages_per_batch, + }) + } + + OpKind::PollMessages => { + let (stream, topic, partitions) = state.random_topic_with_partitions(rng)?; + if partitions == 0 { + return None; + } + let partition = 
rng.random_range(1..=partitions); + Some(Op::PollMessages { + stream: stream.to_owned(), + topic: topic.to_owned(), + partition, + count: messages_per_batch, + }) + } + + OpKind::DeleteSegments => { + let (stream, topic, partitions) = state.random_topic_with_partitions(rng)?; + if partitions == 0 { + return None; + } + let partition = rng.random_range(1..=partitions); + Some(Op::DeleteSegments { + stream: stream.to_owned(), + topic: topic.to_owned(), + partition, + count: 1, + }) + } + + OpKind::CreatePartitions => { + let (stream, topic) = state.random_topic(rng)?; + Some(Op::CreatePartitions { + stream: stream.to_owned(), + topic: topic.to_owned(), + count: 1, + }) + } + + OpKind::DeletePartitions => { + let (stream, topic, partitions) = state.random_topic_with_partitions(rng)?; + // Server requires at least 1 partition to remain + if partitions <= 1 { + return None; + } + Some(Op::DeletePartitions { + stream: stream.to_owned(), + topic: topic.to_owned(), + count: 1, + }) + } + + OpKind::CreateConsumerGroup => { + let (stream, topic) = state.random_topic(rng)?; + let name = SafeResourceName::random(prefix, rng); + Some(Op::CreateConsumerGroup { + stream: stream.to_owned(), + topic: topic.to_owned(), + name: name.into_inner(), + }) + } + + OpKind::DeleteConsumerGroup => { + let (stream, topic) = state.random_topic(rng)?; + let groups = &state.streams[stream].topics[topic].consumer_groups; + if groups.is_empty() { + return None; + } + let idx = rng.random_range(0..groups.len()); + let name = groups.iter().nth(idx)?.to_owned(); + Some(Op::DeleteConsumerGroup { + stream: stream.to_owned(), + topic: topic.to_owned(), + name, + }) + } + + OpKind::StoreConsumerOffset => { + let (stream, topic, partitions) = state.random_topic_with_partitions(rng)?; + if partitions == 0 { + return None; + } + let partition = rng.random_range(1..=partitions); + let offset = rng.random_range(0..1000); + Some(Op::StoreConsumerOffset { + stream: stream.to_owned(), + topic: topic.to_owned(), 
+ partition, + offset, + }) + } + } + } +} diff --git a/core/lab/src/ops/mod.rs b/core/lab/src/ops/mod.rs new file mode 100644 index 000000000..ff9d1a926 --- /dev/null +++ b/core/lab/src/ops/mod.rs @@ -0,0 +1,197 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +pub mod classify; +pub mod execute; +pub mod generate; +pub mod precondition; +pub mod shadow_update; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum Op { + CreateStream { + name: String, + }, + DeleteStream { + name: String, + }, + PurgeStream { + name: String, + }, + CreateTopic { + stream: String, + name: String, + partitions: u32, + }, + DeleteTopic { + stream: String, + topic: String, + }, + PurgeTopic { + stream: String, + topic: String, + }, + SendMessages { + stream: String, + topic: String, + partition: u32, + count: u32, + }, + PollMessages { + stream: String, + topic: String, + partition: u32, + count: u32, + }, + DeleteSegments { + stream: String, + topic: String, + partition: u32, + count: u32, + }, + CreatePartitions { + stream: String, + topic: String, + count: u32, + }, + DeletePartitions { + stream: String, + topic: String, + count: u32, + }, + CreateConsumerGroup { + stream: String, + topic: String, + name: String, + }, + DeleteConsumerGroup { + stream: String, + topic: String, + name: String, + }, + StoreConsumerOffset { + stream: String, + topic: String, + partition: u32, + offset: u64, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "status")] +pub enum OpOutcome { + Success { detail: Option<String> }, + ExpectedError { error: String, reason: String }, + UnexpectedError { error: String, context: String }, +} + +impl OpOutcome { + pub fn success() -> Self { + Self::Success { detail: None } + } + + pub fn success_with(detail: impl Into<String>) -> Self { + Self::Success { + detail: Some(detail.into()), + } + } + + pub fn expected(error: impl Into<String>, reason: impl Into<String>) -> Self { + Self::ExpectedError { + error: error.into(), + reason: reason.into(), + } + } + + pub fn unexpected(error: impl Into<String>, context: impl Into<String>) -> Self { + Self::UnexpectedError { + error: error.into(), + context: context.into(), + } + } 
+ + pub fn is_success(&self) -> bool { + matches!(self, Self::Success { .. }) + } + + pub fn is_unexpected(&self) -> bool { + matches!(self, Self::UnexpectedError { .. }) + } + + pub fn result_tag(&self) -> &'static str { + match self { + Self::Success { .. } => "ok", + Self::ExpectedError { .. } => "expected_error", + Self::UnexpectedError { .. } => "unexpected_error", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[allow(dead_code)] +pub enum ErrorClass { + /// Another worker deleted the resource concurrently — benign + ExpectedConcurrent, + /// Server returned something that contradicts shadow state + ServerBug, + /// Transient network/timeout error — retry-safe + Transient, +} + +/// Selects which kind of operation to generate (before target resolution). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OpKind { + CreateStream, + DeleteStream, + PurgeStream, + CreateTopic, + DeleteTopic, + PurgeTopic, + SendMessages, + PollMessages, + DeleteSegments, + CreatePartitions, + DeletePartitions, + CreateConsumerGroup, + DeleteConsumerGroup, + StoreConsumerOffset, +} + +impl Op { + pub fn kind_tag(&self) -> &'static str { + match self { + Self::CreateStream { .. } => "create_stream", + Self::DeleteStream { .. } => "delete_stream", + Self::PurgeStream { .. } => "purge_stream", + Self::CreateTopic { .. } => "create_topic", + Self::DeleteTopic { .. } => "delete_topic", + Self::PurgeTopic { .. } => "purge_topic", + Self::SendMessages { .. } => "send_messages", + Self::PollMessages { .. } => "poll_messages", + Self::DeleteSegments { .. } => "delete_segments", + Self::CreatePartitions { .. } => "create_partitions", + Self::DeletePartitions { .. } => "delete_partitions", + Self::CreateConsumerGroup { .. } => "create_consumer_group", + Self::DeleteConsumerGroup { .. } => "delete_consumer_group", + Self::StoreConsumerOffset { .. 
} => "store_consumer_offset", + } + } +} diff --git a/core/lab/src/ops/precondition.rs b/core/lab/src/ops/precondition.rs new file mode 100644 index 000000000..812726bf8 --- /dev/null +++ b/core/lab/src/ops/precondition.rs @@ -0,0 +1,98 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::Op; +use crate::shadow::ShadowState; + +#[allow(dead_code)] +impl Op { + /// Check whether this op's preconditions are met according to our shadow state. + /// A false return means the op would definitely fail, so the worker should skip it. + pub fn precondition_met(&self, state: &ShadowState) -> bool { + match self { + Op::CreateStream { name } => !state.stream_exists(name), + Op::DeleteStream { name } | Op::PurgeStream { name } => state.stream_exists(name), + + Op::CreateTopic { stream, name, .. } => { + state.stream_exists(stream) && !state.topic_exists(stream, name) + } + Op::DeleteTopic { stream, topic } | Op::PurgeTopic { stream, topic } => { + state.topic_exists(stream, topic) + } + + Op::SendMessages { + stream, + topic, + partition, + .. + } + | Op::PollMessages { + stream, + topic, + partition, + .. 
+ } => state + .get_topic(stream, topic) + .is_some_and(|t| *partition >= 1 && *partition <= t.partitions), + + Op::DeleteSegments { + stream, + topic, + partition, + .. + } => state + .get_topic(stream, topic) + .is_some_and(|t| *partition >= 1 && *partition <= t.partitions), + + Op::CreatePartitions { stream, topic, .. } => state.topic_exists(stream, topic), + + Op::DeletePartitions { + stream, + topic, + count, + } => state + .get_topic(stream, topic) + .is_some_and(|t| t.partitions > *count), + + Op::CreateConsumerGroup { + stream, + topic, + name, + } => state + .get_topic(stream, topic) + .is_some_and(|t| !t.consumer_groups.contains(name)), + + Op::DeleteConsumerGroup { + stream, + topic, + name, + } => state + .get_topic(stream, topic) + .is_some_and(|t| t.consumer_groups.contains(name)), + + Op::StoreConsumerOffset { + stream, + topic, + partition, + .. + } => state + .get_topic(stream, topic) + .is_some_and(|t| *partition >= 1 && *partition <= t.partitions), + } + } +} diff --git a/core/lab/src/ops/shadow_update.rs b/core/lab/src/ops/shadow_update.rs new file mode 100644 index 000000000..68acd1edf --- /dev/null +++ b/core/lab/src/ops/shadow_update.rs @@ -0,0 +1,89 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::{Op, OpOutcome}; +use crate::shadow::ShadowState; + +impl Op { + /// Update the shadow state to reflect a completed operation. + /// Only mutates on success — failed ops leave shadow unchanged. + pub fn update_shadow(&self, state: &mut ShadowState, outcome: &OpOutcome) { + if !outcome.is_success() { + return; + } + + match self { + Op::CreateStream { name } => { + state.apply_create_stream(name.clone()); + } + Op::DeleteStream { name } => { + state.apply_delete_stream(name); + } + Op::PurgeStream { name } => { + state.record_purge(name.clone(), None); + } + Op::CreateTopic { + stream, + name, + partitions, + } => { + state.apply_create_topic(stream, name.clone(), *partitions); + } + Op::DeleteTopic { stream, topic } => { + state.apply_delete_topic(stream, topic); + } + Op::PurgeTopic { stream, topic } => { + state.record_purge(stream.clone(), Some(topic.clone())); + } + Op::SendMessages { .. } => {} + Op::PollMessages { .. } => {} + Op::DeleteSegments { .. } => {} + Op::CreatePartitions { + stream, + topic, + count, + } => { + state.apply_create_partitions(stream, topic, *count); + } + Op::DeletePartitions { + stream, + topic, + count, + } => { + state.apply_delete_partitions(stream, topic, *count); + } + Op::CreateConsumerGroup { + stream, + topic, + name, + } => { + state.apply_create_consumer_group(stream, topic, name.clone()); + } + Op::DeleteConsumerGroup { + stream, + topic, + name, + } => { + state.apply_delete_consumer_group(stream, topic, name); + } + Op::StoreConsumerOffset { .. } => {} + } + + state.prune_tombstones(); + } +} diff --git a/core/lab/src/replay.rs b/core/lab/src/replay.rs new file mode 100644 index 000000000..bf799019a --- /dev/null +++ b/core/lab/src/replay.rs @@ -0,0 +1,306 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::args::ReplayArgs; +use crate::client::{cleanup_prefixed, create_client}; +use crate::ops::Op; +use iggy_common::IggyError; +use rand::SeedableRng; +use rand::rngs::StdRng; +use serde::Deserialize; +use std::collections::HashMap; +use std::fmt; +use std::fs::File; +use std::io::{BufRead, BufReader}; +use tracing::{error, info, warn}; + +pub struct LabReplay { + args: ReplayArgs, +} + +pub struct ReplayOutcome { + pub total_ops: u64, + pub divergences_total: u64, + pub divergences_concerning: u64, +} + +/// Raw JSON line from a trace file — intent and outcome share the same shape +/// with optional fields. 
+#[derive(Deserialize)] +struct RawTraceEntry { + seq: u64, + phase: String, + // Intent fields + op: Option<Op>, + // Outcome fields + result: Option<String>, + detail: Option<String>, +} + +struct ReplayIntent { + seq: u64, + op: Op, +} + +struct OriginalOutcome { + result: String, + detail: Option<String>, +} + +struct Divergence { + seq: u64, + kind: DivergenceKind, + op: Op, + original_result: String, + replay_result: String, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum DivergenceKind { + /// Original succeeded, replay failed — concerning + OriginalOkReplayFailed, + /// Original succeeded, replay returned expected error — concerning + OriginalOkReplayExpected, + /// Original got expected error, replay succeeded — benign (no concurrent race in sequential replay) + OriginalExpectedReplayOk, + /// Original got unexpected error, replay succeeded — interesting + OriginalUnexpectedReplayOk, + /// Both failed with different classifications + BothFailed, +} + +impl DivergenceKind { + fn is_concerning(self) -> bool { + matches!( + self, + Self::OriginalOkReplayFailed | Self::OriginalOkReplayExpected + ) + } +} + +impl fmt::Display for DivergenceKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::OriginalOkReplayFailed => write!(f, "original_ok→replay_failed"), + Self::OriginalOkReplayExpected => write!(f, "original_ok→replay_expected_error"), + Self::OriginalExpectedReplayOk => write!(f, "original_expected→replay_ok"), + Self::OriginalUnexpectedReplayOk => write!(f, "original_unexpected→replay_ok"), + Self::BothFailed => write!(f, "both_failed"), + } + } +} + +impl LabReplay { + pub fn new(args: ReplayArgs) -> Self { + Self { args } + } + + pub async fn run(self) -> Result<ReplayOutcome, IggyError> { + let (intents, outcomes) = self.load_traces()?; + if intents.is_empty() { + error!("No intent entries found in trace directory"); + return Err(IggyError::InvalidConfiguration); + } + + info!("=== iggy-lab replay ==="); + 
info!("Trace dir: {:?}", self.args.trace_dir); + info!("Intents loaded: {}", intents.len()); + info!( + "Server: {} ({})", + self.args.server_address, + transport_name(self.args.transport) + ); + + let client = create_client(&self.args.server_address, self.args.transport).await?; + + if self.args.force_cleanup { + info!("Cleaning up prefixed resources..."); + cleanup_prefixed(&client, &self.args.prefix).await; + } + + // Deterministic RNG for message payloads (content is irrelevant to correctness) + let mut rng = StdRng::seed_from_u64(0); + let mut divergences: Vec<Divergence> = Vec::new(); + let total_ops = intents.len() as u64; + + for intent in &intents { + let replay_outcome = intent + .op + .execute(&client, self.args.message_size, &mut rng) + .await; + let replay_tag = replay_outcome.result_tag(); + + if let Some(original) = outcomes.get(&intent.seq) { + if replay_tag == original.result { + continue; + } + + let kind = classify_divergence(&original.result, replay_tag); + let divergence = Divergence { + seq: intent.seq, + kind, + op: intent.op.clone(), + original_result: original.result.clone(), + replay_result: replay_tag.to_owned(), + }; + + if kind.is_concerning() { + warn!( + seq = intent.seq, + kind = %kind, + op = intent.op.kind_tag(), + original = %original.result, + replay = replay_tag, + detail = original.detail.as_deref().unwrap_or(""), + "Concerning divergence" + ); + } else { + info!( + seq = intent.seq, + kind = %kind, + op = intent.op.kind_tag(), + "Benign divergence" + ); + } + + let is_concerning = kind.is_concerning(); + divergences.push(divergence); + + if self.args.fail_fast && is_concerning { + error!( + "Fail-fast: stopping on concerning divergence at seq={}", + intent.seq + ); + break; + } + } + } + + let divergences_concerning = divergences + .iter() + .filter(|d| d.kind.is_concerning()) + .count() as u64; + let divergences_total = divergences.len() as u64; + + info!("--- Replay Summary ---"); + info!("Total ops replayed: {total_ops}"); 
+ info!("Total divergences: {divergences_total}"); + info!("Concerning divergences: {divergences_concerning}"); + + if divergences_concerning > 0 { + error!("REPLAY FAILED — {divergences_concerning} concerning divergence(s):"); + for d in divergences.iter().filter(|d| d.kind.is_concerning()) { + error!( + " seq={} op={} {} (original={}, replay={})", + d.seq, + d.op.kind_tag(), + d.kind, + d.original_result, + d.replay_result, + ); + } + } else if divergences_total > 0 { + info!("REPLAY OK — {divergences_total} benign divergence(s)"); + } else { + info!("REPLAY OK — exact match"); + } + + // Cleanup after replay + if self.args.force_cleanup { + info!("Post-replay cleanup..."); + cleanup_prefixed(&client, &self.args.prefix).await; + } + + Ok(ReplayOutcome { + total_ops, + divergences_total, + divergences_concerning, + }) + } + + fn load_traces(&self) -> Result<(Vec<ReplayIntent>, HashMap<u64, OriginalOutcome>), IggyError> { + let entries = + std::fs::read_dir(&self.args.trace_dir).map_err(|_| IggyError::InvalidConfiguration)?; + + let mut intents: Vec<ReplayIntent> = Vec::new(); + let mut outcomes: HashMap<u64, OriginalOutcome> = HashMap::new(); + + for entry in entries { + let entry = entry.map_err(|_| IggyError::InvalidConfiguration)?; + let path = entry.path(); + let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); + if !name.starts_with("trace-worker-") || !name.ends_with(".jsonl") { + continue; + } + + let file = File::open(&path).map_err(|_| IggyError::InvalidConfiguration)?; + let reader = BufReader::new(file); + + for line in reader.lines() { + let line = line.map_err(|_| IggyError::InvalidConfiguration)?; + if line.is_empty() { + continue; + } + let raw: RawTraceEntry = + serde_json::from_str(&line).map_err(|_| IggyError::InvalidConfiguration)?; + + match raw.phase.as_str() { + "intent" => { + if let Some(op) = raw.op { + intents.push(ReplayIntent { seq: raw.seq, op }); + } + } + "outcome" => { + if let Some(result) = raw.result { + 
outcomes.insert( + raw.seq, + OriginalOutcome { + result, + detail: raw.detail, + }, + ); + } + } + _ => {} + } + } + } + + intents.sort_by_key(|i| i.seq); + Ok((intents, outcomes)) + } +} + +fn classify_divergence(original_result: &str, replay_result: &str) -> DivergenceKind { + match (original_result, replay_result) { + ("ok", "unexpected_error") => DivergenceKind::OriginalOkReplayFailed, + ("ok", "expected_error") => DivergenceKind::OriginalOkReplayExpected, + ("expected_error", "ok") => DivergenceKind::OriginalExpectedReplayOk, + ("unexpected_error", "ok") => DivergenceKind::OriginalUnexpectedReplayOk, + _ => DivergenceKind::BothFailed, + } +} + +fn transport_name(t: crate::args::Transport) -> &'static str { + match t { + crate::args::Transport::Tcp => "tcp", + crate::args::Transport::Quic => "quic", + crate::args::Transport::Http => "http", + crate::args::Transport::WebSocket => "websocket", + } +} diff --git a/core/lab/src/report.rs b/core/lab/src/report.rs new file mode 100644 index 000000000..13c6d249e --- /dev/null +++ b/core/lab/src/report.rs @@ -0,0 +1,101 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::invariants::InvariantViolation; +use serde::Serialize; +use std::fs; +use std::io::{self, BufWriter, Write}; +use std::path::Path; +use std::time::Duration; + +pub struct ArtifactBundle<'a> { + pub seed: u64, + pub scenario_name: &'a str, + pub server_address: &'a str, + pub transport: &'a str, + pub workers: u32, + pub duration: Duration, + pub total_ops: u64, + pub passed: bool, + pub violations: &'a [InvariantViolation], + pub ops_target: Option<u64>, +} + +#[derive(Serialize)] +struct ConfigJson<'a> { + seed: u64, + scenario: &'a str, + server_address: &'a str, + transport: &'a str, + workers: u32, + #[serde(skip_serializing_if = "Option::is_none")] + ops_target: Option<u64>, +} + +#[derive(Serialize)] +struct SummaryJson<'a> { + passed: bool, + total_ops: u64, + duration_secs: f64, + violation_count: usize, + scenario: &'a str, +} + +impl<'a> ArtifactBundle<'a> { + pub fn write(&self, output_dir: &Path) -> io::Result<()> { + fs::create_dir_all(output_dir)?; + + // config.json + let config = ConfigJson { + seed: self.seed, + scenario: self.scenario_name, + server_address: self.server_address, + transport: self.transport, + workers: self.workers, + ops_target: self.ops_target, + }; + let config_path = output_dir.join("config.json"); + let file = fs::File::create(config_path)?; + serde_json::to_writer_pretty(BufWriter::new(file), &config).map_err(io::Error::other)?; + + // summary.json + let summary = SummaryJson { + passed: self.passed, + total_ops: self.total_ops, + duration_secs: self.duration.as_secs_f64(), + violation_count: self.violations.len(), + scenario: self.scenario_name, + }; + let summary_path = output_dir.join("summary.json"); + let file = fs::File::create(summary_path)?; + serde_json::to_writer_pretty(BufWriter::new(file), &summary).map_err(io::Error::other)?; + + // violations.jsonl + if !self.violations.is_empty() { + let violations_path = output_dir.join("violations.jsonl"); + let file = fs::File::create(violations_path)?; + 
let mut writer = BufWriter::new(file); + for v in self.violations { + serde_json::to_writer(&mut writer, v).map_err(io::Error::other)?; + writer.write_all(b"\n")?; + } + } + + Ok(()) + } +} diff --git a/core/lab/src/runner.rs b/core/lab/src/runner.rs new file mode 100644 index 000000000..47764eba1 --- /dev/null +++ b/core/lab/src/runner.rs @@ -0,0 +1,378 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::args::RunArgs; +use crate::client::create_client; +use crate::invariants::{self, InvariantViolation}; +use crate::report::ArtifactBundle; +use crate::safe_name::SafeResourceName; +use crate::scenarios::{self, Scenario}; +use crate::shadow::{ShadowState, Watermarks}; +use crate::trace::WorkerTraceWriter; +use crate::worker::{WorkerConfig, WorkerInit, WorkerResult, run_worker}; +use iggy::prelude::*; +use rand::SeedableRng; +use rand::rngs::StdRng; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::time::{Duration, Instant}; +use tokio::task::JoinSet; +use tracing::{error, info, warn}; + +pub struct LabRunner { + args: RunArgs, +} + +#[allow(dead_code)] +pub struct RunOutcome { + pub passed: bool, + pub ops_total: u64, + pub violations: Vec<InvariantViolation>, + pub duration: Duration, +} + +impl LabRunner { + pub fn new(args: RunArgs) -> Self { + Self { args } + } + + pub async fn run(self) -> Result<RunOutcome, IggyError> { + let seed = self.args.seed.unwrap_or_else(rand::random); + let scenario = scenarios::create_scenario(self.args.scenario); + + info!("=== iggy-lab ==="); + info!("Scenario: {}", scenario.name()); + info!("Seed: {seed}"); + info!( + "Server: {} ({})", + self.args.server_address, + transport_name(self.args.transport) + ); + info!("Workers: {}", self.args.workers); + info!("Duration: {}", self.args.duration); + if let Some(ops) = self.args.ops { + info!("Ops target: {ops}"); + } + info!("Prefix: {:?}", self.args.prefix); + + // Pre-flight check + let admin_client = create_client(&self.args.server_address, self.args.transport).await?; + self.preflight_check(&admin_client, &self.args.prefix, self.args.force_cleanup) + .await?; + + // Setup phase for scenarios that need it + self.setup_phase(&admin_client, scenario.as_ref()).await?; + + if let Some(ref dir) = self.args.output_dir { + std::fs::create_dir_all(dir).map_err(|_| IggyError::InvalidConfiguration)?; + } + + let shared_state = 
Arc::new(tokio::sync::RwLock::new(ShadowState::new())); + let watermarks = Arc::new(Watermarks::new()); + let stop = Arc::new(AtomicBool::new(false)); + let seq_counter = Arc::new(AtomicU64::new(0)); + let ops_counter = Arc::new(AtomicU64::new(0)); + + // Populate initial shadow state from setup + { + let mut state = shared_state.write().await; + let streams = admin_client.get_streams().await?; + for stream in streams + .iter() + .filter(|s| s.name.starts_with(&self.args.prefix)) + { + state.apply_create_stream(stream.name.clone()); + let stream_id = Identifier::numeric(stream.id).unwrap(); + if let Ok(Some(details)) = admin_client.get_stream(&stream_id).await { + for topic in &details.topics { + state.apply_create_topic( + &stream.name, + topic.name.clone(), + topic.partitions_count, + ); + } + } + } + } + + let op_weights = scenario.op_weights(); + + // Spawn workers + let mut join_set = JoinSet::new(); + for worker_id in 0..self.args.workers { + let worker_rng = StdRng::seed_from_u64(seed.wrapping_add(worker_id as u64)); + let client = create_client(&self.args.server_address, self.args.transport).await?; + let trace_writer = self.args.output_dir.as_ref().map(|dir| { + WorkerTraceWriter::new(dir, worker_id).expect("Failed to create trace writer") + }); + let config = WorkerConfig { + msg_size: self.args.message_size, + messages_per_batch: self.args.messages_per_batch, + prefix: self.args.prefix.clone(), + fail_fast: !self.args.no_fail_fast, + ops_target: self.args.ops, + }; + let init = WorkerInit { + id: worker_id, + client, + rng: worker_rng, + shared_state: Arc::clone(&shared_state), + watermarks: Arc::clone(&watermarks), + trace_writer, + seq_counter: Arc::clone(&seq_counter), + ops_counter: Arc::clone(&ops_counter), + }; + let weights = op_weights.clone(); + let stop_clone = Arc::clone(&stop); + join_set.spawn(async move { run_worker(init, &weights, config, stop_clone).await }); + } + + let run_start = Instant::now(); + let duration: Duration = 
self.args.duration.get_duration(); + + let ops_target = self.args.ops; + tokio::select! { + _ = tokio::time::sleep(duration) => { + info!("Duration elapsed, stopping workers..."); + } + _ = shutdown_signal() => { + info!("Received shutdown signal, stopping workers..."); + } + _ = wait_for_ops_target(&ops_counter, ops_target) => { + info!("Op count target reached, stopping workers..."); + } + } + stop.store(true, Ordering::Relaxed); + + // Collect results + let mut total_ops = 0u64; + let mut violations = Vec::new(); + while let Some(result) = join_set.join_next().await { + match result { + Ok(WorkerResult::Ok { ops_executed }) => { + total_ops += ops_executed; + } + Ok(WorkerResult::ServerBug(v)) => { + violations.push(v); + } + Err(e) => { + error!("Worker panicked: {e}"); + } + } + } + + let run_duration = run_start.elapsed(); + info!("All workers stopped. Total ops: {total_ops}, duration: {run_duration:.1?}"); + + // Post-run verification + if !self.args.skip_post_run_verify { + info!("Running post-run verification..."); + let state = shared_state.read().await; + let post_violations = + invariants::post_run_verify(&admin_client, &state, &self.args.prefix).await; + if !post_violations.is_empty() { + warn!("Post-run violations: {}", post_violations.len()); + } + violations.extend(post_violations); + + let cross_violations = invariants::cross_client_consistency( + &self.args.server_address, + self.args.transport, + &self.args.prefix, + ) + .await; + if !cross_violations.is_empty() { + warn!("Cross-client violations: {}", cross_violations.len()); + } + violations.extend(cross_violations); + } + + let passed = violations.is_empty(); + + // Write artifacts + if let Some(ref dir) = self.args.output_dir { + let _state = shared_state.read().await; + let bundle = ArtifactBundle { + seed, + scenario_name: scenario.name(), + server_address: &self.args.server_address, + transport: transport_name(self.args.transport), + workers: self.args.workers, + duration: run_duration, + 
total_ops, + passed, + violations: &violations, + ops_target: self.args.ops, + }; + if let Err(e) = bundle.write(dir) { + error!("Failed to write artifacts: {e}"); + } + } + + // Cleanup + if !self.args.no_cleanup { + info!("Cleaning up lab resources..."); + self.cleanup(&admin_client, &self.args.prefix).await; + } + + if passed { + info!("PASSED — {total_ops} ops, 0 violations"); + } else { + error!("FAILED — {total_ops} ops, {} violations", violations.len()); + for v in &violations { + error!(" [{}] {}: {}", v.kind, v.description, v.context); + } + } + + Ok(RunOutcome { + passed, + ops_total: total_ops, + violations, + duration: run_duration, + }) + } + + async fn preflight_check( + &self, + client: &IggyClient, + prefix: &str, + force: bool, + ) -> Result<(), IggyError> { + let streams = client.get_streams().await?; + let stale: Vec<_> = streams + .iter() + .filter(|s| s.name.starts_with(prefix)) + .collect(); + + if stale.is_empty() { + return Ok(()); + } + + if force { + info!( + "Force-cleaning {} stale resources with prefix '{prefix}'", + stale.len() + ); + for stream in &stale { + let id = Identifier::numeric(stream.id).unwrap(); + if let Err(e) = client.delete_stream(&id).await { + warn!("Failed to delete stale stream '{}': {e}", stream.name); + } + } + Ok(()) + } else { + error!( + "Found {} stale resources with prefix '{prefix}'. 
Use --force-cleanup to remove them.", + stale.len() + ); + Err(IggyError::InvalidConfiguration) + } + } + + async fn setup_phase( + &self, + client: &IggyClient, + scenario: &dyn Scenario, + ) -> Result<(), IggyError> { + match scenario.name() { + "concurrent-crud" => { + info!("Setup: creating 5 streams with 2 topics each..."); + for i in 0..5 { + let stream_name = + SafeResourceName::new(&self.args.prefix, &format!("setup-s{i}")); + client.create_stream(&stream_name).await?; + for j in 0..2 { + let topic_name = + SafeResourceName::new(&self.args.prefix, &format!("setup-t{i}-{j}")); + let stream_id = Identifier::from_str_value(&stream_name).unwrap(); + client + .create_topic( + &stream_id, + &topic_name, + 3, + CompressionAlgorithm::None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::Unlimited, + ) + .await?; + } + } + } + "segment-rotation" => { + info!("Setup: creating 1 stream with 1 topic and 1 partition..."); + let stream_name = SafeResourceName::new(&self.args.prefix, "seg-rot-stream"); + client.create_stream(&stream_name).await?; + let stream_id = Identifier::from_str_value(&stream_name).unwrap(); + let topic_name = SafeResourceName::new(&self.args.prefix, "seg-rot-topic"); + client + .create_topic( + &stream_id, + &topic_name, + 1, + CompressionAlgorithm::None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::Unlimited, + ) + .await?; + } + _ => {} + } + Ok(()) + } + + async fn cleanup(&self, client: &IggyClient, prefix: &str) { + crate::client::cleanup_prefixed(client, prefix).await; + } +} + +async fn wait_for_ops_target(counter: &AtomicU64, target: Option<u64>) { + let Some(target) = target else { + std::future::pending::<()>().await; + return; + }; + let mut interval = tokio::time::interval(Duration::from_millis(50)); + loop { + interval.tick().await; + if counter.load(Ordering::Relaxed) >= target { + return; + } + } +} + +/// Resolves when either SIGINT (Ctrl+C) or SIGTERM is received. 
+async fn shutdown_signal() { + use tokio::signal::unix::{SignalKind, signal}; + + let mut sigterm = signal(SignalKind::terminate()).expect("Failed to register SIGTERM handler"); + + tokio::select! { + _ = tokio::signal::ctrl_c() => {} + _ = sigterm.recv() => {} + } +} + +fn transport_name(t: crate::args::Transport) -> &'static str { + match t { + crate::args::Transport::Tcp => "tcp", + crate::args::Transport::Quic => "quic", + crate::args::Transport::Http => "http", + crate::args::Transport::WebSocket => "websocket", + } +} diff --git a/core/lab/src/safe_name.rs b/core/lab/src/safe_name.rs new file mode 100644 index 000000000..75cef378d --- /dev/null +++ b/core/lab/src/safe_name.rs @@ -0,0 +1,114 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use rand::distr::Alphanumeric; +use rand::{Rng, RngExt}; +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::ops::Deref; + +const RANDOM_SUFFIX_LEN: usize = 8; + +/// Resource name that is guaranteed to start with a configurable prefix. +/// +/// This prevents chaos tests from accidentally operating on non-lab resources. 
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct SafeResourceName(String); + +impl SafeResourceName { + pub fn new(prefix: &str, name: &str) -> Self { + let full = if name.starts_with(prefix) { + name.to_owned() + } else { + format!("{prefix}{name}") + }; + Self(full) + } + + pub fn random(prefix: &str, rng: &mut impl Rng) -> Self { + let suffix: String = (0..RANDOM_SUFFIX_LEN) + .map(|_| rng.sample(Alphanumeric) as char) + .collect(); + Self(format!("{prefix}{suffix}")) + } + + pub fn has_prefix(&self, prefix: &str) -> bool { + self.0.starts_with(prefix) + } + + pub fn into_inner(self) -> String { + self.0 + } +} + +impl Deref for SafeResourceName { + type Target = str; + + fn deref(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for SafeResourceName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.0) + } +} + +impl From<SafeResourceName> for String { + fn from(name: SafeResourceName) -> Self { + name.0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::SeedableRng; + use rand::rngs::StdRng; + + #[test] + fn new_prepends_prefix_when_missing() { + let name = SafeResourceName::new("lab-", "stream1"); + assert_eq!(&*name, "lab-stream1"); + } + + #[test] + fn new_preserves_existing_prefix() { + let name = SafeResourceName::new("lab-", "lab-stream1"); + assert_eq!(&*name, "lab-stream1"); + } + + #[test] + fn random_produces_prefixed_name() { + let mut rng = StdRng::seed_from_u64(42); + let name = SafeResourceName::random("lab-", &mut rng); + assert!(name.starts_with("lab-")); + assert_eq!(name.len(), 4 + RANDOM_SUFFIX_LEN); + } + + #[test] + fn random_is_deterministic_with_same_seed() { + let mut rng1 = StdRng::seed_from_u64(99); + let mut rng2 = StdRng::seed_from_u64(99); + let a = SafeResourceName::random("lab-", &mut rng1); + let b = SafeResourceName::random("lab-", &mut rng2); + assert_eq!(a, b); + } +} diff --git a/core/lab/src/scenarios/concurrent_crud.rs 
b/core/lab/src/scenarios/concurrent_crud.rs new file mode 100644 index 000000000..ab634710b --- /dev/null +++ b/core/lab/src/scenarios/concurrent_crud.rs @@ -0,0 +1,50 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::Scenario; +use crate::ops::OpKind; + +pub struct ConcurrentCrud; + +impl Scenario for ConcurrentCrud { + fn name(&self) -> &'static str { + "concurrent-crud" + } + + fn describe(&self) -> &'static str { + "Concurrent stream and topic CRUD operations.\n\n\ + Phase 1: Setup — creates 5 streams with 2 topics each.\n\ + Phase 2: Chaos — N workers run weighted create/delete/purge ops.\n\ + Phase 3: Verify — post-run invariant checks.\n\n\ + Focuses on detecting race conditions in metadata operations \ + (create-while-deleting, purge-while-creating, etc.)." 
+ } + + fn op_weights(&self) -> Vec<(OpKind, f64)> { + vec![ + (OpKind::CreateStream, 20.0), + (OpKind::DeleteStream, 15.0), + (OpKind::PurgeStream, 10.0), + (OpKind::CreateTopic, 20.0), + (OpKind::DeleteTopic, 15.0), + (OpKind::PurgeTopic, 10.0), + (OpKind::SendMessages, 5.0), + (OpKind::PollMessages, 5.0), + ] + } +} diff --git a/core/lab/src/scenarios/mixed_workload.rs b/core/lab/src/scenarios/mixed_workload.rs new file mode 100644 index 000000000..073bbe44a --- /dev/null +++ b/core/lab/src/scenarios/mixed_workload.rs @@ -0,0 +1,57 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::Scenario; +use crate::ops::OpKind; + +pub struct MixedWorkload; + +impl Scenario for MixedWorkload { + fn name(&self) -> &'static str { + "mixed-workload" + } + + fn describe(&self) -> &'static str { + "Full chaos: all operation types interleaved.\n\n\ + All workers independently generate a weighted mix of every operation type: \ + create/delete streams and topics, send/poll messages, purge, partition \ + management, consumer groups, and consumer offsets.\n\n\ + Default weights: SendMessages 40%, PollMessages 25%, \ + CreateStream/Topic 10%, DeleteStream/Topic 8%, \ + Purge/Segments 7%, ConsumerGroup 5%, Offsets 5%." 
+ } + + fn op_weights(&self) -> Vec<(OpKind, f64)> { + vec![ + (OpKind::SendMessages, 40.0), + (OpKind::PollMessages, 25.0), + (OpKind::CreateStream, 5.0), + (OpKind::CreateTopic, 5.0), + (OpKind::DeleteStream, 4.0), + (OpKind::DeleteTopic, 4.0), + (OpKind::PurgeStream, 2.0), + (OpKind::PurgeTopic, 2.0), + (OpKind::DeleteSegments, 3.0), + (OpKind::CreatePartitions, 2.0), + (OpKind::DeletePartitions, 1.0), + (OpKind::CreateConsumerGroup, 3.0), + (OpKind::DeleteConsumerGroup, 2.0), + (OpKind::StoreConsumerOffset, 2.0), + ] + } +} diff --git a/core/lab/src/scenarios/mod.rs b/core/lab/src/scenarios/mod.rs new file mode 100644 index 000000000..8f6e52990 --- /dev/null +++ b/core/lab/src/scenarios/mod.rs @@ -0,0 +1,55 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +pub mod concurrent_crud; +pub mod mixed_workload; +pub mod segment_rotation; + +use crate::args::ScenarioName; +use crate::ops::OpKind; + +pub trait Scenario: Send + Sync { + fn name(&self) -> &'static str; + fn describe(&self) -> &'static str; + fn op_weights(&self) -> Vec<(OpKind, f64)>; +} + +pub fn create_scenario(name: ScenarioName) -> Box<dyn Scenario> { + match name { + ScenarioName::ConcurrentCrud => Box::new(concurrent_crud::ConcurrentCrud), + ScenarioName::SegmentRotation => Box::new(segment_rotation::SegmentRotation), + ScenarioName::MixedWorkload => Box::new(mixed_workload::MixedWorkload), + } +} + +pub fn list_scenarios() -> Vec<(&'static str, &'static str)> { + vec![ + ( + concurrent_crud::ConcurrentCrud.name(), + concurrent_crud::ConcurrentCrud.describe(), + ), + ( + segment_rotation::SegmentRotation.name(), + segment_rotation::SegmentRotation.describe(), + ), + ( + mixed_workload::MixedWorkload.name(), + mixed_workload::MixedWorkload.describe(), + ), + ] +} diff --git a/core/lab/src/scenarios/segment_rotation.rs b/core/lab/src/scenarios/segment_rotation.rs new file mode 100644 index 000000000..cecbdecde --- /dev/null +++ b/core/lab/src/scenarios/segment_rotation.rs @@ -0,0 +1,46 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::Scenario; +use crate::ops::OpKind; + +pub struct SegmentRotation; + +impl Scenario for SegmentRotation { + fn name(&self) -> &'static str { + "segment-rotation" + } + + fn describe(&self) -> &'static str { + "Segment rotation stress test.\n\n\ + Phase 1: Setup — creates 1 stream with 1 topic and 1 partition.\n\ + Phase 2: Chaos — all workers send small messages to the same partition \ + while concurrently polling.\n\ + Phase 3: Verify — offset monotonicity, no gaps, watermark violations.\n\n\ + Targets segment rotation logic by producing high message throughput \ + to a single partition." + } + + fn op_weights(&self) -> Vec<(OpKind, f64)> { + vec![ + (OpKind::SendMessages, 50.0), + (OpKind::PollMessages, 40.0), + (OpKind::DeleteSegments, 10.0), + ] + } +} diff --git a/core/lab/src/shadow.rs b/core/lab/src/shadow.rs new file mode 100644 index 000000000..d13774bdb --- /dev/null +++ b/core/lab/src/shadow.rs @@ -0,0 +1,244 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use dashmap::DashMap; +use rand::{Rng, RngExt}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, Instant}; + +const TOMBSTONE_RETENTION: Duration = Duration::from_secs(10); + +pub struct ShadowState { + pub streams: BTreeMap<String, ShadowStream>, + /// Recently deleted resource names with deletion timestamp, for error classification. + recently_deleted: VecDeque<(String, Instant)>, + /// Recently purged (stream, topic) pairs — offsets reset after purge. + recently_purged: VecDeque<(String, Option<String>, Instant)>, +} + +pub struct ShadowStream { + pub topics: BTreeMap<String, ShadowTopic>, +} + +pub struct ShadowTopic { + pub partitions: u32, + pub consumer_groups: BTreeSet<String>, +} + +#[allow(dead_code)] +impl ShadowState { + pub fn new() -> Self { + Self { + streams: BTreeMap::new(), + recently_deleted: VecDeque::new(), + recently_purged: VecDeque::new(), + } + } + + pub fn stream_exists(&self, name: &str) -> bool { + self.streams.contains_key(name) + } + + pub fn topic_exists(&self, stream: &str, topic: &str) -> bool { + self.streams + .get(stream) + .is_some_and(|s| s.topics.contains_key(topic)) + } + + pub fn get_topic(&self, stream: &str, topic: &str) -> Option<&ShadowTopic> { + self.streams.get(stream).and_then(|s| s.topics.get(topic)) + } + + pub fn random_stream(&self, rng: &mut impl Rng) -> Option<&str> { + if self.streams.is_empty() { + return None; + } + let idx = rng.random_range(0..self.streams.len()); + self.streams.keys().nth(idx).map(|s| s.as_str()) + } + + /// Returns (stream_name, topic_name) for a random topic that exists. 
+ pub fn random_topic(&self, rng: &mut impl Rng) -> Option<(&str, &str)> { + let streams_with_topics: Vec<_> = self + .streams + .iter() + .filter(|(_, s)| !s.topics.is_empty()) + .collect(); + if streams_with_topics.is_empty() { + return None; + } + let (sname, stream) = streams_with_topics[rng.random_range(0..streams_with_topics.len())]; + let idx = rng.random_range(0..stream.topics.len()); + let tname = stream.topics.keys().nth(idx)?; + Some((sname.as_str(), tname.as_str())) + } + + /// Returns (stream_name, topic_name, partition_count) for a random topic. + pub fn random_topic_with_partitions(&self, rng: &mut impl Rng) -> Option<(&str, &str, u32)> { + let (s, t) = self.random_topic(rng)?; + let partitions = self.streams[s].topics[t].partitions; + Some((s, t, partitions)) + } + + pub fn apply_create_stream(&mut self, name: String) { + self.streams.insert( + name, + ShadowStream { + topics: BTreeMap::new(), + }, + ); + } + + pub fn apply_delete_stream(&mut self, name: &str) { + if self.streams.remove(name).is_some() { + self.recently_deleted + .push_back((name.to_owned(), Instant::now())); + } + } + + pub fn apply_create_topic(&mut self, stream: &str, name: String, partitions: u32) { + if let Some(s) = self.streams.get_mut(stream) { + s.topics.insert( + name, + ShadowTopic { + partitions, + consumer_groups: BTreeSet::new(), + }, + ); + } + } + + pub fn apply_delete_topic(&mut self, stream: &str, topic: &str) { + if let Some(s) = self.streams.get_mut(stream) + && s.topics.remove(topic).is_some() + { + let key = format!("{stream}/{topic}"); + self.recently_deleted.push_back((key, Instant::now())); + } + } + + pub fn apply_create_partitions(&mut self, stream: &str, topic: &str, count: u32) { + if let Some(t) = self + .streams + .get_mut(stream) + .and_then(|s| s.topics.get_mut(topic)) + { + t.partitions += count; + } + } + + pub fn apply_delete_partitions(&mut self, stream: &str, topic: &str, count: u32) { + if let Some(t) = self + .streams + .get_mut(stream) + 
.and_then(|s| s.topics.get_mut(topic)) + { + t.partitions = t.partitions.saturating_sub(count); + } + } + + pub fn apply_create_consumer_group(&mut self, stream: &str, topic: &str, name: String) { + if let Some(t) = self + .streams + .get_mut(stream) + .and_then(|s| s.topics.get_mut(topic)) + { + t.consumer_groups.insert(name); + } + } + + pub fn apply_delete_consumer_group(&mut self, stream: &str, topic: &str, name: &str) { + if let Some(t) = self + .streams + .get_mut(stream) + .and_then(|s| s.topics.get_mut(topic)) + { + t.consumer_groups.remove(name); + } + } + + pub fn was_recently_deleted(&self, name: &str) -> bool { + self.recently_deleted.iter().any(|(n, _)| n == name) + } + + /// Record that a stream or topic was purged (offsets reset). + /// `topic` is `None` for stream-level purge (all topics affected). + pub fn record_purge(&mut self, stream: String, topic: Option<String>) { + self.recently_purged + .push_back((stream, topic, Instant::now())); + } + + /// Check whether offsets for this partition may have been reset by a recent purge. + pub fn was_recently_purged(&self, stream: &str, topic: &str) -> bool { + self.recently_purged + .iter() + .any(|(s, t, _)| s == stream && (t.is_none() || t.as_deref() == Some(topic))) + } + + pub fn prune_tombstones(&mut self) { + let cutoff = Instant::now() - TOMBSTONE_RETENTION; + while self + .recently_deleted + .front() + .is_some_and(|(_, ts)| *ts < cutoff) + { + self.recently_deleted.pop_front(); + } + while self + .recently_purged + .front() + .is_some_and(|(_, _, ts)| *ts < cutoff) + { + self.recently_purged.pop_front(); + } + } +} + +/// Per-partition watermarks tracking the highest offset successfully sent, +/// used to detect transient visibility violations during polls. 
+pub struct Watermarks { + inner: DashMap<PartitionKey, AtomicU64>, +} + +type PartitionKey = (String, String, u32); + +#[allow(dead_code)] +impl Watermarks { + pub fn new() -> Self { + Self { + inner: DashMap::new(), + } + } + + pub fn record_send(&self, stream: &str, topic: &str, partition: u32, offset: u64) { + let key = (stream.to_owned(), topic.to_owned(), partition); + self.inner + .entry(key) + .and_modify(|v| { + v.fetch_max(offset, Ordering::Relaxed); + }) + .or_insert(AtomicU64::new(offset)); + } + + /// Returns the highest recorded send offset for a partition. + pub fn get_watermark(&self, stream: &str, topic: &str, partition: u32) -> Option<u64> { + let key = (stream.to_owned(), topic.to_owned(), partition); + self.inner.get(&key).map(|v| v.load(Ordering::Relaxed)) + } +} diff --git a/core/lab/src/trace.rs b/core/lab/src/trace.rs new file mode 100644 index 000000000..f5ca17f8b --- /dev/null +++ b/core/lab/src/trace.rs @@ -0,0 +1,129 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::ops::{Op, OpOutcome}; +use serde::Serialize; +use std::fs::File; +use std::io::{self, BufWriter, Write}; +use std::path::Path; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +const FLUSH_INTERVAL_ENTRIES: usize = 64; +const FLUSH_INTERVAL: Duration = Duration::from_millis(100); + +pub struct WorkerTraceWriter { + writer: BufWriter<File>, + worker_id: u32, + pending: usize, + last_flush: Instant, +} + +#[derive(Serialize)] +struct TraceEntry<'a> { + seq: u64, + ts_us: u128, + worker: u32, + phase: &'static str, + #[serde(flatten)] + data: TraceData<'a>, +} + +#[derive(Serialize)] +#[serde(untagged)] +enum TraceData<'a> { + Intent { + op: &'a Op, + }, + Outcome { + result: &'static str, + latency_us: u64, + detail: Option<&'a str>, + }, +} + +impl WorkerTraceWriter { + pub fn new(output_dir: &Path, worker_id: u32) -> io::Result<Self> { + let path = output_dir.join(format!("trace-worker-{worker_id}.jsonl")); + let file = File::create(path)?; + Ok(Self { + writer: BufWriter::new(file), + worker_id, + pending: 0, + last_flush: Instant::now(), + }) + } + + pub fn write_intent(&mut self, seq: u64, op: &Op) { + let entry = TraceEntry { + seq, + ts_us: now_micros(), + worker: self.worker_id, + phase: "intent", + data: TraceData::Intent { op }, + }; + self.write_entry(&entry); + } + + pub fn write_outcome(&mut self, seq: u64, outcome: &OpOutcome, latency: Duration) { + let detail = match outcome { + OpOutcome::Success { detail } => detail.as_deref(), + OpOutcome::ExpectedError { reason, .. } => Some(reason.as_str()), + OpOutcome::UnexpectedError { context, .. 
} => Some(context.as_str()), + }; + let entry = TraceEntry { + seq, + ts_us: now_micros(), + worker: self.worker_id, + phase: "outcome", + data: TraceData::Outcome { + result: outcome.result_tag(), + latency_us: latency.as_micros() as u64, + detail, + }, + }; + self.write_entry(&entry); + } + + fn write_entry<T: Serialize>(&mut self, entry: &T) { + // Serialization to a BufWriter; I/O errors here are non-fatal for chaos testing + if serde_json::to_writer(&mut self.writer, entry).is_ok() { + let _ = self.writer.write_all(b"\n"); + self.pending += 1; + self.maybe_flush(); + } + } + + fn maybe_flush(&mut self) { + if self.pending >= FLUSH_INTERVAL_ENTRIES || self.last_flush.elapsed() >= FLUSH_INTERVAL { + self.flush(); + } + } + + pub fn flush(&mut self) { + let _ = self.writer.flush(); + self.pending = 0; + self.last_flush = Instant::now(); + } +} + +fn now_micros() -> u128 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_micros() +} diff --git a/core/lab/src/worker.rs b/core/lab/src/worker.rs new file mode 100644 index 000000000..c758323e3 --- /dev/null +++ b/core/lab/src/worker.rs @@ -0,0 +1,244 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
 */

use crate::invariants::{self, InvariantViolation, PartitionKey};
use crate::ops::{ErrorClass, Op, OpKind, OpOutcome};
use crate::shadow::{ShadowState, Watermarks};
use crate::trace::WorkerTraceWriter;
use iggy::prelude::IggyClient;
use rand::distr::weighted::WeightedIndex;
use rand::prelude::Distribution;
use rand::rngs::StdRng;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Instant;
use tokio::sync::RwLock;
use tracing::{debug, error, warn};

/// Per-run tuning knobs shared by every worker.
pub struct WorkerConfig {
    // Payload size passed to Op::execute for send operations.
    pub msg_size: u32,
    // Batch size passed to Op::generate.
    pub messages_per_batch: u32,
    // Name prefix for generated resources (streams/topics/etc.).
    pub prefix: String,
    // When true, the worker returns WorkerResult::ServerBug on the first
    // fail-fast-worthy violation instead of continuing.
    pub fail_fast: bool,
    // Global (cross-worker) cap on executed operations; None = unbounded.
    pub ops_target: Option<u64>,
}

/// Everything a worker needs at startup; the Arc-wrapped members are shared
/// with other workers, the rest are owned per worker.
pub struct WorkerInit {
    pub id: u32,
    pub client: IggyClient,
    pub rng: StdRng,
    pub shared_state: Arc<RwLock<ShadowState>>,
    pub watermarks: Arc<Watermarks>,
    pub trace_writer: Option<WorkerTraceWriter>,
    // Shared sequence numbers for trace entries across all workers.
    pub seq_counter: Arc<AtomicU64>,
    // Shared count of executed ops, compared against config.ops_target.
    pub ops_counter: Arc<AtomicU64>,
}

// Internal worker state: WorkerInit plus the weighted op-kind sampler and config.
struct Worker {
    id: u32,
    client: IggyClient,
    rng: StdRng,
    shared_state: Arc<RwLock<ShadowState>>,
    watermarks: Arc<Watermarks>,
    trace_writer: Option<WorkerTraceWriter>,
    // op_kinds[i] is sampled with probability proportional to the i-th weight
    // in weighted_dist; the two are built from the same op_weights slice.
    op_kinds: Vec<OpKind>,
    weighted_dist: WeightedIndex<f64>,
    config: WorkerConfig,
    seq_counter: Arc<AtomicU64>,
    ops_counter: Arc<AtomicU64>,
}

/// Terminal result of a worker run: clean completion with its op count, or the
/// first invariant violation / server bug when fail-fast is enabled.
pub enum WorkerResult {
    Ok { ops_executed: u64 },
    ServerBug(InvariantViolation),
}

impl Worker {
    // Builds a worker from its init bundle plus the (kind, weight) table.
    // Panics if the weights are invalid for WeightedIndex (e.g. all zero or
    // negative) — a configuration bug, not a runtime condition.
    fn new(init: WorkerInit, op_weights: &[(OpKind, f64)], config: WorkerConfig) -> Self {
        let op_kinds: Vec<OpKind> = op_weights.iter().map(|(k, _)| *k).collect();
        let weights: Vec<f64> = op_weights.iter().map(|(_, w)| *w).collect();
        let weighted_dist = WeightedIndex::new(&weights).unwrap();

        Self {
            id: init.id,
            client: init.client,
            rng: init.rng,
            shared_state: init.shared_state,
            watermarks: init.watermarks,
            trace_writer: init.trace_writer,
            op_kinds,
            weighted_dist,
            config,
            seq_counter: init.seq_counter,
            ops_counter: init.ops_counter,
        }
    }

    // Main loop: pick a weighted op kind, generate a concrete Op against the
    // shadow state, execute it against the server, trace it, update the shadow
    // state, then run invariant checks. Exits when `stop` is set or the global
    // ops target is reached.
    async fn run(mut self, stop: Arc<AtomicBool>) -> WorkerResult {
        // Last observed offset per partition, local to this worker, fed to the
        // offset-monotonicity invariant.
        let mut last_offsets: HashMap<PartitionKey, u64> = HashMap::new();
        let mut ops_executed: u64 = 0;

        while !stop.load(Ordering::Relaxed) && !self.ops_limit_reached() {
            let kind = self.pick_op_kind();

            // Generate under a read lock only; the lock is dropped before the
            // (potentially slow) network call below.
            let op = {
                let state = self.shared_state.read().await;
                Op::generate(
                    kind,
                    &state,
                    &mut self.rng,
                    &self.config.prefix,
                    self.config.messages_per_batch,
                )
            };

            // Generation can fail (e.g. no suitable resource exists yet for
            // this kind); just try another kind.
            let Some(op) = op else {
                continue;
            };

            // Record intent before executing so a crash mid-op still leaves a
            // trace of what was attempted.
            let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
            if let Some(tw) = &mut self.trace_writer {
                tw.write_intent(seq, &op);
            }

            let start = Instant::now();
            let outcome = op
                .execute(&self.client, self.config.msg_size, &mut self.rng)
                .await;
            let latency = start.elapsed();

            if let Some(tw) = &mut self.trace_writer {
                tw.write_outcome(seq, &outcome, latency);
            }

            debug!(
                worker = self.id,
                op = op.kind_tag(),
                result = outcome.result_tag(),
                latency_us = latency.as_micros() as u64,
            );

            // Update shadow state first (records purge events for invariant checks)
            {
                let mut state = self.shared_state.write().await;
                op.update_shadow(&mut state, &outcome);
            }

            // Invariant checks (after shadow update so purge records are visible)
            {
                let state = self.shared_state.read().await;
                if let Err(violation) =
                    invariants::check_offset_monotonicity(&mut last_offsets, &op, &outcome, &state)
                {
                    error!(worker = self.id, violation = ?violation, "Offset monotonicity violated");
                    if self.config.fail_fast {
                        return WorkerResult::ServerBug(violation);
                    }
                }
            }
            // Watermark violations never fail-fast: they may be transient
            // (producer progress not yet visible), so they are only logged.
            if let Err(violation) = invariants::check_watermark(&self.watermarks, &op, &outcome) {
                warn!(worker = self.id, violation = ?violation, "Watermark violation (transient?)");
            }

            // Classify unexpected errors
            if outcome.is_unexpected() {
                let state = self.shared_state.read().await;
                if let OpOutcome::UnexpectedError { ref error, .. } = outcome {
                    // Try to parse back to IggyError for classification
                    // Since we only have the string, we classify based on shadow state context
                    let class =
                        if state.was_recently_deleted(op.stream_name_ref().unwrap_or_default()) {
                            ErrorClass::ExpectedConcurrent
                        } else {
                            ErrorClass::ServerBug
                        };

                    match class {
                        ErrorClass::ExpectedConcurrent => {
                            debug!(worker = self.id, "Downgraded to expected: {error}");
                        }
                        ErrorClass::ServerBug => {
                            error!(worker = self.id, "Server bug: {error}");
                            if self.config.fail_fast {
                                return WorkerResult::ServerBug(InvariantViolation {
                                    kind: "server_bug",
                                    description: error.clone(),
                                    context: format!("op={:?}", op),
                                });
                            }
                        }
                        // NOTE(review): this local classification only produces
                        // ExpectedConcurrent or ServerBug; the Transient arm is
                        // unreachable here and kept for match exhaustiveness.
                        ErrorClass::Transient => {
                            warn!(worker = self.id, "Transient error: {error}");
                        }
                    }
                }
            }

            // Counted for every executed op, regardless of outcome class.
            ops_executed += 1;
            self.ops_counter.fetch_add(1, Ordering::Relaxed);
        }

        // Final flush so the tail of the trace is on disk before the worker exits.
        if let Some(tw) = &mut self.trace_writer {
            tw.flush();
        }

        WorkerResult::Ok { ops_executed }
    }

    // True once the shared (cross-worker) op counter has reached the target.
    fn ops_limit_reached(&self) -> bool {
        self.config
            .ops_target
            .is_some_and(|target| self.ops_counter.load(Ordering::Relaxed) >= target)
    }

    // Samples the next op kind according to the configured weights.
    fn pick_op_kind(&mut self) -> OpKind {
        let idx = self.weighted_dist.sample(&mut self.rng);
        self.op_kinds[idx]
    }
}

/// Public entry point: builds a worker and runs it until `stop` is raised or
/// the global ops target is reached.
pub async fn run_worker(
    init: WorkerInit,
    op_weights: &[(OpKind, f64)],
    config: WorkerConfig,
    stop: Arc<AtomicBool>,
) -> WorkerResult {
    Worker::new(init, op_weights, config).run(stop).await
}

impl Op {
    // Returns the stream name an op targets (the op's own name for
    // stream-level ops), used for classifying errors against recently deleted
    // streams. Exhaustive over all Op variants by construction.
    fn stream_name_ref(&self) -> Option<&str> {
        match self {
            Op::CreateStream { name } | Op::DeleteStream { name } | Op::PurgeStream { name } => {
                Some(name)
            }
            Op::CreateTopic { stream, .. }
            | Op::DeleteTopic { stream, .. }
            | Op::PurgeTopic { stream, .. }
            | Op::SendMessages { stream, .. }
            | Op::PollMessages { stream, .. }
            | Op::DeleteSegments { stream, .. }
            | Op::CreatePartitions { stream, .. }
            | Op::DeletePartitions { stream, .. }
            | Op::CreateConsumerGroup { stream, .. }
            | Op::DeleteConsumerGroup { stream, .. }
            | Op::StoreConsumerOffset { stream, .. } => Some(stream),
        }
    }
}
