This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-22711-18e7c8ed2b18c23750286ba914fe4a1e4d199719 in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit 7b78f0cea0409f60c79a3833781e621344d52ffd Author: Geoffrey Claude <[email protected]> AuthorDate: Thu Jun 4 21:06:42 2026 +0200 Mark BufferExec and AnalyzeExec as eager (#22711) ## Which issue does this PR close? - Closes #22708. ## Rationale for this change `BufferExec` and `AnalyzeExec` both have eager evaluation behavior: they can drive child streams in spawned tasks instead of performing exactly one downstream poll's worth of work at a time. Their `PlanProperties` currently report `EvaluationType::Lazy`, so the physical-plan metadata does not accurately describe how these operators execute. While making that metadata accurate, this PR also keeps `need_data_exchange(...)` scoped to its original purpose. #4585 introduced that helper as a way to identify physical operators that require exchange-style handling because they redistribute partitions or gather multiple input partitions into one output partition. #4586 implemented it for the native exchange/gather operators called out there: non-round-robin `RepartitionExec`, `CoalescePartitionsExec`, and `SortPreservingMergeExec`. The later cooperative-scheduling work in #16398 introduced `EvaluationType` and made `need_data_exchange(...)` use `evaluation_type == EvaluationType::Eager`. That shortcut worked when the eager operators and exchange/gather operators were the same set. With `BufferExec` and `AnalyzeExec` correctly classified as eager, the shortcut would make `need_data_exchange(...)` report operators that do eager child polling but do not perform a data exchange. So the intended split in this PR is: - `EvaluationType` describes execution behavior. - `need_data_exchange(...)` identifies partition redistribution or partition gathering. ## What changes are included in this PR? - Mark `BufferExec` as `EvaluationType::Eager` while keeping its existing cooperative scheduling metadata. - Mark `AnalyzeExec` as `EvaluationType::Eager` in its computed plan properties. 
- Clarify the `EvaluationType` docs so eager evaluation is not defined by whether work starts in `execute` or on the first stream poll. - Restore `need_data_exchange(...)` as an exchange/gather predicate for the native exchange operators instead of deriving it from all eager operators. ## Are these changes tested? Targeted tests pass: ```bash cargo test -p datafusion-physical-plan --lib buffer::tests cargo test -p datafusion-physical-plan --lib analyze::tests cargo test -p datafusion-physical-plan --doc execution_plan cargo test -p datafusion-physical-plan --lib execution_plan::tests::buffer_exec_does_not_need_data_exchange RUSTDOCFLAGS="-D warnings" cargo doc -p datafusion-physical-plan --no-deps ``` I also added a focused regression test that `BufferExec` does not require data exchange. I did not add tests that merely assert the assigned `EvaluationType`, as those would duplicate the implementation rather than cover behavior. ## Are there any user-facing changes? No query-result changes are expected. This updates physical-plan metadata and keeps `need_data_exchange(...)` scoped to operators that move data between partitions or gather partitions together. 
--- datafusion/physical-plan/src/analyze.rs | 2 + datafusion/physical-plan/src/buffer.rs | 5 +- datafusion/physical-plan/src/execution_plan.rs | 80 +++++++++++++++++++------- 3 files changed, 65 insertions(+), 22 deletions(-) diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 580bf31231..27e0f5e923 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -25,6 +25,7 @@ use super::{ SendableRecordBatchStream, }; use crate::display::DisplayableExecutionPlan; +use crate::execution_plan::EvaluationType; use crate::metrics::{MetricCategory, MetricType}; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; @@ -172,6 +173,7 @@ impl AnalyzeExec { input.pipeline_behavior(), input.boundedness(), ) + .with_evaluation_type(EvaluationType::Eager) } } diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index 19a4ebba83..2985dc5766 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -18,7 +18,7 @@ //! [`BufferExec`] decouples production and consumption on messages by buffering the input in the //! background up to a certain capacity. -use crate::execution_plan::{CardinalityEffect, SchedulingType}; +use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, @@ -101,7 +101,8 @@ impl BufferExec { /// Builds a new [BufferExec] with the provided capacity in bytes. 
pub fn new(input: Arc<dyn ExecutionPlan>, capacity: usize) -> Self { let properties = PlanProperties::clone(input.properties()) - .with_scheduling_type(SchedulingType::Cooperative); + .with_scheduling_type(SchedulingType::Cooperative) + .with_evaluation_type(EvaluationType::Eager); Self { input, diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 50eac566d9..8577e86f00 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -45,6 +45,8 @@ use crate::coalesce_partitions::CoalescePartitionsExec; use crate::display::DisplayableExecutionPlan; use crate::metrics::MetricsSet; use crate::projection::ProjectionExec; +use crate::repartition::RepartitionExec; +use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::stream::RecordBatchStreamAdapter; use arrow::array::{Array, RecordBatch}; @@ -962,25 +964,36 @@ pub enum SchedulingType { Cooperative, } -/// Represents how an operator's `Stream` implementation generates `RecordBatch`es. +/// Represents how an operator's stream drives [`RecordBatch`] production +/// relative to downstream demand. /// -/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to -/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation. -/// -/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This -/// is known as data-driven or eager evaluation. +/// This is execution-topology metadata for optimizers. It distinguishes streams +/// whose batch production is driven directly by downstream calls to +/// `Stream::poll_next` from streams that may also drive input or output +/// production independently, such as by spawning tasks or buffering batches +/// ahead of demand. 
#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum EvaluationType { - /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch` - /// instances when it is demanded by invoking `Stream::poll_next`. - /// Filter, projection, and join are examples of such lazy operators. + /// The stream generated by [`execute`](ExecutionPlan::execute) is + /// demand-driven: it produces [`RecordBatch`]es in response to downstream + /// calls to `Stream::poll_next`. + /// + /// Filter, projection, and join operators are examples of lazy operators. /// /// Lazy operators are also known as demand-driven operators. Lazy, - /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch` - /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time - /// `Stream::poll_next` is called. - /// Examples of eager operators are repartition, coalesce partitions, and sort preserving merge. + /// The stream generated by [`execute`](ExecutionPlan::execute) may drive + /// input or output [`RecordBatch`] production ahead of, or independently + /// from, downstream calls to `Stream::poll_next`. + /// + /// Eager operators commonly poll input streams from spawned Tokio tasks, + /// buffer batches ahead of demand, or otherwise create an independent + /// child-polling pipeline. Eager work may start when `execute` creates the + /// stream or when the returned stream is first polled; that timing is an + /// implementation detail. + /// + /// Repartition, coalesce partitions, sort-preserving merge, buffer, and + /// analyze operators are examples of eager operators. /// /// Eager operators are also known as a data-driven operators. 
Eager, @@ -1209,15 +1222,31 @@ pub fn check_default_invariants<P: ExecutionPlan + ?Sized>( Ok(()) } -/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful -/// especially for the distributed engine to judge whether need to deal with shuffling. -/// Currently, there are 3 kinds of execution plan which needs data exchange -/// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s -/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee -/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee +/// Indicate whether a data exchange is needed for the input of `plan`. +/// +/// This identifies physical operators that redistribute child partitions or +/// gather multiple child partitions into one output partition: +/// +/// 1. RepartitionExec for non-round-robin repartitioning +/// 2. CoalescePartitionsExec for collapsing multiple partitions into one without ordering guarantee +/// 3. SortPreservingMergeExec for collapsing multiple sorted partitions into one with ordering guarantee #[expect(clippy::needless_pass_by_value)] pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool { - plan.properties().evaluation_type == EvaluationType::Eager + if let Some(repartition) = plan.downcast_ref::<RepartitionExec>() { + !matches!(repartition.partitioning(), Partitioning::RoundRobinBatch(_)) + } else if let Some(coalesce) = plan.downcast_ref::<CoalescePartitionsExec>() { + coalesce.input().output_partitioning().partition_count() > 1 + } else if let Some(sort_preserving_merge) = + plan.downcast_ref::<SortPreservingMergeExec>() + { + sort_preserving_merge + .input() + .output_partitioning() + .partition_count() + > 1 + } else { + false + } } /// Returns a copy of this plan if we change any child according to the pointer comparison. 
@@ -1556,6 +1585,8 @@ pub(crate) fn stub_properties() -> Arc<PlanProperties> { mod tests { use super::*; + use crate::buffer::BufferExec; + use crate::test::exec::MockExec; use crate::{DisplayAs, DisplayFormatType, ExecutionPlan}; use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray}; @@ -1768,6 +1799,15 @@ mod tests { let _ = plan.name(); } + #[test] + fn buffer_exec_does_not_need_data_exchange() { + let schema = Arc::new(Schema::empty()); + let input: Arc<dyn ExecutionPlan> = Arc::new(MockExec::new(vec![], schema)); + let buffer: Arc<dyn ExecutionPlan> = Arc::new(BufferExec::new(input, 1024)); + + assert!(!need_data_exchange(buffer)); + } + #[test] fn test_check_not_null_constraints_accept_non_null() -> Result<()> { check_not_null_constraints( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
