milenkovicm opened a new pull request, #1802:
URL: https://github.com/apache/datafusion-ballista/pull/1802
## Which issue does this PR close?
Closes #.
## Rationale for this change
Distributed query engines like Ballista are complex systems where failures
can occur at many points — network partitions, executor crashes, resource
exhaustion. Without a way to deliberately inject failures during testing, it is
difficult to verify that the system recovers correctly and produces consistent
results under partial failure conditions. This PR introduces a chaos monkey
mechanism to enable controlled, reproducible robustness testing of Ballista's
distributed execution pipeline.
Chaos testing is a well-established technique (popularized by Netflix's
Chaos Monkey) for proactively exposing weaknesses in distributed systems by
randomly inducing failures in a controlled environment. By injecting failures
at the physical plan level, we can verify retry logic, fault tolerance, and
error propagation across executor nodes without relying on external fault
injection tools.
## What changes are included in this PR?
Adds `ChaosExec` (`ballista/core/src/execution_plans/chaos_exec.rs`) — a
physical execution plan wrapper node that randomly fails with a configurable
probability. It is transparent to the plan: it preserves the schema and output
partitioning of its input node.
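
To make "fails with a configurable probability" concrete, here is a minimal, dependency-free sketch of a seeded failure decision. It is not taken from the PR: `SplitMix64` and `should_fail` are hypothetical names, and the RNG that `ChaosExec` actually uses may differ.

```rust
/// Small SplitMix64 generator, used here only to avoid external crates.
/// Assumption: the real implementation may use a different RNG entirely.
struct SplitMix64(u64);

impl SplitMix64 {
    fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }

    /// Uniform f64 in [0, 1), built from the top 53 bits.
    fn next_f64(&mut self) -> f64 {
        (self.next_u64() >> 11) as f64 / (1u64 << 53) as f64
    }
}

/// Hypothetical helper: returns true when a fault should be injected.
/// A probability of 0.0 never fires and 1.0 always fires.
fn should_fail(rng: &mut SplitMix64, probability: f64) -> bool {
    rng.next_f64() < probability
}

fn main() {
    // The same seed yields the same sequence of decisions on every run.
    let mut rng = SplitMix64(42);
    let failures = (0..10_000)
        .filter(|_| should_fail(&mut rng, 0.25))
        .count();
    println!("injected {failures} failures out of 10000 attempts");
}
```

Seeding the generator is what makes a failing run repeatable: two generators built from the same seed make identical decisions.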
Adds `ChaosExecRule`
(`ballista/scheduler/src/state/aqe/optimizer_rule/chaos_exec.rs`) — an AQE
optimizer rule that randomly selects one node in the physical plan and wraps it
with `ChaosExec`, so each optimize call injects exactly one chaos node.
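
The "select one node and wrap it" step can be sketched on a toy plan tree. This is an illustration under assumptions, not the rule from the PR: `Node`, `wrap_nth`, `count_nodes`, and `count_chaos` are hypothetical, and the target index is hard-coded where a real rule would draw it from a (possibly seeded) RNG.

```rust
/// Toy stand-in for a physical plan node (hypothetical, not a DataFusion type).
#[derive(Debug, Clone, PartialEq)]
struct Node {
    name: String,
    children: Vec<Node>,
}

impl Node {
    fn new(name: &str, children: Vec<Node>) -> Self {
        Node { name: name.to_string(), children }
    }
}

/// Total number of nodes in the tree.
fn count_nodes(node: &Node) -> usize {
    1 + node.children.iter().map(count_nodes).sum::<usize>()
}

/// Number of injected wrapper nodes in the tree.
fn count_chaos(node: &Node) -> usize {
    let own = usize::from(node.name == "ChaosExec");
    own + node.children.iter().map(count_chaos).sum::<usize>()
}

/// Rebuilds the tree in pre-order and wraps only the node whose pre-order
/// position equals `target`; every other node is left unchanged.
fn wrap_nth(node: Node, target: usize, next_index: &mut usize) -> Node {
    let index = *next_index;
    *next_index += 1;
    let children = node
        .children
        .into_iter()
        .map(|child| wrap_nth(child, target, next_index))
        .collect();
    let rebuilt = Node { name: node.name, children };
    if index == target {
        Node::new("ChaosExec", vec![rebuilt])
    } else {
        rebuilt
    }
}

fn main() {
    let plan = Node::new(
        "ProjectionExec",
        vec![Node::new("FilterExec", vec![Node::new("DataSourceExec", vec![])])],
    );
    // Assumption: a real rule would pick this index at random in 0..count_nodes(&plan).
    let target = 1;
    let mut next_index = 0;
    let rewritten = wrap_nth(plan, target, &mut next_index);
    assert_eq!(count_chaos(&rewritten), 1);
    println!("{rewritten:#?}");
}
```

Choosing a single index up front, rather than flipping a coin at every node, is one simple way to guarantee exactly one injection per pass.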
Adds protobuf definition and serde support for ChaosExec so it can be
serialized and sent to executor nodes.
Adds new `BallistaConfig` keys:
- `ballista.testing.chaos_execution.enabled` - (bool, default `false`)
- `ballista.testing.chaos_execution.probability` - (f64 in [0.0, 1.0], default 0.25)
- `ballista.testing.chaos_execution.fault_type` - (string, default `"transient"`)
- `ballista.testing.chaos_execution.seed` - (string, default `""`)
Example plan with a randomly injected `ChaosExec` node:
```
AdaptiveDatafusionExec: is_final=false, plan_id=1, stage_id=pending, stage_resolved=false
  SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST]
    SortExec: expr=[l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true]
      ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(Int64(1))@9 as count_order]
        AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))]
          ExchangeExec: partitioning=Hash([l_returnflag@0, l_linestatus@1], 24), plan_id=0, stage_id=0, stage_resolved=false
            AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))]
              ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_2, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
                ChaosExec: failure_probability=0.15, fault_type=transient
                  FilterExec: l_shipdate@6 <= 1998-09-02
                    DataSourceExec: file_groups={24 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf10/lineitem/part-0.parquet:0..68856801], [Users/marko/TMP/tpch_data/tpch-data-sf10/lineitem/part-0.parquet:68856801..119179177, Users/marko/TMP/tpch_data/tpch-data-sf10/lineitem/part-1.parquet:0..18534425], , ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@10 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[]
```
## Are there any user-facing changes?
Four new opt-in configuration keys are introduced; chaos injection is disabled by default:
- `ballista.testing.chaos_execution.enabled` - must be explicitly set to `true` to activate chaos injection.
- `ballista.testing.chaos_execution.probability` - controls the per-node failure probability when enabled.
- `ballista.testing.chaos_execution.fault_type` - controls the fault type; valid values: `"transient"` (IoError), `"fatal"` (Execution error), `"panic"`, `"delay"`.
- `ballista.testing.chaos_execution.seed` - optional u64 seed for reproducible runs.
These keys are scoped under `ballista.testing.*` to signal they are for
testing environments only and should not be enabled in production workloads. No
existing behavior is affected when the feature is disabled.
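
As a rough illustration of how the string-valued settings above might be validated, the sketch below parses a fault type, a probability, and a seed. It is an assumption-laden example rather than the PR's code: `FaultType`, `parse_probability`, and `parse_seed` are hypothetical, matching is assumed to be exact and lower-case, and an empty seed string is assumed to mean "no seed".

```rust
use std::str::FromStr;

/// Hypothetical enum mirroring the four documented `fault_type` values.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FaultType {
    Transient,
    Fatal,
    Panic,
    Delay,
}

impl FromStr for FaultType {
    type Err = String;

    // Assumption: values are matched exactly, in lower case.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "transient" => Ok(FaultType::Transient),
            "fatal" => Ok(FaultType::Fatal),
            "panic" => Ok(FaultType::Panic),
            "delay" => Ok(FaultType::Delay),
            other => Err(format!("unknown fault_type: {other}")),
        }
    }
}

/// Accepts only values inside the documented [0.0, 1.0] range (NaN is rejected).
fn parse_probability(s: &str) -> Result<f64, String> {
    let p: f64 = s
        .parse()
        .map_err(|e| format!("invalid probability {s:?}: {e}"))?;
    if (0.0..=1.0).contains(&p) {
        Ok(p)
    } else {
        Err(format!("probability {p} is outside [0.0, 1.0]"))
    }
}

/// Assumption: the default empty string means "no seed"; otherwise a u64 is expected.
fn parse_seed(s: &str) -> Result<Option<u64>, String> {
    if s.is_empty() {
        return Ok(None);
    }
    s.parse::<u64>()
        .map(Some)
        .map_err(|e| format!("invalid seed {s:?}: {e}"))
}

fn main() {
    println!("{:?}", "transient".parse::<FaultType>());
    println!("{:?}", parse_probability("0.25"));
    println!("{:?}", parse_seed(""));
}
```

Rejecting out-of-range or unknown values early keeps a mistyped test configuration from silently running without any fault injection.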
## Context
Depends on #1752, which should be merged first.
## Things to consider
- [x] add random seed setting for reproducibility
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]