This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6403222c1e TreeNode Refactor Part 2 (#8653)
6403222c1e is described below
commit 6403222c1eda8ed3438fe2555229319b92bfa056
Author: Berkay Şahin <[email protected]>
AuthorDate: Wed Dec 27 23:18:27 2023 +0300
TreeNode Refactor Part 2 (#8653)
* Refactor TreeNode's
* Update utils.rs
* Final review
* Remove unnecessary clones, more idiomatic Rust
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../src/physical_optimizer/enforce_distribution.rs | 767 +++++++++------------
.../core/src/physical_optimizer/enforce_sorting.rs | 554 +++++++--------
.../src/physical_optimizer/output_requirements.rs | 4 +
.../src/physical_optimizer/pipeline_checker.rs | 32 +-
.../replace_with_order_preserving_variants.rs | 292 ++++----
.../core/src/physical_optimizer/sort_pushdown.rs | 138 ++--
datafusion/core/src/physical_optimizer/utils.rs | 69 +-
datafusion/physical-expr/src/sort_properties.rs | 10 +-
datafusion/physical-plan/src/union.rs | 20 +-
9 files changed, 872 insertions(+), 1014 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 0aef126578..d5a0862273 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -25,11 +25,11 @@ use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
+use super::output_requirements::OutputRequirementExec;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
- add_sort_above, get_children_exectrees, is_coalesce_partitions,
is_repartition,
- is_sort_preserving_merge, ExecTree,
+ is_coalesce_partitions, is_repartition, is_sort_preserving_merge,
};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode,
PhysicalGroupBy};
@@ -52,8 +52,10 @@ use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
- physical_exprs_equal, EquivalenceProperties, PhysicalExpr,
+ physical_exprs_equal, EquivalenceProperties, LexRequirementRef,
PhysicalExpr,
+ PhysicalSortRequirement,
};
+use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::windows::{get_best_fitting_window,
BoundedWindowAggExec};
use datafusion_physical_plan::{get_plan_string, unbounded_output};
@@ -268,11 +270,12 @@ impl PhysicalOptimizerRule for EnforceDistribution {
/// 5) For other types of operators, by default, pushdown the parent
requirements to children.
///
fn adjust_input_keys_ordering(
- requirements: PlanWithKeyRequirements,
+ mut requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
let parent_required = requirements.required_key_ordering.clone();
let plan_any = requirements.plan.as_any();
- let transformed = if let Some(HashJoinExec {
+
+ if let Some(HashJoinExec {
left,
right,
on,
@@ -287,7 +290,7 @@ fn adjust_input_keys_ordering(
PartitionMode::Partitioned => {
let join_constructor =
|new_conditions: (Vec<(Column, Column)>,
Vec<SortOptions>)| {
- Ok(Arc::new(HashJoinExec::try_new(
+ HashJoinExec::try_new(
left.clone(),
right.clone(),
new_conditions.0,
@@ -295,15 +298,17 @@ fn adjust_input_keys_ordering(
join_type,
PartitionMode::Partitioned,
*null_equals_null,
- )?) as Arc<dyn ExecutionPlan>)
+ )
+ .map(|e| Arc::new(e) as _)
};
- Some(reorder_partitioned_join_keys(
+ reorder_partitioned_join_keys(
requirements.plan.clone(),
&parent_required,
on,
vec![],
&join_constructor,
- )?)
+ )
+ .map(Transformed::Yes)
}
PartitionMode::CollectLeft => {
let new_right_request = match join_type {
@@ -321,15 +326,15 @@ fn adjust_input_keys_ordering(
};
// Push down requirements to the right side
- Some(PlanWithKeyRequirements {
- plan: requirements.plan.clone(),
- required_key_ordering: vec![],
- request_key_ordering: vec![None, new_right_request],
- })
+ requirements.children[1].required_key_ordering =
+ new_right_request.unwrap_or(vec![]);
+ Ok(Transformed::Yes(requirements))
}
PartitionMode::Auto => {
// Can not satisfy, clear the current requirements and
generate new empty requirements
- Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
+ Ok(Transformed::Yes(PlanWithKeyRequirements::new(
+ requirements.plan,
+ )))
}
}
} else if let Some(CrossJoinExec { left, .. }) =
@@ -337,14 +342,9 @@ fn adjust_input_keys_ordering(
{
let left_columns_len = left.schema().fields().len();
// Push down requirements to the right side
- Some(PlanWithKeyRequirements {
- plan: requirements.plan.clone(),
- required_key_ordering: vec![],
- request_key_ordering: vec![
- None,
- shift_right_required(&parent_required, left_columns_len),
- ],
- })
+ requirements.children[1].required_key_ordering =
+ shift_right_required(&parent_required,
left_columns_len).unwrap_or_default();
+ Ok(Transformed::Yes(requirements))
} else if let Some(SortMergeJoinExec {
left,
right,
@@ -357,35 +357,40 @@ fn adjust_input_keys_ordering(
{
let join_constructor =
|new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
- Ok(Arc::new(SortMergeJoinExec::try_new(
+ SortMergeJoinExec::try_new(
left.clone(),
right.clone(),
new_conditions.0,
*join_type,
new_conditions.1,
*null_equals_null,
- )?) as Arc<dyn ExecutionPlan>)
+ )
+ .map(|e| Arc::new(e) as _)
};
- Some(reorder_partitioned_join_keys(
+ reorder_partitioned_join_keys(
requirements.plan.clone(),
&parent_required,
on,
sort_options.clone(),
&join_constructor,
- )?)
+ )
+ .map(Transformed::Yes)
} else if let Some(aggregate_exec) =
plan_any.downcast_ref::<AggregateExec>() {
if !parent_required.is_empty() {
match aggregate_exec.mode() {
- AggregateMode::FinalPartitioned => Some(reorder_aggregate_keys(
+ AggregateMode::FinalPartitioned => reorder_aggregate_keys(
requirements.plan.clone(),
&parent_required,
aggregate_exec,
- )?),
- _ =>
Some(PlanWithKeyRequirements::new(requirements.plan.clone())),
+ )
+ .map(Transformed::Yes),
+ _ => Ok(Transformed::Yes(PlanWithKeyRequirements::new(
+ requirements.plan,
+ ))),
}
} else {
// Keep everything unchanged
- None
+ Ok(Transformed::No(requirements))
}
} else if let Some(proj) = plan_any.downcast_ref::<ProjectionExec>() {
let expr = proj.expr();
@@ -394,34 +399,28 @@ fn adjust_input_keys_ordering(
// Construct a mapping from new name to the original Column
let new_required = map_columns_before_projection(&parent_required,
expr);
if new_required.len() == parent_required.len() {
- Some(PlanWithKeyRequirements {
- plan: requirements.plan.clone(),
- required_key_ordering: vec![],
- request_key_ordering: vec![Some(new_required.clone())],
- })
+ requirements.children[0].required_key_ordering = new_required;
+ Ok(Transformed::Yes(requirements))
} else {
// Can not satisfy, clear the current requirements and generate
new empty requirements
- Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
+ Ok(Transformed::Yes(PlanWithKeyRequirements::new(
+ requirements.plan,
+ )))
}
} else if plan_any.downcast_ref::<RepartitionExec>().is_some()
|| plan_any.downcast_ref::<CoalescePartitionsExec>().is_some()
|| plan_any.downcast_ref::<WindowAggExec>().is_some()
{
- Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
+ Ok(Transformed::Yes(PlanWithKeyRequirements::new(
+ requirements.plan,
+ )))
} else {
// By default, push down the parent requirements to children
- let children_len = requirements.plan.children().len();
- Some(PlanWithKeyRequirements {
- plan: requirements.plan.clone(),
- required_key_ordering: vec![],
- request_key_ordering: vec![Some(parent_required.clone());
children_len],
- })
- };
- Ok(if let Some(transformed) = transformed {
- Transformed::Yes(transformed)
- } else {
- Transformed::No(requirements)
- })
+ requirements.children.iter_mut().for_each(|child| {
+ child.required_key_ordering = parent_required.clone();
+ });
+ Ok(Transformed::Yes(requirements))
+ }
}
fn reorder_partitioned_join_keys<F>(
@@ -452,28 +451,24 @@ where
for idx in 0..sort_options.len() {
new_sort_options.push(sort_options[new_positions[idx]])
}
-
- Ok(PlanWithKeyRequirements {
- plan: join_constructor((new_join_on, new_sort_options))?,
- required_key_ordering: vec![],
- request_key_ordering: vec![Some(left_keys), Some(right_keys)],
- })
+ let mut requirement_tree =
PlanWithKeyRequirements::new(join_constructor((
+ new_join_on,
+ new_sort_options,
+ ))?);
+ requirement_tree.children[0].required_key_ordering = left_keys;
+ requirement_tree.children[1].required_key_ordering = right_keys;
+ Ok(requirement_tree)
} else {
- Ok(PlanWithKeyRequirements {
- plan: join_plan,
- required_key_ordering: vec![],
- request_key_ordering: vec![Some(left_keys), Some(right_keys)],
- })
+ let mut requirement_tree = PlanWithKeyRequirements::new(join_plan);
+ requirement_tree.children[0].required_key_ordering = left_keys;
+ requirement_tree.children[1].required_key_ordering = right_keys;
+ Ok(requirement_tree)
}
} else {
- Ok(PlanWithKeyRequirements {
- plan: join_plan,
- required_key_ordering: vec![],
- request_key_ordering: vec![
- Some(join_key_pairs.left_keys),
- Some(join_key_pairs.right_keys),
- ],
- })
+ let mut requirement_tree = PlanWithKeyRequirements::new(join_plan);
+ requirement_tree.children[0].required_key_ordering =
join_key_pairs.left_keys;
+ requirement_tree.children[1].required_key_ordering =
join_key_pairs.right_keys;
+ Ok(requirement_tree)
}
}
@@ -868,59 +863,24 @@ fn new_join_conditions(
.collect()
}
-/// Updates `dist_onward` such that, to keep track of
-/// `input` in the `exec_tree`.
-///
-/// # Arguments
-///
-/// * `input`: Current execution plan
-/// * `dist_onward`: It keeps track of executors starting from a distribution
-/// changing operator (e.g Repartition, SortPreservingMergeExec, etc.)
-/// until child of `input` (`input` should have single child).
-/// * `input_idx`: index of the `input`, for its parent.
-///
-fn update_distribution_onward(
- input: Arc<dyn ExecutionPlan>,
- dist_onward: &mut Option<ExecTree>,
- input_idx: usize,
-) {
- // Update the onward tree if there is an active branch
- if let Some(exec_tree) = dist_onward {
- // When we add a new operator to change distribution
- // we add RepartitionExec, SortPreservingMergeExec,
CoalescePartitionsExec
- // in this case, we need to update exec tree idx such that exec tree
is now child of these
- // operators (change the 0, since all of the operators have single
child).
- exec_tree.idx = 0;
- *exec_tree = ExecTree::new(input, input_idx, vec![exec_tree.clone()]);
- } else {
- *dist_onward = Some(ExecTree::new(input, input_idx, vec![]));
- }
-}
-
/// Adds RoundRobin repartition operator to the plan to increase parallelism.
///
/// # Arguments
///
-/// * `input`: Current execution plan
+/// * `input`: Current node.
/// * `n_target`: desired target partition number, if partition number of the
/// current executor is less than this value. Partition number will be
increased.
-/// * `dist_onward`: It keeps track of executors starting from a distribution
-/// changing operator (e.g Repartition, SortPreservingMergeExec, etc.)
-/// until `input` plan.
-/// * `input_idx`: index of the `input`, for its parent.
///
/// # Returns
///
-/// A [Result] object that contains new execution plan, where desired
partition number
-/// is achieved by adding RoundRobin Repartition.
+/// A [`Result`] object that contains new execution plan where the desired
+/// partition number is achieved by adding a RoundRobin repartition.
fn add_roundrobin_on_top(
- input: Arc<dyn ExecutionPlan>,
+ input: DistributionContext,
n_target: usize,
- dist_onward: &mut Option<ExecTree>,
- input_idx: usize,
-) -> Result<Arc<dyn ExecutionPlan>> {
- // Adding repartition is helpful
- if input.output_partitioning().partition_count() < n_target {
+) -> Result<DistributionContext> {
+ // Adding repartition is helpful:
+ if input.plan.output_partitioning().partition_count() < n_target {
// When there is an existing ordering, we preserve ordering
// during repartition. This will be un-done in the future
// If any of the following conditions is true
@@ -928,13 +888,16 @@ fn add_roundrobin_on_top(
// - Usage of order preserving variants is not desirable
// (determined by flag `config.optimizer.prefer_existing_sort`)
let partitioning = Partitioning::RoundRobinBatch(n_target);
- let repartition =
- RepartitionExec::try_new(input,
partitioning)?.with_preserve_order();
+ let repartition = RepartitionExec::try_new(input.plan.clone(),
partitioning)?
+ .with_preserve_order();
- // update distribution onward with new operator
- let new_plan = Arc::new(repartition) as Arc<dyn ExecutionPlan>;
- update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
- Ok(new_plan)
+ let new_plan = Arc::new(repartition) as _;
+
+ Ok(DistributionContext {
+ plan: new_plan,
+ distribution_connection: true,
+ children_nodes: vec![input],
+ })
} else {
// Partition is not helpful, we already have desired number of
partitions.
Ok(input)
@@ -948,46 +911,38 @@ fn add_roundrobin_on_top(
///
/// # Arguments
///
-/// * `input`: Current execution plan
+/// * `input`: Current node.
/// * `hash_exprs`: Stores Physical Exprs that are used during hashing.
/// * `n_target`: desired target partition number, if partition number of the
/// current executor is less than this value. Partition number will be
increased.
-/// * `dist_onward`: It keeps track of executors starting from a distribution
-/// changing operator (e.g Repartition, SortPreservingMergeExec, etc.)
-/// until `input` plan.
-/// * `input_idx`: index of the `input`, for its parent.
///
/// # Returns
///
-/// A [`Result`] object that contains new execution plan, where desired
distribution is
-/// satisfied by adding Hash Repartition.
+/// A [`Result`] object that contains new execution plan where the desired
+/// distribution is satisfied by adding a Hash repartition.
fn add_hash_on_top(
- input: Arc<dyn ExecutionPlan>,
+ mut input: DistributionContext,
hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
- // Repartition(Hash) will have `n_target` partitions at the output.
n_target: usize,
- // Stores executors starting from Repartition(RoundRobin) until
- // current executor. When Repartition(Hash) is added, `dist_onward`
- // is updated such that it stores connection from Repartition(RoundRobin)
- // until Repartition(Hash).
- dist_onward: &mut Option<ExecTree>,
- input_idx: usize,
repartition_beneficial_stats: bool,
-) -> Result<Arc<dyn ExecutionPlan>> {
- if n_target == input.output_partitioning().partition_count() && n_target
== 1 {
- // In this case adding a hash repartition is unnecessary as the hash
- // requirement is implicitly satisfied.
+) -> Result<DistributionContext> {
+ let partition_count = input.plan.output_partitioning().partition_count();
+ // Early return if hash repartition is unnecessary
+ if n_target == partition_count && n_target == 1 {
return Ok(input);
}
+
let satisfied = input
+ .plan
.output_partitioning()
.satisfy(Distribution::HashPartitioned(hash_exprs.clone()), || {
- input.equivalence_properties()
+ input.plan.equivalence_properties()
});
+
// Add hash repartitioning when:
// - The hash distribution requirement is not satisfied, or
// - We can increase parallelism by adding hash partitioning.
- if !satisfied || n_target > input.output_partitioning().partition_count() {
+ if !satisfied || n_target >
input.plan.output_partitioning().partition_count() {
// When there is an existing ordering, we preserve ordering during
// repartition. This will be rolled back in the future if any of the
// following conditions is true:
@@ -995,75 +950,66 @@ fn add_hash_on_top(
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.prefer_existing_sort`).
- let mut new_plan = if repartition_beneficial_stats {
+ if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin
repartition
// before it:
- add_roundrobin_on_top(input, n_target, dist_onward, 0)?
- } else {
- input
- };
+ input = add_roundrobin_on_top(input, n_target)?;
+ }
+
let partitioning = Partitioning::Hash(hash_exprs, n_target);
- let repartition = RepartitionExec::try_new(new_plan, partitioning)?
- // preserve any ordering if possible
+ let repartition = RepartitionExec::try_new(input.plan.clone(),
partitioning)?
.with_preserve_order();
- new_plan = Arc::new(repartition) as _;
- // update distribution onward with new operator
- update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
- Ok(new_plan)
- } else {
- Ok(input)
+ input.children_nodes = vec![input.clone()];
+ input.distribution_connection = true;
+ input.plan = Arc::new(repartition) as _;
}
+
+ Ok(input)
}
-/// Adds a `SortPreservingMergeExec` operator on top of input executor:
-/// - to satisfy single distribution requirement.
+/// Adds a [`SortPreservingMergeExec`] operator on top of input executor
+/// to satisfy single distribution requirement.
///
/// # Arguments
///
-/// * `input`: Current execution plan
-/// * `dist_onward`: It keeps track of executors starting from a distribution
-/// changing operator (e.g Repartition, SortPreservingMergeExec, etc.)
-/// until `input` plan.
-/// * `input_idx`: index of the `input`, for its parent.
+/// * `input`: Current node.
///
/// # Returns
///
-/// New execution plan, where desired single
-/// distribution is satisfied by adding `SortPreservingMergeExec`.
-fn add_spm_on_top(
- input: Arc<dyn ExecutionPlan>,
- dist_onward: &mut Option<ExecTree>,
- input_idx: usize,
-) -> Arc<dyn ExecutionPlan> {
+/// Updated node with an execution plan, where desired single
+/// distribution is satisfied by adding [`SortPreservingMergeExec`].
+fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
// Add SortPreservingMerge only when partition count is larger than 1.
- if input.output_partitioning().partition_count() > 1 {
+ if input.plan.output_partitioning().partition_count() > 1 {
// When there is an existing ordering, we preserve ordering
- // during decreasıng partıtıons. This will be un-done in the future
- // If any of the following conditions is true
+ // when decreasing partitions. This will be un-done in the future
+ // if any of the following conditions is true
// - Preserving ordering is not helpful in terms of satisfying
ordering requirements
// - Usage of order preserving variants is not desirable
- // (determined by flag `config.optimizer.prefer_existing_sort`)
- let should_preserve_ordering = input.output_ordering().is_some();
- let new_plan: Arc<dyn ExecutionPlan> = if should_preserve_ordering {
- let existing_ordering = input.output_ordering().unwrap_or(&[]);
+ // (determined by flag
`config.optimizer.bounded_order_preserving_variants`)
+ let should_preserve_ordering = input.plan.output_ordering().is_some();
+
+ let new_plan = if should_preserve_ordering {
Arc::new(SortPreservingMergeExec::new(
- existing_ordering.to_vec(),
- input,
+ input.plan.output_ordering().unwrap_or(&[]).to_vec(),
+ input.plan.clone(),
)) as _
} else {
- Arc::new(CoalescePartitionsExec::new(input)) as _
+ Arc::new(CoalescePartitionsExec::new(input.plan.clone())) as _
};
- // update repartition onward with new operator
- update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
- new_plan
+ DistributionContext {
+ plan: new_plan,
+ distribution_connection: true,
+ children_nodes: vec![input],
+ }
} else {
input
}
}
-/// Updates the physical plan inside `distribution_context` so that
distribution
+/// Updates the physical plan inside [`DistributionContext`] so that
distribution
/// changing operators are removed from the top. If they are necessary, they
will
/// be added in subsequent stages.
///
@@ -1081,48 +1027,23 @@ fn add_spm_on_top(
/// "ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c,
d, e], output_ordering=\[a@0 ASC]",
/// ```
fn remove_dist_changing_operators(
- distribution_context: DistributionContext,
+ mut distribution_context: DistributionContext,
) -> Result<DistributionContext> {
- let DistributionContext {
- mut plan,
- mut distribution_onwards,
- } = distribution_context;
-
- // Remove any distribution changing operators at the beginning:
- // Note that they will be re-inserted later on if necessary or helpful.
- while is_repartition(&plan)
- || is_coalesce_partitions(&plan)
- || is_sort_preserving_merge(&plan)
+ while is_repartition(&distribution_context.plan)
+ || is_coalesce_partitions(&distribution_context.plan)
+ || is_sort_preserving_merge(&distribution_context.plan)
{
- // All of above operators have a single child. When we remove the top
- // operator, we take the first child.
- plan = plan.children().swap_remove(0);
- distribution_onwards =
- get_children_exectrees(plan.children().len(),
&distribution_onwards[0]);
+        // All of the above operators have a single child, so the first
child is the only child.
+ let child = distribution_context.children_nodes.swap_remove(0);
+ // Remove any distribution changing operators at the beginning:
+ // Note that they will be re-inserted later on if necessary or helpful.
+ distribution_context = child;
}
- // Create a plan with the updated children:
- Ok(DistributionContext {
- plan,
- distribution_onwards,
- })
+ Ok(distribution_context)
}
-/// Updates the physical plan `input` by using `dist_onward` replace order
preserving operator variants
-/// with their corresponding operators that do not preserve order. It is a
wrapper for `replace_order_preserving_variants_helper`
-fn replace_order_preserving_variants(
- input: &mut Arc<dyn ExecutionPlan>,
- dist_onward: &mut Option<ExecTree>,
-) -> Result<()> {
- if let Some(dist_onward) = dist_onward {
- *input = replace_order_preserving_variants_helper(dist_onward)?;
- }
- *dist_onward = None;
- Ok(())
-}
-
-/// Updates the physical plan inside `ExecTree` if preserving ordering while
changing partitioning
-/// is not helpful or desirable.
+/// Updates the [`DistributionContext`] if preserving ordering while changing
partitioning is not helpful or desirable.
///
/// Assume that following plan is given:
/// ```text
@@ -1132,7 +1053,7 @@ fn replace_order_preserving_variants(
/// " ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a,
b, c, d, e], output_ordering=\[a@0 ASC]",
/// ```
///
-/// This function converts plan above (inside `ExecTree`) to the following:
+/// This function converts plan above to the following:
///
/// ```text
/// "CoalescePartitionsExec"
@@ -1140,30 +1061,75 @@ fn replace_order_preserving_variants(
/// " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
/// " ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a,
b, c, d, e], output_ordering=\[a@0 ASC]",
/// ```
-fn replace_order_preserving_variants_helper(
- exec_tree: &ExecTree,
-) -> Result<Arc<dyn ExecutionPlan>> {
- let mut updated_children = exec_tree.plan.children();
- for child in &exec_tree.children {
- updated_children[child.idx] =
replace_order_preserving_variants_helper(child)?;
- }
- if is_sort_preserving_merge(&exec_tree.plan) {
- return Ok(Arc::new(CoalescePartitionsExec::new(
- updated_children.swap_remove(0),
- )));
- }
- if let Some(repartition) =
exec_tree.plan.as_any().downcast_ref::<RepartitionExec>() {
+fn replace_order_preserving_variants(
+ mut context: DistributionContext,
+) -> Result<DistributionContext> {
+ let mut updated_children = context
+ .children_nodes
+ .iter()
+ .map(|child| {
+ if child.distribution_connection {
+ replace_order_preserving_variants(child.clone())
+ } else {
+ Ok(child.clone())
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ if is_sort_preserving_merge(&context.plan) {
+ let child = updated_children.swap_remove(0);
+ context.plan =
Arc::new(CoalescePartitionsExec::new(child.plan.clone()));
+ context.children_nodes = vec![child];
+ return Ok(context);
+ } else if let Some(repartition) =
+ context.plan.as_any().downcast_ref::<RepartitionExec>()
+ {
if repartition.preserve_order() {
- return Ok(Arc::new(
- // new RepartitionExec don't preserve order
- RepartitionExec::try_new(
- updated_children.swap_remove(0),
- repartition.partitioning().clone(),
- )?,
- ));
+ let child = updated_children.swap_remove(0);
+ context.plan = Arc::new(RepartitionExec::try_new(
+ child.plan.clone(),
+ repartition.partitioning().clone(),
+ )?);
+ context.children_nodes = vec![child];
+ return Ok(context);
+ }
+ }
+
+ context.plan = context
+ .plan
+ .clone()
+ .with_new_children(updated_children.into_iter().map(|c|
c.plan).collect())?;
+ Ok(context)
+}
+
+/// This utility function adds a [`SortExec`] above an operator according to
the
+/// given ordering requirements while preserving the original partitioning.
+fn add_sort_preserving_partitions(
+ node: DistributionContext,
+ sort_requirement: LexRequirementRef,
+ fetch: Option<usize>,
+) -> DistributionContext {
+ // If the ordering requirement is already satisfied, do not add a sort.
+ if !node
+ .plan
+ .equivalence_properties()
+ .ordering_satisfy_requirement(sort_requirement)
+ {
+ let sort_expr =
PhysicalSortRequirement::to_sort_exprs(sort_requirement.to_vec());
+ let new_sort = SortExec::new(sort_expr,
node.plan.clone()).with_fetch(fetch);
+
+ DistributionContext {
+ plan: Arc::new(if
node.plan.output_partitioning().partition_count() > 1 {
+ new_sort.with_preserve_partitioning(true)
+ } else {
+ new_sort
+ }),
+ distribution_connection: false,
+ children_nodes: vec![node],
}
+ } else {
+ node
}
- exec_tree.plan.clone().with_new_children(updated_children)
}
/// This function checks whether we need to add additional data exchange
@@ -1174,6 +1140,12 @@ fn ensure_distribution(
dist_context: DistributionContext,
config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {
+ let dist_context = dist_context.update_children()?;
+
+ if dist_context.plan.children().is_empty() {
+ return Ok(Transformed::No(dist_context));
+ }
+
let target_partitions = config.execution.target_partitions;
// When `false`, round robin repartition will not be added to increase
parallelism
let enable_round_robin = config.optimizer.enable_round_robin_repartition;
@@ -1186,14 +1158,11 @@ fn ensure_distribution(
let order_preserving_variants_desirable =
is_unbounded || config.optimizer.prefer_existing_sort;
- if dist_context.plan.children().is_empty() {
- return Ok(Transformed::No(dist_context));
- }
-
// Remove unnecessary repartition from the physical plan if any
let DistributionContext {
mut plan,
- mut distribution_onwards,
+ distribution_connection,
+ children_nodes,
} = remove_dist_changing_operators(dist_context)?;
if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
@@ -1213,33 +1182,23 @@ fn ensure_distribution(
plan = updated_window;
}
};
- let n_children = plan.children().len();
+
// This loop iterates over all the children to:
// - Increase parallelism for every child if it is beneficial.
// - Satisfy the distribution requirements of every child, if it is not
// already satisfied.
// We store the updated children in `new_children`.
- let new_children = izip!(
- plan.children().into_iter(),
+ let children_nodes = izip!(
+ children_nodes.into_iter(),
plan.required_input_distribution().iter(),
plan.required_input_ordering().iter(),
- distribution_onwards.iter_mut(),
plan.benefits_from_input_partitioning(),
- plan.maintains_input_order(),
- 0..n_children
+ plan.maintains_input_order()
)
.map(
- |(
- mut child,
- requirement,
- required_input_ordering,
- dist_onward,
- would_benefit,
- maintains,
- child_idx,
- )| {
+ |(mut child, requirement, required_input_ordering, would_benefit,
maintains)| {
// Don't need to apply when the returned row count is not greater
than 1:
- let num_rows = child.statistics()?.num_rows;
+ let num_rows = child.plan.statistics()?.num_rows;
let repartition_beneficial_stats = if
num_rows.is_exact().unwrap_or(false) {
num_rows
.get_value()
@@ -1248,45 +1207,39 @@ fn ensure_distribution(
} else {
true
};
+
if enable_round_robin
// Operator benefits from partitioning (e.g. filter):
&& (would_benefit && repartition_beneficial_stats)
// Unless partitioning doesn't increase the partition count,
it is not beneficial:
- && child.output_partitioning().partition_count() <
target_partitions
+ && child.plan.output_partitioning().partition_count() <
target_partitions
{
// When `repartition_file_scans` is set, attempt to increase
// parallelism at the source.
if repartition_file_scans {
if let Some(new_child) =
- child.repartitioned(target_partitions, config)?
+ child.plan.repartitioned(target_partitions, config)?
{
- child = new_child;
+ child.plan = new_child;
}
}
// Increase parallelism by adding round-robin repartitioning
// on top of the operator. Note that we only do this if the
// partition count is not already equal to the desired
partition
// count.
- child = add_roundrobin_on_top(
- child,
- target_partitions,
- dist_onward,
- child_idx,
- )?;
+ child = add_roundrobin_on_top(child, target_partitions)?;
}
// Satisfy the distribution requirement if it is unmet.
match requirement {
Distribution::SinglePartition => {
- child = add_spm_on_top(child, dist_onward, child_idx);
+ child = add_spm_on_top(child);
}
Distribution::HashPartitioned(exprs) => {
child = add_hash_on_top(
child,
exprs.to_vec(),
target_partitions,
- dist_onward,
- child_idx,
repartition_beneficial_stats,
)?;
}
@@ -1299,31 +1252,38 @@ fn ensure_distribution(
// - Ordering requirement cannot be satisfied by preserving
ordering through repartitions, or
// - using order preserving variant is not desirable.
let ordering_satisfied = child
+ .plan
.equivalence_properties()
.ordering_satisfy_requirement(required_input_ordering);
- if !ordering_satisfied || !order_preserving_variants_desirable
{
- replace_order_preserving_variants(&mut child,
dist_onward)?;
+ if (!ordering_satisfied ||
!order_preserving_variants_desirable)
+ && child.distribution_connection
+ {
+ child = replace_order_preserving_variants(child)?;
// If ordering requirements were satisfied before
repartitioning,
// make sure ordering requirements are still satisfied
after.
if ordering_satisfied {
// Make sure to satisfy ordering requirement:
- add_sort_above(&mut child, required_input_ordering,
None);
+ child = add_sort_preserving_partitions(
+ child,
+ required_input_ordering,
+ None,
+ );
}
}
// Stop tracking distribution changing operators
- *dist_onward = None;
+ child.distribution_connection = false;
} else {
// no ordering requirement
match requirement {
// Operator requires specific distribution.
Distribution::SinglePartition |
Distribution::HashPartitioned(_) => {
// Since there is no ordering requirement, preserving
ordering is pointless
- replace_order_preserving_variants(&mut child,
dist_onward)?;
+ child = replace_order_preserving_variants(child)?;
}
Distribution::UnspecifiedDistribution => {
// Since ordering is lost, trying to preserve ordering
is pointless
- if !maintains {
- replace_order_preserving_variants(&mut child,
dist_onward)?;
+ if !maintains ||
plan.as_any().is::<OutputRequirementExec>() {
+ child = replace_order_preserving_variants(child)?;
}
}
}
@@ -1334,7 +1294,9 @@ fn ensure_distribution(
.collect::<Result<Vec<_>>>()?;
let new_distribution_context = DistributionContext {
- plan: if plan.as_any().is::<UnionExec>() &&
can_interleave(&new_children) {
+ plan: if plan.as_any().is::<UnionExec>()
+ && can_interleave(children_nodes.iter().map(|c| c.plan.clone()))
+ {
// Add a special case for [`UnionExec`] since we want to "bubble
up"
// hash-partitioned data. So instead of
//
@@ -1358,120 +1320,91 @@ fn ensure_distribution(
// - Agg:
// Repartition (hash):
// Data
- Arc::new(InterleaveExec::try_new(new_children)?)
+ Arc::new(InterleaveExec::try_new(
+ children_nodes.iter().map(|c| c.plan.clone()).collect(),
+ )?)
} else {
- plan.with_new_children(new_children)?
+ plan.with_new_children(
+ children_nodes.iter().map(|c| c.plan.clone()).collect(),
+ )?
},
- distribution_onwards,
+ distribution_connection,
+ children_nodes,
};
+
Ok(Transformed::Yes(new_distribution_context))
}
-/// A struct to keep track of distribution changing executors
+/// A struct to keep track of distribution changing operators
/// (`RepartitionExec`, `SortPreservingMergeExec`, `CoalescePartitionsExec`),
/// and their associated parents inside `plan`. Using this information,
/// we can optimize distribution of the plan if/when necessary.
#[derive(Debug, Clone)]
struct DistributionContext {
plan: Arc<dyn ExecutionPlan>,
- /// Keep track of associations for each child of the plan. If `None`,
- /// there is no distribution changing operator in its descendants.
- distribution_onwards: Vec<Option<ExecTree>>,
+ /// Indicates whether this plan is connected to a distribution-changing
+ /// operator.
+ distribution_connection: bool,
+ children_nodes: Vec<Self>,
}
impl DistributionContext {
- /// Creates an empty context.
+ /// Creates a tree according to the plan with empty states.
fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
- let length = plan.children().len();
- DistributionContext {
+ let children = plan.children();
+ Self {
plan,
- distribution_onwards: vec![None; length],
+ distribution_connection: false,
+ children_nodes: children.into_iter().map(Self::new).collect(),
}
}
- /// Constructs a new context from children contexts.
- fn new_from_children_nodes(
- children_nodes: Vec<DistributionContext>,
- parent_plan: Arc<dyn ExecutionPlan>,
- ) -> Result<Self> {
- let children_plans = children_nodes
- .iter()
- .map(|item| item.plan.clone())
- .collect();
- let distribution_onwards = children_nodes
- .into_iter()
- .enumerate()
- .map(|(idx, context)| {
- let DistributionContext {
- plan,
- // The `distribution_onwards` tree keeps track of operators
- // that change distribution, or preserves the existing
- // distribution (starting from an operator that change
distribution).
- distribution_onwards,
- } = context;
- if plan.children().is_empty() {
- // Plan has no children, there is nothing to propagate.
- None
- } else if distribution_onwards[0].is_none() {
- if let Some(repartition) =
- plan.as_any().downcast_ref::<RepartitionExec>()
- {
- match repartition.partitioning() {
- Partitioning::RoundRobinBatch(_)
- | Partitioning::Hash(_, _) => {
- // Start tracking operators starting from this
repartition (either roundrobin or hash):
- return Some(ExecTree::new(plan, idx, vec![]));
- }
- _ => {}
- }
- } else if plan.as_any().is::<SortPreservingMergeExec>()
- || plan.as_any().is::<CoalescePartitionsExec>()
- {
- // Start tracking operators starting from this sort
preserving merge:
- return Some(ExecTree::new(plan, idx, vec![]));
- }
- None
- } else {
- // Propagate children distribution tracking to the above
- let new_distribution_onwards = izip!(
- plan.required_input_distribution().iter(),
- distribution_onwards.into_iter()
- )
- .flat_map(|(required_dist, distribution_onwards)| {
- if let Some(distribution_onwards) =
distribution_onwards {
- // Operator can safely propagate the distribution
above.
- // This is similar to maintaining order in the
EnforceSorting rule.
- if let Distribution::UnspecifiedDistribution =
required_dist {
- return Some(distribution_onwards);
- }
- }
- None
- })
- .collect::<Vec<_>>();
- // Either:
- // - None of the children has a connection to an operator
that modifies distribution, or
- // - The current operator requires distribution at its
input so doesn't propagate it above.
- if new_distribution_onwards.is_empty() {
- None
- } else {
- Some(ExecTree::new(plan, idx,
new_distribution_onwards))
- }
+ fn update_children(mut self) -> Result<Self> {
+ for child_context in self.children_nodes.iter_mut() {
+ child_context.distribution_connection = match
child_context.plan.as_any() {
+ plan_any if plan_any.is::<RepartitionExec>() => matches!(
+ plan_any
+ .downcast_ref::<RepartitionExec>()
+ .unwrap()
+ .partitioning(),
+ Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _)
+ ),
+ plan_any
+ if plan_any.is::<SortPreservingMergeExec>()
+ || plan_any.is::<CoalescePartitionsExec>() =>
+ {
+ true
}
- })
- .collect();
- Ok(DistributionContext {
- plan: with_new_children_if_necessary(parent_plan,
children_plans)?.into(),
- distribution_onwards,
- })
- }
+ _ => {
+ child_context.plan.children().is_empty()
+ ||
child_context.children_nodes[0].distribution_connection
+ || child_context
+ .plan
+ .required_input_distribution()
+ .iter()
+ .zip(child_context.children_nodes.iter())
+ .any(|(required_dist, child_context)| {
+ child_context.distribution_connection
+ && matches!(
+ required_dist,
+ Distribution::UnspecifiedDistribution
+ )
+ })
+ }
+ };
+ }
- /// Computes distribution tracking contexts for every child of the plan.
- fn children(&self) -> Vec<DistributionContext> {
- self.plan
- .children()
- .into_iter()
- .map(DistributionContext::new)
- .collect()
+ let children_plans = self
+ .children_nodes
+ .iter()
+ .map(|context| context.plan.clone())
+ .collect::<Vec<_>>();
+
+ Ok(Self {
+ plan: with_new_children_if_necessary(self.plan,
children_plans)?.into(),
+ distribution_connection: false,
+ children_nodes: self.children_nodes,
+ })
}
}
@@ -1480,8 +1413,8 @@ impl TreeNode for DistributionContext {
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
- for child in self.children() {
- match op(&child)? {
+ for child in &self.children_nodes {
+ match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -1490,20 +1423,23 @@ impl TreeNode for DistributionContext {
Ok(VisitRecursion::Continue)
}
- fn map_children<F>(self, transform: F) -> Result<Self>
+ fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
- let children = self.children();
- if children.is_empty() {
- Ok(self)
- } else {
- let children_nodes = children
+ if !self.children_nodes.is_empty() {
+ self.children_nodes = self
+ .children_nodes
.into_iter()
.map(transform)
- .collect::<Result<Vec<_>>>()?;
- DistributionContext::new_from_children_nodes(children_nodes,
self.plan)
+ .collect::<Result<_>>()?;
+ self.plan = with_new_children_if_necessary(
+ self.plan,
+ self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
+ )?
+ .into();
}
+ Ok(self)
}
}
@@ -1512,11 +1448,11 @@ impl fmt::Display for DistributionContext {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let plan_string = get_plan_string(&self.plan);
write!(f, "plan: {:?}", plan_string)?;
- for (idx, child) in self.distribution_onwards.iter().enumerate() {
- if let Some(child) = child {
- write!(f, "idx:{:?}, exec_tree:{}", idx, child)?;
- }
- }
+ write!(
+ f,
+ "distribution_connection:{}",
+ self.distribution_connection,
+ )?;
write!(f, "")
}
}
@@ -1532,37 +1468,18 @@ struct PlanWithKeyRequirements {
plan: Arc<dyn ExecutionPlan>,
/// Parent required key ordering
required_key_ordering: Vec<Arc<dyn PhysicalExpr>>,
- /// The request key ordering to children
- request_key_ordering: Vec<Option<Vec<Arc<dyn PhysicalExpr>>>>,
+ children: Vec<Self>,
}
impl PlanWithKeyRequirements {
fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
- let children_len = plan.children().len();
- PlanWithKeyRequirements {
+ let children = plan.children();
+ Self {
plan,
required_key_ordering: vec![],
- request_key_ordering: vec![None; children_len],
+ children: children.into_iter().map(Self::new).collect(),
}
}
-
- fn children(&self) -> Vec<PlanWithKeyRequirements> {
- let plan_children = self.plan.children();
- assert_eq!(plan_children.len(), self.request_key_ordering.len());
- plan_children
- .into_iter()
- .zip(self.request_key_ordering.clone())
- .map(|(child, required)| {
- let from_parent = required.unwrap_or_default();
- let length = child.children().len();
- PlanWithKeyRequirements {
- plan: child,
- required_key_ordering: from_parent,
- request_key_ordering: vec![None; length],
- }
- })
- .collect()
- }
}
impl TreeNode for PlanWithKeyRequirements {
@@ -1570,9 +1487,8 @@ impl TreeNode for PlanWithKeyRequirements {
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
- let children = self.children();
- for child in children {
- match op(&child)? {
+ for child in &self.children {
+ match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -1582,28 +1498,23 @@ impl TreeNode for PlanWithKeyRequirements {
Ok(VisitRecursion::Continue)
}
- fn map_children<F>(self, transform: F) -> Result<Self>
+ fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
- let children = self.children();
- if !children.is_empty() {
- let new_children: Result<Vec<_>> =
- children.into_iter().map(transform).collect();
-
- let children_plans = new_children?
+ if !self.children.is_empty() {
+ self.children = self
+ .children
.into_iter()
- .map(|child| child.plan)
- .collect::<Vec<_>>();
- let new_plan = with_new_children_if_necessary(self.plan,
children_plans)?;
- Ok(PlanWithKeyRequirements {
- plan: new_plan.into(),
- required_key_ordering: self.required_key_ordering,
- request_key_ordering: self.request_key_ordering,
- })
- } else {
- Ok(self)
+ .map(transform)
+ .collect::<Result<_>>()?;
+ self.plan = with_new_children_if_necessary(
+ self.plan,
+ self.children.iter().map(|c| c.plan.clone()).collect(),
+ )?
+ .into();
}
+ Ok(self)
}
}
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 2ecc1e11b9..77d04a61c5 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -44,7 +44,7 @@ use
crate::physical_optimizer::replace_with_order_preserving_variants::{
use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown};
use crate::physical_optimizer::utils::{
add_sort_above, is_coalesce_partitions, is_limit, is_repartition, is_sort,
- is_sort_preserving_merge, is_union, is_window, ExecTree,
+ is_sort_preserving_merge, is_union, is_window,
};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -81,78 +81,66 @@ impl EnforceSorting {
#[derive(Debug, Clone)]
struct PlanWithCorrespondingSort {
plan: Arc<dyn ExecutionPlan>,
- // For every child, keep a subtree of `ExecutionPlan`s starting from the
- // child until the `SortExec`(s) -- could be multiple for n-ary plans like
- // Union -- that determine the output ordering of the child. If the child
- // has no connection to any sort, simply store None (and not a subtree).
- sort_onwards: Vec<Option<ExecTree>>,
+    // Tracks whether this node is connected, through intermediate executors
+    // that maintain the ordering, to a `SortExec` below it. If the node has
+    // no connection to any sort, this simply stores false.
+ sort_connection: bool,
+ children_nodes: Vec<Self>,
}
impl PlanWithCorrespondingSort {
fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
- let length = plan.children().len();
- PlanWithCorrespondingSort {
+ let children = plan.children();
+ Self {
plan,
- sort_onwards: vec![None; length],
+ sort_connection: false,
+ children_nodes: children.into_iter().map(Self::new).collect(),
}
}
- fn new_from_children_nodes(
- children_nodes: Vec<PlanWithCorrespondingSort>,
+ fn update_children(
parent_plan: Arc<dyn ExecutionPlan>,
+ mut children_nodes: Vec<Self>,
) -> Result<Self> {
- let children_plans = children_nodes
- .iter()
- .map(|item| item.plan.clone())
- .collect::<Vec<_>>();
- let sort_onwards = children_nodes
- .into_iter()
- .enumerate()
- .map(|(idx, item)| {
- let plan = &item.plan;
- // Leaves of `sort_onwards` are `SortExec` operators, which
impose
- // an ordering. This tree collects all the intermediate
executors
- // that maintain this ordering. If we just saw a order imposing
- // operator, we reset the tree and start accumulating.
- if is_sort(plan) {
- return Some(ExecTree::new(item.plan, idx, vec![]));
- } else if is_limit(plan) {
- // There is no sort linkage for this path, it starts at a
limit.
- return None;
- }
+ for node in children_nodes.iter_mut() {
+ let plan = &node.plan;
+            // Leaves of the sort connections are `SortExec` operators, which
+            // impose an ordering. This flag tracks all the intermediate
+            // executors that maintain this ordering. If we just saw an
+            // order-imposing operator, we reset the connection and start accumulating.
+ node.sort_connection = if is_sort(plan) {
+ // Initiate connection
+ true
+ } else if is_limit(plan) {
+ // There is no sort linkage for this path, it starts at a
limit.
+ false
+ } else {
let is_spm = is_sort_preserving_merge(plan);
let required_orderings = plan.required_input_ordering();
let flags = plan.maintains_input_order();
- let children = izip!(flags, item.sort_onwards,
required_orderings)
- .filter_map(|(maintains, element, required_ordering)| {
- if (required_ordering.is_none() && maintains) ||
is_spm {
- element
- } else {
- None
- }
- })
- .collect::<Vec<ExecTree>>();
- if !children.is_empty() {
- // Add parent node to the tree if there is at least one
- // child with a subtree:
- Some(ExecTree::new(item.plan, idx, children))
- } else {
- // There is no sort linkage for this child, do nothing.
- None
- }
- })
- .collect();
+            // Mark the parent as sort-connected if there is at least one
+            // child that maintains a sort connection:
+ izip!(flags, required_orderings).any(|(maintains,
required_ordering)| {
+ let propagates_ordering =
+ (maintains && required_ordering.is_none()) || is_spm;
+ let connected_to_sort =
+ node.children_nodes.iter().any(|item|
item.sort_connection);
+ propagates_ordering && connected_to_sort
+ })
+ }
+ }
+ let children_plans = children_nodes
+ .iter()
+ .map(|item| item.plan.clone())
+ .collect::<Vec<_>>();
let plan = with_new_children_if_necessary(parent_plan,
children_plans)?.into();
- Ok(PlanWithCorrespondingSort { plan, sort_onwards })
- }
- fn children(&self) -> Vec<PlanWithCorrespondingSort> {
- self.plan
- .children()
- .into_iter()
- .map(PlanWithCorrespondingSort::new)
- .collect()
+ Ok(Self {
+ plan,
+ sort_connection: false,
+ children_nodes,
+ })
}
}
@@ -161,9 +149,8 @@ impl TreeNode for PlanWithCorrespondingSort {
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
- let children = self.children();
- for child in children {
- match op(&child)? {
+ for child in &self.children_nodes {
+ match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -173,102 +160,79 @@ impl TreeNode for PlanWithCorrespondingSort {
Ok(VisitRecursion::Continue)
}
- fn map_children<F>(self, transform: F) -> Result<Self>
+ fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
- let children = self.children();
- if children.is_empty() {
- Ok(self)
- } else {
- let children_nodes = children
+ if !self.children_nodes.is_empty() {
+ self.children_nodes = self
+ .children_nodes
.into_iter()
.map(transform)
- .collect::<Result<Vec<_>>>()?;
- PlanWithCorrespondingSort::new_from_children_nodes(children_nodes,
self.plan)
+ .collect::<Result<_>>()?;
+ self.plan = with_new_children_if_necessary(
+ self.plan,
+ self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
+ )?
+ .into();
}
+ Ok(self)
}
}
-/// This object is used within the [EnforceSorting] rule to track the closest
+/// This object is used within the [`EnforceSorting`] rule to track the closest
/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan.
#[derive(Debug, Clone)]
struct PlanWithCorrespondingCoalescePartitions {
plan: Arc<dyn ExecutionPlan>,
- // For every child, keep a subtree of `ExecutionPlan`s starting from the
- // child until the `CoalescePartitionsExec`(s) -- could be multiple for
- // n-ary plans like Union -- that affect the output partitioning of the
- // child. If the child has no connection to any `CoalescePartitionsExec`,
- // simply store None (and not a subtree).
- coalesce_onwards: Vec<Option<ExecTree>>,
+    // Stores whether the plan is a `CoalescePartitionsExec`, or is connected
+    // to a `CoalescePartitionsExec` via its children.
+ coalesce_connection: bool,
+ children_nodes: Vec<Self>,
}
impl PlanWithCorrespondingCoalescePartitions {
+ /// Creates an empty tree with empty connections.
fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
- let length = plan.children().len();
- PlanWithCorrespondingCoalescePartitions {
+ let children = plan.children();
+ Self {
plan,
- coalesce_onwards: vec![None; length],
+ coalesce_connection: false,
+ children_nodes: children.into_iter().map(Self::new).collect(),
}
}
- fn new_from_children_nodes(
- children_nodes: Vec<PlanWithCorrespondingCoalescePartitions>,
- parent_plan: Arc<dyn ExecutionPlan>,
- ) -> Result<Self> {
- let children_plans = children_nodes
+ fn update_children(mut self) -> Result<Self> {
+ self.coalesce_connection = if self.plan.children().is_empty() {
+ // Plan has no children, it cannot be a `CoalescePartitionsExec`.
+ false
+ } else if is_coalesce_partitions(&self.plan) {
+ // Initiate a connection
+ true
+ } else {
+ self.children_nodes
+ .iter()
+ .enumerate()
+ .map(|(idx, node)| {
+                    // Only consider operators that don't require a single
+                    // partition and are connected to some coalesce operator.
+ node.coalesce_connection
+ && !matches!(
+ self.plan.required_input_distribution()[idx],
+ Distribution::SinglePartition
+ )
+                    // If no child has a coalesce connection, there is
+                    // nothing to track; the connection stays false.
+ })
+ .any(|c| c)
+ };
+
+ let children_plans = self
+ .children_nodes
.iter()
.map(|item| item.plan.clone())
.collect();
- let coalesce_onwards = children_nodes
- .into_iter()
- .enumerate()
- .map(|(idx, item)| {
- // Leaves of the `coalesce_onwards` tree are
`CoalescePartitionsExec`
- // operators. This tree collects all the intermediate
executors that
- // maintain a single partition. If we just saw a
`CoalescePartitionsExec`
- // operator, we reset the tree and start accumulating.
- let plan = item.plan;
- if plan.children().is_empty() {
- // Plan has no children, there is nothing to propagate.
- None
- } else if is_coalesce_partitions(&plan) {
- Some(ExecTree::new(plan, idx, vec![]))
- } else {
- let children = item
- .coalesce_onwards
- .into_iter()
- .flatten()
- .filter(|item| {
- // Only consider operators that don't require a
- // single partition.
- !matches!(
- plan.required_input_distribution()[item.idx],
- Distribution::SinglePartition
- )
- })
- .collect::<Vec<_>>();
- if children.is_empty() {
- None
- } else {
- Some(ExecTree::new(plan, idx, children))
- }
- }
- })
- .collect();
- let plan = with_new_children_if_necessary(parent_plan,
children_plans)?.into();
- Ok(PlanWithCorrespondingCoalescePartitions {
- plan,
- coalesce_onwards,
- })
- }
-
- fn children(&self) -> Vec<PlanWithCorrespondingCoalescePartitions> {
- self.plan
- .children()
- .into_iter()
- .map(PlanWithCorrespondingCoalescePartitions::new)
- .collect()
+ self.plan = with_new_children_if_necessary(self.plan,
children_plans)?.into();
+ Ok(self)
}
}
@@ -277,9 +241,8 @@ impl TreeNode for PlanWithCorrespondingCoalescePartitions {
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
- let children = self.children();
- for child in children {
- match op(&child)? {
+ for child in &self.children_nodes {
+ match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -289,23 +252,23 @@ impl TreeNode for PlanWithCorrespondingCoalescePartitions
{
Ok(VisitRecursion::Continue)
}
- fn map_children<F>(self, transform: F) -> Result<Self>
+ fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
- let children = self.children();
- if children.is_empty() {
- Ok(self)
- } else {
- let children_nodes = children
+ if !self.children_nodes.is_empty() {
+ self.children_nodes = self
+ .children_nodes
.into_iter()
.map(transform)
- .collect::<Result<Vec<_>>>()?;
- PlanWithCorrespondingCoalescePartitions::new_from_children_nodes(
- children_nodes,
+ .collect::<Result<_>>()?;
+ self.plan = with_new_children_if_necessary(
self.plan,
- )
+ self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
+ )?
+ .into();
}
+ Ok(self)
}
}
@@ -332,6 +295,7 @@ impl PhysicalOptimizerRule for EnforceSorting {
} else {
adjusted.plan
};
+
let plan_with_pipeline_fixer = OrderPreservationContext::new(new_plan);
let updated_plan =
plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| {
@@ -345,7 +309,8 @@ impl PhysicalOptimizerRule for EnforceSorting {
// Execute a top-down traversal to exploit sort push-down opportunities
// missed by the bottom-up traversal:
- let sort_pushdown = SortPushDown::init(updated_plan.plan);
+ let mut sort_pushdown = SortPushDown::new(updated_plan.plan);
+ sort_pushdown.assign_initial_requirements();
let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?;
Ok(adjusted.plan)
}
@@ -376,16 +341,21 @@ impl PhysicalOptimizerRule for EnforceSorting {
fn parallelize_sorts(
requirements: PlanWithCorrespondingCoalescePartitions,
) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
- let plan = requirements.plan;
- let mut coalesce_onwards = requirements.coalesce_onwards;
- if plan.children().is_empty() || coalesce_onwards[0].is_none() {
+ let PlanWithCorrespondingCoalescePartitions {
+ mut plan,
+ coalesce_connection,
+ mut children_nodes,
+ } = requirements.update_children()?;
+
+ if plan.children().is_empty() || !children_nodes[0].coalesce_connection {
// We only take an action when the plan is either a SortExec, a
// SortPreservingMergeExec or a CoalescePartitionsExec, and they
// all have a single child. Therefore, if the first child has no
// coalesce connection, we can return immediately.
return Ok(Transformed::No(PlanWithCorrespondingCoalescePartitions {
plan,
- coalesce_onwards,
+ coalesce_connection,
+ children_nodes,
}));
} else if (is_sort(&plan) || is_sort_preserving_merge(&plan))
&& plan.output_partitioning().partition_count() <= 1
@@ -395,34 +365,30 @@ fn parallelize_sorts(
// executors don't require single partition), then we can replace
// the CoalescePartitionsExec + Sort cascade with a SortExec +
// SortPreservingMergeExec cascade to parallelize sorting.
- let mut prev_layer = plan.clone();
- update_child_to_remove_coalesce(&mut prev_layer, &mut
coalesce_onwards[0])?;
let (sort_exprs, fetch) = get_sort_exprs(&plan)?;
- add_sort_above(
- &mut prev_layer,
- &PhysicalSortRequirement::from_sort_exprs(sort_exprs),
- fetch,
- );
- let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer)
- .with_fetch(fetch);
- return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
- plan: Arc::new(spm),
- coalesce_onwards: vec![None],
- }));
+ let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs);
+ let sort_exprs = sort_exprs.to_vec();
+ update_child_to_remove_coalesce(&mut plan, &mut children_nodes[0])?;
+ add_sort_above(&mut plan, &sort_reqs, fetch);
+ let spm = SortPreservingMergeExec::new(sort_exprs,
plan).with_fetch(fetch);
+
+ return Ok(Transformed::Yes(
+ PlanWithCorrespondingCoalescePartitions::new(Arc::new(spm)),
+ ));
} else if is_coalesce_partitions(&plan) {
// There is an unnecessary `CoalescePartitionsExec` in the plan.
- let mut prev_layer = plan.clone();
- update_child_to_remove_coalesce(&mut prev_layer, &mut
coalesce_onwards[0])?;
- let new_plan = plan.with_new_children(vec![prev_layer])?;
- return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
- plan: new_plan,
- coalesce_onwards: vec![None],
- }));
+ update_child_to_remove_coalesce(&mut plan, &mut children_nodes[0])?;
+
+ let new_plan = Arc::new(CoalescePartitionsExec::new(plan)) as _;
+ return Ok(Transformed::Yes(
+ PlanWithCorrespondingCoalescePartitions::new(new_plan),
+ ));
}
Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
plan,
- coalesce_onwards,
+ coalesce_connection,
+ children_nodes,
}))
}
@@ -431,91 +397,102 @@ fn parallelize_sorts(
fn ensure_sorting(
requirements: PlanWithCorrespondingSort,
) -> Result<Transformed<PlanWithCorrespondingSort>> {
+ let requirements = PlanWithCorrespondingSort::update_children(
+ requirements.plan,
+ requirements.children_nodes,
+ )?;
+
// Perform naive analysis at the beginning -- remove already-satisfied
sorts:
if requirements.plan.children().is_empty() {
return Ok(Transformed::No(requirements));
}
- let plan = requirements.plan;
- let mut children = plan.children();
- let mut sort_onwards = requirements.sort_onwards;
- if let Some(result) = analyze_immediate_sort_removal(&plan, &sort_onwards)
{
+ if let Some(result) = analyze_immediate_sort_removal(&requirements) {
return Ok(Transformed::Yes(result));
}
- for (idx, (child, sort_onwards, required_ordering)) in izip!(
- children.iter_mut(),
- sort_onwards.iter_mut(),
- plan.required_input_ordering()
- )
- .enumerate()
+
+ let plan = requirements.plan;
+ let mut children_nodes = requirements.children_nodes;
+
+ for (idx, (child_node, required_ordering)) in
+ izip!(children_nodes.iter_mut(),
plan.required_input_ordering()).enumerate()
{
- let physical_ordering = child.output_ordering();
+ let mut child_plan = child_node.plan.clone();
+ let physical_ordering = child_plan.output_ordering();
match (required_ordering, physical_ordering) {
(Some(required_ordering), Some(_)) => {
- if !child
+ if !child_plan
.equivalence_properties()
.ordering_satisfy_requirement(&required_ordering)
{
// Make sure we preserve the ordering requirements:
- update_child_to_remove_unnecessary_sort(child,
sort_onwards, &plan)?;
- add_sort_above(child, &required_ordering, None);
- if is_sort(child) {
- *sort_onwards = Some(ExecTree::new(child.clone(), idx,
vec![]));
- } else {
- *sort_onwards = None;
+ update_child_to_remove_unnecessary_sort(idx, child_node,
&plan)?;
+ add_sort_above(&mut child_plan, &required_ordering, None);
+ if is_sort(&child_plan) {
+ *child_node =
PlanWithCorrespondingSort::update_children(
+ child_plan,
+ vec![child_node.clone()],
+ )?;
+ child_node.sort_connection = true;
}
}
}
(Some(required), None) => {
// Ordering requirement is not met, we should add a `SortExec`
to the plan.
- add_sort_above(child, &required, None);
- *sort_onwards = Some(ExecTree::new(child.clone(), idx,
vec![]));
+ add_sort_above(&mut child_plan, &required, None);
+ *child_node = PlanWithCorrespondingSort::update_children(
+ child_plan,
+ vec![child_node.clone()],
+ )?;
+ child_node.sort_connection = true;
}
(None, Some(_)) => {
// We have a `SortExec` whose effect may be neutralized by
// another order-imposing operator. Remove this sort.
if !plan.maintains_input_order()[idx] || is_union(&plan) {
- update_child_to_remove_unnecessary_sort(child,
sort_onwards, &plan)?;
+ update_child_to_remove_unnecessary_sort(idx, child_node,
&plan)?;
}
}
(None, None) => {
- update_child_to_remove_unnecessary_sort(child, sort_onwards,
&plan)?;
+ update_child_to_remove_unnecessary_sort(idx, child_node,
&plan)?;
}
}
}
// For window expressions, we can remove some sorts when we can
// calculate the result in reverse:
- if is_window(&plan) {
- if let Some(tree) = &mut sort_onwards[0] {
- if let Some(result) = analyze_window_sort_removal(tree, &plan)? {
- return Ok(Transformed::Yes(result));
- }
+ if is_window(&plan) && children_nodes[0].sort_connection {
+ if let Some(result) = analyze_window_sort_removal(&mut
children_nodes[0], &plan)?
+ {
+ return Ok(Transformed::Yes(result));
}
} else if is_sort_preserving_merge(&plan)
- && children[0].output_partitioning().partition_count() <= 1
+ && children_nodes[0]
+ .plan
+ .output_partitioning()
+ .partition_count()
+ <= 1
{
// This SortPreservingMergeExec is unnecessary, input already has a
// single partition.
- sort_onwards.truncate(1);
- return Ok(Transformed::Yes(PlanWithCorrespondingSort {
- plan: children.swap_remove(0),
- sort_onwards,
- }));
+ let child_node = children_nodes.swap_remove(0);
+ return Ok(Transformed::Yes(child_node));
}
- Ok(Transformed::Yes(PlanWithCorrespondingSort {
- plan: plan.with_new_children(children)?,
- sort_onwards,
- }))
+ Ok(Transformed::Yes(
+ PlanWithCorrespondingSort::update_children(plan, children_nodes)?,
+ ))
}
/// Analyzes a given [`SortExec`] (`plan`) to determine whether its input
/// already has a finer ordering than it enforces.
fn analyze_immediate_sort_removal(
- plan: &Arc<dyn ExecutionPlan>,
- sort_onwards: &[Option<ExecTree>],
+ node: &PlanWithCorrespondingSort,
) -> Option<PlanWithCorrespondingSort> {
+ let PlanWithCorrespondingSort {
+ plan,
+ children_nodes,
+ ..
+ } = node;
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
let sort_input = sort_exec.input().clone();
-
// If this sort is unnecessary, we should remove it:
if sort_input
.equivalence_properties()
@@ -533,20 +510,33 @@ fn analyze_immediate_sort_removal(
sort_exec.expr().to_vec(),
sort_input,
));
- let new_tree = ExecTree::new(
- new_plan.clone(),
- 0,
- sort_onwards.iter().flat_map(|e| e.clone()).collect(),
- );
PlanWithCorrespondingSort {
plan: new_plan,
- sort_onwards: vec![Some(new_tree)],
+                // `SortPreservingMergeExec` has a single child.
+ sort_connection: false,
+ children_nodes: children_nodes
+ .iter()
+ .cloned()
+ .map(|mut node| {
+ node.sort_connection = false;
+ node
+ })
+ .collect(),
}
} else {
// Remove the sort:
PlanWithCorrespondingSort {
plan: sort_input,
- sort_onwards: sort_onwards.to_vec(),
+ sort_connection: false,
+ children_nodes: children_nodes[0]
+ .children_nodes
+ .iter()
+ .cloned()
+ .map(|mut node| {
+ node.sort_connection = false;
+ node
+ })
+ .collect(),
}
},
);
@@ -558,15 +548,15 @@ fn analyze_immediate_sort_removal(
/// Analyzes a [`WindowAggExec`] or a [`BoundedWindowAggExec`] to determine
/// whether it may allow removing a sort.
fn analyze_window_sort_removal(
- sort_tree: &mut ExecTree,
+ sort_tree: &mut PlanWithCorrespondingSort,
window_exec: &Arc<dyn ExecutionPlan>,
) -> Result<Option<PlanWithCorrespondingSort>> {
let requires_single_partition = matches!(
- window_exec.required_input_distribution()[sort_tree.idx],
+ window_exec.required_input_distribution()[0],
Distribution::SinglePartition
);
- let mut window_child =
- remove_corresponding_sort_from_sub_plan(sort_tree,
requires_single_partition)?;
+ remove_corresponding_sort_from_sub_plan(sort_tree,
requires_single_partition)?;
+ let mut window_child = sort_tree.plan.clone();
let (window_expr, new_window) =
if let Some(exec) =
window_exec.as_any().downcast_ref::<BoundedWindowAggExec>() {
(
@@ -628,9 +618,9 @@ fn analyze_window_sort_removal(
/// Updates child to remove the unnecessary [`CoalescePartitionsExec`] below
it.
fn update_child_to_remove_coalesce(
child: &mut Arc<dyn ExecutionPlan>,
- coalesce_onwards: &mut Option<ExecTree>,
+ coalesce_onwards: &mut PlanWithCorrespondingCoalescePartitions,
) -> Result<()> {
- if let Some(coalesce_onwards) = coalesce_onwards {
+ if coalesce_onwards.coalesce_connection {
*child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards,
child)?;
}
Ok(())
@@ -638,10 +628,10 @@ fn update_child_to_remove_coalesce(
/// Removes the [`CoalescePartitionsExec`] from the plan in `coalesce_onwards`.
fn remove_corresponding_coalesce_in_sub_plan(
- coalesce_onwards: &mut ExecTree,
+ coalesce_onwards: &mut PlanWithCorrespondingCoalescePartitions,
parent: &Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(if is_coalesce_partitions(&coalesce_onwards.plan) {
+ if is_coalesce_partitions(&coalesce_onwards.plan) {
// We can safely use the 0th index since we have a
`CoalescePartitionsExec`.
let mut new_plan = coalesce_onwards.plan.children()[0].clone();
while new_plan.output_partitioning() == parent.output_partitioning()
@@ -650,89 +640,113 @@ fn remove_corresponding_coalesce_in_sub_plan(
{
new_plan = new_plan.children().swap_remove(0)
}
- new_plan
+ Ok(new_plan)
} else {
let plan = coalesce_onwards.plan.clone();
let mut children = plan.children();
- for item in &mut coalesce_onwards.children {
- children[item.idx] =
remove_corresponding_coalesce_in_sub_plan(item, &plan)?;
+ for (idx, node) in
coalesce_onwards.children_nodes.iter_mut().enumerate() {
+ if node.coalesce_connection {
+ children[idx] =
remove_corresponding_coalesce_in_sub_plan(node, &plan)?;
+ }
}
- plan.with_new_children(children)?
- })
+ plan.with_new_children(children)
+ }
}
/// Updates child to remove the unnecessary sort below it.
fn update_child_to_remove_unnecessary_sort(
- child: &mut Arc<dyn ExecutionPlan>,
- sort_onwards: &mut Option<ExecTree>,
+ child_idx: usize,
+ sort_onwards: &mut PlanWithCorrespondingSort,
parent: &Arc<dyn ExecutionPlan>,
) -> Result<()> {
- if let Some(sort_onwards) = sort_onwards {
+ if sort_onwards.sort_connection {
let requires_single_partition = matches!(
- parent.required_input_distribution()[sort_onwards.idx],
+ parent.required_input_distribution()[child_idx],
Distribution::SinglePartition
);
- *child = remove_corresponding_sort_from_sub_plan(
- sort_onwards,
- requires_single_partition,
- )?;
+ remove_corresponding_sort_from_sub_plan(sort_onwards,
requires_single_partition)?;
}
- *sort_onwards = None;
+ sort_onwards.sort_connection = false;
Ok(())
}
/// Removes the sort from the plan in `sort_onwards`.
fn remove_corresponding_sort_from_sub_plan(
- sort_onwards: &mut ExecTree,
+ sort_onwards: &mut PlanWithCorrespondingSort,
requires_single_partition: bool,
-) -> Result<Arc<dyn ExecutionPlan>> {
+) -> Result<()> {
// A `SortExec` is always at the bottom of the tree.
- let mut updated_plan = if is_sort(&sort_onwards.plan) {
- sort_onwards.plan.children().swap_remove(0)
+ if is_sort(&sort_onwards.plan) {
+ *sort_onwards = sort_onwards.children_nodes.swap_remove(0);
} else {
- let plan = &sort_onwards.plan;
- let mut children = plan.children();
- for item in &mut sort_onwards.children {
- let requires_single_partition = matches!(
- plan.required_input_distribution()[item.idx],
- Distribution::SinglePartition
- );
- children[item.idx] =
- remove_corresponding_sort_from_sub_plan(item,
requires_single_partition)?;
+ let PlanWithCorrespondingSort {
+ plan,
+ sort_connection: _,
+ children_nodes,
+ } = sort_onwards;
+ let mut any_connection = false;
+ for (child_idx, child_node) in children_nodes.iter_mut().enumerate() {
+ if child_node.sort_connection {
+ any_connection = true;
+ let requires_single_partition = matches!(
+ plan.required_input_distribution()[child_idx],
+ Distribution::SinglePartition
+ );
+ remove_corresponding_sort_from_sub_plan(
+ child_node,
+ requires_single_partition,
+ )?;
+ }
}
+ if any_connection || children_nodes.is_empty() {
+ *sort_onwards = PlanWithCorrespondingSort::update_children(
+ plan.clone(),
+ children_nodes.clone(),
+ )?;
+ }
+ let PlanWithCorrespondingSort {
+ plan,
+ children_nodes,
+ ..
+ } = sort_onwards;
// Replace with variants that do not preserve order.
if is_sort_preserving_merge(plan) {
- children.swap_remove(0)
+ children_nodes.swap_remove(0);
+ *plan = plan.children().swap_remove(0);
} else if let Some(repartition) =
plan.as_any().downcast_ref::<RepartitionExec>()
{
- Arc::new(
- // By default, RepartitionExec does not preserve order
- RepartitionExec::try_new(
- children.swap_remove(0),
- repartition.partitioning().clone(),
- )?,
- )
- } else {
- plan.clone().with_new_children(children)?
+ *plan = Arc::new(RepartitionExec::try_new(
+ children_nodes[0].plan.clone(),
+ repartition.output_partitioning(),
+ )?) as _;
}
};
// Deleting a merging sort may invalidate distribution requirements.
// Ensure that we stay compliant with such requirements:
if requires_single_partition
- && updated_plan.output_partitioning().partition_count() > 1
+ && sort_onwards.plan.output_partitioning().partition_count() > 1
{
// If there is existing ordering, to preserve ordering use
SortPreservingMergeExec
// instead of CoalescePartitionsExec.
- if let Some(ordering) = updated_plan.output_ordering() {
- updated_plan = Arc::new(SortPreservingMergeExec::new(
+ if let Some(ordering) = sort_onwards.plan.output_ordering() {
+ let plan = Arc::new(SortPreservingMergeExec::new(
ordering.to_vec(),
- updated_plan,
- ));
+ sort_onwards.plan.clone(),
+ )) as _;
+ *sort_onwards = PlanWithCorrespondingSort::update_children(
+ plan,
+ vec![sort_onwards.clone()],
+ )?;
} else {
- updated_plan = Arc::new(CoalescePartitionsExec::new(updated_plan));
+ let plan =
+
Arc::new(CoalescePartitionsExec::new(sort_onwards.plan.clone())) as _;
+ *sort_onwards = PlanWithCorrespondingSort::update_children(
+ plan,
+ vec![sort_onwards.clone()],
+ )?;
}
}
- Ok(updated_plan)
+ Ok(())
}
/// Converts an [ExecutionPlan] trait object to a [PhysicalSortExpr] slice
when possible.
diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs
b/datafusion/core/src/physical_optimizer/output_requirements.rs
index f8bf3bb965..4d03840d3d 100644
--- a/datafusion/core/src/physical_optimizer/output_requirements.rs
+++ b/datafusion/core/src/physical_optimizer/output_requirements.rs
@@ -147,6 +147,10 @@ impl ExecutionPlan for OutputRequirementExec {
self.input.output_ordering()
}
+ fn maintains_input_order(&self) -> Vec<bool> {
+ vec![true]
+ }
+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index d59248aadf..9e9f647d07 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -24,13 +24,13 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::joins::SymmetricHashJoinExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_common::config::OptimizerOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::intervals::utils::{check_support,
is_datatype_supported};
+use datafusion_physical_plan::joins::SymmetricHashJoinExec;
/// The PipelineChecker rule rejects non-runnable query plans that use
/// pipeline-breaking operators on infinite input(s).
@@ -70,14 +70,14 @@ impl PhysicalOptimizerRule for PipelineChecker {
pub struct PipelineStatePropagator {
pub(crate) plan: Arc<dyn ExecutionPlan>,
pub(crate) unbounded: bool,
- pub(crate) children: Vec<PipelineStatePropagator>,
+ pub(crate) children: Vec<Self>,
}
impl PipelineStatePropagator {
/// Constructs a new, default pipelining state.
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
let children = plan.children();
- PipelineStatePropagator {
+ Self {
plan,
unbounded: false,
children: children.into_iter().map(Self::new).collect(),
@@ -86,10 +86,7 @@ impl PipelineStatePropagator {
/// Returns the children unboundedness information.
pub fn children_unbounded(&self) -> Vec<bool> {
- self.children
- .iter()
- .map(|c| c.unbounded)
- .collect::<Vec<_>>()
+ self.children.iter().map(|c| c.unbounded).collect()
}
}
@@ -109,26 +106,23 @@ impl TreeNode for PipelineStatePropagator {
Ok(VisitRecursion::Continue)
}
- fn map_children<F>(self, transform: F) -> Result<Self>
+ fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
if !self.children.is_empty() {
- let new_children = self
+ self.children = self
.children
.into_iter()
.map(transform)
- .collect::<Result<Vec<_>>>()?;
- let children_plans = new_children.iter().map(|c|
c.plan.clone()).collect();
-
- Ok(PipelineStatePropagator {
- plan: with_new_children_if_necessary(self.plan,
children_plans)?.into(),
- unbounded: self.unbounded,
- children: new_children,
- })
- } else {
- Ok(self)
+ .collect::<Result<_>>()?;
+ self.plan = with_new_children_if_necessary(
+ self.plan,
+ self.children.iter().map(|c| c.plan.clone()).collect(),
+ )?
+ .into();
}
+ Ok(self)
}
}
diff --git
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 0ff7e9f48e..91f3d2abc6 100644
---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -21,14 +21,13 @@
use std::sync::Arc;
+use super::utils::is_repartition;
use crate::error::Result;
-use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort,
ExecTree};
+use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort};
use crate::physical_plan::repartition::RepartitionExec;
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
-use super::utils::is_repartition;
-
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_physical_plan::unbounded_output;
@@ -40,80 +39,67 @@ use datafusion_physical_plan::unbounded_output;
#[derive(Debug, Clone)]
pub(crate) struct OrderPreservationContext {
pub(crate) plan: Arc<dyn ExecutionPlan>,
- ordering_onwards: Vec<Option<ExecTree>>,
+ ordering_connection: bool,
+ children_nodes: Vec<Self>,
}
impl OrderPreservationContext {
- /// Creates a "default" order-preservation context.
+ /// Creates an empty context tree. Each node has `false` connections.
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
- let length = plan.children().len();
- OrderPreservationContext {
+ let children = plan.children();
+ Self {
plan,
- ordering_onwards: vec![None; length],
+ ordering_connection: false,
+ children_nodes: children.into_iter().map(Self::new).collect(),
}
}
/// Creates a new order-preservation context from those of children nodes.
- pub fn new_from_children_nodes(
- children_nodes: Vec<OrderPreservationContext>,
- parent_plan: Arc<dyn ExecutionPlan>,
- ) -> Result<Self> {
- let children_plans = children_nodes
- .iter()
- .map(|item| item.plan.clone())
- .collect();
- let ordering_onwards = children_nodes
- .into_iter()
- .enumerate()
- .map(|(idx, item)| {
- // `ordering_onwards` tree keeps track of executors that
maintain
- // ordering, (or that can maintain ordering with the
replacement of
- // its variant)
- let plan = item.plan;
- let children = plan.children();
- let ordering_onwards = item.ordering_onwards;
- if children.is_empty() {
- // Plan has no children, there is nothing to propagate.
- None
- } else if ordering_onwards[0].is_none()
- && ((is_repartition(&plan) &&
!plan.maintains_input_order()[0])
- || (is_coalesce_partitions(&plan)
- && children[0].output_ordering().is_some()))
- {
- Some(ExecTree::new(plan, idx, vec![]))
- } else {
- let children = ordering_onwards
- .into_iter()
- .flatten()
- .filter(|item| {
- // Only consider operators that maintains ordering
- plan.maintains_input_order()[item.idx]
- || is_coalesce_partitions(&plan)
- || is_repartition(&plan)
- })
- .collect::<Vec<_>>();
- if children.is_empty() {
- None
- } else {
- Some(ExecTree::new(plan, idx, children))
- }
- }
- })
- .collect();
- let plan = with_new_children_if_necessary(parent_plan,
children_plans)?.into();
- Ok(OrderPreservationContext {
- plan,
- ordering_onwards,
- })
- }
+ pub fn update_children(mut self) -> Result<Self> {
+ for node in self.children_nodes.iter_mut() {
+ let plan = node.plan.clone();
+ let children = plan.children();
+ let maintains_input_order = plan.maintains_input_order();
+ let inspect_child = |idx| {
+ maintains_input_order[idx]
+ || is_coalesce_partitions(&plan)
+ || is_repartition(&plan)
+ };
+
+ // We cut the path towards nodes that do not maintain ordering.
+ for (idx, c) in node.children_nodes.iter_mut().enumerate() {
+ c.ordering_connection &= inspect_child(idx);
+ }
+
+ node.ordering_connection = if children.is_empty() {
+ false
+ } else if !node.children_nodes[0].ordering_connection
+ && ((is_repartition(&plan) && !maintains_input_order[0])
+ || (is_coalesce_partitions(&plan)
+ && children[0].output_ordering().is_some()))
+ {
+ // We either have a RepartitionExec or a CoalescePartitionsExec
+ // and they lose their input ordering, so initiate connection:
+ true
+ } else {
+ // Maintain connection if there is a child with a connection,
+ // and operator can possibly maintain that connection (either
+ // in its current form or when we replace it with the
corresponding
+ // order preserving operator).
+ node.children_nodes
+ .iter()
+ .enumerate()
+ .any(|(idx, c)| c.ordering_connection &&
inspect_child(idx))
+ }
+ }
- /// Computes order-preservation contexts for every child of the plan.
- pub fn children(&self) -> Vec<OrderPreservationContext> {
- self.plan
- .children()
- .into_iter()
- .map(OrderPreservationContext::new)
- .collect()
+ self.plan = with_new_children_if_necessary(
+ self.plan,
+ self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
+ )?
+ .into();
+ self.ordering_connection = false;
+ Ok(self)
}
}
@@ -122,8 +108,8 @@ impl TreeNode for OrderPreservationContext {
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
- for child in self.children() {
- match op(&child)? {
+ for child in &self.children_nodes {
+ match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -132,68 +118,88 @@ impl TreeNode for OrderPreservationContext {
Ok(VisitRecursion::Continue)
}
- fn map_children<F>(self, transform: F) -> Result<Self>
+ fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
- let children = self.children();
- if children.is_empty() {
- Ok(self)
- } else {
- let children_nodes = children
+ if !self.children_nodes.is_empty() {
+ self.children_nodes = self
+ .children_nodes
.into_iter()
.map(transform)
- .collect::<Result<Vec<_>>>()?;
- OrderPreservationContext::new_from_children_nodes(children_nodes,
self.plan)
+ .collect::<Result<_>>()?;
+ self.plan = with_new_children_if_necessary(
+ self.plan,
+ self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
+ )?
+ .into();
}
+ Ok(self)
}
}
-/// Calculates the updated plan by replacing executors that lose ordering
-/// inside the `ExecTree` with their order-preserving variants. This will
+/// Calculates the updated plan by replacing operators that lose ordering
+/// inside `sort_input` with their order-preserving variants. This will
/// generate an alternative plan, which will be accepted or rejected later on
/// depending on whether it helps us remove a `SortExec`.
fn get_updated_plan(
- exec_tree: &ExecTree,
+ mut sort_input: OrderPreservationContext,
// Flag indicating that it is desirable to replace `RepartitionExec`s with
// `SortPreservingRepartitionExec`s:
is_spr_better: bool,
// Flag indicating that it is desirable to replace
`CoalescePartitionsExec`s
// with `SortPreservingMergeExec`s:
is_spm_better: bool,
-) -> Result<Arc<dyn ExecutionPlan>> {
- let plan = exec_tree.plan.clone();
+) -> Result<OrderPreservationContext> {
+ let updated_children = sort_input
+ .children_nodes
+ .clone()
+ .into_iter()
+ .map(|item| {
+ // Update children and their descendants in the given tree if the
connection is open:
+ if item.ordering_connection {
+ get_updated_plan(item, is_spr_better, is_spm_better)
+ } else {
+ Ok(item)
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
- let mut children = plan.children();
- // Update children and their descendants in the given tree:
- for item in &exec_tree.children {
- children[item.idx] = get_updated_plan(item, is_spr_better,
is_spm_better)?;
- }
- // Construct the plan with updated children:
- let mut plan = plan.with_new_children(children)?;
+ sort_input.plan = sort_input
+ .plan
+ .with_new_children(updated_children.iter().map(|c|
c.plan.clone()).collect())?;
+ sort_input.ordering_connection = false;
+ sort_input.children_nodes = updated_children;
// When a `RepartitionExec` doesn't preserve ordering, replace it with
- // a `SortPreservingRepartitionExec` if appropriate:
- if is_repartition(&plan) && !plan.maintains_input_order()[0] &&
is_spr_better {
- let child = plan.children().swap_remove(0);
- let repartition = RepartitionExec::try_new(child,
plan.output_partitioning())?
- .with_preserve_order();
- plan = Arc::new(repartition) as _
- }
- // When the input of a `CoalescePartitionsExec` has an ordering, replace it
- // with a `SortPreservingMergeExec` if appropriate:
- let mut children = plan.children();
- if is_coalesce_partitions(&plan)
- && children[0].output_ordering().is_some()
- && is_spm_better
+ // a sort-preserving variant if appropriate:
+ if is_repartition(&sort_input.plan)
+ && !sort_input.plan.maintains_input_order()[0]
+ && is_spr_better
{
- let child = children.swap_remove(0);
- plan = Arc::new(SortPreservingMergeExec::new(
- child.output_ordering().unwrap_or(&[]).to_vec(),
- child,
- )) as _
+ let child = sort_input.plan.children().swap_remove(0);
+ let repartition =
+ RepartitionExec::try_new(child,
sort_input.plan.output_partitioning())?
+ .with_preserve_order();
+ sort_input.plan = Arc::new(repartition) as _;
+ sort_input.children_nodes[0].ordering_connection = true;
+ } else if is_coalesce_partitions(&sort_input.plan) && is_spm_better {
+ // When the input of a `CoalescePartitionsExec` has an ordering,
replace it
+ // with a `SortPreservingMergeExec` if appropriate:
+ if let Some(ordering) = sort_input.children_nodes[0]
+ .plan
+ .output_ordering()
+ .map(|o| o.to_vec())
+ {
+            // Now we can mutate `sort_input.children_nodes` safely
+ let child = sort_input.children_nodes.clone().swap_remove(0);
+ sort_input.plan =
+ Arc::new(SortPreservingMergeExec::new(ordering, child.plan))
as _;
+ sort_input.children_nodes[0].ordering_connection = true;
+ }
}
- Ok(plan)
+
+ Ok(sort_input)
}
/// The `replace_with_order_preserving_variants` optimizer sub-rule tries to
@@ -211,11 +217,11 @@ fn get_updated_plan(
///
/// The algorithm flow is simply like this:
/// 1. Visit nodes of the physical plan bottom-up and look for `SortExec`
nodes.
-/// 1_1. During the traversal, build an `ExecTree` to keep track of operators
-/// that maintain ordering (or can maintain ordering when replaced by an
-/// order-preserving variant) until a `SortExec` is found.
+/// 1_1. During the traversal, keep track of operators that maintain ordering
+/// (or can maintain ordering when replaced by an order-preserving variant)
until
+/// a `SortExec` is found.
/// 2. When a `SortExec` is found, update the child of the `SortExec` by
replacing
-/// operators that do not preserve ordering in the `ExecTree` with their
order
+/// operators that do not preserve ordering in the tree with their order
/// preserving variants.
/// 3. Check if the `SortExec` is still necessary in the updated plan by
comparing
/// its input ordering with the output ordering it imposes. We do this
because
@@ -239,37 +245,41 @@ pub(crate) fn replace_with_order_preserving_variants(
is_spm_better: bool,
config: &ConfigOptions,
) -> Result<Transformed<OrderPreservationContext>> {
- let plan = &requirements.plan;
- let ordering_onwards = &requirements.ordering_onwards;
- if is_sort(plan) {
- let exec_tree = if let Some(exec_tree) = &ordering_onwards[0] {
- exec_tree
- } else {
- return Ok(Transformed::No(requirements));
- };
- // For unbounded cases, replace with the order-preserving variant in
- // any case, as doing so helps fix the pipeline.
- // Also do the replacement if opted-in via config options.
- let use_order_preserving_variant =
- config.optimizer.prefer_existing_sort || unbounded_output(plan);
- let updated_sort_input = get_updated_plan(
- exec_tree,
- is_spr_better || use_order_preserving_variant,
- is_spm_better || use_order_preserving_variant,
- )?;
- // If this sort is unnecessary, we should remove it and update the
plan:
- if updated_sort_input
- .equivalence_properties()
- .ordering_satisfy(plan.output_ordering().unwrap_or(&[]))
- {
- return Ok(Transformed::Yes(OrderPreservationContext {
- plan: updated_sort_input,
- ordering_onwards: vec![None],
- }));
- }
+ let mut requirements = requirements.update_children()?;
+ if !(is_sort(&requirements.plan)
+ && requirements.children_nodes[0].ordering_connection)
+ {
+ return Ok(Transformed::No(requirements));
}
- Ok(Transformed::No(requirements))
+ // For unbounded cases, replace with the order-preserving variant in
+ // any case, as doing so helps fix the pipeline.
+ // Also do the replacement if opted-in via config options.
+ let use_order_preserving_variant =
+ config.optimizer.prefer_existing_sort ||
unbounded_output(&requirements.plan);
+
+ let mut updated_sort_input = get_updated_plan(
+ requirements.children_nodes.clone().swap_remove(0),
+ is_spr_better || use_order_preserving_variant,
+ is_spm_better || use_order_preserving_variant,
+ )?;
+
+ // If this sort is unnecessary, we should remove it and update the plan:
+ if updated_sort_input
+ .plan
+ .equivalence_properties()
+ .ordering_satisfy(requirements.plan.output_ordering().unwrap_or(&[]))
+ {
+ for child in updated_sort_input.children_nodes.iter_mut() {
+ child.ordering_connection = false;
+ }
+ Ok(Transformed::Yes(updated_sort_input))
+ } else {
+ for child in requirements.children_nodes.iter_mut() {
+ child.ordering_connection = false;
+ }
+ Ok(Transformed::Yes(requirements))
+ }
}
#[cfg(test)]
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index b9502d92ac..b001386301 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -36,8 +36,6 @@ use datafusion_physical_expr::{
LexRequirementRef, PhysicalSortExpr, PhysicalSortRequirement,
};
-use itertools::izip;
-
/// This is a "data class" we use within the [`EnforceSorting`] rule to push
/// down [`SortExec`] in the plan. In some cases, we can reduce the total
/// computational cost by pushing down `SortExec`s through some executors.
@@ -49,35 +47,26 @@ pub(crate) struct SortPushDown {
pub plan: Arc<dyn ExecutionPlan>,
/// Parent required sort ordering
required_ordering: Option<Vec<PhysicalSortRequirement>>,
- /// The adjusted request sort ordering to children.
- /// By default they are the same as the plan's required input ordering,
but can be adjusted based on parent required sort ordering properties.
- adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirement>>>,
+ children_nodes: Vec<Self>,
}
impl SortPushDown {
- pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
- let request_ordering = plan.required_input_ordering();
- SortPushDown {
+ /// Creates an empty tree with empty `required_ordering`'s.
+ pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+ let children = plan.children();
+ Self {
plan,
required_ordering: None,
- adjusted_request_ordering: request_ordering,
+ children_nodes: children.into_iter().map(Self::new).collect(),
}
}
- pub fn children(&self) -> Vec<SortPushDown> {
- izip!(
- self.plan.children().into_iter(),
- self.adjusted_request_ordering.clone().into_iter(),
- )
- .map(|(child, from_parent)| {
- let child_request_ordering = child.required_input_ordering();
- SortPushDown {
- plan: child,
- required_ordering: from_parent,
- adjusted_request_ordering: child_request_ordering,
- }
- })
- .collect()
+    /// Assigns the ordering requirement of the root node to its children.
+ pub fn assign_initial_requirements(&mut self) {
+ let reqs = self.plan.required_input_ordering();
+ for (child, requirement) in self.children_nodes.iter_mut().zip(reqs) {
+ child.required_ordering = requirement;
+ }
}
}
@@ -86,9 +75,8 @@ impl TreeNode for SortPushDown {
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
- let children = self.children();
- for child in children {
- match op(&child)? {
+ for child in &self.children_nodes {
+ match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -97,64 +85,64 @@ impl TreeNode for SortPushDown {
Ok(VisitRecursion::Continue)
}
-
fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
- let children = self.children();
- if !children.is_empty() {
- let children_plans = children
+ if !self.children_nodes.is_empty() {
+ self.children_nodes = self
+ .children_nodes
.into_iter()
.map(transform)
- .map(|r| r.map(|s| s.plan))
- .collect::<Result<Vec<_>>>()?;
-
- match with_new_children_if_necessary(self.plan, children_plans)? {
- Transformed::Yes(plan) | Transformed::No(plan) => {
- self.plan = plan;
- }
- }
- };
+ .collect::<Result<_>>()?;
+ self.plan = with_new_children_if_necessary(
+ self.plan,
+ self.children_nodes.iter().map(|c| c.plan.clone()).collect(),
+ )?
+ .into();
+ }
Ok(self)
}
}
pub(crate) fn pushdown_sorts(
- requirements: SortPushDown,
+ mut requirements: SortPushDown,
) -> Result<Transformed<SortPushDown>> {
let plan = &requirements.plan;
let parent_required =
requirements.required_ordering.as_deref().unwrap_or(&[]);
+
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
- let new_plan = if !plan
+ if !plan
.equivalence_properties()
.ordering_satisfy_requirement(parent_required)
{
// If the current plan is a SortExec, modify it to satisfy parent
requirements:
let mut new_plan = sort_exec.input().clone();
add_sort_above(&mut new_plan, parent_required, sort_exec.fetch());
- new_plan
- } else {
- requirements.plan
+ requirements.plan = new_plan;
};
- let required_ordering = new_plan
+
+ let required_ordering = requirements
+ .plan
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs)
.unwrap_or_default();
// Since new_plan is a SortExec, we can safely get the 0th index.
- let child = new_plan.children().swap_remove(0);
+ let mut child = requirements.children_nodes.swap_remove(0);
if let Some(adjusted) =
- pushdown_requirement_to_children(&child, &required_ordering)?
+ pushdown_requirement_to_children(&child.plan, &required_ordering)?
{
+ for (c, o) in child.children_nodes.iter_mut().zip(adjusted) {
+ c.required_ordering = o;
+ }
// Can push down requirements
- Ok(Transformed::Yes(SortPushDown {
- plan: child,
- required_ordering: None,
- adjusted_request_ordering: adjusted,
- }))
+ child.required_ordering = None;
+ Ok(Transformed::Yes(child))
} else {
// Can not push down requirements
- Ok(Transformed::Yes(SortPushDown::init(new_plan)))
+ let mut empty_node = SortPushDown::new(requirements.plan);
+ empty_node.assign_initial_requirements();
+ Ok(Transformed::Yes(empty_node))
}
} else {
// Executors other than SortExec
@@ -163,23 +151,27 @@ pub(crate) fn pushdown_sorts(
.ordering_satisfy_requirement(parent_required)
{
// Satisfies parent requirements, immediately return.
- return Ok(Transformed::Yes(SortPushDown {
- required_ordering: None,
- ..requirements
- }));
+ let reqs = requirements.plan.required_input_ordering();
+ for (child, order) in
requirements.children_nodes.iter_mut().zip(reqs) {
+ child.required_ordering = order;
+ }
+ return Ok(Transformed::Yes(requirements));
}
// Can not satisfy the parent requirements, check whether the
requirements can be pushed down:
if let Some(adjusted) = pushdown_requirement_to_children(plan,
parent_required)? {
- Ok(Transformed::Yes(SortPushDown {
- plan: requirements.plan,
- required_ordering: None,
- adjusted_request_ordering: adjusted,
- }))
+ for (c, o) in requirements.children_nodes.iter_mut().zip(adjusted)
{
+ c.required_ordering = o;
+ }
+ requirements.required_ordering = None;
+ Ok(Transformed::Yes(requirements))
} else {
// Can not push down requirements, add new SortExec:
let mut new_plan = requirements.plan;
add_sort_above(&mut new_plan, parent_required, None);
- Ok(Transformed::Yes(SortPushDown::init(new_plan)))
+ let mut new_empty = SortPushDown::new(new_plan);
+ new_empty.assign_initial_requirements();
+ // Can not push down requirements
+ Ok(Transformed::Yes(new_empty))
}
}
}
@@ -297,10 +289,11 @@ fn pushdown_requirement_to_children(
// TODO: Add support for Projection push down
}
-/// Determine the children requirements
-/// If the children requirements are more specific, do not push down the
parent requirements
-/// If the the parent requirements are more specific, push down the parent
requirements
-/// If they are not compatible, need to add Sort.
+/// Determine children requirements:
+/// - If children requirements are more specific, do not push down parent
+/// requirements.
+/// - If parent requirements are more specific, push down parent requirements.
+/// - If they are not compatible, need to add a sort.
fn determine_children_requirement(
parent_required: LexRequirementRef,
request_child: LexRequirementRef,
@@ -310,18 +303,15 @@ fn determine_children_requirement(
.equivalence_properties()
.requirements_compatible(request_child, parent_required)
{
- // request child requirements are more specific, no need to push down
the parent requirements
+ // Child requirements are more specific, no need to push down.
RequirementsCompatibility::Satisfy
} else if child_plan
.equivalence_properties()
.requirements_compatible(parent_required, request_child)
{
- // parent requirements are more specific, adjust the request child
requirements and push down the new requirements
- let adjusted = if parent_required.is_empty() {
- None
- } else {
- Some(parent_required.to_vec())
- };
+ // Parent requirements are more specific, adjust child's requirements
+ // and push down the new requirements:
+ let adjusted = (!parent_required.is_empty()).then(||
parent_required.to_vec());
RequirementsCompatibility::Compatible(adjusted)
} else {
RequirementsCompatibility::NonCompatible
diff --git a/datafusion/core/src/physical_optimizer/utils.rs
b/datafusion/core/src/physical_optimizer/utils.rs
index fccc1db0d3..f8063e9694 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -17,83 +17,18 @@
//! Collection of utility functions that are leveraged by the query optimizer
rules
-use std::fmt;
-use std::fmt::Formatter;
use std::sync::Arc;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
-use crate::physical_plan::{get_plan_string, ExecutionPlan};
+use crate::physical_plan::ExecutionPlan;
use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement};
-
-/// This object implements a tree that we use while keeping track of paths
-/// leading to [`SortExec`]s.
-#[derive(Debug, Clone)]
-pub(crate) struct ExecTree {
- /// The `ExecutionPlan` associated with this node
- pub plan: Arc<dyn ExecutionPlan>,
- /// Child index of the plan in its parent
- pub idx: usize,
- /// Children of the plan that would need updating if we remove leaf
executors
- pub children: Vec<ExecTree>,
-}
-
-impl fmt::Display for ExecTree {
- fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
- let plan_string = get_plan_string(&self.plan);
- write!(f, "\nidx: {:?}", self.idx)?;
- write!(f, "\nplan: {:?}", plan_string)?;
- for child in self.children.iter() {
- write!(f, "\nexec_tree:{}", child)?;
- }
- writeln!(f)
- }
-}
-
-impl ExecTree {
- /// Create new Exec tree
- pub fn new(
- plan: Arc<dyn ExecutionPlan>,
- idx: usize,
- children: Vec<ExecTree>,
- ) -> Self {
- ExecTree {
- plan,
- idx,
- children,
- }
- }
-}
-
-/// Get `ExecTree` for each child of the plan if they are tracked.
-/// # Arguments
-///
-/// * `n_children` - Children count of the plan of interest
-/// * `onward` - Contains `Some(ExecTree)` of the plan tracked.
-/// - Contains `None` is plan is not tracked.
-///
-/// # Returns
-///
-/// A `Vec<Option<ExecTree>>` that contains tracking information of each child.
-/// If a child is `None`, it is not tracked. If `Some(ExecTree)` child is
tracked also.
-pub(crate) fn get_children_exectrees(
- n_children: usize,
- onward: &Option<ExecTree>,
-) -> Vec<Option<ExecTree>> {
- let mut children_onward = vec![None; n_children];
- if let Some(exec_tree) = &onward {
- for child in &exec_tree.children {
- children_onward[child.idx] = Some(child.clone());
- }
- }
- children_onward
-}
+use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
/// This utility function adds a `SortExec` above an operator according to the
/// given ordering requirements while preserving the original partitioning.
diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs
index f513744617..91238e5b04 100644
--- a/datafusion/physical-expr/src/sort_properties.rs
+++ b/datafusion/physical-expr/src/sort_properties.rs
@@ -151,7 +151,7 @@ impl Neg for SortProperties {
pub struct ExprOrdering {
pub expr: Arc<dyn PhysicalExpr>,
pub state: SortProperties,
- pub children: Vec<ExprOrdering>,
+ pub children: Vec<Self>,
}
impl ExprOrdering {
@@ -191,15 +191,13 @@ impl TreeNode for ExprOrdering {
where
F: FnMut(Self) -> Result<Self>,
{
- if self.children.is_empty() {
- Ok(self)
- } else {
+ if !self.children.is_empty() {
self.children = self
.children
.into_iter()
.map(transform)
- .collect::<Result<Vec<_>>>()?;
- Ok(self)
+ .collect::<Result<_>>()?;
}
+ Ok(self)
}
}
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 14ef9c2ec2..d01ea55074 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -21,6 +21,7 @@
//! The Union operator combines multiple inputs with the same schema
+use std::borrow::Borrow;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{any::Any, sync::Arc};
@@ -336,7 +337,7 @@ impl InterleaveExec {
pub fn try_new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Result<Self> {
let schema = union_schema(&inputs);
- if !can_interleave(&inputs) {
+ if !can_interleave(inputs.iter()) {
return internal_err!(
            "Not all InterleaveExec children have a consistent hash partitioning"
);
@@ -474,17 +475,18 @@ impl ExecutionPlan for InterleaveExec {
/// It might be too strict here in the case that the input partition specs are compatible but not exactly the same.
/// For example one input partition has the partition spec Hash('a','b','c') and
/// other has the partition spec Hash('a'), It is safe to derive the out partition with the spec Hash('a','b','c').
-pub fn can_interleave(inputs: &[Arc<dyn ExecutionPlan>]) -> bool {
- if inputs.is_empty() {
+pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(
+ mut inputs: impl Iterator<Item = T>,
+) -> bool {
+ let Some(first) = inputs.next() else {
return false;
- }
+ };
- let first_input_partition = inputs[0].output_partitioning();
- matches!(first_input_partition, Partitioning::Hash(_, _))
+ let reference = first.borrow().output_partitioning();
+ matches!(reference, Partitioning::Hash(_, _))
&& inputs
- .iter()
- .map(|plan| plan.output_partitioning())
- .all(|partition| partition == first_input_partition)
+ .map(|plan| plan.borrow().output_partitioning())
+ .all(|partition| partition == reference)
}
fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {