This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e3707e380 fix: projection for `CooperativeExec` and `CoalesceBatchesExec` (#19400)
2e3707e380 is described below

commit 2e3707e380172a4ba1ae5efabe7bd27a354bfb2d
Author: Huaijin <[email protected]>
AuthorDate: Fri Dec 19 22:53:07 2025 +0800

    fix: projection for `CooperativeExec` and `CoalesceBatchesExec` (#19400)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    Closes #19398
    
    ## Rationale for this change
    
    see issue #19398
    
    ## What changes are included in this PR?
    
    Implement `try_swapping_with_projection` for `CooperativeExec` and
    `CoalesceBatchesExec` so that `ProjectionPushdown` can push projections
    through these operators.
    
    ## Are these changes tested?
    
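    Yes: adds `test_coalesce_batches_after_projection` and
    `test_cooperative_exec_after_projection` to the projection pushdown tests.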
    
    ## Are there any user-facing changes?
---
 .../physical_optimizer/projection_pushdown.rs      | 101 +++++++++++++++++++++
 datafusion/physical-plan/src/coalesce_batches.rs   |  13 +++
 datafusion/physical-plan/src/coop.rs               |  15 ++-
 3 files changed, 128 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index 2e3ac97350..480f5c8cc9 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -45,7 +45,9 @@ use datafusion_physical_expr_common::sort_expr::{
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
 use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion_physical_plan::coop::CooperativeExec;
 use datafusion_physical_plan::filter::FilterExec;
 use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
 use datafusion_physical_plan::joins::{
@@ -1677,3 +1679,102 @@ fn test_partition_col_projection_pushdown_expr() -> Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn test_coalesce_batches_after_projection() -> Result<()> {
+    let csv = create_simple_csv_exec();
+    let filter = Arc::new(FilterExec::try_new(
+        Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("c", 2)),
+            Operator::Gt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
+        )),
+        csv,
+    )?);
+    let coalesce_batches: Arc<dyn ExecutionPlan> =
+        Arc::new(CoalesceBatchesExec::new(filter, 8192));
+    let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+        vec![
+            ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
+            ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
+        ],
+        coalesce_batches,
+    )?);
+
+    let initial = displayable(projection.as_ref()).indent(true).to_string();
+    let actual = initial.trim();
+
+    assert_snapshot!(
+        actual,
+        @r"
+    ProjectionExec: expr=[a@0 as a, b@1 as b]
+      CoalesceBatchesExec: target_batch_size=8192
+        FilterExec: c@2 > 0
+          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
+    "
+    );
+
+    let after_optimize =
+        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+
+    let after_optimize_string = displayable(after_optimize.as_ref())
+        .indent(true)
+        .to_string();
+    let actual = after_optimize_string.trim();
+
+    // The projection moves below CoalesceBatchesExec and is embedded into FilterExec
+    assert_snapshot!(
+        actual,
+        @r"
+    CoalesceBatchesExec: target_batch_size=8192
+      FilterExec: c@2 > 0, projection=[a@0, b@1]
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
+    "
+    );
+
+    Ok(())
+}
+
+#[test]
+fn test_cooperative_exec_after_projection() -> Result<()> {
+    let csv = create_simple_csv_exec();
+    let cooperative: Arc<dyn ExecutionPlan> = Arc::new(CooperativeExec::new(csv));
+    let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+        vec![
+            ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
+            ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
+        ],
+        cooperative,
+    )?);
+
+    let initial = displayable(projection.as_ref()).indent(true).to_string();
+    let actual = initial.trim();
+
+    assert_snapshot!(
+        actual,
+        @r"
+    ProjectionExec: expr=[a@0 as a, b@1 as b]
+      CooperativeExec
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
+    "
+    );
+
+    let after_optimize =
+        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+
+    let after_optimize_string = displayable(after_optimize.as_ref())
+        .indent(true)
+        .to_string();
+    let actual = after_optimize_string.trim();
+
+    // The projection moves below CooperativeExec and into the DataSourceExec scan
+    assert_snapshot!(
+        actual,
+        @r"
+    CooperativeExec
+      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b], file_type=csv, has_header=false
+    "
+    );
+
+    Ok(())
+}
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs
index 494b5d60fb..13bb862ab9 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -24,6 +24,7 @@ use std::task::{Context, Poll};
 
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
+use crate::projection::ProjectionExec;
 use crate::{
     DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
 };
@@ -226,6 +227,18 @@ impl ExecutionPlan for CoalesceBatchesExec {
         CardinalityEffect::Equal
     }
 
+    /// Pushes `projection` through this node by letting the input absorb it.
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let Some(child) = self.input.try_swapping_with_projection(projection)? else {
+            return Ok(None);
+        };
+        // Re-wrap the rewritten input in a copy of this node.
+        Arc::new(self.clone()).with_new_children(vec![child]).map(Some)
+    }
+
     fn gather_filters_for_pushdown(
         &self,
         _phase: FilterPushdownPhase,
diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs
index 87d0ee8a3a..d929c78850 100644
--- a/datafusion/physical-plan/src/coop.rs
+++ b/datafusion/physical-plan/src/coop.rs
@@ -79,6 +79,7 @@ use crate::filter_pushdown::{
     ChildPushdownResult, FilterDescription, FilterPushdownPhase,
     FilterPushdownPropagation,
 };
+use crate::projection::ProjectionExec;
 use crate::{
     DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream,
@@ -207,7 +208,7 @@ where
 /// An execution plan decorator that enables cooperative multitasking.
 /// It wraps the streams produced by its input execution plan using the [`make_cooperative`] function,
 /// which makes the stream participate in Tokio cooperative scheduling.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct CooperativeExec {
     input: Arc<dyn ExecutionPlan>,
     properties: PlanProperties,
@@ -298,6 +299,18 @@ impl ExecutionPlan for CooperativeExec {
         Equal
     }
 
+    /// Delegates projection pushdown to the wrapped input and re-wraps any result.
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let swapped = self.input.try_swapping_with_projection(projection)?;
+        // `None` from the input (swap declined) is passed through unchanged.
+        swapped
+            .map(|child| Arc::new(self.clone()).with_new_children(vec![child]))
+            .transpose()
+    }
+
     fn gather_filters_for_pushdown(
         &self,
         _phase: FilterPushdownPhase,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to