This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 95658877d Feature/sort enforcement refactor (#5228)
95658877d is described below

commit 95658877dc8693a82d5046f64a5219595b64f9e3
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Feb 13 22:02:52 2023 +0300

    Feature/sort enforcement refactor (#5228)
    
    * Remove multilayer chain Ordering comparison for sort parallelize rule
    
    * Update tree code
    
    * Simplify if condition
    
    * Update test
    
    * Simplify sort insertion utility to avoid clones
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 .../src/physical_optimizer/sort_enforcement.rs     | 178 +++++++++++----------
 datafusion/core/src/physical_optimizer/utils.rs    |  34 ++--
 2 files changed, 111 insertions(+), 101 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index a59f35fc1..8dfa4199b 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -35,7 +35,7 @@
 //! by another SortExec. Therefore, this rule removes it from the physical plan.
 use crate::config::ConfigOptions;
 use crate::error::Result;
-use crate::physical_optimizer::utils::add_sort_above_child;
+use crate::physical_optimizer::utils::add_sort_above;
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
@@ -69,15 +69,28 @@ impl EnforceSorting {
 /// leading to `SortExec`s.
 #[derive(Debug, Clone)]
 struct ExecTree {
+    /// The `ExecutionPlan` associated with this node
+    pub plan: Arc<dyn ExecutionPlan>,
     /// Child index of the plan in its parent
     pub idx: usize,
     /// Children of the plan that would need updating if we remove leaf executors
     pub children: Vec<ExecTree>,
-    /// The `ExecutionPlan` associated with this node
-    pub plan: Arc<dyn ExecutionPlan>,
 }
 
 impl ExecTree {
+    /// Create new Exec tree
+    pub fn new(
+        plan: Arc<dyn ExecutionPlan>,
+        idx: usize,
+        children: Vec<ExecTree>,
+    ) -> Self {
+        ExecTree {
+            plan,
+            idx,
+            children,
+        }
+    }
+
     /// This function returns the executors at the leaves of the tree.
     fn get_leaves(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         if self.children.is_empty() {
@@ -127,11 +140,7 @@ impl PlanWithCorrespondingSort {
                 // that maintain this ordering. If we just saw a order imposing
                 // operator, we reset the tree and start accumulating.
                 if is_sort(plan) {
-                    return Some(ExecTree {
-                        idx,
-                        plan: item.plan,
-                        children: vec![],
-                    });
+                    return Some(ExecTree::new(item.plan, idx, vec![]));
                 } else if is_limit(plan) {
                     // There is no sort linkage for this path, it starts at a limit.
                     return None;
@@ -159,11 +168,7 @@ impl PlanWithCorrespondingSort {
                 if !children.is_empty() {
                     // Add parent node to the tree if there is at least one
                     // child with a subtree:
-                    Some(ExecTree {
-                        idx,
-                        plan: item.plan,
-                        children,
-                    })
+                    Some(ExecTree::new(item.plan, idx, children))
                 } else {
                     // There is no sort linkage for this child, do nothing.
                     None
@@ -242,11 +247,7 @@ impl PlanWithCorrespondingCoalescePartitions {
                 // operator, we reset the tree and start accumulating.
                 let plan = item.plan;
                 if plan.as_any().is::<CoalescePartitionsExec>() {
-                    Some(ExecTree {
-                        idx,
-                        plan,
-                        children: vec![],
-                    })
+                    Some(ExecTree::new(plan, idx, vec![]))
                 } else if plan.children().is_empty() {
                     // Plan has no children, there is nothing to propagate.
                     None
@@ -267,11 +268,7 @@ impl PlanWithCorrespondingCoalescePartitions {
                     if children.is_empty() {
                         None
                     } else {
-                        Some(ExecTree {
-                            idx,
-                            plan,
-                            children,
-                        })
+                        Some(ExecTree::new(plan, idx, children))
                     }
                 }
             })
@@ -365,19 +362,21 @@ fn parallelize_sorts(
     let mut coalesce_onwards = requirements.coalesce_onwards;
     // We know that `plan` has children, so `coalesce_onwards` is non-empty.
     if coalesce_onwards[0].is_some() {
-        if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        if (is_sort(&plan) || is_sort_preserving_merge(&plan))
+            // Make sure that Sort is actually global sort
+            && plan.output_partitioning().partition_count() == 1
+        {
             // If there is a connection between a `CoalescePartitionsExec` and a
-            // `SortExec` that satisfy the requirements (i.e. they don't require a
-            // single partition), then we can replace the `CoalescePartitionsExec`
-            // + `SortExec` cascade with a `SortExec` + `SortPreservingMergeExec`
+            // Global Sort that satisfy the requirements (i.e. intermediate
+            // executors  don't require single partition), then we can
+            // replace the `CoalescePartitionsExec`+ GlobalSort cascade with
+            // the `SortExec` + `SortPreservingMergeExec`
             // cascade to parallelize sorting.
             let mut prev_layer = plan.clone();
-            update_child_to_change_coalesce(
-                &mut prev_layer,
-                &mut coalesce_onwards[0],
-                Some(sort_exec),
-            )?;
-            let spm = SortPreservingMergeExec::new(sort_exec.expr().to_vec(), prev_layer);
+            update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?;
+            let sort_exprs = get_sort_exprs(&plan)?;
+            add_sort_above(&mut prev_layer, sort_exprs.to_vec())?;
+            let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer);
             return Ok(Some(PlanWithCorrespondingCoalescePartitions {
                 plan: Arc::new(spm),
                 coalesce_onwards: vec![None],
@@ -385,11 +384,7 @@ fn parallelize_sorts(
         } else if plan.as_any().is::<CoalescePartitionsExec>() {
             // There is an unnecessary `CoalescePartitionExec` in the plan.
             let mut prev_layer = plan.clone();
-            update_child_to_change_coalesce(
-                &mut prev_layer,
-                &mut coalesce_onwards[0],
-                None,
-            )?;
+            update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?;
             let new_plan = plan.with_new_children(vec![prev_layer])?;
             return Ok(Some(PlanWithCorrespondingCoalescePartitions {
                 plan: new_plan,
@@ -453,12 +448,8 @@ fn ensure_sorting(
                     // Make sure we preserve the ordering requirements:
                     update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
                     let sort_expr = required_ordering.to_vec();
-                    *child = add_sort_above_child(child, sort_expr)?;
-                    *sort_onwards = Some(ExecTree {
-                        idx,
-                        plan: child.clone(),
-                        children: vec![],
-                    })
+                    add_sort_above(child, sort_expr)?;
+                    *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
                 }
                 if let Some(tree) = sort_onwards {
                     // For window expressions, we can remove some sorts when we can
@@ -474,12 +465,8 @@ fn ensure_sorting(
             }
             (Some(required), None) => {
                 // Ordering requirement is not met, we should add a `SortExec` to the plan.
-                *child = add_sort_above_child(child, required.to_vec())?;
-                *sort_onwards = Some(ExecTree {
-                    idx,
-                    plan: child.clone(),
-                    children: vec![],
-                })
+                add_sort_above(child, required.to_vec())?;
+                *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
             }
             (None, Some(_)) => {
                 // We have a `SortExec` whose effect may be neutralized by
@@ -532,11 +519,11 @@ fn analyze_immediate_sort_removal(
                             sort_exec.expr().to_vec(),
                             sort_input,
                         ));
-                    let new_tree = ExecTree {
-                        idx: 0,
-                        plan: new_plan.clone(),
-                        children: sort_onwards.iter().flat_map(|e| e.clone()).collect(),
-                    };
+                    let new_tree = ExecTree::new(
+                        new_plan.clone(),
+                        0,
+                        sort_onwards.iter().flat_map(|e| e.clone()).collect(),
+                    );
                     PlanWithCorrespondingSort {
                         plan: new_plan,
                         sort_onwards: vec![Some(new_tree)],
@@ -662,21 +649,19 @@ fn analyze_window_sort_removal(
 }
 
 /// Updates child to remove the unnecessary `CoalescePartitions` below it.
-fn update_child_to_change_coalesce(
+fn update_child_to_remove_coalesce(
     child: &mut Arc<dyn ExecutionPlan>,
     coalesce_onwards: &mut Option<ExecTree>,
-    sort_exec: Option<&SortExec>,
 ) -> Result<()> {
     if let Some(coalesce_onwards) = coalesce_onwards {
-        *child = change_corresponding_coalesce_in_sub_plan(coalesce_onwards, sort_exec)?;
+        *child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards)?;
     }
     Ok(())
 }
 
 /// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`.
-fn change_corresponding_coalesce_in_sub_plan(
+fn remove_corresponding_coalesce_in_sub_plan(
     coalesce_onwards: &mut ExecTree,
-    sort_exec: Option<&SortExec>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     Ok(
         if coalesce_onwards
@@ -685,24 +670,12 @@ fn change_corresponding_coalesce_in_sub_plan(
             .is::<CoalescePartitionsExec>()
         {
             // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
-            let coalesce_input = coalesce_onwards.plan.children()[0].clone();
-            if let Some(sort_exec) = sort_exec {
-                let sort_expr = sort_exec.expr();
-                if !ordering_satisfy(
-                    coalesce_input.output_ordering(),
-                    Some(sort_expr),
-                    || coalesce_input.equivalence_properties(),
-                ) {
-                    return add_sort_above_child(&coalesce_input, sort_expr.to_vec());
-                }
-            }
-            coalesce_input
+            coalesce_onwards.plan.children()[0].clone()
         } else {
             let plan = coalesce_onwards.plan.clone();
             let mut children = plan.children();
             for item in &mut coalesce_onwards.children {
-                children[item.idx] =
-                    change_corresponding_coalesce_in_sub_plan(item, sort_exec)?;
+                children[item.idx] = remove_corresponding_coalesce_in_sub_plan(item)?;
             }
             plan.with_new_children(children)?
         },
@@ -783,15 +756,11 @@ fn change_finer_sort_in_sub_plan(
     let plan = &sort_onwards.plan;
     // A `SortExec` is always at the bottom of the tree.
     if is_sort(plan) {
-        let prev_layer = plan.children()[0].clone();
+        let mut prev_layer = plan.children()[0].clone();
         let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec();
-        let updated_plan = add_sort_above_child(&prev_layer, new_sort_expr)?;
-        *sort_onwards = ExecTree {
-            idx: sort_onwards.idx,
-            children: vec![],
-            plan: updated_plan.clone(),
-        };
-        Ok(updated_plan)
+        add_sort_above(&mut prev_layer, new_sort_expr)?;
+        *sort_onwards = ExecTree::new(prev_layer.clone(), sort_onwards.idx, vec![]);
+        Ok(prev_layer)
     } else {
         let mut children = plan.children();
         for item in &mut sort_onwards.children {
@@ -1722,8 +1691,8 @@ mod tests {
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "  FilterExec: NOT non_nullable_col@1",
-            "    SortExec: [nullable_col@0 ASC]",
+            "  SortExec: [nullable_col@0 ASC]",
+            "    FilterExec: NOT non_nullable_col@1",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "        ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
@@ -1780,6 +1749,47 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_coalesce_propagate() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let repartition = repartition_exec(source);
+        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(repartition));
+        let repartition = repartition_exec(coalesce_partitions);
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        // Add local sort
+        let sort = Arc::new(SortExec::new_with_partitioning(
+            sort_exprs.clone(),
+            repartition,
+            true,
+            None,
+        )) as _;
+        let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort);
+        let sort = sort_exec(sort_exprs, spm);
+
+        let physical_plan = sort.clone();
+        // Sort Parallelize rule should end Coalesce + Sort linkage when Sort is Global Sort
+        // Also input plan is not valid as it is. We need to add SortExec before SortPreservingMergeExec.
+        let expected_input = vec![
+            "SortExec: [nullable_col@0 ASC]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "        CoalescePartitionsExec",
+            "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "            MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  SortExec: [nullable_col@0 ASC]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "        MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
     /// make PhysicalSortExpr with default options
     fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
         sort_expr_options(name, schema, SortOptions::default())
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index 13e04bbc2..b6666fbef 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -23,6 +23,7 @@ use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_plan::sorts::sort::SortExec;
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use datafusion_physical_expr::utils::ordering_satisfy;
 use datafusion_physical_expr::PhysicalSortExpr;
 use std::sync::Arc;
 
@@ -48,22 +49,21 @@ pub fn optimize_children(
     }
 }
 
-/// Util function to add SortExec above child
-/// preserving the original partitioning
-pub fn add_sort_above_child(
-    child: &Arc<dyn ExecutionPlan>,
+/// This utility function adds a `SortExec` above an operator according to the
+/// given ordering requirements while preserving the original partitioning.
+pub fn add_sort_above(
+    node: &mut Arc<dyn ExecutionPlan>,
     sort_expr: Vec<PhysicalSortExpr>,
-) -> Result<Arc<dyn ExecutionPlan>> {
-    let new_child = if child.output_partitioning().partition_count() > 1 {
-        Arc::new(SortExec::new_with_partitioning(
-            sort_expr,
-            child.clone(),
-            true,
-            None,
-        )) as Arc<dyn ExecutionPlan>
-    } else {
-        Arc::new(SortExec::try_new(sort_expr, child.clone(), None)?)
-            as Arc<dyn ExecutionPlan>
-    };
-    Ok(new_child)
+) -> Result<()> {
+    // If the ordering requirement is already satisfied, do not add a sort.
+    if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
+        node.equivalence_properties()
+    }) {
+        *node = Arc::new(if node.output_partitioning().partition_count() > 1 {
+            SortExec::new_with_partitioning(sort_expr, node.clone(), true, None)
+        } else {
+            SortExec::try_new(sort_expr, node.clone(), None)?
+        }) as _
+    }
+    Ok(())
 }

Reply via email to