This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a9e0d0e6 Execute sort in parallel when a limit is used after sort 
(#3527)
3a9e0d0e6 is described below

commit 3a9e0d0e676836f9a9e23280a1b19b111024bf5d
Author: Daniël Heres <[email protected]>
AuthorDate: Mon Sep 19 14:48:01 2022 +0200

    Execute sort in parallel when a limit is used after sort (#3527)
    
    * Parallel sort
    
    * Move it to optimization rule
    
    * Add rule
    
    * Improve rule
    
    * Remove bench
    
    * Fix doc
    
    * Fix indent
---
 datafusion/core/src/execution/context.rs           |  3 +
 datafusion/core/src/physical_optimizer/mod.rs      |  1 +
 .../core/src/physical_optimizer/parallel_sort.rs   | 92 ++++++++++++++++++++++
 datafusion/core/src/physical_plan/planner.rs       |  2 +-
 datafusion/core/src/physical_plan/sorts/sort.rs    | 25 ++----
 datafusion/core/tests/sql/explain_analyze.rs       |  4 +-
 6 files changed, 107 insertions(+), 20 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index 947a2adb0..492782f53 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -40,6 +40,7 @@ use crate::{
     physical_optimizer::{
         aggregate_statistics::AggregateStatistics,
         hash_build_probe_order::HashBuildProbeOrder, 
optimizer::PhysicalOptimizerRule,
+        parallel_sort::ParallelSort,
     },
 };
 pub use datafusion_physical_expr::execution_props::ExecutionProps;
@@ -1469,6 +1470,8 @@ impl SessionState {
                     .unwrap(),
             )));
         }
+        physical_optimizers.push(Arc::new(ParallelSort::new()));
+
         physical_optimizers.push(Arc::new(Repartition::new()));
         physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
 
diff --git a/datafusion/core/src/physical_optimizer/mod.rs 
b/datafusion/core/src/physical_optimizer/mod.rs
index 55550bcd2..82b7087ab 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -23,6 +23,7 @@ pub mod coalesce_batches;
 pub mod hash_build_probe_order;
 pub mod merge_exec;
 pub mod optimizer;
+pub mod parallel_sort;
 pub mod pruning;
 pub mod repartition;
 mod utils;
diff --git a/datafusion/core/src/physical_optimizer/parallel_sort.rs 
b/datafusion/core/src/physical_optimizer/parallel_sort.rs
new file mode 100644
index 000000000..3361d8155
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/parallel_sort.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parallel sort parallelizes sorts if a limit is present after a sort 
(`ORDER BY LIMIT N`)
+use crate::{
+    error::Result,
+    physical_optimizer::PhysicalOptimizerRule,
+    physical_plan::{
+        limit::GlobalLimitExec,
+        sorts::{sort::SortExec, 
sort_preserving_merge::SortPreservingMergeExec},
+        with_new_children_if_necessary,
+    },
+};
+use std::sync::Arc;
+
+/// Optimizer rule that makes sort parallel if a limit is used after sort 
(`ORDER BY LIMIT N`)
+/// The plan will use `SortPreservingMergeExec` to merge the results
+#[derive(Default)]
+pub struct ParallelSort {}
+
+impl ParallelSort {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+impl PhysicalOptimizerRule for ParallelSort {
+    fn optimize(
+        &self,
+        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+        config: &crate::execution::context::SessionConfig,
+    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+        if plan.children().is_empty() {
+            // leaf node, children cannot be replaced
+            Ok(plan.clone())
+        } else {
+            // recurse down first
+            let children = plan
+                .children()
+                .iter()
+                .map(|child| self.optimize(child.clone(), config))
+                .collect::<Result<Vec<_>>>()?;
+            let plan = with_new_children_if_necessary(plan, children)?;
+            let children = plan.children();
+            let plan_any = plan.as_any();
+            // GlobalLimitExec (SortExec preserve_partitioning=False)
+            // -> GlobalLimitExec (SortExec preserve_partitioning=True)
+            let parallel_sort = 
plan_any.downcast_ref::<GlobalLimitExec>().is_some()
+                && children.len() == 1
+                && children[0].as_any().downcast_ref::<SortExec>().is_some()
+                && !children[0]
+                    .as_any()
+                    .downcast_ref::<SortExec>()
+                    .unwrap()
+                    .preserve_partitioning();
+
+            Ok(if parallel_sort {
+                let sort = 
children[0].as_any().downcast_ref::<SortExec>().unwrap();
+                let new_sort = SortExec::new_with_partitioning(
+                    sort.expr().to_vec(),
+                    sort.input().clone(),
+                    true,
+                );
+                let merge = SortPreservingMergeExec::new(
+                    sort.expr().to_vec(),
+                    Arc::new(new_sort),
+                );
+                with_new_children_if_necessary(plan, vec![Arc::new(merge)])?
+            } else {
+                plan.clone()
+            })
+        }
+    }
+
+    fn name(&self) -> &str {
+        "parallel_sort"
+    }
+}
diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index c87590059..20a819622 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -841,7 +841,7 @@ impl DefaultPhysicalPlanner {
                             )),
                         })
                         .collect::<Result<Vec<_>>>()?;
-                    Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?) 
)
+                    Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?))
                 }
                 LogicalPlan::Join(Join {
                     left,
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs 
b/datafusion/core/src/physical_plan/sorts/sort.rs
index 64312327b..cc0501785 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -668,6 +668,11 @@ impl SortExec {
         Ok(Self::new_with_partitioning(expr, input, false))
     }
 
+    /// Whether this `SortExec` preserves partitioning of the children
+    pub fn preserve_partitioning(&self) -> bool {
+        self.preserve_partitioning
+    }
+
     /// Create a new sort execution plan with the option to preserve
     /// the partitioning of the input plan
     pub fn new_with_partitioning(
@@ -741,10 +746,11 @@ impl ExecutionPlan for SortExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(SortExec::try_new(
+        Ok(Arc::new(SortExec::new_with_partitioning(
             self.expr.clone(),
             children[0].clone(),
-        )?))
+            self.preserve_partitioning,
+        )))
     }
 
     fn execute(
@@ -753,21 +759,6 @@ impl ExecutionPlan for SortExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         debug!("Start SortExec::execute for partition {} of context session_id 
{} and task_id {:?}", partition, context.session_id(), context.task_id());
-        if !self.preserve_partitioning {
-            if 0 != partition {
-                return Err(DataFusionError::Internal(format!(
-                    "SortExec invalid partition {}",
-                    partition
-                )));
-            }
-
-            // sort needs to operate on a single partition currently
-            if 1 != self.input.output_partitioning().partition_count() {
-                return Err(DataFusionError::Internal(
-                    "SortExec requires a single input partition".to_owned(),
-                ));
-            }
-        }
 
         debug!(
             "Start invoking SortExec's input.execute for partition: {}",
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index 7f465c4c6..a75e0e3fa 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -686,8 +686,8 @@ async fn test_physical_plan_display_indent() {
     let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
     let expected = vec![
         "GlobalLimitExec: skip=0, fetch=10",
-        "  SortExec: [the_min@2 DESC]",
-        "    CoalescePartitionsExec",
+        "  SortPreservingMergeExec: [the_min@2 DESC]",
+        "    SortExec: [the_min@2 DESC]",
         "      ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 
as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
         "        AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
         "          CoalesceBatchesExec: target_batch_size=4096",

Reply via email to