alamb commented on code in PR #4586:
URL: https://github.com/apache/arrow-datafusion/pull/4586#discussion_r1050048160
##########
datafusion/core/src/physical_optimizer/enforcement.rs:
##########
@@ -835,13 +836,47 @@ fn new_join_conditions(
new_join_on
}
+/// Within this function, it checks whether we need to add additional plan operators
+/// of data exchanging and data ordering to satisfy the required distribution and ordering.
+/// We should avoid manually adding plan operators of data exchanging and data ordering in other places.
fn ensure_distribution_and_ordering(
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
target_partitions: usize,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
if plan.children().is_empty() {
return Ok(plan);
}
+ // This mainly turns a single-node global SortExec into
+ // a SortPreservingMergeExec over multiple local SortExecs.
+ // In addition, if a limit exists, it can also be pushed down to the local sorts.
+ let plan = plan
+ .as_any()
+ .downcast_ref::<SortExec>()
+ .and_then(|sort_exec| {
+ // There are three situations in which this optimization is not needed:
+ // - There's only one input partition;
+ // - It already preserves the partitioning, so it can be regarded as a local sort;
+ // - There's no limit to push down to the local sorts (this one is still controversial).
+ if sort_exec.input().output_partitioning().partition_count() > 1
+ && !sort_exec.preserve_partitioning()
+ && sort_exec.fetch().is_some()
Review Comment:
FYI @Dandandan the check for local limit has been restored
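For readers wondering why the restored local-limit check matters: with a limit of `k`, each partition only needs to keep its own top `k` rows, because any row in the global top `k` is necessarily within the top `k` of its own partition. The sketch below models that on plain `Vec<i64>` partitions instead of Arrow record batches. `SortNode`, `should_split_sort`, `global_sort`, and `local_sort_then_merge` are hypothetical names for illustration only, not DataFusion APIs; the predicate just mirrors the three conditions in the diff, and the heap-based k-way merge stands in for what `SortPreservingMergeExec` does.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical, simplified stand-in for the SortExec properties the rule inspects.
struct SortNode {
    input_partitions: usize,
    preserve_partitioning: bool,
    fetch: Option<usize>,
}

/// Mirrors the diff: rewrite only when there are several input partitions,
/// the sort is global (not partition-preserving), and a limit is present.
fn should_split_sort(node: &SortNode) -> bool {
    node.input_partitions > 1 && !node.preserve_partitioning && node.fetch.is_some()
}

/// Original plan: gather all partitions, sort once, keep the first `fetch` rows.
fn global_sort(partitions: &[Vec<i64>], fetch: usize) -> Vec<i64> {
    let mut all: Vec<i64> = partitions.iter().flatten().copied().collect();
    all.sort();
    all.truncate(fetch);
    all
}

/// Rewritten plan: a local sort per partition with the limit pushed down,
/// then a k-way merge that stops after `fetch` rows.
fn local_sort_then_merge(partitions: &[Vec<i64>], fetch: usize) -> Vec<i64> {
    // Local sorts: each partition keeps at most `fetch` rows.
    let locals: Vec<Vec<i64>> = partitions
        .iter()
        .map(|p| {
            let mut p = p.clone();
            p.sort();
            p.truncate(fetch);
            p
        })
        .collect();

    // Min-heap of (value, partition index, position within that partition).
    let mut heap = BinaryHeap::new();
    for (i, p) in locals.iter().enumerate() {
        if let Some(&v) = p.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }

    let mut out = Vec::with_capacity(fetch);
    while out.len() < fetch {
        match heap.pop() {
            Some(Reverse((v, i, pos))) => {
                out.push(v);
                // Advance the partition the smallest value came from.
                if let Some(&next) = locals[i].get(pos + 1) {
                    heap.push(Reverse((next, i, pos + 1)));
                }
            }
            None => break, // all partitions exhausted before reaching `fetch`
        }
    }
    out
}

fn main() {
    let partitions: Vec<Vec<i64>> = vec![vec![9, 1, 7], vec![4, 8, 2], vec![6, 3, 5]];
    let node = SortNode {
        input_partitions: partitions.len(),
        preserve_partitioning: false,
        fetch: Some(4),
    };
    assert!(should_split_sort(&node));

    let fetch = node.fetch.unwrap();
    let merged = local_sort_then_merge(&partitions, fetch);
    assert_eq!(merged, global_sort(&partitions, fetch));
    println!("{:?}", merged); // [1, 2, 3, 4]
}
```

Without a limit, the local sorts still have to sort every row, which is the part of the trade-off that remains debated in this PR.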
##########
datafusion/core/src/physical_optimizer/enforcement.rs:
##########
@@ -835,13 +836,47 @@ fn new_join_conditions(
new_join_on
}
+/// Within this function, it checks whether we need to add additional plan operators
+/// of data exchanging and data ordering to satisfy the required distribution and ordering.
+/// We should avoid manually adding plan operators of data exchanging and data ordering in other places.
fn ensure_distribution_and_ordering(
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
target_partitions: usize,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
if plan.children().is_empty() {
return Ok(plan);
}
+ // This mainly turns a single-node global SortExec into
+ // a SortPreservingMergeExec over multiple local SortExecs.
+ // In addition, if a limit exists, it can also be pushed down to the local sorts.
+ let plan = plan
+ .as_any()
+ .downcast_ref::<SortExec>()
+ .and_then(|sort_exec| {
+ // There are three situations in which this optimization is not needed:
+ // - There's only one input partition;
+ // - It already preserves the partitioning, so it can be regarded as a local sort;
+ // - There's no limit to push down to the local sorts (this one is still controversial).
Review Comment:
Might be worth a link to the ticket / PR for anyone who sees this comment and wants more context / backstory.
--
This is an automated message from the Apache Git Service.