This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 450c861a8c Refactor SortPushdown using the standard top-down visitor
and using `EquivalenceProperties` (#14821)
450c861a8c is described below
commit 450c861a8c38f8934b134368949606aaf3287792
Author: wiedld <[email protected]>
AuthorDate: Fri Mar 7 14:01:09 2025 -0800
Refactor SortPushdown using the standard top-down visitor and using
`EquivalenceProperties` (#14821)
* refactor: have sort pushdown use transform_down, and provide minor
refactor in sort_pushdown_helper to make it more understandable
* test: inconsequential single change in test
* Use consistent variable naming
* chore: update variable naming
* refactor: only sync the plan children when required
* fix: have orderings include constants which are heterogeneous across
partitions
* Revert "fix: have orderings include constants which are heterogeneous
across partitions"
This reverts commit 47753541bdd1322f610b9ae72e922037d2999a12.
* test: temporary commit to demonstrate changes that only occur with no
partition by (in window agg), and when aggregating on an unordered column
* Revert "test: temporary commit to demonstrate changes that only occur
with no partition by (in window agg), and when aggregating on an unordered
column"
This reverts commit 2ee747f87b1abc2c94f13f793a5b2879e6c37197.
* chore: cleanup after merging main, for anticipated test change
* chore: rename variable
* refactor: added test cases for orthogonal sorting, and remove 1 unneeded
conditional
* chore: remove unneeded conditional and make a comment
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../physical_optimizer/enforce_distribution.rs | 2 +-
.../tests/physical_optimizer/enforce_sorting.rs | 102 ++++++++++++-
.../core/tests/physical_optimizer/test_utils.rs | 10 +-
.../src/enforce_sorting/sort_pushdown.rs | 162 ++++++++++++---------
datafusion/physical-optimizer/src/utils.rs | 4 +
5 files changed, 199 insertions(+), 81 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index f22a896c18..080a10c7b0 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -2388,7 +2388,7 @@ fn repartition_transitively_past_sort_with_projection()
-> Result<()> {
);
let expected = &[
- "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
+ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
// Since this projection is trivial, increasing parallelism is not
beneficial
" ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 26a00ef0f2..bb77192e05 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -21,10 +21,10 @@ use crate::physical_optimizer::test_utils::{
aggregate_exec, bounded_window_exec, check_integrity,
coalesce_batches_exec,
coalesce_partitions_exec, create_test_schema, create_test_schema2,
create_test_schema3, filter_exec, global_limit_exec, hash_join_exec,
limit_exec,
- local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec,
sort_expr,
- sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
- sort_preserving_merge_exec_with_fetch, spr_repartition_exec,
stream_exec_ordered,
- union_exec, RequirementsTestExec,
+ local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec,
+ sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec,
+ sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
+ spr_repartition_exec, stream_exec_ordered, union_exec,
RequirementsTestExec,
};
use arrow::compute::SortOptions;
@@ -2242,7 +2242,7 @@ async fn
test_window_partial_constant_and_set_monotonicity() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC NULLS LAST], file_type=parquet",
],
expected_plan: vec![
- "SortExec: expr=[non_nullable_col@1 ASC NULLS LAST, count@2
ASC NULLS LAST], preserve_partitioning=[false]",
+ "SortExec: expr=[non_nullable_col@1 ASC NULLS LAST],
preserve_partitioning=[false]",
" WindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false
}]",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC NULLS LAST], file_type=parquet",
],
@@ -2259,7 +2259,7 @@ async fn
test_window_partial_constant_and_set_monotonicity() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC NULLS LAST], file_type=parquet",
],
expected_plan: vec![
- "SortExec: expr=[non_nullable_col@1 DESC NULLS LAST, max@2
DESC NULLS LAST], preserve_partitioning=[false]",
+ "SortExec: expr=[non_nullable_col@1 DESC NULLS LAST],
preserve_partitioning=[false]",
" WindowAggExec: wdw=[max: Ok(Field { name: \"max\",
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)),
end_bound: Following(UInt64(NULL)), is_causal: false }]",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC NULLS LAST], file_type=parquet",
],
@@ -2276,7 +2276,7 @@ async fn
test_window_partial_constant_and_set_monotonicity() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC NULLS LAST], file_type=parquet",
],
expected_plan: vec![
- "SortExec: expr=[min@2 ASC NULLS LAST, non_nullable_col@1 ASC
NULLS LAST], preserve_partitioning=[false]",
+ "SortExec: expr=[non_nullable_col@1 ASC NULLS LAST],
preserve_partitioning=[false]",
" WindowAggExec: wdw=[min: Ok(Field { name: \"min\",
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)),
end_bound: Following(UInt64(NULL)), is_causal: false }]",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC NULLS LAST], file_type=parquet",
],
@@ -2293,7 +2293,7 @@ async fn
test_window_partial_constant_and_set_monotonicity() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC NULLS LAST], file_type=parquet",
],
expected_plan: vec![
- "SortExec: expr=[avg@2 DESC NULLS LAST, nullable_col@0 DESC
NULLS LAST], preserve_partitioning=[false]",
+ "SortExec: expr=[nullable_col@0 DESC NULLS LAST],
preserve_partitioning=[false]",
" WindowAggExec: wdw=[avg: Ok(Field { name: \"avg\",
data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false
}]",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC NULLS LAST], file_type=parquet",
],
@@ -3346,3 +3346,89 @@ async fn
test_window_partial_constant_and_set_monotonicity() -> Result<()> {
Ok(())
}
+
+#[test]
+fn test_removes_unused_orthogonal_sort() -> Result<()> {
+ let schema = create_test_schema3()?;
+ let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c",
&schema)];
+ let unbounded_input = stream_exec_ordered(&schema,
input_sort_exprs.clone());
+
+ let orthogonal_sort = sort_exec(vec![sort_expr("a", &schema)],
unbounded_input);
+ let output_sort = sort_exec(input_sort_exprs, orthogonal_sort); // same
sort as data source
+
+ // Test scenario/input has an orthogonal sort:
+ let expected_input = [
+ "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
+ " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+ " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d,
e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]"
+ ];
+ assert_eq!(get_plan_string(&output_sort), expected_input,);
+
+ // Test: should remove orthogonal sort, and the uppermost (unneeded) sort:
+ let expected_optimized = [
+ "StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e],
infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]"
+ ];
+ assert_optimized!(expected_input, expected_optimized, output_sort, true);
+
+ Ok(())
+}
+
+#[test]
+fn test_keeps_used_orthogonal_sort() -> Result<()> {
+ let schema = create_test_schema3()?;
+ let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c",
&schema)];
+ let unbounded_input = stream_exec_ordered(&schema,
input_sort_exprs.clone());
+
+ let orthogonal_sort =
+ sort_exec_with_fetch(vec![sort_expr("a", &schema)], Some(3),
unbounded_input); // has fetch, so this orthogonal sort changes the output
+ let output_sort = sort_exec(input_sort_exprs, orthogonal_sort);
+
+ // Test scenario/input has an orthogonal sort:
+ let expected_input = [
+ "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
+ " SortExec: TopK(fetch=3), expr=[a@0 ASC],
preserve_partitioning=[false]",
+ " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d,
e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]"
+ ];
+ assert_eq!(get_plan_string(&output_sort), expected_input,);
+
+ // Test: should keep the orthogonal sort, since it modifies the output:
+ let expected_optimized = expected_input;
+ assert_optimized!(expected_input, expected_optimized, output_sort, true);
+
+ Ok(())
+}
+
+#[test]
+fn test_handles_multiple_orthogonal_sorts() -> Result<()> {
+ let schema = create_test_schema3()?;
+ let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c",
&schema)];
+ let unbounded_input = stream_exec_ordered(&schema,
input_sort_exprs.clone());
+
+ let orthogonal_sort_0 = sort_exec(vec![sort_expr("c", &schema)],
unbounded_input); // has no fetch, so can be removed
+ let orthogonal_sort_1 =
+ sort_exec_with_fetch(vec![sort_expr("a", &schema)], Some(3),
orthogonal_sort_0); // has fetch, so this orthogonal sort changes the output
+ let orthogonal_sort_2 = sort_exec(vec![sort_expr("c", &schema)],
orthogonal_sort_1); // has no fetch, so can be removed
+ let orthogonal_sort_3 = sort_exec(vec![sort_expr("a", &schema)],
orthogonal_sort_2); // has no fetch, so can be removed
+ let output_sort = sort_exec(input_sort_exprs, orthogonal_sort_3); // final
sort
+
+ // Test scenario/input has an orthogonal sort:
+ let expected_input = [
+ "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
+ " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+ " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+ " SortExec: TopK(fetch=3), expr=[a@0 ASC],
preserve_partitioning=[false]",
+ " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+ " StreamingTableExec: partition_sizes=1, projection=[a, b, c,
d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]",
+ ];
+ assert_eq!(get_plan_string(&output_sort), expected_input,);
+
+ // Test: should keep only the needed orthogonal sort, and remove the
unneeded ones:
+ let expected_optimized = [
+ "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
+ " SortExec: TopK(fetch=3), expr=[a@0 ASC],
preserve_partitioning=[false]",
+ " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d,
e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]",
+ ];
+ assert_optimized!(expected_input, expected_optimized, output_sort, true);
+
+ Ok(())
+}
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs
b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 1b8e754ee3..99a75e6e50 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -295,9 +295,17 @@ pub fn coalesce_batches_exec(input: Arc<dyn
ExecutionPlan>) -> Arc<dyn Execution
pub fn sort_exec(
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+ sort_exec_with_fetch(sort_exprs, None, input)
+}
+
+pub fn sort_exec_with_fetch(
+ sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+ fetch: Option<usize>,
+ input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
- Arc::new(SortExec::new(sort_exprs, input))
+ Arc::new(SortExec::new(sort_exprs, input).with_fetch(fetch))
}
/// A test [`ExecutionPlan`] whose requirements can be configured.
diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
index 17acb62729..2e20608d0e 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
@@ -23,9 +23,7 @@ use crate::utils::{
};
use arrow::datatypes::SchemaRef;
-use datafusion_common::tree_node::{
- ConcreteTreeNode, Transformed, TreeNode, TreeNodeRecursion,
-};
+use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, HashSet, JoinSide, Result};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
@@ -59,9 +57,9 @@ pub struct ParentRequirements {
pub type SortPushDown = PlanContext<ParentRequirements>;
/// Assigns the ordering requirement of the root node to its children.
-pub fn assign_initial_requirements(node: &mut SortPushDown) {
- let reqs = node.plan.required_input_ordering();
- for (child, requirement) in node.children.iter_mut().zip(reqs) {
+pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) {
+ let reqs = sort_push_down.plan.required_input_ordering();
+ for (child, requirement) in sort_push_down.children.iter_mut().zip(reqs) {
child.data = ParentRequirements {
ordering_requirement: requirement,
// If the parent has a fetch value, assign it to the children
@@ -71,24 +69,26 @@ pub fn assign_initial_requirements(node: &mut SortPushDown)
{
}
}
-pub fn pushdown_sorts(sort_pushdown: SortPushDown) -> Result<SortPushDown> {
- let mut new_node = pushdown_sorts_helper(sort_pushdown)?;
- while new_node.tnr == TreeNodeRecursion::Stop {
- new_node = pushdown_sorts_helper(new_node.data)?;
+pub fn pushdown_sorts(sort_push_down: SortPushDown) -> Result<SortPushDown> {
+ sort_push_down
+ .transform_down(pushdown_sorts_helper)
+ .map(|transformed| transformed.data)
+}
+
+fn min_fetch(f1: Option<usize>, f2: Option<usize>) -> Option<usize> {
+ match (f1, f2) {
+ (Some(f1), Some(f2)) => Some(f1.min(f2)),
+ (Some(_), _) => f1,
+ (_, Some(_)) => f2,
+ _ => None,
}
- let (new_node, children) = new_node.data.take_children();
- let new_children = children
- .into_iter()
- .map(pushdown_sorts)
- .collect::<Result<_>>()?;
- new_node.with_new_children(new_children)
}
fn pushdown_sorts_helper(
- mut requirements: SortPushDown,
+ mut sort_push_down: SortPushDown,
) -> Result<Transformed<SortPushDown>> {
- let plan = &requirements.plan;
- let parent_reqs = requirements
+ let plan = &sort_push_down.plan;
+ let parent_reqs = sort_push_down
.data
.ordering_requirement
.clone()
@@ -98,82 +98,102 @@ fn pushdown_sorts_helper(
.ordering_satisfy_requirement(&parent_reqs);
if is_sort(plan) {
- let sort_fetch = plan.fetch();
- let required_ordering = plan
+ let current_sort_fetch = plan.fetch();
+ let parent_req_fetch = sort_push_down.data.fetch;
+
+ let current_plan_reqs = plan
.output_ordering()
.cloned()
.map(LexRequirement::from)
.unwrap_or_default();
- if !satisfy_parent {
- // Make sure this `SortExec` satisfies parent requirements:
- let sort_reqs =
requirements.data.ordering_requirement.unwrap_or_default();
- // It's possible current plan (`SortExec`) has a fetch value.
- // And if both of them have fetch values, we should use the
minimum one.
- if let Some(fetch) = sort_fetch {
- if let Some(requirement_fetch) = requirements.data.fetch {
- requirements.data.fetch =
Some(fetch.min(requirement_fetch));
- }
- }
- let fetch = requirements.data.fetch.or(sort_fetch);
- requirements = requirements.children.swap_remove(0);
- requirements = add_sort_above(requirements, sort_reqs, fetch);
- };
+ let parent_is_stricter = plan
+ .equivalence_properties()
+ .requirements_compatible(&parent_reqs, ¤t_plan_reqs);
+ let current_is_stricter = plan
+ .equivalence_properties()
+ .requirements_compatible(¤t_plan_reqs, &parent_reqs);
+
+ if !satisfy_parent && !parent_is_stricter {
+ // This new sort has different requirements than the ordering
being pushed down.
+ // 1. add a `SortExec` here for the pushed down ordering (parent
reqs).
+ // 2. continue sort pushdown, but with the new ordering of the new
sort.
+
+ // remove current sort (which will be the new ordering to pushdown)
+ let new_reqs = current_plan_reqs;
+ sort_push_down = sort_push_down.children.swap_remove(0);
+ sort_push_down = sort_push_down.update_plan_from_children()?; //
changed plan
+
+ // add back sort exec matching parent
+ sort_push_down =
+ add_sort_above(sort_push_down, parent_reqs, parent_req_fetch);
+
+ // make pushdown requirements be the new ones.
+ sort_push_down.children[0].data = ParentRequirements {
+ ordering_requirement: Some(new_reqs),
+ fetch: current_sort_fetch,
+ };
+ } else {
+ // Don't add a SortExec
+ // Do update what sort requirements to keep pushing down
- // We can safely get the 0th index as we are dealing with a `SortExec`.
- let mut child = requirements.children.swap_remove(0);
- if let Some(adjusted) =
- pushdown_requirement_to_children(&child.plan, &required_ordering)?
- {
- let fetch = sort_fetch.or_else(|| child.plan.fetch());
- for (grand_child, order) in
child.children.iter_mut().zip(adjusted) {
- grand_child.data = ParentRequirements {
- ordering_requirement: order,
- fetch,
- };
+ // remove current sort, and get the sort's child
+ sort_push_down = sort_push_down.children.swap_remove(0);
+ sort_push_down = sort_push_down.update_plan_from_children()?; //
changed plan
+
+ // set the stricter fetch
+ sort_push_down.data.fetch = min_fetch(current_sort_fetch,
parent_req_fetch);
+
+ // set the stricter ordering
+ if current_is_stricter {
+ sort_push_down.data.ordering_requirement =
Some(current_plan_reqs);
+ } else {
+ sort_push_down.data.ordering_requirement = Some(parent_reqs);
}
- // Can push down requirements
- child.data = ParentRequirements {
- ordering_requirement: Some(required_ordering),
- fetch,
- };
- return Ok(Transformed {
- data: child,
- transformed: true,
- tnr: TreeNodeRecursion::Stop,
- });
- } else {
- // Can not push down requirements
- requirements.children = vec![child];
- assign_initial_requirements(&mut requirements);
+ // recursive call to helper, so it doesn't transform_down and miss
the new node (previous child of sort)
+ return pushdown_sorts_helper(sort_push_down);
}
+ } else if parent_reqs.is_empty() {
+ // note: `satisfy_parent` is true here, but we don't want to push down
anything.
+ // Nothing to do.
+ return Ok(Transformed::no(sort_push_down));
} else if satisfy_parent {
- // For non-sort operators, immediately return if parent requirements
are met:
+ // For non-sort operators which satisfy ordering:
let reqs = plan.required_input_ordering();
- for (child, order) in requirements.children.iter_mut().zip(reqs) {
+ let parent_req_fetch = sort_push_down.data.fetch;
+
+ for (child, order) in sort_push_down.children.iter_mut().zip(reqs) {
child.data.ordering_requirement = order;
+ child.data.fetch = min_fetch(parent_req_fetch, child.data.fetch);
}
} else if let Some(adjusted) = pushdown_requirement_to_children(plan,
&parent_reqs)? {
- // Can not satisfy the parent requirements, check whether we can push
- // requirements down:
- for (child, order) in requirements.children.iter_mut().zip(adjusted) {
+ // For operators that can take a sort pushdown.
+
+ // Continue pushdown, with updated requirements:
+ let parent_fetch = sort_push_down.data.fetch;
+ let current_fetch = plan.fetch();
+ for (child, order) in sort_push_down.children.iter_mut().zip(adjusted)
{
child.data.ordering_requirement = order;
+ child.data.fetch = min_fetch(current_fetch, parent_fetch);
}
- requirements.data.ordering_requirement = None;
+ sort_push_down.data.ordering_requirement = None;
} else {
// Can not push down requirements, add new `SortExec`:
- let sort_reqs = requirements
+ let sort_reqs = sort_push_down
.data
.ordering_requirement
.clone()
.unwrap_or_default();
- let fetch = requirements.data.fetch;
- requirements = add_sort_above(requirements, sort_reqs, fetch);
- assign_initial_requirements(&mut requirements);
+ let fetch = sort_push_down.data.fetch;
+ sort_push_down = add_sort_above(sort_push_down, sort_reqs, fetch);
+ assign_initial_requirements(&mut sort_push_down);
}
- Ok(Transformed::yes(requirements))
+
+ Ok(Transformed::yes(sort_push_down))
}
+/// Calculate the pushdown ordering requirements for children.
+/// If sort cannot be pushed down, return None.
fn pushdown_requirement_to_children(
plan: &Arc<dyn ExecutionPlan>,
parent_required: &LexRequirement,
diff --git a/datafusion/physical-optimizer/src/utils.rs
b/datafusion/physical-optimizer/src/utils.rs
index 636e78a06c..57a193315a 100644
--- a/datafusion/physical-optimizer/src/utils.rs
+++ b/datafusion/physical-optimizer/src/utils.rs
@@ -31,6 +31,10 @@ use datafusion_physical_plan::{ExecutionPlan,
ExecutionPlanProperties};
/// This utility function adds a `SortExec` above an operator according to the
/// given ordering requirements while preserving the original partitioning.
+///
+/// Note that this updates the plan in both the [`PlanContext.children`] and
+/// the [`PlanContext.plan`]'s children. Therefore it's not required to sync
+/// the child plans with [`PlanContext::update_plan_from_children`].
pub fn add_sort_above<T: Clone + Default>(
node: PlanContext<T>,
sort_requirements: LexRequirement,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]