This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 450c861a8c Refactor SortPushdown using the standard top-down visitor 
and using `EquivalenceProperties` (#14821)
450c861a8c is described below

commit 450c861a8c38f8934b134368949606aaf3287792
Author: wiedld <[email protected]>
AuthorDate: Fri Mar 7 14:01:09 2025 -0800

    Refactor SortPushdown using the standard top-down visitor and using 
`EquivalenceProperties` (#14821)
    
    * refactor: have sort pushdown use transform_down, and provide minor 
refactor in sort_pushdown_helper to make it more understandable
    
    * test: inconsequential single change in test
    
    * Use consistent variable naming
    
    * chore: update variable naming
    
    * refactor: only sync the plan children when required
    
    * fix: have orderings include constants which are heterogeneous across 
partitions
    
    * Revert "fix: have orderings include constants which are heterogeneous 
across partitions"
    
    This reverts commit 47753541bdd1322f610b9ae72e922037d2999a12.
    
    * test: temporary commit to demonstrate changes that only occur with no 
partition by (in window agg), and when aggregating on an unordered column
    
    * Revert "test: temporary commit to demonstrate changes that only occur 
with no partition by (in window agg), and when aggregating on an unordered 
column"
    
    This reverts commit 2ee747f87b1abc2c94f13f793a5b2879e6c37197.
    
    * chore: cleanup after merging main, for anticipated test change
    
    * chore: rename variable
    
    * refactor: added test cases for orthogonal sorting, and remove 1 unneeded 
conditional
    
    * chore: remove unneeded conditional and make a comment
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../physical_optimizer/enforce_distribution.rs     |   2 +-
 .../tests/physical_optimizer/enforce_sorting.rs    | 102 ++++++++++++-
 .../core/tests/physical_optimizer/test_utils.rs    |  10 +-
 .../src/enforce_sorting/sort_pushdown.rs           | 162 ++++++++++++---------
 datafusion/physical-optimizer/src/utils.rs         |   4 +
 5 files changed, 199 insertions(+), 81 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index f22a896c18..080a10c7b0 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -2388,7 +2388,7 @@ fn repartition_transitively_past_sort_with_projection() 
-> Result<()> {
     );
 
     let expected = &[
-        "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
+        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
         // Since this projection is trivial, increasing parallelism is not 
beneficial
         "  ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs 
b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 26a00ef0f2..bb77192e05 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -21,10 +21,10 @@ use crate::physical_optimizer::test_utils::{
     aggregate_exec, bounded_window_exec, check_integrity, 
coalesce_batches_exec,
     coalesce_partitions_exec, create_test_schema, create_test_schema2,
     create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, 
limit_exec,
-    local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, 
sort_expr,
-    sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
-    sort_preserving_merge_exec_with_fetch, spr_repartition_exec, 
stream_exec_ordered,
-    union_exec, RequirementsTestExec,
+    local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec,
+    sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec,
+    sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
+    spr_repartition_exec, stream_exec_ordered, union_exec, 
RequirementsTestExec,
 };
 
 use arrow::compute::SortOptions;
@@ -2242,7 +2242,7 @@ async fn 
test_window_partial_constant_and_set_monotonicity() -> Result<()> {
                 "    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC NULLS LAST], file_type=parquet",
             ],
             expected_plan: vec![
-                "SortExec: expr=[non_nullable_col@1 ASC NULLS LAST, count@2 
ASC NULLS LAST], preserve_partitioning=[false]",
+                "SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], 
preserve_partitioning=[false]",
                 "  WindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false 
}]",
                 "    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC NULLS LAST], file_type=parquet",
             ],
@@ -2259,7 +2259,7 @@ async fn 
test_window_partial_constant_and_set_monotonicity() -> Result<()> {
                 "    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC NULLS LAST], file_type=parquet",
             ],
             expected_plan: vec![
-                "SortExec: expr=[non_nullable_col@1 DESC NULLS LAST, max@2 
DESC NULLS LAST], preserve_partitioning=[false]",
+                "SortExec: expr=[non_nullable_col@1 DESC NULLS LAST], 
preserve_partitioning=[false]",
                 "  WindowAggExec: wdw=[max: Ok(Field { name: \"max\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), 
