milenkovicm commented on code in PR #1802:
URL: https://github.com/apache/datafusion-ballista/pull/1802#discussion_r3341864936


##########
ballista/core/src/execution_plans/chaos_exec.rs:
##########
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// ChaosExec is a physical execution plan node for robustness/chaos testing.
+// It wraps a single child node, preserving its schema and partitioning, and
+// randomly injects faults according to a configurable failure_probability
+// (in [0.0, 1.0]) and fault_type:
+//
+//   "transient"  — returns a recoverable IoError on the first batch
+//   "fatal"      — returns a non-recoverable Execution error on the first batch
+//   "panic"      — panics on the first batch
+//   "delay"      — sleeps 1 ms before every batch
+//   "delay:N"    — sleeps N ms before every batch
+//
+// ChaosExec is inserted into query plans by physical optimizer rule, which
+// probabilistically wraps leaf nodes.
+
+use datafusion::common::{DataFusionError, Result, Statistics, internal_err};
+use datafusion::config::ConfigOptions;
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::execution_plan::CardinalityEffect;
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+    SendableRecordBatchStream,
+};
+use futures::StreamExt;
+use rand::rngs::StdRng;
+use rand::{RngExt, SeedableRng};
+use std::any::Any;
+use std::sync::Arc;
+
+/// Physical execution plan node that randomly injects failures for chaos/robustness testing.
+#[derive(Debug, Clone)]
+pub struct ChaosExec {
+    input: Arc<dyn ExecutionPlan>,
+    // a probability this node will fail when run
+    failure_probability: f64,
+    // controls what kind of fault is injected: "transient", "fatal", "panic", or "delay"
+    fault_type: String,
+    // optional RNG seed; None means thread-local non-deterministic RNG
+    seed: Option<u64>,
+}
+
+impl ChaosExec {
+    /// Creates a new `ChaosExec` wrapping `input`.
+    ///
+    /// `failure_probability` must be in `[0.0, 1.0]`.
+    /// `fault_type` must be one of `"transient"`, `"fatal"`, `"panic"`, `"delay"`, or `"delay:N"`
+    /// where N is a positive integer number of milliseconds.
+    pub fn new(
+        input: Arc<dyn ExecutionPlan>,
+        failure_probability: f64,
+        fault_type: &str,
+        seed: Option<u64>,
+    ) -> Result<Self> {
+        if !(0.0..=1.0).contains(&failure_probability) {
+            return internal_err!(
+                "ChaosExec failure_probability must be in [0.0, 1.0], got {failure_probability}"
+            );
+        }
+        match fault_type {
+            "transient" | "fatal" | "panic" | "delay" => {}
+            other if other.starts_with("delay:") => {
+                let ms_str = &other["delay:".len()..];
+                if ms_str.parse::<u64>().is_err() {
+                    return internal_err!(
+                        "ChaosExec delay suffix must be a positive integer (ms), got \"{ms_str}\""
+                    );
+                }
+            }
+            other => {
+                return internal_err!(
+                    "ChaosExec fault_type must be one of transient/fatal/panic/delay/delay:N, got {other}"
+                );
+            }
+        }
+
+        Ok(Self {
+            input,
+            failure_probability,
+            fault_type: fault_type.to_string(),
+            seed,
+        })
+    }
+
+    /// Returns the configured RNG seed, if any.
+    pub fn seed(&self) -> Option<u64> {
+        self.seed
+    }
+
+    /// Returns the configured failure probability.
+    pub fn failure_probability(&self) -> f64 {
+        self.failure_probability
+    }
+
+    /// Returns the configured fault type.
+    pub fn fault_type(&self) -> &str {
+        &self.fault_type
+    }
+}
+
+impl ExecutionPlan for ChaosExec {
+    fn name(&self) -> &str {
+        "ChaosExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &Arc<PlanProperties> {
+        self.input.properties()
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.len() != 1 {
+            return internal_err!("ChaosExec expected one child, got {}", children.len());
+        }
+        let new_input = children.pop().unwrap();
+        Ok(Arc::new(Self::new(
+            new_input,
+            self.failure_probability,
+            &self.fault_type,
+            self.seed,
+        )?))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let input_stream = self.input.execute(partition, context)?;
+        let failure_probability = self.failure_probability;
+        let fault_type = self.fault_type.clone();
+        let schema = self.input.schema();
+
+        // Decide once per partition execution whether to inject a fault on the first batch.
+        // With a seed, mix in the partition index so each partition has an independent but
+        // deterministic sequence; without a seed, fall back to the thread-local RNG.
+        let should_fail = if let Some(s) = self.seed {
+            let mut rng = StdRng::seed_from_u64(s.wrapping_add(partition as u64));
+            rng.random::<f64>() < failure_probability
+        } else {
+            rand::random::<f64>() < failure_probability
+        };
+
+        // Wrap the child stream. For error/panic modes, inject on the first batch (idx == 0)
+        // to mirror how real IO failures surface in production. For "delay", sleep every batch.
+        let wrapped = input_stream.enumerate().map(move |(idx, batch_result)| {
+            match fault_type.to_lowercase().as_str() {
+                ft if ft.starts_with("delay") => {

Review Comment:
   I was thinking of having it across all cases, adding locking and latency.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to