martin-g commented on code in PR #1802: URL: https://github.com/apache/datafusion-ballista/pull/1802#discussion_r3341315746
########## ballista/core/src/execution_plans/chaos_exec.rs: ########## @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// ChaosExec is a physical execution plan node for robustness/chaos testing. +// It wraps a single child node, preserving its schema and partitioning, and +// randomly injects faults according to a configurable failure_probability +// (in [0.0, 1.0]) and fault_type: +// +// "transient" — returns a recoverable IoError on the first batch +// "fatal" — returns a non-recoverable Execution error on the first batch +// "panic" — panics on the first batch +// "delay" — sleeps 1 ms before every batch +// "delay:N" — sleeps N ms before every batch +// +// ChaosExec is inserted into query plans by physical optimizer rule, which +// probabilistically wraps leaf nodes. 
+ +use datafusion::common::{DataFusionError, Result, Statistics, internal_err}; +use datafusion::config::ConfigOptions; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::execution_plan::CardinalityEffect; +use datafusion::physical_plan::metrics::MetricsSet; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + SendableRecordBatchStream, +}; +use futures::StreamExt; +use rand::rngs::StdRng; +use rand::{RngExt, SeedableRng}; +use std::any::Any; +use std::sync::Arc; + +/// Physical execution plan node that randomly injects failures for chaos/robustness testing. +#[derive(Debug, Clone)] +pub struct ChaosExec { + input: Arc<dyn ExecutionPlan>, + // a probability this node will fail when run + failure_probability: f64, + // controls what kind of fault is injected: "transient", "fatal", "panic", or "delay" + fault_type: String, + // optional RNG seed; None means thread-local non-deterministic RNG + seed: Option<u64>, +} + +impl ChaosExec { + /// Creates a new `ChaosExec` wrapping `input`. + /// + /// `failure_probability` must be in `[0.0, 1.0]`. + /// `fault_type` must be one of `"transient"`, `"fatal"`, `"panic"`, `"delay"`, or `"delay:N"` + /// where N is a positive integer number of milliseconds. 
+ pub fn new( + input: Arc<dyn ExecutionPlan>, + failure_probability: f64, + fault_type: &str, + seed: Option<u64>, + ) -> Result<Self> { + if !(0.0..=1.0).contains(&failure_probability) { + return internal_err!( + "ChaosExec failure_probability must be in [0.0, 1.0], got {failure_probability}" + ); + } + match fault_type { + "transient" | "fatal" | "panic" | "delay" => {} + other if other.starts_with("delay:") => { + let ms_str = &other["delay:".len()..]; + if ms_str.parse::<u64>().is_err() { + return internal_err!( + "ChaosExec delay suffix must be a positive integer (ms), got \"{ms_str}\"" + ); + } + } + other => { + return internal_err!( + "ChaosExec fault_type must be one of transient/fatal/panic/delay/delay:N, got {other}" + ); + } + } + + Ok(Self { + input, + failure_probability, + fault_type: fault_type.to_string(), + seed, + }) + } + + /// Returns the configured RNG seed, if any. + pub fn seed(&self) -> Option<u64> { + self.seed + } + + /// Returns the configured failure probability. + pub fn failure_probability(&self) -> f64 { + self.failure_probability + } + + /// Returns the configured fault type. 
+ pub fn fault_type(&self) -> &str { + &self.fault_type + } +} + +impl ExecutionPlan for ChaosExec { + fn name(&self) -> &str { + "ChaosExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &Arc<PlanProperties> { + self.input.properties() + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![&self.input] + } + + fn with_new_children( + self: Arc<Self>, + mut children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + if children.len() != 1 { + return internal_err!("ChaosExec expected one child, got {}", children.len()); + } + let new_input = children.pop().unwrap(); + Ok(Arc::new(Self::new( + new_input, + self.failure_probability, + &self.fault_type, + self.seed, + )?)) + } + + fn execute( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let input_stream = self.input.execute(partition, context)?; + let failure_probability = self.failure_probability; + let fault_type = self.fault_type.clone(); + let schema = self.input.schema(); + + // Decide once per partition execution whether to inject a fault on the first batch. + // With a seed, mix in the partition index so each partition has an independent but + // deterministic sequence; without a seed, fall back to the thread-local RNG. + let should_fail = if let Some(s) = self.seed { + let mut rng = StdRng::seed_from_u64(s.wrapping_add(partition as u64)); + rng.random::<f64>() < failure_probability + } else { + rand::random::<f64>() < failure_probability + }; + + // Wrap the child stream. For error/panic modes, inject on the first batch (idx == 0) + // to mirror how real IO failures surface in production. For "delay", sleep every batch. + let wrapped = input_stream.enumerate().map(move |(idx, batch_result)| { + match fault_type.to_lowercase().as_str() { Review Comment: ```suggestion match fault_type.as_str() { ``` ########## ballista/core/src/execution_plans/mod.rs: ########## @@ -18,6 +18,7 @@ //! 
This module contains execution plans that are needed to distribute DataFusion's execution plans into //! several Ballista executors. +mod chaos_exec; Review Comment: Should `chaos_exec` be feature-gated? There is no need for it to be included in production builds. ########## ballista/core/src/execution_plans/chaos_exec.rs: ########## @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// ChaosExec is a physical execution plan node for robustness/chaos testing. +// It wraps a single child node, preserving its schema and partitioning, and +// randomly injects faults according to a configurable failure_probability +// (in [0.0, 1.0]) and fault_type: +// +// "transient" — returns a recoverable IoError on the first batch +// "fatal" — returns a non-recoverable Execution error on the first batch +// "panic" — panics on the first batch +// "delay" — sleeps 1 ms before every batch +// "delay:N" — sleeps N ms before every batch +// +// ChaosExec is inserted into query plans by physical optimizer rule, which +// probabilistically wraps leaf nodes. 
+ +use datafusion::common::{DataFusionError, Result, Statistics, internal_err}; +use datafusion::config::ConfigOptions; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::execution_plan::CardinalityEffect; +use datafusion::physical_plan::metrics::MetricsSet; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + SendableRecordBatchStream, +}; +use futures::StreamExt; +use rand::rngs::StdRng; +use rand::{RngExt, SeedableRng}; +use std::any::Any; +use std::sync::Arc; + +/// Physical execution plan node that randomly injects failures for chaos/robustness testing. +#[derive(Debug, Clone)] +pub struct ChaosExec { + input: Arc<dyn ExecutionPlan>, + // a probability this node will fail when run + failure_probability: f64, + // controls what kind of fault is injected: "transient", "fatal", "panic", or "delay" + fault_type: String, + // optional RNG seed; None means thread-local non-deterministic RNG + seed: Option<u64>, +} + +impl ChaosExec { + /// Creates a new `ChaosExec` wrapping `input`. + /// + /// `failure_probability` must be in `[0.0, 1.0]`. + /// `fault_type` must be one of `"transient"`, `"fatal"`, `"panic"`, `"delay"`, or `"delay:N"` + /// where N is a positive integer number of milliseconds. 
+ pub fn new( + input: Arc<dyn ExecutionPlan>, + failure_probability: f64, + fault_type: &str, + seed: Option<u64>, + ) -> Result<Self> { + if !(0.0..=1.0).contains(&failure_probability) { + return internal_err!( + "ChaosExec failure_probability must be in [0.0, 1.0], got {failure_probability}" + ); + } + match fault_type { + "transient" | "fatal" | "panic" | "delay" => {} + other if other.starts_with("delay:") => { + let ms_str = &other["delay:".len()..]; + if ms_str.parse::<u64>().is_err() { + return internal_err!( + "ChaosExec delay suffix must be a positive integer (ms), got \"{ms_str}\"" + ); + } + } + other => { + return internal_err!( + "ChaosExec fault_type must be one of transient/fatal/panic/delay/delay:N, got {other}" + ); + } + } + + Ok(Self { + input, + failure_probability, + fault_type: fault_type.to_string(), + seed, + }) + } + + /// Returns the configured RNG seed, if any. + pub fn seed(&self) -> Option<u64> { + self.seed + } + + /// Returns the configured failure probability. + pub fn failure_probability(&self) -> f64 { + self.failure_probability + } + + /// Returns the configured fault type. 
+ pub fn fault_type(&self) -> &str { + &self.fault_type + } +} + +impl ExecutionPlan for ChaosExec { + fn name(&self) -> &str { + "ChaosExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &Arc<PlanProperties> { + self.input.properties() + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![&self.input] + } + + fn with_new_children( + self: Arc<Self>, + mut children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + if children.len() != 1 { + return internal_err!("ChaosExec expected one child, got {}", children.len()); + } + let new_input = children.pop().unwrap(); + Ok(Arc::new(Self::new( + new_input, + self.failure_probability, + &self.fault_type, + self.seed, + )?)) + } + + fn execute( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let input_stream = self.input.execute(partition, context)?; + let failure_probability = self.failure_probability; + let fault_type = self.fault_type.clone(); + let schema = self.input.schema(); + + // Decide once per partition execution whether to inject a fault on the first batch. + // With a seed, mix in the partition index so each partition has an independent but + // deterministic sequence; without a seed, fall back to the thread-local RNG. + let should_fail = if let Some(s) = self.seed { + let mut rng = StdRng::seed_from_u64(s.wrapping_add(partition as u64)); + rng.random::<f64>() < failure_probability + } else { + rand::random::<f64>() < failure_probability + }; + + // Wrap the child stream. For error/panic modes, inject on the first batch (idx == 0) + // to mirror how real IO failures surface in production. For "delay", sleep every batch. + let wrapped = input_stream.enumerate().map(move |(idx, batch_result)| { + match fault_type.to_lowercase().as_str() { + ft if ft.starts_with("delay") => { Review Comment: should `delay` depend on `failure_probability` ? i.e. 
```suggestion ft if should_fail && ft.starts_with("delay") => { ``` ########## ballista/scheduler/src/state/aqe/optimizer_rule/chaos_exec.rs: ########## @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use ballista_core::config::BallistaConfig; +use ballista_core::execution_plans::ChaosExec; +use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion::config::ConfigOptions; +use datafusion::error::Result; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::ExecutionPlan; +use rand::rngs::StdRng; +use rand::{RngExt, SeedableRng}; +use std::sync::Arc; + +/// Optimizer rule that randomly injects a single [`ChaosExec`] into the physical plan. 
+/// +/// Controlled by four Ballista config keys: +/// +/// - `ballista.testing.chaos_execution.enabled` (bool, default `false`) +/// - `ballista.testing.chaos_execution.probability` (f64 in 0.0–1.0, default `0.25`) +/// - `ballista.testing.chaos_execution.fault_type` (str, default `"transient"`) — one of `transient`, `fatal`, `panic`, `delay` +/// - `ballista.testing.chaos_execution.seed` (str, default `""`) — optional u64 seed for reproducible runs +/// +/// When enabled, the rule walks the plan in pre-order, picks a random node, and wraps it +/// with [`ChaosExec`]. Exactly one injection per `optimize` call. +/// +/// This optimizer rule is not idempotent and should not be called multiple times +/// on the same plan. +#[derive(Debug, Default)] +pub struct ChaosCreatingRule {} + +impl PhysicalOptimizerRule for ChaosCreatingRule { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + let bc = config + .extensions + .get::<BallistaConfig>() + .cloned() + .unwrap_or_default(); + + if !bc.chaos_execution_enabled() { + return Ok(plan); + } + + let failure_probability = bc.chaos_execution_probability(); + let fault_type = bc.chaos_execution_fault_type(); + let seed = bc.chaos_execution_seed(); + + // Count total nodes (pre-order DFS). + let mut node_count = 0usize; + plan.apply(|_| { + node_count += 1; + Ok(TreeNodeRecursion::Continue) + })?; + + // Pick a uniformly random target. % 1 == 0 always, so single-node plans + // are deterministic (index 0), which makes tests predictable. + let target_idx = match seed { + Some(s) => (StdRng::seed_from_u64(s).random::<u64>() as usize) % node_count, + None => (rand::random::<u64>() as usize) % node_count, + }; + let mut current_idx = 0usize; + + // transform_down visits nodes in the same pre-order as apply, so + // target_idx addresses the same node in both passes. 
+ let result = plan.transform_down(|node| { + let idx = current_idx; + current_idx += 1; + if idx == target_idx { + let chaos = + Arc::new(ChaosExec::new(node, failure_probability, &fault_type, seed)?) + as Arc<dyn ExecutionPlan>; + log::warn!("A ChaosExec node has been injected in your execution plan, making execution undeterministic"); + Ok(Transformed::yes(chaos)) + } else { + Ok(Transformed::no(node)) + } + })?; + + Ok(result.data) + } + + fn name(&self) -> &str { + "ChaosCreatingRule" + } + + fn schema_check(&self) -> bool { + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::assert_plan; + use ballista_core::config::BallistaConfig; + use ballista_core::execution_plans::ChaosExec; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::common::{ColumnStatistics, Statistics}; + use datafusion::config::{ConfigOptions, ExtensionOptions}; + use datafusion::physical_plan::ExecutionPlan; + use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; + use datafusion::physical_plan::test::exec::StatisticsExec; + use std::sync::Arc; + + fn leaf_exec() -> Arc<dyn ExecutionPlan> { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let stats = Statistics { + num_rows: Default::default(), + total_byte_size: Default::default(), + column_statistics: vec![ColumnStatistics::new_unknown()], + }; + Arc::new(StatisticsExec::new(stats, schema)) + } + + fn chaos_config(probability: f64) -> ConfigOptions { + let mut bc = BallistaConfig::default(); + bc.set("testing.chaos_execution.enabled", "true").unwrap(); Review Comment: ```suggestion bc.set("ballista.testing.chaos_execution.enabled", "true").unwrap(); ``` ########## ballista/core/src/execution_plans/chaos_exec.rs: ########## @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// ChaosExec is a physical execution plan node for robustness/chaos testing. +// It wraps a single child node, preserving its schema and partitioning, and +// randomly injects faults according to a configurable failure_probability +// (in [0.0, 1.0]) and fault_type: +// +// "transient" — returns a recoverable IoError on the first batch +// "fatal" — returns a non-recoverable Execution error on the first batch +// "panic" — panics on the first batch +// "delay" — sleeps 1 ms before every batch +// "delay:N" — sleeps N ms before every batch +// +// ChaosExec is inserted into query plans by physical optimizer rule, which +// probabilistically wraps leaf nodes. 
+ +use datafusion::common::{DataFusionError, Result, Statistics, internal_err}; +use datafusion::config::ConfigOptions; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::execution_plan::CardinalityEffect; +use datafusion::physical_plan::metrics::MetricsSet; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + SendableRecordBatchStream, +}; +use futures::StreamExt; +use rand::rngs::StdRng; +use rand::{RngExt, SeedableRng}; +use std::any::Any; +use std::sync::Arc; + +/// Physical execution plan node that randomly injects failures for chaos/robustness testing. +#[derive(Debug, Clone)] +pub struct ChaosExec { + input: Arc<dyn ExecutionPlan>, + // a probability this node will fail when run + failure_probability: f64, + // controls what kind of fault is injected: "transient", "fatal", "panic", or "delay" + fault_type: String, + // optional RNG seed; None means thread-local non-deterministic RNG + seed: Option<u64>, +} + +impl ChaosExec { + /// Creates a new `ChaosExec` wrapping `input`. + /// + /// `failure_probability` must be in `[0.0, 1.0]`. + /// `fault_type` must be one of `"transient"`, `"fatal"`, `"panic"`, `"delay"`, or `"delay:N"` + /// where N is a positive integer number of milliseconds. 
+ pub fn new( + input: Arc<dyn ExecutionPlan>, + failure_probability: f64, + fault_type: &str, + seed: Option<u64>, + ) -> Result<Self> { + if !(0.0..=1.0).contains(&failure_probability) { + return internal_err!( + "ChaosExec failure_probability must be in [0.0, 1.0], got {failure_probability}" + ); + } + match fault_type { + "transient" | "fatal" | "panic" | "delay" => {} + other if other.starts_with("delay:") => { + let ms_str = &other["delay:".len()..]; + if ms_str.parse::<u64>().is_err() { + return internal_err!( + "ChaosExec delay suffix must be a positive integer (ms), got \"{ms_str}\"" + ); + } + } + other => { + return internal_err!( + "ChaosExec fault_type must be one of transient/fatal/panic/delay/delay:N, got {other}" + ); + } + } + + Ok(Self { + input, + failure_probability, + fault_type: fault_type.to_string(), + seed, + }) + } + + /// Returns the configured RNG seed, if any. + pub fn seed(&self) -> Option<u64> { + self.seed + } + + /// Returns the configured failure probability. + pub fn failure_probability(&self) -> f64 { + self.failure_probability + } + + /// Returns the configured fault type. 
+ pub fn fault_type(&self) -> &str { + &self.fault_type + } +} + +impl ExecutionPlan for ChaosExec { + fn name(&self) -> &str { + "ChaosExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &Arc<PlanProperties> { + self.input.properties() + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![&self.input] + } + + fn with_new_children( + self: Arc<Self>, + mut children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + if children.len() != 1 { + return internal_err!("ChaosExec expected one child, got {}", children.len()); + } + let new_input = children.pop().unwrap(); + Ok(Arc::new(Self::new( + new_input, + self.failure_probability, + &self.fault_type, + self.seed, + )?)) + } + + fn execute( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let input_stream = self.input.execute(partition, context)?; + let failure_probability = self.failure_probability; + let fault_type = self.fault_type.clone(); Review Comment: ```suggestion let fault_type = self.fault_type.to_lowercase(); ``` `to_lowercase()` allocates a new String for every batch_result. ########## ballista/core/src/config.rs: ########## @@ -233,6 +246,38 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(|| DataType::Boolean, Some(true.to_string()), ), + ConfigEntry::new( + BALLISTA_CHAOS_EXECUTION_ENABLED.to_string(), + "Enables chaos-monkey execution injection for robustness testing. 
\ + When true, ChaosMonkeyExec is inserted at a random point in the plan \ Review Comment: ```suggestion When true, ChaosExec is inserted at a random point in the plan \ ``` ########## ballista/core/src/serde/mod.rs: ########## @@ -532,6 +532,15 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec { }; Ok(Arc::new(exec)) } + PhysicalPlanType::ChaosExec(chaos_exec) => { + let input = inputs[0].clone(); Review Comment: ```suggestion let input = match inputs { [input] => input.clone(), _ => { return Err(DataFusionError::Internal(format!( "ChaosExec expects exactly 1 input, got {}", inputs.len() ))); } }; ``` ########## ballista/core/src/config.rs: ########## @@ -233,6 +246,38 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(|| DataType::Boolean, Some(true.to_string()), ), + ConfigEntry::new( + BALLISTA_CHAOS_EXECUTION_ENABLED.to_string(), + "Enables chaos-monkey execution injection for robustness testing. \ + When true, ChaosMonkeyExec is inserted at a random point in the plan \ + once per optimize call.".to_string(), + DataType::Boolean, + Some(false.to_string()), + ), + ConfigEntry::new( + BALLISTA_CHAOS_EXECUTION_PROBABILITY.to_string(), + "Failure probability (0.0–1.0) passed to ChaosMonkeyExec when \ Review Comment: ```suggestion "Failure probability (0.0–1.0) passed to ChaosExec when \ ``` ########## ballista/scheduler/src/state/aqe/optimizer_rule/chaos_exec.rs: ########## @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use ballista_core::config::BallistaConfig; +use ballista_core::execution_plans::ChaosExec; +use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion::config::ConfigOptions; +use datafusion::error::Result; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::ExecutionPlan; +use rand::rngs::StdRng; +use rand::{RngExt, SeedableRng}; +use std::sync::Arc; + +/// Optimizer rule that randomly injects a single [`ChaosExec`] into the physical plan. +/// +/// Controlled by four Ballista config keys: +/// +/// - `ballista.testing.chaos_execution.enabled` (bool, default `false`) +/// - `ballista.testing.chaos_execution.probability` (f64 in 0.0–1.0, default `0.25`) +/// - `ballista.testing.chaos_execution.fault_type` (str, default `"transient"`) — one of `transient`, `fatal`, `panic`, `delay` +/// - `ballista.testing.chaos_execution.seed` (str, default `""`) — optional u64 seed for reproducible runs +/// +/// When enabled, the rule walks the plan in pre-order, picks a random node, and wraps it +/// with [`ChaosExec`]. Exactly one injection per `optimize` call. +/// +/// This optimizer rule is not idempotent and should not be called multiple times +/// on the same plan. 
+#[derive(Debug, Default)] +pub struct ChaosCreatingRule {} + +impl PhysicalOptimizerRule for ChaosCreatingRule { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + let bc = config + .extensions + .get::<BallistaConfig>() + .cloned() + .unwrap_or_default(); + + if !bc.chaos_execution_enabled() { + return Ok(plan); + } + + let failure_probability = bc.chaos_execution_probability(); + let fault_type = bc.chaos_execution_fault_type(); + let seed = bc.chaos_execution_seed(); + + // Count total nodes (pre-order DFS). + let mut node_count = 0usize; + plan.apply(|_| { + node_count += 1; + Ok(TreeNodeRecursion::Continue) + })?; + + // Pick a uniformly random target. % 1 == 0 always, so single-node plans + // are deterministic (index 0), which makes tests predictable. + let target_idx = match seed { + Some(s) => (StdRng::seed_from_u64(s).random::<u64>() as usize) % node_count, + None => (rand::random::<u64>() as usize) % node_count, + }; + let mut current_idx = 0usize; + + // transform_down visits nodes in the same pre-order as apply, so + // target_idx addresses the same node in both passes. + let result = plan.transform_down(|node| { + let idx = current_idx; + current_idx += 1; + if idx == target_idx { + let chaos = + Arc::new(ChaosExec::new(node, failure_probability, &fault_type, seed)?) + as Arc<dyn ExecutionPlan>; + log::warn!("A ChaosExec node has been injected in your execution plan, making execution undeterministic"); Review Comment: ```suggestion log::warn!("A ChaosExec node has been injected in your execution plan, making execution non-deterministic"); ``` ########## ballista/scheduler/src/state/aqe/optimizer_rule/chaos_exec.rs: ########## @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use ballista_core::config::BallistaConfig; +use ballista_core::execution_plans::ChaosExec; +use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion::config::ConfigOptions; +use datafusion::error::Result; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::ExecutionPlan; +use rand::rngs::StdRng; +use rand::{RngExt, SeedableRng}; +use std::sync::Arc; + +/// Optimizer rule that randomly injects a single [`ChaosExec`] into the physical plan. +/// +/// Controlled by four Ballista config keys: +/// +/// - `ballista.testing.chaos_execution.enabled` (bool, default `false`) +/// - `ballista.testing.chaos_execution.probability` (f64 in 0.0–1.0, default `0.25`) +/// - `ballista.testing.chaos_execution.fault_type` (str, default `"transient"`) — one of `transient`, `fatal`, `panic`, `delay` +/// - `ballista.testing.chaos_execution.seed` (str, default `""`) — optional u64 seed for reproducible runs +/// +/// When enabled, the rule walks the plan in pre-order, picks a random node, and wraps it +/// with [`ChaosExec`]. Exactly one injection per `optimize` call. +/// +/// This optimizer rule is not idempotent and should not be called multiple times +/// on the same plan. 
+#[derive(Debug, Default)] +pub struct ChaosCreatingRule {} + +impl PhysicalOptimizerRule for ChaosCreatingRule { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + let bc = config + .extensions + .get::<BallistaConfig>() + .cloned() + .unwrap_or_default(); + + if !bc.chaos_execution_enabled() { + return Ok(plan); + } + + let failure_probability = bc.chaos_execution_probability(); + let fault_type = bc.chaos_execution_fault_type(); + let seed = bc.chaos_execution_seed(); + + // Count total nodes (pre-order DFS). + let mut node_count = 0usize; + plan.apply(|_| { + node_count += 1; + Ok(TreeNodeRecursion::Continue) + })?; + + // Pick a uniformly random target. % 1 == 0 always, so single-node plans + // are deterministic (index 0), which makes tests predictable. + let target_idx = match seed { + Some(s) => (StdRng::seed_from_u64(s).random::<u64>() as usize) % node_count, + None => (rand::random::<u64>() as usize) % node_count, + }; + let mut current_idx = 0usize; + + // transform_down visits nodes in the same pre-order as apply, so + // target_idx addresses the same node in both passes. + let result = plan.transform_down(|node| { + let idx = current_idx; + current_idx += 1; + if idx == target_idx { + let chaos = + Arc::new(ChaosExec::new(node, failure_probability, &fault_type, seed)?) 
+ as Arc<dyn ExecutionPlan>; + log::warn!("A ChaosExec node has been injected in your execution plan, making execution undeterministic"); + Ok(Transformed::yes(chaos)) + } else { + Ok(Transformed::no(node)) + } + })?; + + Ok(result.data) + } + + fn name(&self) -> &str { + "ChaosCreatingRule" + } + + fn schema_check(&self) -> bool { + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::assert_plan; + use ballista_core::config::BallistaConfig; + use ballista_core::execution_plans::ChaosExec; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::common::{ColumnStatistics, Statistics}; + use datafusion::config::{ConfigOptions, ExtensionOptions}; + use datafusion::physical_plan::ExecutionPlan; + use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; + use datafusion::physical_plan::test::exec::StatisticsExec; + use std::sync::Arc; + + fn leaf_exec() -> Arc<dyn ExecutionPlan> { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let stats = Statistics { + num_rows: Default::default(), + total_byte_size: Default::default(), + column_statistics: vec![ColumnStatistics::new_unknown()], + }; + Arc::new(StatisticsExec::new(stats, schema)) + } + + fn chaos_config(probability: f64) -> ConfigOptions { + let mut bc = BallistaConfig::default(); + bc.set("testing.chaos_execution.enabled", "true").unwrap(); + bc.set( + "testing.chaos_execution.probability", Review Comment: ```suggestion "ballista.testing.chaos_execution.probability", ``` ########## ballista/core/src/config.rs: ########## @@ -468,6 +513,28 @@ impl BallistaConfig { self.get_bool_setting(BALLISTA_ADAPTIVE_JOIN_ENABLED) } + /// Returns whether chaos-monkey execution injection is enabled. + pub fn chaos_execution_enabled(&self) -> bool { + self.get_bool_setting(BALLISTA_CHAOS_EXECUTION_ENABLED) + } + + /// Returns the failure probability for chaos-monkey execution (0.0–1.0). 
+ pub fn chaos_execution_probability(&self) -> f64 { + self.get_float_setting(BALLISTA_CHAOS_EXECUTION_PROBABILITY) + } + + /// Returns the fault type injected by chaos-monkey execution (default `"transient"`). + pub fn chaos_execution_fault_type(&self) -> String { + self.get_string_setting(BALLISTA_CHAOS_EXECUTION_FAULT_TYPE) + } + + /// Returns the optional RNG seed for chaos-monkey execution. + /// `None` means non-deterministic (thread-local RNG); `Some(n)` gives reproducible runs. + pub fn chaos_execution_seed(&self) -> Option<u64> { + let s = self.get_string_setting(BALLISTA_CHAOS_EXECUTION_SEED); + if s.is_empty() { None } else { s.parse().ok() } Review Comment: Invalid seed values are silently ignored: a non-empty string that does not parse as a `u64` is turned into `None` by `s.parse().ok()`, so the run falls back to the non-deterministic RNG without any warning or error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
