This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 95658877d Feature/sort enforcement refactor (#5228)
95658877d is described below
commit 95658877dc8693a82d5046f64a5219595b64f9e3
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Feb 13 22:02:52 2023 +0300
Feature/sort enforcement refactor (#5228)
* Remove multilayer chain Ordering comparison for sort parallelize rule
* Update tree code
* Simplify if condition
* Update test
* Simplify sort insertion utility to avoid clones
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../src/physical_optimizer/sort_enforcement.rs | 178 +++++++++++----------
datafusion/core/src/physical_optimizer/utils.rs | 34 ++--
2 files changed, 111 insertions(+), 101 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index a59f35fc1..8dfa4199b 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -35,7 +35,7 @@
//! by another SortExec. Therefore, this rule removes it from the physical
plan.
use crate::config::ConfigOptions;
use crate::error::Result;
-use crate::physical_optimizer::utils::add_sort_above_child;
+use crate::physical_optimizer::utils::add_sort_above;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
@@ -69,15 +69,28 @@ impl EnforceSorting {
/// leading to `SortExec`s.
#[derive(Debug, Clone)]
struct ExecTree {
+ /// The `ExecutionPlan` associated with this node
+ pub plan: Arc<dyn ExecutionPlan>,
/// Child index of the plan in its parent
pub idx: usize,
/// Children of the plan that would need updating if we remove leaf
executors
pub children: Vec<ExecTree>,
- /// The `ExecutionPlan` associated with this node
- pub plan: Arc<dyn ExecutionPlan>,
}
impl ExecTree {
+ /// Creates a new `ExecTree` node from its plan, its child index in the parent, and its children.
+ pub fn new(
+ plan: Arc<dyn ExecutionPlan>,
+ idx: usize,
+ children: Vec<ExecTree>,
+ ) -> Self {
+ ExecTree {
+ plan,
+ idx,
+ children,
+ }
+ }
+
/// This function returns the executors at the leaves of the tree.
fn get_leaves(&self) -> Vec<Arc<dyn ExecutionPlan>> {
if self.children.is_empty() {
@@ -127,11 +140,7 @@ impl PlanWithCorrespondingSort {
// that maintain this ordering. If we just saw a order imposing
// operator, we reset the tree and start accumulating.
if is_sort(plan) {
- return Some(ExecTree {
- idx,
- plan: item.plan,
- children: vec![],
- });
+ return Some(ExecTree::new(item.plan, idx, vec![]));
} else if is_limit(plan) {
// There is no sort linkage for this path, it starts at a
limit.
return None;
@@ -159,11 +168,7 @@ impl PlanWithCorrespondingSort {
if !children.is_empty() {
// Add parent node to the tree if there is at least one
// child with a subtree:
- Some(ExecTree {
- idx,
- plan: item.plan,
- children,
- })
+ Some(ExecTree::new(item.plan, idx, children))
} else {
// There is no sort linkage for this child, do nothing.
None
@@ -242,11 +247,7 @@ impl PlanWithCorrespondingCoalescePartitions {
// operator, we reset the tree and start accumulating.
let plan = item.plan;
if plan.as_any().is::<CoalescePartitionsExec>() {
- Some(ExecTree {
- idx,
- plan,
- children: vec![],
- })
+ Some(ExecTree::new(plan, idx, vec![]))
} else if plan.children().is_empty() {
// Plan has no children, there is nothing to propagate.
None
@@ -267,11 +268,7 @@ impl PlanWithCorrespondingCoalescePartitions {
if children.is_empty() {
None
} else {
- Some(ExecTree {
- idx,
- plan,
- children,
- })
+ Some(ExecTree::new(plan, idx, children))
}
}
})
@@ -365,19 +362,21 @@ fn parallelize_sorts(
let mut coalesce_onwards = requirements.coalesce_onwards;
// We know that `plan` has children, so `coalesce_onwards` is non-empty.
if coalesce_onwards[0].is_some() {
- if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+ if (is_sort(&plan) || is_sort_preserving_merge(&plan))
+ // Make sure that the sort is actually a global sort (i.e. it has a single output partition)
+ && plan.output_partitioning().partition_count() == 1
+ {
// If there is a connection between a `CoalescePartitionsExec` and
a
- // `SortExec` that satisfy the requirements (i.e. they don't
require a
- // single partition), then we can replace the
`CoalescePartitionsExec`
- // + `SortExec` cascade with a `SortExec` +
`SortPreservingMergeExec`
+ // global sort that satisfies the requirements (i.e. intermediate
+ // executors don't require a single partition), then we can
+ // replace the `CoalescePartitionsExec` + global sort cascade with
+ // a `SortExec` + `SortPreservingMergeExec`
// cascade to parallelize sorting.
let mut prev_layer = plan.clone();
- update_child_to_change_coalesce(
- &mut prev_layer,
- &mut coalesce_onwards[0],
- Some(sort_exec),
- )?;
- let spm = SortPreservingMergeExec::new(sort_exec.expr().to_vec(),
prev_layer);
+ update_child_to_remove_coalesce(&mut prev_layer, &mut
coalesce_onwards[0])?;
+ let sort_exprs = get_sort_exprs(&plan)?;
+ add_sort_above(&mut prev_layer, sort_exprs.to_vec())?;
+ let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(),
prev_layer);
return Ok(Some(PlanWithCorrespondingCoalescePartitions {
plan: Arc::new(spm),
coalesce_onwards: vec![None],
@@ -385,11 +384,7 @@ fn parallelize_sorts(
} else if plan.as_any().is::<CoalescePartitionsExec>() {
// There is an unnecessary `CoalescePartitionExec` in the plan.
let mut prev_layer = plan.clone();
- update_child_to_change_coalesce(
- &mut prev_layer,
- &mut coalesce_onwards[0],
- None,
- )?;
+ update_child_to_remove_coalesce(&mut prev_layer, &mut
coalesce_onwards[0])?;
let new_plan = plan.with_new_children(vec![prev_layer])?;
return Ok(Some(PlanWithCorrespondingCoalescePartitions {
plan: new_plan,
@@ -453,12 +448,8 @@ fn ensure_sorting(
// Make sure we preserve the ordering requirements:
update_child_to_remove_unnecessary_sort(child,
sort_onwards, &plan)?;
let sort_expr = required_ordering.to_vec();
- *child = add_sort_above_child(child, sort_expr)?;
- *sort_onwards = Some(ExecTree {
- idx,
- plan: child.clone(),
- children: vec![],
- })
+ add_sort_above(child, sort_expr)?;
+ *sort_onwards = Some(ExecTree::new(child.clone(), idx,
vec![]));
}
if let Some(tree) = sort_onwards {
// For window expressions, we can remove some sorts when
we can
@@ -474,12 +465,8 @@ fn ensure_sorting(
}
(Some(required), None) => {
// Ordering requirement is not met, we should add a `SortExec`
to the plan.
- *child = add_sort_above_child(child, required.to_vec())?;
- *sort_onwards = Some(ExecTree {
- idx,
- plan: child.clone(),
- children: vec![],
- })
+ add_sort_above(child, required.to_vec())?;
+ *sort_onwards = Some(ExecTree::new(child.clone(), idx,
vec![]));
}
(None, Some(_)) => {
// We have a `SortExec` whose effect may be neutralized by
@@ -532,11 +519,11 @@ fn analyze_immediate_sort_removal(
sort_exec.expr().to_vec(),
sort_input,
));
- let new_tree = ExecTree {
- idx: 0,
- plan: new_plan.clone(),
- children: sort_onwards.iter().flat_map(|e|
e.clone()).collect(),
- };
+ let new_tree = ExecTree::new(
+ new_plan.clone(),
+ 0,
+ sort_onwards.iter().flat_map(|e| e.clone()).collect(),
+ );
PlanWithCorrespondingSort {
plan: new_plan,
sort_onwards: vec![Some(new_tree)],
@@ -662,21 +649,19 @@ fn analyze_window_sort_removal(
}
/// Updates child to remove the unnecessary `CoalescePartitions` below it.
-fn update_child_to_change_coalesce(
+fn update_child_to_remove_coalesce(
child: &mut Arc<dyn ExecutionPlan>,
coalesce_onwards: &mut Option<ExecTree>,
- sort_exec: Option<&SortExec>,
) -> Result<()> {
if let Some(coalesce_onwards) = coalesce_onwards {
- *child = change_corresponding_coalesce_in_sub_plan(coalesce_onwards,
sort_exec)?;
+ *child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards)?;
}
Ok(())
}
/// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`.
-fn change_corresponding_coalesce_in_sub_plan(
+fn remove_corresponding_coalesce_in_sub_plan(
coalesce_onwards: &mut ExecTree,
- sort_exec: Option<&SortExec>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(
if coalesce_onwards
@@ -685,24 +670,12 @@ fn change_corresponding_coalesce_in_sub_plan(
.is::<CoalescePartitionsExec>()
{
// We can safely use the 0th index since we have a
`CoalescePartitionsExec`.
- let coalesce_input = coalesce_onwards.plan.children()[0].clone();
- if let Some(sort_exec) = sort_exec {
- let sort_expr = sort_exec.expr();
- if !ordering_satisfy(
- coalesce_input.output_ordering(),
- Some(sort_expr),
- || coalesce_input.equivalence_properties(),
- ) {
- return add_sort_above_child(&coalesce_input,
sort_expr.to_vec());
- }
- }
- coalesce_input
+ coalesce_onwards.plan.children()[0].clone()
} else {
let plan = coalesce_onwards.plan.clone();
let mut children = plan.children();
for item in &mut coalesce_onwards.children {
- children[item.idx] =
- change_corresponding_coalesce_in_sub_plan(item,
sort_exec)?;
+ children[item.idx] =
remove_corresponding_coalesce_in_sub_plan(item)?;
}
plan.with_new_children(children)?
},
@@ -783,15 +756,11 @@ fn change_finer_sort_in_sub_plan(
let plan = &sort_onwards.plan;
// A `SortExec` is always at the bottom of the tree.
if is_sort(plan) {
- let prev_layer = plan.children()[0].clone();
+ let mut prev_layer = plan.children()[0].clone();
let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec();
- let updated_plan = add_sort_above_child(&prev_layer, new_sort_expr)?;
- *sort_onwards = ExecTree {
- idx: sort_onwards.idx,
- children: vec![],
- plan: updated_plan.clone(),
- };
- Ok(updated_plan)
+ add_sort_above(&mut prev_layer, new_sort_expr)?;
+ *sort_onwards = ExecTree::new(prev_layer.clone(), sort_onwards.idx,
vec![]);
+ Ok(prev_layer)
} else {
let mut children = plan.children();
for item in &mut sort_onwards.children {
@@ -1722,8 +1691,8 @@ mod tests {
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
- " FilterExec: NOT non_nullable_col@1",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: [nullable_col@0 ASC]",
+ " FilterExec: NOT non_nullable_col@1",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
];
@@ -1780,6 +1749,47 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_coalesce_propagate() -> Result<()> {
+ let schema = create_test_schema()?;
+ let source = memory_exec(&schema);
+ let repartition = repartition_exec(source);
+ let coalesce_partitions =
Arc::new(CoalescePartitionsExec::new(repartition));
+ let repartition = repartition_exec(coalesce_partitions);
+ let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+ // Add local sort
+ let sort = Arc::new(SortExec::new_with_partitioning(
+ sort_exprs.clone(),
+ repartition,
+ true,
+ None,
+ )) as _;
+ let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort);
+ let sort = sort_exec(sort_exprs, spm);
+
+ let physical_plan = sort.clone();
+ // The sort parallelization rule should end the Coalesce + Sort linkage when the Sort is a global sort.
+ // Also, the input plan is not valid as it is: we need to add a SortExec before the SortPreservingMergeExec.
+ let expected_input = vec![
+ "SortExec: [nullable_col@0 ASC]",
+ " SortPreservingMergeExec: [nullable_col@0 ASC]",
+ " SortExec: [nullable_col@0 ASC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " CoalescePartitionsExec",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
+ " MemoryExec: partitions=0, partition_sizes=[]",
+ ];
+ let expected_optimized = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC]",
+ " SortExec: [nullable_col@0 ASC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
+ " MemoryExec: partitions=0, partition_sizes=[]",
+ ];
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+ Ok(())
+ }
+
/// make PhysicalSortExpr with default options
fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
sort_expr_options(name, schema, SortOptions::default())
diff --git a/datafusion/core/src/physical_optimizer/utils.rs
b/datafusion/core/src/physical_optimizer/utils.rs
index 13e04bbc2..b6666fbef 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -23,6 +23,7 @@ use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use datafusion_physical_expr::utils::ordering_satisfy;
use datafusion_physical_expr::PhysicalSortExpr;
use std::sync::Arc;
@@ -48,22 +49,21 @@ pub fn optimize_children(
}
}
-/// Util function to add SortExec above child
-/// preserving the original partitioning
-pub fn add_sort_above_child(
- child: &Arc<dyn ExecutionPlan>,
+/// This utility function adds a `SortExec` above an operator according to the
+/// given ordering requirements while preserving the original partitioning.
+pub fn add_sort_above(
+ node: &mut Arc<dyn ExecutionPlan>,
sort_expr: Vec<PhysicalSortExpr>,
-) -> Result<Arc<dyn ExecutionPlan>> {
- let new_child = if child.output_partitioning().partition_count() > 1 {
- Arc::new(SortExec::new_with_partitioning(
- sort_expr,
- child.clone(),
- true,
- None,
- )) as Arc<dyn ExecutionPlan>
- } else {
- Arc::new(SortExec::try_new(sort_expr, child.clone(), None)?)
- as Arc<dyn ExecutionPlan>
- };
- Ok(new_child)
+) -> Result<()> {
+ // If the ordering requirement is already satisfied, do not add a sort.
+ if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
+ node.equivalence_properties()
+ }) {
+ *node = Arc::new(if node.output_partitioning().partition_count() > 1 {
+ SortExec::new_with_partitioning(sort_expr, node.clone(), true,
None)
+ } else {
+ SortExec::try_new(sort_expr, node.clone(), None)?
+ }) as _
+ }
+ Ok(())
}