This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b78f0cea0 Mark BufferExec and AnalyzeExec as eager (#22711)
7b78f0cea0 is described below

commit 7b78f0cea0409f60c79a3833781e621344d52ffd
Author: Geoffrey Claude <[email protected]>
AuthorDate: Thu Jun 4 21:06:42 2026 +0200

    Mark BufferExec and AnalyzeExec as eager (#22711)
    
    ## Which issue does this PR close?
    
    - Closes #22708.
    
    ## Rationale for this change
    
    `BufferExec` and `AnalyzeExec` both have eager evaluation behavior: they
    can drive child streams in spawned tasks instead of performing exactly
    one downstream poll worth of work at a time. Their `PlanProperties`
    currently report `EvaluationType::Lazy`, so physical-plan metadata does
    not describe how these operators execute.
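    
    As a rough illustration of the lazy/eager distinction, the sketch below
    uses only the Rust standard library rather than DataFusion streams or
    Tokio tasks. The names `lazy_source` and `eager_source` are hypothetical,
    and a thread with a bounded channel stands in for a spawned task that
    buffers batches ahead of demand.
    
    ```rust
    use std::sync::mpsc::sync_channel;
    use std::thread;

    /// Lazy (demand-driven): each item is computed only when the consumer
    /// asks for it by calling `next()`.
    fn lazy_source(n: u32) -> impl Iterator<Item = u32> {
        (0..n).map(|i| i * 2)
    }

    /// Eager (data-driven): a spawned thread drives the input on its own and
    /// buffers up to `capacity` items ahead of the consumer.
    fn eager_source(n: u32, capacity: usize) -> impl Iterator<Item = u32> {
        let (tx, rx) = sync_channel(capacity);
        thread::spawn(move || {
            for item in lazy_source(n) {
                // `send` blocks while the buffer is full and returns an
                // error once the consumer has dropped the receiver.
                if tx.send(item).is_err() {
                    break;
                }
            }
        });
        // The receiver yields items until the producer thread finishes.
        rx.into_iter()
    }
    ```
    
    Both sources yield the same items in the same order, for example
    `eager_source(4, 2).collect::<Vec<_>>()` equals
    `lazy_source(4).collect::<Vec<_>>()`. Only who drives the production
    differs, which is the property `EvaluationType` is meant to describe.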
    
    While making that metadata accurate, this PR also keeps
    `need_data_exchange(...)` scoped to its original purpose. #4585
    introduced that helper as a way to identify physical operators that
    require exchange-style handling because they redistribute partitions or
    gather multiple input partitions into one output partition. #4586
    implemented it for the native exchange/gather operators called out
    there: non-round-robin `RepartitionExec`, `CoalescePartitionsExec`, and
    `SortPreservingMergeExec`.
    
    The later cooperative-scheduling work in #16398 introduced
    `EvaluationType` and made `need_data_exchange(...)` use `evaluation_type
    == EvaluationType::Eager`. That shortcut worked when the eager operators
    and exchange/gather operators were the same set. With `BufferExec` and
    `AnalyzeExec` correctly classified as eager, the shortcut would make
    `need_data_exchange(...)` report operators that do eager child polling
    but do not perform a data exchange.
    
    So the intended split in this PR is:
    
    - `EvaluationType` describes execution behavior.
    - `need_data_exchange(...)` identifies partition redistribution or
    partition gathering.
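    
    The split can be sketched with a simplified, self-contained model. The
    `Operator` enum and both functions below are hypothetical stand-ins, not
    DataFusion's `ExecutionPlan` API; they only mirror the classification
    described in this message.
    
    ```rust
    /// Simplified stand-in for DataFusion's `EvaluationType`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum EvaluationType {
        Lazy,
        Eager,
    }

    /// Hypothetical operator model used only for this illustration.
    #[derive(Debug, Clone, Copy)]
    enum Operator {
        Filter,
        Buffer,
        Analyze,
        Repartition { round_robin: bool },
        CoalescePartitions { input_partitions: usize },
        SortPreservingMerge { input_partitions: usize },
    }

    /// Execution behavior: does the operator drive its input independently
    /// of downstream polls?
    fn evaluation_type(op: Operator) -> EvaluationType {
        match op {
            Operator::Filter => EvaluationType::Lazy,
            _ => EvaluationType::Eager,
        }
    }

    /// Data movement: does the operator redistribute partitions or gather
    /// several input partitions into one?
    fn needs_data_exchange(op: Operator) -> bool {
        match op {
            Operator::Repartition { round_robin } => !round_robin,
            Operator::CoalescePartitions { input_partitions }
            | Operator::SortPreservingMerge { input_partitions } => input_partitions > 1,
            Operator::Filter | Operator::Buffer | Operator::Analyze => false,
        }
    }
    ```
    
    In this model `Operator::Buffer` and `Operator::Analyze` are eager, yet
    `needs_data_exchange` returns `false` for both, so the two predicates can
    no longer be derived from one another.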
    
    ## What changes are included in this PR?
    
    - Mark `BufferExec` as `EvaluationType::Eager` while keeping its
    existing cooperative scheduling metadata.
    - Mark `AnalyzeExec` as `EvaluationType::Eager` in its computed plan
    properties.
    - Clarify the `EvaluationType` docs so eager evaluation is not defined
    by whether work starts in `execute` or on the first stream poll.
    - Restore `need_data_exchange(...)` as an exchange/gather predicate for
    the native exchange operators instead of deriving it from all eager
    operators.
    
    ## Are these changes tested?
    
    Targeted tests pass:
    
    ```bash
    cargo test -p datafusion-physical-plan --lib buffer::tests
    cargo test -p datafusion-physical-plan --lib analyze::tests
    cargo test -p datafusion-physical-plan --doc execution_plan
    cargo test -p datafusion-physical-plan --lib execution_plan::tests::buffer_exec_does_not_need_data_exchange
    RUSTDOCFLAGS="-D warnings" cargo doc -p datafusion-physical-plan --no-deps
    ```
    
    I also added a focused regression test that `BufferExec` does not
    require data exchange. I did not add tests that merely assert the
    assigned `EvaluationType`, as those would duplicate the implementation
    rather than cover behavior.
    
    ## Are there any user-facing changes?
    
    No query-result changes are expected. This updates physical-plan
    metadata and keeps `need_data_exchange(...)` scoped to operators that
    move data between partitions or gather partitions together.
---
 datafusion/physical-plan/src/analyze.rs        |  2 +
 datafusion/physical-plan/src/buffer.rs         |  5 +-
 datafusion/physical-plan/src/execution_plan.rs | 80 +++++++++++++++++++-------
 3 files changed, 65 insertions(+), 22 deletions(-)

diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs
index 580bf31231..27e0f5e923 100644
--- a/datafusion/physical-plan/src/analyze.rs
+++ b/datafusion/physical-plan/src/analyze.rs
@@ -25,6 +25,7 @@ use super::{
     SendableRecordBatchStream,
 };
 use crate::display::DisplayableExecutionPlan;
+use crate::execution_plan::EvaluationType;
 use crate::metrics::{MetricCategory, MetricType};
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
 
@@ -172,6 +173,7 @@ impl AnalyzeExec {
             input.pipeline_behavior(),
             input.boundedness(),
         )
+        .with_evaluation_type(EvaluationType::Eager)
     }
 }
 
diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs
index 19a4ebba83..2985dc5766 100644
--- a/datafusion/physical-plan/src/buffer.rs
+++ b/datafusion/physical-plan/src/buffer.rs
@@ -18,7 +18,7 @@
 //! [`BufferExec`] decouples production and consumption on messages by buffering the input in the
 //! background up to a certain capacity.
 
-use crate::execution_plan::{CardinalityEffect, SchedulingType};
+use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
 use crate::filter_pushdown::{
     ChildPushdownResult, FilterDescription, FilterPushdownPhase,
     FilterPushdownPropagation,
@@ -101,7 +101,8 @@ impl BufferExec {
     /// Builds a new [BufferExec] with the provided capacity in bytes.
     pub fn new(input: Arc<dyn ExecutionPlan>, capacity: usize) -> Self {
         let properties = PlanProperties::clone(input.properties())
-            .with_scheduling_type(SchedulingType::Cooperative);
+            .with_scheduling_type(SchedulingType::Cooperative)
+            .with_evaluation_type(EvaluationType::Eager);
 
         Self {
             input,
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 50eac566d9..8577e86f00 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -45,6 +45,8 @@ use crate::coalesce_partitions::CoalescePartitionsExec;
 use crate::display::DisplayableExecutionPlan;
 use crate::metrics::MetricsSet;
 use crate::projection::ProjectionExec;
+use crate::repartition::RepartitionExec;
+use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use crate::stream::RecordBatchStreamAdapter;
 
 use arrow::array::{Array, RecordBatch};
@@ -962,25 +964,36 @@ pub enum SchedulingType {
     Cooperative,
 }
 
-/// Represents how an operator's `Stream` implementation generates `RecordBatch`es.
+/// Represents how an operator's stream drives [`RecordBatch`] production
+/// relative to downstream demand.
 ///
-/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to
-/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation.
-///
-/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This
-/// is known as data-driven or eager evaluation.
+/// This is execution-topology metadata for optimizers. It distinguishes streams
+/// whose batch production is driven directly by downstream calls to
+/// `Stream::poll_next` from streams that may also drive input or output
+/// production independently, such as by spawning tasks or buffering batches
+/// ahead of demand.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum EvaluationType {
-    /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
-    /// instances when it is demanded by invoking `Stream::poll_next`.
-    /// Filter, projection, and join are examples of such lazy operators.
+    /// The stream generated by [`execute`](ExecutionPlan::execute) is
+    /// demand-driven: it produces [`RecordBatch`]es in response to downstream
+    /// calls to `Stream::poll_next`.
+    ///
+    /// Filter, projection, and join operators are examples of lazy operators.
     ///
     /// Lazy operators are also known as demand-driven operators.
     Lazy,
-    /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
-    /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
-    /// `Stream::poll_next` is called.
-    /// Examples of eager operators are repartition, coalesce partitions, and sort preserving merge.
+    /// The stream generated by [`execute`](ExecutionPlan::execute) may drive
+    /// input or output [`RecordBatch`] production ahead of, or independently
+    /// from, downstream calls to `Stream::poll_next`.
+    ///
+    /// Eager operators commonly poll input streams from spawned Tokio tasks,
+    /// buffer batches ahead of demand, or otherwise create an independent
+    /// child-polling pipeline. Eager work may start when `execute` creates the
+    /// stream or when the returned stream is first polled; that timing is an
+    /// implementation detail.
+    ///
+    /// Repartition, coalesce partitions, sort-preserving merge, buffer, and
+    /// analyze operators are examples of eager operators.
     ///
     /// Eager operators are also known as a data-driven operators.
     Eager,
@@ -1209,15 +1222,31 @@ pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
     Ok(())
 }
 
-/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
-/// especially for the distributed engine to judge whether need to deal with shuffling.
-/// Currently, there are 3 kinds of execution plan which needs data exchange
-///     1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
-///     2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
-///     3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
+/// Indicate whether a data exchange is needed for the input of `plan`.
+///
+/// This identifies physical operators that redistribute child partitions or
+/// gather multiple child partitions into one output partition:
+///
+/// 1. RepartitionExec for non-round-robin repartitioning
+/// 2. CoalescePartitionsExec for collapsing multiple partitions into one without ordering guarantee
+/// 3. SortPreservingMergeExec for collapsing multiple sorted partitions into one with ordering guarantee
 #[expect(clippy::needless_pass_by_value)]
 pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
-    plan.properties().evaluation_type == EvaluationType::Eager
+    if let Some(repartition) = plan.downcast_ref::<RepartitionExec>() {
+        !matches!(repartition.partitioning(), Partitioning::RoundRobinBatch(_))
+    } else if let Some(coalesce) = plan.downcast_ref::<CoalescePartitionsExec>() {
+        coalesce.input().output_partitioning().partition_count() > 1
+    } else if let Some(sort_preserving_merge) =
+        plan.downcast_ref::<SortPreservingMergeExec>()
+    {
+        sort_preserving_merge
+            .input()
+            .output_partitioning()
+            .partition_count()
+            > 1
+    } else {
+        false
+    }
 }
 
 /// Returns a copy of this plan if we change any child according to the pointer comparison.
@@ -1556,6 +1585,8 @@ pub(crate) fn stub_properties() -> Arc<PlanProperties> {
 mod tests {
 
     use super::*;
+    use crate::buffer::BufferExec;
+    use crate::test::exec::MockExec;
     use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
 
     use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray};
@@ -1768,6 +1799,15 @@ mod tests {
         let _ = plan.name();
     }
 
+    #[test]
+    fn buffer_exec_does_not_need_data_exchange() {
+        let schema = Arc::new(Schema::empty());
+        let input: Arc<dyn ExecutionPlan> = Arc::new(MockExec::new(vec![], schema));
+        let buffer: Arc<dyn ExecutionPlan> = Arc::new(BufferExec::new(input, 1024));
+
+        assert!(!need_data_exchange(buffer));
+    }
+
     #[test]
     fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
         check_not_null_constraints(

