This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7b78f0cea0 Mark BufferExec and AnalyzeExec as eager (#22711)
7b78f0cea0 is described below
commit 7b78f0cea0409f60c79a3833781e621344d52ffd
Author: Geoffrey Claude <[email protected]>
AuthorDate: Thu Jun 4 21:06:42 2026 +0200
Mark BufferExec and AnalyzeExec as eager (#22711)
## Which issue does this PR close?
- Closes #22708.
## Rationale for this change
`BufferExec` and `AnalyzeExec` both have eager evaluation behavior: they
can drive child streams in spawned tasks instead of performing exactly
one downstream poll's worth of work at a time. Their `PlanProperties`
currently report `EvaluationType::Lazy`, so physical-plan metadata does
not describe how these operators execute.
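The lazy/eager distinction can be illustrated with a small std-only model. This is a sketch under stated assumptions, not DataFusion's API:

- `Batch`, `LazySource`, and `EagerBuffer` are hypothetical names.
- A synchronous `Iterator` stands in for an async `Stream`.
- A `std::thread` stands in for a spawned Tokio task.
- A bounded `sync_channel` counted in batches stands in for `BufferExec`'s byte-capacity buffer.

The lazy source does work only when `next()` is called. The eager buffer starts driving its child as soon as it is constructed, before any downstream poll.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for an Arrow `RecordBatch`.
type Batch = Vec<i32>;

/// Lazy (demand-driven): a batch is produced only when `next()` is called.
struct LazySource {
    remaining: usize,
    produced: Arc<AtomicUsize>, // counts batches produced so far
}

impl Iterator for LazySource {
    type Item = Batch;

    fn next(&mut self) -> Option<Batch> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        self.produced.fetch_add(1, Ordering::SeqCst);
        Some(vec![self.remaining as i32])
    }
}

/// Eager (data-driven): a background thread polls the child and fills a
/// bounded buffer ahead of downstream demand.
struct EagerBuffer {
    rx: Receiver<Batch>,
}

impl EagerBuffer {
    /// Work starts when the "stream" is created here; starting it on the
    /// first poll instead would still count as eager.
    fn new<I>(child: I, capacity: usize) -> Self
    where
        I: Iterator<Item = Batch> + Send + 'static,
    {
        let (tx, rx) = sync_channel(capacity);
        thread::spawn(move || {
            for batch in child {
                // `send` blocks only while the buffer is full and fails
                // once the consumer has been dropped.
                if tx.send(batch).is_err() {
                    break;
                }
            }
        });
        Self { rx }
    }
}

impl Iterator for EagerBuffer {
    type Item = Batch;

    fn next(&mut self) -> Option<Batch> {
        // Yields `None` once the producer thread finishes and drops `tx`.
        self.rx.recv().ok()
    }
}

fn main() {
    let lazy_count = Arc::new(AtomicUsize::new(0));
    let mut lazy = LazySource { remaining: 4, produced: Arc::clone(&lazy_count) };
    assert_eq!(lazy_count.load(Ordering::SeqCst), 0); // nothing before the first poll
    lazy.next();
    assert_eq!(lazy_count.load(Ordering::SeqCst), 1); // one batch per poll

    let eager_count = Arc::new(AtomicUsize::new(0));
    let child = LazySource { remaining: 4, produced: Arc::clone(&eager_count) };
    let eager = EagerBuffer::new(child, 2);

    // With no downstream poll at all, the thread fills the 2-slot buffer and
    // produces a third batch that then waits in a blocked `send`.
    for _ in 0..500 {
        if eager_count.load(Ordering::SeqCst) >= 3 {
            break;
        }
        thread::sleep(Duration::from_millis(10));
    }
    assert_eq!(eager_count.load(Ordering::SeqCst), 3);

    let batches: Vec<Batch> = eager.collect();
    assert_eq!(batches, vec![vec![3], vec![2], vec![1], vec![0]]);
    assert_eq!(eager_count.load(Ordering::SeqCst), 4);
}
```

The eager work here starts in the constructor. The updated `EvaluationType` docs treat that timing as an implementation detail: starting on the first poll would still be eager.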
While making that metadata accurate, this PR also keeps
`need_data_exchange(...)` scoped to its original purpose. #4585
introduced that helper as a way to identify physical operators that
require exchange-style handling because they redistribute partitions or
gather multiple input partitions into one output partition. #4586
implemented it for the native exchange/gather operators called out
there: non-round-robin `RepartitionExec`, `CoalescePartitionsExec`, and
`SortPreservingMergeExec`.
The later cooperative-scheduling work in #16398 introduced
`EvaluationType` and made `need_data_exchange(...)` use `evaluation_type
== EvaluationType::Eager`. That shortcut worked when the eager operators
and exchange/gather operators were the same set. With `BufferExec` and
`AnalyzeExec` correctly classified as eager, the shortcut would make
`need_data_exchange(...)` report operators that do eager child polling
but do not perform a data exchange.
So the intended split in this PR is:
- `EvaluationType` describes execution behavior.
- `need_data_exchange(...)` identifies partition redistribution or
partition gathering.
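The split can be checked with a simplified, hypothetical model of the operators. These are not DataFusion's types:

- `Operator` and `need_data_exchange_shortcut` are illustrative names.
- `Partitioning` is reduced to two variants; DataFusion's hash variant also carries key expressions.

The explicit predicate follows the same shape as the `need_data_exchange` body in the diff below. It excludes round-robin repartitioning and reports coalesce and sort-preserving merge only when they gather more than one input partition.

```rust
// Hypothetical, simplified model; these are not DataFusion's types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EvaluationType {
    Lazy,
    Eager,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(usize), // the real hash variant also carries key expressions
}

#[derive(Debug, Clone, Copy)]
enum Operator {
    Filter,
    Buffer,
    Analyze,
    Repartition(Partitioning),
    CoalescePartitions { input_partitions: usize },
    SortPreservingMerge { input_partitions: usize },
}

impl Operator {
    /// Execution behavior, as classified after this change.
    fn evaluation_type(self) -> EvaluationType {
        match self {
            Operator::Filter => EvaluationType::Lazy,
            // Buffer and Analyze are now eager, like the exchange/gather operators.
            _ => EvaluationType::Eager,
        }
    }
}

/// The earlier shortcut: every eager operator counts as an exchange.
fn need_data_exchange_shortcut(op: Operator) -> bool {
    op.evaluation_type() == EvaluationType::Eager
}

/// Explicit exchange/gather predicate: non-round-robin repartitioning, or
/// gathering more than one input partition into one output partition.
fn need_data_exchange(op: Operator) -> bool {
    match op {
        Operator::Repartition(p) => !matches!(p, Partitioning::RoundRobinBatch(_)),
        Operator::CoalescePartitions { input_partitions }
        | Operator::SortPreservingMerge { input_partitions } => input_partitions > 1,
        Operator::Filter | Operator::Buffer | Operator::Analyze => false,
    }
}

fn main() {
    // Once Buffer and Analyze are classified as eager, the shortcut misreports them.
    assert!(need_data_exchange_shortcut(Operator::Buffer));
    assert!(need_data_exchange_shortcut(Operator::Analyze));
    // The explicit predicate does not.
    assert!(!need_data_exchange(Operator::Buffer));
    assert!(!need_data_exchange(Operator::Analyze));
    assert!(!need_data_exchange(Operator::Filter));

    assert!(need_data_exchange(Operator::Repartition(Partitioning::Hash(8))));
    assert!(!need_data_exchange(Operator::Repartition(Partitioning::RoundRobinBatch(8))));
    assert!(need_data_exchange(Operator::CoalescePartitions { input_partitions: 4 }));
    assert!(!need_data_exchange(Operator::SortPreservingMerge { input_partitions: 1 }));
}
```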
## What changes are included in this PR?
- Mark `BufferExec` as `EvaluationType::Eager` while keeping its
existing cooperative scheduling metadata.
- Mark `AnalyzeExec` as `EvaluationType::Eager` in its computed plan
properties.
- Clarify the `EvaluationType` docs so eager evaluation is not defined
by whether work starts in `execute` or on the first stream poll.
- Restore `need_data_exchange(...)` as an exchange/gather predicate for
the native exchange operators instead of deriving it from all eager
operators.
## Are these changes tested?
Targeted tests pass:
```bash
cargo test -p datafusion-physical-plan --lib buffer::tests
cargo test -p datafusion-physical-plan --lib analyze::tests
cargo test -p datafusion-physical-plan --doc execution_plan
cargo test -p datafusion-physical-plan --lib execution_plan::tests::buffer_exec_does_not_need_data_exchange
RUSTDOCFLAGS="-D warnings" cargo doc -p datafusion-physical-plan --no-deps
```
I also added a focused regression test that `BufferExec` does not
require data exchange. I did not add tests that merely assert the
assigned `EvaluationType`, as those would duplicate the implementation
rather than cover behavior.
## Are there any user-facing changes?
No query-result changes are expected. This updates physical-plan
metadata and keeps `need_data_exchange(...)` scoped to operators that
move data between partitions or gather partitions together.
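The helper's original doc comment describes it as useful for a distributed engine deciding where shuffling is needed. The following hypothetical sketch shows why keeping the predicate narrow matters in that setting.

- `PlanNode`, `node`, and `collect` are illustrative names, not DataFusion APIs.
- The `eager` and `exchange` flags are set by hand rather than computed from a real plan.

A planner that cuts stages at exchange boundaries sees only the gather and hash-repartition nodes. The eager `BufferExec` between them does not introduce an extra boundary.

```rust
/// Hypothetical plan node. `eager` stands in for `EvaluationType::Eager`
/// and `exchange` for the result of a `need_data_exchange`-style predicate.
struct PlanNode {
    name: &'static str,
    eager: bool,
    exchange: bool,
    children: Vec<PlanNode>,
}

fn node(name: &'static str, eager: bool, exchange: bool, children: Vec<PlanNode>) -> PlanNode {
    PlanNode { name, eager, exchange, children }
}

/// Collects, in pre-order, the names of the nodes that satisfy `pred`.
fn collect<F: Fn(&PlanNode) -> bool>(plan: &PlanNode, pred: &F, out: &mut Vec<&'static str>) {
    if pred(plan) {
        out.push(plan.name);
    }
    for child in &plan.children {
        collect(child, pred, out);
    }
}

fn main() {
    // CoalescePartitionsExec <- BufferExec <- RepartitionExec(Hash) <- FilterExec <- DataSourceExec,
    // assuming the hash repartition produces more than one partition.
    let plan = node("CoalescePartitionsExec", true, true, vec![
        node("BufferExec", true, false, vec![
            node("RepartitionExec(Hash)", true, true, vec![
                node("FilterExec", false, false, vec![
                    node("DataSourceExec", false, false, vec![]),
                ]),
            ]),
        ]),
    ]);

    let mut exchanges = Vec::new();
    collect(&plan, &|n: &PlanNode| n.exchange, &mut exchanges);
    let mut eager = Vec::new();
    collect(&plan, &|n: &PlanNode| n.eager, &mut eager);

    // Only the two exchange/gather operators are stage boundaries...
    assert_eq!(exchanges, vec!["CoalescePartitionsExec", "RepartitionExec(Hash)"]);
    // ...even though BufferExec is eager as well.
    assert_eq!(eager, vec!["CoalescePartitionsExec", "BufferExec", "RepartitionExec(Hash)"]);
    println!("stage boundaries: {}", exchanges.len());
}
```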
---
datafusion/physical-plan/src/analyze.rs | 2 +
datafusion/physical-plan/src/buffer.rs | 5 +-
datafusion/physical-plan/src/execution_plan.rs | 80 +++++++++++++++++++-------
3 files changed, 65 insertions(+), 22 deletions(-)
diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs
index 580bf31231..27e0f5e923 100644
--- a/datafusion/physical-plan/src/analyze.rs
+++ b/datafusion/physical-plan/src/analyze.rs
@@ -25,6 +25,7 @@ use super::{
SendableRecordBatchStream,
};
use crate::display::DisplayableExecutionPlan;
+use crate::execution_plan::EvaluationType;
use crate::metrics::{MetricCategory, MetricType};
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
@@ -172,6 +173,7 @@ impl AnalyzeExec {
input.pipeline_behavior(),
input.boundedness(),
)
+ .with_evaluation_type(EvaluationType::Eager)
}
}
diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs
index 19a4ebba83..2985dc5766 100644
--- a/datafusion/physical-plan/src/buffer.rs
+++ b/datafusion/physical-plan/src/buffer.rs
@@ -18,7 +18,7 @@
//! [`BufferExec`] decouples production and consumption on messages by buffering the input in the
//! background up to a certain capacity.
-use crate::execution_plan::{CardinalityEffect, SchedulingType};
+use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
@@ -101,7 +101,8 @@ impl BufferExec {
/// Builds a new [BufferExec] with the provided capacity in bytes.
pub fn new(input: Arc<dyn ExecutionPlan>, capacity: usize) -> Self {
let properties = PlanProperties::clone(input.properties())
- .with_scheduling_type(SchedulingType::Cooperative);
+ .with_scheduling_type(SchedulingType::Cooperative)
+ .with_evaluation_type(EvaluationType::Eager);
Self {
input,
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 50eac566d9..8577e86f00 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -45,6 +45,8 @@ use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::display::DisplayableExecutionPlan;
use crate::metrics::MetricsSet;
use crate::projection::ProjectionExec;
+use crate::repartition::RepartitionExec;
+use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::stream::RecordBatchStreamAdapter;
use arrow::array::{Array, RecordBatch};
@@ -962,25 +964,36 @@ pub enum SchedulingType {
Cooperative,
}
-/// Represents how an operator's `Stream` implementation generates `RecordBatch`es.
+/// Represents how an operator's stream drives [`RecordBatch`] production
+/// relative to downstream demand.
///
-/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to
-/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation.
-///
-/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This
-/// is known as data-driven or eager evaluation.
+/// This is execution-topology metadata for optimizers. It distinguishes streams
+/// whose batch production is driven directly by downstream calls to
+/// `Stream::poll_next` from streams that may also drive input or output
+/// production independently, such as by spawning tasks or buffering batches
+/// ahead of demand.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvaluationType {
- /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
- /// instances when it is demanded by invoking `Stream::poll_next`.
- /// Filter, projection, and join are examples of such lazy operators.
+ /// The stream generated by [`execute`](ExecutionPlan::execute) is
+ /// demand-driven: it produces [`RecordBatch`]es in response to downstream
+ /// calls to `Stream::poll_next`.
+ ///
+ /// Filter, projection, and join operators are examples of lazy operators.
///
/// Lazy operators are also known as demand-driven operators.
Lazy,
- /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
- /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
- /// `Stream::poll_next` is called.
- /// Examples of eager operators are repartition, coalesce partitions, and sort preserving merge.
+ /// The stream generated by [`execute`](ExecutionPlan::execute) may drive
+ /// input or output [`RecordBatch`] production ahead of, or independently
+ /// from, downstream calls to `Stream::poll_next`.
+ ///
+ /// Eager operators commonly poll input streams from spawned Tokio tasks,
+ /// buffer batches ahead of demand, or otherwise create an independent
+ /// child-polling pipeline. Eager work may start when `execute` creates the
+ /// stream or when the returned stream is first polled; that timing is an
+ /// implementation detail.
+ ///
+ /// Repartition, coalesce partitions, sort-preserving merge, buffer, and
+ /// analyze operators are examples of eager operators.
///
/// Eager operators are also known as a data-driven operators.
Eager,
@@ -1209,15 +1222,31 @@ pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
Ok(())
}
-/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
-/// especially for the distributed engine to judge whether need to deal with shuffling.
-/// Currently, there are 3 kinds of execution plan which needs data exchange
-/// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
-/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
-/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
+/// Indicate whether a data exchange is needed for the input of `plan`.
+///
+/// This identifies physical operators that redistribute child partitions or
+/// gather multiple child partitions into one output partition:
+///
+/// 1. RepartitionExec for non-round-robin repartitioning
+/// 2. CoalescePartitionsExec for collapsing multiple partitions into one without ordering guarantee
+/// 3. SortPreservingMergeExec for collapsing multiple sorted partitions into one with ordering guarantee
#[expect(clippy::needless_pass_by_value)]
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
- plan.properties().evaluation_type == EvaluationType::Eager
+ if let Some(repartition) = plan.downcast_ref::<RepartitionExec>() {
+ !matches!(repartition.partitioning(), Partitioning::RoundRobinBatch(_))
+ } else if let Some(coalesce) = plan.downcast_ref::<CoalescePartitionsExec>() {
+ coalesce.input().output_partitioning().partition_count() > 1
+ } else if let Some(sort_preserving_merge) =
+ plan.downcast_ref::<SortPreservingMergeExec>()
+ {
+ sort_preserving_merge
+ .input()
+ .output_partitioning()
+ .partition_count()
+ > 1
+ } else {
+ false
+ }
}
/// Returns a copy of this plan if we change any child according to the pointer comparison.
@@ -1556,6 +1585,8 @@ pub(crate) fn stub_properties() -> Arc<PlanProperties> {
mod tests {
use super::*;
+ use crate::buffer::BufferExec;
+ use crate::test::exec::MockExec;
use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray};
@@ -1768,6 +1799,15 @@ mod tests {
let _ = plan.name();
}
+ #[test]
+ fn buffer_exec_does_not_need_data_exchange() {
+ let schema = Arc::new(Schema::empty());
+ let input: Arc<dyn ExecutionPlan> = Arc::new(MockExec::new(vec![], schema));
+ let buffer: Arc<dyn ExecutionPlan> = Arc::new(BufferExec::new(input, 1024));
+
+ assert!(!need_data_exchange(buffer));
+ }
+
#[test]
fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
check_not_null_constraints(