This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 413eba124f [MINOR] Remove global sort rule from planner (#6965)
413eba124f is described below
commit 413eba124fed89b4189cba214c737b2b5e1de12a
Author: Mustafa Akur <[email protected]>
AuthorDate: Sat Jul 15 15:23:01 2023 +0300
[MINOR] Remove global sort rule from planner (#6965)
* Remove global sort rule from planner
* Add fetch check
---
.../physical_optimizer/global_sort_selection.rs | 94 ----------------------
datafusion/core/src/physical_optimizer/mod.rs | 1 -
.../core/src/physical_optimizer/optimizer.rs | 7 --
.../src/physical_optimizer/sort_enforcement.rs | 24 +++---
.../core/src/physical_optimizer/sort_pushdown.rs | 6 +-
datafusion/core/src/physical_optimizer/utils.rs | 3 +-
.../tests/sqllogictests/test_files/explain.slt | 1 -
7 files changed, 20 insertions(+), 116 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
deleted file mode 100644
index 0b9054f89f..0000000000
--- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs
+++ /dev/null
@@ -1,94 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Select the efficient global sort implementation based on sort details.
-
-use std::sync::Arc;
-
-use crate::config::ConfigOptions;
-use crate::error::Result;
-use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::sorts::sort::SortExec;
-use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
-use crate::physical_plan::ExecutionPlan;
-use datafusion_common::tree_node::{Transformed, TreeNode};
-
-/// Currently for a sort operator, if
-/// - there are more than one input partitions
-/// - and there's some limit which can be pushed down to each of its input
partitions
-/// then [SortPreservingMergeExec] with local sort with a limit pushed down
will be preferred;
-/// Otherwise, the normal global sort [SortExec] will be used.
-/// Later more intelligent statistics-based decision can also be introduced.
-/// For example, for a small data set, the global sort may be efficient enough
-#[derive(Default)]
-pub struct GlobalSortSelection {}
-
-impl GlobalSortSelection {
- #[allow(missing_docs)]
- pub fn new() -> Self {
- Self {}
- }
-}
-
-impl PhysicalOptimizerRule for GlobalSortSelection {
- fn optimize(
- &self,
- plan: Arc<dyn ExecutionPlan>,
- config: &ConfigOptions,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- plan.transform_up(&|plan| {
- let transformed =
- plan.as_any()
- .downcast_ref::<SortExec>()
- .and_then(|sort_exec| {
- if
sort_exec.input().output_partitioning().partition_count() > 1
- // It's already preserving the partitioning so that it
can be regarded as a local sort
- && !sort_exec.preserve_partitioning()
- && (sort_exec.fetch().is_some() ||
config.optimizer.repartition_sorts)
- {
- let sort = SortExec::new(
- sort_exec.expr().to_vec(),
- sort_exec.input().clone()
- )
- .with_fetch(sort_exec.fetch())
- .with_preserve_partitioning(true);
- let global_sort: Arc<dyn ExecutionPlan> =
- Arc::new(SortPreservingMergeExec::new(
- sort_exec.expr().to_vec(),
- Arc::new(sort),
- ).with_fetch(sort_exec.fetch()));
- Some(global_sort)
- } else {
- None
- }
- });
- Ok(if let Some(transformed) = transformed {
- Transformed::Yes(transformed)
- } else {
- Transformed::No(plan)
- })
- })
- }
-
- fn name(&self) -> &str {
- "global_sort_selection"
- }
-
- fn schema_check(&self) -> bool {
- false
- }
-}
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index cd61fac9de..8ee95ea663 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -25,7 +25,6 @@ pub mod aggregate_statistics;
pub mod coalesce_batches;
pub mod combine_partial_final_agg;
pub mod dist_enforcement;
-pub mod global_sort_selection;
pub mod join_selection;
pub mod optimizer;
pub mod pipeline_checker;
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs
index b1a2de253a..d35c82abd2 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -24,7 +24,6 @@ use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use
crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
-use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
use crate::physical_optimizer::join_selection::JoinSelection;
use crate::physical_optimizer::pipeline_checker::PipelineChecker;
use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
@@ -91,12 +90,6 @@ impl PhysicalOptimizer {
// introduce additional repartitioning while EnforceDistribution
aims to
// reduce unnecessary repartitioning.
Arc::new(Repartition::new()),
- // - Currently it will depend on the partition number to decide
whether to change the
- // single node sort to parallel local sort and merge. Therefore,
GlobalSortSelection
- // should run after the Repartition.
- // - Since it will change the output ordering of some operators,
it should run
- // before JoinSelection and EnforceSorting, which may depend on
that.
- Arc::new(GlobalSortSelection::new()),
// The EnforceDistribution rule is for adding essential
repartition to satisfy the required
// distribution. Please make sure that the whole plan tree is
determined before this rule.
Arc::new(EnforceDistribution::new()),
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 61d423165e..cdd3b39f0d 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -421,9 +421,10 @@ fn parallelize_sorts(
// SortPreservingMergeExec cascade to parallelize sorting.
let mut prev_layer = plan.clone();
update_child_to_remove_coalesce(&mut prev_layer, &mut
coalesce_onwards[0])?;
- let sort_exprs = get_sort_exprs(&plan)?;
- add_sort_above(&mut prev_layer, sort_exprs.to_vec())?;
- let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(),
prev_layer);
+ let (sort_exprs, fetch) = get_sort_exprs(&plan)?;
+ add_sort_above(&mut prev_layer, sort_exprs.to_vec(), fetch)?;
+ let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer)
+ .with_fetch(fetch);
return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
plan: Arc::new(spm),
coalesce_onwards: vec![None],
@@ -480,7 +481,7 @@ fn ensure_sorting(
update_child_to_remove_unnecessary_sort(child,
sort_onwards, &plan)?;
let sort_expr =
PhysicalSortRequirement::to_sort_exprs(required_ordering);
- add_sort_above(child, sort_expr)?;
+ add_sort_above(child, sort_expr, None)?;
if is_sort(child) {
*sort_onwards = Some(ExecTree::new(child.clone(), idx,
vec![]));
} else {
@@ -491,7 +492,7 @@ fn ensure_sorting(
(Some(required), None) => {
// Ordering requirement is not met, we should add a `SortExec`
to the plan.
let sort_expr =
PhysicalSortRequirement::to_sort_exprs(required);
- add_sort_above(child, sort_expr)?;
+ add_sort_above(child, sort_expr, None)?;
*sort_onwards = Some(ExecTree::new(child.clone(), idx,
vec![]));
}
(None, Some(_)) => {
@@ -672,7 +673,7 @@ fn analyze_window_sort_removal(
.swap_remove(0)
.unwrap_or(vec![]);
let sort_expr = PhysicalSortRequirement::to_sort_exprs(reqs);
- add_sort_above(&mut new_child, sort_expr)?;
+ add_sort_above(&mut new_child, sort_expr, None)?;
};
Arc::new(WindowAggExec::try_new(
window_expr,
@@ -787,13 +788,18 @@ fn remove_corresponding_sort_from_sub_plan(
}
/// Converts an [ExecutionPlan] trait object to a [PhysicalSortExpr] slice
when possible.
-fn get_sort_exprs(sort_any: &Arc<dyn ExecutionPlan>) ->
Result<&[PhysicalSortExpr]> {
+fn get_sort_exprs(
+ sort_any: &Arc<dyn ExecutionPlan>,
+) -> Result<(&[PhysicalSortExpr], Option<usize>)> {
if let Some(sort_exec) = sort_any.as_any().downcast_ref::<SortExec>() {
- Ok(sort_exec.expr())
+ Ok((sort_exec.expr(), sort_exec.fetch()))
} else if let Some(sort_preserving_merge_exec) =
sort_any.as_any().downcast_ref::<SortPreservingMergeExec>()
{
- Ok(sort_preserving_merge_exec.expr())
+ Ok((
+ sort_preserving_merge_exec.expr(),
+ sort_preserving_merge_exec.fetch(),
+ ))
} else {
Err(DataFusionError::Plan(
"Given ExecutionPlan is not a SortExec or a
SortPreservingMergeExec"
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index 06260eb4e1..108f753800 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -136,7 +136,7 @@ pub(crate) fn pushdown_sorts(
parent_required.ok_or_else(err)?.iter().cloned(),
);
new_plan = sort_exec.input.clone();
- add_sort_above(&mut new_plan, parent_required_expr)?;
+ add_sort_above(&mut new_plan, parent_required_expr,
sort_exec.fetch())?;
};
let required_ordering = new_plan
.output_ordering()
@@ -183,7 +183,7 @@ pub(crate) fn pushdown_sorts(
parent_required.ok_or_else(err)?.iter().cloned(),
);
let mut new_plan = plan.clone();
- add_sort_above(&mut new_plan, parent_required_expr)?;
+ add_sort_above(&mut new_plan, parent_required_expr, None)?;
Ok(Transformed::Yes(SortPushDown::init(new_plan)))
}
}
@@ -345,7 +345,7 @@ fn try_pushdown_requirements_to_join(
}
RequirementsCompatibility::NonCompatible => {
// Can not push down, add new SortExec
- add_sort_above(&mut plan.clone(), sort_expr)?;
+ add_sort_above(&mut plan.clone(), sort_expr, None)?;
Ok(None)
}
}
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index 68efa06c3f..82cfe76b68 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -39,6 +39,7 @@ use datafusion_physical_expr::PhysicalSortExpr;
pub fn add_sort_above(
node: &mut Arc<dyn ExecutionPlan>,
sort_expr: Vec<PhysicalSortExpr>,
+ fetch: Option<usize>,
) -> Result<()> {
// If the ordering requirement is already satisfied, do not add a sort.
if !ordering_satisfy(
@@ -47,7 +48,7 @@ pub fn add_sort_above(
|| node.equivalence_properties(),
|| node.ordering_equivalence_properties(),
) {
- let new_sort = SortExec::new(sort_expr, node.clone());
+ let new_sort = SortExec::new(sort_expr,
node.clone()).with_fetch(fetch);
*node = Arc::new(if node.output_partitioning().partition_count() > 1 {
new_sort.with_preserve_partitioning(true)
diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt
index 1032b9c7f9..56f2fdf10a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/explain.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt
@@ -242,7 +242,6 @@ physical_plan after aggregate_statistics SAME TEXT AS ABOVE
physical_plan after join_selection SAME TEXT AS ABOVE
physical_plan after PipelineFixer SAME TEXT AS ABOVE
physical_plan after repartition SAME TEXT AS ABOVE
-physical_plan after global_sort_selection SAME TEXT AS ABOVE
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
physical_plan after EnforceSorting SAME TEXT AS ABOVE