This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 3a9e0d0e6 Execute sort in parallel when a limit is used after sort
(#3527)
3a9e0d0e6 is described below
commit 3a9e0d0e676836f9a9e23280a1b19b111024bf5d
Author: Daniël Heres <[email protected]>
AuthorDate: Mon Sep 19 14:48:01 2022 +0200
Execute sort in parallel when a limit is used after sort (#3527)
* Parallel sort
* Move it to optimization rule
* Add rule
* Improve rule
* Remove bench
* Fix doc
* Fix indent
---
datafusion/core/src/execution/context.rs | 3 +
datafusion/core/src/physical_optimizer/mod.rs | 1 +
.../core/src/physical_optimizer/parallel_sort.rs | 92 ++++++++++++++++++++++
datafusion/core/src/physical_plan/planner.rs | 2 +-
datafusion/core/src/physical_plan/sorts/sort.rs | 25 ++----
datafusion/core/tests/sql/explain_analyze.rs | 4 +-
6 files changed, 107 insertions(+), 20 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index 947a2adb0..492782f53 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -40,6 +40,7 @@ use crate::{
physical_optimizer::{
aggregate_statistics::AggregateStatistics,
hash_build_probe_order::HashBuildProbeOrder,
optimizer::PhysicalOptimizerRule,
+ parallel_sort::ParallelSort,
},
};
pub use datafusion_physical_expr::execution_props::ExecutionProps;
@@ -1469,6 +1470,8 @@ impl SessionState {
.unwrap(),
)));
}
+ physical_optimizers.push(Arc::new(ParallelSort::new()));
+
physical_optimizers.push(Arc::new(Repartition::new()));
physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
diff --git a/datafusion/core/src/physical_optimizer/mod.rs
b/datafusion/core/src/physical_optimizer/mod.rs
index 55550bcd2..82b7087ab 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -23,6 +23,7 @@ pub mod coalesce_batches;
pub mod hash_build_probe_order;
pub mod merge_exec;
pub mod optimizer;
+pub mod parallel_sort;
pub mod pruning;
pub mod repartition;
mod utils;
diff --git a/datafusion/core/src/physical_optimizer/parallel_sort.rs
b/datafusion/core/src/physical_optimizer/parallel_sort.rs
new file mode 100644
index 000000000..3361d8155
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/parallel_sort.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parallel sort parallelizes sorts if a limit is present after a sort
(`ORDER BY LIMIT N`)
+use crate::{
+ error::Result,
+ physical_optimizer::PhysicalOptimizerRule,
+ physical_plan::{
+ limit::GlobalLimitExec,
+ sorts::{sort::SortExec,
sort_preserving_merge::SortPreservingMergeExec},
+ with_new_children_if_necessary,
+ },
+};
+use std::sync::Arc;
+
+/// Optimizer rule that makes sort parallel if a limit is used after sort
(`ORDER BY LIMIT N`)
+/// The plan will use `SortPreservingMergeExec` to merge the results
+#[derive(Default)]
+pub struct ParallelSort {}
+
+impl ParallelSort {
+ #[allow(missing_docs)]
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+impl PhysicalOptimizerRule for ParallelSort {
+ fn optimize(
+ &self,
+ plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+ config: &crate::execution::context::SessionConfig,
+ ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+ if plan.children().is_empty() {
+ // leaf node, children cannot be replaced
+ Ok(plan.clone())
+ } else {
+ // recurse down first
+ let children = plan
+ .children()
+ .iter()
+ .map(|child| self.optimize(child.clone(), config))
+ .collect::<Result<Vec<_>>>()?;
+ let plan = with_new_children_if_necessary(plan, children)?;
+ let children = plan.children();
+ let plan_any = plan.as_any();
+ // GlobalLimitExec (SortExec preserve_partitioning=False)
+ // -> GlobalLimitExec (SortExec preserve_partitioning=True)
+ let parallel_sort =
plan_any.downcast_ref::<GlobalLimitExec>().is_some()
+ && children.len() == 1
+ && children[0].as_any().downcast_ref::<SortExec>().is_some()
+ && !children[0]
+ .as_any()
+ .downcast_ref::<SortExec>()
+ .unwrap()
+ .preserve_partitioning();
+
+ Ok(if parallel_sort {
+ let sort =
children[0].as_any().downcast_ref::<SortExec>().unwrap();
+ let new_sort = SortExec::new_with_partitioning(
+ sort.expr().to_vec(),
+ sort.input().clone(),
+ true,
+ );
+ let merge = SortPreservingMergeExec::new(
+ sort.expr().to_vec(),
+ Arc::new(new_sort),
+ );
+ with_new_children_if_necessary(plan, vec![Arc::new(merge)])?
+ } else {
+ plan.clone()
+ })
+ }
+ }
+
+ fn name(&self) -> &str {
+ "parallel_sort"
+ }
+}
diff --git a/datafusion/core/src/physical_plan/planner.rs
b/datafusion/core/src/physical_plan/planner.rs
index c87590059..20a819622 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -841,7 +841,7 @@ impl DefaultPhysicalPlanner {
)),
})
.collect::<Result<Vec<_>>>()?;
- Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?)
)
+ Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?))
}
LogicalPlan::Join(Join {
left,
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs
b/datafusion/core/src/physical_plan/sorts/sort.rs
index 64312327b..cc0501785 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -668,6 +668,11 @@ impl SortExec {
Ok(Self::new_with_partitioning(expr, input, false))
}
+ /// Whether this `SortExec` preserves partitioning of the children
+ pub fn preserve_partitioning(&self) -> bool {
+ self.preserve_partitioning
+ }
+
/// Create a new sort execution plan with the option to preserve
/// the partitioning of the input plan
pub fn new_with_partitioning(
@@ -741,10 +746,11 @@ impl ExecutionPlan for SortExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(SortExec::try_new(
+ Ok(Arc::new(SortExec::new_with_partitioning(
self.expr.clone(),
children[0].clone(),
- )?))
+ self.preserve_partitioning,
+ )))
}
fn execute(
@@ -753,21 +759,6 @@ impl ExecutionPlan for SortExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
debug!("Start SortExec::execute for partition {} of context session_id
{} and task_id {:?}", partition, context.session_id(), context.task_id());
- if !self.preserve_partitioning {
- if 0 != partition {
- return Err(DataFusionError::Internal(format!(
- "SortExec invalid partition {}",
- partition
- )));
- }
-
- // sort needs to operate on a single partition currently
- if 1 != self.input.output_partitioning().partition_count() {
- return Err(DataFusionError::Internal(
- "SortExec requires a single input partition".to_owned(),
- ));
- }
- }
debug!(
"Start invoking SortExec's input.execute for partition: {}",
diff --git a/datafusion/core/tests/sql/explain_analyze.rs
b/datafusion/core/tests/sql/explain_analyze.rs
index 7f465c4c6..a75e0e3fa 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -686,8 +686,8 @@ async fn test_physical_plan_display_indent() {
let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
let expected = vec![
"GlobalLimitExec: skip=0, fetch=10",
- " SortExec: [the_min@2 DESC]",
- " CoalescePartitionsExec",
+ " SortPreservingMergeExec: [the_min@2 DESC]",
+ " SortExec: [the_min@2 DESC]",
" ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1
as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
" AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",