end_bound: Following(UInt64(NULL)), is_causal: false }]",
                 "    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC NULLS LAST], file_type=parquet",
             ],
@@ -2276,7 +2276,7 @@ async fn 
test_window_partial_constant_and_set_monotonicity() -> Result<()> {
                 "    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC NULLS LAST], file_type=parquet",
             ],
             expected_plan: vec![
-                "SortExec: expr=[min@2 ASC NULLS LAST, non_nullable_col@1 ASC 
NULLS LAST], preserve_partitioning=[false]",
+                "SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], 
preserve_partitioning=[false]",
                 "  WindowAggExec: wdw=[min: Ok(Field { name: \"min\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), 
end_bound: Following(UInt64(NULL)), is_causal: false }]",
                 "    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC NULLS LAST], file_type=parquet",
             ],
@@ -2293,7 +2293,7 @@ async fn 
test_window_partial_constant_and_set_monotonicity() -> Result<()> {
                 "    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC NULLS LAST], file_type=parquet",
             ],
             expected_plan: vec![
-                "SortExec: expr=[avg@2 DESC NULLS LAST, nullable_col@0 DESC 
NULLS LAST], preserve_partitioning=[false]",
+                "SortExec: expr=[nullable_col@0 DESC NULLS LAST], 
preserve_partitioning=[false]",
                 "  WindowAggExec: wdw=[avg: Ok(Field { name: \"avg\", 
data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false 
}]",
                 "    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC NULLS LAST], file_type=parquet",
             ],
@@ -3346,3 +3346,89 @@ async fn 
test_window_partial_constant_and_set_monotonicity() -> Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn test_removes_unused_orthogonal_sort() -> Result<()> {
+    let schema = create_test_schema3()?;
+    let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", 
&schema)];
+    let unbounded_input = stream_exec_ordered(&schema, 
input_sort_exprs.clone());
+
+    let orthogonal_sort = sort_exec(vec![sort_expr("a", &schema)], 
unbounded_input);
+    let output_sort = sort_exec(input_sort_exprs, orthogonal_sort); // same 
sort as data source
+
+    // Test scenario/input has an orthogonal sort:
+    let expected_input = [
+        "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
+        "  SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "    StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, 
e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]"
+    ];
+    assert_eq!(get_plan_string(&output_sort), expected_input,);
+
+    // Test: should remove orthogonal sort, and the uppermost (unneeded) sort:
+    let expected_optimized = [
+        "StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], 
infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]"
+    ];
+    assert_optimized!(expected_input, expected_optimized, output_sort, true);
+
+    Ok(())
+}
+
+#[test]
+fn test_keeps_used_orthogonal_sort() -> Result<()> {
+    let schema = create_test_schema3()?;
+    let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", 
&schema)];
+    let unbounded_input = stream_exec_ordered(&schema, 
input_sort_exprs.clone());
+
+    let orthogonal_sort =
+        sort_exec_with_fetch(vec![sort_expr("a", &schema)], Some(3), 
unbounded_input); // has fetch, so this orthogonal sort changes the output
+    let output_sort = sort_exec(input_sort_exprs, orthogonal_sort);
+
+    // Test scenario/input has an orthogonal sort:
+    let expected_input = [
+        "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
+        "  SortExec: TopK(fetch=3), expr=[a@0 ASC], 
preserve_partitioning=[false]",
+        "    StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, 
e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]"
+    ];
+    assert_eq!(get_plan_string(&output_sort), expected_input,);
+
+    // Test: should keep the orthogonal sort, since it modifies the output:
+    let expected_optimized = expected_input;
+    assert_optimized!(expected_input, expected_optimized, output_sort, true);
+
+    Ok(())
+}
+
+#[test]
+fn test_handles_multiple_orthogonal_sorts() -> Result<()> {
+    let schema = create_test_schema3()?;
+    let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", 
&schema)];
+    let unbounded_input = stream_exec_ordered(&schema, 
input_sort_exprs.clone());
+
+    let orthogonal_sort_0 = sort_exec(vec![sort_expr("c", &schema)], 
unbounded_input); // has no fetch, so can be removed
+    let orthogonal_sort_1 =
+        sort_exec_with_fetch(vec![sort_expr("a", &schema)], Some(3), 
orthogonal_sort_0); // has fetch, so this orthogonal sort changes the output
+    let orthogonal_sort_2 = sort_exec(vec![sort_expr("c", &schema)], 
orthogonal_sort_1); // has no fetch, so can be removed
+    let orthogonal_sort_3 = sort_exec(vec![sort_expr("a", &schema)], 
orthogonal_sort_2); // has no fetch, so can be removed
+    let output_sort = sort_exec(input_sort_exprs, orthogonal_sort_3); // final 
sort
+
+    // Test scenario/input has an orthogonal sort:
+    let expected_input = [
+        "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
+        "  SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "    SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        "      SortExec: TopK(fetch=3), expr=[a@0 ASC], 
preserve_partitioning=[false]",
+        "        SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        "          StreamingTableExec: partition_sizes=1, projection=[a, b, c, 
d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]",
+    ];
+    assert_eq!(get_plan_string(&output_sort), expected_input,);
+
+    // Test: should keep only the needed orthogonal sort, and remove the 
unneeded ones:
+    let expected_optimized = [
+        "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
+        "  SortExec: TopK(fetch=3), expr=[a@0 ASC], 
preserve_partitioning=[false]",
+        "    StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, 
e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]",
+    ];
+    assert_optimized!(expected_input, expected_optimized, output_sort, true);
+
+    Ok(())
+}
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs 
b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 1b8e754ee3..99a75e6e50 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -295,9 +295,17 @@ pub fn coalesce_batches_exec(input: Arc<dyn 
ExecutionPlan>) -> Arc<dyn Execution
 pub fn sort_exec(
     sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
     input: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+    sort_exec_with_fetch(sort_exprs, None, input)
+}
+
+pub fn sort_exec_with_fetch(
+    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+    fetch: Option<usize>,
+    input: Arc<dyn ExecutionPlan>,
 ) -> Arc<dyn ExecutionPlan> {
     let sort_exprs = sort_exprs.into_iter().collect();
-    Arc::new(SortExec::new(sort_exprs, input))
+    Arc::new(SortExec::new(sort_exprs, input).with_fetch(fetch))
 }
 
 /// A test [`ExecutionPlan`] whose requirements can be configured.
diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs 
b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
index 17acb62729..2e20608d0e 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
@@ -23,9 +23,7 @@ use crate::utils::{
 };
 
 use arrow::datatypes::SchemaRef;
-use datafusion_common::tree_node::{
-    ConcreteTreeNode, Transformed, TreeNode, TreeNodeRecursion,
-};
+use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{plan_err, HashSet, JoinSide, Result};
 use datafusion_expr::JoinType;
 use datafusion_physical_expr::expressions::Column;
@@ -59,9 +57,9 @@ pub struct ParentRequirements {
 pub type SortPushDown = PlanContext<ParentRequirements>;
 
 /// Assigns the ordering requirement of the root node to the its children.
-pub fn assign_initial_requirements(node: &mut SortPushDown) {
-    let reqs = node.plan.required_input_ordering();
-    for (child, requirement) in node.children.iter_mut().zip(reqs) {
+pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) {
+    let reqs = sort_push_down.plan.required_input_ordering();
+    for (child, requirement) in sort_push_down.children.iter_mut().zip(reqs) {
         child.data = ParentRequirements {
             ordering_requirement: requirement,
             // If the parent has a fetch value, assign it to the children
@@ -71,24 +69,26 @@ pub fn assign_initial_requirements(node: &mut SortPushDown) 
{
     }
 }
 
-pub fn pushdown_sorts(sort_pushdown: SortPushDown) -> Result<SortPushDown> {
-    let mut new_node = pushdown_sorts_helper(sort_pushdown)?;
-    while new_node.tnr == TreeNodeRecursion::Stop {
-        new_node = pushdown_sorts_helper(new_node.data)?;
+pub fn pushdown_sorts(sort_push_down: SortPushDown) -> Result<SortPushDown> {
+    sort_push_down
+        .transform_down(pushdown_sorts_helper)
+        .map(|transformed| transformed.data)
+}
+
+fn min_fetch(f1: Option<usize>, f2: Option<usize>) -> Option<usize> {
+    match (f1, f2) {
+        (Some(f1), Some(f2)) => Some(f1.min(f2)),
+        (Some(_), _) => f1,
+        (_, Some(_)) => f2,
+        _ => None,
     }
-    let (new_node, children) = new_node.data.take_children();
-    let new_children = children
-        .into_iter()
-        .map(pushdown_sorts)
-        .collect::<Result<_>>()?;
-    new_node.with_new_children(new_children)
 }
 
 fn pushdown_sorts_helper(
-    mut requirements: SortPushDown,
+    mut sort_push_down: SortPushDown,
 ) -> Result<Transformed<SortPushDown>> {
-    let plan = &requirements.plan;
-    let parent_reqs = requirements
+    let plan = &sort_push_down.plan;
+    let parent_reqs = sort_push_down
         .data
         .ordering_requirement
         .clone()
@@ -98,82 +98,102 @@ fn pushdown_sorts_helper(
         .ordering_satisfy_requirement(&parent_reqs);
 
     if is_sort(plan) {
-        let sort_fetch = plan.fetch();
-        let required_ordering = plan
+        let current_sort_fetch = plan.fetch();
+        let parent_req_fetch = sort_push_down.data.fetch;
+
+        let current_plan_reqs = plan
             .output_ordering()
             .cloned()
             .map(LexRequirement::from)
             .unwrap_or_default();
-        if !satisfy_parent {
-            // Make sure this `SortExec` satisfies parent requirements:
-            let sort_reqs = 
requirements.data.ordering_requirement.unwrap_or_default();
-            // It's possible current plan (`SortExec`) has a fetch value.
-            // And if both of them have fetch values, we should use the 
minimum one.
-            if let Some(fetch) = sort_fetch {
-                if let Some(requirement_fetch) = requirements.data.fetch {
-                    requirements.data.fetch = 
Some(fetch.min(requirement_fetch));
-                }
-            }
-            let fetch = requirements.data.fetch.or(sort_fetch);
-            requirements = requirements.children.swap_remove(0);
-            requirements = add_sort_above(requirements, sort_reqs, fetch);
-        };
+        let parent_is_stricter = plan
+            .equivalence_properties()
+            .requirements_compatible(&parent_reqs, &current_plan_reqs);
+        let current_is_stricter = plan
+            .equivalence_properties()
+            .requirements_compatible(&current_plan_reqs, &parent_reqs);
+
+        if !satisfy_parent && !parent_is_stricter {
+            // This new sort has different requirements than the ordering 
being pushed down.
+            // 1. add a `SortExec` here for the pushed down ordering (parent 
reqs).
+            // 2. continue sort pushdown, but with the new ordering of the new 
sort.
+
+            // remove current sort (which will be the new ordering to pushdown)
+            let new_reqs = current_plan_reqs;
+            sort_push_down = sort_push_down.children.swap_remove(0);
+            sort_push_down = sort_push_down.update_plan_from_children()?; // 
changed plan
+
+            // add back sort exec matching parent
+            sort_push_down =
+                add_sort_above(sort_push_down, parent_reqs, parent_req_fetch);
+
+            // make pushdown requirements be the new ones.
+            sort_push_down.children[0].data = ParentRequirements {
+                ordering_requirement: Some(new_reqs),
+                fetch: current_sort_fetch,
+            };
+        } else {
+            // Don't add a SortExec
+            // Do update what sort requirements to keep pushing down
 
-        // We can safely get the 0th index as we are dealing with a `SortExec`.
-        let mut child = requirements.children.swap_remove(0);
-        if let Some(adjusted) =
-            pushdown_requirement_to_children(&child.plan, &required_ordering)?
-        {
-            let fetch = sort_fetch.or_else(|| child.plan.fetch());
-            for (grand_child, order) in 
child.children.iter_mut().zip(adjusted) {
-                grand_child.data = ParentRequirements {
-                    ordering_requirement: order,
-                    fetch,
-                };
+            // remove current sort, and get the sort's child
+            sort_push_down = sort_push_down.children.swap_remove(0);
+            sort_push_down = sort_push_down.update_plan_from_children()?; // 
changed plan
+
+            // set the stricter fetch
+            sort_push_down.data.fetch = min_fetch(current_sort_fetch, 
parent_req_fetch);
+
+            // set the stricter ordering
+            if current_is_stricter {
+                sort_push_down.data.ordering_requirement = 
Some(current_plan_reqs);
+            } else {
+                sort_push_down.data.ordering_requirement = Some(parent_reqs);
             }
-            // Can push down requirements
-            child.data = ParentRequirements {
-                ordering_requirement: Some(required_ordering),
-                fetch,
-            };
 
-            return Ok(Transformed {
-                data: child,
-                transformed: true,
-                tnr: TreeNodeRecursion::Stop,
-            });
-        } else {
-            // Can not push down requirements
-            requirements.children = vec![child];
-            assign_initial_requirements(&mut requirements);
+            // recursive call to helper, so it doesn't transform_down and miss 
the new node (previous child of sort)
+            return pushdown_sorts_helper(sort_push_down);
         }
+    } else if parent_reqs.is_empty() {
+        // note: this `satisfy_parent`, but we don't want to push down 
anything.
+        // Nothing to do.
+        return Ok(Transformed::no(sort_push_down));
     } else if satisfy_parent {
-        // For non-sort operators, immediately return if parent requirements 
are met:
+        // For non-sort operators which satisfy ordering:
         let reqs = plan.required_input_ordering();
-        for (child, order) in requirements.children.iter_mut().zip(reqs) {
+        let parent_req_fetch = sort_push_down.data.fetch;
+
+        for (child, order) in sort_push_down.children.iter_mut().zip(reqs) {
             child.data.ordering_requirement = order;
+            child.data.fetch = min_fetch(parent_req_fetch, child.data.fetch);
         }
     } else if let Some(adjusted) = pushdown_requirement_to_children(plan, 
&parent_reqs)? {
-        // Can not satisfy the parent requirements, check whether we can push
-        // requirements down:
-        for (child, order) in requirements.children.iter_mut().zip(adjusted) {
+        // For operators that can take a sort pushdown.
+
+        // Continue pushdown, with updated requirements:
+        let parent_fetch = sort_push_down.data.fetch;
+        let current_fetch = plan.fetch();
+        for (child, order) in sort_push_down.children.iter_mut().zip(adjusted) 
{
             child.data.ordering_requirement = order;
+            child.data.fetch = min_fetch(current_fetch, parent_fetch);
         }
-        requirements.data.ordering_requirement = None;
+        sort_push_down.data.ordering_requirement = None;
     } else {
         // Can not push down requirements, add new `SortExec`:
-        let sort_reqs = requirements
+        let sort_reqs = sort_push_down
             .data
             .ordering_requirement
             .clone()
             .unwrap_or_default();
-        let fetch = requirements.data.fetch;
-        requirements = add_sort_above(requirements, sort_reqs, fetch);
-        assign_initial_requirements(&mut requirements);
+        let fetch = sort_push_down.data.fetch;
+        sort_push_down = add_sort_above(sort_push_down, sort_reqs, fetch);
+        assign_initial_requirements(&mut sort_push_down);
     }
-    Ok(Transformed::yes(requirements))
+
+    Ok(Transformed::yes(sort_push_down))
 }
 
+/// Calculate the pushdown ordering requirements for children.
+/// If sort cannot be pushed down, return None.
 fn pushdown_requirement_to_children(
     plan: &Arc<dyn ExecutionPlan>,
     parent_required: &LexRequirement,
diff --git a/datafusion/physical-optimizer/src/utils.rs 
b/datafusion/physical-optimizer/src/utils.rs
index 636e78a06c..57a193315a 100644
--- a/datafusion/physical-optimizer/src/utils.rs
+++ b/datafusion/physical-optimizer/src/utils.rs
@@ -31,6 +31,10 @@ use datafusion_physical_plan::{ExecutionPlan, 
ExecutionPlanProperties};
 
 /// This utility function adds a `SortExec` above an operator according to the
 /// given ordering requirements while preserving the original partitioning.
+///
+/// Note that this updates the plan in both the [`PlanContext.children`] and
+/// the [`PlanContext.plan`]'s children. Therefore it's not required to sync
+/// the child plans with [`PlanContext::update_plan_from_children`].
 pub fn add_sort_above<T: Clone + Default>(
     node: PlanContext<T>,
     sort_requirements: LexRequirement,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to