This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 413eba124f [MINOR] Remove global sort rule from planner (#6965)
413eba124f is described below

commit 413eba124fed89b4189cba214c737b2b5e1de12a
Author: Mustafa Akur <[email protected]>
AuthorDate: Sat Jul 15 15:23:01 2023 +0300

    [MINOR] Remove global sort rule from planner (#6965)
    
    * Remove global sort rule from planner
    
    * Add fetch check
---
 .../physical_optimizer/global_sort_selection.rs    | 94 ----------------------
 datafusion/core/src/physical_optimizer/mod.rs      |  1 -
 .../core/src/physical_optimizer/optimizer.rs       |  7 --
 .../src/physical_optimizer/sort_enforcement.rs     | 24 +++---
 .../core/src/physical_optimizer/sort_pushdown.rs   |  6 +-
 datafusion/core/src/physical_optimizer/utils.rs    |  3 +-
 .../tests/sqllogictests/test_files/explain.slt     |  1 -
 7 files changed, 20 insertions(+), 116 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
deleted file mode 100644
index 0b9054f89f..0000000000
--- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs
+++ /dev/null
@@ -1,94 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Select the efficient global sort implementation based on sort details.
-
-use std::sync::Arc;
-
-use crate::config::ConfigOptions;
-use crate::error::Result;
-use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::sorts::sort::SortExec;
-use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
-use crate::physical_plan::ExecutionPlan;
-use datafusion_common::tree_node::{Transformed, TreeNode};
-
-/// Currently for a sort operator, if
-/// - there are more than one input partitions
-/// - and there's some limit which can be pushed down to each of its input partitions
-/// then [SortPreservingMergeExec] with local sort with a limit pushed down will be preferred;
-/// Otherwise, the normal global sort [SortExec] will be used.
-/// Later more intelligent statistics-based decision can also be introduced.
-/// For example, for a small data set, the global sort may be efficient enough
-#[derive(Default)]
-pub struct GlobalSortSelection {}
-
-impl GlobalSortSelection {
-    #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
-    }
-}
-
-impl PhysicalOptimizerRule for GlobalSortSelection {
-    fn optimize(
-        &self,
-        plan: Arc<dyn ExecutionPlan>,
-        config: &ConfigOptions,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        plan.transform_up(&|plan| {
-            let transformed =
-                plan.as_any()
-                    .downcast_ref::<SortExec>()
-                    .and_then(|sort_exec| {
-                        if sort_exec.input().output_partitioning().partition_count() > 1
-                        // It's already preserving the partitioning so that it can be regarded as a local sort
-                        && !sort_exec.preserve_partitioning()
-                        && (sort_exec.fetch().is_some() ||  config.optimizer.repartition_sorts)
-                    {
-                            let sort = SortExec::new(
-                                sort_exec.expr().to_vec(),
-                                sort_exec.input().clone()
-                            )
-                            .with_fetch(sort_exec.fetch())
-                            .with_preserve_partitioning(true);
-                            let global_sort: Arc<dyn ExecutionPlan> =
-                                Arc::new(SortPreservingMergeExec::new(
-                                    sort_exec.expr().to_vec(),
-                                    Arc::new(sort),
-                                ).with_fetch(sort_exec.fetch()));
-                            Some(global_sort)
-                        } else {
-                            None
-                        }
-                    });
-            Ok(if let Some(transformed) = transformed {
-                Transformed::Yes(transformed)
-            } else {
-                Transformed::No(plan)
-            })
-        })
-    }
-
-    fn name(&self) -> &str {
-        "global_sort_selection"
-    }
-
-    fn schema_check(&self) -> bool {
-        false
-    }
-}
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index cd61fac9de..8ee95ea663 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -25,7 +25,6 @@ pub mod aggregate_statistics;
 pub mod coalesce_batches;
 pub mod combine_partial_final_agg;
 pub mod dist_enforcement;
-pub mod global_sort_selection;
 pub mod join_selection;
 pub mod optimizer;
 pub mod pipeline_checker;
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs
index b1a2de253a..d35c82abd2 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -24,7 +24,6 @@ use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
 use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
 use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
 use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
-use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
 use crate::physical_optimizer::join_selection::JoinSelection;
 use crate::physical_optimizer::pipeline_checker::PipelineChecker;
 use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
@@ -91,12 +90,6 @@ impl PhysicalOptimizer {
             //   introduce additional repartitioning while EnforceDistribution aims to
             //   reduce unnecessary repartitioning.
             Arc::new(Repartition::new()),
-            // - Currently it will depend on the partition number to decide whether to change the
-            // single node sort to parallel local sort and merge. Therefore, GlobalSortSelection
-            // should run after the Repartition.
-            // - Since it will change the output ordering of some operators, it should run
-            // before JoinSelection and EnforceSorting, which may depend on that.
-            Arc::new(GlobalSortSelection::new()),
             // The EnforceDistribution rule is for adding essential repartition to satisfy the required
             // distribution. Please make sure that the whole plan tree is determined before this rule.
             Arc::new(EnforceDistribution::new()),
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 61d423165e..cdd3b39f0d 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -421,9 +421,10 @@ fn parallelize_sorts(
         // SortPreservingMergeExec cascade to parallelize sorting.
         let mut prev_layer = plan.clone();
         update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?;
-        let sort_exprs = get_sort_exprs(&plan)?;
-        add_sort_above(&mut prev_layer, sort_exprs.to_vec())?;
-        let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer);
+        let (sort_exprs, fetch) = get_sort_exprs(&plan)?;
+        add_sort_above(&mut prev_layer, sort_exprs.to_vec(), fetch)?;
+        let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer)
+            .with_fetch(fetch);
         return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
             plan: Arc::new(spm),
             coalesce_onwards: vec![None],
@@ -480,7 +481,7 @@ fn ensure_sorting(
                     update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
                     let sort_expr =
                         PhysicalSortRequirement::to_sort_exprs(required_ordering);
-                    add_sort_above(child, sort_expr)?;
+                    add_sort_above(child, sort_expr, None)?;
                     if is_sort(child) {
                         *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
                     } else {
@@ -491,7 +492,7 @@ fn ensure_sorting(
             (Some(required), None) => {
                 // Ordering requirement is not met, we should add a `SortExec` to the plan.
                 let sort_expr = PhysicalSortRequirement::to_sort_exprs(required);
-                add_sort_above(child, sort_expr)?;
+                add_sort_above(child, sort_expr, None)?;
                 *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
             }
             (None, Some(_)) => {
@@ -672,7 +673,7 @@ fn analyze_window_sort_removal(
                     .swap_remove(0)
                     .unwrap_or(vec![]);
                 let sort_expr = PhysicalSortRequirement::to_sort_exprs(reqs);
-                add_sort_above(&mut new_child, sort_expr)?;
+                add_sort_above(&mut new_child, sort_expr, None)?;
             };
             Arc::new(WindowAggExec::try_new(
                 window_expr,
@@ -787,13 +788,18 @@ fn remove_corresponding_sort_from_sub_plan(
 }
 
 /// Converts an [ExecutionPlan] trait object to a [PhysicalSortExpr] slice when possible.
-fn get_sort_exprs(sort_any: &Arc<dyn ExecutionPlan>) -> Result<&[PhysicalSortExpr]> {
+fn get_sort_exprs(
+    sort_any: &Arc<dyn ExecutionPlan>,
+) -> Result<(&[PhysicalSortExpr], Option<usize>)> {
     if let Some(sort_exec) = sort_any.as_any().downcast_ref::<SortExec>() {
-        Ok(sort_exec.expr())
+        Ok((sort_exec.expr(), sort_exec.fetch()))
     } else if let Some(sort_preserving_merge_exec) =
         sort_any.as_any().downcast_ref::<SortPreservingMergeExec>()
     {
-        Ok(sort_preserving_merge_exec.expr())
+        Ok((
+            sort_preserving_merge_exec.expr(),
+            sort_preserving_merge_exec.fetch(),
+        ))
     } else {
         Err(DataFusionError::Plan(
             "Given ExecutionPlan is not a SortExec or a SortPreservingMergeExec"
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index 06260eb4e1..108f753800 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -136,7 +136,7 @@ pub(crate) fn pushdown_sorts(
                 parent_required.ok_or_else(err)?.iter().cloned(),
             );
             new_plan = sort_exec.input.clone();
-            add_sort_above(&mut new_plan, parent_required_expr)?;
+            add_sort_above(&mut new_plan, parent_required_expr, sort_exec.fetch())?;
         };
         let required_ordering = new_plan
             .output_ordering()
@@ -183,7 +183,7 @@ pub(crate) fn pushdown_sorts(
                 parent_required.ok_or_else(err)?.iter().cloned(),
             );
             let mut new_plan = plan.clone();
-            add_sort_above(&mut new_plan, parent_required_expr)?;
+            add_sort_above(&mut new_plan, parent_required_expr, None)?;
             Ok(Transformed::Yes(SortPushDown::init(new_plan)))
         }
     }
@@ -345,7 +345,7 @@ fn try_pushdown_requirements_to_join(
         }
         RequirementsCompatibility::NonCompatible => {
             // Can not push down, add new SortExec
-            add_sort_above(&mut plan.clone(), sort_expr)?;
+            add_sort_above(&mut plan.clone(), sort_expr, None)?;
             Ok(None)
         }
     }
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index 68efa06c3f..82cfe76b68 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -39,6 +39,7 @@ use datafusion_physical_expr::PhysicalSortExpr;
 pub fn add_sort_above(
     node: &mut Arc<dyn ExecutionPlan>,
     sort_expr: Vec<PhysicalSortExpr>,
+    fetch: Option<usize>,
 ) -> Result<()> {
     // If the ordering requirement is already satisfied, do not add a sort.
     if !ordering_satisfy(
@@ -47,7 +48,7 @@ pub fn add_sort_above(
         || node.equivalence_properties(),
         || node.ordering_equivalence_properties(),
     ) {
-        let new_sort = SortExec::new(sort_expr, node.clone());
+        let new_sort = SortExec::new(sort_expr, node.clone()).with_fetch(fetch);
 
         *node = Arc::new(if node.output_partitioning().partition_count() > 1 {
             new_sort.with_preserve_partitioning(true)
diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt
index 1032b9c7f9..56f2fdf10a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/explain.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt
@@ -242,7 +242,6 @@ physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
 physical_plan after PipelineFixer SAME TEXT AS ABOVE
 physical_plan after repartition SAME TEXT AS ABOVE
-physical_plan after global_sort_selection SAME TEXT AS ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
 physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
 physical_plan after EnforceSorting SAME TEXT AS ABOVE

Reply via email to