This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2968331e4c #16994 Ensure CooperativeExec#maintains_input_order returns a Vec of the correct size (#16995)
2968331e4c is described below

commit 2968331e4c4a8e3596afb2e56a3f0e9e4a864674
Author: Pepijn Van Eeckhoudt <pep...@vaneeckhoudt.net>
AuthorDate: Mon Aug 4 19:14:45 2025 +0200

    #16994 Ensure CooperativeExec#maintains_input_order returns a Vec of the correct size (#16995)
    
    * #16994 Ensure CooperativeExec#maintains_input_order returns a Vec of the correct size
    
    * #16994 Extend default ExecutionPlan invariant checks
    
    Add checks that verify the length of the vectors returned by methods that need to return a value per child.
---
 datafusion/physical-plan/src/coop.rs           |  2 +-
 datafusion/physical-plan/src/execution_plan.rs | 40 +++++++++++++++++++++++---
 datafusion/physical-plan/src/union.rs          |  7 +++--
 datafusion/physical-plan/src/work_table.rs     |  8 ------
 4 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs
index 777fd7d9d6..2d358367b4 100644
--- a/datafusion/physical-plan/src/coop.rs
+++ b/datafusion/physical-plan/src/coop.rs
@@ -252,7 +252,7 @@ impl ExecutionPlan for CooperativeExec {
     }
 
     fn maintains_input_order(&self) -> Vec<bool> {
-        self.input.maintains_input_order()
+        vec![true; self.children().len()]
     }
 
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 6b402528c1..730d496201 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -47,7 +47,7 @@ use crate::stream::RecordBatchStreamAdapter;
 use arrow::array::{Array, RecordBatch};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::{exec_err, Constraints, Result};
+use datafusion_common::{exec_err, Constraints, DataFusionError, Result};
 use datafusion_common_runtime::JoinSet;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::EquivalenceProperties;
@@ -117,10 +117,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// Returns an error if this individual node does not conform to its invariants.
     /// These invariants are typically only checked in debug mode.
     ///
-    /// A default set of invariants is provided in the default implementation.
+    /// A default set of invariants is provided in the [check_default_invariants] function.
+    /// The default implementation of `check_invariants` calls this function.
     /// Extension nodes can provide their own invariants.
-    fn check_invariants(&self, _check: InvariantLevel) -> Result<()> {
-        Ok(())
+    fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
+        check_default_invariants(self, check)
     }
 
     /// Specifies the data distribution requirements for all the
@@ -1035,6 +1036,37 @@ impl PlanProperties {
     }
 }
 
+macro_rules! check_len {
+    ($target:expr, $func_name:ident, $expected_len:expr) => {
+        let actual_len = $target.$func_name().len();
+        if actual_len != $expected_len {
+            return internal_err!(
+                "{}::{} returned Vec with incorrect size: {} != {}",
+                $target.name(),
+                stringify!($func_name),
+                actual_len,
+                $expected_len
+            );
+        }
+    };
+}
+
+/// Checks a set of invariants that apply to all ExecutionPlan implementations.
+/// Returns an error if the given node does not conform.
+pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
+    plan: &P,
+    _check: InvariantLevel,
+) -> Result<(), DataFusionError> {
+    let children_len = plan.children().len();
+
+    check_len!(plan, maintains_input_order, children_len);
+    check_len!(plan, required_input_ordering, children_len);
+    check_len!(plan, required_input_distribution, children_len);
+    check_len!(plan, benefits_from_input_partitioning, children_len);
+
+    Ok(())
+}
+
 /// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
 /// especially for the distributed engine to judge whether need to deal with shuffling.
 /// Currently, there are 3 kinds of execution plan which needs data exchange
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 73d7933e7c..aca03c57b1 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -33,7 +33,8 @@ use super::{
     SendableRecordBatchStream, Statistics,
 };
 use crate::execution_plan::{
-    boundedness_from_children, emission_type_from_children, InvariantLevel,
+    boundedness_from_children, check_default_invariants, emission_type_from_children,
+    InvariantLevel,
 };
 use crate::metrics::BaselineMetrics;
 use crate::projection::{make_with_child, ProjectionExec};
@@ -176,7 +177,9 @@ impl ExecutionPlan for UnionExec {
         &self.cache
     }
 
-    fn check_invariants(&self, _check: InvariantLevel) -> Result<()> {
+    fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
+        check_default_invariants(self, check)?;
+
         (self.inputs().len() >= 2)
             .then_some(())
             .ok_or(DataFusionError::Internal(
diff --git a/datafusion/physical-plan/src/work_table.rs 
b/datafusion/physical-plan/src/work_table.rs
index 076e30ab90..40a22f94b8 100644
--- a/datafusion/physical-plan/src/work_table.rs
+++ b/datafusion/physical-plan/src/work_table.rs
@@ -174,14 +174,6 @@ impl ExecutionPlan for WorkTableExec {
         &self.cache
     }
 
-    fn maintains_input_order(&self) -> Vec<bool> {
-        vec![false]
-    }
-
-    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
-        vec![false]
-    }
-
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
         vec![]
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to