This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 2968331e4c #16994 Ensure CooperativeExec#maintains_input_order returns a Vec of the correct size (#16995) 2968331e4c is described below commit 2968331e4c4a8e3596afb2e56a3f0e9e4a864674 Author: Pepijn Van Eeckhoudt <pep...@vaneeckhoudt.net> AuthorDate: Mon Aug 4 19:14:45 2025 +0200 #16994 Ensure CooperativeExec#maintains_input_order returns a Vec of the correct size (#16995) * #16994 Ensure CooperativeExec#maintains_input_order returns a Vec of the correct size * #16994 Extend default ExecutionPlan invariant checks Add checks that verify the length of the vectors returned by methods that need to return a value per child. --- datafusion/physical-plan/src/coop.rs | 2 +- datafusion/physical-plan/src/execution_plan.rs | 40 +++++++++++++++++++++++--- datafusion/physical-plan/src/union.rs | 7 +++-- datafusion/physical-plan/src/work_table.rs | 8 ------ 4 files changed, 42 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index 777fd7d9d6..2d358367b4 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -252,7 +252,7 @@ impl ExecutionPlan for CooperativeExec { } fn maintains_input_order(&self) -> Vec<bool> { - self.input.maintains_input_order() + vec![true; self.children().len()] } fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 6b402528c1..730d496201 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -47,7 +47,7 @@ use crate::stream::RecordBatchStreamAdapter; use arrow::array::{Array, RecordBatch}; use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; -use datafusion_common::{exec_err, Constraints, Result}; +use datafusion_common::{exec_err, Constraints, DataFusionError, Result}; use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; @@ -117,10 +117,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// Returns an error if this individual node does not conform to its invariants. /// These invariants are typically only checked in debug mode. /// - /// A default set of invariants is provided in the default implementation. + /// A default set of invariants is provided in the [check_default_invariants] function. + /// The default implementation of `check_invariants` calls this function. /// Extension nodes can provide their own invariants. - fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { - Ok(()) + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + check_default_invariants(self, check) } /// Specifies the data distribution requirements for all the @@ -1035,6 +1036,37 @@ impl PlanProperties { } } +macro_rules! check_len { + ($target:expr, $func_name:ident, $expected_len:expr) => { + let actual_len = $target.$func_name().len(); + if actual_len != $expected_len { + return internal_err!( + "{}::{} returned Vec with incorrect size: {} != {}", + $target.name(), + stringify!($func_name), + actual_len, + $expected_len + ); + } + }; +} + +/// Checks a set of invariants that apply to all ExecutionPlan implementations. +/// Returns an error if the given node does not conform. +pub fn check_default_invariants<P: ExecutionPlan + ?Sized>( + plan: &P, + _check: InvariantLevel, +) -> Result<(), DataFusionError> { + let children_len = plan.children().len(); + + check_len!(plan, maintains_input_order, children_len); + check_len!(plan, required_input_ordering, children_len); + check_len!(plan, required_input_distribution, children_len); + check_len!(plan, benefits_from_input_partitioning, children_len); + + Ok(()) +} + /// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful /// especially for the distributed engine to judge whether need to deal with shuffling. /// Currently, there are 3 kinds of execution plan which needs data exchange diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 73d7933e7c..aca03c57b1 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -33,7 +33,8 @@ use super::{ SendableRecordBatchStream, Statistics, }; use crate::execution_plan::{ - boundedness_from_children, emission_type_from_children, InvariantLevel, + boundedness_from_children, check_default_invariants, emission_type_from_children, + InvariantLevel, }; use crate::metrics::BaselineMetrics; use crate::projection::{make_with_child, ProjectionExec}; @@ -176,7 +177,9 @@ impl ExecutionPlan for UnionExec { &self.cache } - fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + check_default_invariants(self, check)?; + (self.inputs().len() >= 2) .then_some(()) .ok_or(DataFusionError::Internal( diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 076e30ab90..40a22f94b8 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -174,14 +174,6 @@ impl ExecutionPlan for WorkTableExec { &self.cache } - fn maintains_input_order(&self) -> Vec<bool> { - vec![false] - } - - fn benefits_from_input_partitioning(&self) -> Vec<bool> { - vec![false] - } - fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { vec![] } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org