This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6cd03e2111 Minor: add testing case for add YieldStreamExec and polish docs (#16369)
6cd03e2111 is described below

commit 6cd03e2111d0fad47f2f08d2161ea8df9b5a3ba4
Author: Qi Zhu <821684...@qq.com>
AuthorDate: Fri Jun 13 18:31:20 2025 +0800

    Minor: add testing case for add YieldStreamExec and polish docs (#16369)
    
    * Add more testing case for add YieldStreamExec
    
    * fmt
    
    * fix doc
    
    * Update datafusion/physical-optimizer/src/insert_yield_exec.rs
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
    
    * Update datafusion/physical-optimizer/src/insert_yield_exec.rs
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
    
    * Address comments
    
    ---------
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 Cargo.lock                                         |  1 +
 datafusion/physical-optimizer/Cargo.toml           |  1 +
 .../physical-optimizer/src/insert_yield_exec.rs    | 44 ++++++++++++++++++----
 datafusion/physical-plan/src/lib.rs                |  1 -
 datafusion/physical-plan/src/test.rs               |  2 +-
 5 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7a84727605..c73211a992 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2465,6 +2465,7 @@ dependencies = [
  "itertools 0.14.0",
  "log",
  "recursive",
+ "tokio",
 ]
 
 [[package]]
diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml
index aaadb09bcc..2db453b45a 100644
--- a/datafusion/physical-optimizer/Cargo.toml
+++ b/datafusion/physical-optimizer/Cargo.toml
@@ -54,3 +54,4 @@ recursive = { workspace = true, optional = true }
 datafusion-expr = { workspace = true }
 datafusion-functions-nested = { workspace = true }
 insta = { workspace = true }
+tokio = { workspace = true }
diff --git a/datafusion/physical-optimizer/src/insert_yield_exec.rs b/datafusion/physical-optimizer/src/insert_yield_exec.rs
index 30a01a67cc..8ce893866d 100644
--- a/datafusion/physical-optimizer/src/insert_yield_exec.rs
+++ b/datafusion/physical-optimizer/src/insert_yield_exec.rs
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! The `InsertYieldExec` optimizer rule inspects the physical plan to look for
-//! tight-looping operators and inserts explicit yielding mechanisms (whether
-//! as a separate operator, or via a yielding variant) at leaf nodes to make
-//! the plan cancellation friendly.
+//! The [`InsertYieldExec`] optimizer rule inspects the physical plan to find all leaf
+//! nodes corresponding to tight-looping operators. It first attempts to replace
+//! each leaf with a cooperative-yielding variant via `with_cooperative_yields`,
+//! and only if no built-in variant exists does it wrap the node in a
+//! [`YieldStreamExec`] operator to enforce periodic yielding, ensuring the plan
+//! remains cancellation-friendly.
 
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
@@ -32,9 +34,10 @@ use datafusion_physical_plan::yield_stream::YieldStreamExec;
 use datafusion_physical_plan::ExecutionPlan;
 
 /// `InsertYieldExec` is a [`PhysicalOptimizerRule`] that finds every leaf node in
-/// the plan, and replaces it with a variant that cooperatively yields
-/// either using the its yielding variant given by `with_cooperative_yields`,
-/// or, if none exists, by inserting a [`YieldStreamExec`] operator as a parent.
+/// the plan and replaces it with a variant that yields cooperatively if supported.
+/// If the node does not provide a built-in yielding variant via
+/// [`ExecutionPlan::with_cooperative_yields`], it is wrapped in a [`YieldStreamExec`] parent to
+/// enforce a configured yield frequency.
 pub struct InsertYieldExec {}
 
 impl InsertYieldExec {
@@ -73,10 +76,11 @@ impl PhysicalOptimizerRule for InsertYieldExec {
                     // Not a leaf, keep recursing down.
                     return Ok(Transformed::no(plan));
                 }
+                // For leaf nodes, try to get a built-in cooperative-yielding variant.
                 let new_plan = Arc::clone(&plan)
                     .with_cooperative_yields()
                     .unwrap_or_else(|| {
-                        // Otherwise, insert a `YieldStreamExec` to enforce periodic yielding.
+                        // Only if no built-in variant exists, insert a `YieldStreamExec`.
                         Arc::new(YieldStreamExec::new(plan, yield_period))
                     });
                 Ok(Transformed::new(new_plan, true, TreeNodeRecursion::Jump))
@@ -92,3 +96,27 @@ impl PhysicalOptimizerRule for InsertYieldExec {
         true
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use datafusion_common::config::ConfigOptions;
+    use datafusion_physical_plan::{displayable, test::scan_partitioned};
+    use insta::assert_snapshot;
+
+    #[tokio::test]
+    async fn test_yield_stream_exec_for_custom_exec() {
+        let test_custom_exec = scan_partitioned(1);
+        let config = ConfigOptions::new();
+        let optimized = InsertYieldExec::new()
+            .optimize(test_custom_exec, &config)
+            .unwrap();
+
+        let display = displayable(optimized.as_ref()).indent(true).to_string();
+        // Use insta snapshot to ensure full plan structure
+        assert_snapshot!(display, @r###"
+            YieldStreamExec frequency=64
+              DataSourceExec: partitions=1, partition_sizes=[1]
+            "###);
+    }
+}
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 5d63ccdc13..9e703ced1f 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -92,6 +92,5 @@ pub mod udaf {
 }
 
 pub mod coalesce;
-#[cfg(any(test, feature = "bench"))]
 pub mod test;
 pub mod yield_stream;
diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs
index 140a5f35a7..5e6410a017 100644
--- a/datafusion/physical-plan/src/test.rs
+++ b/datafusion/physical-plan/src/test.rs
@@ -300,7 +300,7 @@ impl TestMemoryExec {
     }
 
     /// refer to `try_with_sort_information` at MemorySourceConfig for more information.
-    /// https://github.com/apache/datafusion/tree/main/datafusion/datasource/src/memory.rs
+    /// <https://github.com/apache/datafusion/tree/main/datafusion/datasource/src/memory.rs>
     pub fn try_with_sort_information(
         mut self,
         mut sort_information: Vec<LexOrdering>,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to