This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git

The following commit(s) were added to refs/heads/main by this push:
     new 6cd03e2111 Minor: add testing case for add YieldStreamExec and polish docs (#16369)
6cd03e2111 is described below

commit 6cd03e2111d0fad47f2f08d2161ea8df9b5a3ba4
Author: Qi Zhu <821684...@qq.com>
AuthorDate: Fri Jun 13 18:31:20 2025 +0800

    Minor: add testing case for add YieldStreamExec and polish docs (#16369)

    * Add more testing case for add YieldStreamExec

    * fmt

    * fix doc

    * Update datafusion/physical-optimizer/src/insert_yield_exec.rs

    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>

    * Update datafusion/physical-optimizer/src/insert_yield_exec.rs

    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>

    * Address comments

    ---------

    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 Cargo.lock                                         |  1 +
 datafusion/physical-optimizer/Cargo.toml           |  1 +
 .../physical-optimizer/src/insert_yield_exec.rs    | 44 ++++++++++++++++++----
 datafusion/physical-plan/src/lib.rs                |  1 -
 datafusion/physical-plan/src/test.rs               |  2 +-
 5 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7a84727605..c73211a992 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2465,6 +2465,7 @@ dependencies = [
  "itertools 0.14.0",
  "log",
  "recursive",
+ "tokio",
 ]
 
 [[package]]
diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml
index aaadb09bcc..2db453b45a 100644
--- a/datafusion/physical-optimizer/Cargo.toml
+++ b/datafusion/physical-optimizer/Cargo.toml
@@ -54,3 +54,4 @@ recursive = { workspace = true, optional = true }
 datafusion-expr = { workspace = true }
 datafusion-functions-nested = { workspace = true }
 insta = { workspace = true }
+tokio = { workspace = true }
diff --git a/datafusion/physical-optimizer/src/insert_yield_exec.rs b/datafusion/physical-optimizer/src/insert_yield_exec.rs
index 30a01a67cc..8ce893866d 100644
--- a/datafusion/physical-optimizer/src/insert_yield_exec.rs
+++ b/datafusion/physical-optimizer/src/insert_yield_exec.rs
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! The `InsertYieldExec` optimizer rule inspects the physical plan to look for
-//! tight-looping operators and inserts explicit yielding mechanisms (whether
-//! as a separate operator, or via a yielding variant) at leaf nodes to make
-//! the plan cancellation friendly.
+//! The [`InsertYieldExec`] optimizer rule inspects the physical plan to find all leaf
+//! nodes corresponding to tight-looping operators. It first attempts to replace
+//! each leaf with a cooperative-yielding variant via `with_cooperative_yields`,
+//! and only if no built-in variant exists does it wrap the node in a
+//! [`YieldStreamExec`] operator to enforce periodic yielding, ensuring the plan
+//! remains cancellation-friendly.
 
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
@@ -32,9 +34,10 @@ use datafusion_physical_plan::yield_stream::YieldStreamExec;
 use datafusion_physical_plan::ExecutionPlan;
 
 /// `InsertYieldExec` is a [`PhysicalOptimizerRule`] that finds every leaf node in
-/// the plan, and replaces it with a variant that cooperatively yields
-/// either using the its yielding variant given by `with_cooperative_yields`,
-/// or, if none exists, by inserting a [`YieldStreamExec`] operator as a parent.
+/// the plan and replaces it with a variant that yields cooperatively if supported.
+/// If the node does not provide a built-in yielding variant via
+/// [`ExecutionPlan::with_cooperative_yields`], it is wrapped in a [`YieldStreamExec`] parent to
+/// enforce a configured yield frequency.
 pub struct InsertYieldExec {}
 
 impl InsertYieldExec {
@@ -73,10 +76,11 @@ impl PhysicalOptimizerRule for InsertYieldExec {
                     // Not a leaf, keep recursing down.
                     return Ok(Transformed::no(plan));
                 }
+                // For leaf nodes, try to get a built-in cooperative-yielding variant.
                 let new_plan = Arc::clone(&plan)
                     .with_cooperative_yields()
                     .unwrap_or_else(|| {
-                        // Otherwise, insert a `YieldStreamExec` to enforce periodic yielding.
+                        // Only if no built-in variant exists, insert a `YieldStreamExec`.
                         Arc::new(YieldStreamExec::new(plan, yield_period))
                     });
                 Ok(Transformed::new(new_plan, true, TreeNodeRecursion::Jump))
@@ -92,3 +96,27 @@ impl PhysicalOptimizerRule for InsertYieldExec {
         true
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use datafusion_common::config::ConfigOptions;
+    use datafusion_physical_plan::{displayable, test::scan_partitioned};
+    use insta::assert_snapshot;
+
+    #[tokio::test]
+    async fn test_yield_stream_exec_for_custom_exec() {
+        let test_custom_exec = scan_partitioned(1);
+        let config = ConfigOptions::new();
+        let optimized = InsertYieldExec::new()
+            .optimize(test_custom_exec, &config)
+            .unwrap();
+
+        let display = displayable(optimized.as_ref()).indent(true).to_string();
+        // Use insta snapshot to ensure full plan structure
+        assert_snapshot!(display, @r###"
+        YieldStreamExec frequency=64
+          DataSourceExec: partitions=1, partition_sizes=[1]
+        "###);
+    }
+}
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 5d63ccdc13..9e703ced1f 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -92,6 +92,5 @@ pub mod udaf {
 }
 
 pub mod coalesce;
-#[cfg(any(test, feature = "bench"))]
 pub mod test;
 pub mod yield_stream;
diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs
index 140a5f35a7..5e6410a017 100644
--- a/datafusion/physical-plan/src/test.rs
+++ b/datafusion/physical-plan/src/test.rs
@@ -300,7 +300,7 @@ impl TestMemoryExec {
     }
 
     /// refer to `try_with_sort_information` at MemorySourceConfig for more information.
-    /// https://github.com/apache/datafusion/tree/main/datafusion/datasource/src/memory.rs
+    /// <https://github.com/apache/datafusion/tree/main/datafusion/datasource/src/memory.rs>
     pub fn try_with_sort_information(
         mut self,
         mut sort_information: Vec<LexOrdering>,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org